Skip to content

English mirror

This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.

Read the Chinese source page

Pause 与 Resume

有些流程不能一口气跑完。退款要等人工审批,文档审查要等法务确认,外部系统要过一会儿才回调。pause_for(...) 把这种等待变成 TriggerFlow 可以保存、恢复、审计的 interrupt barrier。

暂停不是 sleep。暂停后,execution 还活着,但当前 chunk 把控制权交回框架;外部系统后续用 continue_with(...) 恢复。

创建一个等待点

python
async def ask_approval(data: TriggerFlowRuntimeData):
    return await data.async_pause_for(
        type="approval",
        payload={
            "title": "退款审批",
            "refund_id": data.input["refund_id"],
            "amount": data.input["amount"],
        },
        resume_to="next",
    )

pause_for 会做几件事:

  • 生成一个 interrupt id。
  • 把等待信息暴露给 execution.get_pending_interrupts()
  • 暂停 auto-close 计时。
  • 保存恢复目标,不依赖 Python 协程栈。

常用参数:

参数作用
type=给应用识别的标签,如 "approval""human_input""webhook"
payload=给 UI、审批系统、Webhook 接收方看的结构化信息
resume_to=恢复目标:"next""self"{"event": "EventName"}
resume_event=旧的事件式恢复快捷方式
interrupt_id=自定义 interrupt id,不传则框架生成

用 continue_with 恢复

python
execution = flow.create_execution(auto_close=False)
await execution.async_start({"refund_id": "R-2048", "amount": 399})

interrupt_id = next(iter(execution.get_pending_interrupts()))
await execution.async_continue_with(interrupt_id, {"approved": True, "reviewer": "ops"})

snapshot = await execution.async_close()

使用 resume_to="next" 时,恢复 payload 会成为暂停 chunk 的输出,下一段 .to(...) 收到它。

python
async def commit_refund(data):
    decision = data.input
    await data.async_set_state("approval", decision)

resume_to="self":同一个 chunk 再跑一次

有些 gate 需要恢复后回到同一个 handler。比如模型先判断是否需要人工复核;人工给出意见后,同一个 gate 再看 data.resume 并继续。

python
async def review_gate(data: TriggerFlowRuntimeData):
    if data.is_resume:
        await data.async_set_state("review_note", data.resume.value)
        return {"status": "reviewed", "note": data.resume.value}

    return await data.async_pause_for(
        type="review",
        payload={"question": "这份报告可以发布吗?"},
        resume_to="self",
    )

data.is_resume 用来判断当前 handler 是否由 continue_with(...) 唤醒。data.resume.value 是恢复 payload。

resume_to={"event": "..."}:恢复后发事件

python
async def wait_webhook(data):
    return await data.async_pause_for(
        type="webhook",
        payload={"callback_for": data.input["job_id"]},
        resume_to={"event": "WebhookReceived"},
    )


flow.to(wait_webhook)
flow.when("WebhookReceived").to(handle_webhook)

这种写法适合“恢复后走一个事件分支”,而不是继续原来的线性链。旧的 resume_event="WebhookReceived" 也保留兼容语义。

pause 需要显式 execution

含有 pause_for(...) 的 flow 不适合用 flow.async_start(...)。隐式 execution 没有外部 handle,无法后续 continue_with(...)

用下面这种形态:

python
execution = flow.create_execution(auto_close=False)
await execution.async_start(input_data)

# 把 execution.id 和 pending interrupt 写进业务系统
pending = execution.get_pending_interrupts()

服务可以把 pending interrupt 展示给 UI,也可以把 execution save 写进数据库,等外部回调再恢复。

跨重启恢复

python
saved = execution.save()
store.save(execution.id, saved)

restored = flow.create_execution(
    auto_close=False,
    runtime_resources={"db": db_client, "agent": agent},
)
restored.load(store.load(execution.id))

interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()

interrupt 会随 execution save 一起保存。live resources 不会保存,恢复时要重新注入。

多个并发等待

并行分支可以各自产生 interrupt。get_pending_interrupts() 会返回全部 pending id,continue_with(id, payload) 一次解开一个。

需要业务系统自己固定 id 时,可以传 interrupt_id=

python
await data.async_pause_for(
    type="approval",
    interrupt_id=f"approval:{data.input['refund_id']}",
    payload={...},
    resume_to="next",
)

Pause、emit、intervention 的区别

需求
当前流程要等外部答案才能继续pause_for(...) + continue_with(...)
触发图内另一个分支,不要求当前 chunk 等待人工emit(...) + when(...)
外部想补充上下文,但流程不应停下等待runtime intervention

把等待写成 pause,业务系统才能知道“这次 execution 在等什么”。把补充信息写成 intervention,流程不会被不必要地挂住。

auto_close 和 close

只要存在 pending interrupt,auto_close=True 不会触发。continue_with(...) 解掉最后一个 pending interrupt 后,execution 重新进入空闲,auto-close 计时从零开始。

async_close() 默认拒绝关闭仍有 pending interrupt 的 execution。确实要放弃等待时,显式取消:

python
snapshot = await execution.async_close(pending_interrupts="cancel")

另见