Dynamic Task
有些任务不是应用代码提前写好流程,而是运行时才知道该分几步。比如事故简报、合同风险审查、企业续约分析:模型或业务系统会先生成一个任务图,再让系统按依赖执行。
Dynamic Task 处理的是这类问题。它是 TaskDAG 的便利 facade:接收一个目标或一个已提交的 DAG,先校验 TaskDAG,再解析 handler,并把图编译成 TriggerFlow execution 执行。它可以作为 AgentExecution 的一个 bounded DAG phase,但不负责成为外层长任务 owner。
什么时候用 Dynamic Task
| 当前情况 | 建议 |
|---|---|
| 应用代码已经知道稳定流程拓扑 | 用 TriggerFlow |
| 只是一次结构化模型请求 | 用 request + 输出控制 |
| 只是要把 Agent 长期设置和本轮输入隔离 | 用 AgentExecution |
| 模型需要调用函数或 MCP | 用 Actions |
| 任务图来自模型规划或业务系统提交 | 用 Dynamic Task |
| 需要先校验 DAG,再执行节点依赖 | 用 Dynamic Task |
一句话判断:图本身是输入,就用 Dynamic Task;图是应用代码的一部分,就用 TriggerFlow。
两种进入方式
让模型规划任务图
当调用方只知道目标,不知道具体 DAG,可以让 Dynamic Task 规划:
from agently import Agently
task = Agently.create_dynamic_task(
target="write an incident briefing",
output_schema={
"brief": (str, "customer-facing briefing", True),
"next_update": (str, "next update timing", True),
},
)
snapshot = await task.async_start(timeout=120)output_schema 会作用到 semantic output 的模型节点。模型任务仍然复用 Agently request 的输出流水线,不需要在 handler 里自己解析模型文本。
直接提交已有 DAG
如果应用已经有计划,直接传 plan,跳过模型规划:
async def local_handler(context):
return {
"task_id": context.task.id,
"deps": dict(context.dependency_results),
}
task = Agently.create_dynamic_task(
target="review policy",
plan={
"graph_id": "review",
"task_schema_version": "task_dag/v1",
"tasks": [
{"id": "extract", "kind": "local", "binding": "local_handler"},
{
"id": "final",
"kind": "local",
"binding": "local_handler",
"depends_on": ["extract"],
},
],
"semantic_outputs": {"final": "final"},
},
handlers={"local_handler": local_handler},
)
snapshot = await task.async_start(timeout=10)自定义 handler 建议使用清晰且以 _handler 结尾的名字。DAG 里的 binding 指向 handler 名称。
运行时输入怎么传
提交式 DAG 的 inputs 可以引用运行时数据:
plan = {
"graph_id": "review",
"task_schema_version": "task_dag/v1",
"tasks": [
{"id": "lookup", "kind": "local", "binding": "lookup_handler"},
{
"id": "final",
"kind": "local",
"binding": "final_handler",
"depends_on": ["lookup"],
"inputs": {
"account": "${INIT.account}",
"ticket": "${DEPS.lookup.ticket}",
"summary": "Ticket ${STATE.task_results.lookup.ticket.id}",
},
},
],
}常用 slot:
${INIT...}:初始 graph input 或 execution input。${DEPS.task.path}:上游依赖结果。${STATE...}:execution state。${TRIGGER...}:原始 TriggerFlow trigger payload,通常只在高级调试或 executor 集成里用。
整个字符串就是 placeholder 时,会保留原始值类型;placeholder 嵌在普通字符串里时,会渲染成字符串。路径缺失会 fail closed,不会把未解析字符串继续传给 handler。
YAML / JSON 配置计划
提交式计划可以放进 YAML 或 JSON:
from agently.core import TaskDAG
graph = TaskDAG.from_yaml("examples/dynamic_task/config_policy_review.yaml")
task = Agently.create_dynamic_task(
target="review policy",
plan=graph,
handlers={"risk_check_handler": risk_check_handler},
)
snapshot = await task.async_run(graph_input={"doc": "policy"}, timeout=10)TaskDAG.from_json(...) 接受文件路径或 JSON/JSON5 原始内容。from_yaml(...) 和 from_json(...) 都支持 task_dag_key_path="plans.review",用于从较大的配置文件里选择某一个 DAG。
Agent 上的同名入口
Agent 实例也可以创建 Dynamic Task:
task = (
agent
.info({"customer": "Acme"})
.instruct("Focus on renewal risk and account-team actions.")
.input({"account": "Acme", "ticket": "T-42"})
.output({
"summary": (str, "risk summary", True),
"risks": ([str], "risk bullets", True),
}, format="json")
.create_dynamic_task()
)这会消费当前 prompt snapshot。显式传入的 create_dynamic_task(target=..., output_schema=..., output_format=...) 参数优先于 prompt 推导值。
它和 TriggerFlow 的关系
Dynamic Task 的执行基座是 TriggerFlow,但它不是 TriggerFlow 的子 API,也不是一套独立任务生命周期。
Dynamic Task 自己负责:
- 规划或接收
TaskDAG。 - 校验 DAG 语法、依赖、semantic outputs、副作用策略和 resolver 可用性。
- 按
binding、task.id、kind解析 handler。 - 把校验后的 DAG 编译成 TriggerFlow chunk 执行。
TriggerFlow 负责底层 lifecycle、stream、pause/resume、result 和 runtime resources。
怎么判断执行结果
async_start(...) 返回 close snapshot。模型 semantic output 通常在 snapshot["semantic_outputs"] 中:
snapshot = await task.async_start(timeout=120)
_, output = next(iter(snapshot["semantic_outputs"].items()))
brief = output["result"]["brief"]不同业务可以在外层封装自己的 service,例如 IncidentBriefingService.brief(report) 或 ContractRiskReviewService.review(contract),把 Dynamic Task 的 snapshot 投影成业务对象。
可运行示例
Agently examples/dynamic_task/ 里按层次放了示例:
01_dynamic_task_basic.py:本地 handler 的 submitted DAG。02_support_response_module_model.py:模型驱动客服回复模块,业务查询用 mock。03_contract_risk_review_business.py:合同风险审查,local handler + 模型风险 memo。04_incident_briefing_auto_plan.py:模型先生成 TaskDAG,再执行事故简报。05_enterprise_renewal_complex_auto_plan.py:复杂企业续约自动规划。06_dynamic_task_config_plan.py:从 YAML 配置加载 submitted DAG。
常见误用
- 应用已经知道稳定流程,还绕去让模型生成 DAG。直接用 TriggerFlow。
- 普通 Agent prompt 或长任务 strategy 默认从 Dynamic Task 开始。先判断是否只需要 AgentExecution 或 task strategy。
- handler 里自己解析模型文本。模型节点应复用 Agently output schema。
- 未知必需 handler 继续执行。Dynamic Task 应在执行前 fail closed。
- 把
actions和skills默认暴露给 planner。确实需要时再显式传入。 - 把底层
TaskDAGExecutor当日常入口。应用开发优先用Agently.create_dynamic_task(...)或agent.create_dynamic_task(...)。
下一步
- 工作流拓扑由应用掌握:TriggerFlow 概览
- 输出契约和 retry:输出控制
- 给模型接外部能力:Actions 概览
- 服务化 Dynamic Task:FastAPI 服务封装