Skip to content

Runtime Intervention

流程运行中,用户可能突然补一份附件、运营可能修正一条事实、审核人可能留下一句备注。流程不一定要停下来等这些信息,但后续 chunk 应该能看到它们,并且系统要留下审计记录。

Runtime intervention 处理的就是这件事。它让外部代码在 execution 仍然 open 时追加上下文。TriggerFlow 先把它记录进 ledger,再在安全边界插入,让后续 chunk 可见。

如果流程要停住等外部答案,用 Pause 与 Resume。如果只是补充上下文,用 runtime intervention。

两种模式

模式适合
plannedflow 已经声明“这里允许外部补充上下文”
autoflow 没有显式点位,系统在 chunk dispatch 前按目标自动插入

Runtime intervention 默认关闭。flow 声明了 intervention_point(...) 时,创建 execution 如果不传 intervention_mode,会推断为 planned 模式。明确不想启用时,传 intervention_mode=None

planned:在明确点位插入

python
(
    flow
    .to(extract_terms)
    .intervention_point(name="before_risk", target="before_risk")
    .to(risk_assessment)
)

外部补充上下文:

python
await execution.async_intervene(
    {"text": "附件 A 是最新报价表,以附件 A 为准。"},
    author="reviewer",
    target="before_risk",
)

async_intervene(...) 只记录 pending ledger item。它不会 emit 事件,不会暂停 graph,也不会改写当前 data.input

risk_assessment 运行前,目标为 before_risk 的 intervention 会被插入,chunk 可以读取:

python
async def risk_assessment(data: TriggerFlowRuntimeData):
    supplements = data.get_interventions(status="inserted", target="before_risk")
    result = await assess_with_model({
        "terms": data.input,
        "supplements": [item["payload"] for item in supplements],
    })

    for item in supplements:
        await data.async_mark_intervention_consumed(
            item["id"],
            status="applied",
        )
    return result

读取不会自动消费。chunk 用完后,调用 mark_intervention_consumed(...),把这条上下文标记成 appliedignored

auto:按 operator 目标插入

intervention_mode="auto" 会在每个 chunk dispatch 前检查 pending intervention。带 target 的 intervention 会匹配 operator id、name、kind、group id 或 group kind;不带 target 的 intervention 会在下一个安全边界插入。

python
execution = flow.create_execution(
    auto_close=False,
    intervention_mode="auto",
)

await execution.async_intervene(
    {"note": "客户刚刚补充了企业抬头。"},
    author="support",
    target="classify_ticket",
)

声明了 intervention_point(...) 的 flow 不能同时用 auto 模式。planned 模式强调设计好的人工上下文点位;auto 模式强调运行时按 operator 目标补充。

ledger 的状态

一条 intervention 从进入到结束,大致会经历这些动作:

动作含义
append外部提交了 intervention
insertTriggerFlow 在安全边界把它插入 execution
consumechunk 标记已采用或已忽略
expireexecution close 时仍未插入或未消费
reject目标或策略不接受这条 intervention

流程结束后可以从 result 里审计:

python
applied = execution.result.get_interventions(status="applied")
expired = execution.result.get_interventions(status="expired")

close snapshot 里也会包含 "$interventions",便于业务系统保存完整执行结果。

save/load 怎么处理 intervention

execution.save() / execution.load(saved) 会保存 intervention mode、ledger、version counter、插入 metadata、过期状态和消费 metadata。

运行时 policy callable 不会序列化。恢复 auto-mode execution 时,如果没有重新传入 callable,会使用内置 policy。业务自定义 policy 应在恢复时重新注入。

runtime stream item

intervention 生命周期会进入 runtime stream,便于 UI 或日志实时展示:

python
{
    "type": "intervention",
    "action": "append",
    "execution_id": execution.id,
    "intervention": {...},
}

旧的 stream consumer 可以忽略未知 type。只处理业务 item 的前端也可以按 type 过滤。

Agently 的 observation event 另走 Event Center,常见事件包括 triggerflow.intervention_receivedtriggerflow.intervention_insertedtriggerflow.intervention_expiredtriggerflow.intervention_consumedtriggerflow.intervention_rejected

Intervention 和 Pause 怎么选

场景
审批结果会决定是否继续pause_for(...)
用户补充了附件摘要,后续模型可参考runtime intervention
外部系统回调后要走某条事件分支pause_for(..., resume_to={"event": ...}) 或外部 emit
审核人只是给风险评估加一句备注planned intervention

判断很简单:流程是不是要等这个外部输入。如果要等,用 pause;如果不等,只是把上下文带给后续步骤,用 intervention。

参见

  • Pause 与 Resume - 等待外部答案并恢复 graph
  • 事件与流 - runtime stream 怎么消费
  • Execution Result - intervention ledger 的 reader
  • examples/step_by_step/11-triggerflow-21_document_review_runtime_intervention.py
  • examples/step_by_step/11-triggerflow-22_ticket_triage_auto_intervention.py