English mirror
This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.
事件与流
流程跑起来后,常见需求有两类。第一类是图内控制:A 步完成后触发 B、C 两个分支,两个外部事件都到齐后再继续。第二类是对外输出:把模型生成进度、审批提示、处理日志推给 UI 或 SSE。
这两类不要混在一起。
| 通道 | 作用 | 写入方 | 消费方 |
|---|---|---|---|
emit / when | flow 内部控制流 | chunk 或外部 execution | flow.when(...).to(...) 后面的 chunk |
| runtime stream | execution 对外输出 | chunk | execution.get_async_runtime_stream(...) |
| observation event | 框架级观测 | Agently runtime | Event Center / DevTools |
emit 决定“哪个 chunk 被触发”。runtime stream 决定“外部能看到什么进度”。observation event 是框架观测,不参与业务图调度。
emit / when:在图里发信号
from agently import TriggerFlow, TriggerFlowRuntimeData
flow = TriggerFlow(name="ticket-route")
async def prepare(data: TriggerFlowRuntimeData):
ticket = {"id": data.input["ticket_id"], "team": "billing"}
await data.async_set_state("ticket", ticket)
await data.async_emit("TicketPrepared", ticket)
return ticket
async def route(data: TriggerFlowRuntimeData):
await data.async_set_state("route", {"team": data.input["team"]})
flow.to(prepare)
flow.when("TicketPrepared").to(route)data.async_emit("TicketPrepared", ticket) 发出一个业务事件。挂在 flow.when("TicketPrepared") 上的 chunk 会收到这个 payload,它的 data.input 就是 ticket。
同一个事件可以有多个分支:
flow.when("TicketPrepared").to(route)
flow.when("TicketPrepared").to(write_audit_log)
flow.when("TicketPrepared").to(send_metric)这些分支会并发触发。每一次 emit 都是一条业务信号,不会因为名字相同被自动去重。
外部也可以 emit
execution 还处于 open 状态时,外部代码可以向它发事件:
execution = await flow.async_start_execution({"ticket_id": "T-1024"})
await execution.async_emit("UserAddedNote", {"text": "客户已补充发票信息"})seal() 或 close() 后,新的外部 emit 会被拒绝。这个边界让服务可以安全收尾:seal 之后只处理已经进入 execution 的工作,不再接新提交。
AND join:等多个信号都到齐
有些流程不是“收到一个事件就继续”,而是要等多个事件都到:
flow.when({"event": ["done:classify", "done:retrieve"]}, mode="and").to(merge_context)join 状态属于单个 execution。不要把 join 进度放进 flow_data,否则并发 execution 会互相污染。
chunk 内部 emit 的信号会继承当前 runtime scope。batch、for_each 或 chunk 内 fan-out 产生的信号,在 join 时会保持同一组关联。外部直接 emit 的信号没有共同 scope;如果业务上要把多个外部提交按同一个对象关联,payload 里要带明确的 correlation key,或者先进入一个有 scope 的 flow stage。
emit_nowait:旁路可以不等
data.emit_nowait(event, payload) 是 fire-and-forget。它适合日志、指标、带外通知这类旁路:
async def main_step(data):
result = {"ok": True}
data.emit_nowait("StepFinished", result)
return result如果下游结果会影响主流程判断,用 await data.async_emit(...);如果只是旁路,不想让主链等待,用 emit_nowait(...)。
runtime stream:把进度推给外部
runtime stream 属于一次 execution。chunk 往里推 item,外部按到达顺序消费。
async def draft_reply(data: TriggerFlowRuntimeData):
await data.async_put_into_stream({"type": "status", "message": "开始生成回复"})
await data.async_put_into_stream({"type": "delta", "content": "您好,"})
await data.async_set_state("reply", "您好,我们已经收到您的问题。")
execution = flow.create_execution(auto_close=False)
await execution.async_start({"ticket_id": "T-1024"})
close_task = asyncio.create_task(execution.async_close())
async for item in execution.get_async_runtime_stream(timeout=None):
send_to_sse(item)
snapshot = await close_task几个事实:
data.async_put_into_stream(item)推一个 item。execution.get_async_runtime_stream(timeout=...)是 async generator。- execution close 时,runtime stream 也会关闭。
- 同步场景可以用
execution.get_runtime_stream(timeout=...)。
stream item 不一定只有业务 item。pause、runtime intervention 等系统事件也会以 fail-open item 进入 stream。前端只关心业务 item 时,忽略未知 type 即可。
面向前端、SSE、WebSocket 或 IM 时,建议在服务层把 runtime stream item 映射成稳定产品事件,例如 report_section_ready、waiting_for_approval、role_status。不要把模型 parser path、内部 chunk 名称或 RuntimeEvent 当作长期 UI 协议。完整的交互层和主动任务边界见 交互层与主动任务 Playbook。
两个 timeout 分别管不同的等待
| 参数 | 管什么 |
|---|---|
execution.get_async_runtime_stream(timeout=N) | 消费者等下一条 stream item 等多久 |
auto_close_timeout=N | execution 空闲多久后自动 close |
收集全部 stream item 时,常用 timeout=None,让消费者一直等到 stream 被 close。auto_close_timeout 则是 lifecycle 里的空闲收尾策略。
隐式 stream 只适合自闭合流程
flow.get_async_runtime_stream(...) 会在内部创建一个隐式 execution。它和 flow.async_start(...) 一样,只适合输入一次、自然结束的 flow。
如果流程需要 pause_for(...)、外部 emit(...)、人工恢复,使用显式 execution:
execution = flow.create_execution(auto_close=False)
await execution.async_start(input_data)
async for item in execution.get_async_runtime_stream(timeout=None):
...什么时候用哪条通道
| 想做的事 | 用 |
|---|---|
| 触发图内另一个 chunk | data.async_emit(...) + flow.when(...) |
| 同时触发多个旁路 | 多个 flow.when("Event").to(...) |
| 等两个业务信号都到齐 | flow.when({"event": [...]}, mode="and") |
| 给前端推模型增量、状态提示 | data.async_put_into_stream(...) |
| 给 DevTools / 观测系统看框架事件 | Event Center |
| 给 IM 或产品界面推稳定业务进度 | runtime stream item -> 产品事件映射 |
| scheduler / webhook 主动推进流程 | 应用触发源 -> explicit execution -> emit 或 continue_with |
另见
- 模式 -
when和其他图层流控怎么组合 - Pause 与 Resume -
continue_with(...)是等待恢复,不是普通 emit - Runtime Intervention - 运行中补充上下文
- Lifecycle - close 对 runtime stream 做了什么
- Event Center - 框架级观测事件
- 交互层与主动任务 Playbook - 产品事件、IM、SSE/WebSocket 和主动任务边界