#!/usr/bin/env python3 """ Usage tracking module for monitoring profile usage, user activity, and cost estimation """ import os import json from datetime import datetime from threading import Lock # Usage log directory USAGE_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'usage_logs') os.makedirs(USAGE_LOG_DIR, exist_ok=True) # Thread-safe log writing log_lock = Lock() # Cost estimation per LLM (approximate USD per 1K tokens) COST_PER_1K_TOKENS = { 'OpenAI': { 'input': 0.0025, # $2.50 per 1M tokens = $0.0025 per 1K 'output': 0.010 # $10.00 per 1M tokens = $0.010 per 1K }, 'Gemini': { 'input': 0.00125, # Gemini 2.5 Pro pricing (estimated) 'output': 0.005 } } def log_analysis_start(session_id, client, profile, user_info, file_info): """ Log the start of an analysis Args: session_id: Unique session identifier client: Client name (diageo, unilever, loreal, general) profile: Profile name used user_info: User information from g.user (user_id, email, name) file_info: File information (filename, size) """ log_entry = { 'event': 'analysis_start', 'timestamp': datetime.now().isoformat(), 'session_id': session_id, 'client': client, 'profile': profile, 'user_id': user_info.get('user_id'), 'user_email': user_info.get('email'), 'user_name': user_info.get('name'), 'filename': file_info.get('filename'), 'filesize': file_info.get('size') } _write_log_entry(log_entry) return log_entry def log_analysis_complete(session_id, client, profile, user_info, results): """ Log the completion of an analysis Args: session_id: Unique session identifier client: Client name profile: Profile name used user_info: User information from g.user results: Analysis results with check count, scores, etc. """ # Estimate cost based on checks and LLM usage total_cost = _estimate_analysis_cost(results) log_entry = { 'event': 'analysis_complete', 'timestamp': datetime.now().isoformat(), 'session_id': session_id, 'client': client, 'profile': profile, 'user_id': user_info.get('user_id'), 'user_email': user_info.get('email'), 'user_name': user_info.get('name'), 'checks_completed': results.get('checks_completed', 0), 'overall_score': results.get('overall_score', 0), 'status': results.get('status', 'unknown'), 'estimated_cost_usd': round(total_cost, 4) } _write_log_entry(log_entry) return log_entry def log_check_execution(session_id, check_name, llm_used, execution_time_ms): """Log individual check execution for detailed tracking""" log_entry = { 'event': 'check_execution', 'timestamp': datetime.now().isoformat(), 'session_id': session_id, 'check_name': check_name, 'llm_used': llm_used, 'execution_time_ms': execution_time_ms } _write_log_entry(log_entry) def _estimate_analysis_cost(results): """Estimate cost based on LLM usage""" # Simplified cost estimation # Average: 1000 input tokens + 200 output tokens per check total_cost = 0 checks_completed = results.get('checks_completed', 0) check_results = results.get('check_results', {}) # Handle both dict (keyed by check name) and list formats if isinstance(check_results, dict): check_results = check_results.values() for check_result in check_results: # Skip if check_result is not a dict (e.g., string error message) if not isinstance(check_result, dict): continue # Safely get model info model_used = check_result.get('model_used', {}) if isinstance(model_used, dict): llm = model_used.get('provider', 'Gemini') else: llm = 'Gemini' # Default # Estimate tokens (rough approximation) input_tokens = 1000 # Prompt + image description output_tokens = 200 # Response if llm in COST_PER_1K_TOKENS: cost_input = (input_tokens / 1000) * COST_PER_1K_TOKENS[llm]['input'] cost_output = (output_tokens / 1000) * COST_PER_1K_TOKENS[llm]['output'] total_cost += cost_input + cost_output return total_cost def _write_log_entry(log_entry): """Write a log entry to the daily log file""" # Create daily log file (YYYY-MM-DD.jsonl) log_date = datetime.now().strftime('%Y-%m-%d') log_file = os.path.join(USAGE_LOG_DIR, f'{log_date}.jsonl') with log_lock: with open(log_file, 'a') as f: f.write(json.dumps(log_entry) + '\n') def get_usage_stats(start_date=None, end_date=None, client=None, user_id=None): """ Get usage statistics for a date range Args: start_date: Start date (YYYY-MM-DD) or None for all time end_date: End date (YYYY-MM-DD) or None for today client: Filter by client or None for all clients user_id: Filter by user or None for all users Returns: Dictionary with usage statistics """ import glob from collections import defaultdict # Find relevant log files if start_date and end_date: # TODO: Implement date range filtering pass log_files = glob.glob(os.path.join(USAGE_LOG_DIR, '*.jsonl')) stats = { 'total_analyses': 0, 'by_client': defaultdict(lambda: { 'count': 0, 'profiles': defaultdict(int), 'users': set(), 'total_checks': 0, 'estimated_cost': 0 }), 'by_user': defaultdict(lambda: { 'count': 0, 'clients': defaultdict(int), 'total_checks': 0 }), 'by_profile': defaultdict(int) } # Read and aggregate logs for log_file in log_files: with open(log_file, 'r') as f: for line in f: try: entry = json.loads(line.strip()) # Filter by client if specified if client and entry.get('client') != client: continue # Filter by user if specified if user_id and entry.get('user_id') != user_id: continue # Process complete analyses only if entry.get('event') == 'analysis_complete': stats['total_analyses'] += 1 # By client client_name = entry.get('client', 'unknown') profile_name = entry.get('profile', 'unknown') user_email = entry.get('user_email', 'unknown') stats['by_client'][client_name]['count'] += 1 stats['by_client'][client_name]['profiles'][profile_name] += 1 stats['by_client'][client_name]['users'].add(user_email) stats['by_client'][client_name]['total_checks'] += entry.get('checks_completed', 0) stats['by_client'][client_name]['estimated_cost'] += entry.get('estimated_cost_usd', 0) # By user user_uid = entry.get('user_id', 'unknown') stats['by_user'][user_uid]['count'] += 1 stats['by_user'][user_uid]['clients'][client_name] += 1 stats['by_user'][user_uid]['total_checks'] += entry.get('checks_completed', 0) # By profile stats['by_profile'][profile_name] += 1 except json.JSONDecodeError: continue # Convert sets to lists for JSON serialization for client_data in stats['by_client'].values(): client_data['users'] = list(client_data['users']) return dict(stats)