#!/usr/bin/env python3
"""
Usage Report Generator
Generates comprehensive usage reports from usage tracking logs
"""

import argparse
import json
import os
import sys
from datetime import datetime, timedelta
from collections import defaultdict
from pathlib import Path

# Add this script's own directory to the import path so that the sibling
# module usage_tracker resolves regardless of the current working directory.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from usage_tracker import USAGE_LOG_DIR, COST_PER_1K_TOKENS


def _value_or_default(mapping, key, default):
    """Return mapping[key], or default when the key is missing or null.

    A plain ``mapping.get(key, default)`` returns None when the key is present
    with a JSON ``null`` value; this helper treats both cases the same way.
    """
    value = mapping.get(key)
    return default if value is None else value


def _mean_score(scores):
    """Return the mean of scores rounded to 2 decimals, or 0 when empty."""
    return round(sum(scores) / len(scores), 2) if scores else 0


def _csv_field(value, always_quote=False):
    """Render one CSV field, quoting and escaping it when required.

    A field is wrapped in double quotes when it contains a comma, a double
    quote, or a line break (or when always_quote is set); embedded double
    quotes are doubled.
    """
    text = str(value)
    if always_quote or any(ch in text for ch in ',"\r\n'):
        return '"' + text.replace('"', '""') + '"'
    return text


def load_logs(start_date=None, end_date=None):
    """
    Load all log entries within the specified date range

    Reads every ``*.jsonl`` file in USAGE_LOG_DIR whose base name is a
    ``YYYY-MM-DD`` date inside the requested range. Files with any other
    name, blank lines, lines that are not valid JSON, and JSON values that
    are not objects are skipped silently.

    Args:
        start_date: Start date (YYYY-MM-DD) or None for all time
        end_date: End date (YYYY-MM-DD) or None for no upper bound

    Returns:
        List of log entries (dicts), in file-name order then line order

    Raises:
        ValueError: If start_date or end_date is not in YYYY-MM-DD format
    """
    import glob

    # Parse date filters if provided
    start_dt = datetime.strptime(start_date, '%Y-%m-%d').date() if start_date else None
    end_dt = datetime.strptime(end_date, '%Y-%m-%d').date() if end_date else None

    entries = []
    for log_file in sorted(glob.glob(os.path.join(USAGE_LOG_DIR, '*.jsonl'))):
        # Extract date from filename (YYYY-MM-DD.jsonl)
        file_date_str = os.path.basename(log_file).replace('.jsonl', '')

        # Keep the try body limited to the date parse: UnicodeDecodeError is
        # a ValueError subclass, so a wider try would hide read errors.
        try:
            file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
        except ValueError:
            # Invalid date format in filename, skip
            continue

        # Skip if outside date range
        if start_dt and file_date < start_dt:
            continue
        if end_dt and file_date > end_dt:
            continue

        # Read log entries. Undecodable bytes are replaced rather than raised
        # so one corrupt line cannot discard the rest of the file.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Only JSON objects are usable downstream (entry.get(...)).
                if isinstance(entry, dict):
                    entries.append(entry)

    return entries


def generate_report(entries, format='text'):
    """
    Generate a comprehensive usage report from log entries

    Only entries whose 'event' is 'analysis_complete' are counted. Fields that
    are missing or null fall back to 'unknown' (identifiers) or zero (numbers).

    Args:
        entries: List of log entries (dicts)
        format: Output format ('text', 'json', 'csv'); any other value
            produces the text report

    Returns:
        Report string
    """
    # Initialize statistics
    stats = {
        'total_analyses': 0,
        'total_checks': 0,
        'total_cost': 0.0,
        'total_tokens': 0,
        'total_prompt_tokens': 0,
        'total_completion_tokens': 0,
        'by_client': defaultdict(lambda: {
            'count': 0,
            'profiles': defaultdict(int),
            'users': set(),
            'checks': 0,
            'cost': 0.0,
            'avg_score': [],
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0
        }),
        'by_user': defaultdict(lambda: {
            'name': None,
            'email': None,
            'count': 0,
            'clients': defaultdict(int),
            'profiles': defaultdict(int),
            'checks': 0,
            'cost': 0.0,
            'avg_score': [],
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0
        }),
        'by_profile': defaultdict(lambda: {
            'count': 0,
            'checks': 0,
            'avg_score': [],
            'clients': set()
        }),
        'by_date': defaultdict(lambda: {
            'count': 0,
            'cost': 0.0
        }),
        'by_provider': defaultdict(lambda: {
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'cost': 0.0
        })
    }

    # Process entries
    for entry in entries:
        if entry.get('event') != 'analysis_complete':
            continue

        stats['total_analyses'] += 1

        client = _value_or_default(entry, 'client', 'unknown')
        profile = _value_or_default(entry, 'profile', 'unknown')
        user_id = _value_or_default(entry, 'user_id', 'unknown')
        user_email = _value_or_default(entry, 'user_email', 'unknown')
        user_name = _value_or_default(entry, 'user_name', 'unknown')
        checks = _value_or_default(entry, 'checks_completed', 0)
        score = entry.get('overall_score', None)

        # Prefer the recorded total cost; fall back to the estimate.
        cost = entry.get('total_cost_usd')
        if cost is None:
            cost = _value_or_default(entry, 'estimated_cost_usd', 0.0)

        # Extract YYYY-MM-DD
        date = str(_value_or_default(entry, 'timestamp', ''))[:10]

        # Get token usage data (may not exist in old logs)
        token_usage = _value_or_default(entry, 'token_usage', {})
        total_tokens = _value_or_default(token_usage, 'total_tokens', 0)
        prompt_tokens = _value_or_default(token_usage, 'total_prompt_tokens', 0)
        completion_tokens = _value_or_default(token_usage, 'total_completion_tokens', 0)

        stats['total_checks'] += checks
        stats['total_cost'] += cost
        stats['total_tokens'] += total_tokens
        stats['total_prompt_tokens'] += prompt_tokens
        stats['total_completion_tokens'] += completion_tokens

        # By client
        client_stats = stats['by_client'][client]
        client_stats['count'] += 1
        client_stats['profiles'][profile] += 1
        client_stats['users'].add(user_email)
        client_stats['checks'] += checks
        client_stats['cost'] += cost
        client_stats['total_tokens'] += total_tokens
        client_stats['prompt_tokens'] += prompt_tokens
        client_stats['completion_tokens'] += completion_tokens
        if score is not None:
            client_stats['avg_score'].append(score)

        # By user (name/email reflect the most recent entry for the user_id)
        user_stats = stats['by_user'][user_id]
        user_stats['name'] = user_name
        user_stats['email'] = user_email
        user_stats['count'] += 1
        user_stats['clients'][client] += 1
        user_stats['profiles'][profile] += 1
        user_stats['checks'] += checks
        user_stats['cost'] += cost
        user_stats['total_tokens'] += total_tokens
        user_stats['prompt_tokens'] += prompt_tokens
        user_stats['completion_tokens'] += completion_tokens
        if score is not None:
            user_stats['avg_score'].append(score)

        # By profile
        profile_stats = stats['by_profile'][profile]
        profile_stats['count'] += 1
        profile_stats['checks'] += checks
        profile_stats['clients'].add(client)
        if score is not None:
            profile_stats['avg_score'].append(score)

        # By date
        date_stats = stats['by_date'][date]
        date_stats['count'] += 1
        date_stats['cost'] += cost

        # By provider (only if token data exists)
        by_provider = _value_or_default(token_usage, 'by_provider', {})
        for provider, provider_stats in by_provider.items():
            provider_totals = stats['by_provider'][provider]
            provider_totals['total_tokens'] += _value_or_default(provider_stats, 'total_tokens', 0)
            provider_totals['prompt_tokens'] += _value_or_default(provider_stats, 'prompt_tokens', 0)
            provider_totals['completion_tokens'] += _value_or_default(provider_stats, 'completion_tokens', 0)
            provider_totals['cost'] += _value_or_default(provider_stats, 'cost', 0)

    # Convert sets to lists and calculate averages for JSON serialization.
    # key=str keeps sorting well-defined if identifiers are not all strings.
    for client_data in stats['by_client'].values():
        client_data['users'] = sorted(client_data['users'], key=str)
        client_data['avg_score'] = _mean_score(client_data['avg_score'])

    for user_data in stats['by_user'].values():
        user_data['avg_score'] = _mean_score(user_data['avg_score'])

    for profile_data in stats['by_profile'].values():
        profile_data['clients'] = sorted(profile_data['clients'], key=str)
        profile_data['avg_score'] = _mean_score(profile_data['avg_score'])

    # Generate output based on format
    if format == 'json':
        return json.dumps(dict(stats), indent=2, default=str)
    elif format == 'csv':
        return generate_csv_report(stats)
    else:
        return generate_text_report(stats)


def generate_text_report(stats):
    """Generate a human-readable text report

    Args:
        stats: Aggregated statistics as built by generate_report (sets already
            converted to sorted lists, avg_score already reduced to a number)

    Returns:
        The report as a single newline-joined string
    """
    lines = []
    lines.append("=" * 80)
    lines.append("AI QC USAGE REPORT")
    lines.append("=" * 80)
    lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append("")

    # Summary
    lines.append("SUMMARY")
    lines.append("-" * 80)
    lines.append(f"Total Analyses: {stats['total_analyses']}")
    lines.append(f"Total QC Checks: {stats['total_checks']}")
    lines.append(f"Total Tokens Used: {stats['total_tokens']:,}")
    lines.append(f" - Prompt Tokens: {stats['total_prompt_tokens']:,}")
    lines.append(f" - Completion Tokens: {stats['total_completion_tokens']:,}")
    lines.append(f"Total Cost: ${stats['total_cost']:.2f} USD")
    # Averages are only meaningful (and division only safe) with >= 1 analysis.
    if stats['total_analyses'] > 0:
        lines.append(f"Average Checks per Analysis: {stats['total_checks'] / stats['total_analyses']:.1f}")
        lines.append(f"Average Tokens per Analysis: {stats['total_tokens'] / stats['total_analyses']:.1f}")
        lines.append(f"Average Cost per Analysis: ${stats['total_cost'] / stats['total_analyses']:.4f} USD")
    lines.append("")

    # By Client (most analyses first)
    lines.append("USAGE BY CLIENT")
    lines.append("-" * 80)
    for client, data in sorted(stats['by_client'].items(), key=lambda x: x[1]['count'], reverse=True):
        lines.append(f"\n{str(client).upper()}")
        lines.append(f" Analyses: {data['count']}")
        lines.append(f" QC Checks: {data['checks']}")
        lines.append(f" Unique Users: {len(data['users'])}")
        lines.append(f" Average Score: {data['avg_score']:.1f}/100")
        lines.append(f" Total Tokens: {data.get('total_tokens', 0):,} (Prompt: {data.get('prompt_tokens', 0):,}, Completion: {data.get('completion_tokens', 0):,})")
        lines.append(f" Cost: ${data['cost']:.2f} USD")
        lines.append(" Top Profiles:")
        # Only the five most-used profiles are listed per client.
        for profile, count in sorted(data['profiles'].items(), key=lambda x: x[1], reverse=True)[:5]:
            lines.append(f" • {profile}: {count} analyses")
    lines.append("")

    # By User (most analyses first)
    lines.append("USAGE BY USER")
    lines.append("-" * 80)
    for user_id, data in sorted(stats['by_user'].items(), key=lambda x: x[1]['count'], reverse=True):
        if data['name']:
            lines.append(f"\n{data['name']} ({data['email']})")
        else:
            lines.append(f"\n{data['email']}")
        lines.append(f" Analyses: {data['count']}")
        lines.append(f" QC Checks: {data['checks']}")
        lines.append(f" Average Score: {data['avg_score']:.1f}/100")
        lines.append(f" Total Tokens: {data.get('total_tokens', 0):,} (Prompt: {data.get('prompt_tokens', 0):,}, Completion: {data.get('completion_tokens', 0):,})")
        lines.append(f" Cost: ${data['cost']:.2f} USD")
        lines.append(" Clients Used:")
        for client, count in sorted(data['clients'].items(), key=lambda x: x[1], reverse=True):
            lines.append(f" • {client}: {count} analyses")
    lines.append("")

    # By Profile (most analyses first)
    lines.append("USAGE BY PROFILE")
    lines.append("-" * 80)
    for profile, data in sorted(stats['by_profile'].items(), key=lambda x: x[1]['count'], reverse=True):
        lines.append(f"\n{profile}")
        lines.append(f" Analyses: {data['count']}")
        lines.append(f" QC Checks: {data['checks']}")
        lines.append(f" Average Score: {data['avg_score']:.1f}/100")
        lines.append(f" Used by Clients: {', '.join(map(str, data['clients']))}")
    lines.append("")

    # By Date
    if stats['by_date']:
        lines.append("USAGE BY DATE")
        lines.append("-" * 80)
        # Newest first, capped at the 30 most recent dates that have data.
        for date, data in sorted(stats['by_date'].items(), reverse=True)[:30]:
            lines.append(f"{date}: {data['count']} analyses (${data['cost']:.2f})")
        lines.append("")

    # By Provider (Token Usage)
    if stats.get('by_provider'):
        lines.append("TOKEN USAGE BY PROVIDER")
        lines.append("-" * 80)
        for provider, data in sorted(stats['by_provider'].items()):
            # Providers that recorded no tokens are omitted.
            if data['total_tokens'] > 0:
                lines.append(f"\n{provider}")
                lines.append(f" Total Tokens: {data['total_tokens']:,}")
                lines.append(f" Prompt Tokens: {data['prompt_tokens']:,}")
                lines.append(f" Completion Tokens: {data['completion_tokens']:,}")
                lines.append(f" Cost: ${data['cost']:.2f} USD")
        lines.append("")

    lines.append("=" * 80)

    return "\n".join(lines)


def generate_csv_report(stats):
    """Generate a CSV format report

    The output is several titled sections (summary, client, user, profile),
    each with its own header row, separated by blank lines. Free-text fields
    are quoted/escaped so that commas or quotes in a client, profile, name or
    email cannot shift columns.

    Args:
        stats: Aggregated statistics as built by generate_report

    Returns:
        The CSV text
    """
    import io

    output = io.StringIO()

    def write_row(fields):
        # Fields are already rendered/escaped strings.
        output.write(','.join(fields) + '\n')

    # Summary section
    output.write("SUMMARY\n")
    output.write("Metric,Value\n")
    output.write(f"Total Analyses,{stats['total_analyses']}\n")
    output.write(f"Total QC Checks,{stats['total_checks']}\n")
    output.write(f"Total Tokens,{stats['total_tokens']}\n")
    output.write(f"Total Prompt Tokens,{stats['total_prompt_tokens']}\n")
    output.write(f"Total Completion Tokens,{stats['total_completion_tokens']}\n")
    output.write(f"Total Cost,${stats['total_cost']:.2f}\n")
    output.write("\n")

    # By Client
    output.write("CLIENT USAGE\n")
    output.write("Client,Analyses,Checks,Users,Avg Score,Total Tokens,Prompt Tokens,Completion Tokens,Cost\n")
    for client, data in sorted(stats['by_client'].items(), key=lambda x: x[1]['count'], reverse=True):
        write_row([
            _csv_field(client),
            f"{data['count']}",
            f"{data['checks']}",
            f"{len(data['users'])}",
            f"{data['avg_score']:.1f}",
            f"{data.get('total_tokens', 0)}",
            f"{data.get('prompt_tokens', 0)}",
            f"{data.get('completion_tokens', 0)}",
            f"${data['cost']:.2f}",
        ])
    output.write("\n")

    # By User
    output.write("USER USAGE\n")
    output.write("Name,Email,Analyses,Checks,Avg Score,Total Tokens,Prompt Tokens,Completion Tokens,Cost\n")
    for user_id, data in sorted(stats['by_user'].items(), key=lambda x: x[1]['count'], reverse=True):
        write_row([
            # Name is always quoted, matching the established output format.
            _csv_field(data['name'], always_quote=True),
            _csv_field(data['email']),
            f"{data['count']}",
            f"{data['checks']}",
            f"{data['avg_score']:.1f}",
            f"{data.get('total_tokens', 0)}",
            f"{data.get('prompt_tokens', 0)}",
            f"{data.get('completion_tokens', 0)}",
            f"${data['cost']:.2f}",
        ])
    output.write("\n")

    # By Profile
    output.write("PROFILE USAGE\n")
    output.write("Profile,Analyses,Checks,Avg Score,Clients\n")
    for profile, data in sorted(stats['by_profile'].items(), key=lambda x: x[1]['count'], reverse=True):
        write_row([
            _csv_field(profile),
            f"{data['count']}",
            f"{data['checks']}",
            f"{data['avg_score']:.1f}",
            # The client list contains ", " separators, so it is always quoted.
            _csv_field(', '.join(map(str, data['clients'])), always_quote=True),
        ])

    return output.getvalue()


def main():
    """Command-line entry point.

    Parses arguments, loads the matching log entries, optionally filters them
    by client, and prints the report or writes it to --output. Progress
    messages go to stderr so that stdout carries only the report.

    Returns:
        Process exit code: 0 on success, 1 when no entries matched
    """
    parser = argparse.ArgumentParser(description='Generate usage reports from AI QC logs')
    parser.add_argument('--start-date', help='Start date (YYYY-MM-DD)', default=None)
    parser.add_argument('--end-date', help='End date (YYYY-MM-DD)', default=None)
    parser.add_argument('--format', choices=['text', 'json', 'csv'], default='text',
                        help='Output format (default: text)')
    parser.add_argument('--output', help='Output file path (default: print to console)', default=None)
    parser.add_argument('--last-days', type=int, help='Show data for last N days', default=None)
    parser.add_argument('--client', help='Filter by specific client', default=None)

    args = parser.parse_args()

    # Handle --last-days shortcut (overrides --start-date/--end-date).
    # The window runs from N days ago through today, both ends inclusive.
    start_date = args.start_date
    end_date = args.end_date

    if args.last_days:
        # Sample the clock once so both bounds derive from the same instant.
        now = datetime.now()
        end_date = now.strftime('%Y-%m-%d')
        start_date = (now - timedelta(days=args.last_days)).strftime('%Y-%m-%d')

    # Load logs
    print(f"Loading usage logs from: {USAGE_LOG_DIR}", file=sys.stderr)
    if start_date or end_date:
        print(f"Date range: {start_date or 'all'} to {end_date or 'today'}", file=sys.stderr)

    entries = load_logs(start_date, end_date)

    # Filter by client if specified
    if args.client:
        entries = [e for e in entries if e.get('client') == args.client]
        print(f"Filtered to client: {args.client}", file=sys.stderr)

    print(f"Loaded {len(entries)} log entries", file=sys.stderr)
    print("", file=sys.stderr)

    if not entries:
        print("No usage data found for the specified criteria.", file=sys.stderr)
        return 1

    # Generate report
    report = generate_report(entries, format=args.format)

    # Output report
    if args.output:
        # The text report contains non-ASCII characters, so the encoding is
        # fixed rather than left to the platform locale.
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to: {args.output}", file=sys.stderr)
    else:
        print(report)

    return 0


if __name__ == '__main__':
    sys.exit(main())