agent_tracker/main.py
nickviljoen 938691e598 Add filtered XLSX export, fix completion reminders, consolidate admin UI
Export & filtering
- Replace /api/admin/agents/export/csv with /api/admin/agents/export/xlsx
  (openpyxl). Multi-line system prompts stay inside one cell instead of
  fragmenting into thousands of rows when opened in Excel.
- Accept filter query params on export: status, discipline, audit,
  business_entity, agent_classification, autonomy_level, risks_only, search.
- Move Export/Import CSV/Delete by CSV buttons into the Agents Management
  tab, drop the duplicate links from the top nav, and rebuild the cramped
  filter row as a wrappable two-row layout.
- Add a Discipline dropdown to the Agents Management filter row to match
  the Prompt Audit tab.

Completion-reminder emails (fix for the broken Complete-button links)
- Add h:Reply-To header to every Mailgun send so users can reply to a real
  mailbox instead of noreply@. Default Nick.Viljoen@oliver.agency, overridable
  via NOTIFICATION_REPLY_TO env var.
- send_completion_reminders now skips with an error log when
  AGENTHUB_PUBLIC_URL is unset instead of mailing relative links email
  clients can't follow.

UI polish
- Restrict the 5-second alert auto-hide to .alert-dismissible so the
  load-bearing 'unresolved owner' banner stays visible until acted on.
- Move the orange brand gradient to a fixed body::before pseudo-element so
  it can't be covered by the white content card or scrolled out of view.
- Lock the navbar to viewport top (position: fixed) and reserve body
  padding-top so content doesn't sit beneath it.

Audit polish (carried over from previous WIP)
- Add batch-state tracking to audit_analyzer for in-flight progress visibility.
- Update PLAN-prompt-audit.md to match the shipped behaviour.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 22:45:29 +02:00

2507 lines
102 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, Request, Form, Query, File, UploadFile
from fastapi.security import OAuth2PasswordBearer
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse, StreamingResponse
from starlette.middleware.sessions import SessionMiddleware
from typing import List, Optional
from pydantic import BaseModel
from database import users_collection
import crud
import models
import auth
import config
import msal_auth
import notifications
import audit_analyzer
from datetime import datetime
import os
import re
from dotenv import load_dotenv
from fastapi import Header
import csv
import io
import json
import asyncio
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font
load_dotenv()
app = FastAPI(
title="AgentHub",
description="AI Agent Management System with comprehensive CRUD operations",
version="1.0.0",
root_path=config.get_base_path()
)
# Add session middleware for MSAL state management
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SECRET_KEY", "your-session-secret-key"))
# Mount static files with explicit path handling
from fastapi.responses import FileResponse
import os as path_os
@app.get("/static/{filename:path}")
async def serve_static(filename: str):
"""Serve static files with proper path handling"""
file_path = path_os.path.join("static", filename)
if path_os.path.exists(file_path) and path_os.path.isfile(file_path):
return FileResponse(file_path)
raise HTTPException(status_code=404, detail="Static file not found")
# Also mount the traditional way as backup
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
def get_app_url(path: str) -> str:
"""Get URL with proper base path - production fix"""
base_path = os.getenv("BASE_PATH", "").rstrip("/")
path = path.lstrip("/")
if base_path:
return f"{base_path}/{path}"
return f"/{path}" if path else "/"
# Debug route to check configuration
@app.get("/debug/config")
async def debug_config():
return {
"base_path": config.get_base_path(),
"env_base_path": os.getenv("BASE_PATH"),
"root_path_from_app": app.root_path,
"get_app_url_test": get_app_url("agent-management")
}
def get_template_context(request: Request, current_user=None, **kwargs):
"""Get standard template context with base path and MSAL info"""
return {
"request": request,
"current_user": current_user,
"base_path": config.get_base_path(),
"msal_enabled": msal_auth.is_msal_available(),
"show_local_login": config.show_local_login(),
**kwargs
}
def is_valid_email(email: str) -> bool:
"""Validate email format using regex"""
if not email:
return False
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def can_user_edit_agent(agent: dict, current_user: dict) -> bool:
"""Check if user can edit an agent based on ownership or contact person match"""
# Admins can edit any agent
if current_user.get("is_admin"):
return True
# User can edit agents they created
if agent["created_by"] == str(current_user["_id"]):
return True
# User can edit agents where they are the contact person (if valid email and case-insensitive match)
agent_contact = agent.get("agent_contact_person")
if agent_contact and is_valid_email(agent_contact):
user_email = current_user.get("email", "")
if agent_contact.lower() == user_email.lower():
return True
return False
async def get_current_user(token: str = Depends(oauth2_scheme)):
payload = auth.decode_access_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid token")
user = await crud.get_user_by_id(payload["sub"])
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
async def get_current_user_optional(request: Request):
"""Get current user from cookie if available"""
token = request.cookies.get("access_token")
if token:
try:
payload = auth.decode_access_token(token)
if payload:
user = await crud.get_user_by_id(payload["sub"])
if user:
# Apply view mode override if admin is viewing as user
view_mode = request.session.get("view_mode", "admin")
if user.get("is_admin") and view_mode == "user":
user["actual_is_admin"] = True
user["is_admin"] = False
return user
except:
pass
return None
async def get_current_user_from_cookie(request: Request):
"""Get current user from cookie for API endpoints"""
token = request.cookies.get("access_token")
if not token:
raise HTTPException(status_code=401, detail="Not authenticated")
payload = auth.decode_access_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid token")
user = await crud.get_user_by_id(payload["sub"])
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Apply view mode override if admin is viewing as user
view_mode = request.session.get("view_mode", "admin")
if user.get("is_admin") and view_mode == "user":
user["actual_is_admin"] = True
user["is_admin"] = False
return user
async def require_admin(current_user: dict = Depends(get_current_user_from_cookie)):
"""Require admin access"""
if not current_user.get("is_admin"):
raise HTTPException(status_code=403, detail="Admin access required")
return current_user
async def require_admin_or_readonly(current_user: dict = Depends(get_current_user_from_cookie)):
"""Require admin or readonly_admin access"""
if current_user.get("is_admin"):
return current_user
if current_user.get("role") == "readonly_admin":
return current_user
raise HTTPException(status_code=403, detail="Admin access required")
def sanitize_metadata(metadata: dict) -> dict[str, str]:
"""
Sanitize agent metadata to ensure all values are strings.
Handles MongoDB ObjectId references, lists, and other complex objects.
"""
if not metadata or not isinstance(metadata, dict):
return {}
sanitized = {}
for key, value in metadata.items():
if value is None:
continue
elif isinstance(value, str):
sanitized[key] = value
elif isinstance(value, list):
# Handle MongoDB ObjectId references: [{"$oid": "..."}] or regular lists
if value and isinstance(value[0], dict) and "$oid" in value[0]:
# Extract ObjectId strings
oid_values = [item.get("$oid", str(item)) for item in value if isinstance(item, dict) and "$oid" in item]
sanitized[key] = ",".join(oid_values) if oid_values else str(value)
else:
# Convert list to comma-separated string
sanitized[key] = ",".join(str(item) for item in value)
elif isinstance(value, dict):
# Handle nested dictionaries - convert to JSON string
import json
try:
sanitized[key] = json.dumps(value, default=str)
except (TypeError, ValueError):
sanitized[key] = str(value)
else:
# Convert other types to string
sanitized[key] = str(value)
return sanitized
def create_agent_response(agent: dict) -> models.AiAgentResponse:
"""Helper function to create AiAgentResponse with all fields including Quality Audit and Usage Tracking"""
return models.AiAgentResponse(
agent_id=str(agent["_id"]),
agent_name=agent["agent_name"],
agent_tool=agent.get("agent_tool"),
agent_description=agent.get("agent_description"),
agent_purpose=agent.get("agent_purpose"),
agent_version=agent.get("agent_version"),
agent_status=agent.get("agent_status"),
agent_location=agent.get("agent_location"),
agent_department=agent.get("agent_department"),
agent_contact_person=agent.get("agent_contact_person"),
agent_created_at=agent["created_at"].isoformat() if agent.get("created_at") else None,
agent_updated_at=agent["updated_at"].isoformat() if agent.get("updated_at") else None,
agent_tags=agent.get("agent_tags"),
agent_metadata=sanitize_metadata(agent.get("agent_metadata")) if agent.get("agent_metadata") else None,
agent_userbase=agent.get("agent_userbase"),
agent_capabilities=agent.get("agent_capabilities"),
url=agent.get("url"),
quality_audit_status=agent.get("quality_audit_status", False),
quality_audit_updated_by=agent.get("quality_audit_updated_by"),
quality_audit_updated_at=agent.get("quality_audit_updated_at"),
quality_audit_updated_by_name=agent.get("quality_audit_updated_by_name"),
risk_factor=agent.get("risk_factor"),
last_edited_by=agent.get("last_edited_by"),
discipline=agent.get("discipline"),
rating=agent.get("rating"),
rating_count=agent.get("rating_count"),
client=agent.get("client"),
client_name=agent.get("client_name"),
studio_name=agent.get("studio_name"),
verification_status=agent.get("verification_status"),
verified_by=agent.get("verified_by"),
verified_date=agent.get("verified_date"),
instructions=agent.get("instructions"),
audit_status=agent.get("audit_status"),
audit_date=agent.get("audit_date"),
audit_category=agent.get("audit_category"),
audit_risk_level=agent.get("audit_risk_level"),
created_by=agent["created_by"],
# Usage tracking fields
usage_timeline=agent.get("usage_timeline"),
conversation_count=agent.get("conversation_count"),
unique_users=agent.get("unique_users"),
total_messages=agent.get("total_messages"),
first_used=agent.get("first_used"),
last_used=agent.get("last_used"),
total_tokens=agent.get("total_tokens"),
prompt_tokens=agent.get("prompt_tokens"),
completion_tokens=agent.get("completion_tokens")
)
async def verify_agent_collector_api_key(x_api_key: str = Header(alias="X-API-Key")):
"""Verify static API key for agent collector endpoints"""
expected_key = os.getenv("AGENT_COLLECTOR_API_KEY")
if not expected_key:
raise HTTPException(status_code=500, detail="API key not configured")
if x_api_key != expected_key:
raise HTTPException(status_code=401, detail="Invalid API key")
return True
def map_agent_collector_to_internal(collector_data: models.AgentCollectorCreate) -> dict:
"""Map agent collector field names to internal schema"""
# Normalize status to Title Case for internal storage
status = collector_data.status
if status:
status_map = {
"active": "Active",
"inactive": "Inactive",
"deprecated": "Deprecated",
"development": "Development"
}
status = status_map.get(status.lower(), status)
# Convert usage_timeline from Pydantic models to dicts
usage_timeline = None
if collector_data.usage_timeline:
usage_timeline = [{"date": entry.date, "message_count": entry.message_count, "token_count": entry.token_count}
for entry in collector_data.usage_timeline]
# Auto-tag Pencil Agents discipline
discipline = collector_data.discipline
if not discipline and "pencil" in collector_data.name.lower():
discipline = "Pencil Agents"
return {
"agent_name": collector_data.name,
"agent_tool": collector_data.tool,
"agent_description": collector_data.description,
"agent_purpose": collector_data.purpose,
"agent_location": collector_data.location,
"agent_userbase": collector_data.userbase,
"agent_version": collector_data.version,
"agent_capabilities": collector_data.capabilities,
"agent_status": status,
"agent_department": collector_data.department,
"agent_contact_person": collector_data.contact_person,
"agent_tags": collector_data.tags,
"agent_metadata": collector_data.metadata,
"url": collector_data.url,
"discipline": discipline,
"client": collector_data.client,
"client_name": collector_data.client_name,
"studio_name": collector_data.studio_name,
"instructions": collector_data.instructions,
# Usage tracking fields
"usage_timeline": usage_timeline,
"conversation_count": collector_data.conversation_count,
"unique_users": collector_data.unique_users,
"total_messages": collector_data.total_messages,
"first_used": collector_data.first_used,
"last_used": collector_data.last_used,
"total_tokens": collector_data.total_tokens,
"prompt_tokens": collector_data.prompt_tokens,
"completion_tokens": collector_data.completion_tokens,
}
@app.get("/me", response_model=models.UserResponse)
async def me(current_user: dict = Depends(get_current_user)):
return {
"email": current_user["email"],
"full_name": current_user.get("full_name"),
"is_active": current_user["is_active"],
"is_admin": current_user["is_admin"],
"role": current_user.get("role", "admin" if current_user["is_admin"] else "user"),
}
@app.on_event("startup")
async def startup_event():
"""Run database migrations and ensure indexes on startup"""
from database import ensure_indexes
await ensure_indexes()
# Migrate pencil agents discipline
count = await crud.migrate_pencil_agents_discipline()
if count > 0:
print(f"Pencil Agents migration: updated {count} agent(s)")
# Rename discipline 'Optimization' -> 'Optimisation' (UK spelling, 2026-05)
opt_count = await crud.migrate_optimisation_spelling()
if opt_count > 0:
print(f"Optimisation spelling migration: updated {opt_count} agent(s)")
# Backfill registration_complete for agents that pre-date the field (2026-05)
reg_counts = await crud.backfill_registration_complete()
if reg_counts["form_complete"] or reg_counts["collector_incomplete"]:
print(
f"registration_complete backfill: {reg_counts['form_complete']} form-registered marked complete, "
f"{reg_counts['collector_incomplete']} collector-created marked incomplete"
)
# Start schedulers (weekly digest + daily completion-reminder)
try:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
digest_hour = int(os.getenv("WEEKLY_DIGEST_HOUR", "7"))
completion_hour = int(os.getenv("COMPLETION_REMINDER_HOUR", "8"))
scheduler = AsyncIOScheduler()
scheduler.add_job(
notifications.send_weekly_agent_digest,
'cron',
day_of_week='mon',
hour=digest_hour,
minute=0,
id='weekly_agent_digest',
)
scheduler.add_job(
notifications.send_completion_reminders,
'cron',
hour=completion_hour,
minute=15,
id='completion_reminders',
)
scheduler.start()
print(f"Schedulers started: weekly digest Mondays {digest_hour}:00, completion reminders daily {completion_hour}:15")
except Exception as e:
print(f"Warning: Failed to start schedulers: {e}")
# HTML Routes
@app.get("/")
async def home(request: Request):
# Check if this is a JSON API request (for health check)
accept_header = request.headers.get("accept", "")
if "application/json" in accept_header or request.headers.get("content-type") == "application/json":
# Return health check JSON response
from database import check_database_health
db_health = await check_database_health()
return models.HealthCheckResponse(
status="healthy",
message="Agent collector API is running",
timestamp=datetime.utcnow().isoformat(),
database=db_health
)
# Otherwise handle as HTML request for web interface
current_user = await get_current_user_optional(request)
if current_user:
# Redirect logged-in users appropriately
if current_user.get("is_admin") or current_user.get("role") == "readonly_admin":
return RedirectResponse(url=get_app_url("admin"), status_code=303)
else:
return RedirectResponse(url=get_app_url("agent-management"), status_code=303)
else:
# Show landing page for non-authenticated users
return templates.TemplateResponse("index.html", get_template_context(request, current_user))
@app.get("/register", response_class=HTMLResponse)
async def register_page(request: Request):
current_user = await get_current_user_optional(request)
return templates.TemplateResponse("register.html", get_template_context(request, current_user))
@app.post("/register", response_class=HTMLResponse)
async def register_form(
request: Request,
email: str = Form(..., alias="user-email"),
password: str = Form(..., alias="user-password"),
first_name: str = Form(..., alias="first-name"),
last_name: str = Form(..., alias="last-name")
):
try:
full_name = f"{first_name} {last_name}".strip()
existing = await crud.get_user_by_email(email)
if existing:
return templates.TemplateResponse(
"register.html",
{"request": request, "error": "Email already registered"}
)
user = models.UserCreate(email=email, password=password, full_name=full_name)
await crud.create_user(user.email, user.password, user.full_name)
return templates.TemplateResponse(
"login.html",
{"request": request, "success": "Registration successful! Please login."}
)
except Exception as e:
return templates.TemplateResponse(
"register.html",
{"request": request, "error": f"Registration failed: {str(e)}"}
)
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
current_user = await get_current_user_optional(request)
# Add MSAL configuration if enabled
context = get_template_context(request, current_user)
if msal_auth.is_msal_available():
msal_config = config.get_msal_config()
context.update({
"client_id": msal_config["client_id"],
"authority": msal_config["authority"],
"redirect_uri": msal_config["redirect_uri"]
})
return templates.TemplateResponse("login.html", context)
@app.post("/login", response_class=HTMLResponse)
async def login_form(
request: Request,
email: str = Form(...),
password: str = Form(...)
):
try:
print(f"🔍 Login attempt - Email: {email}")
# Check if user exists first
user_exists = await crud.get_user_by_email(email)
print(f"🔍 User exists in database: {user_exists is not None}")
if user_exists:
print(f"🔍 User auth provider: {user_exists.get('auth_provider', 'local')}")
print(f"🔍 User has hashed_password: {'hashed_password' in user_exists}")
print(f"🔍 User is_active: {user_exists.get('is_active', False)}")
user = await crud.authenticate_user(email, password)
print(f"🔍 Authentication result: {user is not None}")
if not user:
error_msg = "Invalid email or password"
if not user_exists:
error_msg += " (User not found)"
elif user_exists.get('auth_provider') != 'local':
error_msg += " (Not a local user - use Microsoft login)"
elif 'hashed_password' not in user_exists:
error_msg += " (No password set for this user)"
print(f"❌ Login failed: {error_msg}")
return templates.TemplateResponse(
"login.html",
get_template_context(request, error=error_msg)
)
# Create token (you can store this in session/cookie in a real app)
token = auth.create_access_token({"sub": str(user["_id"])})
# Check if user is admin or readonly_admin
if user.get("is_admin") or user.get("role") == "readonly_admin":
# Admin / readonly_admin goes to admin dashboard
response = RedirectResponse(url=get_app_url("admin"), status_code=303)
else:
# Regular user goes to all agents page
response = RedirectResponse(url=get_app_url("agent-management"), status_code=303)
# Set token in cookie with proper attributes
response.set_cookie(
key="access_token",
value=token,
httponly=True,
samesite="lax",
secure=False # Set to True in production with HTTPS
)
return response
except Exception as e:
return templates.TemplateResponse(
"login.html",
{"request": request, "error": f"Login failed: {str(e)}"}
)
# Azure AD/MSAL Authentication - Using popup-based authentication per specification
@app.post("/api/auth/azure/token")
async def azure_token_exchange(request: Request):
"""Exchange Azure AD token for local JWT token with proper validation"""
try:
data = await request.json()
access_token = data.get("access_token")
id_token = data.get("id_token")
if not id_token:
raise HTTPException(status_code=400, detail="Missing ID token")
# Validate JWT token against Azure AD public keys (as per specification)
import jwt
import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import base64
msal_config = config.get_msal_config()
# Get Azure AD public keys for token verification
jwks_uri = f"{msal_config['authority']}/discovery/v2.0/keys"
jwks_response = requests.get(jwks_uri)
jwks = jwks_response.json()
# Decode token header to get key ID
unverified_header = jwt.get_unverified_header(id_token)
kid = unverified_header.get("kid")
# Find the matching public key
public_key = None
for key in jwks["keys"]:
if key["kid"] == kid:
# Convert JWK to PEM format for validation
n = base64.urlsafe_b64decode(key["n"] + "==")
e = base64.urlsafe_b64decode(key["e"] + "==")
# Convert to RSA public key
numbers = rsa.RSAPublicNumbers(
int.from_bytes(e, byteorder="big"),
int.from_bytes(n, byteorder="big")
)
public_key = numbers.public_key()
break
if not public_key:
raise HTTPException(status_code=400, detail="Unable to verify token signature")
# Convert to PEM format for JWT library
pem_key = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Validate JWT token signature and claims
try:
id_token_claims = jwt.decode(
id_token,
pem_key,
algorithms=["RS256"],
audience=msal_config["client_id"],
issuer=f"{msal_config['authority']}/v2.0"
)
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=400, detail=f"Token validation failed: {str(e)}")
# Create user profile from validated ID token claims
user_profile = {
"azure_ad_id": id_token_claims.get("oid"),
"email": id_token_claims.get("email") or id_token_claims.get("preferred_username"),
"full_name": id_token_claims.get("name"),
"first_name": id_token_claims.get("given_name"),
"last_name": id_token_claims.get("family_name"),
"tenant_id": id_token_claims.get("tid")
}
# Create or update user in local database
user = await crud.create_or_update_azure_user(user_profile)
# Create JWT token for local session management
jwt_token = auth.create_access_token({"sub": str(user["_id"])})
# Return user info and set secure cookie (following specification)
response = JSONResponse({
"success": True,
"is_admin": user.get("is_admin", False),
"email": user.get("email"),
"full_name": user.get("full_name")
})
# Set JWT token in httpOnly cookie with security flags (as per specification)
response.set_cookie(
key="access_token",
value=jwt_token,
httponly=True,
samesite="lax",
secure=False, # Set to True in production with HTTPS
max_age=24 * 60 * 60 # 24 hours as per specification
)
return response
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=f"Token exchange failed: {str(e)}")
@app.get("/agent-register", response_class=HTMLResponse)
async def agent_register_page(request: Request):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
return templates.TemplateResponse("agent_register.html", get_template_context(request, current_user))
@app.post("/agent-register", response_class=HTMLResponse)
async def agent_register_form(
request: Request,
# Identity
agent_name: str = Form(...),
business_entity: str = Form(...),
agent_status: str = Form("Development"),
client_scope: str = Form(...),
client_name: str = Form(None),
studio_dept: str = Form(None),
agent_location: str = Form(None),
# Classification & Purpose
agent_classification: str = Form(...),
discipline: str = Form(...),
agent_description: str = Form(None),
agent_purpose: str = Form(None),
agent_tags: str = Form(None),
agent_userbase: str = Form(None),
# Autonomy & Safety
autonomy_level: str = Form(...),
off_switch_confirmed: bool = Form(False),
access_rights_confirmed: bool = Form(False),
# IP
ip_ownership: str = Form(...),
# Tech Stack
foundation_model: str = Form(None),
agent_tool: str = Form(...),
agent_version: str = Form(None),
capabilities: List[str] = Form(default=[]),
capabilities_other: str = Form(None),
# Data Safety
pii_check: str = Form("No"),
pii_legal_ref: str = Form(None),
pii_data_types: str = Form(None),
pii_consent: bool = Form(False),
# Performance & Testing
validated_by: str = Form(None),
validation_date: str = Form(None),
evals_method: str = Form(None),
# Declarations
decl_governance: bool = Form(False),
decl_accuracy: bool = Form(False),
decl_upkeep: bool = Form(False),
):
try:
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
user_id = str(current_user["_id"])
# Conditional: client_name required when scope is "specific"
if client_scope == "specific" and not (client_name and client_name.strip()):
context = get_template_context(request, current_user)
context["error"] = "Client Name is required when Client Scope is 'Built for a specific client'."
return templates.TemplateResponse("agent_register.html", context)
# All three declarations are required (form enforces; defend at API too)
if not (decl_governance and decl_accuracy and decl_upkeep):
context = get_template_context(request, current_user)
context["error"] = "All three declarations must be agreed to before submitting."
return templates.TemplateResponse("agent_register.html", context)
# Map client_scope -> legacy `client` for compatibility with verification + email flows
legacy_client = "yes" if client_scope == "specific" else "no"
# Capabilities: 8 checkboxes + comma-separated free text "Other"
cap_list = list(capabilities or [])
if capabilities_other:
cap_list.extend([c.strip() for c in capabilities_other.split(",") if c.strip()])
handles_pii = pii_check == "Yes"
agent_data = {
"agent_name": agent_name,
"agent_tool": agent_tool,
"agent_description": agent_description,
"agent_purpose": agent_purpose,
"agent_version": agent_version,
"agent_status": agent_status,
"agent_location": agent_location,
"agent_contact_person": current_user.get("email"),
"discipline": discipline,
"client": legacy_client,
"client_scope": client_scope,
"studio_name": studio_dept,
"business_entity": business_entity,
"agent_classification": agent_classification,
"autonomy_level": autonomy_level,
"ip_ownership": ip_ownership,
"foundation_model": foundation_model,
"validated_by": validated_by,
"validation_date": validation_date,
"evals_method": evals_method,
"registration_complete": True,
"safety": {
"off_switch_confirmed": off_switch_confirmed,
"access_rights_confirmed": access_rights_confirmed,
},
"pii": {
"handles_pii": handles_pii,
"legal_ref": pii_legal_ref if handles_pii else None,
"data_types": pii_data_types if handles_pii else None,
"consent_recorded": pii_consent if handles_pii else False,
},
"declarations": {
"governance": decl_governance,
"accuracy": decl_accuracy,
"upkeep": decl_upkeep,
},
}
if cap_list:
agent_data["agent_capabilities"] = cap_list
# Client-specific fields
if legacy_client == "yes":
agent_data["client_name"] = client_name
agent_data["verification_status"] = "needs_verification"
# Tags / userbase parsing (existing pattern)
if agent_tags:
agent_data["agent_tags"] = [t.strip() for t in agent_tags.split(",") if t.strip()]
if agent_userbase:
agent_data["agent_userbase"] = [u.strip() for u in agent_userbase.split(",") if u.strip()]
# Strip None top-level values (preserve nested objects + booleans)
agent_data = {k: v for k, v in agent_data.items() if v is not None}
created_agent = await crud.create_agent(agent_data, user_id)
# Client-agent email notification (non-blocking)
if legacy_client == "yes":
try:
notifications.send_client_agent_notification({
**agent_data,
"created_by_email": current_user.get("email", "Unknown"),
})
except Exception as notify_err:
print(f"Client agent notification failed: {notify_err}")
from urllib.parse import quote
success_msg = quote(f"Agent '{agent_name}' registered successfully!")
return RedirectResponse(
url=get_app_url(f"agent-management?success={success_msg}"),
status_code=303,
)
except ValueError as e:
# Handle duplicate agent name error
current_user = await get_current_user_optional(request)
return templates.TemplateResponse(
"agent_register.html",
get_template_context(request, current_user, error=str(e))
)
except Exception as e:
current_user = await get_current_user_optional(request)
return templates.TemplateResponse(
"agent_register.html",
get_template_context(request, current_user, error=f"Agent registration failed: {str(e)}")
)
# ─── Completion flow for incomplete (e.g. LibreChat-synced) agents ──────────
KNOWN_CAPABILITIES = ["RAG", "Web", "API/MCP", "Image Gen", "Code Execution",
"File Operations", "Email/Calendar", "Multi-Agent"]
def _user_can_complete(agent: dict, current_user: dict) -> bool:
if current_user.get("is_admin") or current_user.get("role") == "admin":
return True
user_id = str(current_user["_id"])
if agent.get("created_by") == user_id:
return True
if agent.get("created_by") == "agent_collector_api":
agent_email = (agent.get("agent_contact_person") or "").lower()
user_email = (current_user.get("email") or "").lower()
return bool(agent_email) and agent_email == user_email
return False
def _build_completion_context(agent: dict) -> dict:
"""Compute pre-fill values for the completion form template."""
caps = agent.get("agent_capabilities") or []
known = [c for c in caps if c in KNOWN_CAPABILITIES]
other = ", ".join(c for c in caps if c not in KNOWN_CAPABILITIES)
# Derive client_scope: explicit field wins, else derive from legacy `client`
scope = agent.get("client_scope")
if not scope:
if agent.get("client") == "yes":
scope = "specific"
elif agent.get("client") == "no":
scope = "internal"
pii = agent.get("pii") or {}
safety = agent.get("safety") or {}
return {
"agent": agent,
"mode": "complete",
"prefilled_known_capabilities": known,
"prefilled_other_capabilities": other,
"prefilled_client_scope": scope or "",
"prefilled_pii_handles": bool(pii.get("handles_pii")),
"prefilled_off_switch": bool(safety.get("off_switch_confirmed")),
"prefilled_access_rights": bool(safety.get("access_rights_confirmed")),
}
@app.get("/api/agents/incomplete")
async def list_incomplete_agents(request: Request):
"""Return current user's incomplete agents (admins see all)."""
current_user = await get_current_user_from_cookie(request)
agents = await crud.get_incomplete_agents_for_user(
user_email=current_user.get("email") or "",
user_id=str(current_user["_id"]),
is_admin=bool(current_user.get("is_admin") or current_user.get("role") == "admin"),
)
# Convert ObjectIds to strings for JSON
for a in agents:
a["_id"] = str(a["_id"])
a["agent_id"] = a["_id"]
return agents
@app.get("/agent-complete/{agent_id}", response_class=HTMLResponse)
async def agent_complete_page(request: Request, agent_id: str):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
agent = await crud.get_agent_by_id(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
if not _user_can_complete(agent, current_user):
raise HTTPException(status_code=403, detail="Not authorised to complete this registration")
context = get_template_context(request, current_user)
context.update(_build_completion_context(agent))
return templates.TemplateResponse("agent_register.html", context)
@app.post("/agent-complete/{agent_id}", response_class=HTMLResponse)
async def agent_complete_submit(
agent_id: str,
request: Request,
# Identity
agent_name: str = Form(...),
business_entity: str = Form(...),
agent_status: str = Form("Development"),
client_scope: str = Form(...),
client_name: str = Form(None),
studio_dept: str = Form(None),
agent_location: str = Form(None),
# Classification & Purpose
agent_classification: str = Form(...),
discipline: str = Form(...),
agent_description: str = Form(None),
agent_purpose: str = Form(None),
agent_tags: str = Form(None),
agent_userbase: str = Form(None),
# Autonomy & Safety
autonomy_level: str = Form(...),
off_switch_confirmed: bool = Form(False),
access_rights_confirmed: bool = Form(False),
# IP
ip_ownership: str = Form(...),
# Tech Stack
foundation_model: str = Form(None),
agent_tool: str = Form(...),
agent_version: str = Form(None),
capabilities: List[str] = Form(default=[]),
capabilities_other: str = Form(None),
# Data Safety
pii_check: str = Form("No"),
pii_legal_ref: str = Form(None),
pii_data_types: str = Form(None),
pii_consent: bool = Form(False),
# Performance & Testing
validated_by: str = Form(None),
validation_date: str = Form(None),
evals_method: str = Form(None),
# Declarations
decl_governance: bool = Form(False),
decl_accuracy: bool = Form(False),
decl_upkeep: bool = Form(False),
):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
agent = await crud.get_agent_by_id(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
if not _user_can_complete(agent, current_user):
raise HTTPException(status_code=403, detail="Not authorised to complete this registration")
# Re-render the form on validation failure.
def _rerender_with_error(msg: str):
context = get_template_context(request, current_user, error=msg)
context.update(_build_completion_context(agent))
return templates.TemplateResponse("agent_register.html", context)
if client_scope == "specific" and not (client_name and client_name.strip()):
return _rerender_with_error("Client Name is required when Client Scope is 'Built for a specific client'.")
if not (decl_governance and decl_accuracy and decl_upkeep):
return _rerender_with_error("All three declarations must be agreed to before submitting.")
legacy_client = "yes" if client_scope == "specific" else "no"
cap_list = list(capabilities or [])
if capabilities_other:
cap_list.extend([c.strip() for c in capabilities_other.split(",") if c.strip()])
handles_pii = pii_check == "Yes"
completion_data = {
"agent_name": agent_name,
"agent_tool": agent_tool,
"agent_description": agent_description,
"agent_purpose": agent_purpose,
"agent_version": agent_version,
"agent_status": agent_status,
"agent_location": agent_location,
"discipline": discipline,
"client": legacy_client,
"client_scope": client_scope,
"studio_name": studio_dept,
"business_entity": business_entity,
"agent_classification": agent_classification,
"autonomy_level": autonomy_level,
"ip_ownership": ip_ownership,
"foundation_model": foundation_model,
"validated_by": validated_by,
"validation_date": validation_date,
"evals_method": evals_method,
"safety": {
"off_switch_confirmed": off_switch_confirmed,
"access_rights_confirmed": access_rights_confirmed,
},
"pii": {
"handles_pii": handles_pii,
"legal_ref": pii_legal_ref if handles_pii else None,
"data_types": pii_data_types if handles_pii else None,
"consent_recorded": pii_consent if handles_pii else False,
},
"declarations": {
"governance": decl_governance,
"accuracy": decl_accuracy,
"upkeep": decl_upkeep,
},
}
if cap_list:
completion_data["agent_capabilities"] = cap_list
if legacy_client == "yes":
completion_data["client_name"] = client_name
# Only set verification_status if not already verified (don't reset prior approvals)
if agent.get("verification_status") != "verified":
completion_data["verification_status"] = "needs_verification"
if agent_tags:
completion_data["agent_tags"] = [t.strip() for t in agent_tags.split(",") if t.strip()]
if agent_userbase:
completion_data["agent_userbase"] = [u.strip() for u in agent_userbase.split(",") if u.strip()]
completion_data = {k: v for k, v in completion_data.items() if v is not None}
try:
await crud.complete_agent_registration(
agent_id,
completion_data,
completing_user_id=str(current_user["_id"]),
completing_user_email=current_user.get("email", ""),
)
except Exception as e:
return _rerender_with_error(f"Failed to save: {str(e)}")
from urllib.parse import quote
success_msg = quote(f"Registration completed for '{agent_name}'.")
return RedirectResponse(
url=get_app_url(f"agent-management?view=my&success={success_msg}"),
status_code=303,
)
@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard(request: Request):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
return templates.TemplateResponse("admin/dashboard.html", get_template_context(request, current_user))
@app.get("/user-management", response_class=HTMLResponse)
async def user_management_page(request: Request):
return templates.TemplateResponse("user_management.html", get_template_context(request))
@app.get("/logout", response_class=HTMLResponse)
async def logout(request: Request):
"""Logout user and clear all session data"""
# Clear MSAL session data if present
request.session.clear()
response = RedirectResponse(url=get_app_url(""), status_code=303)
response.delete_cookie(key="access_token")
return response
@app.get("/profile", response_class=HTMLResponse)
async def profile_page(request: Request):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
return templates.TemplateResponse("profile.html", get_template_context(request, current_user))
@app.get("/agent-management", response_class=HTMLResponse)
async def agent_management_page(request: Request, view: Optional[str] = Query(None), success: Optional[str] = Query(None), error: Optional[str] = Query(None)):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
# Default to "all" view for regular users, "my" view can be specified via query param
if view == "my":
# Get user's agents (owned + contact person)
agents = await crud.get_agents_by_user(str(current_user["_id"]), user_email=current_user.get("email"))
current_view = "my"
page_title = "My Agents Dashboard"
page_description = f"{len(agents)} agents in your portfolio"
else:
# Get all agents (default view for regular users)
agents = await crud.get_all_agents()
current_view = "all"
page_title = "All Agents"
page_description = f"{len(agents)} agents in the system"
return templates.TemplateResponse("agent_management.html", get_template_context(
request,
current_user,
agents=agents,
agent_count=len(agents),
current_view=current_view,
page_title=page_title,
page_description=page_description,
success=success,
error=error
))
@app.get("/admin", response_class=HTMLResponse)
async def admin_dashboard(request: Request):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
if not current_user.get("is_admin") and current_user.get("role") != "readonly_admin":
return RedirectResponse(url=get_app_url("login"), status_code=303)
# Get statistics
all_users = await crud.get_all_users()
all_agents = await crud.get_all_agents()
# Calculate stats
total_users = len(all_users)
admin_users = len([u for u in all_users if u.get("is_admin")])
regular_users = total_users - admin_users
total_agents = len(all_agents)
active_agents = len([a for a in all_agents if a.get("agent_status") == "Active"])
return templates.TemplateResponse("admin/dashboard.html", get_template_context(
request,
current_user,
stats={
"total_users": total_users,
"admin_users": admin_users,
"regular_users": regular_users,
"total_agents": total_agents,
"active_agents": active_agents,
"inactive_agents": total_agents - active_agents
},
users=all_users,
agents=all_agents
))
# New enhanced endpoints
@app.get("/search", response_class=HTMLResponse)
async def search_page(request: Request, q: Optional[str] = Query(None)):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
search_results = {"agents": [], "users": []}
if q:
# Search agents using proper search function
# Regular users can search all agents (consistent with agent management page)
search_results["agents"] = await crud.search_agents(q)
# Search users (admin only)
if current_user.get("is_admin"):
all_users = await crud.get_all_users()
search_results["users"] = [
user for user in all_users
if q.lower() in user.get("email", "").lower() or
q.lower() in user.get("full_name", "").lower()
]
return templates.TemplateResponse("search.html", get_template_context(
request,
current_user,
query=q,
results=search_results
))
@app.post("/agent/{agent_id}/edit", response_class=HTMLResponse)
async def edit_agent_form(request: Request, agent_id: str):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
agent = await crud.get_agent_by_id(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
# Check if user can edit this agent (ownership, contact person, or admin)
if not can_user_edit_agent(agent, current_user):
raise HTTPException(status_code=403, detail="Not authorized to edit this agent")
return templates.TemplateResponse("edit_agent.html", get_template_context(
request,
current_user,
agent=agent
))
@app.post("/agent/{agent_id}/delete", response_class=HTMLResponse)
async def delete_agent_form(request: Request, agent_id: str):
current_user = await get_current_user_optional(request)
if not current_user:
return RedirectResponse(url=get_app_url("login"), status_code=303)
# Check permission and delete
user_id = str(current_user["_id"]) if not current_user.get("is_admin") else None
deleted = await crud.delete_agent(agent_id, user_id)
if deleted:
return RedirectResponse(url=get_app_url("agent-management?success=Agent deleted successfully"), status_code=303)
else:
return RedirectResponse(url=get_app_url("agent-management?error=Failed to delete agent"), status_code=303)
# Agent API endpoints
@app.get("/api/agents/all", response_model=List[models.AiAgentResponse])
async def get_all_agents_for_users(current_user: dict = Depends(get_current_user_from_cookie)):
"""Get all agents for regular users (read-only access)"""
agents = await crud.get_all_agents()
return [
create_agent_response(agent) for agent in agents
]
@app.post("/api/agents", response_model=models.AiAgentResponse)
async def create_agent(agent: models.AiAgentCreate, current_user: dict = Depends(get_current_user_from_cookie)):
agent_data = agent.model_dump()
created_agent = await crud.create_agent(agent_data, str(current_user["_id"]))
return create_agent_response(created_agent)
@app.get("/api/agents", response_model=List[models.AiAgentResponse])
async def get_user_agents(current_user: dict = Depends(get_current_user_from_cookie)):
agents = await crud.get_agents_by_user(str(current_user["_id"]), user_email=current_user.get("email"))
return [
create_agent_response(agent) for agent in agents
]
# MSAL routes no longer needed - using popup authentication
@app.get("/api/agents/{agent_id}", response_model=models.AiAgentResponse)
async def get_agent(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
agent = await crud.get_agent_by_id(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
# All authenticated users can view agent details
# Edit/delete permissions are enforced at those respective endpoints
return create_agent_response(agent)
@app.put("/api/agents/{agent_id}", response_model=models.AiAgentResponse)
async def update_agent(agent_id: str, agent: models.AiAgentCreate, current_user: dict = Depends(get_current_user_from_cookie)):
# First check if agent exists
existing_agent = await crud.get_agent_by_id(agent_id)
if not existing_agent:
raise HTTPException(status_code=404, detail="Agent not found")
# Check if user can edit this agent (ownership, contact person, or admin)
if not can_user_edit_agent(existing_agent, current_user):
raise HTTPException(status_code=403, detail="Not authorized to edit this agent")
# Since we've already verified authorization, always pass None to allow the update
user_id_filter = None
# Prepare admin info for Quality Audit updates
admin_user_info = None
agent_data = agent.model_dump()
# Validate Risk Factor when Quality Audit is checked (admin only)
if current_user.get("is_admin") and agent_data.get("quality_audit_status"):
if agent_data.get("risk_factor") is None or not (1 <= agent_data.get("risk_factor", 0) <= 5):
raise HTTPException(
status_code=422,
detail="Risk Factor (1-5) is required when Quality Audit is checked."
)
if current_user.get("is_admin"):
admin_user_info = {
"user_id": str(current_user["_id"]),
"user_name": current_user.get("full_name", current_user.get("email")),
"email": current_user.get("email")
}
updated_agent = await crud.update_agent(agent_id, agent_data, user_id_filter, admin_user_info, current_user.get("email"))
if not updated_agent:
raise HTTPException(status_code=404, detail="Agent not found or not authorized")
return create_agent_response(updated_agent)
@app.put("/api/agents/{agent_id}/rating")
async def update_agent_rating(agent_id: str, request: Request, current_user: dict = Depends(get_current_user_from_cookie)):
"""Submit a per-user star rating for an agent (any authenticated user can rate)"""
existing_agent = await crud.get_agent_by_id(agent_id)
if not existing_agent:
raise HTTPException(status_code=404, detail="Agent not found")
body = await request.json()
rating_value = body.get("rating")
if rating_value is None or not (1 <= float(rating_value) <= 5):
raise HTTPException(status_code=422, detail="Rating must be between 1 and 5")
user_id = str(current_user["_id"])
await crud.upsert_agent_rating(agent_id, user_id, float(rating_value))
stats = await crud.update_agent_average_rating(agent_id)
return {
"message": "Rating updated",
"rating": stats["avg_rating"],
"rating_count": stats["count"],
"user_rating": float(rating_value)
}
@app.get("/api/agents/{agent_id}/my-rating")
async def get_my_agent_rating(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
"""Get the current user's rating for an agent plus the average"""
existing_agent = await crud.get_agent_by_id(agent_id)
if not existing_agent:
raise HTTPException(status_code=404, detail="Agent not found")
user_id = str(current_user["_id"])
user_rating = await crud.get_user_rating_for_agent(agent_id, user_id)
stats = await crud.get_agent_rating_stats(agent_id)
return {
"user_rating": user_rating,
"rating": stats["avg_rating"],
"rating_count": stats["count"]
}
@app.delete("/api/agents/{agent_id}")
async def delete_agent(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
print(f"🗑️ DELETE attempt - Agent ID: {agent_id}, User ID: {current_user['_id']}")
# First check if agent exists
agent = await crud.get_agent_by_id(agent_id)
if not agent:
print(f"🗑️ Agent {agent_id} not found in database")
raise HTTPException(status_code=404, detail="Agent not found")
print(f"🗑️ Agent found - Created by: {agent.get('created_by')}, User is admin: {current_user.get('is_admin', False)}")
# Check permission
user_id = str(current_user["_id"])
if agent["created_by"] != user_id and not current_user.get("is_admin"):
print(f"🗑️ Permission denied - Agent owned by {agent['created_by']}, current user: {user_id}")
raise HTTPException(status_code=403, detail="Not authorized to delete this agent")
deleted = await crud.delete_agent(agent_id, user_id if not current_user.get("is_admin") else None)
if not deleted:
print(f"🗑️ Delete operation failed")
raise HTTPException(status_code=500, detail="Failed to delete agent")
print(f"🗑️ Agent {agent_id} deleted successfully")
return {"message": "Agent deleted successfully"}
# Admin view toggle endpoint
@app.post("/api/admin/toggle-view")
async def toggle_admin_view(request: Request, current_user: dict = Depends(get_current_user_from_cookie)):
"""Toggle between admin and regular user view (admin only)"""
# Check if user is actually an admin (using actual_is_admin if present, or is_admin)
is_actual_admin = current_user.get("actual_is_admin") or current_user.get("is_admin")
if not is_actual_admin:
raise HTTPException(status_code=403, detail="Admin access required")
# Toggle view mode
current_view = request.session.get("view_mode", "admin")
new_view = "user" if current_view == "admin" else "admin"
request.session["view_mode"] = new_view
return {
"success": True,
"view_mode": new_view,
"message": f"Switched to {'regular user' if new_view == 'user' else 'admin'} view"
}
# Admin endpoints
@app.get("/api/admin/users", response_model=List[models.UserResponse])
async def get_all_users(current_user: dict = Depends(require_admin_or_readonly)):
users = await crud.get_all_users()
return [
models.UserResponse(
email=user["email"],
full_name=user.get("full_name"),
is_active=user["is_active"],
is_admin=user["is_admin"],
role=user.get("role", "admin" if user.get("is_admin") else "user"),
auth_provider=user.get("auth_provider", "local")
) for user in users
]
@app.post("/api/admin/users", response_model=models.UserResponse)
async def admin_create_user(
user_data: models.AdminUserCreate,
current_user: dict = Depends(require_admin)
):
"""Create a new local user (admin only)"""
try:
created_user = await crud.admin_create_local_user(
email=user_data.email,
password=user_data.password,
full_name=user_data.full_name,
is_admin=user_data.is_admin
)
return models.UserResponse(
email=created_user["email"],
full_name=created_user.get("full_name"),
is_active=created_user["is_active"],
is_admin=created_user["is_admin"],
role=created_user.get("role", "admin" if created_user["is_admin"] else "user"),
auth_provider=created_user.get("auth_provider", "local")
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.put("/api/admin/users/{email}", response_model=models.UserResponse)
async def update_user(email: str, user_update: models.UserUpdate, current_user: dict = Depends(require_admin)):
# Get the user by email first
existing_user = await crud.get_user_by_email(email)
if not existing_user:
raise HTTPException(status_code=404, detail="User not found")
# Update the user
update_data = user_update.model_dump(exclude_unset=True)
# Sync role and is_admin fields
if "role" in update_data:
role = update_data["role"]
if role == "admin":
update_data["is_admin"] = True
elif role == "readonly_admin":
update_data["is_admin"] = False
elif role == "user":
update_data["is_admin"] = False
elif "is_admin" in update_data:
if update_data["is_admin"]:
update_data["role"] = "admin"
else:
# Only reset to user if not already readonly_admin
if existing_user.get("role") != "readonly_admin":
update_data["role"] = "user"
updated_user = await crud.update_user(str(existing_user["_id"]), update_data)
if not updated_user:
raise HTTPException(status_code=500, detail="Failed to update user")
return models.UserResponse(
email=updated_user["email"],
full_name=updated_user.get("full_name"),
is_active=updated_user["is_active"],
is_admin=updated_user["is_admin"],
role=updated_user.get("role", "admin" if updated_user["is_admin"] else "user"),
auth_provider=updated_user.get("auth_provider", "local")
)
@app.post("/api/admin/users/{email}/reset-password")
async def admin_reset_password(
email: str,
password_data: models.AdminPasswordReset,
current_user: dict = Depends(require_admin)
):
"""Reset password for a local user (admin only)"""
user = await crud.get_user_by_email(email)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if user.get("auth_provider") != "local":
raise HTTPException(
status_code=400,
detail="Cannot reset password for SSO users. This user authenticates via Microsoft."
)
try:
success = await crud.admin_reset_user_password(str(user["_id"]), password_data.new_password)
if success:
return {"message": "Password reset successfully"}
raise HTTPException(status_code=500, detail="Failed to reset password")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/api/users/change-password")
async def change_password(
password_data: models.PasswordChange,
current_user: dict = Depends(get_current_user_from_cookie)
):
"""Change password for current logged-in user"""
if current_user.get("auth_provider") != "local":
raise HTTPException(
status_code=400,
detail="Cannot change password for SSO users. Your account uses Microsoft authentication."
)
try:
success = await crud.change_user_password(
str(current_user["_id"]),
password_data.current_password,
password_data.new_password
)
if success:
return {"message": "Password changed successfully"}
raise HTTPException(status_code=500, detail="Failed to change password")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/api/admin/agents/pending-verification")
async def get_pending_verification(current_user: dict = Depends(require_admin_or_readonly)):
"""Get agents pending verification"""
from database import agents_collection as ac
cursor = ac.find(
{"verification_status": {"$in": ["needs_verification", "verified"]}}
).sort("created_at", -1)
agents = await cursor.to_list(length=None)
result = []
for agent in agents:
# Resolve creator email
created_by = agent.get("created_by", "")
created_by_email = created_by
if created_by and created_by != "agent_collector_api":
try:
from bson import ObjectId
creator = await crud.get_user_by_id(created_by)
if creator:
created_by_email = creator.get("email", created_by)
except Exception:
pass
result.append({
"agent_id": str(agent["_id"]),
"agent_name": agent.get("agent_name"),
"client_name": agent.get("client_name"),
"studio_name": agent.get("studio_name"),
"created_by": created_by_email,
"created_at": agent["created_at"].isoformat() if agent.get("created_at") else None,
"verification_status": agent.get("verification_status"),
"verified_by": agent.get("verified_by"),
"verified_date": agent.get("verified_date"),
})
return result
@app.put("/api/admin/agents/{agent_id}/verify")
async def verify_agent(agent_id: str, current_user: dict = Depends(require_admin)):
"""Approve/verify an agent (admin only)"""
from bson import ObjectId
from database import agents_collection as ac
agent = await crud.get_agent_by_id(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
result = await ac.update_one(
{"_id": ObjectId(agent_id)},
{"$set": {
"verification_status": "verified",
"verified_by": current_user.get("email", str(current_user["_id"])),
"verified_date": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow(),
}}
)
if result.modified_count:
return {"message": "Agent verified successfully"}
raise HTTPException(status_code=500, detail="Failed to verify agent")
@app.post("/api/admin/digest/send")
async def trigger_weekly_digest(current_user: dict = Depends(require_admin)):
"""Manually trigger the weekly agent digest email"""
try:
await notifications.send_weekly_agent_digest()
return {"message": "Weekly digest sent successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to send digest: {str(e)}")
@app.get("/api/admin/agents/unresolved-owner")
async def list_unresolved_owner_agents(current_user: dict = Depends(require_admin)):
"""List collector-created agents whose contact_person doesn't match any active user.
These agents won't be picked up by the completion-reminder flow until an admin
reassigns ownership via PUT /api/admin/agents/{id}/reassign-owner.
"""
agents = await crud.get_unresolved_owner_agents()
return [
{
"agent_id": str(a["_id"]),
"agent_name": a.get("agent_name"),
"agent_contact_person": a.get("agent_contact_person"),
"registration_complete": a.get("registration_complete", False),
"created_at": a["created_at"].isoformat() if a.get("created_at") else None,
}
for a in agents
]
class ReassignOwnerRequest(BaseModel):
new_contact_email: str
new_owner_user_id: Optional[str] = None # if omitted, looked up by email
@app.put("/api/admin/agents/{agent_id}/reassign-owner")
async def reassign_owner(
agent_id: str,
body: ReassignOwnerRequest,
current_user: dict = Depends(require_admin),
):
"""Admin: reassign a collector-created agent to a real user (by email).
Resolves the user by email if `new_owner_user_id` not supplied. After this,
the user will start receiving completion-reminder emails on the next daily run.
"""
target_email = body.new_contact_email.strip().lower()
if "@" not in target_email:
raise HTTPException(status_code=400, detail="new_contact_email must be a valid email")
new_owner_id = body.new_owner_user_id
if not new_owner_id:
# Look up by email (case-insensitive)
user = await users_collection.find_one(
{"email": {"$regex": f"^{re.escape(target_email)}$", "$options": "i"}}
)
if not user:
raise HTTPException(status_code=404, detail=f"No user found with email {target_email}")
new_owner_id = str(user["_id"])
updated = await crud.reassign_agent_owner(agent_id, new_owner_id, body.new_contact_email)
if not updated:
raise HTTPException(status_code=404, detail="Agent not found")
return {
"status": "ok",
"agent_id": agent_id,
"new_owner_user_id": new_owner_id,
"new_contact_email": body.new_contact_email,
}
@app.post("/api/admin/completion-reminders/send")
async def trigger_completion_reminders(
force: bool = Query(False, description="Bypass per-user cooldown and nudge cap"),
current_user: dict = Depends(require_admin),
):
"""Manually trigger the completion-reminder digest email run.
Use `?force=true` to ignore the cooldown / max-nudges cap (e.g., for testing or
after an outage where users haven't been emailed for a while).
"""
try:
summary = await notifications.send_completion_reminders(force=force)
return summary
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to send completion reminders: {str(e)}")
# Prompt Audit Endpoints
async def _audit_batch_runner(unclassified_only: bool, agent_id: str):
"""Background wrapper around run_audit_batch that flips _batch_state.running off when done."""
try:
await audit_analyzer.run_audit_batch(
unclassified_only=unclassified_only,
single_agent_id=agent_id
)
except Exception as e:
audit_analyzer._batch_state["error"] = str(e)
audit_analyzer.logger.exception("Audit batch crashed")
finally:
audit_analyzer._batch_state["running"] = False
audit_analyzer._batch_state["completed_at"] = datetime.utcnow().isoformat()
audit_analyzer._batch_state["current_agent"] = None
@app.post("/api/admin/audit/run")
async def run_audit(
request: Request,
current_user: dict = Depends(require_admin)
):
"""Kick off a Gemini audit batch in the background. Optional body: {agent_id, unclassified_only}.
Returns immediately with {status: started}. Poll /api/admin/audit/status for progress.
"""
if not audit_analyzer.is_gemini_configured():
raise HTTPException(status_code=503, detail="GOOGLE_API_KEY not configured")
if audit_analyzer._batch_state.get("running"):
raise HTTPException(status_code=409, detail="An audit batch is already running")
body = {}
try:
body = await request.json()
except Exception:
pass
agent_id = body.get("agent_id")
unclassified_only = body.get("unclassified_only", False)
if agent_id:
mode = "single"
elif unclassified_only:
mode = "unclassified"
else:
mode = "full"
audit_analyzer._reset_batch_state(mode=mode, started_by=current_user.get("email"))
asyncio.create_task(_audit_batch_runner(unclassified_only=unclassified_only, agent_id=agent_id))
return {"status": "started", "mode": mode}
@app.get("/api/admin/audit/status")
async def audit_status(current_user: dict = Depends(require_admin_or_readonly)):
"""Current state of the audit batch (poll while running)."""
return audit_analyzer.get_batch_state()
@app.get("/api/admin/audit/results")
async def get_audit_results(current_user: dict = Depends(require_admin_or_readonly)):
"""Get all agents with audit data for the Prompt Audit tab"""
agents = await audit_analyzer.get_all_audit_results()
return {
"agents": agents,
"config_status": {
"gemini_configured": audit_analyzer.is_gemini_configured()
}
}
@app.put("/api/admin/audit/{agent_id}/review")
async def review_audit(
agent_id: str,
review: models.AuditReviewRequest,
current_user: dict = Depends(require_admin)
):
"""Mark an agent's audit as reviewed/cleared"""
await audit_analyzer.update_audit_review(
agent_id=agent_id,
status=review.audit_status,
notes=review.reviewer_notes or "",
reviewer_email=current_user["email"]
)
return {"message": f"Audit status updated to {review.audit_status}"}
@app.get("/api/admin/analytics")
async def get_admin_analytics(
status: Optional[str] = None,
discipline: Optional[str] = None,
days: int = 90,
current_user: dict = Depends(require_admin_or_readonly),
):
"""Get analytics data for admin dashboard"""
(
summary,
status_breakdown,
discipline_breakdown,
usage_timeline,
top_by_messages,
top_by_tokens,
recently_active,
all_users,
) = await asyncio.gather(
crud.get_analytics_summary(status_filter=status, discipline_filter=discipline),
crud.get_agent_stats(discipline_filter=discipline),
crud.get_discipline_breakdown(status_filter=status),
crud.get_aggregated_usage_timeline(days, status_filter=status, discipline_filter=discipline),
crud.get_top_agents("total_messages", 5, status_filter=status, discipline_filter=discipline),
crud.get_top_agents("total_tokens", 5, status_filter=status, discipline_filter=discipline),
crud.get_recently_active_agents(10, status_filter=status, discipline_filter=discipline),
crud.get_all_users(),
)
# Rating distribution: count agents in each star bucket (1-5)
all_agents = await crud.get_all_agents(status_filter=status)
filtered_agents = all_agents
if discipline:
filtered_agents = [a for a in all_agents if a.get("discipline") == discipline]
rating_dist = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
for agent in filtered_agents:
r = agent.get("rating")
if r is not None:
bucket = max(1, min(5, round(r)))
rating_dist[bucket] += 1
total_users = len(all_users)
admin_users = sum(1 for u in all_users if u.get("is_admin"))
active_users = sum(1 for u in all_users if u.get("is_active"))
return {
"summary": summary,
"status_breakdown": status_breakdown,
"discipline_breakdown": discipline_breakdown,
"usage_timeline": usage_timeline,
"top_by_messages": top_by_messages,
"top_by_tokens": top_by_tokens,
"recently_active": recently_active,
"rating_distribution": rating_dist,
"total_users": total_users,
"admin_users": admin_users,
"active_users": active_users,
"total_agents": len(filtered_agents),
}
@app.get("/api/admin/agents", response_model=List[models.AiAgentResponse])
async def get_all_agents_admin(current_user: dict = Depends(require_admin_or_readonly)):
agents = await crud.get_all_agents()
return [
create_agent_response(agent) for agent in agents
]
EXPORT_FIELDNAMES = [
"agent_id",
"agent_name",
"agent_tool",
"agent_description",
"agent_purpose",
"agent_version",
"agent_status",
"agent_location",
"agent_department",
"agent_contact_person",
"agent_created_at",
"agent_updated_at",
"agent_tags",
"agent_metadata",
"agent_userbase",
"agent_capabilities",
"url",
"quality_audit_status",
"quality_audit_updated_by",
"quality_audit_updated_at",
"quality_audit_updated_by_name",
"risk_factor",
"last_edited_by",
"created_by",
"total_tokens",
"prompt_tokens",
"completion_tokens",
"discipline",
"rating",
"rating_count",
"instructions",
"business_entity",
"client",
"client_scope",
"client_name",
"studio_name",
"agent_classification",
"autonomy_level",
"ip_ownership",
"foundation_model",
"validated_by",
"validation_date",
"evals_method",
"registration_complete",
"safety_off_switch_confirmed",
"safety_access_rights_confirmed",
"pii_handles_pii",
"pii_legal_ref",
"pii_data_types",
"pii_consent_recorded",
"decl_governance",
"decl_accuracy",
"decl_upkeep",
]
def _export_row(agent: dict) -> dict:
return {
"agent_id": str(agent["_id"]),
"agent_name": agent.get("agent_name", ""),
"agent_tool": agent.get("agent_tool", ""),
"agent_description": agent.get("agent_description", ""),
"agent_purpose": agent.get("agent_purpose", ""),
"agent_version": agent.get("agent_version", ""),
"agent_status": agent.get("agent_status", ""),
"agent_location": agent.get("agent_location", ""),
"agent_department": agent.get("agent_department", ""),
"agent_contact_person": agent.get("agent_contact_person", ""),
"agent_created_at": agent["created_at"].isoformat() if agent.get("created_at") else "",
"agent_updated_at": agent["updated_at"].isoformat() if agent.get("updated_at") else "",
"agent_tags": "|".join(agent.get("agent_tags", [])) if agent.get("agent_tags") else "",
"agent_userbase": "|".join(agent.get("agent_userbase", [])) if agent.get("agent_userbase") else "",
"agent_capabilities": "|".join(agent.get("agent_capabilities", [])) if agent.get("agent_capabilities") else "",
"agent_metadata": json.dumps(sanitize_metadata(agent.get("agent_metadata"))) if agent.get("agent_metadata") else "",
"url": agent.get("url", ""),
"quality_audit_status": str(agent.get("quality_audit_status", False)),
"quality_audit_updated_by": agent.get("quality_audit_updated_by", ""),
"quality_audit_updated_at": agent.get("quality_audit_updated_at", ""),
"quality_audit_updated_by_name": agent.get("quality_audit_updated_by_name", ""),
"risk_factor": str(agent.get("risk_factor", "")) if agent.get("risk_factor") is not None else "",
"last_edited_by": agent.get("last_edited_by", ""),
"created_by": agent.get("created_by", ""),
"total_tokens": str(agent.get("total_tokens", "")) if agent.get("total_tokens") is not None else "",
"prompt_tokens": str(agent.get("prompt_tokens", "")) if agent.get("prompt_tokens") is not None else "",
"completion_tokens": str(agent.get("completion_tokens", "")) if agent.get("completion_tokens") is not None else "",
"discipline": agent.get("discipline", ""),
"rating": str(agent.get("rating", "")) if agent.get("rating") is not None else "",
"rating_count": str(agent.get("rating_count", "")) if agent.get("rating_count") is not None else "",
"instructions": agent.get("instructions", ""),
"business_entity": agent.get("business_entity", ""),
"client": agent.get("client", ""),
"client_scope": agent.get("client_scope", ""),
"client_name": agent.get("client_name", ""),
"studio_name": agent.get("studio_name", ""),
"agent_classification": agent.get("agent_classification", ""),
"autonomy_level": agent.get("autonomy_level", ""),
"ip_ownership": agent.get("ip_ownership", ""),
"foundation_model": agent.get("foundation_model", ""),
"validated_by": agent.get("validated_by", ""),
"validation_date": agent.get("validation_date", ""),
"evals_method": agent.get("evals_method", ""),
"registration_complete": str(agent.get("registration_complete", "")) if agent.get("registration_complete") is not None else "",
"safety_off_switch_confirmed": str((agent.get("safety") or {}).get("off_switch_confirmed", "")) if agent.get("safety") else "",
"safety_access_rights_confirmed": str((agent.get("safety") or {}).get("access_rights_confirmed", "")) if agent.get("safety") else "",
"pii_handles_pii": str((agent.get("pii") or {}).get("handles_pii", "")) if agent.get("pii") else "",
"pii_legal_ref": (agent.get("pii") or {}).get("legal_ref", "") if agent.get("pii") else "",
"pii_data_types": (agent.get("pii") or {}).get("data_types", "") if agent.get("pii") else "",
"pii_consent_recorded": str((agent.get("pii") or {}).get("consent_recorded", "")) if agent.get("pii") else "",
"decl_governance": str((agent.get("declarations") or {}).get("governance", "")) if agent.get("declarations") else "",
"decl_accuracy": str((agent.get("declarations") or {}).get("accuracy", "")) if agent.get("declarations") else "",
"decl_upkeep": str((agent.get("declarations") or {}).get("upkeep", "")) if agent.get("declarations") else "",
}
def _matches_export_filters(
agent: dict,
status: str,
discipline: str,
audit: str,
business_entity: str,
agent_classification: str,
autonomy_level: str,
risks_only: bool,
search: str,
) -> bool:
if status and agent.get("agent_status") != status:
return False
if discipline and agent.get("discipline") != discipline:
return False
if audit == "audited" and not agent.get("quality_audit_status"):
return False
if audit == "not_audited" and agent.get("quality_audit_status"):
return False
if business_entity and agent.get("business_entity") != business_entity:
return False
if agent_classification and agent.get("agent_classification") != agent_classification:
return False
if autonomy_level and agent.get("autonomy_level") != autonomy_level:
return False
if risks_only:
pii_yes = bool((agent.get("pii") or {}).get("handles_pii"))
ip_shared = agent.get("ip_ownership") == "Shared/TBD"
autopilot = agent.get("autonomy_level") == "Autopilot"
if not (pii_yes or ip_shared or autopilot):
return False
if search:
needle = search.lower()
haystack = " ".join(
str(agent.get(f) or "") for f in (
"agent_name", "agent_department", "agent_purpose",
"agent_description", "agent_contact_person", "discipline",
)
).lower()
if needle not in haystack:
return False
return True
@app.get("/api/admin/agents/export/xlsx")
async def export_agents_xlsx(
current_user: dict = Depends(require_admin),
status: str = Query("", description="Filter by agent_status"),
discipline: str = Query("", description="Filter by discipline (Strategy | Creative | ...)"),
audit: str = Query("", description="audited | not_audited"),
business_entity: str = Query("", description="Filter by business_entity"),
agent_classification: str = Query("", description="Utility | Functional | Supervisory | Guardian"),
autonomy_level: str = Query("", description="Human-Led | Hybrid | Autopilot"),
risks_only: bool = Query(False, description="Only PII=Yes, IP=Shared/TBD, or Autopilot"),
search: str = Query("", description="Case-insensitive substring across name/department/purpose/description/contact/discipline"),
):
"""Export agent data as a real .xlsx file. Multi-line instructions stay in one cell.
Optional filters mirror the admin dashboard Agent Management tab; with no params,
every agent is exported."""
agents = await crud.get_all_agents()
agents = [
a for a in agents if _matches_export_filters(
a, status, discipline, audit, business_entity, agent_classification,
autonomy_level, risks_only, search,
)
]
wb = Workbook()
ws = wb.active
ws.title = "agents"
ws.append(EXPORT_FIELDNAMES)
header_font = Font(bold=True)
for cell in ws[1]:
cell.font = header_font
wrap = Alignment(wrap_text=True, vertical="top")
for agent in agents:
row = _export_row(agent)
ws.append([row[name] for name in EXPORT_FIELDNAMES])
for cell in ws[ws.max_row]:
cell.alignment = wrap
ws.freeze_panes = "A2"
# Width heuristic: clamp to the longest value in the column, capped at 60 chars
# so the instructions column stays readable instead of stretching to thousands.
for col_idx, name in enumerate(EXPORT_FIELDNAMES, start=1):
max_len = len(name)
for row in ws.iter_rows(min_row=2, min_col=col_idx, max_col=col_idx, values_only=True):
v = row[0]
if v is None:
continue
# Only consider the first line for width — multi-line cells wrap.
first_line = str(v).split("\n", 1)[0]
if len(first_line) > max_len:
max_len = len(first_line)
ws.column_dimensions[ws.cell(row=1, column=col_idx).column_letter].width = min(max_len + 2, 60)
buf = io.BytesIO()
wb.save(buf)
buf.seek(0)
return StreamingResponse(
buf,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename=agents_export_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.xlsx"
},
)
@app.post("/api/admin/agents/import/csv")
async def import_agents_csv(
request: Request,
file: UploadFile = File(...),
current_user: dict = Depends(require_admin)
):
"""Import agents from CSV file"""
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="File must be a CSV")
try:
contents = await file.read()
decoded = contents.decode('utf-8')
csv_reader = csv.DictReader(io.StringIO(decoded))
success_count = 0
skipped_count = 0
error_count = 0
errors = []
user_id = str(current_user["_id"])
for row_num, row in enumerate(csv_reader, start=1):
try:
agent_name_from_csv = row.get("agent_name")
if not agent_name_from_csv:
skipped_count += 1
continue
def _csv_bool(val):
"""Parse CSV boolean cell; empty / unrecognised → None so we don't overwrite with False."""
if val is None or val == "":
return None
return val.strip().lower() in ("true", "1", "yes", "y")
# Parse fields into agent_data first
agent_data = {
"agent_name": agent_name_from_csv,
"agent_tool": row.get("agent_tool", ""),
"agent_description": row.get("agent_description"),
"agent_purpose": row.get("agent_purpose"),
"agent_version": row.get("agent_version"),
"agent_status": row.get("agent_status") or "Development",
"agent_location": row.get("agent_location"),
"agent_department": row.get("agent_department"),
"agent_contact_person": row.get("agent_contact_person"),
"url": row.get("url"),
"quality_audit_status": row.get("quality_audit_status", "False").lower() == "true",
"risk_factor": int(row.get("risk_factor")) if row.get("risk_factor") else None,
"discipline": row.get("discipline") or None,
"rating": float(row.get("rating")) if row.get("rating") else None,
"total_tokens": int(row.get("total_tokens")) if row.get("total_tokens") else None,
# Governance / registration fields
"business_entity": row.get("business_entity") or None,
"client": row.get("client") or None,
"client_scope": row.get("client_scope") or None,
"client_name": row.get("client_name") or None,
"studio_name": row.get("studio_name") or None,
"agent_classification": row.get("agent_classification") or None,
"autonomy_level": row.get("autonomy_level") or None,
"ip_ownership": row.get("ip_ownership") or None,
"foundation_model": row.get("foundation_model") or None,
"validated_by": row.get("validated_by") or None,
"validation_date": row.get("validation_date") or None,
"evals_method": row.get("evals_method") or None,
"registration_complete": _csv_bool(row.get("registration_complete")),
}
# Build nested safety / pii / declarations objects only when at least one
# cell is populated, so empty CSV rows don't clobber existing nested data.
safety_off = _csv_bool(row.get("safety_off_switch_confirmed"))
safety_access = _csv_bool(row.get("safety_access_rights_confirmed"))
if safety_off is not None or safety_access is not None:
agent_data["safety"] = {
"off_switch_confirmed": safety_off,
"access_rights_confirmed": safety_access,
}
pii_handles = _csv_bool(row.get("pii_handles_pii"))
pii_legal = row.get("pii_legal_ref") or None
pii_data = row.get("pii_data_types") or None
pii_consent = _csv_bool(row.get("pii_consent_recorded"))
if any(v is not None for v in (pii_handles, pii_legal, pii_data, pii_consent)):
agent_data["pii"] = {
"handles_pii": pii_handles,
"legal_ref": pii_legal,
"data_types": pii_data,
"consent_recorded": pii_consent,
}
decl_gov = _csv_bool(row.get("decl_governance"))
decl_acc = _csv_bool(row.get("decl_accuracy"))
decl_upk = _csv_bool(row.get("decl_upkeep"))
if any(v is not None for v in (decl_gov, decl_acc, decl_upk)):
agent_data["declarations"] = {
"governance": decl_gov,
"accuracy": decl_acc,
"upkeep": decl_upk,
}
# Handle lists (pipe separated)
if row.get("agent_tags"):
agent_data["agent_tags"] = [t.strip() for t in row.get("agent_tags").split("|") if t.strip()]
if row.get("agent_userbase"):
agent_data["agent_userbase"] = [u.strip() for u in row.get("agent_userbase").split("|") if u.strip()]
if row.get("agent_capabilities"):
agent_data["agent_capabilities"] = [c.strip() for c in row.get("agent_capabilities").split("|") if c.strip()]
# Handle metadata (JSON string)
if row.get("agent_metadata"):
try:
agent_data["agent_metadata"] = json.loads(row.get("agent_metadata"))
except:
pass # Ignore invalid metadata JSON
# Check for existing agent with same name
existing_agent = await crud.get_agent_by_name(agent_data["agent_name"])
if existing_agent:
# If duplicate, append location to name
location = agent_data.get("agent_location")
if location and location.strip():
agent_data["agent_name"] = f"{agent_data['agent_name']} - {location.strip()}"
else:
# Fallback if no location
agent_data["agent_name"] = f"{agent_data['agent_name']} - Imported"
# Create agent
await crud.create_agent(agent_data, user_id, skip_time_check=True)
success_count += 1
except Exception as e:
error_count += 1
errors.append(f"Row {row_num}: {str(e)}")
return JSONResponse({
"success": True,
"imported": success_count,
"skipped": skipped_count,
"errors": error_count,
"error_details": errors[:10] # Limit error details
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}")
@app.post("/api/admin/agents/delete/csv")
async def delete_agents_csv(
request: Request,
file: UploadFile = File(...),
current_user: dict = Depends(require_admin)
):
"""Delete agents based on CSV file"""
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="File must be a CSV")
try:
contents = await file.read()
decoded = contents.decode('utf-8')
csv_reader = csv.DictReader(io.StringIO(decoded))
deleted_count = 0
not_found_count = 0
error_count = 0
errors = []
for row_num, row in enumerate(csv_reader, start=1):
try:
agent_id = row.get("agent_id")
agent_name = row.get("agent_name")
if agent_id and agent_id.strip():
# Delete by ID
success = await crud.delete_agent(agent_id.strip())
if success:
deleted_count += 1
else:
not_found_count += 1
elif agent_name and agent_name.strip():
# Delete by Name (All matches)
# We need to find them first
# Using direct collection access here for efficiency/custom logic not in crud
from database import agents_collection
result = await agents_collection.delete_many({"agent_name": agent_name.strip()})
if result.deleted_count > 0:
deleted_count += result.deleted_count
else:
not_found_count += 1
else:
# Skip empty rows
continue
except Exception as e:
error_count += 1
errors.append(f"Row {row_num}: {str(e)}")
return JSONResponse({
"success": True,
"deleted": deleted_count,
"not_found": not_found_count,
"errors": error_count,
"error_details": errors[:10]
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}")
# Agent Collector API Endpoints (for compatibility with agent_collector app)
@app.post("/agents")
async def create_agent_collector(
agent: models.AgentCollectorCreate,
request: Request,
api_key_valid: bool = Depends(verify_agent_collector_api_key)
):
"""Agent collector API endpoint - handles both new registrations and usage tracking"""
try:
# Check content type
content_type = request.headers.get("content-type", "")
if not content_type.startswith("application/json"):
return JSONResponse(
status_code=415,
content={
"error": "Unsupported Media Type",
"message": "Request must be JSON"
}
)
# Check if agent already exists by name
existing_agent = await crud.get_agent_by_name(agent.name)
if existing_agent:
# Agent exists - log usage AND update agent document with new usage data
internal_data = map_agent_collector_to_internal(agent)
print(f"🔗 URL DEBUG: Agent '{agent.name}' - URL in internal_data: {internal_data.get('url')}")
# Create usage record for fallback purposes (existing system)
await crud.create_agent_usage_record(agent.name, internal_data)
# Update agent document with new usage data (replace strategy)
update_fields = {
"url": internal_data.get("url"),
"discipline": internal_data.get("discipline"),
"instructions": internal_data.get("instructions"),
"usage_timeline": internal_data.get("usage_timeline"),
"conversation_count": internal_data.get("conversation_count"),
"unique_users": internal_data.get("unique_users"),
"total_messages": internal_data.get("total_messages"),
"first_used": internal_data.get("first_used"),
"last_used": internal_data.get("last_used"),
"total_tokens": internal_data.get("total_tokens"),
"prompt_tokens": internal_data.get("prompt_tokens"),
"completion_tokens": internal_data.get("completion_tokens"),
"updated_at": datetime.utcnow()
}
# Only update non-None usage fields
update_fields = {k: v for k, v in update_fields.items() if k == "updated_at" or v is not None}
print(f"🔗 URL DEBUG: Agent '{agent.name}' - update_fields after filter: {update_fields}")
print(f"🔗 URL DEBUG: Agent '{agent.name}' - URL in update_fields: {update_fields.get('url')}")
if len(update_fields) > 1: # More than just updated_at
from database import agents_collection
result = await agents_collection.update_one(
{"agent_name": agent.name},
{"$set": update_fields}
)
print(f"🔗 URL DEBUG: Agent '{agent.name}' - Update executed: matched={result.matched_count}, modified={result.modified_count}")
# Check weekly token threshold and send notification if needed (non-blocking)
try:
await notifications.check_and_notify_threshold(agent.name)
except Exception as notify_err:
print(f"Notification check failed for '{agent.name}': {notify_err}")
# Auto-classify with Gemini in background (non-blocking)
if audit_analyzer.is_gemini_configured() and internal_data.get("instructions"):
asyncio.create_task(audit_analyzer.classify_single_agent(str(existing_agent["_id"])))
return models.AgentUsageTrackingResponse(
status="usage_logged",
message="Agent already exists, usage tracked",
agent_name=agent.name
)
else:
# Agent doesn't exist - create new registration
internal_data = map_agent_collector_to_internal(agent)
# Handle datetime fields if provided
if agent.creation_date:
internal_data["agent_created_at"] = agent.creation_date
if agent.last_updated:
internal_data["agent_updated_at"] = agent.last_updated
# Create agent using collector-specific function
created_agent = await crud.create_agent_from_collector(internal_data)
# Check weekly token threshold and send notification if needed (non-blocking)
try:
await notifications.check_and_notify_threshold(agent.name)
except Exception as notify_err:
print(f"Notification check failed for '{agent.name}': {notify_err}")
# Auto-classify with Gemini in background (non-blocking)
if audit_analyzer.is_gemini_configured() and internal_data.get("instructions"):
asyncio.create_task(audit_analyzer.classify_single_agent(str(created_agent["_id"])))
return models.AgentCollectorResponse(
status="success",
message="Agent data collected successfully",
agent_id=str(created_agent["_id"])
)
except Exception as e:
# Check if it's a database connectivity issue
from database import check_database_health
try:
db_health = await check_database_health()
if not db_health.get("healthy"):
return JSONResponse(
status_code=503,
content={
"error": "Database Unavailable",
"message": "MongoDB connection is not available. Please check the database setup.",
"agent_data": agent.model_dump()
}
)
except:
pass
# General database error
return JSONResponse(
status_code=500,
content={
"error": "Database Error",
"message": "Failed to store agent data. MongoDB may be unavailable or there was an error processing the request.",
"agent_data": agent.model_dump()
}
)
# Agent Usage API Endpoints
@app.get("/api/agents/{agent_name}/usage", response_model=models.AgentUsageStatsResponse)
async def get_agent_usage(
agent_name: str,
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
current_user: dict = Depends(get_current_user_from_cookie)
):
"""Get usage statistics for a specific agent (hybrid: pre-computed data with fallback to calculated)"""
try:
# Get agent by name to check for pre-computed usage data
agent = await crud.get_agent_by_name(agent_name)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
# Check if agent has pre-computed usage data
has_usage_data = agent.get("usage_timeline") is not None
if has_usage_data:
# USE PRE-COMPUTED DATA (new system)
usage_timeline = agent.get("usage_timeline", [])
# Convert timeline to usage_by_period format for response compatibility
usage_by_period = {entry["date"]: entry["message_count"] for entry in usage_timeline}
return models.AgentUsageStatsResponse(
agent_name=agent_name,
total_usage_count=agent.get("total_messages", 0),
first_usage=agent.get("first_used"),
last_usage=agent.get("last_used"),
usage_by_period=usage_by_period,
conversation_count=agent.get("conversation_count"),
unique_users=agent.get("unique_users"),
total_tokens=agent.get("total_tokens"),
prompt_tokens=agent.get("prompt_tokens"),
completion_tokens=agent.get("completion_tokens")
)
else:
# FALLBACK TO CALCULATED DATA (old system)
# Parse date strings if provided
start_dt = None
end_dt = None
if start_date:
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
if end_date:
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
# Get usage stats from agent_usage_collection
stats = await crud.get_agent_usage_stats(agent_name, start_dt, end_dt)
# Get usage by period for the response
usage_by_period = await crud.get_agent_usage_by_period(agent_name, "daily", start_dt, end_dt)
return models.AgentUsageStatsResponse(
agent_name=agent_name,
total_usage_count=stats["total_usage_count"],
first_usage=stats["first_usage"].isoformat() if stats["first_usage"] else None,
last_usage=stats["last_usage"].isoformat() if stats["last_usage"] else None,
usage_by_period=usage_by_period,
conversation_count=None, # Old system doesn't track this
unique_users=None, # Old system doesn't track this
total_tokens=None, # Old system doesn't track this
prompt_tokens=None,
completion_tokens=None
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get usage stats: {str(e)}")
@app.get("/api/agents/{agent_name}/usage/chart")
async def get_agent_usage_chart(
agent_name: str,
period: str = Query("daily", regex="^(daily|weekly|monthly)$"),
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
current_user: dict = Depends(get_current_user_from_cookie)
):
"""Get usage chart data for a specific agent (hybrid: pre-computed data with fallback to calculated)"""
try:
# Get agent by name to check for pre-computed usage data
agent = await crud.get_agent_by_name(agent_name)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
# Check if agent has pre-computed usage timeline
has_usage_timeline = agent.get("usage_timeline") is not None
if has_usage_timeline:
# USE PRE-COMPUTED TIMELINE (new system)
usage_timeline = agent.get("usage_timeline", [])
# Format for Chart.js with dual datasets
datasets = [{
"label": "Message Count",
"data": [entry["message_count"] for entry in usage_timeline],
"backgroundColor": "rgba(54, 162, 235, 0.2)",
"borderColor": "rgba(54, 162, 235, 1)",
"borderWidth": 1,
"yAxisID": "y"
}]
# Add token dataset if any entries have token data
token_data = [entry.get("token_count", 0) for entry in usage_timeline]
if any(t > 0 for t in token_data):
datasets.append({
"label": "Token Count",
"data": token_data,
"backgroundColor": "rgba(255, 193, 7, 0.2)",
"borderColor": "rgba(255, 193, 7, 1)",
"borderWidth": 1,
"yAxisID": "y1"
})
chart_data = {
"labels": [entry["date"] for entry in usage_timeline],
"datasets": datasets
}
return chart_data
else:
# FALLBACK TO CALCULATED DATA (old system)
# Parse date strings if provided
start_dt = None
end_dt = None
if start_date:
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
if end_date:
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
# Get usage data grouped by period from agent_usage_collection
usage_data = await crud.get_agent_usage_by_period(agent_name, period, start_dt, end_dt)
# Format for Chart.js
chart_data = {
"labels": list(usage_data.keys()),
"datasets": [{
"label": f"Usage Count ({period})",
"data": list(usage_data.values()),
"backgroundColor": "rgba(54, 162, 235, 0.2)",
"borderColor": "rgba(54, 162, 235, 1)",
"borderWidth": 1
}]
}
return chart_data
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get chart data: {str(e)}")
# MSAL routes no longer needed - using popup authentication