Skip to content

for_each List Fan‑Out

for_each runs a handler over list items in parallel and merges results in order.

Scenario, problem, solution

Scenario: apply the same handler to a list, e.g. batch validation.
Problem: serial loops make total time long.
Solution: use for_each to run items in parallel and merge results.

python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData

flow = TriggerFlow()

@flow.chunk
def input_list(_: TriggerFlowEventData):
  return [1, 2, 3]

@flow.chunk
async def square(data: TriggerFlowEventData):
  await asyncio.sleep(0.01)
  return data.value * data.value

(
  flow.to(input_list)
  .for_each(concurrency=2)
  .to(square)
  .end_for_each()
  .end()
)
result = flow.start()
print(result)

Output:

text
[1, 4, 9]