
Concurrency Limits

Applies to: 4.0.8.1+

Concurrency is central to TriggerFlow stability. In v4.0.8.1+, you can control concurrency both at execution creation time and dynamically at runtime via execution.set_concurrency(...).

1. Three control layers

  • node layer: batch(..., concurrency=n)
  • list layer: for_each(concurrency=n)
  • execution layer: create_execution(concurrency=n) and execution.set_concurrency(n)

Recommended strategy:

  • use batch/for_each for local fan-out limits
  • use execution-level limits as global protection for external systems

2. Local limits for batch / for_each

```python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

async def echo(data: TriggerFlowEventData):
    await asyncio.sleep(0.1)
    return f"echo: {data.value}"

# node layer: three parallel branches, at most 2 running at a time
flow.batch(
    ("a", echo),
    ("b", echo),
    ("c", echo),
    concurrency=2,
).end()

print(flow.start("hello"))

# list layer: iterate a list, at most 2 items in flight
flow2 = TriggerFlow()
(
    flow2.to(lambda _: [1, 2, 3, 4])
    .for_each(concurrency=2)
    .to(echo)
    .end_for_each()
    .end()
)
print(flow2.start())
```

3. Global execution-level limit

```python
# one cap shared by every handler in this execution
execution = flow.create_execution(concurrency=1)
result = execution.start("hello")
print(result)
```

This caps the number of handlers running concurrently inside that execution instance.

4. Runtime adjustment (new)

Use runtime tuning for load-adaptive control:

```python
execution = flow.create_execution(concurrency=4)

# shed load during a peak: lower the cap before starting
execution.set_concurrency(2)

result = execution.start("payload")
```

Typical scenarios:

  • reduce concurrency during peak hours
  • increase concurrency for off-peak batch windows
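A limit that can change while work is queued, as `set_concurrency(...)` implies, can be sketched in plain asyncio as a condition-variable limiter with a mutable capacity (again an illustration of the mechanism, not TriggerFlow's implementation):

```python
import asyncio

class ResizableLimiter:
    """Async context manager admitting at most `limit` concurrent
    holders; the limit can be raised or lowered at runtime."""

    def __init__(self, limit: int):
        self._limit = limit
        self._active = 0
        self._cond = asyncio.Condition()

    async def set_limit(self, limit: int) -> None:
        async with self._cond:
            self._limit = limit
            self._cond.notify_all()  # wake waiters if capacity grew

    async def __aenter__(self):
        async with self._cond:
            await self._cond.wait_for(lambda: self._active < self._limit)
            self._active += 1

    async def __aexit__(self, *exc):
        async with self._cond:
            self._active -= 1
            self._cond.notify_all()
```

Lowering the limit takes effect as running holders release their slots; raising it immediately admits queued waiters.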

5. FastAPIHelper bridging

When the provider is a TriggerFlowExecution, FastAPIHelper can forward options.concurrency from the request payload to execution.set_concurrency(...).

This enables API-layer concurrency policies without rebuilding flows.
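The bridging step can be pictured as a small adapter that validates options.concurrency before forwarding it. The function name and payload shape below are illustrative assumptions, not FastAPIHelper's actual code:

```python
def forward_concurrency(execution, payload: dict) -> None:
    """Hypothetical bridge: forward options.concurrency from a request
    payload to the execution, ignoring missing or invalid values."""
    value = payload.get("options", {}).get("concurrency")
    # accept only positive integers (bool is excluded: it subclasses int)
    if isinstance(value, int) and not isinstance(value, bool) and value > 0:
        execution.set_concurrency(value)
```

Validating at the API layer keeps malformed or hostile payloads from disabling the concurrency limit.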

6. Engineering recommendations

  • lower concurrency for I/O-heavy external API nodes
  • allow higher concurrency for CPU/local compute nodes
  • benchmark latency and error rates per concurrency profile
  • combine concurrency control with timeout/retry/circuit breaker
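The last recommendation can be sketched by wrapping a capped call with a per-attempt timeout and a bounded retry loop, in plain asyncio (the names, timeout, and retry count here are illustrative):

```python
import asyncio

async def call_guarded(handler, value, sem, timeout=2.0, retries=3):
    """Run `handler` under a shared concurrency cap, with a per-attempt
    timeout and up to `retries` attempts before giving up."""
    last_exc = None
    for _ in range(retries):
        try:
            async with sem:                    # concurrency cap
                return await asyncio.wait_for( # per-attempt timeout
                    handler(value), timeout
                )
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_exc = exc                     # retry on transient errors
    raise last_exc
```

Releasing the semaphore between attempts keeps a retrying call from starving other handlers; a circuit breaker would additionally stop retrying once the error rate crosses a threshold.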