Settings panel:
- Reference Assets tab: collapse the Brand Name + Tags + Description form to a single Name field; the user-entered name now drives the dropdown label on the main configuration page (falls back to filename for legacy records).
- Media Plan tab: add a Name field. Backend stores display_name on the plan record, and both the active-plan card and the main-page dropdown prefer display_name (falling back to original_filename for old plans).
- Modal footer is now context-aware: Save Profile + Cancel show only on the Profile / Create Profile tabs; Reference Assets / QC Tools / Media Plan show a single green Save button that closes the modal.

Client access request:
- New "Request Client Access" tile on the client picker, alongside the user's existing client tiles. Opens a modal that auto-fills name + email from the MSAL session (read-only), shows checkboxes for clients the user does not already have, and accepts an optional reason.
- New POST /api/access_request endpoint (auth-required) that takes identity from the verified session, validates the requested clients, looks up admin recipients via user_access.list_access_entries, and emails them via the new email_service module (Mailgun SMTP with STARTTLS). Reply-To is set to the requester. Logs an access_request event to the daily JSONL usage logs.
- New GET /api/all_clients endpoint so the form can list clients the requester currently cannot see.
- Mailgun SMTP credentials added to the four env files (and placeholders in the .env.template files) under SMTP_SERVER / SMTP_PORT / SMTP_USER / SMTP_PASSWORD / SENDER_EMAIL / ERROR_EMAIL / REPORT_EMAILS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
411 lines
15 KiB
Python
411 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Usage tracking module for monitoring profile usage, user activity, and cost estimation
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from datetime import datetime
|
|
from threading import Lock
|
|
|
|
# Daily usage logs live in a 'usage_logs' folder next to this module;
# the folder is created at import time if it does not exist yet.
USAGE_LOG_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'usage_logs',
)
os.makedirs(USAGE_LOG_DIR, exist_ok=True)

# Lock held while appending to a log file so writes from different
# threads do not interleave
log_lock = Lock()
|
|
|
|
# Token pricing per provider, in USD per 1K tokens
# Last updated: 2026-04-22
# Source: Official API pricing pages
#
#   OpenAI GPT-4o:         https://openai.com/api/pricing/
#   Google Gemini 2.5 Pro: https://ai.google.dev/gemini-api/docs/pricing
#
# Notes:
#   - Gemini 2.5 Pro uses tiered pricing: standard rate for prompts ≤200K
#     tokens, 2x rate for prompts >200K. Our typical QC calls (image + prompt)
#     are well under 200K so we use the standard tier. If brand guideline PDFs
#     push usage into the >200K tier we'd need to detect this per-call.
#   - Output pricing for Gemini 2.5 Pro doubled from $5/1M to $10/1M between
#     2026-02 and 2026-04 — prior reports understated Gemini cost.
#
# IMPORTANT: Update these values when pricing changes
# To verify current pricing, run: python validate_pricing.py
COST_PER_1K_TOKENS = {
    'OpenAI': dict(
        input=0.0025,   # GPT-4o: $2.50 per 1M input tokens
        output=0.010,   # GPT-4o: $10.00 per 1M output tokens
        model='gpt-4o',
        last_verified='2026-04-22',
    ),
    'Gemini': dict(
        input=0.00125,  # Gemini 2.5 Pro (≤200K prompt): $1.25 per 1M input tokens
        output=0.010,   # Gemini 2.5 Pro (≤200K prompt): $10.00 per 1M output tokens
        model='gemini-2.5-pro',
        last_verified='2026-04-22',
    ),
}
|
|
|
|
def get_pricing_info():
    """
    Get current pricing information for all providers

    Returns:
        dict: Independent snapshot of COST_PER_1K_TOKENS keyed by provider
        name. The nested per-provider dicts are copied too, so callers can
        modify the result without altering the live pricing table.
    """
    # Function-scope import keeps this block self-contained.
    import copy

    # A shallow .copy() would share the nested per-provider dicts with the
    # module-level table, letting a caller change live pricing by accident.
    return copy.deepcopy(COST_PER_1K_TOKENS)
|
|
|
|
def update_pricing(provider, input_cost_per_1k, output_cost_per_1k):
    """
    Overwrite the stored token rates for one provider

    Args:
        provider: Key of COST_PER_1K_TOKENS to update ('OpenAI' or 'Gemini')
        input_cost_per_1k: New cost per 1K input tokens (USD)
        output_cost_per_1k: New cost per 1K output tokens (USD)

    Returns:
        True if the provider was found and updated, False otherwise
    """
    # Guard clause: nothing to update for providers we do not track
    if provider not in COST_PER_1K_TOKENS:
        print(f"Unknown provider: {provider}")
        return False

    rates = COST_PER_1K_TOKENS[provider]
    rates['input'] = input_cost_per_1k
    rates['output'] = output_cost_per_1k
    # Record today's date as the moment these rates were last set
    rates['last_verified'] = datetime.now().strftime('%Y-%m-%d')

    print(f"Updated {provider} pricing: Input=${input_cost_per_1k}/1K, Output=${output_cost_per_1k}/1K")
    return True
|
|
|
|
def log_analysis_start(session_id, client, profile, user_info, file_info):
    """
    Record an 'analysis_start' event in the usage log

    Args:
        session_id: Unique session identifier
        client: Client name (diageo, unilever, loreal, general)
        profile: Profile name used
        user_info: Mapping with user details; 'user_id', 'email' and 'name' are read
        file_info: Mapping with file details; 'filename' and 'size' are read

    Returns:
        The dict that was written to the log
    """
    # Keyword order below is the key order of the emitted JSON line
    record = dict(
        event='analysis_start',
        timestamp=datetime.now().isoformat(),
        session_id=session_id,
        client=client,
        profile=profile,
        user_id=user_info.get('user_id'),
        user_email=user_info.get('email'),
        user_name=user_info.get('name'),
        filename=file_info.get('filename'),
        filesize=file_info.get('size'),
    )

    _write_log_entry(record)
    return record
|
|
|
|
def log_analysis_complete(session_id, client, profile, user_info, results):
    """
    Record an 'analysis_complete' event, including cost and token usage

    Args:
        session_id: Unique session identifier
        client: Client name
        profile: Profile name used
        user_info: Mapping with user details; 'user_id', 'email' and 'name' are read
        results: Analysis results; 'checks_completed', 'overall_score' and
            'status' are read here, and the whole mapping is passed to
            _calculate_analysis_cost to derive cost and token statistics

    Returns:
        The dict that was written to the log
    """
    # Cost and token totals are derived from the per-check results
    cost_usd, usage = _calculate_analysis_cost(results)

    # Keyword order below is the key order of the emitted JSON line
    record = dict(
        event='analysis_complete',
        timestamp=datetime.now().isoformat(),
        session_id=session_id,
        client=client,
        profile=profile,
        user_id=user_info.get('user_id'),
        user_email=user_info.get('email'),
        user_name=user_info.get('name'),
        checks_completed=results.get('checks_completed', 0),
        overall_score=results.get('overall_score', 0),
        status=results.get('status', 'unknown'),
        total_cost_usd=round(cost_usd, 4),
        token_usage=usage,
    )

    _write_log_entry(record)
    return record
|
|
|
|
def log_check_execution(session_id, check_name, llm_used, execution_time_ms, token_usage=None):
    """
    Record a 'check_execution' event for a single check

    Args:
        session_id: Unique session identifier
        check_name: Name of the check that ran
        llm_used: Identifier of the LLM used for the check
        execution_time_ms: Execution time in milliseconds
        token_usage: Optional token usage data; attached to the entry only
            when truthy
    """
    record = dict(
        event='check_execution',
        timestamp=datetime.now().isoformat(),
        session_id=session_id,
        check_name=check_name,
        llm_used=llm_used,
        execution_time_ms=execution_time_ms,
    )

    # Token data is optional; omit the key entirely when none was supplied
    if token_usage:
        record['token_usage'] = token_usage

    _write_log_entry(record)
|
|
|
|
def log_user_login(user_info):
    """
    Record a 'user_login' event so every platform visitor is tracked.

    Args:
        user_info: Mapping with user details from auth; 'user_id', 'email'
            and 'name' are read

    Returns:
        The dict that was written to the log
    """
    record = dict(
        event='user_login',
        timestamp=datetime.now().isoformat(),
        user_id=user_info.get('user_id'),
        user_email=user_info.get('email'),
        user_name=user_info.get('name'),
    )

    _write_log_entry(record)
    return record
|
|
|
|
|
|
def log_access_change(audit_entry):
    """
    Record an 'access_change' event (access grant/revoke/promote/demote).

    Args:
        audit_entry: dict from user_access.set_user_clients / promote_admin / demote_admin

    Returns:
        The dict that was written to the log
    """
    record = {
        'event': 'access_change',
        'timestamp': datetime.now().isoformat(),
    }
    # Merged last, so any 'event' or 'timestamp' key in audit_entry
    # replaces the value set above
    record.update(audit_entry)

    _write_log_entry(record)
    return record
|
|
|
|
|
|
def log_access_request(entry):
    """
    Record an 'access_request' event (self-service client access request
    from a signed-in user).

    Args:
        entry: dict with at least user_email, user_name, requested_clients, recipients, email_sent

    Returns:
        The dict that was written to the log
    """
    record = {
        'event': 'access_request',
        'timestamp': datetime.now().isoformat(),
    }
    # Merged last, so any 'event' or 'timestamp' key in entry replaces the
    # value set above
    record.update(entry)

    _write_log_entry(record)
    return record
|
|
|
|
|
|
def _calculate_analysis_cost(results):
    """
    Calculate cost based on actual token usage from LLM responses

    Each entry of results['check_results'] (a dict keyed by check name, or a
    list) is expected to be a dict carrying optional 'model_used' and
    'token_usage' data. Entries that are not dicts are skipped.

    Token counts are resolved per check as follows:
      - reported 'total_tokens' is used as-is when non-zero;
      - if the total is missing/zero but prompt or completion counts were
        reported, the total is derived from them (the reported counts are
        kept rather than replaced by estimates);
      - if nothing was reported, a fixed estimate of 1000 prompt + 200
        completion tokens is used.

    The provider defaults to 'Gemini' when not specified. Providers missing
    from COST_PER_1K_TOKENS still contribute to the token statistics but add
    no cost.

    Args:
        results: Analysis results mapping; only 'check_results' is read

    Returns:
        tuple: (total_cost, token_statistics)
    """
    total_cost = 0
    token_stats = {
        'total_prompt_tokens': 0,
        'total_completion_tokens': 0,
        'total_tokens': 0,
        'by_provider': {}
    }

    # 'check_results' may be absent or explicitly None
    check_results = results.get('check_results') or {}

    # Handle both dict (keyed by check name) and list formats
    if isinstance(check_results, dict):
        check_results = check_results.values()

    for check_result in check_results:
        # Skip if check_result is not a dict (e.g., string error message)
        if not isinstance(check_result, dict):
            continue

        # Get model info; fall back to the default provider when it is
        # missing, empty, or not in the expected dict form
        model_used = check_result.get('model_used', {})
        provider = 'Gemini'
        if isinstance(model_used, dict):
            provider = model_used.get('provider') or 'Gemini'

        # Get actual token usage from check result; tolerate a missing or
        # None 'token_usage' and None counts
        token_usage = check_result.get('token_usage')
        if not isinstance(token_usage, dict):
            token_usage = {}
        prompt_tokens = token_usage.get('prompt_tokens') or 0
        completion_tokens = token_usage.get('completion_tokens') or 0
        total_tokens = token_usage.get('total_tokens') or 0

        if total_tokens == 0:
            if prompt_tokens or completion_tokens:
                # Counts were reported without a total: keep the real
                # numbers and derive the total from them
                total_tokens = prompt_tokens + completion_tokens
            else:
                # No token data available at all, use estimates as fallback
                prompt_tokens = 1000  # Estimate
                completion_tokens = 200  # Estimate
                total_tokens = prompt_tokens + completion_tokens

        # Update statistics
        token_stats['total_prompt_tokens'] += prompt_tokens
        token_stats['total_completion_tokens'] += completion_tokens
        token_stats['total_tokens'] += total_tokens

        # Track by provider
        provider_stats = token_stats['by_provider'].setdefault(provider, {
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'total_tokens': 0,
            'cost': 0
        })
        provider_stats['prompt_tokens'] += prompt_tokens
        provider_stats['completion_tokens'] += completion_tokens
        provider_stats['total_tokens'] += total_tokens

        # Calculate cost using actual tokens; unknown providers are tracked
        # above but cannot be priced
        if provider in COST_PER_1K_TOKENS:
            rates = COST_PER_1K_TOKENS[provider]
            cost_input = (prompt_tokens / 1000) * rates['input']
            cost_output = (completion_tokens / 1000) * rates['output']
            check_cost = cost_input + cost_output
            total_cost += check_cost
            provider_stats['cost'] += check_cost

    return total_cost, token_stats
|
|
|
|
def _write_log_entry(log_entry):
    """Append one entry, serialized as a single JSON line, to today's log file"""
    # One file per calendar day: <USAGE_LOG_DIR>/YYYY-MM-DD.jsonl
    today = datetime.now().strftime('%Y-%m-%d')
    target_path = os.path.join(USAGE_LOG_DIR, f'{today}.jsonl')

    # Hold the module lock for the whole open/serialize/write sequence so
    # lines written by different threads never interleave
    with log_lock, open(target_path, 'a') as handle:
        handle.write(json.dumps(log_entry) + '\n')
|
|
|
|
def get_usage_stats(start_date=None, end_date=None, client=None, user_id=None):
    """
    Get usage statistics for a date range

    Reads the daily '*.jsonl' files in USAGE_LOG_DIR and aggregates the
    'analysis_complete' events they contain. Lines that are not valid JSON
    objects are skipped.

    The date range is applied to the log file names (YYYY-MM-DD.jsonl), not
    to the timestamps inside the entries. Both bounds are inclusive.

    Args:
        start_date: Start date (YYYY-MM-DD) or None for all time
        end_date: End date (YYYY-MM-DD) or None for today (no upper bound)
        client: Filter by client or None for all clients
        user_id: Filter by user or None for all users

    Returns:
        Dictionary with usage statistics: global totals ('total_analyses',
        'total_tokens', 'total_prompt_tokens', 'total_completion_tokens')
        plus 'by_client', 'by_user', 'by_profile' and 'by_provider'
        breakdowns. Each client's 'users' value is a list of user emails.
    """
    import glob
    from collections import defaultdict

    # Normalize the bounds to 'YYYY-MM-DD' text. ISO dates sort correctly
    # under plain string comparison, so no date parsing is needed.
    lower = str(start_date)[:10] if start_date else None
    upper = str(end_date)[:10] if end_date else None

    # Find relevant log files (sorted so aggregation order is deterministic)
    log_files = []
    for path in sorted(glob.glob(os.path.join(USAGE_LOG_DIR, '*.jsonl'))):
        # The file stem is the day the entries were written
        log_day = os.path.splitext(os.path.basename(path))[0]
        if lower and log_day < lower:
            continue
        if upper and log_day > upper:
            continue
        log_files.append(path)

    stats = {
        'total_analyses': 0,
        'total_tokens': 0,
        'total_prompt_tokens': 0,
        'total_completion_tokens': 0,
        'by_client': defaultdict(lambda: {
            'count': 0,
            'profiles': defaultdict(int),
            'users': set(),
            'total_checks': 0,
            'estimated_cost': 0,
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0
        }),
        'by_user': defaultdict(lambda: {
            'count': 0,
            'clients': defaultdict(int),
            'total_checks': 0,
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0
        }),
        'by_profile': defaultdict(int),
        'by_provider': defaultdict(lambda: {
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'cost': 0
        })
    }

    # Read and aggregate logs
    for log_file in log_files:
        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                # Only the parse is guarded, so errors in the aggregation
                # below are not silently swallowed
                try:
                    entry = json.loads(line.strip())
                except json.JSONDecodeError:
                    continue

                # A line can be valid JSON without being an object
                if not isinstance(entry, dict):
                    continue

                # Filter by client if specified
                if client and entry.get('client') != client:
                    continue

                # Filter by user if specified
                if user_id and entry.get('user_id') != user_id:
                    continue

                # Process complete analyses only
                if entry.get('event') != 'analysis_complete':
                    continue

                stats['total_analyses'] += 1

                client_name = entry.get('client', 'unknown')
                profile_name = entry.get('profile', 'unknown')
                user_email = entry.get('user_email', 'unknown')
                checks_completed = entry.get('checks_completed') or 0

                # Get token usage data (may be missing or None)
                token_usage = entry.get('token_usage')
                if not isinstance(token_usage, dict):
                    token_usage = {}
                total_tokens = token_usage.get('total_tokens') or 0
                prompt_tokens = token_usage.get('total_prompt_tokens') or 0
                completion_tokens = token_usage.get('total_completion_tokens') or 0
                # Entries without 'total_cost_usd' may carry 'estimated_cost_usd' instead
                cost = entry.get('total_cost_usd', entry.get('estimated_cost_usd', 0)) or 0

                # Update global stats
                stats['total_tokens'] += total_tokens
                stats['total_prompt_tokens'] += prompt_tokens
                stats['total_completion_tokens'] += completion_tokens

                # By client
                client_stats = stats['by_client'][client_name]
                client_stats['count'] += 1
                client_stats['profiles'][profile_name] += 1
                client_stats['users'].add(user_email)
                client_stats['total_checks'] += checks_completed
                client_stats['estimated_cost'] += cost
                client_stats['total_tokens'] += total_tokens
                client_stats['prompt_tokens'] += prompt_tokens
                client_stats['completion_tokens'] += completion_tokens

                # By user
                user_stats = stats['by_user'][entry.get('user_id', 'unknown')]
                user_stats['count'] += 1
                user_stats['clients'][client_name] += 1
                user_stats['total_checks'] += checks_completed
                user_stats['total_tokens'] += total_tokens
                user_stats['prompt_tokens'] += prompt_tokens
                user_stats['completion_tokens'] += completion_tokens

                # By profile
                stats['by_profile'][profile_name] += 1

                # By provider
                by_provider = token_usage.get('by_provider')
                if not isinstance(by_provider, dict):
                    by_provider = {}
                for provider, provider_stats in by_provider.items():
                    if not isinstance(provider_stats, dict):
                        continue
                    totals = stats['by_provider'][provider]
                    totals['total_tokens'] += provider_stats.get('total_tokens') or 0
                    totals['prompt_tokens'] += provider_stats.get('prompt_tokens') or 0
                    totals['completion_tokens'] += provider_stats.get('completion_tokens') or 0
                    totals['cost'] += provider_stats.get('cost') or 0

    # Convert sets to lists for JSON serialization
    for client_data in stats['by_client'].values():
        client_data['users'] = list(client_data['users'])

    return dict(stats)
|