流式 + 结构化混合 Playbook
场景
既要实时流式展示(UI/日志),又要结构化字段给下游系统使用。
需要使用功能(关键特性)
- 文本流式:
get_generator(type="delta") - 结构化流式:
get_generator(type="instant") - 单次请求复用:
get_response()固定一次请求
具体操作
output()定义结构化字段。get_response()固定一次请求。- 并发消费
delta与instant。
完整代码
python
import asyncio
from agently import Agently
Agently.set_settings(
"OpenAICompatible",
{
"base_url": "http://localhost:11434/v1",
"model": "qwen2.5:7b",
"model_type": "chat",
},
).set_settings("request_options", {"temperature": 0.2}).set_settings("debug", False)
agent = Agently.create_agent()
response = (
agent.system(
"你是运营日报助手,输出结构化结果。"
)
.input(
"给出一个 3 步的增长实验建议,并提供一句话总结。"
)
.output(
{
"title": (str, "标题"),
"summary": (str, "一句话总结"),
"steps": [
{
"step": (str, "步骤"),
"owner": (str, "负责人角色"),
}
],
}
)
.get_response()
)
async def stream_text():
chunks = []
async for chunk in response.get_async_generator(type="delta"):
chunks.append(chunk)
return "".join(chunks)
async def stream_structured():
events = []
async for item in response.get_async_generator(type="instant"):
if item.is_complete:
events.append(f"{item.path} => {item.value}")
return events
async def main():
text_stream, structured_events = await asyncio.gather(
stream_text(),
stream_structured(),
)
print("TEXT_STREAM:")
print(text_stream)
print("\nSTRUCTURED_STREAM:")
for event in structured_events:
print(event)
if __name__ == "__main__":
asyncio.run(main())真实输出
text
TEXT_STREAM:
{
"title": "3步增长实验建议",
"summary": "通过用户调研反馈、数据分析优化及AB测试营销策略,进一步增强用户活跃度。",
"steps": [
{
"step": "进行用户体验调研和问卷调查,了解用户对于现有功能使用体验的感受以及改进建议。",
"owner": "产品经理"
},
{
"step": "依据调研结果和现有数据分析潜在优化方向,并设计针对性的改进方案。",
"owner": "数据分析师"
},
{
"step": "通过AB测试对比不同营销策略的效果,持续监控效果以便及时调整。",
"owner": "市场推广负责人"
}
]
}
STRUCTURED_STREAM:
title => 3步增长实验建议
summary => 通过用户调研反馈、数据分析优化及AB测试营销策略,进一步增强用户活跃度。
steps[0].step => 进行用户体验调研和问卷调查,了解用户对于现有功能使用体验的感受以及改进建议。
steps[0].owner => 产品经理
steps[1].step => 依据调研结果和现有数据分析潜在优化方向,并设计针对性的改进方案。
steps[1].owner => 数据分析师
steps[2].step => 通过AB测试对比不同营销策略的效果,持续监控效果以便及时调整。
steps[2].owner => 市场推广负责人
steps[0] => {'step': '进行用户体验调研和问卷调查,了解用户对于现有功能使用体验的感受以及改进建议。', 'owner': '产品经理'}
steps[1] => {'step': '依据调研结果和现有数据分析潜在优化方向,并设计针对性的改进方案。', 'owner': '数据分析师'}
steps[2] => {'step': '通过AB测试对比不同营销策略的效果,持续监控效果以便及时调整。', 'owner': '市场推广负责人'}
steps => [{'step': '进行用户体验调研和问卷调查,了解用户对于现有功能使用体验的感受以及改进建议。', 'owner': '产品经理'}, {'step': '依据调研结果和现有数据分析潜在优化方向,并设计针对性的改进方案。', 'owner': '数据分析师'}, {'step': '通过AB测试对比不同营销策略的效果,持续监控效果以便及时调整。', 'owner': '市场推广负责人'}]验证点
delta与instant同时可消费。- 结构化字段在
instant中完整可见。