- unwind ai
- Posts
- Build a RAG Agent with Database Routing
Build a RAG Agent with Database Routing
Fully functional agentic RAG app using GPT-4o (step-by-step instructions)
Imagine you're building a customer service AI that needs to handle queries about products, support issues, and financial matters. You could dump all your documents into a single vector database, but that would be like having a library where cookbooks, technical manuals, and financial reports are all mixed together. Not very efficient, right?
Traditional RAG systems treat all documents uniformly, leading to slower search times and diluted results. What if we could automatically route queries to the most relevant database while maintaining high performance?
In this tutorial, we'll build a sophisticated RAG system with intelligent database routing that uses multiple specialized vector databases (product info, customer support, financial data) with an agent-based router to direct queries to the most relevant database. When no relevant documents are found, it gracefully falls back to web search using DuckDuckGo.
The app uses:
Langchain for RAG orchestration
Agno for the router agent to determine the most relevant database for a given query
LangGraph as a fallback mechanism, utilizing DuckDuckGo for web research when necessary
Streamlit for a user-friendly interface for document upload and querying
Qdrant for storing and retrieving document embeddings
GTP-4o for answer synthesis
What We’re Building
We’re building a RAG app with database routing capabilities that allows users to upload multiple documents to three different databases: Product Information, Customer Support & FAQ, and Financial Information. The user can query the uploaded information in natural language, and the app will route to the most relevant database.
Features:
Query Routing - The system uses a three-stage routing approach:
Vector similarity search across all databases
LLM-based routing for ambiguous queries
Web search fallback for unknown topics
Dual-stage smart query routing:
Primary: Vector similarity scoring with confidence thresholds (0.5 threshold)
Fallback: GPT-4o-powered routing agent when confidence is low
Document Processing
Automatic text extraction from PDFs
Smart text chunking with overlap
Vector embedding generation
Answer Generation
Context-aware retrieval
Confidence-based responses
Prerequisites
Before we begin, make sure you have the following:
Step-by-Step Instructions
Setting Up the Environment
First, let's get our development environment ready:
Clone the GitHub repository:
git clone https://github.com/Shubhamsaboo/awesome-llm-apps.git
Go to the rag_database_routing folder:
cd rag_tutorials/rag_database_routing
Install the required dependencies:
pip install -r requirements.txt
Get your API Key:
Obtain an OpenAI API key and set it in the application.
Qdrant Cloud Setup - Visit Qdrant Cloud > Create an account or sign in > Create a new cluster > Get your credentials:
- Qdrant API Key: Found in API Keys section
- Qdrant URL: Your cluster URL (format:https://xxx-xxx.aws.cloud.qdrant.io
)
Creating the Streamlit App
Let’s create our app. Create a new file rag_database_routing.py
and add the following code:
Let's set up our imports:
import os
from typing import List, Dict, Any, Literal, Optional
from dataclasses import dataclass
import streamlit as st
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from langgraph.prebuilt import create_react_agent
from langchain_community.tools import DuckDuckGoSearchRun
Define database configurations:
DatabaseType = Literal["products", "support", "finance"]
@dataclass
class CollectionConfig:
name: str
description: str
collection_name: str
COLLECTIONS: Dict[DatabaseType, CollectionConfig] = {
"products": CollectionConfig(
name="Product Information",
description="Product details, specifications, and features",
collection_name="products_collection"
),
"support": CollectionConfig(
name="Customer Support & FAQ",
description="Customer support information, frequently asked questions, and guides",
collection_name="support_collection"
),
"finance": CollectionConfig(
name="Financial Information",
description="Financial data, revenue, costs, and liabilities",
collection_name="finance_collection"
)
}
Initialize models and Qdrant connection:
def initialize_models():
"""Initialize OpenAI models and Qdrant client"""
if (st.session_state.openai_api_key and
st.session_state.qdrant_url and
st.session_state.qdrant_api_key):
os.environ["OPENAI_API_KEY"] = st.session_state.openai_api_key
st.session_state.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
st.session_state.llm = ChatOpenAI(temperature=0)
try:
client = QdrantClient(
url=st.session_state.qdrant_url,
api_key=st.session_state.qdrant_api_key
)
# Initialize databases for each collection type
st.session_state.databases = {}
for db_type, config in COLLECTIONS.items():
st.session_state.databases[db_type] = Qdrant(
client=client,
collection_name=config.collection_name,
embeddings=st.session_state.embeddings
)
return True
except Exception as e:
st.error(f"Failed to connect to Qdrant: {str(e)}")
return False
return False
Document processing functionality:
def process_document(file) -> List[Document]:
"""Process uploaded PDF document"""
try:
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
tmp_file.write(file.getvalue())
tmp_path = tmp_file.name
loader = PyPDFLoader(tmp_path)
documents = loader.load()
os.unlink(tmp_path) # Clean up
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
texts = text_splitter.split_documents(documents)
return texts
except Exception as e:
st.error(f"Error processing document: {e}")
return []
Create the routing agent:
def create_routing_agent() -> Agent:
"""Creates a routing agent using agno framework"""
return Agent(
model=OpenAIChat(
id="gpt-4o",
api_key=st.session_state.openai_api_key
),
tools=[],
description="""You are a query routing expert. Your only job is to analyze questions and determine which database they should be routed to.
You must respond with exactly one of these three options: 'products', 'support', or 'finance'.""",
instructions=[
"For questions about products, features, specifications → return 'products'",
"For questions about help, troubleshooting, customer service → return 'support'",
"For questions about costs, revenue, financial data → return 'finance'",
"Return ONLY the database name, no other text"
],
markdown=False,
show_tool_calls=False
)
Implement query routing:
def route_query(question: str) -> Optional[DatabaseType]:
"""Route query by searching all databases and comparing relevance scores"""
try:
best_score = -1
best_db_type = None
# Search each database and compare relevance scores
for db_type, db in st.session_state.databases.items():
results = db.similarity_search_with_score(question, k=3)
if results:
avg_score = sum(score for _, score in results) / len(results)
if avg_score > best_score:
best_score = avg_score
best_db_type = db_type
confidence_threshold = 0.5
if best_score >= confidence_threshold and best_db_type:
st.success(f"Using vector similarity routing: {best_db_type}")
return best_db_type
# Fallback to LLM routing
st.warning("Low confidence, falling back to LLM routing")
routing_agent = create_routing_agent()
response = routing_agent.run(question)
db_type = response.content.strip().lower()
if db_type in COLLECTIONS:
st.success(f"Using LLM routing decision: {db_type}")
return db_type
return None
except Exception as e:
st.error(f"Routing error: {str(e)}")
return None
Web search fallback:
def create_fallback_agent(chat_model):
"""Create a LangGraph agent for web research"""
def web_research(query: str) -> str:
try:
search = DuckDuckGoSearchRun(num_results=5)
results = search.run(query)
return results
except Exception as e:
return f"Search failed: {str(e)}"
tools = [web_research]
agent = create_react_agent(model=chat_model, tools=tools, debug=False)
return agent
def _handle_web_fallback(question: str) -> tuple[str, list]:
st.info("No relevant documents found. Searching web...")
fallback_agent = create_fallback_agent(st.session_state.llm)
with st.spinner('Researching...'):
response = fallback_agent.invoke({
"messages": [HumanMessage(content=f"Research: '{question}'")]
})
answer = response["messages"][-1].content
return f"Web Search Result:\n{answer}", []
Database querying with RAG:
def query_database(db: Qdrant, question: str) -> tuple[str, list]:
"""Query the database and return answer and relevant documents"""
try:
retriever = db.as_retriever(search_type="similarity", search_kwargs={"k": 4})
relevant_docs = retriever.get_relevant_documents(question)
if relevant_docs:
retrieval_qa_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful AI assistant. Answer based on provided context."),
("human", "Context:\n{context}"),
("human", "Question: {input}")
])
combine_docs_chain = create_stuff_documents_chain(st.session_state.llm, retrieval_qa_prompt)
retrieval_chain = create_retrieval_chain(retriever, combine_docs_chain)
response = retrieval_chain.invoke({"input": question})
return response['answer'], relevant_docs
raise ValueError("No relevant documents found")
except Exception as e:
return "Error occurred. Please try rephrasing.", []
Streamlit interface:
def main():
st.set_page_config(page_title="RAG Agent with Database Routing", page_icon="📚")
st.title("📠 RAG Agent with Database Routing")
# API configuration in sidebar
with st.sidebar:
st.header("Configuration")
api_key = st.text_input("OpenAI API Key:", type="password")
qdrant_url = st.text_input("Qdrant URL:")
qdrant_api_key = st.text_input("Qdrant API Key:", type="password")
# Document upload tabs
st.header("Document Upload")
tabs = st.tabs([config.name for config in COLLECTIONS.values()])
# Query interface
st.header("Ask Questions")
question = st.text_input("Enter your question:")
if question:
collection_type = route_query(question)
if collection_type is None:
answer, _ = _handle_web_fallback(question)
else:
db = st.session_state.databases[collection_type]
answer, _ = query_database(db, question)
st.write("### Answer")
st.write(answer)
Running the App
With our code in place, it's time to launch the app.
In your terminal, navigate to the project folder, and run the following command
streamlit run rag_database_routing.py
Streamlit will provide a local URL (typically http://localhost:8501).
Working Application Demo
Conclusion
You've successfully built a production-ready RAG system with intelligent database routing! Unlike basic RAG implementations, your system now intelligently directs queries to specialized databases, making it vastly more efficient and accurate.
For further enhancements, you can:
Implement cross-database queries when questions span multiple domains
Add routing history analysis to improve future routing decisions
Add support for more document formats (DOCX, HTML, Markdown)
Implement caching for frequently accessed documents
Implement source citations in responses
Keep experimenting and refining to build smarter AI solutions!
We share hands-on tutorials like this 2-3 times a week, to help you stay ahead in the world of AI. If you're serious about leveling up your AI skills and staying ahead of the curve, subscribe now and be the first to access our latest tutorials.
Reply