Skip to content

English mirror

This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.

Read the Chinese source page

事件与流

流程跑起来后,常见需求有两类。第一类是图内控制:A 步完成后触发 B、C 两个分支,两个外部事件都到齐后再继续。第二类是对外输出:把模型生成进度、审批提示、处理日志推给 UI 或 SSE。

这两类不要混在一起。

通道作用写入方消费方
emit / whenflow 内部控制流chunk 或外部 executionflow.when(...).to(...) 后面的 chunk
runtime streamexecution 对外输出chunkexecution.get_async_runtime_stream(...)
observation event框架级观测Agently runtimeEvent Center / DevTools

emit 决定“哪个 chunk 被触发”。runtime stream 决定“外部能看到什么进度”。observation event 是框架观测,不参与业务图调度。

emit / when:在图里发信号

python
from agently import TriggerFlow, TriggerFlowRuntimeData

flow = TriggerFlow(name="ticket-route")


async def prepare(data: TriggerFlowRuntimeData):
    ticket = {"id": data.input["ticket_id"], "team": "billing"}
    await data.async_set_state("ticket", ticket)
    await data.async_emit("TicketPrepared", ticket)
    return ticket


async def route(data: TriggerFlowRuntimeData):
    await data.async_set_state("route", {"team": data.input["team"]})


flow.to(prepare)
flow.when("TicketPrepared").to(route)

data.async_emit("TicketPrepared", ticket) 发出一个业务事件。挂在 flow.when("TicketPrepared") 上的 chunk 会收到这个 payload,它的 data.input 就是 ticket

同一个事件可以有多个分支:

python
flow.when("TicketPrepared").to(route)
flow.when("TicketPrepared").to(write_audit_log)
flow.when("TicketPrepared").to(send_metric)

这些分支会并发触发。每一次 emit 都是一条业务信号,不会因为名字相同被自动去重。

外部也可以 emit

execution 还处于 open 状态时,外部代码可以向它发事件:

python
execution = await flow.async_start_execution({"ticket_id": "T-1024"})
await execution.async_emit("UserAddedNote", {"text": "客户已补充发票信息"})

seal()close() 后,新的外部 emit 会被拒绝。这个边界让服务可以安全收尾:seal 之后只处理已经进入 execution 的工作,不再接新提交。

AND join:等多个信号都到齐

有些流程不是“收到一个事件就继续”,而是要等多个事件都到:

python
flow.when({"event": ["done:classify", "done:retrieve"]}, mode="and").to(merge_context)

join 状态属于单个 execution。不要把 join 进度放进 flow_data,否则并发 execution 会互相污染。

chunk 内部 emit 的信号会继承当前 runtime scope。batchfor_each 或 chunk 内 fan-out 产生的信号,在 join 时会保持同一组关联。外部直接 emit 的信号没有共同 scope;如果业务上要把多个外部提交按同一个对象关联,payload 里要带明确的 correlation key,或者先进入一个有 scope 的 flow stage。

emit_nowait:旁路可以不等

data.emit_nowait(event, payload) 是 fire-and-forget。它适合日志、指标、带外通知这类旁路:

python
async def main_step(data):
    result = {"ok": True}
    data.emit_nowait("StepFinished", result)
    return result

如果下游结果会影响主流程判断,用 await data.async_emit(...);如果只是旁路,不想让主链等待,用 emit_nowait(...)

runtime stream:把进度推给外部

runtime stream 属于一次 execution。chunk 往里推 item,外部按到达顺序消费。

python
async def draft_reply(data: TriggerFlowRuntimeData):
    await data.async_put_into_stream({"type": "status", "message": "开始生成回复"})
    await data.async_put_into_stream({"type": "delta", "content": "您好,"})
    await data.async_set_state("reply", "您好,我们已经收到您的问题。")


execution = flow.create_execution(auto_close=False)
await execution.async_start({"ticket_id": "T-1024"})

close_task = asyncio.create_task(execution.async_close())

async for item in execution.get_async_runtime_stream(timeout=None):
    send_to_sse(item)

snapshot = await close_task

几个事实:

  • data.async_put_into_stream(item) 推一个 item。
  • execution.get_async_runtime_stream(timeout=...) 是 async generator。
  • execution close 时,runtime stream 也会关闭。
  • 同步场景可以用 execution.get_runtime_stream(timeout=...)

stream item 不一定只有业务 item。pause、runtime intervention 等系统事件也会以 fail-open item 进入 stream。前端只关心业务 item 时,忽略未知 type 即可。

面向前端、SSE、WebSocket 或 IM 时,建议在服务层把 runtime stream item 映射成稳定产品事件,例如 report_section_readywaiting_for_approvalrole_status。不要把模型 parser path、内部 chunk 名称或 RuntimeEvent 当作长期 UI 协议。完整的交互层和主动任务边界见 交互层与主动任务 Playbook

两个 timeout 分别管不同的等待

参数管什么
execution.get_async_runtime_stream(timeout=N)消费者等下一条 stream item 等多久
auto_close_timeout=Nexecution 空闲多久后自动 close

收集全部 stream item 时,常用 timeout=None,让消费者一直等到 stream 被 close。auto_close_timeout 则是 lifecycle 里的空闲收尾策略。

隐式 stream 只适合自闭合流程

flow.get_async_runtime_stream(...) 会在内部创建一个隐式 execution。它和 flow.async_start(...) 一样,只适合输入一次、自然结束的 flow。

如果流程需要 pause_for(...)、外部 emit(...)、人工恢复,使用显式 execution:

python
execution = flow.create_execution(auto_close=False)
await execution.async_start(input_data)
async for item in execution.get_async_runtime_stream(timeout=None):
    ...

什么时候用哪条通道

想做的事
触发图内另一个 chunkdata.async_emit(...) + flow.when(...)
同时触发多个旁路多个 flow.when("Event").to(...)
等两个业务信号都到齐flow.when({"event": [...]}, mode="and")
给前端推模型增量、状态提示data.async_put_into_stream(...)
给 DevTools / 观测系统看框架事件Event Center
给 IM 或产品界面推稳定业务进度runtime stream item -> 产品事件映射
scheduler / webhook 主动推进流程应用触发源 -> explicit execution -> emitcontinue_with

另见