English mirror
This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.
持久化与 Blueprint
一个审批流程跑到一半,服务重启了。系统要恢复的是“这一次 execution 已经处理到哪、还在等哪个 interrupt、state 里有哪些业务值”。这和“这个 flow 的图结构长什么样”是两件事。
TriggerFlow 因此有两条序列化路径:
| 方法 | 保存什么 | 不保存什么 | 常见用途 |
|---|---|---|---|
execution.save() / execution.load(saved) | 某一次 execution 的运行状态 | flow 的函数体、live resources、协程中段 | 跨重启恢复、交给另一 worker |
flow.save_blueprint() / flow.load_blueprint(blueprint) | flow definition 的图结构 | handler 函数体、execution state | 把 flow 结构当配置分发或版本化 |
这两条可以配合使用,但不要混淆。execution save 不能重建 flow 图;blueprint 也不保存某次 execution 跑到哪了。
execution.save:保存一次运行
execution.save() 捕获的是一次 execution 的稳定状态:
- execution 的
state - lifecycle metadata
- pending interrupt state
- intervention ledger
resource_keys
它不捕获:
- live
runtime_resources本体 - 正在运行到一半的 Python 协程栈
- flow definition 的函数实现
execution = flow.create_execution(auto_close=False)
await execution.async_start({"refund_id": "R-2048"})
saved = execution.save()
store.save(execution.id, saved)保存最好发生在稳定边界:流程已经空闲、正在等待 pause_for(...)、或者业务明确要把当前状态交给另一个 worker。不要指望它把正在执行中的函数中段冻结下来。
execution.load:用同一个 flow 定义恢复
恢复时,应用先准备兼容的 flow 定义,再创建新的 execution,重新注入 live resources,然后 load。
saved = store.load(execution_id)
restored = flow.create_execution(
auto_close=False,
runtime_resources={
"db": new_db_client,
"logger": logger,
"search": search_tool,
},
)
restored.load(saved)load(saved) 不会从 saved 里重建 chunk 图。两端的 flow definition 要一致,或者至少对 saved execution 仍然兼容。
跨 pause_for 恢复
这是 save/load 最常见的场景。
execution = flow.create_execution(auto_close=False)
await execution.async_start({"ticket_id": "T-1024"})
saved = execution.save()
store.save(execution.id, saved)
# 后续在另一个 worker 里
restored = flow.create_execution(
auto_close=False,
runtime_resources={"db": db_client, "agent": agent},
)
restored.load(store.load(execution.id))
interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()pending interrupt 是 saved state 的一部分。恢复后仍然通过 continue_with(id, payload) 解开一个 interrupt,TriggerFlow 会按该 interrupt 的 resume_to 继续。
blueprint:保存 flow 的图结构
Blueprint 保存的是 definition surface:chunk、边、分支、条件的结构。handler 函数本体仍然在代码里。
async def normalize(data):
await data.async_set_state("normalized", data.input)
return data.input
async def store_output(data):
await data.async_set_state("output", data.input)
source = TriggerFlow(name="source")
source.register_chunk_handler(normalize)
source.register_chunk_handler(store_output)
source.to(normalize).to(store_output)
blueprint = source.save_blueprint()另一端恢复时,要先把同名 handler 注册回来:
restored = TriggerFlow(name="restored")
restored.register_chunk_handler(normalize)
restored.register_chunk_handler(store_output)
restored.load_blueprint(blueprint)如果 blueprint 里提到的 handler 名在恢复端找不到,load 会失败。Blueprint 不是可执行包,它只是图结构描述。
服务化时的写法
服务代码可以按这个形态组织:
- chunk 和 condition 写成模块顶层 named functions。
- flow builder 只负责组装图。
- 稳定 live 依赖用
flow.update_runtime_resources(...)。 - 请求级或租户级 live 依赖用
create_execution(runtime_resources={...})。 - 单次请求的业务值放 execution
state。
async def analyze(data):
agent = data.require_resource("agent")
question = data.input["question"]
await data.async_set_state("question", question)
result = agent.input(question).output({
"intent": (str, "用户意图", True),
"need_policy": (bool, "是否需要查政策", True),
}).get_result()
return await result.async_get_data(ensure_keys=["intent", "need_policy"])
async def answer(data):
policy_doc = data.require_resource("policy_doc")
question = data.get_state("question")
await data.async_set_state("answer", {"question": question, "policy": policy_doc})
def build_policy_flow():
flow = TriggerFlow(name="policy")
flow.to(analyze).to(answer)
return flow
flow = build_policy_flow()
execution = flow.create_execution(
runtime_resources={"agent": agent, "policy_doc": tenant_policy_doc},
)
snapshot = await execution.async_start({"question": "差旅报销上限是多少?"})这种写法对 save/load 和 blueprint 都更友好。闭包适合短脚本;服务里用 named functions,更容易测试、注册、导出和重载。
save 和 blueprint 怎么配合
Flow definition
|
+-- save_blueprint() --> 图结构 dict
|
+-- create_execution() --> Execution
|
+-- save() --> 这一次运行的状态 dict
|
+-- close() --> close snapshot一个分布式 worker 可以先 load blueprint,注册 handler,再 load execution save。存储后端由应用决定:Redis、Postgres、S3、文件都可以,TriggerFlow 只提供 JSON 友好的 dict。
常见错误
| 写法 | 问题 |
|---|---|
| 只保存 execution save,恢复端没有对应 flow 定义 | load(saved) 无法知道图怎么继续 |
| 把 DB client 放进 state,希望 save 后自动恢复 | live 对象不会保存,恢复时要重新注入 |
| blueprint 发给没有 handler 代码的一端 | blueprint 不包含函数体 |
用 flow_data 保存 execution 进度 | save/load 不会往返 flow_data,并发也会互相影响 |
另见
- Lifecycle - 什么状态适合保存,close 又做什么
- Pause 与 Resume - 最常见的可恢复等待
- State 与 Resources - state、resource_keys、runtime resources 的边界