Build a Corrective RAG Agent

Fully functional agentic RAG app using Claude 3.5 Sonnet (step-by-step instructions)

Ever had your RAG system confidently give completely irrelevant information? Or watched it stubbornly stick to outdated data when better sources were just a web search away? You're not alone. Traditional RAG systems, while powerful, often act like that one friend who never admits when they need to double-check their facts.

In this tutorial, we'll fix that by building a Corrective RAG Agent that implements a multi-stage workflow with document retrieval, relevance assessment, and web search. Using LangGraph's workflow capabilities, we'll create a system that can evaluate its responses, adapt on the fly, and even reach out to the web when its local knowledge falls short. Think of it as RAG with a built-in fact-checker and research assistant.

We'll combine the analytical prowess of Claude 3.5 Sonnet with LangGraph's flexible workflow engine. By the end of this tutorial, you'll have a RAG system that's not just smarter but also more honest about what it knows (and doesn't know).

What is Corrective RAG?

Corrective RAG (CRAG) enhances traditional RAG architecture by introducing validation and correction mechanisms into the retrieval process. The system operates through a sophisticated multi-stage workflow:

  1. Initial Retrieval Stage

    • The system performs initial document retrieval from the knowledge base

    • Uses vector similarity to identify potentially relevant documents

    • Maintains context through overlapping document chunks

  2. Document Validation Stage

    • Each retrieved document undergoes LLM-based evaluation

    • Generates confidence scores based on relevance to the query

    • Applies validation criteria to determine document usefulness

  3. Adaptive Response Strategy

    • High-confidence documents: Proceeds to direct response generation

    • Medium-confidence documents: Initiates hybrid approach

    • Low-confidence documents: Triggers web search enhancement (see the routing sketch after this list)

  4. Query Enhancement

    • Automatically reformulates queries when initial results are insufficient

    • Optimizes search terms for improved document retrieval

    • Maintains semantic alignment with original user intent
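
The adaptive routing in stage 3 reduces to a simple decision over the relevance scores produced in stage 2. Here is a minimal sketch of that logic; the function name and thresholds are illustrative (the app we build below uses 0.8 and 0.5 as defaults), not the repository's exact code:

def route_from_scores(scores: list[float], high: float = 0.8, medium: float = 0.5) -> str:
    """Illustrative routing over document relevance scores."""
    best = max(scores, default=0.0)
    if best >= high:
        return "generate"         # high confidence: answer from local documents
    if best >= medium:
        return "hybrid"           # medium confidence: blend local docs with web results
    return "transform_query"      # low confidence: rewrite the query and search the web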

What We’re Building

This implementation creates a production-ready Corrective RAG system that brings together several powerful components:

  1. Smart Document Assessment:

    • Configurable confidence thresholds (0.8 for high confidence, 0.5 for medium)

    • Automated relevance scoring of retrieved documents

    • Dynamic decision-making based on document quality

  2. Models and Database:

    • Claude 3.5 Sonnet for document evaluation and response generation

    • OpenAI embeddings for semantic search

    • Qdrant vector store for efficient document retrieval

  3. Adaptive Search Capabilities:

    • Automatic query reformation when needed

    • Integrated Tavily API for web search fallback

  4. Interactive Interface:

    • Streamlit-based user interface for easy testing

    • Real-time visibility into the RAG pipeline

    • Step-by-step workflow monitoring

Prerequisites

Before we begin, make sure you have the following:

  1. Python installed on your machine (version 3.10 or higher is recommended)

  2. Your OpenAI, Anthropic, Tavily, and Qdrant API keys, along with your Qdrant cluster URL

  3. A code editor of your choice (we recommend VS Code or PyCharm for their excellent Python support)

  4. Basic familiarity with Python programming

Step-by-Step Instructions

Setting Up the Environment

First, let's get our development environment ready:

  1. Clone the GitHub repository:

git clone https://github.com/Shubhamsaboo/awesome-llm-apps.git

  2. Go to the corrective_rag folder and install the dependencies:

cd rag_tutorials/corrective_rag
pip install -r requirements.txt

  3. Get your API keys: You'll need to obtain the following:

    • OpenAI API key (for embeddings)

    • Anthropic API key (for Claude 3.5 Sonnet as the LLM)

    • Tavily API key (for web search)

    • Qdrant Cloud Setup

      1. Visit Qdrant Cloud

      2. Create an account or sign in

      3. Create a new cluster

      4. Get your credentials:

        • Qdrant API Key: Found in API Keys section

        • Qdrant URL: Your cluster URL (format: https://xxx-xxx.aws.cloud.qdrant.io)
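
Before wiring these into the app, it's worth a quick sanity check that your Qdrant credentials work. This snippet is optional and not part of the app itself; substitute your own URL and key:

from qdrant_client import QdrantClient

client = QdrantClient(
    url="https://xxx-xxx.aws.cloud.qdrant.io",  # your cluster URL
    api_key="your-qdrant-api-key",              # your API key
)
print(client.get_collections())  # should print the collection list without raising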

Creating the Streamlit App

Let’s build the app. Create a new file corrective_rag.py and add the following code:

  1. Let's set up our imports:

from langchain import hub
from langchain.output_parsers import PydanticOutputParser
from langchain_core.output_parsers import StrOutputParser
from langchain.schema import Document
from pydantic import BaseModel, Field
import streamlit as st
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader, WebBaseLoader
from langchain_community.tools import TavilySearchResults
from langchain_community.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.messages import HumanMessage        
from langgraph.graph import END, StateGraph
from typing import Any, Dict, TypedDict
from langchain_core.prompts import PromptTemplate
import pprint
import yaml
import nest_asyncio
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
import tempfile
import os
from langchain_anthropic import ChatAnthropic
from tenacity import retry, stop_after_attempt, wait_exponential
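
A note on nest_asyncio: Streamlit and some document loaders each want control of an asyncio event loop, which can trigger "event loop is already running" errors. The import above suggests the app patches the loop; if you hit that error, applying the patch right after the imports is the usual fix:

nest_asyncio.apply()  # allow nested event loops inside Streamlit
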
  2. Initialize session state:

def initialize_session_state():
    if 'initialized' not in st.session_state:
        st.session_state.initialized = False
        st.session_state.anthropic_api_key = ""
        st.session_state.openai_api_key = ""
        st.session_state.tavily_api_key = ""
        st.session_state.qdrant_api_key = ""
        st.session_state.qdrant_url = "http://localhost:6333"
        st.session_state.doc_url = "https://arxiv.org/pdf/2307.09288.pdf"
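
These fields still need to be filled in from the UI. A minimal way to collect the keys is a Streamlit sidebar; the widget layout here is an illustrative sketch, not necessarily what the repository uses:

with st.sidebar:
    st.header("API Configuration")
    st.session_state.anthropic_api_key = st.text_input("Anthropic API Key", type="password")
    st.session_state.openai_api_key = st.text_input("OpenAI API Key", type="password")
    st.session_state.tavily_api_key = st.text_input("Tavily API Key", type="password")
    st.session_state.qdrant_api_key = st.text_input("Qdrant API Key", type="password")
    st.session_state.qdrant_url = st.text_input("Qdrant URL", value=st.session_state.qdrant_url)
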
  3. Document loading infrastructure:

def load_documents(file_or_url: str, is_url: bool = True) -> list:
    try:
        if is_url:
            loader = WebBaseLoader(file_or_url)
        else:
            file_extension = os.path.splitext(file_or_url)[1].lower()
            if file_extension == '.pdf':
                loader = PyPDFLoader(file_or_url)
            elif file_extension in ['.txt', '.md']:
                loader = TextLoader(file_or_url)
            else:
                raise ValueError(f"Unsupported file type: {file_extension}")
        return loader.load()
    except Exception as e:
        st.error(f"Error loading document: {str(e)}")
        return []
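
Called with the default URL from session state, the function returns a list of LangChain Document objects (a quick usage sketch):

# Illustrative usage: load the default Llama 2 paper from its URL
docs = load_documents(st.session_state.doc_url, is_url=True)
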
  4. Vector store initialization:

# The embeddings object and collection name must exist before building the store;
# the collection name below is arbitrary, pick any name you like
embeddings = OpenAIEmbeddings(api_key=st.session_state.openai_api_key)
collection_name = "rag-qdrant"

client = QdrantClient(
    url=st.session_state.qdrant_url,
    api_key=st.session_state.qdrant_api_key
)

vectorstore = Qdrant(
    client=client,
    collection_name=collection_name,
    embeddings=embeddings,
)
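
The collection also has to exist in Qdrant before documents are added. One hedged way to ensure that, using the Distance and VectorParams imports from earlier (1536 is the dimension of OpenAI's default text-embedding-ada-002 vectors):

try:
    client.get_collection(collection_name)
except Exception:
    client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )
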
Core Agent Functions

  1. Web search implementation:

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def execute_tavily_search(tool, query):
    return tool.invoke({"query": query})

def web_search(state):
    state_dict = state["keys"]
    question = state_dict["question"]
    documents = state_dict["documents"]

    tool = TavilySearchResults(
        api_key=st.session_state.tavily_api_key,
        max_results=3,
        search_depth="advanced"
    )
    # Run the search (with retries) and fold the results into the document list
    results = execute_tavily_search(tool, question)
    web_content = "\n".join(r["content"] for r in results)
    documents.append(Document(page_content=web_content))
    return {"keys": {"documents": documents, "question": question}}

  2. Document grading:

def grade_documents(state):
    state_dict = state["keys"]
    question = state_dict["question"]
    documents = state_dict["documents"]

    llm = ChatAnthropic(model="claude-3-5-sonnet-20241022",
                        api_key=st.session_state.anthropic_api_key)

    prompt = PromptTemplate(template="""
        You are grading document relevance.
        Return ONLY a JSON object with "score": "yes" or "no".

        Document: {context}
        Question: {question}
    """, input_variables=["context", "question"])

    # Grade each document; keep the relevant ones, flag a web search if none survive
    chain = prompt | llm | StrOutputParser()
    filtered_docs = [doc for doc in documents
                     if '"yes"' in chain.invoke({"context": doc.page_content,
                                                 "question": question}).lower()]
    run_web_search = "No" if filtered_docs else "Yes"
    return {"keys": {"documents": filtered_docs, "question": question,
                     "run_web_search": run_web_search}}

  3. Query transformation:

def transform_query(state):
    state_dict = state["keys"]
    question = state_dict["question"]
    documents = state_dict["documents"]

    prompt = PromptTemplate(
        template="""Generate a search-optimized version of this question:
        {question}
        Return only the improved question:""",
        input_variables=["question"],
    )

    llm = ChatAnthropic(
        model="claude-3-5-sonnet-20241022",
        anthropic_api_key=st.session_state.anthropic_api_key
    )

    # Rewrite the question and hand it to the web search node
    chain = prompt | llm | StrOutputParser()
    better_question = chain.invoke({"question": question})
    return {"keys": {"documents": documents, "question": better_question}}

LangGraph Setup

  1. Graph state and workflow definition:

class GraphState(TypedDict):
    keys: Dict[str, Any]

workflow = StateGraph(GraphState)

workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("transform_query", transform_query)
workflow.add_node("web_search", web_search)
  2. Graph edges and conditions:

workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    }
)
workflow.add_edge("transform_query", "web_search")
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)

# Compile the graph into a runnable app (app.stream is used below)
app = workflow.compile()

Streamlit Setup

  1. Main interface setup:

st.title("🔄 Corrective RAG Agent")

# File upload interface
input_option = st.radio("Choose input method:", ["URL", "File Upload"])

if input_option == "URL":
    url = st.text_input("Enter document URL:", value=st.session_state.doc_url)
else:
    uploaded_file = st.file_uploader("Upload a document", type=['pdf', 'txt', 'md'])
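
Either branch then needs to become loaded documents. A sketch of the glue code, writing uploads to a temporary file so the path-based loaders can read them (this is where the tempfile import earns its keep; the exact flow in the repository may differ):

docs = []
if input_option == "URL" and url:
    docs = load_documents(url, is_url=True)
elif input_option == "File Upload" and uploaded_file:
    # Persist the upload so PyPDFLoader/TextLoader can open it by path
    suffix = os.path.splitext(uploaded_file.name)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(uploaded_file.getvalue())
        tmp_path = tmp.name
    docs = load_documents(tmp_path, is_url=False)
    os.unlink(tmp_path)
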
  2. Document processing:

if docs:
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=500, chunk_overlap=100
    )
    all_splits = text_splitter.split_documents(docs)
    
    vectorstore = Qdrant(
        client=client,
        collection_name=collection_name,
        embeddings=embeddings,
    )
    vectorstore.add_documents(all_splits)

  3. Query processing and output:

user_question = st.text_input("Please enter your question:")

if user_question:
    inputs = {"keys": {"question": user_question}}
    
    for output in app.stream(inputs):
        for key, value in output.items():
            with st.expander(f"Step '{key}':"):
                st.text(pprint.pformat(format_state(value["keys"])))
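
The loop calls format_state, a small display helper not shown in the snippets above. A hypothetical stand-in that matches how it's used (define it above the loop) simply compacts the state so the expanders stay readable:

def format_state(state: dict) -> dict:
    """Hypothetical helper: compact the graph state for display."""
    formatted = {}
    for key, value in state.items():
        if isinstance(value, list):
            formatted[key] = f"{len(value)} document(s)"
        else:
            formatted[key] = str(value)[:500]  # trim long strings
    return formatted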

Running the App

With our code in place, it's time to launch the app.

  • In your terminal, navigate to the project folder and run the following command:

streamlit run corrective_rag.py

Working Application Demo

Conclusion

You've successfully built a sophisticated Corrective RAG system that goes beyond simple document retrieval. This implementation shows how combining document validation, adaptive search, and intelligent query reformation can create a more reliable and accurate RAG system.

For further enhancements, you can:

  • Create domain-specific validation criteria

  • Add visualization for the confidence scoring process

  • Create detailed logging for system decisions

  • Add support for more document formats (DOCX, HTML, Markdown)

  • Implement caching for frequently accessed documents

Keep experimenting and refining to build smarter AI solutions!

We share hands-on tutorials like this 2-3 times a week to help you stay ahead in the world of AI. If you're serious about leveling up your AI skills and staying ahead of the curve, subscribe now and be the first to access our latest tutorials.

Don’t forget to share this tutorial on your social channels and tag Unwind AI (X, LinkedIn, Threads, Facebook) to support us!
