This guide shows how to run performance benchmarks for TatryRetriever, measuring retrieval quality, speed, and cost efficiency.
```python
import asyncio
import time
from typing import Dict, List

import pandas as pd

from tatry import TatryRetriever

# Initialize the retriever
retriever = TatryRetriever(api_key="your-api-key")

# Sample queries for benchmarking
benchmark_queries = [
    "What are the latest advancements in quantum computing?",
    "Explain the impact of artificial intelligence on healthcare",
    "Describe recent developments in renewable energy",
    "What are the current trends in cybersecurity?",
    "How is machine learning affecting financial markets?",
]


class RetrievalBenchmark:
    def __init__(self, retriever):
        self.retriever = retriever
        self.results = []

    async def run_single_query(self, query: str) -> Dict:
        start_time = time.time()
        try:
            # Perform retrieval
            documents = await self.retriever.ainvoke(query)
            end_time = time.time()
            return {
                "query": query,
                "successful": True,
                "num_documents": len(documents),
                "response_time": end_time - start_time,
                "actual_cost": sum(doc.metadata.get("cost", 0) for doc in documents),
            }
        except Exception as e:
            return {
                "query": query,
                "successful": False,
                "error": str(e),
            }

    async def run_benchmark(self, queries: List[str]):
        tasks = [self.run_single_query(query) for query in queries]
        self.results = await asyncio.gather(*tasks)
        return self.results

    def generate_report(self) -> pd.DataFrame:
        df = pd.DataFrame(self.results)

        # Calculate statistics over the successful queries only
        successful_queries = df[df["successful"]]
        stats = {
            "total_queries": len(df),
            "successful_queries": len(successful_queries),
            "average_response_time": successful_queries["response_time"].mean(),
            "total_cost": successful_queries["actual_cost"].sum(),
            "documents_retrieved": successful_queries["num_documents"].sum(),
        }

        print("\nBenchmark Summary:")
        print(f"Total Queries: {stats['total_queries']}")
        print(f"Success Rate: {(stats['successful_queries'] / stats['total_queries']) * 100:.2f}%")
        print(f"Average Response Time: {stats['average_response_time']:.3f}s")
        print(f"Total Cost: ${stats['total_cost']:.2f}")
        print(f"Total Documents Retrieved: {stats['documents_retrieved']}")

        return df


# Run the benchmark
async def main():
    benchmark = RetrievalBenchmark(retriever)

    # Run with the standard configuration
    print("Running standard benchmark...")
    await benchmark.run_benchmark(benchmark_queries)
    df_standard = benchmark.generate_report()

    # Run with a different model configuration
    print("\nRunning benchmark with different model...")
    retriever.model = "gpt-4"  # Or another available model
    await benchmark.run_benchmark(benchmark_queries)
    df_model2 = benchmark.generate_report()

    # Compare results
    print("\nPerformance Comparison:")
    print(f"Standard Response Time: {df_standard['response_time'].mean():.3f}s")
    print(f"Alternative Model Response Time: {df_model2['response_time'].mean():.3f}s")


if __name__ == "__main__":
    asyncio.run(main())
```
The snippets below assume an async context such as a Jupyter notebook, where top-level `await` is available; in a plain script, wrap the calls in `asyncio.run()`.

```python
# Compare different model configurations
model_configs = [
    "gpt-3.5-turbo",
    "gpt-4",
    "claude-2",
]


async def compare_models():
    results = []
    for model in model_configs:
        retriever.model = model
        benchmark = RetrievalBenchmark(retriever)
        await benchmark.run_benchmark(benchmark_queries)
        results.append({
            "model": model,
            "results": benchmark.generate_report(),
        })
    return results


# Run comparison
model_comparison = await compare_models()
```
```python
# Generate a larger query set for load testing
load_test_queries = benchmark_queries * 20  # 100 queries


async def run_load_test():
    benchmark = RetrievalBenchmark(retriever)

    print("Running load test...")
    start_time = time.time()
    await benchmark.run_benchmark(load_test_queries)
    end_time = time.time()

    df = benchmark.generate_report()
    print("\nLoad Test Results:")
    print(f"Total Time: {end_time - start_time:.2f}s")
    print(f"Queries per Second: {len(load_test_queries) / (end_time - start_time):.2f}")
    return df


# Run load test
load_test_results = await run_load_test()
```
```python
from langchain_core.documents import Document


async def assess_quality(query: str, documents: List[Document]) -> Dict:
    if not documents:
        return {"average_relevance": 0.0, "diversity_score": 0.0}

    # Relevance scores reported by the retriever in document metadata
    relevance_scores = [doc.metadata.get("relevance_score", 0) for doc in documents]

    # Diversity, using titles as a proxy for distinct sources
    titles = [doc.metadata.get("title", "") for doc in documents]

    return {
        "average_relevance": sum(relevance_scores) / len(relevance_scores),
        "diversity_score": len(set(titles)) / len(titles),
    }


async def run_quality_benchmark():
    results = []
    for query in benchmark_queries:
        documents = await retriever.ainvoke(query)
        quality_metrics = await assess_quality(query, documents)
        results.append({
            "query": query,
            **quality_metrics,
        })
    return pd.DataFrame(results)


# Run quality assessment
quality_results = await run_quality_benchmark()
print("\nQuality Assessment Results:")
print(quality_results[["average_relevance", "diversity_score"]].mean())
```
The benchmark provides several key metrics; the sketch after this list shows one way to roll them up into a single summary:

- Performance Metrics: response time per query and queries per second under load
- Cost Metrics: per-query and total retrieval cost, read from document metadata
- Quality Metrics: average relevance score and result diversity
- Reliability Metrics: success rate and error messages for failed queries
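As a rough illustration, the per-query results from `RetrievalBenchmark.generate_report()` can be condensed into one row covering all four categories. This is a minimal sketch: the `summarize_run` helper is not part of Tatry, and the document count stands in crudely for quality (the dedicated quality benchmark above measures it properly).

```python
from typing import Dict

import pandas as pd


def summarize_run(df: pd.DataFrame) -> Dict:
    """Condense a RetrievalBenchmark report into headline metrics (illustrative helper)."""
    ok = df[df["successful"]]
    return {
        # Performance
        "avg_response_time_s": ok["response_time"].mean(),
        "p95_response_time_s": ok["response_time"].quantile(0.95),
        # Cost
        "total_cost_usd": ok["actual_cost"].sum(),
        "cost_per_query_usd": ok["actual_cost"].mean(),
        # Quality (document count as a crude proxy; see the quality benchmark above)
        "avg_documents_per_query": ok["num_documents"].mean(),
        # Reliability
        "success_rate": len(ok) / len(df) if len(df) else 0.0,
    }


# Example: summarize the load-test report produced earlier
print(pd.Series(summarize_run(load_test_results)))
```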
To keep benchmark results actionable over time:

- Regular Benchmarking: re-run the benchmark on a schedule and after any retriever or model configuration change, so results stay comparable
- Cost Optimization: track cost per query alongside response time and set a budget before scaling query volume
- Quality Control: monitor relevance and diversity scores between runs to catch regressions early; a simple threshold check is sketched below
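One way to wire these practices into CI or a scheduled job is a small regression check against agreed thresholds, reusing the `summarize_run` helper sketched above. The threshold values and the `check_regressions` helper are illustrative assumptions, not part of Tatry.

```python
from typing import Dict, List

# Illustrative thresholds; tune them to your own latency, cost, and reliability budgets.
THRESHOLDS = {
    "avg_response_time_s": 2.0,   # performance budget
    "cost_per_query_usd": 0.05,   # cost budget
    "success_rate": 0.95,         # reliability floor
}


def check_regressions(summary: Dict) -> List[str]:
    """Return human-readable warnings for any metric outside its threshold."""
    warnings = []
    if summary["avg_response_time_s"] > THRESHOLDS["avg_response_time_s"]:
        warnings.append(f"Average response time {summary['avg_response_time_s']:.2f}s exceeds budget")
    if summary["cost_per_query_usd"] > THRESHOLDS["cost_per_query_usd"]:
        warnings.append(f"Cost per query ${summary['cost_per_query_usd']:.3f} exceeds budget")
    if summary["success_rate"] < THRESHOLDS["success_rate"]:
        warnings.append(f"Success rate {summary['success_rate']:.0%} is below the floor")
    return warnings


# Example: check the latest load-test summary and surface any regressions
for warning in check_regressions(summarize_run(load_test_results)):
    print(f"WARNING: {warning}")
```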