Skip to content

runtime_data、flow_data 与事件触发

在 TriggerFlow 中,状态本身就是信号set_runtime_data()set_flow_data() 不只是写数据,还会触发同名事件,被 when() 直接捕获并驱动后续任务。

runtime_data(执行级)

只影响当前执行实例,用于保存“本次运行的上下文”。变更会触发对应 key 的事件:

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
async def set_runtime(data: TriggerFlowEventData):
  data.set_runtime_data("user_id", "u-001")
  return "runtime ok"

@flow.chunk
def print_value(data: TriggerFlowEventData):
  print(data.value)

flow.when({"runtime_data": "user_id"}).to(print_value)

flow_data(全局级)

属于 Flow 本身,多个执行实例共享。变更会触发所有执行实例中对应的 when({"flow_data": ...})

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def print_value(data: TriggerFlowEventData):
  print(data.value)

flow.set_flow_data("env", "prod")
flow.when({"flow_data": "env"}).to(print_value)

注意:flow_data 会影响当前 Flow 下所有正在执行未来将要执行的 Execution。除非场景非常确定,一般不推荐使用。尤其当你把 Flow 作为服务接口的基础对象,为不同用户请求创建 Execution 时,避免使用 flow_data,否则容易造成用户数据泄露或交叉污染。

collect:多分支结果汇合

collect() 用于在多个分支完成后统一汇总,并 emit Collect-xxx 事件:

python
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
async def set_runtime(data: TriggerFlowEventData):
  data.set_runtime_data("user_id", "u-001")
  return "r1"

@flow.chunk
async def set_runtime_context(data: TriggerFlowEventData):
  data.set_runtime_data("env", "prod")
  return "r2"

@flow.chunk
def print_collect(data: TriggerFlowEventData):
  print(data.value)

flow.to(set_runtime).collect("done", "r1")
flow.to(set_runtime_context).collect("done", "r2").to(print_collect).end()

collect() 支持两种模式:

  • filled_and_update:填满后触发,后续更新仍会触发
  • filled_then_empty:填满触发一次后清空集合