#!/usr/bin/env python3
"""
Usage tracking module for monitoring profile usage, user activity, and cost estimation
"""

import copy
import glob
import json
import os
from collections import defaultdict
from datetime import datetime
from threading import Lock

# Usage log directory
USAGE_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'usage_logs')
os.makedirs(USAGE_LOG_DIR, exist_ok=True)

# Thread-safe log writing
log_lock = Lock()

# Cost per 1K tokens (USD)
# Last updated: 2026-04-22
# Source: Official API pricing pages
#
# OpenAI GPT-4o: https://openai.com/api/pricing/
# Google Gemini 2.5 Pro: https://ai.google.dev/gemini-api/docs/pricing
#
# Notes:
# - Gemini 2.5 Pro uses tiered pricing: standard rate for prompts ≤200K tokens,
#   2x rate for prompts >200K. Our typical QC calls (image + prompt) are well
#   under 200K so we use the standard tier. If brand guideline PDFs push
#   usage into the >200K tier we'd need to detect this per-call.
# - Output pricing for Gemini 2.5 Pro doubled from $5/1M to $10/1M between
#   2026-02 and 2026-04 — prior reports understated Gemini cost.
#
# IMPORTANT: Update these values when pricing changes
# To verify current pricing, run: python validate_pricing.py
COST_PER_1K_TOKENS = {
    'OpenAI': {
        'input': 0.0025,   # GPT-4o: $2.50 per 1M input tokens
        'output': 0.010,   # GPT-4o: $10.00 per 1M output tokens
        'model': 'gpt-4o',
        'last_verified': '2026-04-22'
    },
    'Gemini': {
        'input': 0.00125,  # Gemini 2.5 Pro (≤200K prompt): $1.25 per 1M input tokens
        'output': 0.010,   # Gemini 2.5 Pro (≤200K prompt): $10.00 per 1M output tokens
        'model': 'gemini-2.5-pro',
        'last_verified': '2026-04-22'
    }
}

# Fallback token counts used when a check result carries no token data at all
_ESTIMATED_PROMPT_TOKENS = 1000
_ESTIMATED_COMPLETION_TOKENS = 200


def get_pricing_info():
    """
    Get current pricing information for all providers

    Returns:
        A deep copy of COST_PER_1K_TOKENS, so callers can modify the result
        (including the nested per-provider dicts) without changing the live
        pricing table.
    """
    return copy.deepcopy(COST_PER_1K_TOKENS)


def update_pricing(provider, input_cost_per_1k, output_cost_per_1k):
    """
    Update pricing for a specific provider

    Args:
        provider: 'OpenAI' or 'Gemini'
        input_cost_per_1k: Cost per 1K input tokens (USD)
        output_cost_per_1k: Cost per 1K output tokens (USD)

    Returns:
        True if the provider exists and was updated, False otherwise.
    """
    if provider in COST_PER_1K_TOKENS:
        COST_PER_1K_TOKENS[provider]['input'] = input_cost_per_1k
        COST_PER_1K_TOKENS[provider]['output'] = output_cost_per_1k
        COST_PER_1K_TOKENS[provider]['last_verified'] = datetime.now().strftime('%Y-%m-%d')
        print(f"Updated {provider} pricing: Input=${input_cost_per_1k}/1K, Output=${output_cost_per_1k}/1K")
        return True
    else:
        print(f"Unknown provider: {provider}")
        return False


def _user_fields(user_info):
    """Extract the user columns shared by several log events (tolerates None)."""
    user_info = user_info or {}
    return {
        'user_id': user_info.get('user_id'),
        'user_email': user_info.get('email'),
        'user_name': user_info.get('name'),
    }


def log_analysis_start(session_id, client, profile, user_info, file_info):
    """
    Log the start of an analysis

    Args:
        session_id: Unique session identifier
        client: Client name (diageo, unilever, loreal, general)
        profile: Profile name used
        user_info: User information from g.user (user_id, email, name)
        file_info: File information (filename, size)

    Returns:
        The log entry dict that was written.
    """
    file_info = file_info or {}
    log_entry = {
        'event': 'analysis_start',
        'timestamp': datetime.now().isoformat(),
        'session_id': session_id,
        'client': client,
        'profile': profile,
        **_user_fields(user_info),
        'filename': file_info.get('filename'),
        'filesize': file_info.get('size')
    }
    _write_log_entry(log_entry)
    return log_entry


def log_analysis_complete(session_id, client, profile, user_info, results):
    """
    Log the completion of an analysis

    Args:
        session_id: Unique session identifier
        client: Client name
        profile: Profile name used
        user_info: User information from g.user
        results: Analysis results with check count, scores, token usage, etc.

    Returns:
        The log entry dict that was written, including 'total_cost_usd'
        (rounded to 4 decimals) and the aggregated 'token_usage'.
    """
    results = results or {}

    # Calculate cost based on actual token usage
    total_cost, token_stats = _calculate_analysis_cost(results)

    log_entry = {
        'event': 'analysis_complete',
        'timestamp': datetime.now().isoformat(),
        'session_id': session_id,
        'client': client,
        'profile': profile,
        **_user_fields(user_info),
        'checks_completed': results.get('checks_completed', 0),
        'overall_score': results.get('overall_score', 0),
        'status': results.get('status', 'unknown'),
        'total_cost_usd': round(total_cost, 4),
        'token_usage': token_stats
    }
    _write_log_entry(log_entry)
    return log_entry


def log_check_execution(session_id, check_name, llm_used, execution_time_ms, token_usage=None):
    """
    Log individual check execution for detailed tracking

    Args:
        session_id: Unique session identifier
        check_name: Name of the check that ran
        llm_used: Identifier of the LLM used for the check
        execution_time_ms: Execution time in milliseconds
        token_usage: Optional token usage data; only recorded when truthy

    Returns:
        The log entry dict that was written.
    """
    log_entry = {
        'event': 'check_execution',
        'timestamp': datetime.now().isoformat(),
        'session_id': session_id,
        'check_name': check_name,
        'llm_used': llm_used,
        'execution_time_ms': execution_time_ms
    }

    # Add token usage if provided
    if token_usage:
        log_entry['token_usage'] = token_usage

    _write_log_entry(log_entry)
    return log_entry


def log_user_login(user_info):
    """
    Log a user login/visit event for tracking all platform users.

    Args:
        user_info: User information from auth (user_id, email, name)

    Returns:
        The log entry dict that was written.
    """
    log_entry = {
        'event': 'user_login',
        'timestamp': datetime.now().isoformat(),
        **_user_fields(user_info),
    }
    _write_log_entry(log_entry)
    return log_entry


def log_access_change(audit_entry):
    """
    Log an access grant/revoke/promote/demote event.

    Args:
        audit_entry: dict from user_access.set_user_clients / promote_admin / demote_admin

    Returns:
        The log entry dict that was written.
    """
    # audit_entry is spread last, so an 'event' or 'timestamp' key inside it
    # replaces the value set here.
    log_entry = {
        'event': 'access_change',
        'timestamp': datetime.now().isoformat(),
        **audit_entry
    }
    _write_log_entry(log_entry)
    return log_entry


def log_access_request(entry):
    """
    Log a self-service client access request from a signed-in user.

    Args:
        entry: dict with at least user_email, user_name, requested_clients,
               recipients, email_sent

    Returns:
        The log entry dict that was written.
    """
    # entry is spread last, so an 'event' or 'timestamp' key inside it
    # replaces the value set here.
    log_entry = {
        'event': 'access_request',
        'timestamp': datetime.now().isoformat(),
        **entry
    }
    _write_log_entry(log_entry)
    return log_entry


def _calculate_analysis_cost(results):
    """
    Calculate cost based on actual token usage from LLM responses

    Args:
        results: Analysis results; 'check_results' may be a dict keyed by
                 check name or a list. Entries that are not dicts are skipped.

    Returns:
        tuple: (total_cost, token_statistics)
    """
    total_cost = 0
    token_stats = {
        'total_prompt_tokens': 0,
        'total_completion_tokens': 0,
        'total_tokens': 0,
        'by_provider': {}
    }

    check_results = results.get('check_results') or {}

    # Handle both dict (keyed by check name) and list formats
    if isinstance(check_results, dict):
        check_results = check_results.values()

    for check_result in check_results:
        # Skip if check_result is not a dict (e.g., string error message)
        if not isinstance(check_result, dict):
            continue

        # Get model info; anything without a provider is attributed to Gemini
        model_used = check_result.get('model_used', {})
        if isinstance(model_used, dict):
            provider = model_used.get('provider', 'Gemini')
        else:
            provider = 'Gemini'  # Default

        # Get actual token usage from check result (may be missing or None)
        token_usage = check_result.get('token_usage')
        if not isinstance(token_usage, dict):
            token_usage = {}
        prompt_tokens = token_usage.get('prompt_tokens') or 0
        completion_tokens = token_usage.get('completion_tokens') or 0
        total_tokens = token_usage.get('total_tokens') or 0

        if total_tokens == 0:
            if prompt_tokens or completion_tokens:
                # Real counts were reported without a total: keep them and
                # derive the total instead of replacing them with estimates.
                total_tokens = prompt_tokens + completion_tokens
            else:
                # No token data available at all, use estimates as fallback
                prompt_tokens = _ESTIMATED_PROMPT_TOKENS
                completion_tokens = _ESTIMATED_COMPLETION_TOKENS
                total_tokens = prompt_tokens + completion_tokens

        # Update statistics
        token_stats['total_prompt_tokens'] += prompt_tokens
        token_stats['total_completion_tokens'] += completion_tokens
        token_stats['total_tokens'] += total_tokens

        # Track by provider
        provider_stats = token_stats['by_provider'].setdefault(provider, {
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'total_tokens': 0,
            'cost': 0
        })
        provider_stats['prompt_tokens'] += prompt_tokens
        provider_stats['completion_tokens'] += completion_tokens
        provider_stats['total_tokens'] += total_tokens

        # Calculate cost using actual tokens; providers without a pricing
        # entry contribute tokens but no cost.
        if provider in COST_PER_1K_TOKENS:
            pricing = COST_PER_1K_TOKENS[provider]
            cost_input = (prompt_tokens / 1000) * pricing['input']
            cost_output = (completion_tokens / 1000) * pricing['output']
            check_cost = cost_input + cost_output
            total_cost += check_cost
            provider_stats['cost'] += check_cost

    return total_cost, token_stats


def _write_log_entry(log_entry):
    """Write a log entry to the daily log file"""
    # Serialize before taking the lock so the lock is held only for the write
    line = json.dumps(log_entry) + '\n'

    # Create daily log file (YYYY-MM-DD.jsonl)
    log_date = datetime.now().strftime('%Y-%m-%d')
    log_file = os.path.join(USAGE_LOG_DIR, f'{log_date}.jsonl')

    with log_lock:
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(line)


def _day_bound(value):
    """Normalize a date bound (string or date/datetime) to 'YYYY-MM-DD', or None."""
    if not value:
        return None
    # str() of a date/datetime and ISO timestamps all start with YYYY-MM-DD
    return str(value)[:10]


def _log_file_day(path):
    """Return the 'YYYY-MM-DD' stem of a daily log file, or None if it is not a date."""
    stem = os.path.splitext(os.path.basename(path))[0]
    try:
        datetime.strptime(stem, '%Y-%m-%d')
    except ValueError:
        return None
    return stem


def _select_log_files(start_date=None, end_date=None):
    """List log files in USAGE_LOG_DIR, restricted to the inclusive date range."""
    log_files = sorted(glob.glob(os.path.join(USAGE_LOG_DIR, '*.jsonl')))
    first_day = _day_bound(start_date)
    last_day = _day_bound(end_date)
    if first_day is None and last_day is None:
        return log_files

    selected = []
    for path in log_files:
        day = _log_file_day(path)
        if day is None:
            # Not a daily log file; it cannot be placed in a date range
            continue
        # Zero-padded ISO dates order correctly as plain strings
        if first_day is not None and day < first_day:
            continue
        if last_day is not None and day > last_day:
            continue
        selected.append(path)
    return selected


def _iter_log_entries(log_file):
    """Yield each JSON object found in a .jsonl log file, skipping unusable lines."""
    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            try:
                entry = json.loads(line.strip())
            except json.JSONDecodeError:
                continue
            # Valid JSON that is not an object (e.g. a list) cannot be an entry
            if isinstance(entry, dict):
                yield entry


def get_usage_stats(start_date=None, end_date=None, client=None, user_id=None):
    """
    Get usage statistics for a date range

    Only 'analysis_complete' events are aggregated. Date filtering is done
    on the daily log file names, and both bounds are inclusive.

    Args:
        start_date: Start date (YYYY-MM-DD) or None for all time
        end_date: End date (YYYY-MM-DD) or None for no upper bound
        client: Filter by client or None for all clients
        user_id: Filter by user or None for all users

    Returns:
        Dictionary with usage statistics
    """
    stats = {
        'total_analyses': 0,
        'total_tokens': 0,
        'total_prompt_tokens': 0,
        'total_completion_tokens': 0,
        'by_client': defaultdict(lambda: {
            'count': 0,
            'profiles': defaultdict(int),
            'users': set(),
            'total_checks': 0,
            'estimated_cost': 0,
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0
        }),
        'by_user': defaultdict(lambda: {
            'count': 0,
            'clients': defaultdict(int),
            'total_checks': 0,
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0
        }),
        'by_profile': defaultdict(int),
        'by_provider': defaultdict(lambda: {
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'cost': 0
        })
    }

    # Read and aggregate logs
    for log_file in _select_log_files(start_date, end_date):
        for entry in _iter_log_entries(log_file):
            # Filter by client if specified
            if client and entry.get('client') != client:
                continue

            # Filter by user if specified
            if user_id and entry.get('user_id') != user_id:
                continue

            # Process complete analyses only
            if entry.get('event') != 'analysis_complete':
                continue

            stats['total_analyses'] += 1

            client_name = entry.get('client', 'unknown')
            profile_name = entry.get('profile', 'unknown')
            user_email = entry.get('user_email', 'unknown')
            checks_completed = entry.get('checks_completed', 0)

            # Get token usage data
            token_usage = entry.get('token_usage') or {}
            total_tokens = token_usage.get('total_tokens', 0)
            prompt_tokens = token_usage.get('total_prompt_tokens', 0)
            completion_tokens = token_usage.get('total_completion_tokens', 0)
            # Older entries stored the cost under 'estimated_cost_usd'
            cost = entry.get('total_cost_usd', entry.get('estimated_cost_usd', 0)) or 0

            # Update global stats
            stats['total_tokens'] += total_tokens
            stats['total_prompt_tokens'] += prompt_tokens
            stats['total_completion_tokens'] += completion_tokens

            # By client
            client_stats = stats['by_client'][client_name]
            client_stats['count'] += 1
            client_stats['profiles'][profile_name] += 1
            client_stats['users'].add(user_email)
            client_stats['total_checks'] += checks_completed
            client_stats['estimated_cost'] += cost
            client_stats['total_tokens'] += total_tokens
            client_stats['prompt_tokens'] += prompt_tokens
            client_stats['completion_tokens'] += completion_tokens

            # By user
            user_uid = entry.get('user_id', 'unknown')
            user_stats = stats['by_user'][user_uid]
            user_stats['count'] += 1
            user_stats['clients'][client_name] += 1
            user_stats['total_checks'] += checks_completed
            user_stats['total_tokens'] += total_tokens
            user_stats['prompt_tokens'] += prompt_tokens
            user_stats['completion_tokens'] += completion_tokens

            # By profile
            stats['by_profile'][profile_name] += 1

            # By provider
            by_provider = token_usage.get('by_provider') or {}
            for provider, provider_stats in by_provider.items():
                if not isinstance(provider_stats, dict):
                    continue
                totals = stats['by_provider'][provider]
                totals['total_tokens'] += provider_stats.get('total_tokens', 0)
                totals['prompt_tokens'] += provider_stats.get('prompt_tokens', 0)
                totals['completion_tokens'] += provider_stats.get('completion_tokens', 0)
                totals['cost'] += provider_stats.get('cost', 0)

    # Convert sets to lists for JSON serialization. Sorting makes the output
    # deterministic; key=str keeps None emails sortable next to strings.
    for client_data in stats['by_client'].values():
        client_data['users'] = sorted(client_data['users'], key=str)

    return dict(stats)