import asyncio
import html
import os
from datetime import datetime, timedelta

import requests

from database import notifications_collection, users_collection, agents_collection


def _env_int(name: str, default: int) -> int:
    """Read an integer environment variable, using *default* if unset or malformed."""
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        print(f"Invalid integer for {name}: {raw!r}; using default {default}")
        return default


def _esc(value) -> str:
    """Escape a value for safe interpolation into an HTML email body."""
    return html.escape(str(value))


async def _get_admin_emails() -> list:
    """Return the email addresses of all active admin users.

    Documents that carry no (or an empty) ``email`` field are skipped.
    """
    admin_cursor = users_collection.find(
        {"is_admin": True, "is_active": True},
        {"email": 1},
    )
    return [doc["email"] async for doc in admin_cursor if doc.get("email")]


def is_mailgun_configured() -> bool:
    """Check if Mailgun environment variables are set."""
    return bool(os.getenv("MAILGUN_API_KEY")) and bool(os.getenv("MAILGUN_DOMAIN"))


def send_mailgun_email(to_emails: list[str], subject: str, html_body: str) -> bool:
    """Send email via Mailgun HTTP API.

    This is a blocking call (up to the 10 second timeout); async callers
    should run it in a worker thread.

    Args:
        to_emails: Recipient addresses.
        subject: Subject line.
        html_body: Message body, sent as the ``html`` part.

    Returns:
        True if Mailgun answered with HTTP 200, False otherwise (including
        when the API key or domain is not configured).

    Raises:
        requests.RequestException: On network errors or timeout.
    """
    api_key = os.getenv("MAILGUN_API_KEY")
    domain = os.getenv("MAILGUN_DOMAIN")
    if not api_key or not domain:
        # Without credentials the request would target ".../v3/None/messages".
        return False

    # The sender must contain an actual address; default to one on the
    # configured sending domain.
    from_email = os.getenv("MAILGUN_FROM_EMAIL", f"AgentHub <noreply@{domain}>")

    response = requests.post(
        f"https://api.mailgun.net/v3/{domain}/messages",
        auth=("api", api_key),
        data={
            "from": from_email,
            "to": to_emails,
            "subject": subject,
            "html": html_body,
        },
        timeout=10,
    )
    return response.status_code == 200


# NOTE(review): the templates below contain no HTML markup even though they are
# sent as the "html" part of the message - confirm whether tags were lost.


def build_threshold_email(agent_name: str, total_tokens: int, threshold: int) -> str:
    """Build HTML email body for a token threshold notification.

    The agent name is HTML-escaped; the numbers are rendered with thousands
    separators.
    """
    safe_name = _esc(agent_name)
    return f"""

AgentHub - High Token Usage Alert

The following agent has exceeded the token usage threshold:

Agent Name {safe_name}
Total Tokens {total_tokens:,}
Threshold {threshold:,}

This is an automated notification from AgentHub. Please review the agent's token consumption.

"""


async def check_and_notify_threshold(agent_name: str, total_tokens: int) -> None:
    """Notify admins when an agent's token usage reaches the threshold.

    Does nothing when Mailgun is not configured, when ``total_tokens`` is
    below ``TOKEN_USAGE_THRESHOLD`` (default 100000), when a notification for
    this agent was already recorded within ``NOTIFICATION_COOLDOWN_HOURS``
    (default 24), or when there are no admin recipients.

    Every send attempt, successful or not, is recorded in the notifications
    collection, so a failed attempt also counts toward the cooldown.
    """
    if not is_mailgun_configured():
        return

    threshold = _env_int("TOKEN_USAGE_THRESHOLD", 100000)
    if total_tokens < threshold:
        return

    cooldown_hours = _env_int("NOTIFICATION_COOLDOWN_HOURS", 24)
    cooldown_cutoff = datetime.utcnow() - timedelta(hours=cooldown_hours)

    # Check cooldown - skip if we already notified within the cooldown period
    recent = await notifications_collection.find_one({
        "agent_name": agent_name,
        "sent_at": {"$gte": cooldown_cutoff},
    })
    if recent:
        return

    admin_emails = await _get_admin_emails()
    if not admin_emails:
        return

    subject = f"[AgentHub] High Token Usage: {agent_name} ({total_tokens:,} tokens)"
    html_body = build_threshold_email(agent_name, total_tokens, threshold)

    try:
        # The HTTP call is blocking; run it off the event loop.
        success = await asyncio.to_thread(
            send_mailgun_email, admin_emails, subject, html_body
        )
    except requests.RequestException as exc:
        print(f"Failed to send threshold notification for {agent_name}: {exc}")
        success = False

    # Record the notification attempt
    await notifications_collection.insert_one({
        "agent_name": agent_name,
        "total_tokens": total_tokens,
        "threshold": threshold,
        "recipients": admin_emails,
        "success": success,
        "sent_at": datetime.utcnow(),
    })


def build_client_agent_email(agent_data: dict) -> str:
    """Build HTML email body for a client agent creation notification.

    Missing keys are rendered as ``N/A``; all values are HTML-escaped.
    """

    def field(key: str) -> str:
        return _esc(agent_data.get(key, 'N/A'))

    return f"""

Client Agent Created

A new client-facing agent has been created and requires verification.

Agent Name {field('agent_name')}
Description {field('agent_description')}
Purpose {field('agent_purpose')}
Client Yes
Client Name {field('client_name')}
Studio Name {field('studio_name')}
Tool {field('agent_tool')}
Created By {field('created_by_email')}

Please review this agent in AgentHub.

"""


def send_client_agent_notification(agent_data: dict) -> None:
    """Send email notification when a client-facing agent is created.

    Best-effort: errors while building or sending the email are printed, not
    raised. Recipients come from the comma-separated
    ``CLIENT_AGENT_NOTIFY_EMAILS`` environment variable; nothing is sent when
    it is empty or Mailgun is not configured. The HTTP request itself is
    synchronous.
    """
    if not is_mailgun_configured():
        return

    notify_emails_str = os.getenv("CLIENT_AGENT_NOTIFY_EMAILS", "")
    to_emails = [e.strip() for e in notify_emails_str.split(",") if e.strip()]
    if not to_emails:
        return

    try:
        subject = "Client Agent Created"
        html_body = build_client_agent_email(agent_data)
        if not send_mailgun_email(to_emails, subject, html_body):
            print("Failed to send client agent notification: Mailgun rejected the request")
    except Exception as e:
        print(f"Failed to send client agent notification: {e}")


def build_daily_digest_email(agents: list) -> str:
    """Build HTML email body for the daily agent digest.

    Each agent is rendered as a numbered entry; missing keys are rendered as
    ``N/A`` and all values are HTML-escaped.
    """
    rows = "".join(
        f" {i}. {_esc(agent.get('agent_name', 'N/A'))}\n"
        f"Purpose: {_esc(agent.get('agent_purpose', 'N/A'))}\n"
        f"Description: {_esc(agent.get('agent_description', 'N/A'))}\n"
        f"Created by: {_esc(agent.get('created_by_email', 'N/A'))} "
        for i, agent in enumerate(agents, 1)
    )
    return f"""

Agents Created Last 24 Hours

{len(agents)} agent(s) created in the last 24 hours:

{rows}

View full details in the AgentHub admin dashboard.

"""


async def _resolve_creator_label(created_by):
    """Map an agent's ``created_by`` value to a display label.

    Returns the creator's email when the value resolves to a user document,
    otherwise the raw ``created_by`` value.
    """
    if created_by == "agent_collector_api":
        return "Agent Collector API"
    try:
        from bson import ObjectId

        user = await users_collection.find_one(
            {"_id": ObjectId(created_by)}, {"email": 1}
        )
        return user["email"] if user else created_by
    except Exception:
        # Best-effort: an invalid id or a failed lookup must not break the digest.
        return created_by


async def send_daily_agent_digest() -> None:
    """Send daily digest of agents created in the last 24 hours to all admins.

    Skips (with a printed message) when Mailgun is not configured, when no
    agents were created in the window, or when there are no admin recipients.
    Send failures are printed, not raised.
    """
    if not is_mailgun_configured():
        print("Daily digest: Mailgun not configured, skipping.")
        return

    cutoff = datetime.utcnow() - timedelta(hours=24)

    # Find agents created in last 24 hours
    cursor = agents_collection.find(
        {"created_at": {"$gte": cutoff}},
        {"agent_name": 1, "agent_purpose": 1, "agent_description": 1, "created_by": 1},
    ).sort("created_at", -1)
    agents = await cursor.to_list(length=None)

    if not agents:
        print("Daily digest: No agents created in last 24 hours, skipping.")
        return

    # Resolve creator emails, looking each distinct creator up only once.
    labels = {}
    for agent in agents:
        created_by = agent.get("created_by", "")
        cache_key = str(created_by)
        if cache_key not in labels:
            labels[cache_key] = await _resolve_creator_label(created_by)
        agent["created_by_email"] = labels[cache_key]

    admin_emails = await _get_admin_emails()
    if not admin_emails:
        print("Daily digest: No admin emails found, skipping.")
        return

    subject = "Agents Created Last 24 Hours"
    html_body = build_daily_digest_email(agents)

    try:
        # The HTTP call is blocking; run it off the event loop.
        success = await asyncio.to_thread(
            send_mailgun_email, admin_emails, subject, html_body
        )
        print(f"Daily digest: Sent to {len(admin_emails)} admins, success={success}")
    except Exception as e:
        print(f"Daily digest: Failed to send: {e}")