ai_qc/backend/usage_tracker.py
nickviljoen 125c5e7064 Simplify settings UX and add client access request flow
Settings panel:
- Reference Assets tab: collapse the Brand Name + Tags + Description form to a single Name field; the user-entered name now drives the dropdown label on the main configuration page (falls back to filename for legacy records).
- Media Plan tab: add a Name field. Backend stores display_name on the plan record, and both the active-plan card and the main-page dropdown prefer display_name (falling back to original_filename for old plans).
- Modal footer is now context-aware: Save Profile + Cancel show only on the Profile / Create Profile tabs; Reference Assets / QC Tools / Media Plan show a single green Save button that closes the modal.

Client access request:
- New "Request Client Access" tile on the client picker, alongside the user's existing client tiles. Opens a modal that auto-fills name + email from the MSAL session (read-only), shows checkboxes for clients the user does not already have, and accepts an optional reason.
- New POST /api/access_request endpoint (auth-required) that takes identity from the verified session, validates the requested clients, looks up admin recipients via user_access.list_access_entries, and emails them via the new email_service module (Mailgun SMTP with STARTTLS). Reply-To is set to the requester. Logs an access_request event to the daily JSONL usage logs.
- New GET /api/all_clients endpoint so the form can list clients the requester currently cannot see.
- Mailgun SMTP credentials added to the four env files (and placeholders in the .env.template files) under SMTP_SERVER / SMTP_PORT / SMTP_USER / SMTP_PASSWORD / SENDER_EMAIL / ERROR_EMAIL / REPORT_EMAILS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 14:02:40 +02:00

411 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Usage tracking module for monitoring profile usage, user activity, and cost estimation
"""
import os
import json
from datetime import datetime
from threading import Lock
# Usage log directory
USAGE_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'usage_logs')
os.makedirs(USAGE_LOG_DIR, exist_ok=True)
# Thread-safe log writing
log_lock = Lock()
# Cost per 1K tokens (USD)
# Last updated: 2026-04-22
# Source: Official API pricing pages
#
# OpenAI GPT-4o: https://openai.com/api/pricing/
# Google Gemini 2.5 Pro: https://ai.google.dev/gemini-api/docs/pricing
#
# Notes:
# - Gemini 2.5 Pro uses tiered pricing: standard rate for prompts ≤200K tokens,
# 2x rate for prompts >200K. Our typical QC calls (image + prompt) are well
# under 200K so we use the standard tier. If brand guideline PDFs push
# usage into the >200K tier we'd need to detect this per-call.
# - Output pricing for Gemini 2.5 Pro doubled from $5/1M to $10/1M between
# 2026-02 and 2026-04 — prior reports understated Gemini cost.
#
# IMPORTANT: Update these values when pricing changes
# To verify current pricing, run: python validate_pricing.py
COST_PER_1K_TOKENS = {
'OpenAI': {
'input': 0.0025, # GPT-4o: $2.50 per 1M input tokens
'output': 0.010, # GPT-4o: $10.00 per 1M output tokens
'model': 'gpt-4o',
'last_verified': '2026-04-22'
},
'Gemini': {
'input': 0.00125, # Gemini 2.5 Pro (≤200K prompt): $1.25 per 1M input tokens
'output': 0.010, # Gemini 2.5 Pro (≤200K prompt): $10.00 per 1M output tokens
'model': 'gemini-2.5-pro',
'last_verified': '2026-04-22'
}
}
def get_pricing_info():
"""Get current pricing information for all providers"""
return COST_PER_1K_TOKENS.copy()
def update_pricing(provider, input_cost_per_1k, output_cost_per_1k):
"""
Update pricing for a specific provider
Args:
provider: 'OpenAI' or 'Gemini'
input_cost_per_1k: Cost per 1K input tokens (USD)
output_cost_per_1k: Cost per 1K output tokens (USD)
"""
if provider in COST_PER_1K_TOKENS:
COST_PER_1K_TOKENS[provider]['input'] = input_cost_per_1k
COST_PER_1K_TOKENS[provider]['output'] = output_cost_per_1k
COST_PER_1K_TOKENS[provider]['last_verified'] = datetime.now().strftime('%Y-%m-%d')
print(f"Updated {provider} pricing: Input=${input_cost_per_1k}/1K, Output=${output_cost_per_1k}/1K")
return True
else:
print(f"Unknown provider: {provider}")
return False
def log_analysis_start(session_id, client, profile, user_info, file_info):
"""
Log the start of an analysis
Args:
session_id: Unique session identifier
client: Client name (diageo, unilever, loreal, general)
profile: Profile name used
user_info: User information from g.user (user_id, email, name)
file_info: File information (filename, size)
"""
log_entry = {
'event': 'analysis_start',
'timestamp': datetime.now().isoformat(),
'session_id': session_id,
'client': client,
'profile': profile,
'user_id': user_info.get('user_id'),
'user_email': user_info.get('email'),
'user_name': user_info.get('name'),
'filename': file_info.get('filename'),
'filesize': file_info.get('size')
}
_write_log_entry(log_entry)
return log_entry
def log_analysis_complete(session_id, client, profile, user_info, results):
"""
Log the completion of an analysis
Args:
session_id: Unique session identifier
client: Client name
profile: Profile name used
user_info: User information from g.user
results: Analysis results with check count, scores, token usage, etc.
"""
# Calculate cost based on actual token usage
total_cost, token_stats = _calculate_analysis_cost(results)
log_entry = {
'event': 'analysis_complete',
'timestamp': datetime.now().isoformat(),
'session_id': session_id,
'client': client,
'profile': profile,
'user_id': user_info.get('user_id'),
'user_email': user_info.get('email'),
'user_name': user_info.get('name'),
'checks_completed': results.get('checks_completed', 0),
'overall_score': results.get('overall_score', 0),
'status': results.get('status', 'unknown'),
'total_cost_usd': round(total_cost, 4),
'token_usage': token_stats
}
_write_log_entry(log_entry)
return log_entry
def log_check_execution(session_id, check_name, llm_used, execution_time_ms, token_usage=None):
"""Log individual check execution for detailed tracking"""
log_entry = {
'event': 'check_execution',
'timestamp': datetime.now().isoformat(),
'session_id': session_id,
'check_name': check_name,
'llm_used': llm_used,
'execution_time_ms': execution_time_ms
}
# Add token usage if provided
if token_usage:
log_entry['token_usage'] = token_usage
_write_log_entry(log_entry)
def log_user_login(user_info):
"""
Log a user login/visit event for tracking all platform users.
Args:
user_info: User information from auth (user_id, email, name)
"""
log_entry = {
'event': 'user_login',
'timestamp': datetime.now().isoformat(),
'user_id': user_info.get('user_id'),
'user_email': user_info.get('email'),
'user_name': user_info.get('name'),
}
_write_log_entry(log_entry)
return log_entry
def log_access_change(audit_entry):
"""
Log an access grant/revoke/promote/demote event.
Args:
audit_entry: dict from user_access.set_user_clients / promote_admin / demote_admin
"""
log_entry = {
'event': 'access_change',
'timestamp': datetime.now().isoformat(),
**audit_entry
}
_write_log_entry(log_entry)
return log_entry
def log_access_request(entry):
"""
Log a self-service client access request from a signed-in user.
Args:
entry: dict with at least user_email, user_name, requested_clients, recipients, email_sent
"""
log_entry = {
'event': 'access_request',
'timestamp': datetime.now().isoformat(),
**entry
}
_write_log_entry(log_entry)
return log_entry
def _calculate_analysis_cost(results):
"""
Calculate cost based on actual token usage from LLM responses
Returns:
tuple: (total_cost, token_statistics)
"""
total_cost = 0
token_stats = {
'total_prompt_tokens': 0,
'total_completion_tokens': 0,
'total_tokens': 0,
'by_provider': {}
}
check_results = results.get('check_results', {})
# Handle both dict (keyed by check name) and list formats
if isinstance(check_results, dict):
check_results = check_results.values()
for check_result in check_results:
# Skip if check_result is not a dict (e.g., string error message)
if not isinstance(check_result, dict):
continue
# Get model info
model_used = check_result.get('model_used', {})
if isinstance(model_used, dict):
provider = model_used.get('provider', 'Gemini')
else:
provider = 'Gemini' # Default
# Get actual token usage from check result
token_usage = check_result.get('token_usage', {})
prompt_tokens = token_usage.get('prompt_tokens', 0)
completion_tokens = token_usage.get('completion_tokens', 0)
total_tokens = token_usage.get('total_tokens', 0)
# If no token data available, use estimates as fallback
if total_tokens == 0:
prompt_tokens = 1000 # Estimate
completion_tokens = 200 # Estimate
total_tokens = prompt_tokens + completion_tokens
# Update statistics
token_stats['total_prompt_tokens'] += prompt_tokens
token_stats['total_completion_tokens'] += completion_tokens
token_stats['total_tokens'] += total_tokens
# Track by provider
if provider not in token_stats['by_provider']:
token_stats['by_provider'][provider] = {
'prompt_tokens': 0,
'completion_tokens': 0,
'total_tokens': 0,
'cost': 0
}
token_stats['by_provider'][provider]['prompt_tokens'] += prompt_tokens
token_stats['by_provider'][provider]['completion_tokens'] += completion_tokens
token_stats['by_provider'][provider]['total_tokens'] += total_tokens
# Calculate cost using actual tokens
if provider in COST_PER_1K_TOKENS:
cost_input = (prompt_tokens / 1000) * COST_PER_1K_TOKENS[provider]['input']
cost_output = (completion_tokens / 1000) * COST_PER_1K_TOKENS[provider]['output']
check_cost = cost_input + cost_output
total_cost += check_cost
token_stats['by_provider'][provider]['cost'] += check_cost
return total_cost, token_stats
def _write_log_entry(log_entry):
"""Write a log entry to the daily log file"""
# Create daily log file (YYYY-MM-DD.jsonl)
log_date = datetime.now().strftime('%Y-%m-%d')
log_file = os.path.join(USAGE_LOG_DIR, f'{log_date}.jsonl')
with log_lock:
with open(log_file, 'a') as f:
f.write(json.dumps(log_entry) + '\n')
def get_usage_stats(start_date=None, end_date=None, client=None, user_id=None):
"""
Get usage statistics for a date range
Args:
start_date: Start date (YYYY-MM-DD) or None for all time
end_date: End date (YYYY-MM-DD) or None for today
client: Filter by client or None for all clients
user_id: Filter by user or None for all users
Returns:
Dictionary with usage statistics
"""
import glob
from collections import defaultdict
# Find relevant log files
if start_date and end_date:
# TODO: Implement date range filtering
pass
log_files = glob.glob(os.path.join(USAGE_LOG_DIR, '*.jsonl'))
stats = {
'total_analyses': 0,
'total_tokens': 0,
'total_prompt_tokens': 0,
'total_completion_tokens': 0,
'by_client': defaultdict(lambda: {
'count': 0,
'profiles': defaultdict(int),
'users': set(),
'total_checks': 0,
'estimated_cost': 0,
'total_tokens': 0,
'prompt_tokens': 0,
'completion_tokens': 0
}),
'by_user': defaultdict(lambda: {
'count': 0,
'clients': defaultdict(int),
'total_checks': 0,
'total_tokens': 0,
'prompt_tokens': 0,
'completion_tokens': 0
}),
'by_profile': defaultdict(int),
'by_provider': defaultdict(lambda: {
'total_tokens': 0,
'prompt_tokens': 0,
'completion_tokens': 0,
'cost': 0
})
}
# Read and aggregate logs
for log_file in log_files:
with open(log_file, 'r') as f:
for line in f:
try:
entry = json.loads(line.strip())
# Filter by client if specified
if client and entry.get('client') != client:
continue
# Filter by user if specified
if user_id and entry.get('user_id') != user_id:
continue
# Process complete analyses only
if entry.get('event') == 'analysis_complete':
stats['total_analyses'] += 1
# By client
client_name = entry.get('client', 'unknown')
profile_name = entry.get('profile', 'unknown')
user_email = entry.get('user_email', 'unknown')
# Get token usage data
token_usage = entry.get('token_usage', {})
total_tokens = token_usage.get('total_tokens', 0)
prompt_tokens = token_usage.get('total_prompt_tokens', 0)
completion_tokens = token_usage.get('total_completion_tokens', 0)
cost = entry.get('total_cost_usd', entry.get('estimated_cost_usd', 0))
# Update global stats
stats['total_tokens'] += total_tokens
stats['total_prompt_tokens'] += prompt_tokens
stats['total_completion_tokens'] += completion_tokens
stats['by_client'][client_name]['count'] += 1
stats['by_client'][client_name]['profiles'][profile_name] += 1
stats['by_client'][client_name]['users'].add(user_email)
stats['by_client'][client_name]['total_checks'] += entry.get('checks_completed', 0)
stats['by_client'][client_name]['estimated_cost'] += cost
stats['by_client'][client_name]['total_tokens'] += total_tokens
stats['by_client'][client_name]['prompt_tokens'] += prompt_tokens
stats['by_client'][client_name]['completion_tokens'] += completion_tokens
# By user
user_uid = entry.get('user_id', 'unknown')
stats['by_user'][user_uid]['count'] += 1
stats['by_user'][user_uid]['clients'][client_name] += 1
stats['by_user'][user_uid]['total_checks'] += entry.get('checks_completed', 0)
stats['by_user'][user_uid]['total_tokens'] += total_tokens
stats['by_user'][user_uid]['prompt_tokens'] += prompt_tokens
stats['by_user'][user_uid]['completion_tokens'] += completion_tokens
# By profile
stats['by_profile'][profile_name] += 1
# By provider
by_provider = token_usage.get('by_provider', {})
for provider, provider_stats in by_provider.items():
stats['by_provider'][provider]['total_tokens'] += provider_stats.get('total_tokens', 0)
stats['by_provider'][provider]['prompt_tokens'] += provider_stats.get('prompt_tokens', 0)
stats['by_provider'][provider]['completion_tokens'] += provider_stats.get('completion_tokens', 0)
stats['by_provider'][provider]['cost'] += provider_stats.get('cost', 0)
except json.JSONDecodeError:
continue
# Convert sets to lists for JSON serialization
for client_data in stats['by_client'].values():
client_data['users'] = list(client_data['users'])
return dict(stats)