TriggerFlow Orchestration Playbook

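Scenario

A multi-step workflow that needs conditional branching, routing, concurrency, asynchronous execution, and state signals.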

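Features used (key capabilities)

  • to: chains the main flow
  • if_condition / match: conditions and routing
  • for_each: batch processing over a list
  • batch + concurrency: concurrency control
  • runtime_data: state signals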

Steps

  1. First, build the main chain with to.
  2. Use if_condition / match for routing decisions.
  3. Use for_each / batch to handle concurrency.
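
The three steps above can be pictured with plain asyncio before reading the TriggerFlow version. This is only a conceptual sketch and does not use or reproduce the TriggerFlow API: `bounded_batch` and `pipeline` are hypothetical helper names, and the semaphore is a stand-in for whatever TriggerFlow does internally to honor `concurrency`.

```python
import asyncio


async def bounded_batch(workers, value, concurrency):
    """Run every worker on the same value, at most `concurrency` at a time."""
    gate = asyncio.Semaphore(concurrency)

    async def run(name, worker):
        async with gate:  # limits how many workers are in flight
            return name, await worker(value)

    pairs = await asyncio.gather(*(run(n, w) for n, w in workers.items()))
    return dict(pairs)  # results keyed by worker name


async def fetch_facts(topic):
    await asyncio.sleep(0.02)
    return f"facts({topic})"


async def fetch_risks(topic):
    await asyncio.sleep(0.01)
    return f"risks({topic})"


async def pipeline(raw):
    topic = raw.strip()  # step 1: main chain (normalize the input)
    if not topic:  # step 2: routing decision
        return {"error": "empty topic"}
    # step 3: fan out with bounded concurrency, then aggregate
    results = await bounded_batch(
        {"fetch_facts": fetch_facts, "fetch_risks": fetch_risks},
        topic,
        concurrency=2,
    )
    return {
        "topic": topic,
        "facts": results["fetch_facts"],
        "risks": results["fetch_risks"],
    }


report = asyncio.run(pipeline("  Agently TriggerFlow  "))
print(report)
```

The aggregation step reads results by worker name, which mirrors how compile_report in the complete code reads `data.value.get("fetch_facts")` and `data.value.get("fetch_risks")`.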

Complete code (concurrency + runtime_stream)

python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()


@flow.chunk("normalize")
async def normalize(data: TriggerFlowEventData):
    topic = str(data.value).strip()
    data.set_runtime_data("topic", topic)
    data.put_into_stream({"stage": "normalized", "topic": topic})
    return topic


@flow.chunk("fetch_facts")
async def fetch_facts(data: TriggerFlowEventData):
    await asyncio.sleep(0.05)
    data.put_into_stream({"stage": "facts_ready", "topic": data.value})
    return f"facts({data.value})"


@flow.chunk("fetch_risks")
async def fetch_risks(data: TriggerFlowEventData):
    await asyncio.sleep(0.03)
    data.put_into_stream({"stage": "risks_ready", "topic": data.value})
    return f"risks({data.value})"


@flow.chunk("compile_report")
async def compile_report(data: TriggerFlowEventData):
    topic = data.get_runtime_data("topic")
    report = {
        "topic": topic,
        "facts": data.value.get("fetch_facts"),
        "risks": data.value.get("fetch_risks"),
    }
    data.put_into_stream({"stage": "compiled", "report": report})
    data.stop_stream()
    return report


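# Main chain: the input value goes to normalize first.
flow.to(normalize)
# When runtime_data "topic" is set, run both fetch chunks as a batch
# (at most 2 at a time), then hand their results to compile_report.
flow.when({"runtime_data": "topic"}).batch(fetch_facts, fetch_risks, concurrency=2).to(compile_report).end()

execution = flow.create_execution(concurrency=2)

# Consume stream events as they arrive, passing "Agently TriggerFlow" as the input;
# compile_report calls stop_stream() to close the stream.
for item in execution.get_runtime_stream("Agently TriggerFlow", timeout=5):
    print("STREAM:", item)

# Fetch the final result returned by compile_report.
result = execution.get_result(timeout=5)
print("RESULT:", result)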

Actual output

text
STREAM: {'stage': 'facts_ready', 'topic': 'Agently TriggerFlow'}
STREAM: {'stage': 'risks_ready', 'topic': 'Agently TriggerFlow'}
STREAM: {'stage': 'compiled', 'report': {'topic': 'Agently TriggerFlow', 'facts': 'facts(Agently TriggerFlow)', 'risks': 'risks(Agently TriggerFlow)'}}
RESULT: {'topic': 'Agently TriggerFlow', 'facts': 'facts(Agently TriggerFlow)', 'risks': 'risks(Agently TriggerFlow)'}

Verification checklist

  • All concurrent tasks complete.
  • runtime_stream carries stage-by-stage events.
  • The aggregated result matches expectations.
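
The checklist can also be run as an automated check over the collected stream items and the final result. The sketch below is one possible way to do that; `verify_run` is a hypothetical helper, not part of Agently, and the sample data is copied from the actual output shown above.

```python
def verify_run(stream_items, result):
    """Return a list of problems; an empty list means every check passed."""
    problems = []
    stages = [item.get("stage") for item in stream_items]

    # Check 1: both concurrent tasks reported completion.
    for stage in ("facts_ready", "risks_ready"):
        if stage not in stages:
            problems.append(f"missing stream event: {stage}")

    # Check 2: the stream carries the final stage event.
    if "compiled" not in stages:
        problems.append("missing stream event: compiled")

    # Check 3: the aggregated result has every expected field filled in.
    for key in ("topic", "facts", "risks"):
        if not result.get(key):
            problems.append(f"result field is empty: {key}")

    return problems


# Sample data copied from the actual output above.
report = {
    "topic": "Agently TriggerFlow",
    "facts": "facts(Agently TriggerFlow)",
    "risks": "risks(Agently TriggerFlow)",
}
stream_items = [
    {"stage": "facts_ready", "topic": "Agently TriggerFlow"},
    {"stage": "risks_ready", "topic": "Agently TriggerFlow"},
    {"stage": "compiled", "report": report},
]

problems = verify_run(stream_items, report)
print(problems or "all checks passed")
```

In a real run, the stream items would be the values collected from the `for item in execution.get_runtime_stream(...)` loop, and the result would be the value returned by `execution.get_result(...)`.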