工单分流 Playbook
工单分流看起来简单,线上最容易出问题:模型说“应该找账务”,代码却不知道该调用哪个 handler;或者靠关键词命中把“不能退款”误判成“退款”。一个稳定的分流系统应该让模型只负责语义分类,让代码负责确定性路由。
适用场景
每个输入都要完成这几步:
- 归一化一条工单、邮件、告警或请求。
- 分类到一个小集合。
- 给出优先级和简短理由。
- 按分类调用下游 handler。
- 记录分类结果和处理结果。
类别少、处理一步完成,用单次 Agent 请求。每个类别后面还有多步流程、人工审批、并行处理或 runtime stream,用 TriggerFlow。
单次请求形态
python
from agently import Agently
agent = (
Agently.create_agent()
.info({
"categories": ["billing", "technical", "spam", "other"],
"routing_policy": "只返回 schema 中允许的 category。",
}, always=True)
)
def ensure_known_category(result, ctx):
if result["category"] not in {"billing", "technical", "spam", "other"}:
return {
"ok": False,
"reason": f"unknown category: {result['category']}",
"validator_name": "category",
}
return True
execution = (
agent
.input(ticket_text)
.output({
"category": (str, "billing/technical/spam/other 之一", True),
"severity": (str, "low/med/high", True),
"summary": (str, "一行摘要", True),
})
.validate(ensure_known_category)
)
result = execution.get_result()
triage = await result.async_get_data(ensure_keys=["category", "severity", "summary"])
route_to_handler(triage["category"], triage)这里的分工是:
- 模型负责从自然语言里判断
category、severity、summary。 .validate(...)确保类别在允许集合里。- Python dict 或 router 负责调用 handler。
不要让模型输出“下一步该调用哪个函数”。模型给结构化分类,代码做路由。
TriggerFlow 形态
当每类处理本身有步骤时,把分类和路由放进 TriggerFlow。
python
from agently import TriggerFlow, TriggerFlowRuntimeData
flow = TriggerFlow(name="ticket-triage")
async def classify(data: TriggerFlowRuntimeData):
agent = data.require_resource("classifier")
result = (
agent
.input(data.input["content"])
.output({
"category": (str, "billing/technical/spam/other 之一", True),
"severity": (str, "low/med/high", True),
"summary": (str, "一行摘要", True),
})
.get_result()
)
triage = await result.async_get_data(ensure_keys=["category", "severity", "summary"])
await data.async_set_state("triage", triage)
return triage
async def handle_billing(data):
await data.async_set_state("outcome", {"path": "billing", "ok": True})
async def handle_technical(data):
await data.async_set_state("outcome", {"path": "technical", "ok": True})
async def handle_other(data):
await data.async_set_state("outcome", {"path": "other", "ok": True})
(
flow.to(classify)
.if_condition(lambda data: data.input["category"] == "billing")
.to(handle_billing)
.elif_condition(lambda data: data.input["category"] == "technical")
.to(handle_technical)
.else_condition()
.to(handle_other)
.end_condition()
)每个 handler 后续可以长成子流。比如 billing 需要查账单、判断退款金额、等人工批准,就把它拆成 Sub-Flow。
高量输入:for_each 并行
一批工单同时到达时,用 for_each 控制并发:
python
flow.for_each(concurrency=8).to(triage_one_ticket).end_for_each().to(persist_results)并发上限要同时考虑模型 provider 限速和下游 API 承载能力。过高会把失败推到 provider 或业务系统。
高风险类别:pause 等人工
退款、关账户、批量删除这类操作可以在分支里暂停:
python
async def maybe_request_approval(data):
triage = data.get_state("triage")
if triage["category"] == "billing" and data.input.get("amount", 0) > 1000:
return await data.async_pause_for(
type="approval",
payload={
"ticket_id": data.input["id"],
"amount": data.input["amount"],
"summary": triage["summary"],
},
resume_to="next",
)
return {"approved": True}含 pause 的 execution 用 auto_close=False 创建,并保存 pending interrupt。
审计和前端进度
分类后把结构化结果推到 runtime stream:
python
await data.async_put_into_stream({
"event": "classified",
"ticket_id": data.input["id"],
"triage": triage,
})服务侧消费 execution.get_async_runtime_stream(...),可以把分类结果给前端、日志或审计系统。
常见误用
| 写法 | 后果 |
|---|---|
| 用关键词/正则做语义分类 owner | 输入稍微变化就误判 |
| 让模型输出要调用哪个函数 | 编排逻辑不可控,审计困难 |
| 类别处理都只有一步还上 TriggerFlow | 增加复杂度 |
把 classifier 放进 flow_data | 并发共享和 save/load 都不清楚 |
另见
- 输出控制 -
.validate(...)强制类别 - Schema as Prompt - 分类字段怎么写 schema
- TriggerFlow 模式 - 分支、for_each、事件
- Pause 与 Resume - 人工审批路径