并发上限控制
适用版本:4.0.8.1+
并发控制是 TriggerFlow 稳定性的关键能力。v4.0.8.1 起,除了创建执行实例时设置并发,还支持运行时动态调整:execution.set_concurrency(...)。
1. 三层并发控制点
- 节点层:
batch(..., concurrency=n) - 列表层:
for_each(concurrency=n) - 执行实例层:
create_execution(concurrency=n)与execution.set_concurrency(n)
推荐实践:
- 用
batch/for_each控制局部 fan-out - 用 execution 全局并发控制保护外部依赖(DB/API)
2. batch / for_each 局部并发
python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
async def echo(data: TriggerFlowEventData):
await asyncio.sleep(0.1)
return f"echo: {data.value}"
flow.batch(
("a", echo),
("b", echo),
("c", echo),
concurrency=2,
).end()
print(flow.start("hello"))
flow2 = TriggerFlow()
(
flow2.to(lambda _: [1, 2, 3, 4])
.for_each(concurrency=2)
.to(echo)
.end_for_each()
.end()
)
print(flow2.start())3. execution 全局并发
python
execution = flow.create_execution(concurrency=1)
result = execution.start("hello")
print(result)这会限制 execution 内部 handler 并发,避免全局资源被瞬时打满。
4. 运行时动态调整(新增)
当你希望“按请求负载”动态调整并发:
python
execution = flow.create_execution(concurrency=4)
# 根据实时负载降级
execution.set_concurrency(2)
result = execution.start("payload")适用场景:
- 高峰期降并发保稳定
- 夜间批处理升并发提吞吐
5. 与 FastAPIHelper 联动
FastAPIHelper 在 provider 为 TriggerFlowExecution 时,可从请求 options.concurrency 透传到 execution.set_concurrency(...)。
这让你可以在 API 层做并发策略路由,而不需要重建 flow。
6. 工程建议
- 对外部 API 调用节点设置更低并发
- 对纯计算节点允许更高并发
- 记录不同并发配置下的 P95 延迟与错误率
- 并发只是上限,仍需配合 timeout/retry/circuit breaker