barclays-rag-test/rag_test_app/main.py
Vadym Samoilenko ed040ea497 init: add RAG test app with Excel/CSV export
- rag_test_app: OpenAI Assistants benchmark tool
- TEST_TO_RUN: Barclays test configs (Internal Banners, Social Posts, Display Banners, PPC)
- Added report.xlsx + report.csv export alongside HTML report

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-10 13:29:14 +01:00

1326 lines
No EOL
66 KiB
Python

import os
import json
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
import time
import concurrent.futures
from openai import OpenAI
from rich.console import Console
from rich.table import Table
from rich.progress import track, Progress
import matplotlib.pyplot as plt
class RAGTester:
def __init__(self,
             api_key: str,
             assistant_id: str,
             document_paths: Optional[List[str]] = None,
             document_path: Optional[str] = None,
             output_dir: str = "results",
             verbose: bool = False,
             model: str = "gpt-4o",
             prompt_type: str = "task-based"):
    """
    Initialize the RAG tester and load the source document text.

    Args:
        api_key: OpenAI API key
        assistant_id: ID of the assistant to test
        document_paths: List of paths to documents to use for testing (preferred over document_path)
        document_path: Path to a single document or directory with documents to use for testing
        output_dir: Directory to save results (created if it does not exist)
        verbose: Whether to print verbose output
        model: The OpenAI model to use for generating questions and evaluations
        prompt_type: Type of prompts to generate ("task-based", "content-based", "scenario-based")

    Raises:
        Exception: Any unexpected error raised while loading documents is
            reported on the console and re-raised unchanged.
    """
    # The console is needed by every branch below, so create it exactly once up front.
    self.console = Console()
    # docx2txt is an optional dependency; without it .docx files are skipped.
    try:
        import docx2txt  # noqa: F401  (availability probe only)
        self.docx2txt_available = True
    except ImportError:
        self.docx2txt_available = False
        self.console.print("[yellow]Warning: docx2txt not installed. Will not be able to read .docx files.[/yellow]")
        self.console.print("[yellow]Install with: pip install docx2txt[/yellow]")
    self.api_key = api_key
    self.assistant_id = assistant_id
    self.document_paths = document_paths or []
    self.document_path = document_path
    self.output_dir = output_dir
    self.verbose = verbose
    self.model = model
    self.prompt_type = prompt_type
    self.client = OpenAI(api_key=api_key)
    self.console.print("[bold blue]Initializing RAG Tester:[/bold blue]")
    self.console.print(f"  [cyan]Assistant ID:[/cyan] {assistant_id}")
    if document_paths:
        self.console.print(f"  [cyan]Documents:[/cyan] {len(document_paths)} files specified")
    elif document_path:
        self.console.print(f"  [cyan]Document/Directory:[/cyan] {document_path}")
    self.console.print(f"  [cyan]Output Directory:[/cyan] {output_dir}")
    self.console.print(f"  [cyan]Model:[/cyan] {model}")
    os.makedirs(output_dir, exist_ok=True)
    self.console.print(f"[green]Created output directory: {output_dir}[/green]")
    # Load document content - can be from multiple sources
    self.document_content = ""
    total_size = 0
    file_count = 0
    try:
        # Case 1: List of document paths specified
        if document_paths:
            self.console.print("[cyan]Loading specified documents...[/cyan]")
            for doc_path in document_paths:
                if not os.path.exists(doc_path):
                    self.console.print(f"  [yellow]Warning: Document not found: {doc_path}[/yellow]")
                    continue
                loaded_size = self._ingest_document(doc_path, doc_path)
                if loaded_size is not None:
                    total_size += loaded_size
                    file_count += 1
        # Case 2: Directory specified
        elif document_path and os.path.isdir(document_path):
            self.console.print(f"[cyan]Loading documents from directory: {document_path}[/cyan]")
            # Sorted so the concatenated text (and therefore the prompt context
            # sliced from its start) does not depend on filesystem listing order.
            for filename in sorted(os.listdir(document_path)):
                file_path = os.path.join(document_path, filename)
                # Skip sub-directories; only direct children are loaded
                if os.path.isdir(file_path):
                    continue
                loaded_size = self._ingest_document(file_path, filename)
                if loaded_size is not None:
                    total_size += loaded_size
                    file_count += 1
        # Case 3: Single document specified
        elif document_path:
            filename = os.path.basename(document_path)
            try:
                content, kind = self._load_document_text(document_path)
                if kind == "docx-unavailable":
                    self.console.print(f"[yellow]Cannot load {filename}: docx2txt not installed[/yellow]")
                elif kind == "unsupported":
                    self.console.print(f"[yellow]Unsupported file type: {filename}[/yellow]")
                else:
                    if kind == "docx":
                        self.console.print(f"[green]Loaded DOCX document: {filename}[/green]")
                    self.document_content = content
                    doc_size = len(content)
                    self.console.print(f"[green]Loaded document ({doc_size} characters)[/green]")
                    # Only a file that was actually read counts as loaded.
                    file_count = 1
                    total_size = doc_size
                    if self.verbose:
                        doc_preview = content[:100] + "..." if doc_size > 100 else content
                        # markup=False: raw document text may contain "[...]" sequences
                        # that Rich would otherwise try to parse as style tags.
                        self.console.print(f"Document preview: {doc_preview}", style="dim", markup=False)
            except Exception as e:
                self.console.print(f"[bold red]Error loading document: {str(e)}[/bold red]")
                self.document_content = ""
        else:
            self.console.print("[bold red]No documents specified![/bold red]")
        # Report on loaded documents
        if file_count > 0:
            self.console.print(f"[green]Successfully loaded {file_count} document(s) (total {total_size} characters)[/green]")
        # Check if we have any content
        if not self.document_content:
            self.console.print("[bold red]Warning: No content loaded from documents[/bold red]")
    except Exception as e:
        self.console.print(f"[bold red]Error loading document(s): {str(e)}[/bold red]")
        raise
    self.questions = []
    self.results = []


def _load_document_text(self, path: str) -> Tuple[Optional[str], str]:
    """
    Read one document and classify how it was handled.

    Args:
        path: Path of the file to read

    Returns:
        Tuple ``(text, kind)`` where ``kind`` is one of "docx", "text",
        "docx-unavailable" or "unsupported"; ``text`` is None for the last two.
    """
    filename = os.path.basename(path)
    extension = os.path.splitext(filename)[1].lower()
    if extension == ".docx":
        if not self.docx2txt_available:
            return None, "docx-unavailable"
        import docx2txt
        return docx2txt.process(path), "docx"
    if extension in (".pdf", ".xls", ".xlsx", ".ppt", ".pptx"):
        return None, "unsupported"
    # Everything else is treated as plain text; undecodable bytes are replaced.
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        return f.read(), "text"


def _ingest_document(self, path: str, label: str) -> Optional[int]:
    """
    Append one document to ``self.document_content`` under a filename header.

    Args:
        path: Path of the file to read
        label: Name shown in the "Could not load" message if reading fails

    Returns:
        Number of characters appended, or None when the file was skipped or
        could not be read (a message is printed in either case).
    """
    filename = os.path.basename(path)
    try:
        content, kind = self._load_document_text(path)
        if kind == "docx-unavailable":
            self.console.print(f"  [yellow]Skipping {filename}: docx2txt not installed[/yellow]")
            return None
        if kind == "unsupported":
            self.console.print(f"  [yellow]Skipping unsupported file type: {filename}[/yellow]")
            return None
        if kind == "docx":
            self.console.print(f"  [green]Loaded DOCX: {filename}[/green]")
        # Add the content to our document collection
        self.document_content += f"\n\n--- Document: {filename} ---\n\n{content}"
        file_size = len(content)
        self.console.print(f"  [green]Loaded: {filename} ({file_size} characters)[/green]")
        return file_size
    except Exception as e:
        # Best effort: one unreadable file must not abort the whole load.
        self.console.print(f"  [yellow]Could not load {label}: {str(e)}[/yellow]")
        return None
def log(self, message, level="info"):
    """
    Print a message on the console, styled according to its level.

    Debug messages are only shown in verbose mode; every other known level is
    always shown. Levels other than debug/info/warning/error/success print nothing.

    Args:
        message: Text (or renderable) to print
        level: One of "debug", "info", "warning", "error", "success"
    """
    # Debug output is the only level gated by the verbose flag.
    if level == "debug" and not self.verbose:
        return
    if level == "info":
        # Info messages are passed through untouched.
        self.console.print(message)
        return
    # Remaining levels wrap the message in a single style tag.
    for name, tag in (("debug", "dim"),
                      ("warning", "yellow"),
                      ("error", "bold red"),
                      ("success", "green")):
        if level == name:
            self.console.print(f"[{tag}]{message}[/{tag}]")
            break
def generate_test_questions(self, num_questions: int = 20) -> List[str]:
    """
    Generate test questions from the document.

    The model is first asked for a JSON object; if that request or its parsing
    fails, a second model is asked without ``response_format`` and the reply is
    parsed progressively more leniently (fenced JSON, embedded JSON object,
    bulleted list, lines containing a question mark).

    Args:
        num_questions: Number of questions to generate; the result is capped at this many

    Returns:
        List of generated questions (also stored in ``self.questions`` and
        written to ``<output_dir>/test_questions.json``)

    Raises:
        ValueError: If the fallback reply cannot be parsed and contains no
            line with a question mark.
    """
    import re  # local import: `re` is not among the module-level imports

    def _normalise(payload: Any) -> List[str]:
        """Return the non-empty string items of a parsed payload, capped at num_questions."""
        if isinstance(payload, dict):
            payload = payload.get("questions", [])
        if not isinstance(payload, list):
            return []
        cleaned = [item for item in payload if isinstance(item, str) and item.strip()]
        return cleaned[:max(num_questions, 0)]

    self.console.print("[bold blue]Generating test questions from document...[/bold blue]")
    self.log(f"Requesting {num_questions} questions using model: {self.model}", "info")
    self.log(f"Using prompt type: {self.prompt_type}", "info")
    # Define different prompt templates based on prompt_type
    prompt_templates = {
        "task-based": {
            "system": "You are a helpful assistant that generates realistic user task requests that someone would ask a digital banner creation assistant.",
            "user": f"""Generate {num_questions} diverse realistic user requests that someone would ask when using a digital banner creation assistant.
The requests should sound like natural user tasks, such as:
- "Create a banner for our new credit card offer"
- "Write copy for a savings account promotion"
- "Generate headlines for our mobile banking app"
- "Design text for a balance transfer campaign"
Important:
- Make them sound like REAL user requests, not questions about the documents
- Vary the products: credit cards, loans, savings, banking services, financial tools
- Include different banner types: promotional, informational, awareness campaigns
- Keep them concise and action-oriented
- Some should mention specific requirements like target audience or compliance needs
Context from documents to inform realistic requests:
{self.document_content[:3000]}
Return the requests as a JSON array of strings named 'questions'.
Format: {{"questions": ["request 1", "request 2", ...]}}"""
        },
        "content-based": {
            "system": "You are a helpful assistant that generates diverse test questions from a document.",
            "user": f"""Generate {num_questions} diverse questions based on the following document.
The questions should test different aspects and levels of understanding.
Return the questions as a JSON array of strings named 'questions'.
{self.document_content}"""
        },
        "scenario-based": {
            "system": "You are a helpful assistant that generates realistic business scenario requests for a digital banner creation assistant.",
            "user": f"""Generate {num_questions} diverse realistic business scenarios that combine a specific banner creation task with business context.
The scenarios should sound like real business requests, such as:
- "We're launching a new credit card for students. Create banner copy that's compliant with FCA Consumer Duty guidelines"
- "Our vulnerable customer initiative needs promotional materials. Write banner text that's clear and accessible"
- "Create an internal banner for our mobile banking upgrade, targeting existing customers"
- "We have a new savings product for first-time buyers. Generate compliant promotional copy"
Important:
- Make them sound like REAL business scenarios with context
- Include specific target audiences (students, vulnerable customers, first-time buyers, etc.)
- Mention compliance or regulatory considerations when relevant
- Vary the products and campaign types
- Include both external and internal communications
- Keep them realistic but concise
Context from documents to inform realistic scenarios:
{self.document_content[:3000]}
Return the scenarios as a JSON array of strings named 'questions'.
Format: {{"questions": ["scenario 1", "scenario 2", ...]}}"""
        }
    }
    # Unknown prompt types fall back to the task-based template
    prompt_template = prompt_templates.get(self.prompt_type, prompt_templates["task-based"])
    try:
        # First try with response_format (newer models support this)
        self.log("Attempting to generate questions with JSON response format", "debug")
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": prompt_template["system"]},
                {"role": "user", "content": prompt_template["user"]}
            ],
            response_format={"type": "json_object"}
        )
        self.log("Successfully received response with JSON format", "debug")
        raw_reply = response.choices[0].message.content
        if self.verbose:
            self.log(f"Raw response: {raw_reply}", "debug")
        self.questions = _normalise(json.loads(raw_reply))
        self.log(f"Extracted {len(self.questions)} questions from JSON response", "success")
    except Exception as e:
        # Deliberately broad: any failure of the structured request (API error,
        # empty body, invalid JSON) triggers the lenient fallback below.
        self.log(f"JSON response format failed: {str(e)}", "warning")
        self.log("Trying fallback method without response_format", "info")
        fallback_model = "gpt-3.5-turbo" if self.model != "gpt-3.5-turbo" else "gpt-4"
        self.log(f"Using fallback model: {fallback_model}", "debug")
        # Add JSON format instruction to the user prompt for fallback
        fallback_user_prompt = prompt_template["user"]
        if "Format:" not in fallback_user_prompt:
            fallback_user_prompt += "\n\nReturn ONLY a JSON object with a 'questions' key containing an array of strings. Format: {'questions': ['item 1', 'item 2', ...]}"
        response = self.client.chat.completions.create(
            model=fallback_model,
            messages=[
                {"role": "system", "content": prompt_template["system"]},
                {"role": "user", "content": fallback_user_prompt}
            ]
        )
        self.log("Received fallback response, attempting to parse", "debug")
        # A missing message body is treated as empty text so every parsing
        # stage below can run on a real string.
        fallback_text = response.choices[0].message.content or ""
        if self.verbose:
            self.log(f"Raw fallback response: {fallback_text}", "debug")
        # Bound before the try so the error handler can always report it
        content = fallback_text
        try:
            self.log("Looking for JSON in response", "debug")
            # Extract JSON if it's wrapped in code blocks or other text
            json_match = re.search(r'```json\s*(.*?)\s*```', content, re.DOTALL)
            if json_match:
                self.log("Found JSON in code block", "debug")
                content = json_match.group(1)
            else:
                # Try to find anything that looks like JSON
                json_match = re.search(r'\{.*\}', content, re.DOTALL)
                if json_match:
                    self.log("Found JSON object in text", "debug")
                    content = json_match.group(0)
            self.log("Attempting to parse JSON content", "debug")
            self.questions = _normalise(json.loads(content))
            if self.questions:
                self.log(f"Successfully extracted {len(self.questions)} questions from JSON", "success")
            else:
                self.log("No questions found in JSON, trying to parse from text", "warning")
                # Look for numbered or bulleted list items
                self.log("Looking for numbered or bulleted lists", "debug")
                listed = re.findall(r'(?:^|\n)(?:\d+\.\s*|\*\s*|-\s*)(.+?)(?=(?:\n\d+\.|\n\*|\n-|\n\n|$))', content)
                if listed:
                    self.log(f"Found {len(listed)} questions in list format", "success")
                    self.questions = _normalise([item.strip() for item in listed])
        except Exception as json_error:
            self.log(f"Error parsing questions: {str(json_error)}", "error")
            if self.verbose:
                self.log(f"Content that failed to parse: {content}", "debug")
            # Last resort: try to extract questions line by line
            self.log("Attempting last resort method: extract lines with question marks", "warning")
            potential_questions = [line for line in fallback_text.split('\n') if '?' in line]
            if potential_questions:
                self.log(f"Found {len(potential_questions)} lines with question marks", "success")
                self.questions = potential_questions[:num_questions]
            else:
                self.log("Could not extract any questions, giving up", "error")
                raise ValueError("Could not generate or parse questions from the model's response")
    # Print the questions for verification
    if self.questions:
        self.log("Generated questions:", "info")
        for i, q in enumerate(self.questions[:5]):  # Show first 5 questions
            self.console.print(f"  [cyan]{i+1}.[/cyan] {q}")
        if len(self.questions) > 5:
            self.console.print(f"  ... and {len(self.questions) - 5} more questions")
    else:
        self.log("No questions were generated!", "error")
    # Save questions to file (explicit encoding so the result is platform-independent)
    with open(os.path.join(self.output_dir, "test_questions.json"), "w", encoding="utf-8") as f:
        json.dump({"questions": self.questions}, f, indent=2)
    self.console.print(f"[green]Generated {len(self.questions)} test questions[/green]")
    return self.questions
def load_questions_from_file(self, file_path: str) -> List[str]:
    """
    Load questions from a JSON file.

    Accepts either the format written by this tool, an object with a
    "questions" array, or a bare top-level JSON array of questions.

    Args:
        file_path: Path to the JSON file

    Returns:
        The loaded questions (also stored in ``self.questions``)

    Raises:
        ValueError: If the top-level JSON value is neither an object nor an array.
    """
    # utf-8-sig reads plain UTF-8 and also tolerates a leading BOM, which
    # json.load would otherwise reject.
    with open(file_path, "r", encoding="utf-8-sig") as f:
        data = json.load(f)
    if isinstance(data, dict):
        # A null or missing "questions" entry is treated as "no questions"
        loaded = data.get("questions") or []
    elif isinstance(data, list):
        loaded = data
    else:
        raise ValueError(
            f"Unsupported questions file format in {file_path}: "
            "expected an object with a 'questions' key or an array"
        )
    self.questions = loaded
    return self.questions
def _run_single_test(self,
                     question_data: Tuple[int, str, int],
                     poll_interval: float = 1.0,
                     timeout: float = 600.0) -> Dict[str, Any]:
    """
    Run a single test for a question

    Creates a fresh thread, posts the question, starts a run of the assistant
    and polls until the run stops. Never raises: any failure is captured in
    the returned dictionary with status "error".

    Args:
        question_data: Tuple containing (question_index, question_text, iteration)
        poll_interval: Seconds to sleep between run status checks
        timeout: Maximum seconds to wait for the run to reach a final status

    Returns:
        Dictionary with keys question_id, question, iteration, response,
        response_time, thread_id, run_id, timestamp and status
    """
    i, question, iteration = question_data
    # Create a new client for each thread to avoid rate limiting issues
    client = OpenAI(api_key=self.api_key)
    start_time = time.time()
    # Tracked outside the try so an error result can still point at the
    # thread/run that was created before the failure.
    thread_id = ""
    run_id = ""
    try:
        # Create a thread and run it
        thread = client.beta.threads.create()
        thread_id = thread.id
        # Add a message to the thread
        client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=question
        )
        # Run the assistant
        run = client.beta.threads.runs.create(
            thread_id=thread_id,
            assistant_id=self.assistant_id
        )
        run_id = run.id
        # Statuses after which further polling cannot change the outcome.
        # "requires_action" is included because this tester never submits
        # tool outputs, so such a run would only sit there until it expires.
        stop_statuses = ("completed", "failed", "cancelled", "expired",
                         "incomplete", "requires_action")
        status = run.status
        deadline = time.monotonic() + timeout
        while status not in stop_statuses:
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Run did not finish within {timeout:.0f}s (last status: {status})"
                )
            time.sleep(poll_interval)
            run = client.beta.threads.runs.retrieve(
                thread_id=thread_id,
                run_id=run_id
            )
            status = run.status
        # Get the response
        messages = client.beta.threads.messages.list(
            thread_id=thread_id
        )
        # Use the first assistant message in the returned list
        response = None
        for msg in messages.data:
            if msg.role == "assistant":
                # Keep only parts that carry text; other part kinds
                # (e.g. images) have no text value to record.
                text_parts = [
                    part.text.value
                    for part in msg.content
                    if getattr(part, "text", None) is not None
                ]
                response = "\n".join(text_parts) if text_parts else None
                break
        response_time = time.time() - start_time
        # Store results
        result = {
            "question_id": i,
            "question": question,
            "iteration": iteration,
            "response": response,
            "response_time": response_time,
            "thread_id": thread_id,
            "run_id": run_id,
            "timestamp": time.time(),
            "status": status
        }
    except Exception as e:
        # Deliberately broad: one failed test must not take down the worker pool.
        response_time = time.time() - start_time
        result = {
            "question_id": i,
            "question": question,
            "iteration": iteration,
            "response": f"ERROR: {str(e)}",
            "response_time": response_time,
            "thread_id": thread_id,
            "run_id": run_id,
            "timestamp": time.time(),
            "status": "error"
        }
    return result
def run_tests(self, iterations: int = 3, max_workers: int = 5, batch_size: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Run tests for each question multiple times in parallel.

    Args:
        iterations: Number of times to test each question
        max_workers: Maximum number of parallel threads (default=5, adjust based on your rate limits)
        batch_size: Number of questions to process in a batch (defaults to max_workers if None)
            Higher values increase throughput at the cost of more memory usage

    Returns:
        List of test results, sorted by question and iteration; an empty list
        when no questions have been generated or loaded

    Raises:
        ValueError: If max_workers or batch_size is smaller than 1.
    """
    # If batch_size is not specified, use max_workers as default
    if batch_size is None:
        batch_size = max_workers
    if not self.questions:
        self.console.print("[bold red]No questions available. Generate or load questions first.[/bold red]")
        return []
    # A batch size below 1 would never consume the work list (endless loop),
    # and the thread pool rejects a worker count below 1; fail early and clearly.
    if max_workers < 1:
        raise ValueError(f"max_workers must be at least 1, got {max_workers}")
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    self.results = []
    total_tests = len(self.questions) * iterations
    self.console.print(f"[bold blue]Running {total_tests} tests ({iterations} iterations for {len(self.questions)} questions) with parallelization...[/bold blue]")
    self.log(f"Using assistant ID: {self.assistant_id}", "info")
    self.log(f"Running with {max_workers} parallel workers", "info")
    # Prepare all question-iteration combinations
    test_items = [
        (i, question, iteration)
        for i, question in enumerate(self.questions)
        for iteration in range(iterations)
    ]
    # Setup progress bar
    with Progress() as progress:
        task = progress.add_task("[cyan]Running tests...", total=total_tests)
        # Process test items in batches for better throughput and memory management
        remaining_items = test_items
        while remaining_items:
            # Get the next batch of items
            current_batch = remaining_items[:batch_size]
            remaining_items = remaining_items[batch_size:]
            # Run the current batch in parallel; leaving the executor context
            # waits for every task of the batch before the next one starts
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit batch tasks
                future_to_test = {
                    executor.submit(self._run_single_test, item): item for item in current_batch
                }
                # Process results as they complete
                for future in concurrent.futures.as_completed(future_to_test):
                    i, question, iteration = future_to_test[future]
                    try:
                        result = future.result()
                        self.results.append(result)
                        # Log brief result info
                        status = result.get("status", "unknown")
                        response_time = result.get("response_time", 0)
                        if status == "completed":
                            self.log(f"Question {i+1}, iteration {iteration+1} completed in {response_time:.2f}s", "success")
                        else:
                            self.log(f"Question {i+1}, iteration {iteration+1} ended with status: {status}", "warning")
                        # Save results frequently to avoid data loss
                        if len(self.results) % 5 == 0:  # Save after every 5 completed tests
                            self._save_results()
                    except Exception as e:
                        self.log(f"Error processing question {i+1}, iteration {iteration+1}: {str(e)}", "error")
                    # The bar advances even for a failed item so it still reaches 100%
                    progress.update(task, advance=1)
            # Save results after each batch
            self._save_results()
            # Log batch progress
            if remaining_items:
                completed = total_tests - len(remaining_items)
                self.log(f"Batch complete. Progress: {completed}/{total_tests} tests ({completed/total_tests*100:.1f}%)", "info")
    # Final save
    self._save_results()
    # Sort results by question_id and iteration for consistency
    self.results.sort(key=lambda x: (x["question_id"], x["iteration"]))
    self.console.print(f"[green]Completed {len(self.results)}/{total_tests} tests[/green]")
    # Report on any failures
    failures = [r for r in self.results if r.get("status") != "completed"]
    if failures:
        self.console.print(f"[yellow]Warning: {len(failures)} tests did not complete successfully[/yellow]")
    return self.results
def evaluate_results(self) -> Dict[str, Any]:
    """
    Evaluate test results for quality and consistency.

    Results are grouped per question and each group is scored by the model on
    quality, consistency, accuracy and completeness. A score that is missing
    or not numeric is replaced by the same neutral default (5) that is used
    when the evaluator's reply cannot be parsed at all, so the averages and
    the report exports always receive numbers.

    Returns:
        Dictionary with evaluation metrics (also written to
        ``<output_dir>/evaluation.json``); an empty dict when there are no results

    Raises:
        Exception: Errors from the fallback evaluation request are not caught.
    """
    import re  # local import: `re` is not among the module-level imports

    if not self.results:
        self.console.print("[bold red]No results available. Run tests first.[/bold red]")
        return {}
    self.console.print("[bold blue]Evaluating test results...[/bold blue]")

    score_keys = ("quality_score", "consistency_score", "accuracy_score", "completeness_score")
    default_score = 5

    def _coerce_score(value: Any) -> Optional[float]:
        """Return a usable numeric score, or None if the value is missing or not numeric."""
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            number = value
        elif isinstance(value, str):
            try:
                number = float(value.strip())
            except ValueError:
                return None
        else:
            return None
        # Reject NaN and infinities: they would poison the averages
        if number != number or number in (float("inf"), float("-inf")):
            return None
        return number

    # Group results by question
    results_by_question = {}
    for result in self.results:
        results_by_question.setdefault(result["question_id"], []).append(result)
    # Calculate metrics
    evaluation = {
        "total_questions": len(results_by_question),
        "total_tests": len(self.results),
        "avg_response_time": float(np.mean([r["response_time"] for r in self.results])),
        "question_metrics": []
    }
    # Prompt pieces shared by the primary and the fallback request
    system_prompt = "You are a helpful assistant that evaluates the quality and consistency of responses."
    rubric = (
        "\n\nEvaluate these responses according to these metrics:\n\n" +
        "1. Quality (1-10): Overall quality of responses, including clarity, coherence, professional tone, " +
        "lack of hallucinations, and grammatical correctness.\n\n" +
        "2. Consistency (1-10): How consistent responses are across multiple iterations, including similarity " +
        "of answers, consistency of core facts, level of detail, and lack of contradictions.\n\n" +
        "3. Accuracy (1-10): Factual accuracy compared to source documents, including correct representation of " +
        "information, quotes, numbers, facts, and proper interpretation of context.\n\n" +
        "4. Completeness (1-10): How thoroughly responses answer the question, including addressing all aspects " +
        "of the question, providing important context, sufficient detail, and lack of significant omissions.\n\n"
    )
    field_list = (
        "- quality_score: 1-10 rating of overall response quality\n" +
        "- consistency_score: 1-10 rating of consistency between responses\n" +
        "- accuracy_score: 1-10 rating of factual accuracy\n" +
        "- completeness_score: 1-10 rating of how completely the responses answer the question\n" +
        "- explanation: Brief explanation of scores"
    )
    # Evaluate each question
    for q_id, q_results in results_by_question.items():
        responses = [r["response"] for r in q_results]
        question = q_results[0]["question"]
        # Calculate response time statistics
        response_times = [r["response_time"] for r in q_results]
        self.log(f"Evaluating responses for question: '{question[:50]}...'", "info")
        self.log(f"Using model {self.model} for evaluation", "debug")
        prompt_head = (
            f"Question: {question}\n\nResponses:\n" +
            "\n".join([f"Response {i+1}: {r}" for i, r in enumerate(responses)]) +
            rubric
        )
        evaluation_data = None
        try:
            # First try with response_format (newer models support this)
            self.log("Attempting evaluation with JSON response format", "debug")
            eval_response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt_head +
                        "Return a JSON object with these fields:\n" + field_list}
                ],
                response_format={"type": "json_object"}
            )
            self.log("Successfully received JSON format evaluation", "debug")
            if self.verbose:
                self.log(f"Raw evaluation response: {eval_response.choices[0].message.content}", "debug")
            evaluation_data = json.loads(eval_response.choices[0].message.content)
            self.log(f"Evaluation scores: Quality={evaluation_data.get('quality_score')}, " +
                     f"Consistency={evaluation_data.get('consistency_score')}, " +
                     f"Accuracy={evaluation_data.get('accuracy_score')}, " +
                     f"Completeness={evaluation_data.get('completeness_score')}", "success")
        except Exception as e:
            # Deliberately broad: any failure of the structured request
            # triggers the lenient fallback below.
            self.log(f"JSON response format failed: {str(e)}", "warning")
            self.log("Using fallback method for evaluation", "info")
            fallback_model = "gpt-3.5-turbo" if self.model != "gpt-3.5-turbo" else "gpt-4"
            self.log(f"Using fallback model: {fallback_model}", "debug")
            eval_response = self.client.chat.completions.create(
                model=fallback_model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt_head +
                        "Return ONLY a JSON object with these fields:\n" + field_list + "\n\n" +
                        "Format: {'quality_score': X, 'consistency_score': Y, 'accuracy_score': Z, 'completeness_score': W, 'explanation': 'text'}"}
                ]
            )
            self.log("Received fallback evaluation, attempting to parse", "debug")
            # A missing message body is treated as empty text
            content = eval_response.choices[0].message.content or ""
            if self.verbose:
                self.log(f"Raw fallback evaluation: {content}", "debug")
            # Try to parse the JSON from the response
            try:
                self.log("Looking for JSON in evaluation response", "debug")
                # Extract JSON if it's wrapped in code blocks or other text
                json_match = re.search(r'```(?:json)?\s*(.*?)\s*```', content, re.DOTALL)
                if json_match:
                    self.log("Found JSON in code block", "debug")
                    content = json_match.group(1)
                else:
                    # Try to find anything that looks like JSON
                    json_match = re.search(r'\{.*\}', content, re.DOTALL)
                    if json_match:
                        self.log("Found JSON object in text", "debug")
                        content = json_match.group(0)
                self.log("Attempting to parse JSON evaluation", "debug")
                evaluation_data = json.loads(content)
                self.log("Successfully parsed evaluation data", "success")
                if self.verbose:
                    self.log(f"Parsed evaluation data: {evaluation_data}", "debug")
            except Exception as json_error:
                self.log(f"Error parsing evaluation: {str(json_error)}", "error")
                if self.verbose:
                    self.log(f"Content that failed to parse: {content}", "debug")
                # Create default evaluation data with average scores
                self.log("Using default evaluation scores due to parsing error", "warning")
                evaluation_data = {
                    "quality_score": default_score,
                    "consistency_score": default_score,
                    "accuracy_score": default_score,
                    "completeness_score": default_score,
                    "explanation": "Default scores used due to parsing error"
                }
        # The evaluator may return valid JSON that is not an object, or omit
        # or mistype individual scores; normalise before aggregating.
        if not isinstance(evaluation_data, dict):
            evaluation_data = {}
        scores = {}
        defaulted = []
        for key in score_keys:
            score = _coerce_score(evaluation_data.get(key))
            if score is None:
                defaulted.append(key)
                score = default_score
            scores[key] = score
        explanation = evaluation_data.get("explanation")
        if defaulted:
            self.log(f"Missing or non-numeric scores ({', '.join(defaulted)}); using default of {default_score}", "warning")
            note = f"Default score used for: {', '.join(defaulted)}"
            explanation = f"{explanation} ({note})" if explanation else note
        # Add metrics to evaluation
        q_metrics = {
            "question_id": q_id,
            "question": question,
            "avg_response_time": float(np.mean(response_times)),
            "std_response_time": float(np.std(response_times)),
            "quality_score": scores["quality_score"],
            "consistency_score": scores["consistency_score"],
            "accuracy_score": scores["accuracy_score"],
            "completeness_score": scores["completeness_score"],
            "explanation": explanation
        }
        evaluation["question_metrics"].append(q_metrics)
    # Calculate overall scores
    for key in score_keys:
        evaluation[f"avg_{key}"] = float(np.mean([q[key] for q in evaluation["question_metrics"]]))
    # Save evaluation (explicit encoding so the result is platform-independent)
    with open(os.path.join(self.output_dir, "evaluation.json"), "w", encoding="utf-8") as f:
        json.dump(evaluation, f, indent=2)
    self.console.print("[green]Evaluation complete[/green]")
    return evaluation
def generate_report(self) -> None:
    """
    Generate a comprehensive report with visualizations.

    Runs the evaluation first if none is stored on the instance, prints a
    summary table, then produces the charts, the HTML report and the
    Excel/CSV exports. Returns early with an error message when there is
    still no evaluation data.
    """
    # Evaluate lazily: only when nothing usable has been stored yet
    if not getattr(self, "evaluation", None):
        self.evaluation = self.evaluate_results()
    current = getattr(self, "evaluation", None)
    # If we still don't have evaluation data, exit early
    if not current or not current.get("total_questions"):
        self.console.print("[bold red]No evaluation data available. Cannot generate report.[/bold red]")
        return
    self.console.print("[bold blue]Generating report...[/bold blue]")
    # Summary rows, in display order
    summary_rows = (
        ("Total Questions", str(self.evaluation["total_questions"])),
        ("Total Tests", str(self.evaluation["total_tests"])),
        ("Avg Response Time", f"{self.evaluation['avg_response_time']:.2f}s"),
        ("Avg Quality Score", f"{self.evaluation['avg_quality_score']:.2f}/10"),
        ("Avg Consistency Score", f"{self.evaluation['avg_consistency_score']:.2f}/10"),
        ("Avg Accuracy Score", f"{self.evaluation['avg_accuracy_score']:.2f}/10"),
        ("Avg Completeness Score", f"{self.evaluation['avg_completeness_score']:.2f}/10"),
    )
    summary_table = Table(title="RAG Test Summary")
    summary_table.add_column("Metric", style="cyan")
    summary_table.add_column("Value", style="magenta")
    for metric_label, metric_value in summary_rows:
        summary_table.add_row(metric_label, metric_value)
    self.console.print(summary_table)
    # Charts, then the HTML report, then the Excel/CSV exports
    self._create_visualizations()
    self._generate_html_report()
    self._generate_excel_report()
    for notice in (
        f"[green]Report generated in {self.output_dir}/report.html[/green]",
        f"[green]Excel export saved to {self.output_dir}/report.xlsx[/green]",
        f"[green]CSV export saved to {self.output_dir}/report.csv[/green]",
    ):
        self.console.print(notice)
def _generate_excel_report(self) -> None:
    """Write the evaluation to ``report.xlsx`` and ``report.csv`` in ``output_dir``.

    The workbook gets a "Summary" sheet with the aggregate metrics and a
    "Details" sheet with one row per question. The CSV contains the
    per-question rows only.
    """
    ev = self.evaluation

    # Aggregate metrics: counts are stored as-is, averages rounded to 2 decimals.
    summary_records = [
        {"Metric": "Total Questions", "Value": ev["total_questions"]},
        {"Metric": "Total Tests", "Value": ev["total_tests"]},
    ]
    summary_records.extend(
        {"Metric": label, "Value": round(ev[key], 2)}
        for label, key in (
            ("Avg Response Time (s)", "avg_response_time"),
            ("Avg Quality Score", "avg_quality_score"),
            ("Avg Consistency Score", "avg_consistency_score"),
            ("Avg Accuracy Score", "avg_accuracy_score"),
            ("Avg Completeness Score", "avg_completeness_score"),
        )
    )
    summary_df = pd.DataFrame(summary_records)

    def to_detail_record(metric):
        """Flatten one question's metrics into a spreadsheet row."""
        mean_of_four = (
            metric["quality_score"] + metric["consistency_score"] +
            metric["accuracy_score"] + metric["completeness_score"]
        ) / 4
        return {
            # question_id is shown 1-based in the export.
            "Question #": metric["question_id"] + 1,
            "Question": metric["question"],
            "Quality": metric["quality_score"],
            "Consistency": metric["consistency_score"],
            "Accuracy": metric["accuracy_score"],
            "Completeness": metric["completeness_score"],
            "Average Score": round(mean_of_four, 2),
            "Avg Response Time (s)": round(metric["avg_response_time"], 2),
            "Evaluation Notes": metric["explanation"],
        }

    detail_df = pd.DataFrame([to_detail_record(m) for m in ev["question_metrics"]])

    # Workbook with two sheets.
    workbook_path = f"{self.output_dir}/report.xlsx"
    with pd.ExcelWriter(workbook_path, engine="openpyxl") as writer:
        summary_df.to_excel(writer, sheet_name="Summary", index=False)
        detail_df.to_excel(writer, sheet_name="Details", index=False)

        # Widen each Details column to its longest cell plus padding, capped at 80.
        sheet = writer.sheets["Details"]
        for column_cells in sheet.columns:
            longest = max(len(str(cell.value or "")) for cell in column_cells)
            letter = column_cells[0].column_letter
            sheet.column_dimensions[letter].width = min(longest + 4, 80)

    # CSV carries the per-question rows only; written with a UTF-8 BOM.
    detail_df.to_csv(f"{self.output_dir}/report.csv", index=False, encoding="utf-8-sig")
def _save_results(self) -> None:
    """Dump the collected results to ``test_results.json`` in ``output_dir``.

    The file holds one JSON object whose ``"results"`` key maps to
    ``self.results``, indented by two spaces.
    """
    destination = f"{self.output_dir}/test_results.json"
    with open(destination, "w") as handle:
        payload = {"results": self.results}
        json.dump(payload, handle, indent=2)
def _create_visualizations(self) -> None:
    """Render the four report charts as PNG files in ``output_dir``.

    Files written (300 dpi, tight bounding box):

    * ``scores_by_question.png`` - grouped bars of the four scores per question
    * ``response_times.png`` - average response time per question, with a mean line
    * ``score_distribution.png`` - histogram of every score, with a mean line
    * ``radar_chart.png`` - radar plot of the four average scores

    Reads ``self.evaluation`` (``question_metrics`` and the ``avg_*_score``
    values). Every figure is closed once it has been saved so that repeated
    calls do not accumulate open pyplot figures.
    """
    # These rcParams assignments change pyplot's settings for the whole process.
    # NOTE(review): assumes the Montserrat font is available to matplotlib - confirm.
    plt.rcParams['font.family'] = 'Montserrat'
    plt.rcParams['font.size'] = 12

    # One colour per score category, in the order the bars are drawn.
    colors = ['#3498db', '#2ecc71', '#e74c3c', '#f39c12']

    metrics = self.evaluation["question_metrics"]

    def column(key):
        """Return the value stored under ``key`` for every question, in order."""
        return [m[key] for m in metrics]

    def save_and_close(fig, filename):
        """Save ``fig`` into the output directory, then release it."""
        try:
            fig.savefig(f"{self.output_dir}/{filename}", dpi=300, bbox_inches='tight')
        finally:
            # pyplot keeps a reference to every figure it creates until it is
            # closed explicitly, so close even when saving fails.
            plt.close(fig)

    # NOTE(review): tick labels use the raw question_id, while the HTML and
    # Excel reports display question_id + 1 - confirm which is intended.
    question_ids = column("question_id")
    quality_scores = column("quality_score")
    consistency_scores = column("consistency_score")
    accuracy_scores = column("accuracy_score")
    completeness_scores = column("completeness_score")
    response_times = column("avg_response_time")

    # --- Grouped bar chart: the four scores side by side for each question ---
    fig = plt.figure(figsize=(14, 9), facecolor='white')
    bar_width = 0.2
    x = np.arange(len(question_ids))
    # Offsets of -1.5, -0.5, +0.5, +1.5 bar widths centre the group on each tick.
    plt.bar(x - 1.5*bar_width, quality_scores, bar_width, label='Quality', color=colors[0], alpha=0.8)
    plt.bar(x - 0.5*bar_width, consistency_scores, bar_width, label='Consistency', color=colors[1], alpha=0.8)
    plt.bar(x + 0.5*bar_width, accuracy_scores, bar_width, label='Accuracy', color=colors[2], alpha=0.8)
    plt.bar(x + 1.5*bar_width, completeness_scores, bar_width, label='Completeness', color=colors[3], alpha=0.8)
    plt.xlabel('Question ID', fontweight='bold')
    plt.ylabel('Score (1-10)', fontweight='bold')
    plt.title('Performance Scores by Question', fontsize=16, fontweight='bold', pad=20)
    plt.xticks(x, question_ids)
    plt.ylim(0, 10)
    plt.grid(axis='y', linestyle='--', alpha=0.3)
    plt.legend(frameon=True, framealpha=0.9, shadow=True)
    plt.gca().set_facecolor('#f8f9fa')
    plt.tight_layout()
    save_and_close(fig, "scores_by_question.png")

    # --- Bar chart: average response time per question, with the overall mean ---
    fig = plt.figure(figsize=(14, 7), facecolor='white')
    plt.bar(question_ids, response_times, color='#2980b9', alpha=0.8)
    plt.xlabel('Question ID', fontweight='bold')
    plt.ylabel('Average Response Time (seconds)', fontweight='bold')
    plt.title('Response Time by Question', fontsize=16, fontweight='bold', pad=20)
    plt.grid(axis='y', linestyle='--', alpha=0.3)
    avg_time = np.mean(response_times)
    plt.axhline(y=avg_time, color='#e74c3c', linestyle='--',
                label=f'Average: {avg_time:.2f}s')
    plt.legend(frameon=True)
    plt.gca().set_facecolor('#f8f9fa')
    plt.tight_layout()
    save_and_close(fig, "response_times.png")

    # --- Histogram: distribution of every score across all categories ---
    fig = plt.figure(figsize=(12, 7), facecolor='white')
    all_scores = quality_scores + consistency_scores + accuracy_scores + completeness_scores
    # Bin edges at -0.5, 0.5, ..., 9.5 put each integer score 0-9 mid-bin.
    bins = np.arange(0, 11, 1) - 0.5
    plt.hist(all_scores, bins=bins, alpha=0.8, color='#8e44ad',
             rwidth=0.85, edgecolor='white')
    plt.xlabel('Score (1-10)', fontweight='bold')
    plt.ylabel('Frequency', fontweight='bold')
    plt.title('Distribution of All Scores', fontsize=16, fontweight='bold', pad=20)
    plt.xticks(range(11))
    plt.grid(axis='y', linestyle='--', alpha=0.3)
    mean_score = np.mean(all_scores)
    plt.axvline(x=mean_score, color='#e74c3c', linestyle='--',
                label=f'Mean Score: {mean_score:.2f}')
    plt.legend(frameon=True)
    plt.gca().set_facecolor('#f8f9fa')
    plt.tight_layout()
    save_and_close(fig, "score_distribution.png")

    # --- Radar chart: the four average scores ---
    categories = ['Quality', 'Consistency', 'Accuracy', 'Completeness']
    values = [
        self.evaluation["avg_quality_score"],
        self.evaluation["avg_consistency_score"],
        self.evaluation["avg_accuracy_score"],
        self.evaluation["avg_completeness_score"],
    ]
    angles = np.linspace(0, 2*np.pi, len(categories), endpoint=False).tolist()
    # Repeat the first point at the end so the outline closes on itself.
    closed_values = values + [values[0]]
    closed_angles = angles + [angles[0]]
    fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(polar=True), facecolor='white')
    ax.plot(closed_angles, closed_values, 'o-', linewidth=3, color='#3498db')
    ax.fill(closed_angles, closed_values, color='#3498db', alpha=0.25)
    ax.set_thetagrids(np.degrees(angles), categories, fontweight='bold')
    ax.set_ylim(0, 10)
    ax.set_rticks([2, 4, 6, 8, 10])
    ax.set_rlabel_position(0)
    ax.grid(True)
    ax.tick_params(colors='#333333')
    plt.title('Average Scores by Category', y=1.1, fontsize=16, fontweight='bold')
    save_and_close(fig, "radar_chart.png")
def _generate_html_report(self) -> None:
    """Write the evaluation as a standalone HTML page to ``{output_dir}/report.html``.

    The page contains the assistant details, the aggregate metrics, the four
    chart images (referenced by relative file name, so the PNG files must sit
    next to the page) and one card per question with its scores, evaluation
    notes and every recorded response ordered by iteration.

    All free text (assistant id, document name, question, evaluation notes,
    responses) is HTML-escaped before insertion, so markup characters in
    generated text are displayed literally instead of being interpreted by the
    browser. The file is written as UTF-8 and declares that charset.

    Reads ``self.evaluation``, ``self.results``, ``self.assistant_id``,
    ``self.document_paths``, ``self.document_path`` and ``self.output_dir``.
    """
    # Imported locally because only this method needs them.
    import html
    from collections import defaultdict

    def esc(value: object) -> str:
        """Return ``value`` as text that is safe to place inside HTML."""
        return html.escape(str(value))

    evaluation = self.evaluation
    total_questions = evaluation["total_questions"]
    # Guard the division so a direct call with zero questions does not crash.
    iterations = evaluation["total_tests"] // total_questions if total_questions else 0
    generated_at = time.strftime("%B %d, %Y at %H:%M:%S")
    assistant_id = esc(self.assistant_id)

    if self.document_paths:
        documents_label = f"{len(self.document_paths)} files"
    elif self.document_path:
        documents_label = esc(os.path.basename(self.document_path))
    else:
        documents_label = "None"

    # Plain (non-f) string so the CSS braces need no escaping.
    stylesheet = """
        body {
            font-family: 'Montserrat', sans-serif;
            margin: 20px;
            color: #333;
            line-height: 1.6;
        }
        h1, h2, h3 {
            color: #2c3e50;
            font-weight: 600;
        }
        h1 {
            font-size: 2.2em;
            text-align: center;
            margin-bottom: 30px;
            border-bottom: 2px solid #eaeaea;
            padding-bottom: 15px;
        }
        h2 {
            font-size: 1.8em;
            margin-top: 30px;
        }
        h3 {
            font-size: 1.4em;
        }
        .header-info {
            text-align: center;
            color: #666;
            margin-bottom: 40px;
        }
        .assistant-info {
            background-color: #f0f7ff;
            border-left: 5px solid #3498db;
            padding: 15px;
            margin-bottom: 30px;
            border-radius: 5px;
        }
        .summary {
            background-color: #f8f9fa;
            padding: 25px;
            border-radius: 8px;
            margin-bottom: 30px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.05);
        }
        table {
            border-collapse: collapse;
            width: 100%;
            margin: 20px 0;
            font-size: 0.95em;
        }
        th, td {
            border: 1px solid #ddd;
            padding: 12px;
            text-align: left;
        }
        th {
            background-color: #f2f2f2;
            font-weight: 600;
        }
        tr:nth-child(even) {
            background-color: #f9f9f9;
        }
        .question {
            background-color: #e8f4f8;
            padding: 20px;
            margin: 20px 0;
            border-radius: 8px;
            box-shadow: 0 2px 5px rgba(0,0,0,0.05);
        }
        .charts {
            display: flex;
            flex-wrap: wrap;
            justify-content: space-around;
            margin-top: 40px;
        }
        .chart {
            margin: 20px;
            text-align: center;
            background-color: white;
            padding: 15px;
            border-radius: 8px;
            box-shadow: 0 3px 10px rgba(0,0,0,0.08);
            flex-basis: 45%;
        }
        .chart h3 {
            color: #3498db;
            margin-top: 0;
        }
        img {
            max-width: 100%;
            height: auto;
            border-radius: 5px;
        }
        .metrics-highlight {
            display: flex;
            justify-content: space-between;
            flex-wrap: wrap;
            margin-bottom: 20px;
        }
        .metric-card {
            background-color: white;
            padding: 15px;
            border-radius: 8px;
            box-shadow: 0 2px 5px rgba(0,0,0,0.1);
            flex-basis: 22%;
            margin-bottom: 15px;
            text-align: center;
        }
        .metric-value {
            font-size: 1.8em;
            font-weight: 600;
            color: #3498db;
            margin: 10px 0;
        }
        .metric-label {
            font-size: 0.9em;
            color: #666;
        }
        @media (max-width: 768px) {
            .chart, .metric-card {
                flex-basis: 100%;
            }
        }
        .date-generated {
            text-align: center;
            margin-top: 50px;
            color: #888;
            font-size: 0.9em;
        }
    """

    # The four category averages drive both the highlight cards and the table.
    averages = (
        ("Quality", evaluation["avg_quality_score"]),
        ("Consistency", evaluation["avg_consistency_score"]),
        ("Accuracy", evaluation["avg_accuracy_score"]),
        ("Completeness", evaluation["avg_completeness_score"]),
    )
    metric_cards = "".join(
        f"""
            <div class="metric-card">
                <div class="metric-label">{label}</div>
                <div class="metric-value">{value:.1f}</div>
                <div class="metric-label">out of 10</div>
            </div>"""
        for label, value in averages
    )
    average_rows = "".join(
        f"""
            <tr><td>Avg {label} Score</td><td>{value:.2f}/10</td></tr>"""
        for label, value in averages
    )
    chart_blocks = "".join(
        f"""
        <div class="chart">
            <h3>{caption}</h3>
            <img src="{filename}" alt="{caption}">
        </div>"""
        for filename, caption in (
            ("scores_by_question.png", "Scores by Question"),
            ("response_times.png", "Response Times"),
            ("score_distribution.png", "Score Distribution"),
            ("radar_chart.png", "Average Scores by Category"),
        )
    )

    # Page fragments are collected here and joined once at the end.
    parts = [f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>RAG Test Report for Assistant {assistant_id}</title>
    <link rel="preconnect" href="https://fonts.googleapis.com">
    <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
    <link href="https://fonts.googleapis.com/css2?family=Montserrat:wght@300;400;500;600;700&display=swap" rel="stylesheet">
    <style>{stylesheet}</style>
</head>
<body>
    <h1>RAG Testing Report</h1>
    <div class="header-info">
        <p>For Assistant: <strong>{assistant_id}</strong></p>
        <p>Generated on: {generated_at}</p>
    </div>
    <div class="assistant-info">
        <h2>Assistant Information</h2>
        <p><strong>Assistant ID:</strong> {assistant_id}</p>
        <p><strong>Documents:</strong> {documents_label}</p>
        <p><strong>Test Configuration:</strong> {total_questions} questions, {iterations} iterations per question</p>
    </div>
    <div class="summary">
        <h2>Performance Summary</h2>
        <div class="metrics-highlight">{metric_cards}
        </div>
        <table>
            <tr><th>Metric</th><th>Value</th></tr>
            <tr><td>Total Questions</td><td>{total_questions}</td></tr>
            <tr><td>Total Tests</td><td>{evaluation["total_tests"]}</td></tr>
            <tr><td>Avg Response Time</td><td>{evaluation["avg_response_time"]:.2f} seconds</td></tr>{average_rows}
        </table>
    </div>
    <div class="charts">{chart_blocks}
    </div>
    <h2>Detailed Question Analysis</h2>
"""]

    # Group the raw results by question in one pass instead of rescanning
    # self.results for every question.
    responses_by_question = defaultdict(list)
    for result in self.results:
        responses_by_question[result["question_id"]].append(result)

    tile_style = (
        "flex: 1; min-width: 120px; text-align: center; background-color: white; "
        "padding: 10px; border-radius: 5px; box-shadow: 0 1px 3px rgba(0,0,0,0.1);"
    )
    panel_style = (
        "background-color: white; border-radius: 5px; "
        "box-shadow: 0 1px 3px rgba(0,0,0,0.1);"
    )

    for q_metric in evaluation["question_metrics"]:
        # The card colour reflects the mean of the four scores.
        avg_score = (q_metric["quality_score"] + q_metric["consistency_score"] +
                     q_metric["accuracy_score"] + q_metric["completeness_score"]) / 4
        if avg_score >= 8:
            card_color, border_color = "#e3f2fd", "#2196f3"  # blue: high
        elif avg_score >= 6:
            card_color, border_color = "#e8f5e9", "#4caf50"  # green: good
        elif avg_score >= 4:
            card_color, border_color = "#fff3e0", "#ff9800"  # orange: medium
        else:
            card_color, border_color = "#ffebee", "#f44336"  # red: low

        # sorted() is stable, so responses sharing an iteration keep the
        # order they have in self.results.
        q_responses = sorted(
            responses_by_question.get(q_metric["question_id"], []),
            key=lambda r: r["iteration"],
        )

        score_tiles = "".join(
            f"""
            <div style="{tile_style}">
                <div style="font-size: 0.85em; color: #666;">{label}</div>
                <div style="font-size: 1.5em; font-weight: 600; color: {color};">{value}</div>
            </div>"""
            for label, color, value in (
                ("Quality", "#3498db", f'{q_metric["quality_score"]}/10'),
                ("Consistency", "#2ecc71", f'{q_metric["consistency_score"]}/10'),
                ("Accuracy", "#e74c3c", f'{q_metric["accuracy_score"]}/10'),
                ("Completeness", "#f39c12", f'{q_metric["completeness_score"]}/10'),
                ("Response Time", "#9b59b6", f'{q_metric["avg_response_time"]:.2f}s'),
            )
        )

        # The response text sits directly between the tags: the container is
        # styled white-space: pre-wrap, so any template whitespace around it
        # would be rendered as part of the response.
        response_blocks = "".join(
            f"""
                    <div style="border: 1px solid #e0e0e0; border-radius: 5px; overflow: hidden;">
                        <div style="background-color: #f5f5f5; padding: 10px; border-bottom: 1px solid #e0e0e0; font-weight: 600;">
                            Response {position} <span style="font-weight: normal; color: #666; font-size: 0.9em;">(Response time: {response["response_time"]:.2f}s)</span>
                        </div>
                        <div style="padding: 15px; white-space: pre-wrap; font-size: 0.95em; max-height: 300px; overflow-y: auto;">{esc(response["response"] or "No response received")}</div>
                    </div>"""
            for position, response in enumerate(q_responses, start=1)
        )

        parts.append(f"""
    <div class="question" style="background-color: {card_color}; border-left: 5px solid {border_color};">
        <h3>Question {q_metric["question_id"] + 1}</h3>
        <p style="font-size: 1.1em;"><strong>Question:</strong> {esc(q_metric["question"])}</p>
        <div style="display: flex; flex-wrap: wrap; gap: 10px; margin-bottom: 15px;">{score_tiles}
        </div>
        <div style="{panel_style} padding: 15px;">
            <h4 style="margin-top: 0; color: #555;">Evaluation Notes</h4>
            <p>{esc(q_metric["explanation"])}</p>
        </div>
        <div style="margin-top: 15px;">
            <details>
                <summary style="cursor: pointer; font-weight: 600; color: #555; padding: 10px; {panel_style} display: inline-flex; align-items: center;">
                    <svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" style="margin-right: 10px;">
                        <polyline points="6 9 12 15 18 9"></polyline>
                    </svg>
                    View All Responses ({len(q_responses)} iterations)
                </summary>
                <div style="margin-top: 15px; padding: 15px; {panel_style}">
                    <div style="display: flex; flex-direction: column; gap: 15px;">{response_blocks}
                    </div>
                </div>
            </details>
        </div>
    </div>
""")

    parts.append(f"""
    <div class="date-generated">
        <p>Generated by RAG Testing App on {generated_at}</p>
        <p>Assistant ID: {assistant_id}</p>
    </div>
</body>
</html>
""")

    # Explicit UTF-8: the platform default codec may be unable to encode
    # characters that appear in questions or responses.
    with open(f"{self.output_dir}/report.html", "w", encoding="utf-8") as f:
        f.write("".join(parts))