Issue:
Report generator crashed with a KeyError when processing log entries that don't have token usage data (older logs created before token tracking was implemented).

Fix:
- Initialize all token-related fields in the stats dictionary
- Default to 0 when the token_usage field doesn't exist in log entries
- Gracefully handle logs from before the token tracking enhancement

Result:
- Reports now work with both old and new log entries
- Token fields show 0 for old entries (backward compatible)
- New analyses will populate token fields correctly

Tested: Report generation works with existing logs

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
419 lines
17 KiB
Python
Executable file
419 lines
17 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Usage Report Generator
|
|
Generates comprehensive usage reports from usage tracking logs
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from usage_tracker import USAGE_LOG_DIR, COST_PER_1K_TOKENS
|
|
|
|
|
|
def load_logs(start_date=None, end_date=None, log_dir=None):
    """
    Load all log entries within the specified date range

    Scans ``log_dir`` for files named ``YYYY-MM-DD.jsonl`` and parses each
    line of every in-range file as one JSON document. Files whose name is
    not a valid date, lines that are not valid JSON, and JSON values that
    are not objects are skipped.

    Args:
        start_date: Start date (YYYY-MM-DD), inclusive, or None for no lower bound
        end_date: End date (YYYY-MM-DD), inclusive, or None for no upper bound
        log_dir: Directory to scan, or None to use USAGE_LOG_DIR

    Returns:
        List of log entries (dicts), in file-name order and then line order

    Raises:
        ValueError: If start_date or end_date is not in YYYY-MM-DD format
    """
    import glob

    if log_dir is None:
        log_dir = USAGE_LOG_DIR

    # Parse date filters if provided
    start_dt = datetime.strptime(start_date, '%Y-%m-%d').date() if start_date else None
    end_dt = datetime.strptime(end_date, '%Y-%m-%d').date() if end_date else None

    entries = []
    for log_file in sorted(glob.glob(os.path.join(log_dir, '*.jsonl'))):
        # Extract date from filename (YYYY-MM-DD.jsonl)
        file_date_str = os.path.basename(log_file).replace('.jsonl', '')

        # Keep the try body to the filename parse only, so that errors raised
        # while reading the file are not mistaken for a bad filename.
        try:
            file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
        except ValueError:
            # Invalid date format in filename, skip
            continue

        # Skip if outside date range
        if start_dt and file_date < start_dt:
            continue
        if end_dt and file_date > end_dt:
            continue

        # errors='replace' keeps one undecodable byte sequence from discarding
        # the remainder of the file; such a line simply fails JSON parsing.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                try:
                    entry = json.loads(line.strip())
                except json.JSONDecodeError:
                    # Blank or corrupt line, skip
                    continue
                # Only JSON objects are usable as log entries downstream.
                if isinstance(entry, dict):
                    entries.append(entry)

    return entries
|
|
|
|
|
|
def generate_report(entries, format='text'):
    """
    Generate a comprehensive usage report from log entries

    Only entries whose 'event' is 'analysis_complete' are counted. Fields
    that are absent from an entry, or present with a null value, fall back
    to a default ('unknown' for labels, 0 for counters, an empty mapping
    for token usage), so logs written before token tracking existed are
    aggregated with zero tokens.

    Args:
        entries: List of log entries (dicts); non-dict items are ignored
        format: Output format ('text', 'json', 'csv'); any other value
            produces the text report

    Returns:
        Report string
    """
    # Initialize statistics
    stats = {
        'total_analyses': 0,
        'total_checks': 0,
        'total_cost': 0.0,
        'total_tokens': 0,
        'total_prompt_tokens': 0,
        'total_completion_tokens': 0,
        'by_client': defaultdict(lambda: {
            'count': 0,
            'profiles': defaultdict(int),
            'users': set(),
            'checks': 0,
            'cost': 0.0,
            'avg_score': [],
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
        }),
        'by_user': defaultdict(lambda: {
            'name': None,
            'email': None,
            'count': 0,
            'clients': defaultdict(int),
            'profiles': defaultdict(int),
            'checks': 0,
            'cost': 0.0,
            'avg_score': [],
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
        }),
        'by_profile': defaultdict(lambda: {
            'count': 0,
            'checks': 0,
            'avg_score': [],
            'clients': set(),
        }),
        'by_date': defaultdict(lambda: {
            'count': 0,
            'cost': 0.0,
        }),
        'by_provider': defaultdict(lambda: {
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'cost': 0.0,
        }),
    }

    # Process entries
    for entry in entries:
        if not isinstance(entry, dict) or entry.get('event') != 'analysis_complete':
            continue

        stats['total_analyses'] += 1

        client = _field(entry, 'client', 'unknown')
        profile = _field(entry, 'profile', 'unknown')
        user_id = _field(entry, 'user_id', 'unknown')
        user_email = _field(entry, 'user_email', 'unknown')
        user_name = _field(entry, 'user_name', 'unknown')
        checks = _field(entry, 'checks_completed', 0)
        # Prefer the recorded total cost; fall back to the estimate, then 0.
        cost = _field(entry, 'total_cost_usd', _field(entry, 'estimated_cost_usd', 0.0))
        score = entry.get('overall_score')
        date = str(_field(entry, 'timestamp', ''))[:10]  # Extract YYYY-MM-DD

        # Get token usage data (may not exist, or may be null, in old logs)
        token_usage = entry.get('token_usage')
        if not isinstance(token_usage, dict):
            token_usage = {}
        total_tokens = _field(token_usage, 'total_tokens', 0)
        prompt_tokens = _field(token_usage, 'total_prompt_tokens', 0)
        completion_tokens = _field(token_usage, 'total_completion_tokens', 0)

        stats['total_checks'] += checks
        stats['total_cost'] += cost
        stats['total_tokens'] += total_tokens
        stats['total_prompt_tokens'] += prompt_tokens
        stats['total_completion_tokens'] += completion_tokens

        client_stats = stats['by_client'][client]
        user_stats = stats['by_user'][user_id]
        profile_stats = stats['by_profile'][profile]
        date_stats = stats['by_date'][date]

        # Counters shared by the per-client and per-user breakdowns
        for bucket in (client_stats, user_stats):
            bucket['count'] += 1
            bucket['profiles'][profile] += 1
            bucket['checks'] += checks
            bucket['cost'] += cost
            bucket['total_tokens'] += total_tokens
            bucket['prompt_tokens'] += prompt_tokens
            bucket['completion_tokens'] += completion_tokens
            if score is not None:
                bucket['avg_score'].append(score)

        # By client
        client_stats['users'].add(user_email)

        # By user (name/email reflect the most recent entry seen for the id)
        user_stats['name'] = user_name
        user_stats['email'] = user_email
        user_stats['clients'][client] += 1

        # By profile
        profile_stats['count'] += 1
        profile_stats['checks'] += checks
        profile_stats['clients'].add(client)
        if score is not None:
            profile_stats['avg_score'].append(score)

        # By date
        date_stats['count'] += 1
        date_stats['cost'] += cost

        # By provider (only if token data exists)
        by_provider = token_usage.get('by_provider')
        if isinstance(by_provider, dict):
            for provider, provider_stats in by_provider.items():
                if not isinstance(provider_stats, dict):
                    continue
                provider_totals = stats['by_provider'][provider]
                for key in ('total_tokens', 'prompt_tokens', 'completion_tokens', 'cost'):
                    provider_totals[key] += _field(provider_stats, key, 0)

    # Convert sets to lists and calculate averages for JSON serialization
    for client_data in stats['by_client'].values():
        client_data['users'] = sorted(client_data['users'])
        client_data['avg_score'] = _mean_score(client_data['avg_score'])

    for user_data in stats['by_user'].values():
        user_data['avg_score'] = _mean_score(user_data['avg_score'])

    for profile_data in stats['by_profile'].values():
        profile_data['clients'] = sorted(profile_data['clients'])
        profile_data['avg_score'] = _mean_score(profile_data['avg_score'])

    # Generate output based on format
    if format == 'json':
        return json.dumps(dict(stats), indent=2, default=str)
    elif format == 'csv':
        return generate_csv_report(stats)
    else:
        return generate_text_report(stats)


def _field(record, key, default):
    """Return record[key], or default when the key is absent or its value is None."""
    value = record.get(key)
    return default if value is None else value


def _mean_score(scores):
    """Return the mean of scores rounded to 2 decimals, or 0 for an empty list."""
    if not scores:
        return 0
    return round(sum(scores) / len(scores), 2)
|
|
|
|
|
|
def generate_text_report(stats):
    """Generate a human-readable text report

    Args:
        stats: Aggregated statistics dict. Reads the 'total_*' counters and
            the 'by_client', 'by_user', 'by_profile', 'by_date' and
            (optional) 'by_provider' breakdowns; every 'avg_score' must
            already be a number.

    Returns:
        The report as one newline-joined string. The provider section is
        emitted only when at least one provider has a positive token count.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    def by_count(item):
        # Sort key for (label, data) pairs: number of analyses.
        return item[1]['count']

    def by_value(item):
        # Sort key for (label, count) pairs.
        return item[1]

    def token_line(data):
        # Token fields are read with .get so stats lacking them still render.
        return (
            f" Total Tokens: {data.get('total_tokens', 0):,} "
            f"(Prompt: {data.get('prompt_tokens', 0):,}, "
            f"Completion: {data.get('completion_tokens', 0):,})"
        )

    lines = [
        heavy_rule,
        "AI QC USAGE REPORT",
        heavy_rule,
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
    ]

    # Summary
    analyses = stats['total_analyses']
    lines += [
        "SUMMARY",
        light_rule,
        f"Total Analyses: {analyses}",
        f"Total QC Checks: {stats['total_checks']}",
        f"Total Tokens Used: {stats['total_tokens']:,}",
        f" - Prompt Tokens: {stats['total_prompt_tokens']:,}",
        f" - Completion Tokens: {stats['total_completion_tokens']:,}",
        f"Total Cost: ${stats['total_cost']:.2f} USD",
    ]
    if analyses > 0:
        # Averages are omitted entirely when there is nothing to divide by.
        lines += [
            f"Average Checks per Analysis: {stats['total_checks'] / analyses:.1f}",
            f"Average Tokens per Analysis: {stats['total_tokens'] / analyses:.1f}",
            f"Average Cost per Analysis: ${stats['total_cost'] / analyses:.4f} USD",
        ]
    lines.append("")

    # By Client
    lines += ["USAGE BY CLIENT", light_rule]
    for client, data in sorted(stats['by_client'].items(), key=by_count, reverse=True):
        lines += [
            # str() so a non-string label cannot crash .upper()
            f"\n{str(client).upper()}",
            f" Analyses: {data['count']}",
            f" QC Checks: {data['checks']}",
            f" Unique Users: {len(data['users'])}",
            f" Average Score: {data['avg_score']:.1f}/100",
            token_line(data),
            f" Cost: ${data['cost']:.2f} USD",
            " Top Profiles:",
        ]
        top_profiles = sorted(data['profiles'].items(), key=by_value, reverse=True)[:5]
        lines += [f" • {profile}: {count} analyses" for profile, count in top_profiles]
    lines.append("")

    # By User
    lines += ["USAGE BY USER", light_rule]
    for _user_id, data in sorted(stats['by_user'].items(), key=by_count, reverse=True):
        if data['name']:
            lines.append(f"\n{data['name']} ({data['email']})")
        else:
            lines.append(f"\n{data['email']}")
        lines += [
            f" Analyses: {data['count']}",
            f" QC Checks: {data['checks']}",
            f" Average Score: {data['avg_score']:.1f}/100",
            token_line(data),
            f" Cost: ${data['cost']:.2f} USD",
            " Clients Used:",
        ]
        clients_used = sorted(data['clients'].items(), key=by_value, reverse=True)
        lines += [f" • {client}: {count} analyses" for client, count in clients_used]
    lines.append("")

    # By Profile
    lines += ["USAGE BY PROFILE", light_rule]
    for profile, data in sorted(stats['by_profile'].items(), key=by_count, reverse=True):
        lines += [
            f"\n{profile}",
            f" Analyses: {data['count']}",
            f" QC Checks: {data['checks']}",
            f" Average Score: {data['avg_score']:.1f}/100",
            f" Used by Clients: {', '.join(str(c) for c in data['clients'])}",
        ]
    lines.append("")

    # By Date
    if stats['by_date']:
        lines += ["USAGE BY DATE", light_rule]
        # The 30 most recent date keys, newest first
        for date in sorted(stats['by_date'], reverse=True)[:30]:
            data = stats['by_date'][date]
            lines.append(f"{date}: {data['count']} analyses (${data['cost']:.2f})")
        lines.append("")

    # By Provider (Token Usage): filter first so an all-zero breakdown does
    # not leave a header with nothing under it.
    active_providers = [
        (provider, data)
        for provider, data in sorted((stats.get('by_provider') or {}).items())
        if data['total_tokens'] > 0
    ]
    if active_providers:
        lines += ["TOKEN USAGE BY PROVIDER", light_rule]
        for provider, data in active_providers:
            lines += [
                f"\n{provider}",
                f" Total Tokens: {data['total_tokens']:,}",
                f" Prompt Tokens: {data['prompt_tokens']:,}",
                f" Completion Tokens: {data['completion_tokens']:,}",
                f" Cost: ${data['cost']:.2f} USD",
            ]
        lines.append("")

    lines.append(heavy_rule)
    return "\n".join(lines)
|
|
|
|
|
|
def generate_csv_report(stats):
    """Generate a CSV format report

    Emits four sections (SUMMARY, CLIENT USAGE, USER USAGE, PROFILE USAGE),
    each with a title line and a header row; all but the last are followed
    by a blank line. Text fields containing a comma, double quote or line
    break are quoted with embedded quotes doubled, so such labels cannot
    shift columns. The user name and the profile's client list are always
    quoted.

    Args:
        stats: Aggregated statistics dict. Reads the 'total_*' counters and
            the 'by_client', 'by_user' and 'by_profile' breakdowns; every
            'avg_score' must already be a number.

    Returns:
        The CSV report as a string with '\\n' line endings
    """
    import io

    def by_count(item):
        # Sort key for (label, data) pairs: number of analyses.
        return item[1]['count']

    def write_row(fields):
        output.write(",".join(fields) + "\n")

    output = io.StringIO()

    # Summary section
    output.write("SUMMARY\n")
    output.write("Metric,Value\n")
    output.write(f"Total Analyses,{stats['total_analyses']}\n")
    output.write(f"Total QC Checks,{stats['total_checks']}\n")
    output.write(f"Total Tokens,{stats['total_tokens']}\n")
    output.write(f"Total Prompt Tokens,{stats['total_prompt_tokens']}\n")
    output.write(f"Total Completion Tokens,{stats['total_completion_tokens']}\n")
    output.write(f"Total Cost,${stats['total_cost']:.2f}\n")
    output.write("\n")

    # By Client
    output.write("CLIENT USAGE\n")
    output.write("Client,Analyses,Checks,Users,Avg Score,Total Tokens,Prompt Tokens,Completion Tokens,Cost\n")
    for client, data in sorted(stats['by_client'].items(), key=by_count, reverse=True):
        write_row([
            _csv_field(client),
            f"{data['count']}",
            f"{data['checks']}",
            f"{len(data['users'])}",
            f"{data['avg_score']:.1f}",
            f"{data.get('total_tokens', 0)}",
            f"{data.get('prompt_tokens', 0)}",
            f"{data.get('completion_tokens', 0)}",
            f"${data['cost']:.2f}",
        ])
    output.write("\n")

    # By User
    output.write("USER USAGE\n")
    output.write("Name,Email,Analyses,Checks,Avg Score,Total Tokens,Prompt Tokens,Completion Tokens,Cost\n")
    for _user_id, data in sorted(stats['by_user'].items(), key=by_count, reverse=True):
        write_row([
            _csv_field(data['name'], always_quote=True),
            _csv_field(data['email']),
            f"{data['count']}",
            f"{data['checks']}",
            f"{data['avg_score']:.1f}",
            f"{data.get('total_tokens', 0)}",
            f"{data.get('prompt_tokens', 0)}",
            f"{data.get('completion_tokens', 0)}",
            f"${data['cost']:.2f}",
        ])
    output.write("\n")

    # By Profile
    output.write("PROFILE USAGE\n")
    output.write("Profile,Analyses,Checks,Avg Score,Clients\n")
    for profile, data in sorted(stats['by_profile'].items(), key=by_count, reverse=True):
        write_row([
            _csv_field(profile),
            f"{data['count']}",
            f"{data['checks']}",
            f"{data['avg_score']:.1f}",
            _csv_field(', '.join(str(c) for c in data['clients']), always_quote=True),
        ])

    return output.getvalue()


def _csv_field(value, always_quote=False):
    """Render value as one CSV field, quoting and doubling quotes when required."""
    text = str(value)
    if always_quote or any(ch in text for ch in ',"\r\n'):
        return '"' + text.replace('"', '""') + '"'
    return text
|
|
|
|
|
|
def main(argv=None):
    """Command-line entry point: load logs, build a report, print or save it.

    Args:
        argv: Argument list to parse, or None to use sys.argv[1:]

    Returns:
        0 when a report was produced, 1 when no log entries matched.
        Invalid arguments (malformed dates, negative --last-days) end the
        process through argparse's usual error exit.
    """
    def iso_date(text):
        # Validate at parse time so a malformed date yields a usage error
        # rather than a traceback later; the value is kept as a string.
        try:
            datetime.strptime(text, '%Y-%m-%d')
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid date {text!r} (expected YYYY-MM-DD)") from None
        return text

    parser = argparse.ArgumentParser(description='Generate usage reports from AI QC logs')
    parser.add_argument('--start-date', type=iso_date, help='Start date (YYYY-MM-DD)', default=None)
    parser.add_argument('--end-date', type=iso_date, help='End date (YYYY-MM-DD)', default=None)
    parser.add_argument('--format', choices=['text', 'json', 'csv'], default='text',
                        help='Output format (default: text)')
    parser.add_argument('--output', help='Output file path (default: print to console)', default=None)
    parser.add_argument('--last-days', type=int, help='Show data for last N days', default=None)
    parser.add_argument('--client', help='Filter by specific client', default=None)

    args = parser.parse_args(argv)

    # Handle --last-days shortcut (overrides --start-date/--end-date)
    start_date = args.start_date
    end_date = args.end_date
    # Compare against None so that "--last-days 0" (today only) is honoured.
    if args.last_days is not None:
        if args.last_days < 0:
            parser.error('--last-days must be zero or a positive integer')
        now = datetime.now()
        end_date = now.strftime('%Y-%m-%d')
        # NOTE(review): both ends are inclusive, so N covers N+1 calendar
        # days (today plus the N before it) — confirm this is intended.
        start_date = (now - timedelta(days=args.last_days)).strftime('%Y-%m-%d')

    # Load logs
    print(f"Loading usage logs from: {USAGE_LOG_DIR}", file=sys.stderr)
    if start_date or end_date:
        print(f"Date range: {start_date or 'all'} to {end_date or 'today'}", file=sys.stderr)

    entries = load_logs(start_date, end_date)

    # Filter by client if specified
    if args.client:
        entries = [e for e in entries if e.get('client') == args.client]
        print(f"Filtered to client: {args.client}", file=sys.stderr)

    print(f"Loaded {len(entries)} log entries", file=sys.stderr)
    print("", file=sys.stderr)

    if not entries:
        print("No usage data found for the specified criteria.", file=sys.stderr)
        return 1

    # Generate report
    report = generate_report(entries, format=args.format)

    # Output report
    if args.output:
        # The text report contains non-ASCII bullets, so fix the encoding
        # instead of depending on the platform default.
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to: {args.output}", file=sys.stderr)
    else:
        print(report)

    return 0
|
|
|
|
|
|
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
|