Skip to content

English mirror

This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.

Read the Chinese source page

模型集成

TriggerFlow 负责把多个业务阶段组织成可观测、可等待、可并发的流程。模型调用仍然交给 Agently Agent / request 层。chunk handler 是普通 async 函数,所以在 chunk 里调模型时,重点是三件事:

  • 用 async,不阻塞 execution。
  • 用结构化输出,让下一段 chunk 拿到稳定字段。
  • 只有用户能受益时才把模型增量桥接到 runtime stream。

最小模式:chunk 里发一次结构化请求

python
from agently import Agently, TriggerFlow, TriggerFlowRuntimeData

agent = Agently.create_agent()
flow = TriggerFlow(name="classify-ticket")


async def classify(data: TriggerFlowRuntimeData):
    result = (
        agent
        .input(data.input["content"])
        .output({
            "category": (str, "工单分类", True),
            "priority": (str, "优先级", True),
        })
        .get_result()
    )
    classification = await result.async_get_data(
        ensure_keys=["category", "priority"],
    )
    await data.async_set_state("classification", classification)
    return classification


flow.to(classify)

这段代码里,Agent 负责模型请求和结构化输出;TriggerFlow 负责把 classification 传给下一段,并把它写进 execution state,最终进入 close snapshot。

在 chunk 里保持 async

TriggerFlow execution 本身是 async 调度。chunk 里调用同步 start() 能跑,但会阻塞 event loop,影响并发、stream 和取消。服务代码里优先使用:

  • result.async_get_data(...)
  • result.async_get_text()
  • result.async_get_meta()
  • result.get_async_generator(type="instant")

短脚本可以用同步 API,服务和长流程尽量 async-first。

一次请求,多种读取

同一个模型请求经常要读结构化结果、原文和 meta。用 get_result() 拿 result 对象,再从这个对象读取,不会重复发请求。

python
async def analyze(data):
    result = (
        agent
        .input(data.input)
        .output({
            "summary": (str, "摘要", True),
            "risk": (str, "风险等级", True),
        })
        .get_result()
    )

    text = await result.async_get_text()
    obj = await result.async_get_data(ensure_keys=["summary", "risk"])
    meta = await result.async_get_meta()

    await data.async_set_state("raw_text", text)
    await data.async_set_state("analysis", obj)
    await data.async_set_state("model_meta", meta)
    return obj

业务字段进 state。provider、token、延迟等运行信息可以进单独的 meta key,或者只写服务日志。

把模型增量桥接到 runtime stream

前端需要边生成边显示时,把 Agent result 的 instant 增量转成 TriggerFlow runtime stream item。

python
async def draft_reply(data: TriggerFlowRuntimeData):
    result = (
        agent
        .input(data.input["content"])
        .output({
            "title": (str, "回复标题", True),
            "body": (str, "回复正文", True),
        })
        .get_result()
    )

    async for item in result.get_async_generator(type="instant"):
        if item.delta:
            await data.async_put_into_stream({
                "type": "model_delta",
                "path": item.path,
                "delta": item.delta,
                "done": item.is_complete,
            })

    final = await result.async_get_data(ensure_keys=["title", "body"])
    await data.async_set_state("draft", final)
    return final

type="instant" 产出的是结构化字段的增量,不是 provider 原始 token。UI 可以按 path 更新标题、正文或其他字段。stream 结束后,async_get_data(...) 从同一个 result 读取最终结构化数据。

按 execution 注入不同 agent

模块级 agent 适合所有 execution 共用同一套模型配置。需要按租户、环境或请求切换模型时,把 agent 放进 runtime_resources

python
execution = flow.create_execution(
    runtime_resources={
        "agent": Agently.create_agent().set_settings("OpenAICompatible", {
            "model": tenant_model,
        }),
    },
)


async def classify(data):
    agent = data.require_resource("agent")
    result = agent.input(data.input).output({...}).get_result()
    return await result.async_get_data()

agent 持有模型配置和网络 client,不应该放进 state。state 留给可序列化的业务结果。

多个模型角色放在不同 chunk

一个 flow 可以有多个 agent:小模型分类,大模型起草,另一个模型做校验。

python
classifier = Agently.create_agent().set_settings("OpenAICompatible", {
    "model": "${ENV.CLASSIFIER_MODEL}",
})
writer = Agently.create_agent().set_settings("OpenAICompatible", {
    "model": "${ENV.WRITER_MODEL}",
})


async def classify(data):
    result = classifier.input(data.input).output({...}).get_result()
    return await result.async_get_data()


async def write_reply(data):
    result = writer.input(data.input).output({...}).get_result()
    return await result.async_get_data()


flow.to(classify).to(write_reply)

TriggerFlow 只负责阶段连接和生命周期。每个 agent 仍然保持自己的 prompt、输出结构、工具和模型配置。

校验和重试仍然属于 request 层

chunk 里可以继续使用 request 层的输出控制、校验和重试:

python
async def extract(data):
    result = (
        agent
        .input(data.input)
        .output({"answer": (str, "回答", True)})
        .validate(custom_business_check)
        .get_result()
    )
    answer = await result.async_get_data(ensure_keys=["answer"])
    await data.async_set_state("answer", answer)
    return answer

重试预算属于这一次模型请求,不会让 TriggerFlow 其他 chunk 自动重跑。需要流程级重试、反思或人工审批时,把它们显式写成 TriggerFlow 的图层结构。

不要把模型状态塞进 flow_data

flow_data 是 flow 级共享。把“上一次模型答案”“当前会话上下文”放进去,会在并发 execution 之间互相影响。

  • 单次 execution 的模型输出:放 state
  • 多轮对话上下文:用 Session / Workspace / Recall 这类专门能力。
  • agent、client、tool:放 runtime_resources

另见