Skip to content

English mirror

This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.

Read the Chinese source page

State 与 Resources

TriggerFlow 里最容易写错的不是连线,而是“这个值应该放哪”。一个客服工单流程里可能同时有三类东西:

  • 工单分类、审批结果、模型产出的报告。
  • 数据库 client、HTTP client、日志对象、回调函数。
  • 所有 execution 都共用的一份配置或缓存。

它们不能放在同一个地方。TriggerFlow 提供三层存储:stateruntime_resourcesflow_data

先用三个问题判断

问题
这个值要出现在 close snapshot 或 save/load 里吗?state继续判断
这个值是 live 对象、函数、client、socket、文件句柄吗?runtime_resources继续判断
这个值真的要被同一个 flow 的所有 execution 共享吗?flow_data 或外部存储多数情况下仍应回到 state

日常业务里,默认答案通常是 state。只有不能序列化的活对象才进 runtime_resourcesflow_data 要带着明确理由使用。

state:一次 execution 自己的业务状态

state 是 execution-local、可序列化、可快照的。它会进入 close snapshot,也会被 execution.save() / execution.load(saved) 往返。

python
async def classify_ticket(data):
    result = {
        "ticket_id": data.input["ticket_id"],
        "team": "billing",
        "priority": "high",
    }
    await data.async_set_state("classification", result)
    return result

读取时不需要 await:

python
async def draft_reply(data):
    classification = data.get_state("classification")
    await data.async_set_state(
        "reply_context",
        {"team": classification["team"], "tone": "concise"},
    )

常用 API:

操作API
写入await data.async_set_state(key, value)
读取data.get_state(key, default=None)
追加 listawait data.async_append_state(key, value)
删除await data.async_del_state(key)

写入、追加、删除都有同步版本。服务和 chunk 里优先用 async 版本,和周围的 async flow 保持一致。

runtime_resources:给 chunk 用的活对象

DB client、模型 agent、搜索函数、logger 都不适合进 state。它们不能可靠序列化,也不应该被 close snapshot 暴露给调用方。

创建 execution 时注入:

python
execution = flow.create_execution(
    auto_close=False,
    runtime_resources={
        "db": db_client,
        "logger": logger,
        "search": search_function,
    },
)

chunk 内读取:

python
async def enrich_ticket(data):
    db = data.require_resource("db")
    logger = data.require_resource("logger")

    ticket = await db.fetch_ticket(data.input["ticket_id"])
    logger.info("ticket loaded")
    await data.async_set_state("ticket", ticket)
    return ticket

require_resource(name) 适合必需依赖,没注入时直接报错。可选依赖用 data.get_resource(name, default=None)

flow 级别也可以注入长期复用的资源:

python
flow.update_runtime_resources(logger=logger, search=search_function)

如果使用 execution_environments=[...],Execution Environment Manager 可以把托管资源注入到同一套 runtime_resources 里。chunk 的读取方式不变,差别在于资源的启动和释放由环境管理器负责。

为什么 runtime_resources 不进 snapshot

close snapshot 要能被 JSON 化、写日志、返回 API、保存到数据库。live 对象没有稳定的序列化意义。TriggerFlow 会在 save/snapshot 里记录 resource_keys,告诉应用“恢复时需要重新注入哪些名字”,但不会保存对象本体。

python
saved = execution.save()

restored = flow.create_execution(
    auto_close=False,
    runtime_resources={
        "db": new_db_client,
        "logger": new_logger,
        "search": search_function,
    },
)
restored.load(saved)

恢复代码要重新注入兼容资源。这个边界清楚后,跨进程恢复才不会把过期连接、旧进程内存对象带到新 execution 里。

flow_data:确实要共享时才用

flow_data 属于 flow,而不是某次 execution。同一个 flow 同时跑两个 execution,它们看到的是同一份 flow_data

python
flow.set_flow_data("shared_counter", 0, no_warning=True)

默认调用会发 RuntimeWarning,原因很直接:

  • 并发 execution 可能互相覆盖。
  • execution.save() / load() 不保存 flow_data
  • 多进程服务里,不同 worker 的 flow 实例可能不是同一份内存。

如果只是一次 execution 的中间值,用 state。如果是不可序列化的共享 client,用 flow 级 runtime_resources。如果是跨进程共享业务数据,通常应该放外部存储。

一个服务里的完整分工

python
async def load_ticket(data):
    db = data.require_resource("db")
    ticket = await db.fetch_ticket(data.input["ticket_id"])
    await data.async_set_state("ticket", ticket)
    return ticket


async def classify(data):
    agent = data.require_resource("agent")
    ticket = data.get_state("ticket")
    result = (
        agent
        .input(ticket["content"])
        .output({"team": (str, "处理团队", True)})
        .get_result()
    )
    classification = await result.async_get_data(ensure_keys=["team"])
    await data.async_set_state("classification", classification)
    return classification


flow.to(load_ticket).to(classify)

execution = flow.create_execution(
    runtime_resources={"db": db_client, "agent": agent},
)
snapshot = await execution.async_start({"ticket_id": "T-1024"})

这里的边界是:

  • ticketclassification 是业务结果,用 state
  • db_clientagent 是 live 依赖,用 runtime_resources
  • close snapshot 只暴露业务状态,不暴露连接对象。

常见错误

写法问题改法
把 DB client 放进 state保存和恢复都不可靠放进 runtime_resources
把单次工单的中间值放进 flow_data并发 execution 会互相影响放进 state
load(saved) 后直接跑缺少 live 资源resource_keys 重新注入
把大块流式文本不断追加进 statesnapshot 变大,前端也拿不到增量进度走 runtime stream,最终摘要进 state

另见