Skip to content

Dynamic Task

有些任务不是应用代码提前写好流程,而是运行时才知道该分几步。比如事故简报、合同风险审查、企业续约分析:模型或业务系统会先生成一个任务图,再让系统按依赖执行。

Dynamic Task 处理的是这类问题。它是 TaskDAG 的便利 facade:接收一个目标或一个已提交的 DAG,先校验 TaskDAG,再解析 handler,并把图编译成 TriggerFlow execution 执行。它可以作为 AgentExecution 的一个 bounded DAG phase,但不负责成为外层长任务 owner。

什么时候用 Dynamic Task

当前情况建议
应用代码已经知道稳定流程拓扑TriggerFlow
只是一次结构化模型请求用 request + 输出控制
只是要把 Agent 长期设置和本轮输入隔离AgentExecution
模型需要调用函数或 MCPActions
任务图来自模型规划或业务系统提交用 Dynamic Task
需要先校验 DAG,再执行节点依赖用 Dynamic Task

一句话判断:图本身是输入,就用 Dynamic Task;图是应用代码的一部分,就用 TriggerFlow。

两种进入方式

让模型规划任务图

当调用方只知道目标,不知道具体 DAG,可以让 Dynamic Task 规划:

python
from agently import Agently

task = Agently.create_dynamic_task(
    target="write an incident briefing",
    output_schema={
        "brief": (str, "customer-facing briefing", True),
        "next_update": (str, "next update timing", True),
    },
)

snapshot = await task.async_start(timeout=120)

output_schema 会作用到 semantic output 的模型节点。模型任务仍然复用 Agently request 的输出流水线,不需要在 handler 里自己解析模型文本。

直接提交已有 DAG

如果应用已经有计划,直接传 plan,跳过模型规划:

python
async def local_handler(context):
    return {
        "task_id": context.task.id,
        "deps": dict(context.dependency_results),
    }

task = Agently.create_dynamic_task(
    target="review policy",
    plan={
        "graph_id": "review",
        "task_schema_version": "task_dag/v1",
        "tasks": [
            {"id": "extract", "kind": "local", "binding": "local_handler"},
            {
                "id": "final",
                "kind": "local",
                "binding": "local_handler",
                "depends_on": ["extract"],
            },
        ],
        "semantic_outputs": {"final": "final"},
    },
    handlers={"local_handler": local_handler},
)

snapshot = await task.async_start(timeout=10)

自定义 handler 建议使用清晰且以 _handler 结尾的名字。DAG 里的 binding 指向 handler 名称。

运行时输入怎么传

提交式 DAG 的 inputs 可以引用运行时数据:

python
plan = {
    "graph_id": "review",
    "task_schema_version": "task_dag/v1",
    "tasks": [
        {"id": "lookup", "kind": "local", "binding": "lookup_handler"},
        {
            "id": "final",
            "kind": "local",
            "binding": "final_handler",
            "depends_on": ["lookup"],
            "inputs": {
                "account": "${INIT.account}",
                "ticket": "${DEPS.lookup.ticket}",
                "summary": "Ticket ${STATE.task_results.lookup.ticket.id}",
            },
        },
    ],
}

常用 slot:

  • ${INIT...}:初始 graph input 或 execution input。
  • ${DEPS.task.path}:上游依赖结果。
  • ${STATE...}:execution state。
  • ${TRIGGER...}:原始 TriggerFlow trigger payload,通常只在高级调试或 executor 集成里用。

整个字符串就是 placeholder 时,会保留原始值类型;placeholder 嵌在普通字符串里时,会渲染成字符串。路径缺失会 fail closed,不会把未解析字符串继续传给 handler。

YAML / JSON 配置计划

提交式计划可以放进 YAML 或 JSON:

python
from agently.core import TaskDAG

graph = TaskDAG.from_yaml("examples/dynamic_task/config_policy_review.yaml")
task = Agently.create_dynamic_task(
    target="review policy",
    plan=graph,
    handlers={"risk_check_handler": risk_check_handler},
)

snapshot = await task.async_run(graph_input={"doc": "policy"}, timeout=10)

TaskDAG.from_json(...) 接受文件路径或 JSON/JSON5 原始内容。from_yaml(...)from_json(...) 都支持 task_dag_key_path="plans.review",用于从较大的配置文件里选择某一个 DAG。

Agent 上的同名入口

Agent 实例也可以创建 Dynamic Task:

python
task = (
    agent
    .info({"customer": "Acme"})
    .instruct("Focus on renewal risk and account-team actions.")
    .input({"account": "Acme", "ticket": "T-42"})
    .output({
        "summary": (str, "risk summary", True),
        "risks": ([str], "risk bullets", True),
    }, format="json")
    .create_dynamic_task()
)

这会消费当前 prompt snapshot。显式传入的 create_dynamic_task(target=..., output_schema=..., output_format=...) 参数优先于 prompt 推导值。

它和 TriggerFlow 的关系

Dynamic Task 的执行基座是 TriggerFlow,但它不是 TriggerFlow 的子 API,也不是一套独立任务生命周期。

Dynamic Task 自己负责:

  • 规划或接收 TaskDAG
  • 校验 DAG 语法、依赖、semantic outputs、副作用策略和 resolver 可用性。
  • bindingtask.idkind 解析 handler。
  • 把校验后的 DAG 编译成 TriggerFlow chunk 执行。

TriggerFlow 负责底层 lifecycle、stream、pause/resume、result 和 runtime resources。

怎么判断执行结果

async_start(...) 返回 close snapshot。模型 semantic output 通常在 snapshot["semantic_outputs"] 中:

python
snapshot = await task.async_start(timeout=120)
_, output = next(iter(snapshot["semantic_outputs"].items()))
brief = output["result"]["brief"]

不同业务可以在外层封装自己的 service,例如 IncidentBriefingService.brief(report)ContractRiskReviewService.review(contract),把 Dynamic Task 的 snapshot 投影成业务对象。

可运行示例

Agently examples/dynamic_task/ 里按层次放了示例:

  • 01_dynamic_task_basic.py:本地 handler 的 submitted DAG。
  • 02_support_response_module_model.py:模型驱动客服回复模块,业务查询用 mock。
  • 03_contract_risk_review_business.py:合同风险审查,local handler + 模型风险 memo。
  • 04_incident_briefing_auto_plan.py:模型先生成 TaskDAG,再执行事故简报。
  • 05_enterprise_renewal_complex_auto_plan.py:复杂企业续约自动规划。
  • 06_dynamic_task_config_plan.py:从 YAML 配置加载 submitted DAG。

常见误用

  • 应用已经知道稳定流程,还绕去让模型生成 DAG。直接用 TriggerFlow。
  • 普通 Agent prompt 或长任务 strategy 默认从 Dynamic Task 开始。先判断是否只需要 AgentExecution 或 task strategy。
  • handler 里自己解析模型文本。模型节点应复用 Agently output schema。
  • 未知必需 handler 继续执行。Dynamic Task 应在执行前 fail closed。
  • actionsskills 默认暴露给 planner。确实需要时再显式传入。
  • 把底层 TaskDAGExecutor 当日常入口。应用开发优先用 Agently.create_dynamic_task(...)agent.create_dynamic_task(...)

下一步