import os import requests from datetime import datetime, timedelta from database import notifications_collection, users_collection def is_mailgun_configured() -> bool: """Check if Mailgun environment variables are set.""" return bool(os.getenv("MAILGUN_API_KEY")) and bool(os.getenv("MAILGUN_DOMAIN")) def send_mailgun_email(to_emails: list[str], subject: str, html_body: str) -> bool: """Send email via Mailgun HTTP API. Returns True on success.""" api_key = os.getenv("MAILGUN_API_KEY") domain = os.getenv("MAILGUN_DOMAIN") from_email = os.getenv("MAILGUN_FROM_EMAIL", f"AgentHub ") response = requests.post( f"https://api.mailgun.net/v3/{domain}/messages", auth=("api", api_key), data={ "from": from_email, "to": to_emails, "subject": subject, "html": html_body, }, timeout=10, ) return response.status_code == 200 def build_threshold_email(agent_name: str, total_tokens: int, threshold: int) -> str: """Build HTML email body for a token threshold notification.""" return f"""

AgentHub - High Token Usage Alert

The following agent has exceeded the token usage threshold:

Agent Name {agent_name}
Total Tokens {total_tokens:,}
Threshold {threshold:,}

This is an automated notification from AgentHub. Please review the agent's token consumption.

""" async def check_and_notify_threshold(agent_name: str, total_tokens: int): """Check token threshold and send notification if exceeded. Non-blocking, safe to call.""" if not is_mailgun_configured(): return threshold = int(os.getenv("TOKEN_USAGE_THRESHOLD", "100000")) if total_tokens < threshold: return cooldown_hours = int(os.getenv("NOTIFICATION_COOLDOWN_HOURS", "24")) cooldown_cutoff = datetime.utcnow() - timedelta(hours=cooldown_hours) # Check cooldown - skip if we already notified within the cooldown period recent = await notifications_collection.find_one({ "agent_name": agent_name, "sent_at": {"$gte": cooldown_cutoff}, }) if recent: return # Get admin user emails admin_cursor = users_collection.find( {"is_admin": True, "is_active": True}, {"email": 1}, ) admin_emails = [doc["email"] async for doc in admin_cursor] if not admin_emails: return # Send email subject = f"[AgentHub] High Token Usage: {agent_name} ({total_tokens:,} tokens)" html_body = build_threshold_email(agent_name, total_tokens, threshold) success = send_mailgun_email(admin_emails, subject, html_body) # Record the notification attempt await notifications_collection.insert_one({ "agent_name": agent_name, "total_tokens": total_tokens, "threshold": threshold, "recipients": admin_emails, "success": success, "sent_at": datetime.utcnow(), })