# How to integrate ContentRetriever with various LangChain components and chains
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from content_retriever.integrations.langchain import TatryRetrieverLangchain
# Initialize the LangChain-compatible retriever wrapper.
retriever = TatryRetrieverLangchain(api_key="your-api-key")

# Initialize the LLM; temperature=0 keeps answers deterministic.
llm = ChatOpenAI(temperature=0)

# Create a simple QA chain. The retriever can also be constructed inline:
# retriever=TatryRetrieverLangchain(api_key="your-api-key")
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    chain_type="stuff"  # Simple document concatenation
)

# Use the chain
response = qa_chain.run("What are the key features of quantum computing?")
print(response)
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
# Conversation memory so follow-up questions can reference earlier turns.
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True,
)

# Build a conversational retrieval chain on top of the shared LLM and retriever.
chat_chain = ConversationalRetrievalChain.from_llm(
    llm=llm,
    retriever=retriever,
    memory=memory,
    verbose=True,
)

# Ask a short sequence of related questions; each exchange is stored in memory,
# which lets the last question refer back to an earlier answer.
questions = [
    "What is quantum computing?",
    "What are its main applications?",
    "Can you elaborate on the financial applications you mentioned?",
]
responses = []
for question in questions:
    response = chat_chain({"question": question})
    responses.append(response)
    print(f"Q: {question}")
    print(f"A: {response['answer']}\n")
from langchain.chains import create_extraction_chain
from langchain.prompts import PromptTemplate
from typing import List
class CustomRetrievalChain:
    """Retrieve documents for a query, then extract structured info and a
    summary from each one."""

    def __init__(self, retriever: TatryRetrieverLangchain, llm):
        self.retriever = retriever
        self.llm = llm
        # Extraction schema: key points, the main topic, and a confidence score.
        self.schema = {
            "properties": {
                "key_points": {"type": "array", "items": {"type": "string"}},
                "main_topic": {"type": "string"},
                "confidence": {"type": "number"}
            }
        }
        # Chain that pulls the schema fields out of raw document text.
        self.extraction_chain = create_extraction_chain(self.schema, self.llm)
        # Prompt used to summarize each retrieved document.
        self.summary_prompt = PromptTemplate(
            input_variables=["context"],
            template="Summarize the key points from this content:\n\n{context}"
        )

    async def process_documents(self, query: str):
        """Fetch documents relevant to *query* and process each in turn.

        Returns a list of dicts with ``extracted_info``, ``summary``, and the
        document's ``metadata``.
        """
        documents = await self.retriever.aget_relevant_documents(query)
        processed = []
        for document in documents:
            # Structured extraction runs synchronously.
            extracted = self.extraction_chain.run(document.page_content)
            # Summary generation is awaited one document at a time.
            generation = await self.llm.agenerate([
                self.summary_prompt.format(context=document.page_content)
            ])
            processed.append({
                "extracted_info": extracted,
                "summary": generation.generations[0][0].text,
                "metadata": document.metadata,
            })
        return processed
# Use the custom chain. NOTE: top-level ``await`` only works in an async
# context (e.g. a Jupyter notebook); in a script, wrap this in ``asyncio.run``.
custom_chain = CustomRetrievalChain(retriever, llm)
results = await custom_chain.process_documents(
"What are the recent developments in quantum computing?"
)
from langchain.chains import MapReduceChain
from langchain.prompts import ChatPromptTemplate
import asyncio
class ParallelResearchChain:
    """Research a topic by analyzing retrieved documents concurrently and
    synthesizing the per-document insights."""

    def __init__(self, retriever: TatryRetrieverLangchain, llm):
        self.retriever = retriever
        self.llm = llm
        # One prompt for per-document analysis, one for the final synthesis.
        self.analysis_prompt = ChatPromptTemplate.from_template(
            "Analyze this content and provide key insights:\n\n{content}"
        )
        self.synthesis_prompt = ChatPromptTemplate.from_template(
            "Synthesize these insights into a coherent analysis:\n\n{insights}"
        )

    async def research_topic(self, topic: str, num_sources: int = 5):
        """Retrieve up to *num_sources* documents for *topic*, analyze them
        in parallel, and return analyses plus a combined synthesis."""
        documents = await self.retriever.aget_relevant_documents(
            topic,
            max_results=num_sources
        )
        # Fan out one analysis coroutine per document and await them together.
        analyses = await asyncio.gather(
            *(self.analyze_document(document) for document in documents)
        )
        synthesis = await self.synthesize_analyses(analyses)
        return {
            "topic": topic,
            "individual_analyses": analyses,
            "synthesis": synthesis,
            "sources": [document.metadata for document in documents],
        }

    async def analyze_document(self, doc):
        """Run the analysis prompt against a single document."""
        result = await self.llm.agenerate([
            self.analysis_prompt.format(content=doc.page_content)
        ])
        return {
            "analysis": result.generations[0][0].text,
            "source": doc.metadata,
        }

    async def synthesize_analyses(self, analyses: List[dict]):
        """Merge the per-document analyses into one synthesized write-up."""
        combined = "\n\n".join(entry["analysis"] for entry in analyses)
        response = await self.llm.agenerate([
            self.synthesis_prompt.format(insights=combined)
        ])
        return response.generations[0][0].text
# Use the research chain. NOTE: top-level ``await`` assumes an async context
# such as a notebook; in a plain script, use ``asyncio.run(...)`` instead.
research_chain = ParallelResearchChain(retriever, llm)
research_results = await research_chain.research_topic(
"Recent breakthroughs in fusion energy"
)
# Configure cost-aware retrieval on the existing TatryRetrieverLangchain.
retriever.configure_cost_optimization({
    "prefer_free_content": True,           # favor zero-cost sources first
    "max_cost_per_query": 0.50,            # per-query spend ceiling (dollars)
    "cost_optimization_strategy": "balanced",
})

# Build a QA chain tuned for cost: map_reduce avoids stuffing every document
# into one prompt, and source documents are returned for auditing.
cost_aware_qa = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    chain_type="map_reduce",  # More efficient for large documents
    return_source_documents=True,
)
# Monitor costs
async def run_qa_with_cost_tracking(question: str):
    """Run the cost-aware QA chain for *question*, printing the estimated
    query cost and, when the retriever supports it, the actual cost.

    Returns the chain's response.
    """
    estimated_cost = await retriever.estimate_cost(question)
    print(f"Estimated cost: ${estimated_cost:.2f}")

    response = await cost_aware_qa.arun(question)

    # Cost tracking is not yet implemented in TatryRetrieverLangchain, so
    # probe for the method instead of failing with AttributeError at runtime.
    get_cost = getattr(retriever, "get_last_query_cost", None)
    if callable(get_cost):
        print(f"Actual cost: ${get_cost():.2f}")
    else:
        print("Actual cost: unavailable (cost tracking not implemented)")

    return response
# Related topics: Chain Selection, Cost Management, Performance Optimization