Docs
Integrating Tatry with Langchain

Integrating Tatry with Langchain

how to integrate ContentRetriever with various LangChain components and chains

Basic Integration

from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from content_retriever.integrations.langchain import TatryRetrieverLangchain
 
# Initialize TatryRetrieverLangchain
retriever = TatryRetrieverLangchain(api_key="your-api-key")
 
# Initialize LLM
llm = ChatOpenAI(temperature=0)
 
# Create a simple QA chain
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    # retriever=ContentRetriever(api_key="your-api-key"), # you can also initialize it directly here
    # retriever=ContentRetriever(api_key="your-api-key"), # you can also initialize it directly here
    chain_type="stuff"  # Simple document concatenation
)
 
# Use the chain
response = qa_chain.run("What are the key features of quantum computing?")
print(response)

Advanced Integrations

1. Conversational Retrieval Chain

from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
 
# Initialize memory
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)
 
# Create conversational chain
chat_chain = ConversationalRetrievalChain.from_llm(
    llm=llm,
    retriever=retriever,
    memory=memory,
    verbose=True
)
 
# Use the chain
responses = []
questions = [
    "What is quantum computing?",
    "What are its main applications?",
    "Can you elaborate on the financial applications you mentioned?"
]
 
for question in questions:
    response = chat_chain({"question": question})
    responses.append(response)
    print(f"Q: {question}")
    print(f"A: {response['answer']}\n")

2. Custom Document Processing Chain

from langchain.chains import create_extraction_chain
from langchain.prompts import PromptTemplate
from typing import List
 
class CustomRetrievalChain:
    def __init__(self, retriever: TatryRetrieverLangchain, llm):
        self.retriever = retriever
        self.llm = llm
 
        # Define extraction schema
        self.schema = {
            "properties": {
                "key_points": {"type": "array", "items": {"type": "string"}},
                "main_topic": {"type": "string"},
                "confidence": {"type": "number"}
            }
        }
 
        # Create extraction chain
        self.extraction_chain = create_extraction_chain(self.schema, self.llm)
 
        # Create summary prompt
        self.summary_prompt = PromptTemplate(
            input_variables=["context"],
            template="Summarize the key points from this content:\n\n{context}"
        )
 
    async def process_documents(self, query: str):
        # Get relevant documents
        docs = await self.retriever.aget_relevant_documents(query)
 
        # Extract information from each document
        results = []
        for doc in docs:
            # Extract structured information
            extracted = self.extraction_chain.run(doc.page_content)
 
            # Generate summary
            summary = await self.llm.agenerate([
                self.summary_prompt.format(context=doc.page_content)
            ])
 
            results.append({
                "extracted_info": extracted,
                "summary": summary.generations[0][0].text,
                "metadata": doc.metadata
            })
 
        return results
 
# Use the custom chain
custom_chain = CustomRetrievalChain(retriever, llm)
results = await custom_chain.process_documents(
    "What are the recent developments in quantum computing?"
)

3. Parallel Research Chain

from langchain.chains import MapReduceChain
from langchain.prompts import ChatPromptTemplate
import asyncio
 
class ParallelResearchChain:
    def __init__(self, retriever: TatryRetrieverLangchain, llm):
        self.retriever = retriever
        self.llm = llm
 
        # Create prompts for analysis
        self.analysis_prompt = ChatPromptTemplate.from_template(
            "Analyze this content and provide key insights:\n\n{content}"
        )
 
        self.synthesis_prompt = ChatPromptTemplate.from_template(
            "Synthesize these insights into a coherent analysis:\n\n{insights}"
        )
 
    async def research_topic(self, topic: str, num_sources: int = 5):
        # Get documents from different sources
        docs = await self.retriever.aget_relevant_documents(
            topic,
            max_results=num_sources
        )
 
        # Analyze documents in parallel
        analysis_tasks = [
            self.analyze_document(doc) for doc in docs
        ]
        analyses = await asyncio.gather(*analysis_tasks)
 
        # Synthesize findings
        synthesis = await self.synthesize_analyses(analyses)
 
        return {
            "topic": topic,
            "individual_analyses": analyses,
            "synthesis": synthesis,
            "sources": [doc.metadata for doc in docs]
        }
 
    async def analyze_document(self, doc):
        response = await self.llm.agenerate([
            self.analysis_prompt.format(content=doc.page_content)
        ])
        return {
            "analysis": response.generations[0][0].text,
            "source": doc.metadata
        }
 
    async def synthesize_analyses(self, analyses: List[dict]):
        insights = "\n\n".join(
            [a["analysis"] for a in analyses]
        )
        response = await self.llm.agenerate([
            self.synthesis_prompt.format(insights=insights)
        ])
        return response.generations[0][0].text
 
# Use the research chain
research_chain = ParallelResearchChain(retriever, llm)
research_results = await research_chain.research_topic(
    "Recent breakthroughs in fusion energy"
)

Cost Optimization

# Configure cost-aware TatryRetrieverLangchain
retriever.configure_cost_optimization({
    "prefer_free_content": True,
    "max_cost_per_query": 0.50,
    "cost_optimization_strategy": "balanced"
})
 
# Create cost-aware QA chain
cost_aware_qa = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    chain_type="map_reduce",  # More efficient for large documents
    return_source_documents=True
)
 
# Monitor costs
async def run_qa_with_cost_tracking(question: str):
    estimated_cost = await retriever.estimate_cost(question)
    print(f"Estimated cost: ${estimated_cost:.2f}")
 
    response = await cost_aware_qa.arun(question)
 
    actual_cost = retriever.get_last_query_cost() # currently cost tracking is not implemented in TatryRetrieverLangchain
    print(f"Actual cost: ${actual_cost:.2f}")
 
    return response

Best Practices

  1. Chain Selection

    • Use "stuff" for small documents
    • Use "map_reduce" for large documents
    • Use "refine" for detailed analysis
  2. Cost Management

    • Monitor query costs
    • Use cost optimization
    • Cache frequent queries # caching is not implemented in TatryRetrieverLangchain
  3. Performance Optimization

    • Use async methods
    • Implement parallel processing
    • Cache results when appropriate

Additional Resources