Skip to content

English mirror

This English page is generated from the current Chinese documentation so every route, anchor, code sample, and language switch stays available on agently.tech. Human-edited English copy can replace this generated body page by page.

Read the Chinese source page

工单分流 Playbook

工单分流看起来简单,线上最容易出问题:模型说“应该找账务”,代码却不知道该调用哪个 handler;或者靠关键词命中把“不能退款”误判成“退款”。一个稳定的分流系统应该让模型只负责语义分类,让代码负责确定性路由。

适用场景

每个输入都要完成这几步:

  1. 归一化一条工单、邮件、告警或请求。
  2. 分类到一个小集合。
  3. 给出优先级和简短理由。
  4. 按分类调用下游 handler。
  5. 记录分类结果和处理结果。

类别少、处理一步完成,用单次 Agent 请求。每个类别后面还有多步流程、人工审批、并行处理或 runtime stream,用 TriggerFlow。

单次请求形态

python
from agently import Agently

agent = (
    Agently.create_agent()
    .info({
        "categories": ["billing", "technical", "spam", "other"],
        "routing_policy": "只返回 schema 中允许的 category。",
    }, always=True)
)


def ensure_known_category(result, ctx):
    if result["category"] not in {"billing", "technical", "spam", "other"}:
        return {
            "ok": False,
            "reason": f"unknown category: {result['category']}",
            "validator_name": "category",
        }
    return True


execution = (
    agent
    .input(ticket_text)
    .output({
        "category": (str, "billing/technical/spam/other 之一", True),
        "severity": (str, "low/med/high", True),
        "summary": (str, "一行摘要", True),
    })
    .validate(ensure_known_category)
)
result = execution.get_result()
triage = await result.async_get_data(ensure_keys=["category", "severity", "summary"])

route_to_handler(triage["category"], triage)

这里的分工是:

  • 模型负责从自然语言里判断 categoryseveritysummary
  • .validate(...) 确保类别在允许集合里。
  • Python dict 或 router 负责调用 handler。

不要让模型输出“下一步该调用哪个函数”。模型给结构化分类,代码做路由。

TriggerFlow 形态

当每类处理本身有步骤时,把分类和路由放进 TriggerFlow。

python
from agently import TriggerFlow, TriggerFlowRuntimeData

flow = TriggerFlow(name="ticket-triage")


async def classify(data: TriggerFlowRuntimeData):
    agent = data.require_resource("classifier")
    result = (
        agent
        .input(data.input["content"])
        .output({
            "category": (str, "billing/technical/spam/other 之一", True),
            "severity": (str, "low/med/high", True),
            "summary": (str, "一行摘要", True),
        })
        .get_result()
    )
    triage = await result.async_get_data(ensure_keys=["category", "severity", "summary"])
    await data.async_set_state("triage", triage)
    return triage


async def handle_billing(data):
    await data.async_set_state("outcome", {"path": "billing", "ok": True})


async def handle_technical(data):
    await data.async_set_state("outcome", {"path": "technical", "ok": True})


async def handle_other(data):
    await data.async_set_state("outcome", {"path": "other", "ok": True})


(
    flow.to(classify)
    .if_condition(lambda data: data.input["category"] == "billing")
        .to(handle_billing)
    .elif_condition(lambda data: data.input["category"] == "technical")
        .to(handle_technical)
    .else_condition()
        .to(handle_other)
    .end_condition()
)

每个 handler 后续可以长成子流。比如 billing 需要查账单、判断退款金额、等人工批准,就把它拆成 Sub-Flow

高量输入:for_each 并行

一批工单同时到达时,用 for_each 控制并发:

python
flow.for_each(concurrency=8).to(triage_one_ticket).end_for_each().to(persist_results)

并发上限要同时考虑模型 provider 限速和下游 API 承载能力。过高会把失败推到 provider 或业务系统。

高风险类别:pause 等人工

退款、关账户、批量删除这类操作可以在分支里暂停:

python
async def maybe_request_approval(data):
    triage = data.get_state("triage")
    if triage["category"] == "billing" and data.input.get("amount", 0) > 1000:
        return await data.async_pause_for(
            type="approval",
            payload={
                "ticket_id": data.input["id"],
                "amount": data.input["amount"],
                "summary": triage["summary"],
            },
            resume_to="next",
        )
    return {"approved": True}

含 pause 的 execution 用 auto_close=False 创建,并保存 pending interrupt。

审计和前端进度

分类后把结构化结果推到 runtime stream:

python
await data.async_put_into_stream({
    "event": "classified",
    "ticket_id": data.input["id"],
    "triage": triage,
})

服务侧消费 execution.get_async_runtime_stream(...),可以把分类结果给前端、日志或审计系统。

常见误用

写法后果
用关键词/正则做语义分类 owner输入稍微变化就误判
让模型输出要调用哪个函数编排逻辑不可控,审计困难
类别处理都只有一步还上 TriggerFlow增加复杂度
把 classifier 放进 flow_data并发共享和 save/load 都不清楚

另见