Async First
本地脚本里,同步写法最顺手。真正进入服务端以后,问题会变成:SSE 正在推流,WebSocket 还连着,TriggerFlow 在等事件,外部系统调用也在同一个事件循环里跑。
这时就应该把 async 当默认路径。Async First 不是为了让单次模型请求更快,而是为了让服务、流式 UI 和工作流组合得更稳。
什么时候同步写法够用
- 一次性脚本。
- Notebook。
- 教学 demo。
- 本地 smoke test。
- 不和已有事件循环共享执行环境的工具脚本。
同步写法适合先理解 API,不适合直接搬进 FastAPI、ASGI worker、SSE / WebSocket handler 或长流程 worker。
什么时候优先 async
| 场景 | 为什么 |
|---|---|
| FastAPI / ASGI 服务 | 服务端本来就在事件循环里 |
| SSE / WebSocket | 需要边生成边推给客户端 |
| 字段级流式 UI | instant stream 会持续产出 patch |
| TriggerFlow | flow chunk、event、runtime stream 都更适合 async |
| 外部系统调用 | 网络、数据库、MCP、浏览器环境都可能等待 |
如果代码未来会进入服务端,就从 async 开始写。这样后面不用再把同步 demo 改一遍。
Request 和 Result 的 async 写法
python
result = (
agent
.input("把这条工单整理成客服处理卡片。")
.output({
"status_summary": (str, "一句话状态", True),
"risk_flags": [(str, "风险点", True)],
"customer_reply": (str, "客户回复", True),
})
.get_result()
)
data = await result.async_get_data()
text = await result.async_get_text()
meta = await result.async_get_meta()get_result() 本身只是拿 result facade。第一次 async_get_data()、async_get_text() 或 stream 消费时才触发模型请求,并缓存结果。后续读取同一个 result 不会重复请求。
字段级流式 UI
python
async for item in result.get_async_generator(type="instant"):
if item.delta:
await websocket.send_json({
"path": item.path,
"delta": item.delta,
"done": item.is_complete,
})
final_data = await result.async_get_data()instant 是结构化字段 patch,不是原始 token。它适合把 status_summary、risk_flags[0] 这类字段分别更新到 UI slot。最终保存或业务判断仍然用 async_get_data()。
TriggerFlow 的 async 路径
python
execution = flow.create_execution()
await execution.async_start({"ticket": "T-104"})
snapshot = await execution.async_close()chunk 内也优先使用 async API:
python
async def summarize(data):
result = agent.input(data.input).output({
"summary": (str, "摘要", True),
}).get_result()
payload = await result.async_get_data()
await data.async_set_state("summary", payload["summary"])
await data.async_put_into_stream({"summary": payload["summary"]})需要脚本式一次跑完时,可以用 await flow.async_start(...)。需要暂停、恢复、外部事件或显式关闭时,用 create_execution()。
API 对照
| 同步 | Async |
|---|---|
agent.start() / request.start() | agent.async_start() / request.async_start() |
result.get_data() | result.async_get_data() |
result.get_text() | result.async_get_text() |
result.get_meta() | result.async_get_meta() |
result.get_generator(type=...) | result.get_async_generator(type=...) |
flow.start() | flow.async_start() |
execution.start() / execution.close() | execution.async_start() / execution.async_close() |
data.set_state(...) / data.emit(...) | data.async_set_state(...) / data.async_emit(...) |
常见误用
- 把同步 demo 直接放进 FastAPI handler。
- 以为 async 能降低单次模型请求延迟。单次请求的墙钟时延主要由模型和网络决定。
- 只流式推 patch,不在结束后读取最终
async_get_data()。 - TriggerFlow chunk 里混用阻塞式外部调用,导致整个事件循环被卡住。
- 为了并发开多个线程,而不是先使用 async result 和
asyncio.gather(...)。
下一步
- 一次响应如何复用:模型响应
- 服务封装:FastAPI 服务封装
- TriggerFlow 生命周期:Lifecycle
- Runtime stream:事件与流