并发上限控制
并发太高会导致外部服务限流、数据库连接耗尽、CPU 抖动。我们需要一个“可控上限”,确保系统稳定。
场景与解决方式
场景:外部服务有 QPS 上限,或数据库连接池较小。
命题:并发必须被限制,否则会触发限流或雪崩。
解决:在执行实例上设置 concurrency,统一限制并发深度。
python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk("a")
async def work(data: TriggerFlowEventData):
print(f"start-{data.value}")
await asyncio.sleep(0.01)
print(f"end-{data.value}")
return data.value
@flow.chunk("b")
async def work_b(data: TriggerFlowEventData):
return await work(data)
@flow.chunk("c")
async def work_c(data: TriggerFlowEventData):
return await work(data)
flow.batch(work, work_b, work_c).end()
execution = flow.create_execution(concurrency=1)
result = execution.start("X")
print(result)输出:
text
start-X
end-X
start-X
end-X
start-X
end-X
{'a': 'X', 'b': 'X', 'c': 'X'}