Runtime Stream
Applies to: 4.0.8.1+
runtime_stream is a side-channel output in TriggerFlow:
- independent from final
result - emits progress, token deltas, and stage states in real time
Great for SSE, WebSocket, CLI progress, and operational dashboards.
1. Mental model
2. Basic pattern: emit + stop
python
import asyncio
from agently import TriggerFlow, TriggerFlowEventData
flow = TriggerFlow()
@flow.chunk
async def stream_steps(data: TriggerFlowEventData):
for i in range(3):
await data.async_put_into_stream({"step": i + 1, "status": "working"})
await asyncio.sleep(0.05)
await data.async_stop_stream()
return "done"
flow.to(stream_steps).end()
for event in flow.get_runtime_stream("start", timeout=None):
print("[stream]", event)3. Key APIs
Common APIs in production:
data.put_into_stream(...)/await data.async_put_into_stream(...)data.stop_stream()/await data.async_stop_stream()flow.get_runtime_stream(...)flow.get_async_runtime_stream(...)
4. Streaming and final result together
Common event contract:
- emit multiple
deltaevents - emit one
finalevent - call
stop_stream
This maps naturally to FastAPIHelper SSE/WS endpoints.
5. Sync generator bridge (new)
v4.0.8.1 adds FunctionShifter.asyncify_sync_generator() for safely bridging sync generators into async pipelines.
Why it matters:
- reuse legacy sync streaming code in async runtime stream/HTTP streaming
- reduce migration effort during async adoption
6. Common issues
6.1 Consumer blocks forever
Most likely stop_stream() is missing. Ensure all branches (including error branches) terminate stream.
6.2 Stream emits but final result is empty
runtime_stream and result are independent. Return final business result explicitly in handlers.
6.3 Event order under high concurrency
In fan-out scenarios, include source/chunk_id/seq in stream events for deterministic client-side ordering.