English mirror
This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.
Pause 与 Resume
有些流程不能一口气跑完。退款要等人工审批,文档审查要等法务确认,外部系统要过一会儿才回调。pause_for(...) 把这种等待变成 TriggerFlow 可以保存、恢复、审计的 interrupt barrier。
暂停不是 sleep。暂停后,execution 还活着,但当前 chunk 把控制权交回框架;外部系统后续用 continue_with(...) 恢复。
创建一个等待点
async def ask_approval(data: TriggerFlowRuntimeData):
return await data.async_pause_for(
type="approval",
payload={
"title": "退款审批",
"refund_id": data.input["refund_id"],
"amount": data.input["amount"],
},
resume_to="next",
)pause_for 会做几件事:
- 生成一个 interrupt id。
- 把等待信息暴露给
execution.get_pending_interrupts()。 - 暂停 auto-close 计时。
- 保存恢复目标,不依赖 Python 协程栈。
常用参数:
| 参数 | 作用 |
|---|---|
type= | 给应用识别的标签,如 "approval"、"human_input"、"webhook" |
payload= | 给 UI、审批系统、Webhook 接收方看的结构化信息 |
resume_to= | 恢复目标:"next"、"self" 或 {"event": "EventName"} |
resume_event= | 旧的事件式恢复快捷方式 |
interrupt_id= | 自定义 interrupt id,不传则框架生成 |
用 continue_with 恢复
execution = flow.create_execution(auto_close=False)
await execution.async_start({"refund_id": "R-2048", "amount": 399})
interrupt_id = next(iter(execution.get_pending_interrupts()))
await execution.async_continue_with(interrupt_id, {"approved": True, "reviewer": "ops"})
snapshot = await execution.async_close()使用 resume_to="next" 时,恢复 payload 会成为暂停 chunk 的输出,下一段 .to(...) 收到它。
async def commit_refund(data):
decision = data.input
await data.async_set_state("approval", decision)resume_to="self":同一个 chunk 再跑一次
有些 gate 需要恢复后回到同一个 handler。比如模型先判断是否需要人工复核;人工给出意见后,同一个 gate 再看 data.resume 并继续。
async def review_gate(data: TriggerFlowRuntimeData):
if data.is_resume:
await data.async_set_state("review_note", data.resume.value)
return {"status": "reviewed", "note": data.resume.value}
return await data.async_pause_for(
type="review",
payload={"question": "这份报告可以发布吗?"},
resume_to="self",
)data.is_resume 用来判断当前 handler 是否由 continue_with(...) 唤醒。data.resume.value 是恢复 payload。
resume_to={"event": "..."}:恢复后发事件
async def wait_webhook(data):
return await data.async_pause_for(
type="webhook",
payload={"callback_for": data.input["job_id"]},
resume_to={"event": "WebhookReceived"},
)
flow.to(wait_webhook)
flow.when("WebhookReceived").to(handle_webhook)这种写法适合“恢复后走一个事件分支”,而不是继续原来的线性链。旧的 resume_event="WebhookReceived" 也保留兼容语义。
pause 需要显式 execution
含有 pause_for(...) 的 flow 不适合用 flow.async_start(...)。隐式 execution 没有外部 handle,无法后续 continue_with(...)。
用下面这种形态:
execution = flow.create_execution(auto_close=False)
await execution.async_start(input_data)
# 把 execution.id 和 pending interrupt 写进业务系统
pending = execution.get_pending_interrupts()服务可以把 pending interrupt 展示给 UI,也可以把 execution save 写进数据库,等外部回调再恢复。
跨重启恢复
saved = execution.save()
store.save(execution.id, saved)
restored = flow.create_execution(
auto_close=False,
runtime_resources={"db": db_client, "agent": agent},
)
restored.load(store.load(execution.id))
interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()interrupt 会随 execution save 一起保存。live resources 不会保存,恢复时要重新注入。
多个并发等待
并行分支可以各自产生 interrupt。get_pending_interrupts() 会返回全部 pending id,continue_with(id, payload) 一次解开一个。
需要业务系统自己固定 id 时,可以传 interrupt_id=:
await data.async_pause_for(
type="approval",
interrupt_id=f"approval:{data.input['refund_id']}",
payload={...},
resume_to="next",
)Pause、emit、intervention 的区别
| 需求 | 用 |
|---|---|
| 当前流程要等外部答案才能继续 | pause_for(...) + continue_with(...) |
| 触发图内另一个分支,不要求当前 chunk 等待人工 | emit(...) + when(...) |
| 外部想补充上下文,但流程不应停下等待 | runtime intervention |
把等待写成 pause,业务系统才能知道“这次 execution 在等什么”。把补充信息写成 intervention,流程不会被不必要地挂住。
auto_close 和 close
只要存在 pending interrupt,auto_close=True 不会触发。continue_with(...) 解掉最后一个 pending interrupt 后,execution 重新进入空闲,auto-close 计时从零开始。
async_close() 默认拒绝关闭仍有 pending interrupt 的 execution。确实要放弃等待时,显式取消:
snapshot = await execution.async_close(pending_interrupts="cancel")另见
- Lifecycle - pause 对 auto-close 和 close 的影响
- 持久化与 Blueprint - save/load 恢复等待点
- Runtime Intervention - 不暂停流程地补充上下文
- Sub-Flow - 子流 pause 怎么投影到父 execution