State 与 Resources
TriggerFlow 里最容易写错的不是连线,而是“这个值应该放哪”。一个客服工单流程里可能同时有三类东西:
- 工单分类、审批结果、模型产出的报告。
- 数据库 client、HTTP client、日志对象、回调函数。
- 所有 execution 都共用的一份配置或缓存。
它们不能放在同一个地方。TriggerFlow 提供三层存储:state、runtime_resources、flow_data。
先用三个问题判断
| 问题 | 是 | 否 |
|---|---|---|
| 这个值要出现在 close snapshot 或 save/load 里吗? | 用 state | 继续判断 |
| 这个值是 live 对象、函数、client、socket、文件句柄吗? | 用 runtime_resources | 继续判断 |
| 这个值真的要被同一个 flow 的所有 execution 共享吗? | 用 flow_data 或外部存储 | 多数情况下仍应回到 state |
日常业务里,默认答案通常是 state。只有不能序列化的活对象才进 runtime_resources。flow_data 要带着明确理由使用。
state:一次 execution 自己的业务状态
state 是 execution-local、可序列化、可快照的。它会进入 close snapshot,也会被 execution.save() / execution.load(saved) 往返。
async def classify_ticket(data):
result = {
"ticket_id": data.input["ticket_id"],
"team": "billing",
"priority": "high",
}
await data.async_set_state("classification", result)
return result读取时不需要 await:
async def draft_reply(data):
classification = data.get_state("classification")
await data.async_set_state(
"reply_context",
{"team": classification["team"], "tone": "concise"},
)常用 API:
| 操作 | API |
|---|---|
| 写入 | await data.async_set_state(key, value) |
| 读取 | data.get_state(key, default=None) |
| 追加 list | await data.async_append_state(key, value) |
| 删除 | await data.async_del_state(key) |
写入、追加、删除都有同步版本。服务和 chunk 里优先用 async 版本,和周围的 async flow 保持一致。
runtime_resources:给 chunk 用的活对象
DB client、模型 agent、搜索函数、logger 都不适合进 state。它们不能可靠序列化,也不应该被 close snapshot 暴露给调用方。
创建 execution 时注入:
execution = flow.create_execution(
auto_close=False,
runtime_resources={
"db": db_client,
"logger": logger,
"search": search_function,
},
)chunk 内读取:
async def enrich_ticket(data):
db = data.require_resource("db")
logger = data.require_resource("logger")
ticket = await db.fetch_ticket(data.input["ticket_id"])
logger.info("ticket loaded")
await data.async_set_state("ticket", ticket)
return ticketrequire_resource(name) 适合必需依赖,没注入时直接报错。可选依赖用 data.get_resource(name, default=None)。
flow 级别也可以注入长期复用的资源:
flow.update_runtime_resources(logger=logger, search=search_function)如果使用 execution_environments=[...],Execution Environment Manager 可以把托管资源注入到同一套 runtime_resources 里。chunk 的读取方式不变,差别在于资源的启动和释放由环境管理器负责。
为什么 runtime_resources 不进 snapshot
close snapshot 要能被 JSON 化、写日志、返回 API、保存到数据库。live 对象没有稳定的序列化意义。TriggerFlow 会在 save/snapshot 里记录 resource_keys,告诉应用“恢复时需要重新注入哪些名字”,但不会保存对象本体。
saved = execution.save()
restored = flow.create_execution(
auto_close=False,
runtime_resources={
"db": new_db_client,
"logger": new_logger,
"search": search_function,
},
)
restored.load(saved)恢复代码要重新注入兼容资源。这个边界清楚后,跨进程恢复才不会把过期连接、旧进程内存对象带到新 execution 里。
flow_data:确实要共享时才用
flow_data 属于 flow,而不是某次 execution。同一个 flow 同时跑两个 execution,它们看到的是同一份 flow_data。
flow.set_flow_data("shared_counter", 0, no_warning=True)默认调用会发 RuntimeWarning,原因很直接:
- 并发 execution 可能互相覆盖。
execution.save()/load()不保存flow_data。- 多进程服务里,不同 worker 的 flow 实例可能不是同一份内存。
如果只是一次 execution 的中间值,用 state。如果是不可序列化的共享 client,用 flow 级 runtime_resources。如果是跨进程共享业务数据,通常应该放外部存储。
一个服务里的完整分工
async def load_ticket(data):
db = data.require_resource("db")
ticket = await db.fetch_ticket(data.input["ticket_id"])
await data.async_set_state("ticket", ticket)
return ticket
async def classify(data):
agent = data.require_resource("agent")
ticket = data.get_state("ticket")
result = (
agent
.input(ticket["content"])
.output({"team": (str, "处理团队", True)})
.get_result()
)
classification = await result.async_get_data(ensure_keys=["team"])
await data.async_set_state("classification", classification)
return classification
flow.to(load_ticket).to(classify)
execution = flow.create_execution(
runtime_resources={"db": db_client, "agent": agent},
)
snapshot = await execution.async_start({"ticket_id": "T-1024"})这里的边界是:
ticket和classification是业务结果,用state。db_client和agent是 live 依赖,用runtime_resources。- close snapshot 只暴露业务状态,不暴露连接对象。
常见错误
| 写法 | 问题 | 改法 |
|---|---|---|
把 DB client 放进 state | 保存和恢复都不可靠 | 放进 runtime_resources |
把单次工单的中间值放进 flow_data | 并发 execution 会互相影响 | 放进 state |
load(saved) 后直接跑 | 缺少 live 资源 | 按 resource_keys 重新注入 |
| 把大块流式文本不断追加进 state | snapshot 变大,前端也拿不到增量 | 进度走 runtime stream,最终摘要进 state |
另见
- Lifecycle -
close()何时冻结 snapshot - 事件与流 - 进度和增量输出怎么走 runtime stream
- Execution Environment - 托管 live resource 生命周期
- 持久化与 Blueprint - save/load 时哪些东西能回来
- 兼容 -
runtime_data是state的旧名