Skip to content

English mirror

This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.

Read the Chinese source page

Async First

本地脚本里,同步写法最顺手。真正进入服务端以后,问题会变成:SSE 正在推流,WebSocket 还连着,TriggerFlow 在等事件,外部系统调用也在同一个事件循环里跑。

这时就应该把 async 当默认路径。Async First 不是为了让单次模型请求更快,而是为了让服务、流式 UI 和工作流组合得更稳。

什么时候同步写法够用

  • 一次性脚本。
  • Notebook。
  • 教学 demo。
  • 本地 smoke test。
  • 不和已有事件循环共享执行环境的工具脚本。

同步写法适合先理解 API,不适合直接搬进 FastAPI、ASGI worker、SSE / WebSocket handler 或长流程 worker。

什么时候优先 async

场景为什么
FastAPI / ASGI 服务服务端本来就在事件循环里
SSE / WebSocket需要边生成边推给客户端
字段级流式 UIinstant stream 会持续产出 patch
TriggerFlowflow chunk、event、runtime stream 都更适合 async
外部系统调用网络、数据库、MCP、浏览器环境都可能等待

如果代码未来会进入服务端,就从 async 开始写。这样后面不用再把同步 demo 改一遍。

Request 和 Result 的 async 写法

python
result = (
    agent
    .input("把这条工单整理成客服处理卡片。")
    .output({
        "status_summary": (str, "一句话状态", True),
        "risk_flags": [(str, "风险点", True)],
        "customer_reply": (str, "客户回复", True),
    })
    .get_result()
)

data = await result.async_get_data()
text = await result.async_get_text()
meta = await result.async_get_meta()

get_result() 本身只是拿 result facade。第一次 async_get_data()async_get_text() 或 stream 消费时才触发模型请求,并缓存结果。后续读取同一个 result 不会重复请求。

字段级流式 UI

python
async for item in result.get_async_generator(type="instant"):
    if item.delta:
        await websocket.send_json({
            "path": item.path,
            "delta": item.delta,
            "done": item.is_complete,
        })

final_data = await result.async_get_data()

instant 是结构化字段 patch,不是原始 token。它适合把 status_summaryrisk_flags[0] 这类字段分别更新到 UI slot。最终保存或业务判断仍然用 async_get_data()

TriggerFlow 的 async 路径

python
execution = flow.create_execution()
await execution.async_start({"ticket": "T-104"})
snapshot = await execution.async_close()

chunk 内也优先使用 async API:

python
async def summarize(data):
    result = agent.input(data.input).output({
        "summary": (str, "摘要", True),
    }).get_result()

    payload = await result.async_get_data()
    await data.async_set_state("summary", payload["summary"])
    await data.async_put_into_stream({"summary": payload["summary"]})

需要脚本式一次跑完时,可以用 await flow.async_start(...)。需要暂停、恢复、外部事件或显式关闭时,用 create_execution()

API 对照

同步Async
agent.start() / request.start()agent.async_start() / request.async_start()
result.get_data()result.async_get_data()
result.get_text()result.async_get_text()
result.get_meta()result.async_get_meta()
result.get_generator(type=...)result.get_async_generator(type=...)
flow.start()flow.async_start()
execution.start() / execution.close()execution.async_start() / execution.async_close()
data.set_state(...) / data.emit(...)data.async_set_state(...) / data.async_emit(...)

常见误用

  • 把同步 demo 直接放进 FastAPI handler。
  • 以为 async 能降低单次模型请求延迟。单次请求的墙钟时延主要由模型和网络决定。
  • 只流式推 patch,不在结束后读取最终 async_get_data()
  • TriggerFlow chunk 里混用阻塞式外部调用,导致整个事件循环被卡住。
  • 为了并发开多个线程,而不是先使用 async result 和 asyncio.gather(...)

下一步