兼容
这一页只服务迁移。新代码从 Lifecycle 和 State 与 Resources 开始读,不需要把旧 API 当成默认路径。
TriggerFlow 新生命周期的核心变化是:close snapshot 是规范完成态。旧代码里常见的 .end()、set_result()、get_result()、wait_for_result= 还保留兼容,但不再推荐给新流程。
Deprecation warning 按 API 在每个 Python 进程内只提示一次。生产环境如果要全局关闭 Agently deprecation warning,可以设置 runtime.show_deprecation_warnings=False。
.end():它不是 close
.end() 这个名字容易误导。它不是运行期 lifecycle 动作。
当前行为是:
- 它在定义期给 flow 追加一个兼容 result sink。
- 它不会
seal()。 - 它不会
close()。 - 运行时只是把流入值写到保留 state key
"$final_result"。
旧写法:
flow.to(step_a).to(step_b).end()
result = flow.start("input")新写法:
async def step_b(data):
answer = do_work(data.input)
await data.async_set_state("answer", answer)
return answer
flow.to(step_a).to(step_b)
snapshot = await flow.async_start("input")
answer = snapshot["answer"]如果某个旧 flow 仍然调用 .end(),它会继续把值写进 snapshot["$final_result"]。迁移时不要再新增 .end()。
set_result / get_result:换成 state 和 snapshot
set_result(value) 也写入 "$final_result"。get_result() 会读取它,或在 close 后回退 close snapshot。
旧写法:
async def worker(data):
data.set_result({"answer": "done"})
result = execution.get_result()新写法:
async def worker(data):
await data.async_set_state("answer", {"status": "done"})
snapshot = await execution.async_close()
answer = snapshot["answer"]如果确实要桥接旧代码,用 execution.result.async_get_final_result(),让兼容意图露出来:
final = await execution.result.async_get_final_result()这应该是过渡代码,不是新 flow 的结果设计。
wait_for_result:参数已无意义
wait_for_result=True / False 是旧生命周期里控制 start(...) 是否等待结果的参数。新生命周期由 auto_close 决定返回行为。
旧写法:
result = flow.start("input", wait_for_result=True)新写法分两类:
# 输入一次,等 close snapshot
snapshot = await flow.async_start("input")# 保留 execution,外部后续还要交互
execution = flow.create_execution(auto_close=False)
await execution.async_start("input")
snapshot = await execution.async_close()wait_for_result= 的值现在会被忽略并发 warning。删掉它,根据业务选择入口。
runtime_data:state 的旧名
旧 runtime_data API 现在是 state API 的别名。语义没变,名字变了。
旧写法:
async def step(data):
await data.set_runtime_data("count", 1)
n = data.get_runtime_data("count")新写法:
async def step(data):
await data.async_set_state("count", 1)
n = data.get_state("count")state 是 execution-local、可序列化、会进入 close snapshot,也会被 save/load 往返。
flow_data:不是 deprecated,但默认有风险
flow_data 没有废弃。它的问题是 scope:它属于 flow,同一个 flow 的所有 execution 共享一份。
默认调用会发 RuntimeWarning,因为它容易带来并发覆盖、save/load 不一致、多进程状态不一致。
确实要共享时可以显式关闭 warning:
flow.set_flow_data("shared_config", {"mode": "strict"}, no_warning=True)单次 execution 的业务数据用 state。live 对象用 runtime_resources。跨进程共享业务数据用外部存储。
close snapshot 里为什么还会看到 $final_result
迁移期间,只要 execution 跑过 .end() 或 set_result(),snapshot 里就可能出现 "$final_result"。这包括子流或共享库里还没迁掉的旧代码。
to_sub_flow(...) 在解析 result.<path> write-back 时,会先查兼容 result,再回退 close snapshot。这样旧子流可以继续嵌进新父流。新子流建议直接写 state,然后让父流用 snapshot.<path> 读取。
迁移清单
按 flow 逐个处理:
- 删除
.end(),给最终业务值命名一个 state key。 - 把
set_result(x)改成await data.async_set_state("meaningful_key", x)。 - 把
get_result()改成读取 close snapshot 或execution.result.get_state(...)。 - 删除
wait_for_result=。 - 把
set_runtime_data/get_runtime_data改成async_set_state/get_state。 - 审计
flow_data。多数改成state;确实共享时加no_warning=True并写清用途。 - 审计 state 里的 live 对象,移到
runtime_resources。
迁移完成后,一个 flow 的完成态应该能从 close snapshot 直接读出来。
另见
- Lifecycle - 新入口和 close snapshot
- State 与 Resources - state、flow_data、runtime_resources 的边界
- Sub-Flow - 子流桥接旧 result 和新 snapshot
- Execution Result - 兼容 final result reader