Skip to content

并发上限控制

适用版本:4.0.8.1+

并发控制是 TriggerFlow 稳定性的关键能力。v4.0.8.1 起,除了创建执行实例时设置并发,还支持运行时动态调整:execution.set_concurrency(...)

1. 三层并发控制点

  • 节点层batch(..., concurrency=n)
  • 列表层for_each(concurrency=n)
  • 执行实例层create_execution(concurrency=n)execution.set_concurrency(n)

推荐实践:

  • batch/for_each 控制局部 fan-out
  • 用 execution 全局并发控制保护外部依赖(DB/API)

2. batch / for_each 局部并发

python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

async def echo(data: TriggerFlowEventData):
    await asyncio.sleep(0.1)
    return f"echo: {data.value}"

flow.batch(
    ("a", echo),
    ("b", echo),
    ("c", echo),
    concurrency=2,
).end()

print(flow.start("hello"))

flow2 = TriggerFlow()
(
    flow2.to(lambda _: [1, 2, 3, 4])
    .for_each(concurrency=2)
    .to(echo)
    .end_for_each()
    .end()
)
print(flow2.start())

3. execution 全局并发

python
execution = flow.create_execution(concurrency=1)
result = execution.start("hello")
print(result)

这会限制 execution 内部 handler 并发,避免全局资源被瞬时打满。

4. 运行时动态调整(新增)

当你希望“按请求负载”动态调整并发:

python
execution = flow.create_execution(concurrency=4)

# 根据实时负载降级
execution.set_concurrency(2)

result = execution.start("payload")

适用场景:

  • 高峰期降并发保稳定
  • 夜间批处理升并发提吞吐

5. 与 FastAPIHelper 联动

FastAPIHelper 在 provider 为 TriggerFlowExecution 时,可从请求 options.concurrency 透传到 execution.set_concurrency(...)

这让你可以在 API 层做并发策略路由,而不需要重建 flow。

6. 工程建议

  • 对外部 API 调用节点设置更低并发
  • 对纯计算节点允许更高并发
  • 记录不同并发配置下的 P95 延迟与错误率
  • 并发只是上限,仍需配合 timeout/retry/circuit breaker