when 事件分支
when() 可以等待 Chunk 完成信号、runtime_data 变化 或 自定义事件,适合事件驱动的分支场景。
场景:等待某个 Chunk 完成后再继续
适合“阶段联动”:上游任务完成后,下游任务才启动。
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
async def parse(data: TriggerFlowEventData):
return f"parsed:{data.value}"
@flow.chunk
async def build(data: TriggerFlowEventData):
return f"built:{data.value}"
flow.to(parse)
flow.when(parse).to(build).end()
print(flow.start("doc"))输出:
text
built:parsed:doc场景:等待 runtime_data 变化
适合“状态到齐再触发”的场景,例如用户信息写入后再开始后续流程。
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def print_user(data: TriggerFlowEventData):
print(f"user:{data.value['id']}")
flow.when({"runtime_data": "user"}).to(print_user)
execution = flow.create_execution()
execution.set_runtime_data("user", {"id": "u-1"})输出:
text
user:u-1场景:等待自定义事件(emit)
适合外部信号驱动(消息系统、Webhook、用户操作)。
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def print_join(data: TriggerFlowEventData):
print(f"join:{data.value['id']}")
flow.when("User.Join").to(print_join)
execution = flow.create_execution()
execution.emit("User.Join", {"id": "u-9"})输出:
text
join:u-9when 的 mode:and / or / simple_or
when() 可同时监听多个信号,mode 决定触发逻辑:
- and:全部到齐才触发,输出聚合字典
- or:任意一个触发,输出
(type, event, value) - simple_or:任意一个触发,只输出
value
mode="and"
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def print_value(data: TriggerFlowEventData):
print(data.value)
flow.when({"event": ["A", "B"]}, mode="and").to(print_value)
execution = flow.create_execution()
execution.emit("A", 1)
execution.emit("B", 2)输出:
text
{'event': {'A': 1, 'B': 2}}mode="or"
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def print_value(data: TriggerFlowEventData):
print(data.value)
flow.when({"event": ["A", "B"]}, mode="or").to(print_value)
execution = flow.create_execution()
execution.emit("A", 1)
execution.emit("B", 2)输出:
text
('event', 'A', 1)
('event', 'B', 2)mode="simple_or"
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
def print_value(data: TriggerFlowEventData):
print(data.value)
flow.when({"event": ["A", "B"]}, mode="simple_or").to(print_value)
execution = flow.create_execution()
execution.emit("A", 1)
execution.emit("B", 2)输出:
text
1
2