TriggerFlow 编排 Playbook
当一个 AI 过程有多个阶段时,写成一段长函数很快会失控:分类结果决定分支,多个 item 要并行处理,中途还可能等人工或外部回调。TriggerFlow 的作用,是把这些阶段变成可观测、可暂停、可恢复的 execution。
先判断是否需要 TriggerFlow
| 过程特点 | 建议 |
|---|---|
| 只是一次模型请求加结构化输出 | 留在 request 层 |
| 只有一个 Action 调用 | 留在 Agent / Action 层 |
| 有 3 个以上业务阶段 | 考虑 TriggerFlow |
| 中间结果决定分支 | TriggerFlow |
| 要 fan-out 并行处理一组 item | TriggerFlow |
| 要 pause 等人工或 Webhook | TriggerFlow |
| 要把进度流给 UI | TriggerFlow runtime stream |
| 要跨进程重启恢复 | TriggerFlow save/load |
不是所有流程都要上编排。TriggerFlow 适合让阶段边界、等待点和输出流变清楚。
推荐结构
text
build_flow()
|
|-- prepare 校验和归一化输入
|-- classify 模型结构化输出
|-- branch if / match / case
|-- fan-out batch 或 for_each
|-- pause 人工 / Webhook 等待
|-- finalize 写最终 state
run_flow()
|
|-- create_execution(auto_close=False, runtime_resources={...})
|-- async_start(...)
|-- consume runtime stream
|-- async_close() -> close snapshot服务场景里,flow definition 和一次 execution 分开。definition 可以作为模块级对象或 builder;execution 是某次请求。
基础骨架
python
from agently import TriggerFlow, TriggerFlowRuntimeData
def build_flow():
flow = TriggerFlow(name="review-flow")
async def prepare(data: TriggerFlowRuntimeData):
payload = {"text": data.input["text"], "source": data.input.get("source")}
await data.async_set_state("request", payload)
return payload
async def classify(data: TriggerFlowRuntimeData):
agent = data.require_resource("agent")
result = (
agent
.input(data.input["text"])
.output({"category": (str, "A/B/C", True)})
.get_result()
)
classification = await result.async_get_data(ensure_keys=["category"])
await data.async_set_state("classification", classification)
return classification
async def handle_default(data: TriggerFlowRuntimeData):
await data.async_set_state("answer", {"category": data.input["category"]})
(
flow.to(prepare)
.to(classify)
.if_condition(lambda data: data.input["category"] == "A")
.to(handle_default)
.elif_condition(lambda data: data.input["category"] == "B")
.to(handle_default)
.else_condition()
.to(handle_default)
.end_condition()
)
return flow
async def run(input_value, agent):
flow = build_flow()
execution = flow.create_execution(
auto_close=False,
runtime_resources={"agent": agent},
)
await execution.async_start(input_value)
return await execution.async_close()几个边界:
- agent 是 live 对象,放
runtime_resources。 - 中间业务结果写
state,进入 close snapshot。 - 服务端显式 close,便于接 stream、pause、外部事件。
fan-out:处理未知数量 item
python
async def list_subtasks(data):
return data.input["subtasks"]
async def handle_one(data):
agent = data.require_resource("agent")
result = agent.input(data.input).output({
"summary": (str, "摘要", True),
}).get_result()
return await result.async_get_data(ensure_keys=["summary"])
(
flow.to(list_subtasks)
.for_each(concurrency=4)
.to(handle_one)
.end_for_each()
.to(collect)
)concurrency 按模型限速和下游 API 承载设置。结果会按输入顺序汇总成 list。
pause:等人工或外部系统
python
async def ask_approval(data):
return await data.async_pause_for(
type="approval",
payload={"summary": data.input["summary"]},
resume_to="next",
)含 pause 的 execution 要保留 handle,通常也要保存 execution state:
python
execution = flow.create_execution(auto_close=False, runtime_resources={...})
await execution.async_start(input_value)
saved = execution.save()恢复时重新创建 execution、注入资源、load saved,再 continue_with(...)。
runtime stream:给 UI 进度
chunk 内推 item:
python
await data.async_put_into_stream({
"stage": "classify",
"status": "done",
"category": classification["category"],
})服务侧消费:
python
async for item in execution.get_async_runtime_stream(timeout=None):
send_to_client(item)最终业务结果仍然从 async_close() 返回的 close snapshot 读取。
save/load:跨重启恢复
python
saved = execution.save()
db.put(execution.id, saved)
restored = flow.create_execution(
auto_close=False,
runtime_resources={"agent": agent, "db": db_client},
)
restored.load(db.get(execution.id))save 保存 state、pending interrupt、metadata 和 resource_keys;不保存 live resources,也不保存正在运行到一半的协程。
常见误用
| 写法 | 问题 |
|---|---|
| 为了整理代码而拆子流 | 子流应该有复用契约,不只是缩短文件 |
| 在 chunk 里手写额外模型重试 | request 层已有 validate / retry;流程级重试要显式建图 |
| 把 DB client、agent、collection 放进 state | snapshot 不该包含 live 对象 |
只用 flow.async_start(...) 却又想 pause / 外部 emit | 隐式 execution 没有可恢复 handle |
另见
- Lifecycle - execution 入口与 close
- TriggerFlow 模式 - 分支、fan-out、loop
- 模型集成 - chunk 里调用 Agent
- State 与 Resources - state 和 runtime_resources 边界
- FastAPI 服务封装 - 把 stream 暴露给客户端