runtime_data、flow_data 与事件触发
在 TriggerFlow 中,状态本身就是信号。set_runtime_data() 与 set_flow_data() 不只是写数据,还会触发同名事件,被 when() 直接捕获并驱动后续任务。
runtime_data(执行级)
只影响当前执行实例,用于保存“本次运行的上下文”。变更会触发对应 key 的事件:
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
async def set_runtime(data: TriggerFlowEventData):
data.set_runtime_data("user_id", "u-001")
return "runtime ok"
@flow.chunk
def print_value(data: TriggerFlowEventData):
print(data.value)
flow.when({"runtime_data": "user_id"}).to(print_value)flow_data(全局级)
属于 Flow 本身,多个执行实例共享。变更会触发所有执行实例中对应的 when({"flow_data": ...}):
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def print_value(data: TriggerFlowEventData):
print(data.value)
flow.set_flow_data("env", "prod")
flow.when({"flow_data": "env"}).to(print_value)注意:
flow_data会影响当前 Flow 下所有正在执行与未来将要执行的 Execution。除非场景非常确定,一般不推荐使用。尤其当你把 Flow 作为服务接口的基础对象,为不同用户请求创建 Execution 时,避免使用flow_data,否则容易造成用户数据泄露或交叉污染。
collect:多分支结果汇合
collect() 用于在多个分支完成后统一汇总,并 emit Collect-xxx 事件:
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
async def set_runtime(data: TriggerFlowEventData):
data.set_runtime_data("user_id", "u-001")
return "r1"
@flow.chunk
async def set_runtime_context(data: TriggerFlowEventData):
data.set_runtime_data("env", "prod")
return "r2"
@flow.chunk
def print_collect(data: TriggerFlowEventData):
print(data.value)
flow.to(set_runtime).collect("done", "r1")
flow.to(set_runtime_context).collect("done", "r2").to(print_collect).end()collect() 支持两种模式:
filled_and_update:填满后触发,后续更新仍会触发filled_then_empty:填满触发一次后清空集合