Skip to content

English mirror

This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.

Read the Chinese source page

模式

TriggerFlow 不是为了把所有 Python 逻辑都画成图。普通计算仍然写在函数里;图层负责那些需要被看清、被观测、能等待、能并发、能恢复的业务阶段。

一个实用判断是:如果这一步需要出现在流程图、日志、UI 进度或人工协作里,就把它放成 chunk;如果只是局部计算,留在 chunk 函数内部。

线性链:上一段的输出进下一段

python
flow.to(normalize).to(classify).to(draft_reply)

每个 handler 的返回值会成为下一个 handler 的 data.input。需要留到最终 snapshot 的值,写进 state:

python
async def classify(data):
    result = {"team": "billing", "priority": "high"}
    await data.async_set_state("classification", result)
    return result

线性链适合稳定的处理管线:清洗输入、分类、检索、生成、保存。

if / elif / else:按条件走不同路径

python
async def score(data):
    return {"score": 82}


async def store_grade(data):
    await data.async_set_state("grade", data.input)


(
    flow.to(score)
    .if_condition(lambda data: data.input["score"] >= 90)
        .to(lambda _: "A")
    .elif_condition(lambda data: data.input["score"] >= 80)
        .to(lambda _: "B")
    .else_condition()
        .to(lambda _: "C")
    .end_condition()
    .to(store_grade)
)

end_condition() 会关闭条件分支,把链交还给后续 .to(...)。被选中的分支返回值会传给下一 chunk。

条件分支适合 predicate 逻辑,比如分数阈值、字段是否存在、模型返回的置信度是否足够。

match / case:少量离散值分发

python
(
    flow.to(lambda _: "refund")
    .match()
        .case("refund").to(handle_refund)
        .case("invoice").to(handle_invoice)
        .case("shipping").to(handle_shipping)
        .case_else().to(handle_general)
    .end_match()
    .to(store_result)
)

match() 看的是前一段的 data.input。标签少、值清楚时用它;判断条件比较复杂时,用 if_condition(...)

batch:同一份输入并行跑多个分支

python
flow.batch(
    ("policy", check_policy),
    ("risk", check_risk),
    ("history", load_customer_history),
).to(merge_context)

batch 适合互不依赖的并行工作:同时查策略、查风险、查历史,再合并。

execution 可以设置全局并发上限:

python
execution = flow.create_execution(concurrency=2)

concurrency 是这次 execution 的 handler dispatch 上限。batch(..., concurrency=...)for_each(..., concurrency=...) 则是局部 fan-out 上限。服务里通常会同时用全局上限和局部上限,避免一个请求把所有并发都占满。

for_each:对一组 item 跑同一段逻辑

python
async def draft_section(data):
    return {"section": data.input, "draft": f"draft for {data.input}"}


(
    flow.to(lambda _: ["overview", "risk", "next_steps"])
    .for_each(concurrency=2)
        .to(draft_section)
    .end_for_each()
    .to(merge_sections)
)

for_each 会把前一段输出拆成 item。非字符串 Sequence 会被拆开,标量会被当成单个 item。每个 item 在并发上限内跑 body,结果按输入顺序收集成 list。

如果业务是“按数字 N 循环 N 次”,先显式生成序列:

python
async def make_range(data):
    return list(range(data.input))


flow.to(make_range).for_each().to(work_one).end_for_each()

emit + when:事件驱动循环

Python for 当然可以写在 handler 里。需要让循环在图层可见、可以被观测、可以被外部事件影响时,用 emit + when

python
flow = TriggerFlow(name="review-loop")


async def start_loop(data):
    await data.async_set_state("history", [], emit=False)
    data.emit_nowait("ReviewRound", {"round": 1, "draft": data.input})


async def review_round(data):
    history = data.get_state("history", []) or []
    round_no = data.input["round"]
    history.append({"round": round_no, "draft": data.input["draft"]})
    await data.async_set_state("history", history, emit=False)

    if round_no < 3:
        data.emit_nowait("ReviewRound", {"round": round_no + 1, "draft": data.input["draft"]})
    else:
        await data.async_set_state("final", data.input["draft"])


flow.to(start_loop)
flow.when("ReviewRound").to(review_round)

emit=False 可以减少热循环里的观测开销。长循环要给 execution 设置合适的 auto_close_timeout,或者使用 auto_close=False 并在业务完成后手动 close。

旁路:不阻塞主链的日志和通知

when(...) 分支和主链可以独立运行。它适合日志、telemetry、通知这类旁路。

python
async def main_step(data):
    result = {"ok": True}
    data.emit_nowait("MainStepDone", result)
    return result


flow.to(main_step)
flow.when("MainStepDone").to(write_metric)
flow.when("MainStepDone").to(send_audit_log)

如果旁路失败不应该影响主结果,就在旁路 handler 自己处理异常,并把错误写进日志或 state。不要让业务主链隐式依赖一个 fire-and-forget 旁路。

模式选择

业务形态更合适的模式
固定阶段处理线性链
按规则选择路径if_condition
按少量标签分发match
多个独立检查并行batch
同一逻辑处理多个 itemfor_each
由信号推进的循环或旁路emit + when
可复用的一组流程Sub-Flow

另见

  • 事件与流 - emit / when 的细节
  • Sub-Flow - 把一组 flow 当作父流程里的一个 chunk
  • Lifecycle - batch、for_each、event loop 什么时候 drain 完