import logging import os import requests from datetime import datetime, timedelta from database import ( notifications_collection, users_collection, agents_collection, completion_reminders_collection, ) logger = logging.getLogger(__name__) def is_mailgun_configured() -> bool: """Check if Mailgun environment variables are set.""" return bool(os.getenv("MAILGUN_API_KEY")) and bool(os.getenv("MAILGUN_DOMAIN")) def send_mailgun_email(to_emails: list[str], subject: str, html_body: str) -> bool: """Send email via Mailgun HTTP API. Returns True on success.""" api_key = os.getenv("MAILGUN_API_KEY") domain = os.getenv("MAILGUN_DOMAIN") from_email = os.getenv("MAILGUN_FROM_EMAIL", f"AgentHub ") reply_to = os.getenv("NOTIFICATION_REPLY_TO", "Nick.Viljoen@oliver.agency") data = { "from": from_email, "to": to_emails, "subject": subject, "html": html_body, } if reply_to: data["h:Reply-To"] = reply_to response = requests.post( f"https://api.mailgun.net/v3/{domain}/messages", auth=("api", api_key), data=data, timeout=10, ) return response.status_code == 200 def build_threshold_email(agent_name: str, weekly_tokens: int, threshold: int) -> str: """Build HTML email body for a weekly token threshold notification.""" return f"""

AgentHub - High Weekly Token Usage Alert

The following agent has exceeded the weekly token usage threshold:

Agent Name {agent_name}
Tokens (Last 7 Days) {weekly_tokens:,}
Weekly Threshold {threshold:,}

This is an automated notification from AgentHub. Please review the agent's recent token consumption.

""" async def check_and_notify_threshold(agent_name: str): """Check 7-day token usage against threshold and send notification if exceeded. Non-blocking, safe to call.""" if not is_mailgun_configured(): return threshold = int(os.getenv("TOKEN_USAGE_THRESHOLD", "100000")) # Calculate token usage over the last 7 days from usage_timeline agent = await agents_collection.find_one( {"agent_name": agent_name}, {"usage_timeline": 1} ) if not agent or not agent.get("usage_timeline"): return cutoff_date = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%d") weekly_tokens = sum( entry.get("token_count", 0) for entry in agent["usage_timeline"] if entry.get("date", "") >= cutoff_date ) if weekly_tokens < threshold: return cooldown_hours = int(os.getenv("NOTIFICATION_COOLDOWN_HOURS", "24")) cooldown_cutoff = datetime.utcnow() - timedelta(hours=cooldown_hours) # Check cooldown - skip if we already notified within the cooldown period recent = await notifications_collection.find_one({ "agent_name": agent_name, "sent_at": {"$gte": cooldown_cutoff}, }) if recent: return # Get admin user emails (exclude placeholder accounts with no real domain) admin_cursor = users_collection.find( {"is_admin": True, "is_active": True, "email": {"$not": {"$regex": r"@agenthub\.com$"}}}, {"email": 1}, ) admin_emails = [doc["email"] async for doc in admin_cursor] if not admin_emails: return # Send email subject = f"[AgentHub] High Weekly Token Usage: {agent_name} ({weekly_tokens:,} tokens in 7 days)" html_body = build_threshold_email(agent_name, weekly_tokens, threshold) success = send_mailgun_email(admin_emails, subject, html_body) # Record the notification attempt await notifications_collection.insert_one({ "agent_name": agent_name, "weekly_tokens": weekly_tokens, "threshold": threshold, "recipients": admin_emails, "success": success, "sent_at": datetime.utcnow(), }) def build_client_agent_email(agent_data: dict) -> str: """Build HTML email body for a client agent creation notification.""" return f"""

Client Agent Created

A new client-facing agent has been created and requires verification.

Agent Name {agent_data.get('agent_name', 'N/A')}
Description {agent_data.get('agent_description', 'N/A')}
Purpose {agent_data.get('agent_purpose', 'N/A')}
Client Yes
Client Name {agent_data.get('client_name', 'N/A')}
Studio Name {agent_data.get('studio_name', 'N/A')}
Tool {agent_data.get('agent_tool', 'N/A')}
Created By {agent_data.get('created_by_email', 'N/A')}

Please review this agent in AgentHub.

""" def send_client_agent_notification(agent_data: dict): """Send email notification when a client-facing agent is created. Non-blocking.""" if not is_mailgun_configured(): return notify_emails_str = os.getenv("CLIENT_AGENT_NOTIFY_EMAILS", "") if not notify_emails_str: return to_emails = [e.strip() for e in notify_emails_str.split(",") if e.strip()] if not to_emails: return try: subject = "Client Agent Created" html_body = build_client_agent_email(agent_data) send_mailgun_email(to_emails, subject, html_body) except Exception as e: print(f"Failed to send client agent notification: {e}") def build_weekly_digest_email(agents: list) -> str: """Build HTML email body for the weekly agent digest.""" rows = "" for i, agent in enumerate(agents, 1): rows += f""" {i}. {agent.get('agent_name', 'N/A')}
Purpose: {agent.get('agent_purpose', 'N/A')}
Description: {agent.get('agent_description', 'N/A')}
Created by: {agent.get('created_by_email', 'N/A')} """ return f"""

Agents Created in Last Week

{len(agents)} agent(s) created in the last week:

{rows}

View full details in the AgentHub admin dashboard.

""" def build_completion_reminder_email(user_name: str, agents: list, public_url: str) -> str: """Build the HTML body for a completion-reminder digest sent to one owner.""" rows = "" for a in agents: agent_id = str(a.get("_id")) link = f"{public_url.rstrip('/')}/agent-complete/{agent_id}" if public_url else f"/agent-complete/{agent_id}" rows += f""" {a.get('agent_name', 'N/A')}
{(a.get('agent_description') or 'No description')[:140]} Complete → """ count = len(agents) plural = "agent needs" if count == 1 else "agents need" return f"""

Agent registration needs your attention

Hi {user_name},

{count} {plural} a few extra details before {('it can' if count == 1 else 'they can')} be considered fully registered in AgentHub. These are agents that came in from LibreChat — please confirm the existing fields and fill in the new governance fields (Business Entity, Agent Type, Autonomy Level, IP Ownership, declarations).

{rows}

Click "Complete" on any agent above to fill in the missing fields. You'll be assigned as the agent's owner once you save.

""" async def send_completion_reminders(force: bool = False) -> dict: """Daily job: nudge owners whose LibreChat-synced agents need registration completed. - Groups incomplete agents by their resolved owner email (via `agent_contact_person`). - Per-user cooldown (default 7 days, COMPLETION_REMINDER_COOLDOWN_DAYS). - Per-user nudge cap (default 4, COMPLETION_REMINDER_MAX_NUDGES); after the cap we stop emailing — the agents stay incomplete and admins can pick them up. - Pass `force=True` to bypass cooldown + cap (used by the manual-trigger endpoint). - Non-blocking — failures are logged, never raise. """ if not is_mailgun_configured(): return {"status": "skipped", "reason": "mailgun_not_configured"} cooldown_days = int(os.getenv("COMPLETION_REMINDER_COOLDOWN_DAYS", "7")) max_nudges = int(os.getenv("COMPLETION_REMINDER_MAX_NUDGES", "4")) public_url = os.getenv("AGENTHUB_PUBLIC_URL", "").rstrip("/") if not public_url: logger.error( "AGENTHUB_PUBLIC_URL is not set; skipping completion reminders to avoid sending broken links" ) return {"status": "skipped", "reason": "public_url_not_configured"} # Gather all incomplete agents that have a contact email to nudge against. cursor = agents_collection.find( { "registration_complete": {"$ne": True}, "agent_contact_person": {"$nin": [None, ""]}, }, {"agent_name": 1, "agent_contact_person": 1, "agent_description": 1, "_id": 1}, ) incomplete_agents = await cursor.to_list(length=None) # Group by lowercased contact email. by_email: dict[str, list] = {} for agent in incomplete_agents: email = (agent.get("agent_contact_person") or "").strip().lower() if not email or "@" not in email: continue by_email.setdefault(email, []).append(agent) if not by_email: return {"status": "ok", "users_emailed": 0, "reason": "no_incomplete_agents_with_owners"} # Resolve which owner emails actually map to active users (case-insensitive). user_cursor = users_collection.find( {"is_active": True, "email": {"$regex": "@", "$options": "i"}}, {"email": 1, "full_name": 1}, ) users_by_lower_email: dict[str, dict] = {} async for u in user_cursor: email = (u.get("email") or "").strip().lower() if email: users_by_lower_email[email] = u cutoff = datetime.utcnow() - timedelta(days=cooldown_days) summary = { "status": "ok", "users_emailed": 0, "agents_total": 0, "skipped_cooldown": 0, "skipped_max_nudges": 0, "skipped_unresolved": 0, "failed": 0, } for email_lower, agents_for_user in by_email.items(): user = users_by_lower_email.get(email_lower) if not user: summary["skipped_unresolved"] += 1 continue existing = await completion_reminders_collection.find_one({"user_email": email_lower}) if existing and not force: if existing.get("nudge_count", 0) >= max_nudges: summary["skipped_max_nudges"] += 1 continue last_sent = existing.get("last_sent_at") if last_sent and last_sent >= cutoff: summary["skipped_cooldown"] += 1 continue full_name = user.get("full_name") or user.get("email") or email_lower recipient = user.get("email") # send to the canonically-cased email subject = f"[AgentHub] {len(agents_for_user)} agent(s) need registration completed" html_body = build_completion_reminder_email(full_name, agents_for_user, public_url) try: success = send_mailgun_email([recipient], subject, html_body) except Exception as e: print(f"Completion reminder failed for {email_lower}: {e}") success = False await completion_reminders_collection.update_one( {"user_email": email_lower}, { "$set": { "last_sent_at": datetime.utcnow(), "last_success": success, "last_agent_count": len(agents_for_user), }, "$inc": {"nudge_count": 1}, "$setOnInsert": {"first_sent_at": datetime.utcnow()}, }, upsert=True, ) if success: summary["users_emailed"] += 1 summary["agents_total"] += len(agents_for_user) else: summary["failed"] += 1 return summary async def send_weekly_agent_digest(): """Send weekly digest of agents created in the last 7 days to all admins.""" if not is_mailgun_configured(): print("Weekly digest: Mailgun not configured, skipping.") return cutoff = datetime.utcnow() - timedelta(days=7) # Find agents created in last 7 days cursor = agents_collection.find( {"created_at": {"$gte": cutoff}}, {"agent_name": 1, "agent_purpose": 1, "agent_description": 1, "created_by": 1} ).sort("created_at", -1) agents = await cursor.to_list(length=None) if not agents: print("Weekly digest: No agents created in last 7 days, skipping.") return # Resolve creator emails for agent in agents: created_by = agent.get("created_by", "") if created_by == "agent_collector_api": agent["created_by_email"] = "Agent Collector API" else: try: from bson import ObjectId user = await users_collection.find_one({"_id": ObjectId(created_by)}, {"email": 1}) agent["created_by_email"] = user["email"] if user else created_by except Exception: agent["created_by_email"] = created_by # Get admin emails (exclude placeholder accounts with no real domain) admin_cursor = users_collection.find( {"is_admin": True, "is_active": True, "email": {"$not": {"$regex": r"@agenthub\.com$"}}}, {"email": 1}, ) admin_emails = [doc["email"] async for doc in admin_cursor] if not admin_emails: print("Weekly digest: No admin emails found, skipping.") return subject = "Agents Created in Last Week" html_body = build_weekly_digest_email(agents) try: success = send_mailgun_email(admin_emails, subject, html_body) print(f"Weekly digest: Sent to {len(admin_emails)} admins, success={success}") except Exception as e: print(f"Weekly digest: Failed to send: {e}")