English mirror
This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.
Runtime Intervention
流程运行中,用户可能突然补一份附件、运营可能修正一条事实、审核人可能留下一句备注。流程不一定要停下来等这些信息,但后续 chunk 应该能看到它们,并且系统要留下审计记录。
Runtime intervention 处理的就是这件事。它让外部代码在 execution 仍然 open 时追加上下文。TriggerFlow 先把它记录进 ledger,再在安全边界插入,让后续 chunk 可见。
如果流程要停住等外部答案,用 Pause 与 Resume。如果只是补充上下文,用 runtime intervention。
两种模式
| 模式 | 适合 |
|---|---|
planned | flow 已经声明“这里允许外部补充上下文” |
auto | flow 没有显式点位,系统在 chunk dispatch 前按目标自动插入 |
Runtime intervention 默认关闭。flow 声明了 intervention_point(...) 时,创建 execution 如果不传 intervention_mode,会推断为 planned 模式。明确不想启用时,传 intervention_mode=None。
planned:在明确点位插入
(
flow
.to(extract_terms)
.intervention_point(name="before_risk", target="before_risk")
.to(risk_assessment)
)外部补充上下文:
await execution.async_intervene(
{"text": "附件 A 是最新报价表,以附件 A 为准。"},
author="reviewer",
target="before_risk",
)async_intervene(...) 只记录 pending ledger item。它不会 emit 事件,不会暂停 graph,也不会改写当前 data.input。
risk_assessment 运行前,目标为 before_risk 的 intervention 会被插入,chunk 可以读取:
async def risk_assessment(data: TriggerFlowRuntimeData):
supplements = data.get_interventions(status="inserted", target="before_risk")
result = await assess_with_model({
"terms": data.input,
"supplements": [item["payload"] for item in supplements],
})
for item in supplements:
await data.async_mark_intervention_consumed(
item["id"],
status="applied",
)
return result读取不会自动消费。chunk 用完后,调用 mark_intervention_consumed(...),把这条上下文标记成 applied 或 ignored。
auto:按 operator 目标插入
intervention_mode="auto" 会在每个 chunk dispatch 前检查 pending intervention。带 target 的 intervention 会匹配 operator id、name、kind、group id 或 group kind;不带 target 的 intervention 会在下一个安全边界插入。
execution = flow.create_execution(
auto_close=False,
intervention_mode="auto",
)
await execution.async_intervene(
{"note": "客户刚刚补充了企业抬头。"},
author="support",
target="classify_ticket",
)声明了 intervention_point(...) 的 flow 不能同时用 auto 模式。planned 模式强调设计好的人工上下文点位;auto 模式强调运行时按 operator 目标补充。
ledger 的状态
一条 intervention 从进入到结束,大致会经历这些动作:
| 动作 | 含义 |
|---|---|
append | 外部提交了 intervention |
insert | TriggerFlow 在安全边界把它插入 execution |
consume | chunk 标记已采用或已忽略 |
expire | execution close 时仍未插入或未消费 |
reject | 目标或策略不接受这条 intervention |
流程结束后可以从 result 里审计:
applied = execution.result.get_interventions(status="applied")
expired = execution.result.get_interventions(status="expired")close snapshot 里也会包含 "$interventions",便于业务系统保存完整执行结果。
save/load 怎么处理 intervention
execution.save() / execution.load(saved) 会保存 intervention mode、ledger、version counter、插入 metadata、过期状态和消费 metadata。
运行时 policy callable 不会序列化。恢复 auto-mode execution 时,如果没有重新传入 callable,会使用内置 policy。业务自定义 policy 应在恢复时重新注入。
runtime stream item
intervention 生命周期会进入 runtime stream,便于 UI 或日志实时展示:
{
"type": "intervention",
"action": "append",
"execution_id": execution.id,
"intervention": {...},
}旧的 stream consumer 可以忽略未知 type。只处理业务 item 的前端也可以按 type 过滤。
Agently 的 observation event 另走 Event Center,常见事件包括 triggerflow.intervention_received、triggerflow.intervention_inserted、triggerflow.intervention_expired、triggerflow.intervention_consumed 和 triggerflow.intervention_rejected。
Intervention 和 Pause 怎么选
| 场景 | 用 |
|---|---|
| 审批结果会决定是否继续 | pause_for(...) |
| 用户补充了附件摘要,后续模型可参考 | runtime intervention |
| 外部系统回调后要走某条事件分支 | pause_for(..., resume_to={"event": ...}) 或外部 emit |
| 审核人只是给风险评估加一句备注 | planned intervention |
判断很简单:流程是不是要等这个外部输入。如果要等,用 pause;如果不等,只是把上下文带给后续步骤,用 intervention。
参见
- Pause 与 Resume - 等待外部答案并恢复 graph
- 事件与流 - runtime stream 怎么消费
- Execution Result - intervention ledger 的 reader
examples/step_by_step/11-triggerflow-21_document_review_runtime_intervention.pyexamples/step_by_step/11-triggerflow-22_ticket_triage_auto_intervention.py