Build a Corrective RAG Agent
Fully functional agentic RAG app using Claude 3.5 Sonnet (step-by-step instructions)
Ever had your RAG system confidently give completely irrelevant information? Or watched it stubbornly stick to outdated data when better sources were just a web search away? You're not alone. Traditional RAG systems, while powerful, often act like that one friend who never admits when they need to double-check their facts.
In this tutorial, we'll fix that by building a Corrective RAG Agent that implements a multi-stage workflow with document retrieval, relevance assessment, and web search. Using LangGraph's workflow capabilities, we'll create a system that can evaluate its responses, adapt on the fly, and even reach out to the web when its local knowledge falls short. Think of it as RAG with a built-in fact-checker and research assistant.
We'll combine the analytical prowess of Claude 3.5 Sonnet with LangGraph's flexible workflow engine. By the end of this tutorial, you'll have a RAG system that's not just smarter but also more honest about what it knows (and doesn't know).
What is Corrective RAG?
Corrective RAG (CRAG) enhances traditional RAG architecture by introducing validation and correction mechanisms into the retrieval process. The system operates through a sophisticated multi-stage workflow:
Initial Retrieval Stage
The system performs initial document retrieval from the knowledge base
Uses vector similarity to identify potentially relevant documents
Maintains context through overlapping document chunks
Document Validation Stage
Each retrieved document undergoes LLM-based evaluation
Generates confidence scores based on relevance to the query
Applies validation criteria to determine document usefulness
Adaptive Response Strategy
High-confidence documents: Proceeds to direct response generation
Medium-confidence documents: Initiates hybrid approach
Low-confidence documents: Triggers web search enhancement (see the routing sketch after this list)
Query Enhancement
Automatically reformulates queries when initial results are insufficient
Optimizes search terms for improved document retrieval
Maintains semantic alignment with original user intent
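To make the adaptive strategy concrete, here's a minimal Python sketch of the confidence-based routing. The 0.8 and 0.5 thresholds match the ones this implementation uses; the function and label names are illustrative, not part of the actual codebase:

def route_by_confidence(confidence: float) -> str:
    """Pick a response strategy from a document's relevance score."""
    if confidence >= 0.8:    # high confidence: answer from the documents alone
        return "generate"
    elif confidence >= 0.5:  # medium confidence: blend documents with web results
        return "hybrid"
    else:                    # low confidence: rewrite the query and search the web
        return "web_search"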
What We’re Building
This implementation creates a production-ready Corrective RAG system that brings together several powerful components:
Smart Document Assessment:
Configurable confidence thresholds (0.8 for high confidence, 0.5 for medium)
Automated relevance scoring of retrieved documents
Dynamic decision-making based on document quality
Models and Database:
Claude 3.5 Sonnet for document evaluation and response generation
OpenAI embeddings for semantic search
Qdrant vector store for efficient document retrieval
Adaptive Search Capabilities:
Automatic query reformation when needed
Integrated Tavily API for web search fallback
Interactive Interface:
Streamlit-based user interface for easy testing
Real-time visibility into the RAG pipeline
Step-by-step workflow monitoring
Prerequisites
Before we begin, make sure you have the following:
A recent Python installation
API keys for OpenAI, Anthropic, and Tavily (we'll cover where to get them below)
A Qdrant Cloud account for the vector store (setup covered below)
Basic familiarity with Python and RAG concepts
Step-by-Step Instructions
Setting Up the Environment
First, let's get our development environment ready:
Clone the GitHub repository:
git clone https://github.com/Shubhamsaboo/awesome-llm-apps.git
Go to the corrective_rag folder:
cd rag_tutorials/corrective_rag
Install the required dependencies:
pip install -r requirements.txt
Get your API keys. You'll need the following:
OpenAI API key (for embeddings)
Anthropic API key (for Claude 3.5 Sonnet as the LLM)
Tavily API key (for web search)
Qdrant Cloud Setup
Visit Qdrant Cloud
Create an account or sign in
Create a new cluster
Get your credentials:
Qdrant API Key: Found in API Keys section
Qdrant URL: Your cluster URL (format: https://xxx-xxx.aws.cloud.qdrant.io)
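Once you have both credentials, a quick sanity check confirms the connection (a minimal sketch using qdrant-client; replace the placeholders with your values):

from qdrant_client import QdrantClient

client = QdrantClient(
    url="https://xxx-xxx.aws.cloud.qdrant.io",
    api_key="your-qdrant-api-key",
)
# Lists existing collections; fails fast if the credentials are wrong
print(client.get_collections())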
Creating the Streamlit App
Let's create our app. Create a new file called corrective_rag.py.
Let's set up our imports:
from langchain import hub
from langchain.output_parsers import PydanticOutputParser
from langchain_core.output_parsers import StrOutputParser
from langchain.schema import Document
from pydantic import BaseModel, Field
import streamlit as st
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader, WebBaseLoader
from langchain_community.tools import TavilySearchResults
from langchain_community.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import END, StateGraph
from typing import Any, Dict, TypedDict
from langchain_core.prompts import PromptTemplate
import pprint
import yaml
import nest_asyncio
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
import tempfile
import os
from langchain_anthropic import ChatAnthropic
from tenacity import retry, stop_after_attempt, wait_exponential
Initialize session state:
def initialize_session_state():
    if 'initialized' not in st.session_state:
        st.session_state.initialized = False
        st.session_state.anthropic_api_key = ""
        st.session_state.openai_api_key = ""
        st.session_state.tavily_api_key = ""
        st.session_state.qdrant_api_key = ""
        st.session_state.qdrant_url = "http://localhost:6333"
        st.session_state.doc_url = "https://arxiv.org/pdf/2307.09288.pdf"
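A natural way to fill these values at runtime is a Streamlit sidebar. Here's a minimal sketch of that pattern (the widget labels are illustrative, not taken from the repo):

def setup_sidebar():
    # Password-type inputs keep keys from being echoed on screen
    st.session_state.anthropic_api_key = st.sidebar.text_input(
        "Anthropic API Key", type="password"
    )
    st.session_state.qdrant_url = st.sidebar.text_input(
        "Qdrant URL", value=st.session_state.qdrant_url
    )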
Document loading infrastructure:
def load_documents(file_or_url: str, is_url: bool = True) -> list:
    try:
        if is_url:
            loader = WebBaseLoader(file_or_url)
        else:
            file_extension = os.path.splitext(file_or_url)[1].lower()
            if file_extension == '.pdf':
                loader = PyPDFLoader(file_or_url)
            elif file_extension in ['.txt', '.md']:
                loader = TextLoader(file_or_url)
            else:
                raise ValueError(f"Unsupported file type: {file_extension}")
        return loader.load()
    except Exception as e:
        st.error(f"Error loading document: {str(e)}")
        return []
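For the file-upload path, Streamlit gives you an in-memory file object, so it has to be written to disk before PyPDFLoader or TextLoader can open it. A minimal sketch, assuming uploaded_file comes from the st.file_uploader widget shown later:

if uploaded_file is not None:
    # Persist the upload so the loaders can read it by path
    suffix = os.path.splitext(uploaded_file.name)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(uploaded_file.getvalue())
        tmp_path = tmp.name
    docs = load_documents(tmp_path, is_url=False)

This is also why tempfile and os appear in the import list.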
Vector store initialization:
client = QdrantClient(
    url=st.session_state.qdrant_url,
    api_key=st.session_state.qdrant_api_key
)
vectorstore = Qdrant(
    client=client,
    collection_name=collection_name,
    embeddings=embeddings,
)
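The snippet above assumes the collection already exists. Recent versions of qdrant-client let you guard against that on first run; here's a sketch (1536 matches the dimensionality of OpenAI's ada-002 embeddings, so adjust it if you use a different embedding model):

if not client.collection_exists(collection_name):
    client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )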
Core Agent Functions
Web search implementation:
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def execute_tavily_search(tool, query):
    return tool.invoke({"query": query})

def web_search(state):
    state_dict = state["keys"]
    question = state_dict["question"]
    documents = state_dict["documents"]
    tool = TavilySearchResults(
        api_key=st.session_state.tavily_api_key,
        max_results=3,
        search_depth="advanced"
    )
    # Run the search (retried on transient failures) and fold the results
    # into the document list; Tavily results carry their text in "content"
    results = execute_tavily_search(tool, question)
    web_content = "\n".join(r["content"] for r in results)
    documents.append(Document(page_content=web_content))
    return {"keys": {"documents": documents, "question": question}}
Document grading:
def grade_documents(state):
    llm = ChatAnthropic(
        model="claude-3-5-sonnet-20241022",
        api_key=st.session_state.anthropic_api_key
    )
    prompt = PromptTemplate(
        template="""You are grading document relevance.
Return ONLY a JSON object with "score": "yes" or "no".
Document: {context}
Question: {question}""",
        input_variables=["context", "question"],
    )
Query transformation:
def transform_query(state):
    prompt = PromptTemplate(
        template="""Generate a search-optimized version of this question:
{question}
Return only the improved question:""",
        input_variables=["question"],
    )
    llm = ChatAnthropic(
        model="claude-3-5-sonnet-20241022",
        api_key=st.session_state.anthropic_api_key
    )
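To finish the node, chain the prompt and model with a string output parser (a sketch continuing the function body above):

    chain = prompt | llm | StrOutputParser()
    better_question = chain.invoke({"question": state["keys"]["question"]})
    return {"keys": {
        "documents": state["keys"]["documents"],
        "question": better_question,
    }}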
LangGraph Setup
Graph state and workflow definition:
class GraphState(TypedDict):
    keys: Dict[str, Any]

workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("transform_query", transform_query)
workflow.add_node("web_search", web_search)
Graph edges and conditions:
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    }
)
workflow.add_edge("transform_query", "web_search")
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)
Streamlit Setup
Main interface setup:
st.title("🔄 Corrective RAG Agent")

# File upload interface
input_option = st.radio("Choose input method:", ["URL", "File Upload"])
if input_option == "URL":
    url = st.text_input("Enter document URL:", value=st.session_state.doc_url)
else:
    uploaded_file = st.file_uploader("Upload a document", type=['pdf', 'txt', 'md'])
Document processing:
if docs:
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=500, chunk_overlap=100
    )
    all_splits = text_splitter.split_documents(docs)
    vectorstore = Qdrant(
        client=client,
        collection_name=collection_name,
        embeddings=embeddings,
    )
    vectorstore.add_documents(all_splits)
Query processing and output:
user_question = st.text_input("Please enter your question:")
if user_question:
    inputs = {"keys": {"question": user_question}}
    for output in app.stream(inputs):
        for key, value in output.items():
            with st.expander(f"Step '{key}':"):
                st.text(pprint.pformat(format_state(value["keys"])))
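Here, format_state is a small display helper (defined in the full repository code) that trims long document text before rendering. Once the stream finishes, you can surface the final answer; this sketch assumes the generate node stores it under a generation key, as is common in LangGraph RAG examples:

    # After streaming, `value` holds the last node's output state
    final_state = value["keys"]
    st.subheader("Final Answer")
    st.write(final_state.get("generation", "No answer was generated."))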
Running the App
With our code in place, it's time to launch the app.
In your terminal, navigate to the project folder and run the following command:
streamlit run corrective_rag.py
Streamlit will provide a local URL (typically http://localhost:8501).
Conclusion
You've successfully built a sophisticated Corrective RAG system that goes beyond simple document retrieval. This implementation shows how combining document validation, adaptive search, and intelligent query reformation can create a more reliable and accurate RAG system.
For further enhancements, you can:
Create domain-specific validation criteria
Add visualization for the confidence scoring process
Create detailed logging for system decisions
Add support for more document formats (DOCX, HTML, Markdown)
Implement caching for frequently accessed documents
Keep experimenting and refining to build smarter AI solutions!
We share hands-on tutorials like this 2-3 times a week to help you stay ahead in the world of AI. If you're serious about leveling up your AI skills, subscribe now and be the first to access our latest tutorials.