Skip to content

兼容

这一页只服务迁移。新代码从 LifecycleState 与 Resources 开始读,不需要把旧 API 当成默认路径。

TriggerFlow 新生命周期的核心变化是:close snapshot 是规范完成态。旧代码里常见的 .end()set_result()get_result()wait_for_result= 还保留兼容,但不再推荐给新流程。

Deprecation warning 按 API 在每个 Python 进程内只提示一次。生产环境如果要全局关闭 Agently deprecation warning,可以设置 runtime.show_deprecation_warnings=False

.end():它不是 close

.end() 这个名字容易误导。它不是运行期 lifecycle 动作。

当前行为是:

  • 它在定义期给 flow 追加一个兼容 result sink。
  • 它不会 seal()
  • 它不会 close()
  • 运行时只是把流入值写到保留 state key "$final_result"

旧写法:

python
flow.to(step_a).to(step_b).end()
result = flow.start("input")

新写法:

python
async def step_b(data):
    answer = do_work(data.input)
    await data.async_set_state("answer", answer)
    return answer


flow.to(step_a).to(step_b)
snapshot = await flow.async_start("input")
answer = snapshot["answer"]

如果某个旧 flow 仍然调用 .end(),它会继续把值写进 snapshot["$final_result"]。迁移时不要再新增 .end()

set_result / get_result:换成 state 和 snapshot

set_result(value) 也写入 "$final_result"get_result() 会读取它,或在 close 后回退 close snapshot。

旧写法:

python
async def worker(data):
    data.set_result({"answer": "done"})


result = execution.get_result()

新写法:

python
async def worker(data):
    await data.async_set_state("answer", {"status": "done"})


snapshot = await execution.async_close()
answer = snapshot["answer"]

如果确实要桥接旧代码,用 execution.result.async_get_final_result(),让兼容意图露出来:

python
final = await execution.result.async_get_final_result()

这应该是过渡代码,不是新 flow 的结果设计。

wait_for_result:参数已无意义

wait_for_result=True / False 是旧生命周期里控制 start(...) 是否等待结果的参数。新生命周期由 auto_close 决定返回行为。

旧写法:

python
result = flow.start("input", wait_for_result=True)

新写法分两类:

python
# 输入一次,等 close snapshot
snapshot = await flow.async_start("input")
python
# 保留 execution,外部后续还要交互
execution = flow.create_execution(auto_close=False)
await execution.async_start("input")
snapshot = await execution.async_close()

wait_for_result= 的值现在会被忽略并发 warning。删掉它,根据业务选择入口。

runtime_data:state 的旧名

runtime_data API 现在是 state API 的别名。语义没变,名字变了。

旧写法:

python
async def step(data):
    await data.set_runtime_data("count", 1)
    n = data.get_runtime_data("count")

新写法:

python
async def step(data):
    await data.async_set_state("count", 1)
    n = data.get_state("count")

state 是 execution-local、可序列化、会进入 close snapshot,也会被 save/load 往返。

flow_data:不是 deprecated,但默认有风险

flow_data 没有废弃。它的问题是 scope:它属于 flow,同一个 flow 的所有 execution 共享一份。

默认调用会发 RuntimeWarning,因为它容易带来并发覆盖、save/load 不一致、多进程状态不一致。

确实要共享时可以显式关闭 warning:

python
flow.set_flow_data("shared_config", {"mode": "strict"}, no_warning=True)

单次 execution 的业务数据用 state。live 对象用 runtime_resources。跨进程共享业务数据用外部存储。

close snapshot 里为什么还会看到 $final_result

迁移期间,只要 execution 跑过 .end()set_result(),snapshot 里就可能出现 "$final_result"。这包括子流或共享库里还没迁掉的旧代码。

to_sub_flow(...) 在解析 result.<path> write-back 时,会先查兼容 result,再回退 close snapshot。这样旧子流可以继续嵌进新父流。新子流建议直接写 state,然后让父流用 snapshot.<path> 读取。

迁移清单

按 flow 逐个处理:

  1. 删除 .end(),给最终业务值命名一个 state key。
  2. set_result(x) 改成 await data.async_set_state("meaningful_key", x)
  3. get_result() 改成读取 close snapshot 或 execution.result.get_state(...)
  4. 删除 wait_for_result=
  5. set_runtime_data / get_runtime_data 改成 async_set_state / get_state
  6. 审计 flow_data。多数改成 state;确实共享时加 no_warning=True 并写清用途。
  7. 审计 state 里的 live 对象,移到 runtime_resources

迁移完成后,一个 flow 的完成态应该能从 close snapshot 直接读出来。

另见