Skip to content

English mirror

This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.

Read the Chinese source page

持久化与 Blueprint

一个审批流程跑到一半,服务重启了。系统要恢复的是“这一次 execution 已经处理到哪、还在等哪个 interrupt、state 里有哪些业务值”。这和“这个 flow 的图结构长什么样”是两件事。

TriggerFlow 因此有两条序列化路径:

方法保存什么不保存什么常见用途
execution.save() / execution.load(saved)某一次 execution 的运行状态flow 的函数体、live resources、协程中段跨重启恢复、交给另一 worker
flow.save_blueprint() / flow.load_blueprint(blueprint)flow definition 的图结构handler 函数体、execution state把 flow 结构当配置分发或版本化

这两条可以配合使用,但不要混淆。execution save 不能重建 flow 图;blueprint 也不保存某次 execution 跑到哪了。

execution.save:保存一次运行

execution.save() 捕获的是一次 execution 的稳定状态:

  • execution 的 state
  • lifecycle metadata
  • pending interrupt state
  • intervention ledger
  • resource_keys

它不捕获:

  • live runtime_resources 本体
  • 正在运行到一半的 Python 协程栈
  • flow definition 的函数实现
python
execution = flow.create_execution(auto_close=False)
await execution.async_start({"refund_id": "R-2048"})

saved = execution.save()
store.save(execution.id, saved)

保存最好发生在稳定边界:流程已经空闲、正在等待 pause_for(...)、或者业务明确要把当前状态交给另一个 worker。不要指望它把正在执行中的函数中段冻结下来。

execution.load:用同一个 flow 定义恢复

恢复时,应用先准备兼容的 flow 定义,再创建新的 execution,重新注入 live resources,然后 load。

python
saved = store.load(execution_id)

restored = flow.create_execution(
    auto_close=False,
    runtime_resources={
        "db": new_db_client,
        "logger": logger,
        "search": search_tool,
    },
)
restored.load(saved)

load(saved) 不会从 saved 里重建 chunk 图。两端的 flow definition 要一致,或者至少对 saved execution 仍然兼容。

跨 pause_for 恢复

这是 save/load 最常见的场景。

python
execution = flow.create_execution(auto_close=False)
await execution.async_start({"ticket_id": "T-1024"})

saved = execution.save()
store.save(execution.id, saved)

# 后续在另一个 worker 里
restored = flow.create_execution(
    auto_close=False,
    runtime_resources={"db": db_client, "agent": agent},
)
restored.load(store.load(execution.id))

interrupt_id = next(iter(restored.get_pending_interrupts()))
await restored.async_continue_with(interrupt_id, {"approved": True})
snapshot = await restored.async_close()

pending interrupt 是 saved state 的一部分。恢复后仍然通过 continue_with(id, payload) 解开一个 interrupt,TriggerFlow 会按该 interrupt 的 resume_to 继续。

blueprint:保存 flow 的图结构

Blueprint 保存的是 definition surface:chunk、边、分支、条件的结构。handler 函数本体仍然在代码里。

python
async def normalize(data):
    await data.async_set_state("normalized", data.input)
    return data.input


async def store_output(data):
    await data.async_set_state("output", data.input)


source = TriggerFlow(name="source")
source.register_chunk_handler(normalize)
source.register_chunk_handler(store_output)
source.to(normalize).to(store_output)

blueprint = source.save_blueprint()

另一端恢复时,要先把同名 handler 注册回来:

python
restored = TriggerFlow(name="restored")
restored.register_chunk_handler(normalize)
restored.register_chunk_handler(store_output)
restored.load_blueprint(blueprint)

如果 blueprint 里提到的 handler 名在恢复端找不到,load 会失败。Blueprint 不是可执行包,它只是图结构描述。

服务化时的写法

服务代码可以按这个形态组织:

  1. chunk 和 condition 写成模块顶层 named functions。
  2. flow builder 只负责组装图。
  3. 稳定 live 依赖用 flow.update_runtime_resources(...)
  4. 请求级或租户级 live 依赖用 create_execution(runtime_resources={...})
  5. 单次请求的业务值放 execution state
python
async def analyze(data):
    agent = data.require_resource("agent")
    question = data.input["question"]
    await data.async_set_state("question", question)

    result = agent.input(question).output({
        "intent": (str, "用户意图", True),
        "need_policy": (bool, "是否需要查政策", True),
    }).get_result()
    return await result.async_get_data(ensure_keys=["intent", "need_policy"])


async def answer(data):
    policy_doc = data.require_resource("policy_doc")
    question = data.get_state("question")
    await data.async_set_state("answer", {"question": question, "policy": policy_doc})


def build_policy_flow():
    flow = TriggerFlow(name="policy")
    flow.to(analyze).to(answer)
    return flow


flow = build_policy_flow()
execution = flow.create_execution(
    runtime_resources={"agent": agent, "policy_doc": tenant_policy_doc},
)
snapshot = await execution.async_start({"question": "差旅报销上限是多少?"})

这种写法对 save/load 和 blueprint 都更友好。闭包适合短脚本;服务里用 named functions,更容易测试、注册、导出和重载。

save 和 blueprint 怎么配合

text
Flow definition
  |
  +-- save_blueprint() --> 图结构 dict
  |
  +-- create_execution() --> Execution
                              |
                              +-- save() --> 这一次运行的状态 dict
                              |
                              +-- close() --> close snapshot

一个分布式 worker 可以先 load blueprint,注册 handler,再 load execution save。存储后端由应用决定:Redis、Postgres、S3、文件都可以,TriggerFlow 只提供 JSON 友好的 dict。

常见错误

写法问题
只保存 execution save,恢复端没有对应 flow 定义load(saved) 无法知道图怎么继续
把 DB client 放进 state,希望 save 后自动恢复live 对象不会保存,恢复时要重新注入
blueprint 发给没有 handler 代码的一端blueprint 不包含函数体
flow_data 保存 execution 进度save/load 不会往返 flow_data,并发也会互相影响

另见