ai_qc/backend/generate_usage_report.py
nickviljoen a40cf48979 Fix usage report generator to handle missing token data
Issue: Report generator crashed with KeyError when processing log
entries that don't have token usage data (older logs created before
token tracking was implemented).

Fix:
- Initialize all token-related fields in stats dictionary
- Default to 0 when token_usage field doesn't exist in log entries
- Gracefully handle logs from before token tracking enhancement

Result:
- Reports now work with both old and new log entries
- Token fields show 0 for old entries (backward compatible)
- New analyses will populate token fields correctly

Tested: Report generation works with existing logs

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-02 13:32:47 +02:00

419 lines
17 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Usage Report Generator
Generates comprehensive usage reports from usage tracking logs
"""
import argparse
import json
import os
import sys
from datetime import datetime, timedelta
from collections import defaultdict
from pathlib import Path
# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from usage_tracker import USAGE_LOG_DIR, COST_PER_1K_TOKENS
def load_logs(start_date=None, end_date=None):
    """
    Load all log entries within the specified date range

    Log files are discovered as ``*.jsonl`` inside USAGE_LOG_DIR and must be
    named ``YYYY-MM-DD.jsonl``; files whose name is not a valid date are
    skipped. Both bounds are inclusive and are compared against the date in
    the file name, not against anything stored inside the entries.

    Args:
        start_date: Start date (YYYY-MM-DD) or None for no lower bound
        end_date: End date (YYYY-MM-DD) or None for no upper bound

    Returns:
        List of log entries (dicts), ordered by file name and then by line

    Raises:
        ValueError: If start_date or end_date is not in YYYY-MM-DD format
    """
    import glob

    # Parse date filters if provided
    start_dt = datetime.strptime(start_date, '%Y-%m-%d').date() if start_date else None
    end_dt = datetime.strptime(end_date, '%Y-%m-%d').date() if end_date else None

    entries = []
    for log_file in sorted(glob.glob(os.path.join(USAGE_LOG_DIR, '*.jsonl'))):
        # Extract date from filename (YYYY-MM-DD.jsonl)
        file_date_str = os.path.splitext(os.path.basename(log_file))[0]
        try:
            file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
        except ValueError:
            # Invalid date format in filename, skip. The try covers only the
            # date parse so that errors raised while reading the file are not
            # mistaken for a badly named file.
            continue

        # Skip if outside date range
        if start_dt and file_date < start_dt:
            continue
        if end_dt and file_date > end_dt:
            continue

        # Read log entries. The encoding is explicit so results do not depend
        # on the platform default; errors='replace' keeps the read best-effort
        # when a line contains stray bytes instead of aborting the whole file.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    # Corrupt or truncated line: skip it, keep the rest.
                    continue
                # Callers use entry.get(...), so keep only JSON objects.
                if isinstance(entry, dict):
                    entries.append(entry)

    return entries
def _first_not_none(entry, keys, default):
    """Return the first non-None value stored in entry under keys, else default."""
    for key in keys:
        value = entry.get(key)
        if value is not None:
            return value
    return default


def _mean_score(scores):
    """Return the mean of scores rounded to 2 decimals, or 0 when there are none."""
    return round(sum(scores) / len(scores), 2) if scores else 0


def generate_report(entries, format='text'):
    """
    Generate a comprehensive usage report from log entries

    Only entries whose 'event' is 'analysis_complete' are counted. Fields that
    are missing from an entry, or present with a null value, fall back to
    'unknown' (identifiers), 0 (counts, tokens, cost) or an empty date, so
    older logs written before token tracking existed still aggregate.

    Args:
        entries: List of log entries (dicts)
        format: Output format ('text', 'json', 'csv'); any other value
            produces the text report

    Returns:
        Report string
    """
    # Initialize statistics
    stats = {
        'total_analyses': 0,
        'total_checks': 0,
        'total_cost': 0.0,
        'total_tokens': 0,
        'total_prompt_tokens': 0,
        'total_completion_tokens': 0,
        'by_client': defaultdict(lambda: {
            'count': 0,
            'profiles': defaultdict(int),
            'users': set(),
            'checks': 0,
            'cost': 0.0,
            'avg_score': [],
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0
        }),
        'by_user': defaultdict(lambda: {
            'name': None,
            'email': None,
            'count': 0,
            'clients': defaultdict(int),
            'profiles': defaultdict(int),
            'checks': 0,
            'cost': 0.0,
            'avg_score': [],
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0
        }),
        'by_profile': defaultdict(lambda: {
            'count': 0,
            'checks': 0,
            'avg_score': [],
            'clients': set()
        }),
        'by_date': defaultdict(lambda: {
            'count': 0,
            'cost': 0.0
        }),
        'by_provider': defaultdict(lambda: {
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'cost': 0.0
        })
    }

    # Process entries
    for entry in entries:
        if entry.get('event') != 'analysis_complete':
            continue

        stats['total_analyses'] += 1

        # dict.get(key, default) only covers a missing key; a key stored as
        # null would leak None into sums, sorted() and str methods below.
        client = _first_not_none(entry, ('client',), 'unknown')
        profile = _first_not_none(entry, ('profile',), 'unknown')
        user_id = _first_not_none(entry, ('user_id',), 'unknown')
        user_email = _first_not_none(entry, ('user_email',), 'unknown')
        user_name = _first_not_none(entry, ('user_name',), 'unknown')
        checks = _first_not_none(entry, ('checks_completed',), 0)
        # Prefer the recorded total cost, then the estimate, then zero.
        cost = _first_not_none(entry, ('total_cost_usd', 'estimated_cost_usd'), 0.0)
        score = entry.get('overall_score')
        date = str(_first_not_none(entry, ('timestamp',), ''))[:10]  # Extract YYYY-MM-DD

        # Get token usage data (may not exist, or be null, in old logs)
        token_usage = entry.get('token_usage')
        if not isinstance(token_usage, dict):
            token_usage = {}
        total_tokens = _first_not_none(token_usage, ('total_tokens',), 0)
        prompt_tokens = _first_not_none(token_usage, ('total_prompt_tokens',), 0)
        completion_tokens = _first_not_none(token_usage, ('total_completion_tokens',), 0)

        stats['total_checks'] += checks
        stats['total_cost'] += cost
        stats['total_tokens'] += total_tokens
        stats['total_prompt_tokens'] += prompt_tokens
        stats['total_completion_tokens'] += completion_tokens

        # By client
        client_stats = stats['by_client'][client]
        client_stats['count'] += 1
        client_stats['profiles'][profile] += 1
        client_stats['users'].add(user_email)
        client_stats['checks'] += checks
        client_stats['cost'] += cost
        client_stats['total_tokens'] += total_tokens
        client_stats['prompt_tokens'] += prompt_tokens
        client_stats['completion_tokens'] += completion_tokens
        if score is not None:
            client_stats['avg_score'].append(score)

        # By user (name/email reflect the most recent entry for the user id)
        user_stats = stats['by_user'][user_id]
        user_stats['name'] = user_name
        user_stats['email'] = user_email
        user_stats['count'] += 1
        user_stats['clients'][client] += 1
        user_stats['profiles'][profile] += 1
        user_stats['checks'] += checks
        user_stats['cost'] += cost
        user_stats['total_tokens'] += total_tokens
        user_stats['prompt_tokens'] += prompt_tokens
        user_stats['completion_tokens'] += completion_tokens
        if score is not None:
            user_stats['avg_score'].append(score)

        # By profile
        profile_stats = stats['by_profile'][profile]
        profile_stats['count'] += 1
        profile_stats['checks'] += checks
        profile_stats['clients'].add(client)
        if score is not None:
            profile_stats['avg_score'].append(score)

        # By date
        stats['by_date'][date]['count'] += 1
        stats['by_date'][date]['cost'] += cost

        # By provider (only if token data exists)
        by_provider = token_usage.get('by_provider')
        if isinstance(by_provider, dict):
            for provider, provider_stats in by_provider.items():
                if not isinstance(provider_stats, dict):
                    continue
                bucket = stats['by_provider'][provider]
                bucket['total_tokens'] += _first_not_none(provider_stats, ('total_tokens',), 0)
                bucket['prompt_tokens'] += _first_not_none(provider_stats, ('prompt_tokens',), 0)
                bucket['completion_tokens'] += _first_not_none(provider_stats, ('completion_tokens',), 0)
                bucket['cost'] += _first_not_none(provider_stats, ('cost',), 0)

    # Convert sets to sorted lists (sets are not JSON serializable) and
    # collapse each list of scores into its average.
    for client_data in stats['by_client'].values():
        client_data['users'] = sorted(client_data['users'])
        client_data['avg_score'] = _mean_score(client_data['avg_score'])

    for user_data in stats['by_user'].values():
        user_data['avg_score'] = _mean_score(user_data['avg_score'])

    for profile_data in stats['by_profile'].values():
        profile_data['clients'] = sorted(profile_data['clients'])
        profile_data['avg_score'] = _mean_score(profile_data['avg_score'])

    # Generate output based on format
    if format == 'json':
        return json.dumps(dict(stats), indent=2, default=str)
    elif format == 'csv':
        return generate_csv_report(stats)
    else:
        return generate_text_report(stats)
def generate_text_report(stats):
    """
    Render aggregated usage statistics as a plain-text report

    Sections are emitted in a fixed order: summary, by client, by user, by
    profile, by date (the 30 most recent keys) and token usage by provider.
    Client, user and profile sections list the busiest item first.

    Args:
        stats: Aggregated statistics dict as built by generate_report

    Returns:
        The report as a single newline-joined string
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    def by_count(item):
        # Sort key for (name, data) pairs: number of analyses.
        return item[1]['count']

    def by_value(item):
        # Sort key for (name, tally) pairs.
        return item[1]

    def token_summary(bucket):
        # Token fields are read with .get so a bucket without them prints 0.
        return f" Total Tokens: {bucket.get('total_tokens', 0):,} (Prompt: {bucket.get('prompt_tokens', 0):,}, Completion: {bucket.get('completion_tokens', 0):,})"

    out = [
        heavy_rule,
        "AI QC USAGE REPORT",
        heavy_rule,
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
    ]

    # Summary
    out += [
        "SUMMARY",
        light_rule,
        f"Total Analyses: {stats['total_analyses']}",
        f"Total QC Checks: {stats['total_checks']}",
        f"Total Tokens Used: {stats['total_tokens']:,}",
        f" - Prompt Tokens: {stats['total_prompt_tokens']:,}",
        f" - Completion Tokens: {stats['total_completion_tokens']:,}",
        f"Total Cost: ${stats['total_cost']:.2f} USD",
    ]
    if stats['total_analyses'] > 0:
        # Per-analysis averages are only defined when something was analysed.
        out += [
            f"Average Checks per Analysis: {stats['total_checks'] / stats['total_analyses']:.1f}",
            f"Average Tokens per Analysis: {stats['total_tokens'] / stats['total_analyses']:.1f}",
            f"Average Cost per Analysis: ${stats['total_cost'] / stats['total_analyses']:.4f} USD",
        ]
    out.append("")

    # By Client
    out += ["USAGE BY CLIENT", light_rule]
    for client_name, info in sorted(stats['by_client'].items(), key=by_count, reverse=True):
        out += [
            f"\n{client_name.upper()}",
            f" Analyses: {info['count']}",
            f" QC Checks: {info['checks']}",
            f" Unique Users: {len(info['users'])}",
            f" Average Score: {info['avg_score']:.1f}/100",
            token_summary(info),
            f" Cost: ${info['cost']:.2f} USD",
            " Top Profiles:",
        ]
        ranked_profiles = sorted(info['profiles'].items(), key=by_value, reverse=True)
        out.extend(f"{name}: {tally} analyses" for name, tally in ranked_profiles[:5])
    out.append("")

    # By User
    out += ["USAGE BY USER", light_rule]
    for _user_id, info in sorted(stats['by_user'].items(), key=by_count, reverse=True):
        if info['name']:
            heading = f"\n{info['name']} ({info['email']})"
        else:
            heading = f"\n{info['email']}"
        out += [
            heading,
            f" Analyses: {info['count']}",
            f" QC Checks: {info['checks']}",
            f" Average Score: {info['avg_score']:.1f}/100",
            token_summary(info),
            f" Cost: ${info['cost']:.2f} USD",
            " Clients Used:",
        ]
        ranked_clients = sorted(info['clients'].items(), key=by_value, reverse=True)
        out.extend(f"{name}: {tally} analyses" for name, tally in ranked_clients)
    out.append("")

    # By Profile
    out += ["USAGE BY PROFILE", light_rule]
    for profile_name, info in sorted(stats['by_profile'].items(), key=by_count, reverse=True):
        out += [
            f"\n{profile_name}",
            f" Analyses: {info['count']}",
            f" QC Checks: {info['checks']}",
            f" Average Score: {info['avg_score']:.1f}/100",
            f" Used by Clients: {', '.join(info['clients'])}",
        ]
    out.append("")

    # By Date
    if stats['by_date']:
        out += ["USAGE BY DATE", light_rule]
        newest_first = sorted(stats['by_date'].items(), reverse=True)
        for day, info in newest_first[:30]:  # Last 30 days
            out.append(f"{day}: {info['count']} analyses (${info['cost']:.2f})")
        out.append("")

    # By Provider (Token Usage)
    if stats.get('by_provider'):
        out += ["TOKEN USAGE BY PROVIDER", light_rule]
        for provider_name, info in sorted(stats['by_provider'].items()):
            if info['total_tokens'] > 0:
                out += [
                    f"\n{provider_name}",
                    f" Total Tokens: {info['total_tokens']:,}",
                    f" Prompt Tokens: {info['prompt_tokens']:,}",
                    f" Completion Tokens: {info['completion_tokens']:,}",
                    f" Cost: ${info['cost']:.2f} USD",
                ]
        out.append("")

    out.append(heavy_rule)
    return "\n".join(out)
def _csv_field(value, force_quote=False):
    """Render one CSV field, quoting and escaping it per RFC 4180 when needed."""
    text = str(value)
    if force_quote or any(ch in text for ch in ',"\r\n'):
        # Embedded double quotes are escaped by doubling them.
        return '"' + text.replace('"', '""') + '"'
    return text


def _csv_row(fields, always_quote=()):
    """Join fields into one CSV line; indexes in always_quote are quoted unconditionally."""
    return ",".join(
        _csv_field(value, force_quote=index in always_quote)
        for index, value in enumerate(fields)
    ) + "\n"


def generate_csv_report(stats):
    """
    Generate a CSV format report

    The output holds four sections (summary, client, user, profile), each
    introduced by a title line and a header line and separated by a blank
    line. Client, user and profile rows are ordered by analysis count,
    highest first.

    Free-text values (client, name, email, profile) are quoted whenever they
    contain a comma, double quote or line break, so such values cannot shift
    columns. The user name and the profile's client list are always quoted,
    which keeps the output identical to the earlier hand-formatted rows for
    values that need no escaping.

    Args:
        stats: Aggregated statistics dict as built by generate_report

    Returns:
        The CSV report as a string
    """
    def by_count(item):
        # Sort key for (name, data) pairs: number of analyses.
        return item[1]['count']

    # Summary section
    lines = [
        "SUMMARY\n",
        "Metric,Value\n",
        f"Total Analyses,{stats['total_analyses']}\n",
        f"Total QC Checks,{stats['total_checks']}\n",
        f"Total Tokens,{stats['total_tokens']}\n",
        f"Total Prompt Tokens,{stats['total_prompt_tokens']}\n",
        f"Total Completion Tokens,{stats['total_completion_tokens']}\n",
        f"Total Cost,${stats['total_cost']:.2f}\n",
        "\n",
    ]

    # By Client
    lines.append("CLIENT USAGE\n")
    lines.append("Client,Analyses,Checks,Users,Avg Score,Total Tokens,Prompt Tokens,Completion Tokens,Cost\n")
    for client, data in sorted(stats['by_client'].items(), key=by_count, reverse=True):
        lines.append(_csv_row([
            client,
            data['count'],
            data['checks'],
            len(data['users']),
            f"{data['avg_score']:.1f}",
            data.get('total_tokens', 0),
            data.get('prompt_tokens', 0),
            data.get('completion_tokens', 0),
            f"${data['cost']:.2f}",
        ]))
    lines.append("\n")

    # By User
    lines.append("USER USAGE\n")
    lines.append("Name,Email,Analyses,Checks,Avg Score,Total Tokens,Prompt Tokens,Completion Tokens,Cost\n")
    for _user_id, data in sorted(stats['by_user'].items(), key=by_count, reverse=True):
        lines.append(_csv_row([
            data['name'],
            data['email'],
            data['count'],
            data['checks'],
            f"{data['avg_score']:.1f}",
            data.get('total_tokens', 0),
            data.get('prompt_tokens', 0),
            data.get('completion_tokens', 0),
            f"${data['cost']:.2f}",
        ], always_quote=(0,)))
    lines.append("\n")

    # By Profile
    lines.append("PROFILE USAGE\n")
    lines.append("Profile,Analyses,Checks,Avg Score,Clients\n")
    for profile, data in sorted(stats['by_profile'].items(), key=by_count, reverse=True):
        lines.append(_csv_row([
            profile,
            data['count'],
            data['checks'],
            f"{data['avg_score']:.1f}",
            ', '.join(map(str, data['clients'])),
        ], always_quote=(4,)))

    return "".join(lines)
def main():
    """
    Command-line entry point: load usage logs and print or save a report

    Parses the command-line options, loads the log entries in the requested
    date range, optionally filters them to a single client, and writes the
    report to --output or to stdout. Progress messages go to stderr so that
    stdout carries only the report.

    --last-days takes precedence over --start-date/--end-date when both are
    given. Malformed dates and a negative --last-days are rejected through
    argparse's usage error (exit status 2) instead of a traceback.

    Returns:
        0 when a report was produced, 1 when no log entries matched
    """
    parser = argparse.ArgumentParser(description='Generate usage reports from AI QC logs')
    parser.add_argument('--start-date', help='Start date (YYYY-MM-DD)', default=None)
    parser.add_argument('--end-date', help='End date (YYYY-MM-DD)', default=None)
    parser.add_argument('--format', choices=['text', 'json', 'csv'], default='text',
                        help='Output format (default: text)')
    parser.add_argument('--output', help='Output file path (default: print to console)', default=None)
    parser.add_argument('--last-days', type=int, help='Show data for last N days', default=None)
    parser.add_argument('--client', help='Filter by specific client', default=None)
    args = parser.parse_args()

    # Validate up front so a typo produces a usage message rather than a
    # ValueError raised later while loading logs. Empty values keep their
    # existing meaning of "no bound".
    for flag, value in (('--start-date', args.start_date), ('--end-date', args.end_date)):
        if value:
            try:
                datetime.strptime(value, '%Y-%m-%d')
            except ValueError:
                parser.error(f"{flag} must be in YYYY-MM-DD format, got {value!r}")
    if args.last_days is not None and args.last_days < 0:
        parser.error("--last-days must be zero or a positive number of days")

    # Handle --last-days shortcut
    start_date = args.start_date
    end_date = args.end_date
    # Compare against None: 0 is a valid request ("today only") and must not
    # be mistaken for "option not given".
    if args.last_days is not None:
        now = datetime.now()
        end_date = now.strftime('%Y-%m-%d')
        start_date = (now - timedelta(days=args.last_days)).strftime('%Y-%m-%d')

    # Load logs
    print(f"Loading usage logs from: {USAGE_LOG_DIR}", file=sys.stderr)
    if start_date or end_date:
        print(f"Date range: {start_date or 'all'} to {end_date or 'today'}", file=sys.stderr)
    entries = load_logs(start_date, end_date)

    # Filter by client if specified
    if args.client:
        entries = [e for e in entries if e.get('client') == args.client]
        print(f"Filtered to client: {args.client}", file=sys.stderr)

    print(f"Loaded {len(entries)} log entries", file=sys.stderr)
    print("", file=sys.stderr)

    if not entries:
        print("No usage data found for the specified criteria.", file=sys.stderr)
        return 1

    # Generate report
    report = generate_report(entries, format=args.format)

    # Output report
    if args.output:
        # Explicit encoding so the saved file does not depend on the
        # platform default.
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to: {args.output}", file=sys.stderr)
    else:
        print(report)

    return 0
# Run the CLI only when this file is executed as a script; main()'s return
# value is passed to sys.exit() and becomes the process exit status.
if __name__ == '__main__':
    sys.exit(main())