Knowledge‑base Multi‑turn QA
Scenario
Keep answers grounded across multiple turns using retrieval results from a knowledge base.
Approach
- Build a KB with ChromaCollection + embeddings
- TriggerFlow orchestrates “retrieve → answer”
- Agent answers from
kbonly
Code
python
from agently import Agently, TriggerFlow, TriggerFlowEventData
from agently.integrations.chromadb import ChromaCollection
Agently.set_settings("prompt.add_current_time", False)
Agently.set_settings("OpenAICompatible", {
"base_url": "http://localhost:11434/v1",
"model": "qwen2.5:7b",
"model_type": "chat",
})
agent = Agently.create_agent()
embedding_agent = Agently.create_agent()
embedding_agent.set_settings("OpenAICompatible", {
"base_url": "http://localhost:11434/v1",
"model": "qwen3-embedding:0.6b",
"model_type": "embeddings",
})
kb = ChromaCollection(
collection_name="agently_case_kb_en",
embedding_agent=embedding_agent,
get_or_create=True,
)
kb.add([
{
"id": "doc-1",
"document": "Agently is a production-oriented AI application framework focused on controllable output and orchestration.",
"metadata": {"source": "overview"},
},
{
"id": "doc-2",
"document": "TriggerFlow uses a when-emit signal mechanism for event-driven orchestration, suitable for complex signal systems.",
"metadata": {"source": "triggerflow"},
},
{
"id": "doc-3",
"document": "Instant structured streaming allows partial structured results to be consumed while generation is still in progress.",
"metadata": {"source": "output-control"},
},
])
questions = [
"What is Agently's core capability?",
"How is TriggerFlow different from traditional DAG orchestration?",
]
flow = TriggerFlow()
@flow.chunk
def input_questions(_: TriggerFlowEventData):
return questions
@flow.chunk
async def retrieve(data: TriggerFlowEventData):
question = data.value
results = kb.query(question, top_n=2)
return {"question": question, "kb": results}
@flow.chunk
async def answer(data: TriggerFlowEventData):
payload = data.value
result = await (
agent
.input(payload["question"])
.info({"kb": payload["kb"]})
.instruct("Answer using only {kb}. Keep it within 1-2 sentences.")
.output({"answer": ("str", "short answer"), "sources": [("str", "doc id")]})
.async_start()
)
return {"question": payload["question"], "answer": result}
(
flow.to(input_questions)
.for_each(concurrency=1)
.to(retrieve)
.to(answer)
.end_for_each()
.end()
)
result = flow.start("run")
print(result)Output
text
[{'question': "What is Agently's core capability?", 'answer': {'answer': 'Agently is a production-oriented AI application framework focused on controllable output and orchestration.', 'sources': ['doc-1']}}, {'question': 'How is TriggerFlow different from traditional DAG orchestration?', 'answer': {'answer': 'TriggerFlow uses a when-emit signal mechanism for event-driven orchestration, which differs from traditional DAG orchestration that typically relies on predefined task sequences.', 'sources': ['doc-2']}}]