TriggerFlow 核心概念
TriggerFlow 把流程拆解为信号、节点与运行态。理解这些概念,才能正确组织复杂编排。
Flow 与 Execution
- TriggerFlow(Flow):流程定义本身(蓝图 + 规则)。
- Execution(执行实例):一次运行的上下文与状态,每次
start()都会创建。
信号与触发类型
TriggerFlow 的触发信号分三类:
- event:
emit()产生的事件信号 - runtime_data:
set_runtime_data()变更 key 时产生信号 - flow_data:
set_flow_data()变更 key 时产生信号(影响所有执行实例)
注意:
flow_data会影响当前 Flow 下所有正在执行与未来将要执行的 Execution。除非场景非常确定,一般不推荐使用。尤其当你把 Flow 作为服务接口的基础对象,为不同用户请求创建 Execution 时,避免使用flow_data,否则容易造成用户数据泄露或交叉污染。
这三类信号都可以被 when() 捕获。
to 与 Chunk(完成信号)
to() 的本质是“把处理任务挂到当前触发信号上”。任务执行完成后,会自动发出一个 完成信号:
- 每个 Chunk 都有独立 trigger(形如
Chunk[xxx]-<id>) - 上游信号触发 Chunk 执行
- Chunk 完成后 emit 自己的完成信号
- 下游既可以继续
to(),也可以通过when()监听
Chunk:显式命名与规范写法
在工程实践里,建议先用 @flow.chunk 定义“具名节点”,再用 flow.to() / flow.when().to() 组织流程。这样既可复用,也更易读。
to() 直接传函数是便利写法,适合快速验证;规范写法是把函数提升为 Chunk,再按名称挂接。
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk("normalize_input")
async def normalize(data: TriggerFlowEventData):
return str(data.value).strip()
@flow.chunk
async def greet(data: TriggerFlowEventData):
return f"Hello, {data.value}"
flow.to("normalize_input").to(greet).end()
execution = flow.create_execution()
result = execution.start(" Agently ")
print(result)when(事件订阅)
when() 是事件订阅器,支持:
- 单事件:
when("Plan.Read") - 运行态:
when({"runtime_data": "user"}) - 多事件聚合:
when({"event": ["A", "B"]}, mode="and")
多事件聚合会生成一个新的内部事件(When-xxx),用于后续链路。
TriggerFlowEventData
节点入参为 TriggerFlowEventData,包含:
value:当前事件 payloadevent/trigger_eventtype/trigger_type- 运行态方法:
set_runtime_data/get_runtime_data - 事件方法:
emit/async_emit - 流式输出:
put_into_stream/stop_stream
collect(分支聚合)
collect() 用于多分支结果汇合,等待多个分支填充后再触发 Collect-xxx 事件。
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def reader(data: TriggerFlowEventData):
return f"read: {data.value}"
@flow.chunk
def writer(data: TriggerFlowEventData):
return f"write: {data.value}"
@flow.chunk
def print_collect(data: TriggerFlowEventData):
print(data.value)
flow.when("Plan.Read").to(reader).collect("plan", "read")
flow.when("Plan.Write").to(writer).collect("plan", "write").to(print_collect).end()runtime_stream(运行时流)
运行过程中可用 put_into_stream() 输出状态片段,通过 get_runtime_stream() 实时消费。
蓝图(BluePrint)
Flow 结构可 save_blue_print() 导出,并用 load_blue_print() 复用。
从 token 输出到实时信号
这部分提供 TriggerFlow 信号逻辑的背景,详见:从 token 输出到实时信号。