English mirror
This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.
模式
TriggerFlow 不是为了把所有 Python 逻辑都画成图。普通计算仍然写在函数里;图层负责那些需要被看清、被观测、能等待、能并发、能恢复的业务阶段。
一个实用判断是:如果这一步需要出现在流程图、日志、UI 进度或人工协作里,就把它放成 chunk;如果只是局部计算,留在 chunk 函数内部。
线性链:上一段的输出进下一段
flow.to(normalize).to(classify).to(draft_reply)每个 handler 的返回值会成为下一个 handler 的 data.input。需要留到最终 snapshot 的值,写进 state:
async def classify(data):
result = {"team": "billing", "priority": "high"}
await data.async_set_state("classification", result)
return result线性链适合稳定的处理管线:清洗输入、分类、检索、生成、保存。
if / elif / else:按条件走不同路径
async def score(data):
return {"score": 82}
async def store_grade(data):
await data.async_set_state("grade", data.input)
(
flow.to(score)
.if_condition(lambda data: data.input["score"] >= 90)
.to(lambda _: "A")
.elif_condition(lambda data: data.input["score"] >= 80)
.to(lambda _: "B")
.else_condition()
.to(lambda _: "C")
.end_condition()
.to(store_grade)
)end_condition() 会关闭条件分支,把链交还给后续 .to(...)。被选中的分支返回值会传给下一 chunk。
条件分支适合 predicate 逻辑,比如分数阈值、字段是否存在、模型返回的置信度是否足够。
match / case:少量离散值分发
(
flow.to(lambda _: "refund")
.match()
.case("refund").to(handle_refund)
.case("invoice").to(handle_invoice)
.case("shipping").to(handle_shipping)
.case_else().to(handle_general)
.end_match()
.to(store_result)
)match() 看的是前一段的 data.input。标签少、值清楚时用它;判断条件比较复杂时,用 if_condition(...)。
batch:同一份输入并行跑多个分支
flow.batch(
("policy", check_policy),
("risk", check_risk),
("history", load_customer_history),
).to(merge_context)batch 适合互不依赖的并行工作:同时查策略、查风险、查历史,再合并。
execution 可以设置全局并发上限:
execution = flow.create_execution(concurrency=2)concurrency 是这次 execution 的 handler dispatch 上限。batch(..., concurrency=...) 和 for_each(..., concurrency=...) 则是局部 fan-out 上限。服务里通常会同时用全局上限和局部上限,避免一个请求把所有并发都占满。
for_each:对一组 item 跑同一段逻辑
async def draft_section(data):
return {"section": data.input, "draft": f"draft for {data.input}"}
(
flow.to(lambda _: ["overview", "risk", "next_steps"])
.for_each(concurrency=2)
.to(draft_section)
.end_for_each()
.to(merge_sections)
)for_each 会把前一段输出拆成 item。非字符串 Sequence 会被拆开,标量会被当成单个 item。每个 item 在并发上限内跑 body,结果按输入顺序收集成 list。
如果业务是“按数字 N 循环 N 次”,先显式生成序列:
async def make_range(data):
return list(range(data.input))
flow.to(make_range).for_each().to(work_one).end_for_each()emit + when:事件驱动循环
Python for 当然可以写在 handler 里。需要让循环在图层可见、可以被观测、可以被外部事件影响时,用 emit + when。
flow = TriggerFlow(name="review-loop")
async def start_loop(data):
await data.async_set_state("history", [], emit=False)
data.emit_nowait("ReviewRound", {"round": 1, "draft": data.input})
async def review_round(data):
history = data.get_state("history", []) or []
round_no = data.input["round"]
history.append({"round": round_no, "draft": data.input["draft"]})
await data.async_set_state("history", history, emit=False)
if round_no < 3:
data.emit_nowait("ReviewRound", {"round": round_no + 1, "draft": data.input["draft"]})
else:
await data.async_set_state("final", data.input["draft"])
flow.to(start_loop)
flow.when("ReviewRound").to(review_round)emit=False 可以减少热循环里的观测开销。长循环要给 execution 设置合适的 auto_close_timeout,或者使用 auto_close=False 并在业务完成后手动 close。
旁路:不阻塞主链的日志和通知
when(...) 分支和主链可以独立运行。它适合日志、telemetry、通知这类旁路。
async def main_step(data):
result = {"ok": True}
data.emit_nowait("MainStepDone", result)
return result
flow.to(main_step)
flow.when("MainStepDone").to(write_metric)
flow.when("MainStepDone").to(send_audit_log)如果旁路失败不应该影响主结果,就在旁路 handler 自己处理异常,并把错误写进日志或 state。不要让业务主链隐式依赖一个 fire-and-forget 旁路。
模式选择
| 业务形态 | 更合适的模式 |
|---|---|
| 固定阶段处理 | 线性链 |
| 按规则选择路径 | if_condition |
| 按少量标签分发 | match |
| 多个独立检查并行 | batch |
| 同一逻辑处理多个 item | for_each |
| 由信号推进的循环或旁路 | emit + when |
| 可复用的一组流程 | Sub-Flow |