• unwind ai
  • Posts
  • Build a RAG Agent with Database Routing

Build a RAG Agent with Database Routing

Fully functional agentic RAG app using GPT-4o (step-by-step instructions)

Imagine you're building a customer service AI that needs to handle queries about products, support issues, and financial matters. You could dump all your documents into a single vector database, but that would be like having a library where cookbooks, technical manuals, and financial reports are all mixed together. Not very efficient, right?

Traditional RAG systems treat all documents uniformly, leading to slower search times and diluted results. What if we could automatically route queries to the most relevant database while maintaining high performance?

In this tutorial, we'll build a sophisticated RAG system with intelligent database routing that uses multiple specialized vector databases (product info, customer support, financial data) with an agent-based router to direct queries to the most relevant database. When no relevant documents are found, it gracefully falls back to web search using DuckDuckGo.

The app uses:

  • Langchain for RAG orchestration

  • Agno for the router agent to determine the most relevant database for a given query

  • LangGraph as a fallback mechanism, utilizing DuckDuckGo for web research when necessary

  • Streamlit for a user-friendly interface for document upload and querying

  • Qdrant for storing and retrieving document embeddings

  • GTP-4o for answer synthesis

Don’t forget to share this tutorial on your social channels and tag Unwind AI (X, LinkedIn, Threads, Facebook) to support us!

What We’re Building

We’re building a RAG app with database routing capabilities that allows users to upload multiple documents to three different databases: Product Information, Customer Support & FAQ, and Financial Information. The user can query the uploaded information in natural language, and the app will route to the most relevant database.

Features:

  1. Query Routing - The system uses a three-stage routing approach:

    • Vector similarity search across all databases

    • LLM-based routing for ambiguous queries

    • Web search fallback for unknown topics

  2. Dual-stage smart query routing:

    • Primary: Vector similarity scoring with confidence thresholds (0.5 threshold)

    • Fallback: GPT-4o-powered routing agent when confidence is low

  3. Document Processing

    • Automatic text extraction from PDFs

    • Smart text chunking with overlap

    • Vector embedding generation

  4. Answer Generation

    • Context-aware retrieval

    • Confidence-based responses

Prerequisites

Before we begin, make sure you have the following:

  1. Python installed on your machine (version 3.10 or higher is recommended)

  2. Your OpenAI and Qdrant API Key along with cluster URL

  3. A code editor of your choice (we recommend VS Code or PyCharm for their excellent Python support)

  4. Basic familiarity with Python programming

Step-by-Step Instructions

Setting Up the Environment

First, let's get our development environment ready:

  1. Clone the GitHub repository:

git clone https://github.com/Shubhamsaboo/awesome-llm-apps.git
  1. Go to the rag_database_routing folder:

cd rag_tutorials/rag_database_routing
pip install -r requirements.txt
  1. Get your API Key:

    • Obtain an OpenAI API key and set it in the application.

    • Qdrant Cloud Setup - Visit Qdrant Cloud > Create an account or sign in > Create a new cluster > Get your credentials:
      - Qdrant API Key: Found in API Keys section
      - Qdrant URL: Your cluster URL (format: https://xxx-xxx.aws.cloud.qdrant.io)

Creating the Streamlit App

Let’s create our app. Create a new file rag_database_routing.py and add the following code:

  1. Let's set up our imports:

import os
from typing import List, Dict, Any, Literal, Optional
from dataclasses import dataclass
import streamlit as st
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from langgraph.prebuilt import create_react_agent
from langchain_community.tools import DuckDuckGoSearchRun
  1. Define database configurations:

DatabaseType = Literal["products", "support", "finance"]

@dataclass
class CollectionConfig:
    name: str
    description: str
    collection_name: str

COLLECTIONS: Dict[DatabaseType, CollectionConfig] = {
    "products": CollectionConfig(
        name="Product Information",
        description="Product details, specifications, and features",
        collection_name="products_collection"
    ),
    "support": CollectionConfig(
        name="Customer Support & FAQ", 
        description="Customer support information, frequently asked questions, and guides",
        collection_name="support_collection"
    ),
    "finance": CollectionConfig(
        name="Financial Information",
        description="Financial data, revenue, costs, and liabilities", 
        collection_name="finance_collection"
    )
}
  1. Initialize models and Qdrant connection:

def initialize_models():
    """Initialize OpenAI models and Qdrant client"""
    if (st.session_state.openai_api_key and 
        st.session_state.qdrant_url and 
        st.session_state.qdrant_api_key):
        
        os.environ["OPENAI_API_KEY"] = st.session_state.openai_api_key
        st.session_state.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
        st.session_state.llm = ChatOpenAI(temperature=0)
        
        try:
            client = QdrantClient(
                url=st.session_state.qdrant_url,
                api_key=st.session_state.qdrant_api_key
            )
            
            # Initialize databases for each collection type
            st.session_state.databases = {}
            for db_type, config in COLLECTIONS.items():
                st.session_state.databases[db_type] = Qdrant(
                    client=client,
                    collection_name=config.collection_name,
                    embeddings=st.session_state.embeddings
                )
            return True
        except Exception as e:
            st.error(f"Failed to connect to Qdrant: {str(e)}")
            return False
    return False
  1. Document processing functionality:

def process_document(file) -> List[Document]:
    """Process uploaded PDF document"""
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            tmp_file.write(file.getvalue())
            tmp_path = tmp_file.name
        
        loader = PyPDFLoader(tmp_path)
        documents = loader.load()
        os.unlink(tmp_path)  # Clean up
        
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        texts = text_splitter.split_documents(documents)
        return texts
    except Exception as e:
        st.error(f"Error processing document: {e}")
        return []
  1. Create the routing agent:

def create_routing_agent() -> Agent:
    """Creates a routing agent using agno framework"""
    return Agent(
        model=OpenAIChat(
            id="gpt-4o",
            api_key=st.session_state.openai_api_key
        ),
        tools=[],
        description="""You are a query routing expert. Your only job is to analyze questions and determine which database they should be routed to.
        
        You must respond with exactly one of these three options: 'products', 'support', or 'finance'.""",
        instructions=[
            "For questions about products, features, specifications → return 'products'",
            "For questions about help, troubleshooting, customer service → return 'support'", 
            "For questions about costs, revenue, financial data → return 'finance'",
            "Return ONLY the database name, no other text"
        ],
        markdown=False,
        show_tool_calls=False
    )
  1. Implement query routing:

def route_query(question: str) -> Optional[DatabaseType]:
    """Route query by searching all databases and comparing relevance scores"""
    try:
        best_score = -1
        best_db_type = None
        
        # Search each database and compare relevance scores
        for db_type, db in st.session_state.databases.items():
            results = db.similarity_search_with_score(question, k=3)
            if results:
                avg_score = sum(score for _, score in results) / len(results)
                if avg_score > best_score:
                    best_score = avg_score
                    best_db_type = db_type
        
        confidence_threshold = 0.5
        if best_score >= confidence_threshold and best_db_type:
            st.success(f"Using vector similarity routing: {best_db_type}")
            return best_db_type
        
        # Fallback to LLM routing
        st.warning("Low confidence, falling back to LLM routing")
        routing_agent = create_routing_agent()
        response = routing_agent.run(question)
        db_type = response.content.strip().lower()
        
        if db_type in COLLECTIONS:
            st.success(f"Using LLM routing decision: {db_type}")
            return db_type
            
        return None
    except Exception as e:
        st.error(f"Routing error: {str(e)}")
        return None
  1. Web search fallback:

def create_fallback_agent(chat_model):
    """Create a LangGraph agent for web research"""
    def web_research(query: str) -> str:
        try:
            search = DuckDuckGoSearchRun(num_results=5)
            results = search.run(query)
            return results
        except Exception as e:
            return f"Search failed: {str(e)}"
    
    tools = [web_research]
    agent = create_react_agent(model=chat_model, tools=tools, debug=False)
    return agent

def _handle_web_fallback(question: str) -> tuple[str, list]:
    st.info("No relevant documents found. Searching web...")
    fallback_agent = create_fallback_agent(st.session_state.llm)
    
    with st.spinner('Researching...'):
        response = fallback_agent.invoke({
            "messages": [HumanMessage(content=f"Research: '{question}'")]
        })
        answer = response["messages"][-1].content
        return f"Web Search Result:\n{answer}", []
  1. Database querying with RAG:

def query_database(db: Qdrant, question: str) -> tuple[str, list]:
    """Query the database and return answer and relevant documents"""
    try:
        retriever = db.as_retriever(search_type="similarity", search_kwargs={"k": 4})
        relevant_docs = retriever.get_relevant_documents(question)
        
        if relevant_docs:
            retrieval_qa_prompt = ChatPromptTemplate.from_messages([
                ("system", "You are a helpful AI assistant. Answer based on provided context."),
                ("human", "Context:\n{context}"),
                ("human", "Question: {input}")
            ])
            
            combine_docs_chain = create_stuff_documents_chain(st.session_state.llm, retrieval_qa_prompt)
            retrieval_chain = create_retrieval_chain(retriever, combine_docs_chain)
            
            response = retrieval_chain.invoke({"input": question})
            return response['answer'], relevant_docs
        
        raise ValueError("No relevant documents found")
    except Exception as e:
        return "Error occurred. Please try rephrasing.", []
  1. Streamlit interface:

def main():
    st.set_page_config(page_title="RAG Agent with Database Routing", page_icon="📚")
    st.title("📠 RAG Agent with Database Routing")
    
    # API configuration in sidebar
    with st.sidebar:
        st.header("Configuration")
        api_key = st.text_input("OpenAI API Key:", type="password")
        qdrant_url = st.text_input("Qdrant URL:")
        qdrant_api_key = st.text_input("Qdrant API Key:", type="password")
    
    # Document upload tabs
    st.header("Document Upload")
    tabs = st.tabs([config.name for config in COLLECTIONS.values()])
    
    # Query interface
    st.header("Ask Questions")
    question = st.text_input("Enter your question:")
    
    if question:
        collection_type = route_query(question)
        if collection_type is None:
            answer, _ = _handle_web_fallback(question)
        else:
            db = st.session_state.databases[collection_type]
            answer, _ = query_database(db, question)
        
        st.write("### Answer")
        st.write(answer)

Running the App

With our code in place, it's time to launch the app.

  • In your terminal, navigate to the project folder, and run the following command

streamlit run rag_database_routing.py

Working Application Demo

Conclusion

You've successfully built a production-ready RAG system with intelligent database routing! Unlike basic RAG implementations, your system now intelligently directs queries to specialized databases, making it vastly more efficient and accurate.

For further enhancements, you can:

  • Implement cross-database queries when questions span multiple domains

  • Add routing history analysis to improve future routing decisions

  • Add support for more document formats (DOCX, HTML, Markdown)

  • Implement caching for frequently accessed documents

  • Implement source citations in responses

Keep experimenting and refining to build smarter AI solutions!

We share hands-on tutorials like this 2-3 times a week, to help you stay ahead in the world of AI. If you're serious about leveling up your AI skills and staying ahead of the curve, subscribe now and be the first to access our latest tutorials.

Don’t forget to share this tutorial on your social channels and tag Unwind AI (X, LinkedIn, Threads, Facebook) to support us!

Reply

or to participate.