基本流程与信号链
TriggerFlow 的“链式写法”本质是信号链:to() 把处理任务挂到当前信号上,任务完成后 emit 完成信号,下一段 to() 继续等待该信号。
线性链路
python
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
async def normalize(data: TriggerFlowEventData):
return str(data.value).strip()
@flow.chunk
async def greet(data: TriggerFlowEventData):
return f"Hello, {data.value}"
flow.to(normalize).to(greet).end()
result = flow.start(" Agently ")
print(result)直接在 to() 里传函数是便利写法,适合快速验证;更规范的方式见「Chunk 概念」部分。
side_branch:不阻塞的旁路
旁路用于“记录、监控、打印”之类的非主流程工作。它不改变当前信号链:
python
flow = TriggerFlow()
@flow.chunk
async def main_task(data: TriggerFlowEventData):
return f"main: {data.value}"
@flow.chunk
async def side_task(data: TriggerFlowEventData):
print(f"[side] {data.value}")
return "side done"
@flow.chunk
def print_main(data: TriggerFlowEventData):
print("[main]", data.value)
(
flow.to(main_task)
.____(print_info=True, show_value=True)
.side_branch(side_task)
.to(print_main)
.end()
)