sandbox-reports/veo3_report.py
DJP facacc94d4 Update AI Tools Usage Report System - Multi-Tool Support
Enhanced the VEO3 usage report system to support all AI tool types:
- Added support for 6 tool types: VEO3, TEXT2IMAGE, TEXT2VOICE, SPEECH2SPEECH, DOCUMENT_TRANSLATION, VIDEOQUERY
- Updated Python report generator (veo3_report.py) with dynamic tool detection
- Updated PHP report page (report.php) with tool usage breakdown section
- Changed all "Prompts" references to "Requests" for clarity
- Updated refresh button to fetch only last 12 weeks (84 days) for better performance
- Made system dynamic to handle unknown tool types automatically
- Renamed report titles from "VEO3 Usage Report" to "AI Tools Usage Report"

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-01-08 14:50:04 -05:00

418 lines
18 KiB
Python

#!/usr/bin/env python3
"""
AI Tools Usage Report Generator
Fetches usage data from webhook and sends email reports via SMTP
Supports: VEO3, TEXT2IMAGE, TEXT2VOICE, SPEECH2SPEECH, DOCUMENT_TRANSLATION, VIDEOQUERY
"""
import html
import json
import os
import smtplib
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, List, Tuple

import requests
from dotenv import load_dotenv
# Load environment variables (e.g. from a .env file) before the
# configuration constants below read them through os.getenv
load_dotenv()
# Configuration -- every value below can be overridden through the environment.
# NOTE(review): the default embeds a concrete webhook URL in source control;
# consider requiring WEBHOOK_URL to be set explicitly instead.
WEBHOOK_URL = os.getenv('WEBHOOK_URL', 'https://hook.us1.make.celonis.com/u8i4yq6rydu8u8g9bfhk0xbajsyckrmj')
SMTP_SERVER = os.getenv('SMTP_SERVER', 'smtp.mailgun.org')
SMTP_PORT = int(os.getenv('SMTP_PORT', '587'))
SMTP_USER = os.getenv('SMTP_USER')
SMTP_PASSWORD = os.getenv('SMTP_PASSWORD')
SENDER_EMAIL = os.getenv('SENDER_EMAIL')
# Comma-separated address list. Entries are stripped and blanks dropped, so an
# unset or empty variable yields [] (not ['']) and "a@x.com, b@y.com" works.
REPORT_RECIPIENTS = [
    address.strip()
    for address in os.getenv('REPORT_RECIPIENTS', '').split(',')
    if address.strip()
]

# Cost configuration per tool type (adjust these as needed).
# Tools missing from this table are costed at 0 by the report code.
TOOL_COSTS = {
    'VEO3': 3.20,                  # $3.20 per 8-second video
    'TEXT2IMAGE': 0.04,            # Average cost per image generation
    'TEXT2VOICE': 0.30,            # Cost per TTS conversion
    'SPEECH2SPEECH': 0.50,         # Cost per speech-to-speech conversion
    'DOCUMENT_TRANSLATION': 0.10,  # Cost per document translation
    'VIDEOQUERY': 0.25,            # Cost per video analysis query
}

# Tool display names.
# Tools missing from this table are shown under their raw identifier.
TOOL_NAMES = {
    'VEO3': 'VEO3 Video Generation',
    'TEXT2IMAGE': 'Text to Image',
    'TEXT2VOICE': 'Text to Voice',
    'SPEECH2SPEECH': 'Speech to Speech',
    'DOCUMENT_TRANSLATION': 'Document Translation',
    'VIDEOQUERY': 'Video Query Analysis',
}
def fetch_webhook_data(start_date: str, end_date: str) -> List[Dict]:
    """Fetch usage records from the webhook for a date range.

    POSTs ``{'start_date': ..., 'end_date': ...}`` as JSON to ``WEBHOOK_URL``
    and decodes the JSON response.

    Args:
        start_date: Start of the range, passed to the webhook verbatim
            (the caller in this file sends ``YYYY-MM-DD`` strings).
        end_date: End of the range, same format.

    Returns:
        The decoded list of records, or ``[]`` when the request fails or
        times out, the server answers with an error status, the body is not
        valid JSON, or the JSON document is not a list.
    """
    print(f"Fetching data from {start_date} to {end_date}...")
    payload = {
        'start_date': start_date,
        'end_date': end_date,
    }
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    try:
        # (connect, read) timeouts: without them requests waits indefinitely
        # on a stalled connection and the report job never finishes.
        response = requests.post(
            WEBHOOK_URL, json=payload, headers=headers, timeout=(10, 60)
        )
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors on older requests versions.
        print(f"✗ Error fetching data: {e}")
        return []

    # Callers iterate the result as a list of records; anything else (e.g. an
    # error object) would only fail later with a confusing traceback.
    if not isinstance(data, list):
        print(f"✗ Error fetching data: expected a JSON list, got {type(data).__name__}")
        return []

    print(f"✓ Fetched {len(data)} records")
    return data
def _ensure_aware(moment: datetime) -> datetime:
    """Return *moment* unchanged if it is timezone-aware, else tag it as UTC."""
    if moment.tzinfo is None or moment.utcoffset() is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment


def analyze_period_data(data: List[Dict], period_start: datetime, period_end: datetime, period_name: str) -> Dict:
    """Aggregate usage statistics for the records inside one time window.

    Each record is read as ``record['data']`` with the keys ``Date``
    (ISO-8601 string, a trailing ``Z`` is accepted), ``USER``, ``TOOL`` and
    an optional ``PROMPT``.

    Args:
        data: Records as returned by the webhook.
        period_start: Inclusive lower bound of the window.
        period_end: Inclusive upper bound of the window.
        period_name: Human-readable label copied into the result.

    Returns:
        A dict with ``period_name``, ``total_requests``, ``unique_users``,
        ``avg_requests_per_user``, ``avg_prompt_length``, ``total_cost``,
        ``top_users`` (up to 25 ``(user, count)`` pairs, busiest first),
        ``tool_breakdown`` (``(tool, count)`` pairs, busiest first),
        ``top_users_by_tool`` (up to 10 pairs per tool), ``daily_counts``
        (``YYYY-MM-DD`` -> count, sorted by date), ``start_date`` and
        ``end_date`` (``YYYY-MM-DD``).

    Timestamps or bounds without an offset are treated as UTC so they can be
    compared with timezone-aware values instead of raising ``TypeError``.
    Records that are structurally broken (missing keys, unparsable date) are
    skipped and reported in a single warning line.
    """
    window_start = _ensure_aware(period_start)
    window_end = _ensure_aware(period_end)

    user_counts = defaultdict(int)
    daily_counts = defaultdict(int)
    tool_counts = defaultdict(int)
    tool_user_counts = defaultdict(lambda: defaultdict(int))
    total_requests = 0
    prompt_lengths = []
    total_cost = 0
    skipped = 0

    for record in data:
        try:
            item = record['data']
            record_date = _ensure_aware(
                datetime.fromisoformat(item['Date'].replace('Z', '+00:00'))
            )
            user = item['USER']
            tool = item['TOOL']
        except (KeyError, TypeError, ValueError, AttributeError):
            # One bad record must not abort the whole report.
            skipped += 1
            continue

        # Filter to period
        if not (window_start <= record_date <= window_end):
            continue

        total_requests += 1
        user_counts[user] += 1
        tool_counts[tool] += 1
        tool_user_counts[tool][user] += 1
        daily_counts[record_date.strftime('%Y-%m-%d')] += 1

        prompt = item.get('PROMPT') if hasattr(item, 'get') else None
        if prompt:
            prompt_lengths.append(len(prompt))

        # Unknown tool types are counted but contribute no cost.
        total_cost += TOOL_COSTS.get(tool, 0)

    if skipped:
        print(f"⚠ Skipped {skipped} malformed record(s) while analyzing {period_name}")

    # Busiest first; sorted() is stable, so ties keep first-seen order.
    top_users = sorted(user_counts.items(), key=lambda x: x[1], reverse=True)[:25]
    tool_breakdown = sorted(tool_counts.items(), key=lambda x: x[1], reverse=True)
    top_users_by_tool = {
        tool: sorted(users.items(), key=lambda x: x[1], reverse=True)[:10]
        for tool, users in tool_user_counts.items()
    }

    unique_users = len(user_counts)
    avg_requests_per_user = total_requests / unique_users if unique_users > 0 else 0
    avg_prompt_length = sum(prompt_lengths) / len(prompt_lengths) if prompt_lengths else 0

    return {
        'period_name': period_name,
        'total_requests': total_requests,
        'unique_users': unique_users,
        'avg_requests_per_user': avg_requests_per_user,
        'avg_prompt_length': avg_prompt_length,
        'total_cost': total_cost,
        'top_users': top_users,
        'tool_breakdown': tool_breakdown,
        'top_users_by_tool': top_users_by_tool,
        'daily_counts': dict(sorted(daily_counts.items())),
        'start_date': period_start.strftime('%Y-%m-%d'),
        'end_date': period_end.strftime('%Y-%m-%d'),
    }
def generate_html_report(daily_data: Dict, weekly_data: Dict, monthly_data: Dict) -> str:
    """Render the daily, weekly and monthly summaries as one HTML email.

    Each argument is a dict shaped like the return value of
    ``analyze_period_data``; the keys read here are ``total_requests``,
    ``total_cost``, ``unique_users``, ``avg_requests_per_user``,
    ``tool_breakdown``, ``top_users``, ``period_name``, ``start_date`` and
    ``end_date``.

    User names and tool identifiers come from webhook records, so they are
    HTML-escaped before being placed in the markup.

    Returns:
        The complete HTML document as a string.
    """
    # Inline styles shared by several elements (email clients ignore most
    # stylesheet rules, hence everything is styled inline).
    section_style = (
        'background: white; padding: 30px; border-radius: 10px; '
        'box-shadow: 0 2px 8px rgba(0,0,0,0.08); margin-bottom: 30px;'
    )
    heading_style = (
        'color: #2c3e50; margin-bottom: 20px; font-size: 22px; '
        'border-bottom: 2px solid #ecf0f1; padding-bottom: 10px;'
    )
    card_style = (
        'background: white; padding: 20px; border-radius: 8px; '
        'border-left: 4px solid #FFC407; box-shadow: 0 1px 4px rgba(0,0,0,0.08);'
    )
    card_label_style = (
        'color: #7f8c8d; font-size: 12px; font-weight: 600; '
        'text-transform: uppercase; margin-bottom: 8px;'
    )
    card_value_style = 'font-size: 28px; font-weight: 700; color: #2c3e50;'
    th_style = 'padding: 12px; text-align: left; border-bottom: 1px solid #ecf0f1;'
    td_style = 'padding: 12px;'
    table_open = '<table style="width: 100%; border-collapse: collapse; font-size: 13px;">'
    row_open = '<tr style="border-bottom: 1px solid #ecf0f1;">'
    # rank -> (badge background colour, badge label) for the podium places
    medals = {
        1: ('#f39c12', '🥇 #1'),
        2: ('#95a5a6', '🥈 #2'),
        3: ('#cd7f32', '🥉 #3'),
    }

    def header_row(*titles: str) -> str:
        """Render a <thead> with one left-aligned column per title."""
        cells = ''.join(f'<th style="{th_style}">{title}</th>' for title in titles)
        return f'<thead><tr style="background-color: #f8f9fa;">{cells}</tr></thead>'

    def stat_card(label: str, value: str, note: str = '') -> str:
        """Render one summary card; *note* adds an optional small caption."""
        caption = ''
        if note:
            caption = f'<div style="font-size: 11px; color: #95a5a6; margin-top: 4px;">{note}</div>'
        return (
            f'<div style="{card_style}">'
            f'<h3 style="{card_label_style}">{label}</h3>'
            f'<div style="{card_value_style}">{value}</div>'
            f'{caption}'
            '</div>'
        )

    def generate_user_table(users: List[Tuple[str, int]], total: int, period: str) -> str:
        """Generate HTML table for top users (expects busiest user first)."""
        if not users:
            return f"<p>No activity in {period}</p>"

        # The first entry has the highest count; bars are scaled against it.
        max_count = users[0][1]
        rows = []
        for rank, (user, count) in enumerate(users, 1):
            percentage = (count / total * 100) if total > 0 else 0
            bar_width = (count / max_count * 100) if max_count > 0 else 0
            badge = ''
            if rank in medals:
                badge_color, badge_label = medals[rank]
                badge = (
                    f'<span style="background-color: {badge_color}; color: white; '
                    'padding: 4px 8px; border-radius: 4px; font-size: 11px; '
                    f'font-weight: 600; margin-left: 5px;">{badge_label}</span>'
                )
            safe_user = html.escape(str(user))
            rows.append(
                f'{row_open}'
                f'<td style="{td_style}"><span style="font-weight: 700; color: #FFC407;">#{rank}</span></td>'
                f'<td style="{td_style}">{safe_user}{badge}</td>'
                f'<td style="{td_style}"><strong>{count:,}</strong></td>'
                f'<td style="{td_style}">{percentage:.2f}%</td>'
                f'<td style="{td_style}">'
                '<div style="background: linear-gradient(90deg, #FFC407, #f5b800); '
                f'height: 24px; width: {bar_width}%; border-radius: 4px;"></div>'
                '</td>'
                '</tr>'
            )
        return (
            table_open
            + header_row('Rank', 'User', 'Requests', 'Percentage', 'Activity')
            + '<tbody>' + ''.join(rows) + '</tbody></table>'
        )

    def generate_period_section(data: Dict, title: str, emoji: str) -> str:
        """Generate HTML section for a time period"""
        heading = f'<h2 style="{heading_style}">{emoji} {title}</h2>'
        total_requests = data['total_requests']
        if total_requests == 0:
            return (
                f'<div style="{section_style}">{heading}'
                '<p>No activity recorded for this period.</p></div>'
            )

        # Tool breakdown table
        tool_rows = []
        for tool, count in data['tool_breakdown']:
            # Unknown tools fall back to their raw identifier and zero cost.
            tool_name = html.escape(str(TOOL_NAMES.get(tool, tool)))
            tool_cost = count * TOOL_COSTS.get(tool, 0)
            percentage = (count / total_requests * 100) if total_requests > 0 else 0
            tool_rows.append(
                f'{row_open}'
                f'<td style="{td_style}"><strong>{tool_name}</strong></td>'
                f'<td style="{td_style}">{count:,}</td>'
                f'<td style="{td_style}">${tool_cost:,.2f}</td>'
                f'<td style="{td_style}">{percentage:.1f}%</td>'
                '</tr>'
            )
        tool_breakdown_html = (
            '<div style="margin: 25px 0;">'
            '<h3 style="color: #2c3e50; margin-bottom: 15px; font-size: 16px;">Tool Usage Breakdown</h3>'
            + table_open
            + header_row('Tool', 'Requests', 'Cost', 'Percentage')
            + '<tbody>' + ''.join(tool_rows) + '</tbody></table></div>'
        )

        cards = ''.join([
            stat_card('Total Requests', f"{total_requests:,}"),
            stat_card('Total Cost', f"${data['total_cost']:,.2f}", 'All tools combined'),
            stat_card('Unique Users', f"{data['unique_users']:,}"),
            stat_card('Avg/User', f"{data['avg_requests_per_user']:.1f}"),
        ])
        start_date = data['start_date']
        end_date = data['end_date']
        user_table = generate_user_table(data['top_users'], total_requests, data['period_name'])

        return (
            f'<div style="{section_style}">'
            f'{heading}'
            f'<p style="color: #7f8c8d; margin-bottom: 20px; font-size: 14px;">{start_date} to {end_date}</p>'
            '<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); '
            f'gap: 15px; margin-bottom: 25px;">{cards}</div>'
            f'{tool_breakdown_html}'
            '<h3 style="color: #2c3e50; margin: 25px 0 15px 0; font-size: 18px;">Top 25 Users</h3>'
            f'{user_table}'
            '</div>'
        )

    # Build full HTML
    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    daily_section = generate_period_section(daily_data, 'Last 24 Hours', '📅')
    weekly_section = generate_period_section(weekly_data, 'Last 7 Days', '📊')
    monthly_section = generate_period_section(monthly_data, 'Last 30 Days', '📈')

    return f'''
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link href="https://fonts.googleapis.com/css2?family=Montserrat:wght@400;600;700&display=swap" rel="stylesheet">
<style>
body {{
font-family: 'Montserrat', Arial, sans-serif;
background-color: #f5f7fa;
margin: 0;
padding: 0;
}}
</style>
</head>
<body style="font-family: 'Montserrat', Arial, sans-serif; background-color: #f5f7fa; margin: 0; padding: 20px;">
<div style="max-width: 1200px; margin: 0 auto;">
<div style="{section_style}">
<h1 style="color: #2c3e50; margin: 0 0 10px 0; font-size: 32px;">AI Tools Usage Report</h1>
<p style="color: #7f8c8d; margin: 0; font-size: 14px;">Generated on {generated_at}</p>
</div>
{daily_section}
{weekly_section}
{monthly_section}
<div style="background: white; padding: 20px; border-radius: 10px; box-shadow: 0 2px 8px rgba(0,0,0,0.08); text-align: center;">
<p style="color: #7f8c8d; margin: 0; font-size: 12px;">
This is an automated report. For questions, please contact your administrator.
</p>
</div>
</div>
</body>
</html>
'''
def send_email_via_smtp(subject: str, html_content: str, recipients: List[str]) -> bool:
    """Send the HTML report through the configured SMTP server.

    Connects to ``SMTP_SERVER:SMTP_PORT``, upgrades the connection with
    STARTTLS, logs in with ``SMTP_USER`` / ``SMTP_PASSWORD`` and sends one
    message from ``SENDER_EMAIL``.

    Args:
        subject: Subject line of the email.
        html_content: Body of the email, sent as ``text/html``.
        recipients: Destination addresses; entries are stripped and blank
            ones ignored.

    Returns:
        ``True`` when the message was handed to the server, ``False`` when
        configuration is incomplete, no recipient remains after filtering,
        or any error occurs while sending. Errors are printed, never raised.
    """
    required_settings = {
        'SMTP_USER': SMTP_USER,
        'SMTP_PASSWORD': SMTP_PASSWORD,
        'SENDER_EMAIL': SENDER_EMAIL,
    }
    if not all(required_settings.values()):
        print("✗ SMTP credentials not configured")
        # Report which settings are present without echoing their values.
        for setting_name, setting_value in required_settings.items():
            print(f" {setting_name}: {'set' if setting_value else 'MISSING'}")
        return False

    # Filter out empty recipients
    recipients = [r.strip() for r in recipients if r.strip()]
    if not recipients:
        print("✗ No valid recipients configured")
        return False

    print(f"Sending email to {len(recipients)} recipient(s)...")
    print(f" SMTP Server: {SMTP_SERVER}:{SMTP_PORT}")
    print(f" From: {SENDER_EMAIL}")
    print(f" To: {', '.join(recipients)}")

    try:
        # Create message
        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        msg['From'] = SENDER_EMAIL
        msg['To'] = ', '.join(recipients)
        # The report contains non-ASCII characters, so state the charset.
        msg.attach(MIMEText(html_content, 'html', 'utf-8'))

        # A timeout keeps an unreachable or stalled server from hanging the job.
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=30) as server:
            server.set_debuglevel(0)  # Set to 1 for verbose output
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            # Pass the envelope explicitly rather than re-parsing the headers.
            server.send_message(msg, from_addr=SENDER_EMAIL, to_addrs=recipients)
        print("✓ Email sent successfully")
        return True
    except smtplib.SMTPAuthenticationError as e:
        print(f"✗ SMTP Authentication Error: {e}")
        print(" Check your SMTP_USER and SMTP_PASSWORD")
        return False
    except smtplib.SMTPException as e:
        print(f"✗ SMTP Error: {e}")
        return False
    except Exception as e:
        # Boundary handler: covers DNS/connection/timeout errors so callers
        # only ever see the boolean result.
        print(f"✗ Error sending email: {e}")
        return False
def main():
    """Fetch the last 30 days of usage, build the report, save and email it.

    Steps: query the webhook once for the widest window, analyze the same
    data for the last 24 hours / 7 days / 30 days (all ending "now" in UTC),
    write the HTML to ``email_report.html`` in the current working directory,
    and email it when at least one recipient is configured. Returns ``None``;
    exits early (after printing a message) when no data could be fetched.
    """
    banner = "=" * 60
    print(banner)
    print("AI TOOLS USAGE REPORT GENERATOR")
    print(banner)

    # All windows share the same timezone-aware end point.
    now = datetime.now(timezone.utc)
    daily_start = now - timedelta(days=1)
    weekly_start = now - timedelta(days=7)
    monthly_start = now - timedelta(days=30)

    # Fetch data (get last 30 days to cover all periods)
    data = fetch_webhook_data(
        monthly_start.strftime('%Y-%m-%d'),
        now.strftime('%Y-%m-%d'),
    )
    if not data:
        print("✗ No data retrieved, exiting")
        return

    print("\nAnalyzing data...")
    daily_report = analyze_period_data(data, daily_start, now, "Last 24 Hours")
    weekly_report = analyze_period_data(data, weekly_start, now, "Last 7 Days")
    monthly_report = analyze_period_data(data, monthly_start, now, "Last 30 Days")
    print(f"✓ Daily: {daily_report['total_requests']} requests from {daily_report['unique_users']} users")
    print(f"✓ Weekly: {weekly_report['total_requests']} requests from {weekly_report['unique_users']} users")
    print(f"✓ Monthly: {monthly_report['total_requests']} requests from {monthly_report['unique_users']} users")

    # Generate HTML report
    print("\nGenerating HTML report...")
    html_report = generate_html_report(daily_report, weekly_report, monthly_report)

    # Save HTML to file
    output_file = 'email_report.html'
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(html_report)
    print(f"✓ Report saved to {output_file}")

    # Send email if configured. Every entry is checked, not just the first,
    # so a stray leading comma in REPORT_RECIPIENTS does not disable delivery.
    recipients = [r.strip() for r in REPORT_RECIPIENTS if r.strip()]
    if recipients:
        subject = f"AI Tools Usage Report - {now.strftime('%Y-%m-%d')}"
        if not send_email_via_smtp(subject, html_report, recipients):
            print(f"\n⚠ Email delivery failed; the report is still available in {output_file}.")
    else:
        print("\n⚠ No recipients configured. Skipping email send.")
        print(" Set REPORT_RECIPIENTS in .env file to enable email delivery.")

    print("\n" + banner)
    print("REPORT GENERATION COMPLETE")
    print(banner)
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()