Export & filtering - Replace /api/admin/agents/export/csv with /api/admin/agents/export/xlsx (openpyxl). Multi-line system prompts stay inside one cell instead of fragmenting into thousands of rows when opened in Excel. - Accept filter query params on export: status, discipline, audit, business_entity, agent_classification, autonomy_level, risks_only, search. - Move Export/Import CSV/Delete by CSV buttons into the Agents Management tab, drop the duplicate links from the top nav, and rebuild the cramped filter row as a wrappable two-row layout. - Add a Discipline dropdown to the Agents Management filter row to match the Prompt Audit tab. Completion-reminder emails (fix for the broken Complete-button links) - Add h:Reply-To header to every Mailgun send so users can reply to a real mailbox instead of noreply@. Default Nick.Viljoen@oliver.agency, overridable via NOTIFICATION_REPLY_TO env var. - send_completion_reminders now skips with an error log when AGENTHUB_PUBLIC_URL is unset instead of mailing relative links email clients can't follow. UI polish - Restrict the 5-second alert auto-hide to .alert-dismissible so the load-bearing 'unresolved owner' banner stays visible until acted on. - Move the orange brand gradient to a fixed body::before pseudo-element so it can't be covered by the white content card or scrolled out of view. - Lock the navbar to viewport top (position: fixed) and reserve body padding-top so content doesn't sit beneath it. Audit polish (carried over from previous WIP) - Add batch-state tracking to audit_analyzer for in-flight progress visibility. - Update PLAN-prompt-audit.md to match the shipped behaviour. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2507 lines
102 KiB
Python
2507 lines
102 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, Request, Form, Query, File, UploadFile
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse, StreamingResponse
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from typing import List, Optional
|
|
from pydantic import BaseModel
|
|
from database import users_collection
|
|
import crud
|
|
import models
|
|
import auth
|
|
import config
|
|
import msal_auth
|
|
import notifications
|
|
import audit_analyzer
|
|
from datetime import datetime
|
|
import os
|
|
import re
|
|
from dotenv import load_dotenv
|
|
from fastapi import Header
|
|
import csv
|
|
import io
|
|
import json
|
|
import asyncio
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Alignment, Font
|
|
|
|
load_dotenv()
|
|
|
|
app = FastAPI(
|
|
title="AgentHub",
|
|
description="AI Agent Management System with comprehensive CRUD operations",
|
|
version="1.0.0",
|
|
root_path=config.get_base_path()
|
|
)
|
|
|
|
# Add session middleware for MSAL state management
|
|
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SECRET_KEY", "your-session-secret-key"))
|
|
|
|
# Mount static files with explicit path handling
|
|
from fastapi.responses import FileResponse
|
|
import os as path_os
|
|
|
|
@app.get("/static/{filename:path}")
|
|
async def serve_static(filename: str):
|
|
"""Serve static files with proper path handling"""
|
|
file_path = path_os.path.join("static", filename)
|
|
if path_os.path.exists(file_path) and path_os.path.isfile(file_path):
|
|
return FileResponse(file_path)
|
|
raise HTTPException(status_code=404, detail="Static file not found")
|
|
|
|
# Also mount the traditional way as backup
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
|
|
|
|
def get_app_url(path: str) -> str:
|
|
"""Get URL with proper base path - production fix"""
|
|
base_path = os.getenv("BASE_PATH", "").rstrip("/")
|
|
path = path.lstrip("/")
|
|
if base_path:
|
|
return f"{base_path}/{path}"
|
|
return f"/{path}" if path else "/"
|
|
|
|
# Debug route to check configuration
|
|
@app.get("/debug/config")
|
|
async def debug_config():
|
|
return {
|
|
"base_path": config.get_base_path(),
|
|
"env_base_path": os.getenv("BASE_PATH"),
|
|
"root_path_from_app": app.root_path,
|
|
"get_app_url_test": get_app_url("agent-management")
|
|
}
|
|
|
|
def get_template_context(request: Request, current_user=None, **kwargs):
|
|
"""Get standard template context with base path and MSAL info"""
|
|
return {
|
|
"request": request,
|
|
"current_user": current_user,
|
|
"base_path": config.get_base_path(),
|
|
"msal_enabled": msal_auth.is_msal_available(),
|
|
"show_local_login": config.show_local_login(),
|
|
**kwargs
|
|
}
|
|
|
|
def is_valid_email(email: str) -> bool:
|
|
"""Validate email format using regex"""
|
|
if not email:
|
|
return False
|
|
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
|
return re.match(pattern, email) is not None
|
|
|
|
def can_user_edit_agent(agent: dict, current_user: dict) -> bool:
|
|
"""Check if user can edit an agent based on ownership or contact person match"""
|
|
# Admins can edit any agent
|
|
if current_user.get("is_admin"):
|
|
return True
|
|
|
|
# User can edit agents they created
|
|
if agent["created_by"] == str(current_user["_id"]):
|
|
return True
|
|
|
|
# User can edit agents where they are the contact person (if valid email and case-insensitive match)
|
|
agent_contact = agent.get("agent_contact_person")
|
|
if agent_contact and is_valid_email(agent_contact):
|
|
user_email = current_user.get("email", "")
|
|
if agent_contact.lower() == user_email.lower():
|
|
return True
|
|
|
|
return False
|
|
|
|
async def get_current_user(token: str = Depends(oauth2_scheme)):
|
|
payload = auth.decode_access_token(token)
|
|
if not payload:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
return user
|
|
|
|
async def get_current_user_optional(request: Request):
|
|
"""Get current user from cookie if available"""
|
|
token = request.cookies.get("access_token")
|
|
if token:
|
|
try:
|
|
payload = auth.decode_access_token(token)
|
|
if payload:
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
if user:
|
|
# Apply view mode override if admin is viewing as user
|
|
view_mode = request.session.get("view_mode", "admin")
|
|
if user.get("is_admin") and view_mode == "user":
|
|
user["actual_is_admin"] = True
|
|
user["is_admin"] = False
|
|
return user
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
async def get_current_user_from_cookie(request: Request):
|
|
"""Get current user from cookie for API endpoints"""
|
|
token = request.cookies.get("access_token")
|
|
if not token:
|
|
raise HTTPException(status_code=401, detail="Not authenticated")
|
|
|
|
payload = auth.decode_access_token(token)
|
|
if not payload:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
# Apply view mode override if admin is viewing as user
|
|
view_mode = request.session.get("view_mode", "admin")
|
|
if user.get("is_admin") and view_mode == "user":
|
|
user["actual_is_admin"] = True
|
|
user["is_admin"] = False
|
|
|
|
return user
|
|
|
|
async def require_admin(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Require admin access"""
|
|
if not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
return current_user
|
|
|
|
async def require_admin_or_readonly(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Require admin or readonly_admin access"""
|
|
if current_user.get("is_admin"):
|
|
return current_user
|
|
if current_user.get("role") == "readonly_admin":
|
|
return current_user
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
def sanitize_metadata(metadata: dict) -> dict[str, str]:
|
|
"""
|
|
Sanitize agent metadata to ensure all values are strings.
|
|
Handles MongoDB ObjectId references, lists, and other complex objects.
|
|
"""
|
|
if not metadata or not isinstance(metadata, dict):
|
|
return {}
|
|
|
|
sanitized = {}
|
|
|
|
for key, value in metadata.items():
|
|
if value is None:
|
|
continue
|
|
elif isinstance(value, str):
|
|
sanitized[key] = value
|
|
elif isinstance(value, list):
|
|
# Handle MongoDB ObjectId references: [{"$oid": "..."}] or regular lists
|
|
if value and isinstance(value[0], dict) and "$oid" in value[0]:
|
|
# Extract ObjectId strings
|
|
oid_values = [item.get("$oid", str(item)) for item in value if isinstance(item, dict) and "$oid" in item]
|
|
sanitized[key] = ",".join(oid_values) if oid_values else str(value)
|
|
else:
|
|
# Convert list to comma-separated string
|
|
sanitized[key] = ",".join(str(item) for item in value)
|
|
elif isinstance(value, dict):
|
|
# Handle nested dictionaries - convert to JSON string
|
|
import json
|
|
try:
|
|
sanitized[key] = json.dumps(value, default=str)
|
|
except (TypeError, ValueError):
|
|
sanitized[key] = str(value)
|
|
else:
|
|
# Convert other types to string
|
|
sanitized[key] = str(value)
|
|
|
|
return sanitized
|
|
|
|
def create_agent_response(agent: dict) -> models.AiAgentResponse:
|
|
"""Helper function to create AiAgentResponse with all fields including Quality Audit and Usage Tracking"""
|
|
return models.AiAgentResponse(
|
|
agent_id=str(agent["_id"]),
|
|
agent_name=agent["agent_name"],
|
|
agent_tool=agent.get("agent_tool"),
|
|
agent_description=agent.get("agent_description"),
|
|
agent_purpose=agent.get("agent_purpose"),
|
|
agent_version=agent.get("agent_version"),
|
|
agent_status=agent.get("agent_status"),
|
|
agent_location=agent.get("agent_location"),
|
|
agent_department=agent.get("agent_department"),
|
|
agent_contact_person=agent.get("agent_contact_person"),
|
|
agent_created_at=agent["created_at"].isoformat() if agent.get("created_at") else None,
|
|
agent_updated_at=agent["updated_at"].isoformat() if agent.get("updated_at") else None,
|
|
agent_tags=agent.get("agent_tags"),
|
|
agent_metadata=sanitize_metadata(agent.get("agent_metadata")) if agent.get("agent_metadata") else None,
|
|
agent_userbase=agent.get("agent_userbase"),
|
|
agent_capabilities=agent.get("agent_capabilities"),
|
|
url=agent.get("url"),
|
|
quality_audit_status=agent.get("quality_audit_status", False),
|
|
quality_audit_updated_by=agent.get("quality_audit_updated_by"),
|
|
quality_audit_updated_at=agent.get("quality_audit_updated_at"),
|
|
quality_audit_updated_by_name=agent.get("quality_audit_updated_by_name"),
|
|
risk_factor=agent.get("risk_factor"),
|
|
last_edited_by=agent.get("last_edited_by"),
|
|
discipline=agent.get("discipline"),
|
|
rating=agent.get("rating"),
|
|
rating_count=agent.get("rating_count"),
|
|
client=agent.get("client"),
|
|
client_name=agent.get("client_name"),
|
|
studio_name=agent.get("studio_name"),
|
|
verification_status=agent.get("verification_status"),
|
|
verified_by=agent.get("verified_by"),
|
|
verified_date=agent.get("verified_date"),
|
|
instructions=agent.get("instructions"),
|
|
audit_status=agent.get("audit_status"),
|
|
audit_date=agent.get("audit_date"),
|
|
audit_category=agent.get("audit_category"),
|
|
audit_risk_level=agent.get("audit_risk_level"),
|
|
created_by=agent["created_by"],
|
|
# Usage tracking fields
|
|
usage_timeline=agent.get("usage_timeline"),
|
|
conversation_count=agent.get("conversation_count"),
|
|
unique_users=agent.get("unique_users"),
|
|
total_messages=agent.get("total_messages"),
|
|
first_used=agent.get("first_used"),
|
|
last_used=agent.get("last_used"),
|
|
total_tokens=agent.get("total_tokens"),
|
|
prompt_tokens=agent.get("prompt_tokens"),
|
|
completion_tokens=agent.get("completion_tokens")
|
|
)
|
|
|
|
async def verify_agent_collector_api_key(x_api_key: str = Header(alias="X-API-Key")):
|
|
"""Verify static API key for agent collector endpoints"""
|
|
expected_key = os.getenv("AGENT_COLLECTOR_API_KEY")
|
|
if not expected_key:
|
|
raise HTTPException(status_code=500, detail="API key not configured")
|
|
if x_api_key != expected_key:
|
|
raise HTTPException(status_code=401, detail="Invalid API key")
|
|
return True
|
|
|
|
def map_agent_collector_to_internal(collector_data: models.AgentCollectorCreate) -> dict:
|
|
"""Map agent collector field names to internal schema"""
|
|
# Normalize status to Title Case for internal storage
|
|
status = collector_data.status
|
|
if status:
|
|
status_map = {
|
|
"active": "Active",
|
|
"inactive": "Inactive",
|
|
"deprecated": "Deprecated",
|
|
"development": "Development"
|
|
}
|
|
status = status_map.get(status.lower(), status)
|
|
|
|
# Convert usage_timeline from Pydantic models to dicts
|
|
usage_timeline = None
|
|
if collector_data.usage_timeline:
|
|
usage_timeline = [{"date": entry.date, "message_count": entry.message_count, "token_count": entry.token_count}
|
|
for entry in collector_data.usage_timeline]
|
|
|
|
# Auto-tag Pencil Agents discipline
|
|
discipline = collector_data.discipline
|
|
if not discipline and "pencil" in collector_data.name.lower():
|
|
discipline = "Pencil Agents"
|
|
|
|
return {
|
|
"agent_name": collector_data.name,
|
|
"agent_tool": collector_data.tool,
|
|
"agent_description": collector_data.description,
|
|
"agent_purpose": collector_data.purpose,
|
|
"agent_location": collector_data.location,
|
|
"agent_userbase": collector_data.userbase,
|
|
"agent_version": collector_data.version,
|
|
"agent_capabilities": collector_data.capabilities,
|
|
"agent_status": status,
|
|
"agent_department": collector_data.department,
|
|
"agent_contact_person": collector_data.contact_person,
|
|
"agent_tags": collector_data.tags,
|
|
"agent_metadata": collector_data.metadata,
|
|
"url": collector_data.url,
|
|
"discipline": discipline,
|
|
"client": collector_data.client,
|
|
"client_name": collector_data.client_name,
|
|
"studio_name": collector_data.studio_name,
|
|
"instructions": collector_data.instructions,
|
|
# Usage tracking fields
|
|
"usage_timeline": usage_timeline,
|
|
"conversation_count": collector_data.conversation_count,
|
|
"unique_users": collector_data.unique_users,
|
|
"total_messages": collector_data.total_messages,
|
|
"first_used": collector_data.first_used,
|
|
"last_used": collector_data.last_used,
|
|
"total_tokens": collector_data.total_tokens,
|
|
"prompt_tokens": collector_data.prompt_tokens,
|
|
"completion_tokens": collector_data.completion_tokens,
|
|
}
|
|
|
|
|
|
|
|
@app.get("/me", response_model=models.UserResponse)
|
|
async def me(current_user: dict = Depends(get_current_user)):
|
|
return {
|
|
"email": current_user["email"],
|
|
"full_name": current_user.get("full_name"),
|
|
"is_active": current_user["is_active"],
|
|
"is_admin": current_user["is_admin"],
|
|
"role": current_user.get("role", "admin" if current_user["is_admin"] else "user"),
|
|
}
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
"""Run database migrations and ensure indexes on startup"""
|
|
from database import ensure_indexes
|
|
await ensure_indexes()
|
|
|
|
# Migrate pencil agents discipline
|
|
count = await crud.migrate_pencil_agents_discipline()
|
|
if count > 0:
|
|
print(f"Pencil Agents migration: updated {count} agent(s)")
|
|
|
|
# Rename discipline 'Optimization' -> 'Optimisation' (UK spelling, 2026-05)
|
|
opt_count = await crud.migrate_optimisation_spelling()
|
|
if opt_count > 0:
|
|
print(f"Optimisation spelling migration: updated {opt_count} agent(s)")
|
|
|
|
# Backfill registration_complete for agents that pre-date the field (2026-05)
|
|
reg_counts = await crud.backfill_registration_complete()
|
|
if reg_counts["form_complete"] or reg_counts["collector_incomplete"]:
|
|
print(
|
|
f"registration_complete backfill: {reg_counts['form_complete']} form-registered marked complete, "
|
|
f"{reg_counts['collector_incomplete']} collector-created marked incomplete"
|
|
)
|
|
|
|
# Start schedulers (weekly digest + daily completion-reminder)
|
|
try:
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
digest_hour = int(os.getenv("WEEKLY_DIGEST_HOUR", "7"))
|
|
completion_hour = int(os.getenv("COMPLETION_REMINDER_HOUR", "8"))
|
|
scheduler = AsyncIOScheduler()
|
|
scheduler.add_job(
|
|
notifications.send_weekly_agent_digest,
|
|
'cron',
|
|
day_of_week='mon',
|
|
hour=digest_hour,
|
|
minute=0,
|
|
id='weekly_agent_digest',
|
|
)
|
|
scheduler.add_job(
|
|
notifications.send_completion_reminders,
|
|
'cron',
|
|
hour=completion_hour,
|
|
minute=15,
|
|
id='completion_reminders',
|
|
)
|
|
scheduler.start()
|
|
print(f"Schedulers started: weekly digest Mondays {digest_hour}:00, completion reminders daily {completion_hour}:15")
|
|
except Exception as e:
|
|
print(f"Warning: Failed to start schedulers: {e}")
|
|
|
|
# HTML Routes
|
|
@app.get("/")
|
|
async def home(request: Request):
|
|
# Check if this is a JSON API request (for health check)
|
|
accept_header = request.headers.get("accept", "")
|
|
if "application/json" in accept_header or request.headers.get("content-type") == "application/json":
|
|
# Return health check JSON response
|
|
from database import check_database_health
|
|
db_health = await check_database_health()
|
|
return models.HealthCheckResponse(
|
|
status="healthy",
|
|
message="Agent collector API is running",
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
database=db_health
|
|
)
|
|
|
|
# Otherwise handle as HTML request for web interface
|
|
current_user = await get_current_user_optional(request)
|
|
if current_user:
|
|
# Redirect logged-in users appropriately
|
|
if current_user.get("is_admin") or current_user.get("role") == "readonly_admin":
|
|
return RedirectResponse(url=get_app_url("admin"), status_code=303)
|
|
else:
|
|
return RedirectResponse(url=get_app_url("agent-management"), status_code=303)
|
|
else:
|
|
# Show landing page for non-authenticated users
|
|
return templates.TemplateResponse("index.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/register", response_class=HTMLResponse)
|
|
async def register_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse("register.html", get_template_context(request, current_user))
|
|
|
|
@app.post("/register", response_class=HTMLResponse)
|
|
async def register_form(
|
|
request: Request,
|
|
email: str = Form(..., alias="user-email"),
|
|
password: str = Form(..., alias="user-password"),
|
|
first_name: str = Form(..., alias="first-name"),
|
|
last_name: str = Form(..., alias="last-name")
|
|
):
|
|
try:
|
|
full_name = f"{first_name} {last_name}".strip()
|
|
existing = await crud.get_user_by_email(email)
|
|
if existing:
|
|
return templates.TemplateResponse(
|
|
"register.html",
|
|
{"request": request, "error": "Email already registered"}
|
|
)
|
|
|
|
user = models.UserCreate(email=email, password=password, full_name=full_name)
|
|
await crud.create_user(user.email, user.password, user.full_name)
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "success": "Registration successful! Please login."}
|
|
)
|
|
except Exception as e:
|
|
return templates.TemplateResponse(
|
|
"register.html",
|
|
{"request": request, "error": f"Registration failed: {str(e)}"}
|
|
)
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
|
|
async def login_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
|
|
# Add MSAL configuration if enabled
|
|
context = get_template_context(request, current_user)
|
|
if msal_auth.is_msal_available():
|
|
msal_config = config.get_msal_config()
|
|
context.update({
|
|
"client_id": msal_config["client_id"],
|
|
"authority": msal_config["authority"],
|
|
"redirect_uri": msal_config["redirect_uri"]
|
|
})
|
|
|
|
return templates.TemplateResponse("login.html", context)
|
|
|
|
@app.post("/login", response_class=HTMLResponse)
|
|
async def login_form(
|
|
request: Request,
|
|
email: str = Form(...),
|
|
password: str = Form(...)
|
|
):
|
|
try:
|
|
print(f"🔍 Login attempt - Email: {email}")
|
|
|
|
# Check if user exists first
|
|
user_exists = await crud.get_user_by_email(email)
|
|
print(f"🔍 User exists in database: {user_exists is not None}")
|
|
|
|
if user_exists:
|
|
print(f"🔍 User auth provider: {user_exists.get('auth_provider', 'local')}")
|
|
print(f"🔍 User has hashed_password: {'hashed_password' in user_exists}")
|
|
print(f"🔍 User is_active: {user_exists.get('is_active', False)}")
|
|
|
|
user = await crud.authenticate_user(email, password)
|
|
print(f"🔍 Authentication result: {user is not None}")
|
|
|
|
if not user:
|
|
error_msg = "Invalid email or password"
|
|
if not user_exists:
|
|
error_msg += " (User not found)"
|
|
elif user_exists.get('auth_provider') != 'local':
|
|
error_msg += " (Not a local user - use Microsoft login)"
|
|
elif 'hashed_password' not in user_exists:
|
|
error_msg += " (No password set for this user)"
|
|
|
|
print(f"❌ Login failed: {error_msg}")
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
get_template_context(request, error=error_msg)
|
|
)
|
|
|
|
# Create token (you can store this in session/cookie in a real app)
|
|
token = auth.create_access_token({"sub": str(user["_id"])})
|
|
|
|
# Check if user is admin or readonly_admin
|
|
if user.get("is_admin") or user.get("role") == "readonly_admin":
|
|
# Admin / readonly_admin goes to admin dashboard
|
|
response = RedirectResponse(url=get_app_url("admin"), status_code=303)
|
|
else:
|
|
# Regular user goes to all agents page
|
|
response = RedirectResponse(url=get_app_url("agent-management"), status_code=303)
|
|
|
|
# Set token in cookie with proper attributes
|
|
response.set_cookie(
|
|
key="access_token",
|
|
value=token,
|
|
httponly=True,
|
|
samesite="lax",
|
|
secure=False # Set to True in production with HTTPS
|
|
)
|
|
return response
|
|
|
|
except Exception as e:
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": f"Login failed: {str(e)}"}
|
|
)
|
|
|
|
# Azure AD/MSAL Authentication - Using popup-based authentication per specification
|
|
|
|
|
|
@app.post("/api/auth/azure/token")
|
|
async def azure_token_exchange(request: Request):
|
|
"""Exchange Azure AD token for local JWT token with proper validation"""
|
|
try:
|
|
data = await request.json()
|
|
access_token = data.get("access_token")
|
|
id_token = data.get("id_token")
|
|
|
|
if not id_token:
|
|
raise HTTPException(status_code=400, detail="Missing ID token")
|
|
|
|
# Validate JWT token against Azure AD public keys (as per specification)
|
|
import jwt
|
|
import requests
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
import base64
|
|
|
|
msal_config = config.get_msal_config()
|
|
|
|
# Get Azure AD public keys for token verification
|
|
jwks_uri = f"{msal_config['authority']}/discovery/v2.0/keys"
|
|
jwks_response = requests.get(jwks_uri)
|
|
jwks = jwks_response.json()
|
|
|
|
# Decode token header to get key ID
|
|
unverified_header = jwt.get_unverified_header(id_token)
|
|
kid = unverified_header.get("kid")
|
|
|
|
# Find the matching public key
|
|
public_key = None
|
|
for key in jwks["keys"]:
|
|
if key["kid"] == kid:
|
|
# Convert JWK to PEM format for validation
|
|
n = base64.urlsafe_b64decode(key["n"] + "==")
|
|
e = base64.urlsafe_b64decode(key["e"] + "==")
|
|
|
|
# Convert to RSA public key
|
|
numbers = rsa.RSAPublicNumbers(
|
|
int.from_bytes(e, byteorder="big"),
|
|
int.from_bytes(n, byteorder="big")
|
|
)
|
|
public_key = numbers.public_key()
|
|
break
|
|
|
|
if not public_key:
|
|
raise HTTPException(status_code=400, detail="Unable to verify token signature")
|
|
|
|
# Convert to PEM format for JWT library
|
|
pem_key = public_key.public_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
|
)
|
|
|
|
# Validate JWT token signature and claims
|
|
try:
|
|
id_token_claims = jwt.decode(
|
|
id_token,
|
|
pem_key,
|
|
algorithms=["RS256"],
|
|
audience=msal_config["client_id"],
|
|
issuer=f"{msal_config['authority']}/v2.0"
|
|
)
|
|
except jwt.InvalidTokenError as e:
|
|
raise HTTPException(status_code=400, detail=f"Token validation failed: {str(e)}")
|
|
|
|
# Create user profile from validated ID token claims
|
|
user_profile = {
|
|
"azure_ad_id": id_token_claims.get("oid"),
|
|
"email": id_token_claims.get("email") or id_token_claims.get("preferred_username"),
|
|
"full_name": id_token_claims.get("name"),
|
|
"first_name": id_token_claims.get("given_name"),
|
|
"last_name": id_token_claims.get("family_name"),
|
|
"tenant_id": id_token_claims.get("tid")
|
|
}
|
|
|
|
# Create or update user in local database
|
|
user = await crud.create_or_update_azure_user(user_profile)
|
|
|
|
# Create JWT token for local session management
|
|
jwt_token = auth.create_access_token({"sub": str(user["_id"])})
|
|
|
|
# Return user info and set secure cookie (following specification)
|
|
response = JSONResponse({
|
|
"success": True,
|
|
"is_admin": user.get("is_admin", False),
|
|
"email": user.get("email"),
|
|
"full_name": user.get("full_name")
|
|
})
|
|
|
|
# Set JWT token in httpOnly cookie with security flags (as per specification)
|
|
response.set_cookie(
|
|
key="access_token",
|
|
value=jwt_token,
|
|
httponly=True,
|
|
samesite="lax",
|
|
secure=False, # Set to True in production with HTTPS
|
|
max_age=24 * 60 * 60 # 24 hours as per specification
|
|
)
|
|
|
|
return response
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(status_code=400, detail=f"Token exchange failed: {str(e)}")
|
|
|
|
@app.get("/agent-register", response_class=HTMLResponse)
|
|
async def agent_register_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
return templates.TemplateResponse("agent_register.html", get_template_context(request, current_user))
|
|
|
|
@app.post("/agent-register", response_class=HTMLResponse)
|
|
async def agent_register_form(
|
|
request: Request,
|
|
# Identity
|
|
agent_name: str = Form(...),
|
|
business_entity: str = Form(...),
|
|
agent_status: str = Form("Development"),
|
|
client_scope: str = Form(...),
|
|
client_name: str = Form(None),
|
|
studio_dept: str = Form(None),
|
|
agent_location: str = Form(None),
|
|
# Classification & Purpose
|
|
agent_classification: str = Form(...),
|
|
discipline: str = Form(...),
|
|
agent_description: str = Form(None),
|
|
agent_purpose: str = Form(None),
|
|
agent_tags: str = Form(None),
|
|
agent_userbase: str = Form(None),
|
|
# Autonomy & Safety
|
|
autonomy_level: str = Form(...),
|
|
off_switch_confirmed: bool = Form(False),
|
|
access_rights_confirmed: bool = Form(False),
|
|
# IP
|
|
ip_ownership: str = Form(...),
|
|
# Tech Stack
|
|
foundation_model: str = Form(None),
|
|
agent_tool: str = Form(...),
|
|
agent_version: str = Form(None),
|
|
capabilities: List[str] = Form(default=[]),
|
|
capabilities_other: str = Form(None),
|
|
# Data Safety
|
|
pii_check: str = Form("No"),
|
|
pii_legal_ref: str = Form(None),
|
|
pii_data_types: str = Form(None),
|
|
pii_consent: bool = Form(False),
|
|
# Performance & Testing
|
|
validated_by: str = Form(None),
|
|
validation_date: str = Form(None),
|
|
evals_method: str = Form(None),
|
|
# Declarations
|
|
decl_governance: bool = Form(False),
|
|
decl_accuracy: bool = Form(False),
|
|
decl_upkeep: bool = Form(False),
|
|
):
|
|
try:
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
|
|
user_id = str(current_user["_id"])
|
|
|
|
# Conditional: client_name required when scope is "specific"
|
|
if client_scope == "specific" and not (client_name and client_name.strip()):
|
|
context = get_template_context(request, current_user)
|
|
context["error"] = "Client Name is required when Client Scope is 'Built for a specific client'."
|
|
return templates.TemplateResponse("agent_register.html", context)
|
|
|
|
# All three declarations are required (form enforces; defend at API too)
|
|
if not (decl_governance and decl_accuracy and decl_upkeep):
|
|
context = get_template_context(request, current_user)
|
|
context["error"] = "All three declarations must be agreed to before submitting."
|
|
return templates.TemplateResponse("agent_register.html", context)
|
|
|
|
# Map client_scope -> legacy `client` for compatibility with verification + email flows
|
|
legacy_client = "yes" if client_scope == "specific" else "no"
|
|
|
|
# Capabilities: 8 checkboxes + comma-separated free text "Other"
|
|
cap_list = list(capabilities or [])
|
|
if capabilities_other:
|
|
cap_list.extend([c.strip() for c in capabilities_other.split(",") if c.strip()])
|
|
|
|
handles_pii = pii_check == "Yes"
|
|
|
|
agent_data = {
|
|
"agent_name": agent_name,
|
|
"agent_tool": agent_tool,
|
|
"agent_description": agent_description,
|
|
"agent_purpose": agent_purpose,
|
|
"agent_version": agent_version,
|
|
"agent_status": agent_status,
|
|
"agent_location": agent_location,
|
|
"agent_contact_person": current_user.get("email"),
|
|
"discipline": discipline,
|
|
"client": legacy_client,
|
|
"client_scope": client_scope,
|
|
"studio_name": studio_dept,
|
|
"business_entity": business_entity,
|
|
"agent_classification": agent_classification,
|
|
"autonomy_level": autonomy_level,
|
|
"ip_ownership": ip_ownership,
|
|
"foundation_model": foundation_model,
|
|
"validated_by": validated_by,
|
|
"validation_date": validation_date,
|
|
"evals_method": evals_method,
|
|
"registration_complete": True,
|
|
"safety": {
|
|
"off_switch_confirmed": off_switch_confirmed,
|
|
"access_rights_confirmed": access_rights_confirmed,
|
|
},
|
|
"pii": {
|
|
"handles_pii": handles_pii,
|
|
"legal_ref": pii_legal_ref if handles_pii else None,
|
|
"data_types": pii_data_types if handles_pii else None,
|
|
"consent_recorded": pii_consent if handles_pii else False,
|
|
},
|
|
"declarations": {
|
|
"governance": decl_governance,
|
|
"accuracy": decl_accuracy,
|
|
"upkeep": decl_upkeep,
|
|
},
|
|
}
|
|
|
|
if cap_list:
|
|
agent_data["agent_capabilities"] = cap_list
|
|
|
|
# Client-specific fields
|
|
if legacy_client == "yes":
|
|
agent_data["client_name"] = client_name
|
|
agent_data["verification_status"] = "needs_verification"
|
|
|
|
# Tags / userbase parsing (existing pattern)
|
|
if agent_tags:
|
|
agent_data["agent_tags"] = [t.strip() for t in agent_tags.split(",") if t.strip()]
|
|
if agent_userbase:
|
|
agent_data["agent_userbase"] = [u.strip() for u in agent_userbase.split(",") if u.strip()]
|
|
|
|
# Strip None top-level values (preserve nested objects + booleans)
|
|
agent_data = {k: v for k, v in agent_data.items() if v is not None}
|
|
|
|
created_agent = await crud.create_agent(agent_data, user_id)
|
|
|
|
# Client-agent email notification (non-blocking)
|
|
if legacy_client == "yes":
|
|
try:
|
|
notifications.send_client_agent_notification({
|
|
**agent_data,
|
|
"created_by_email": current_user.get("email", "Unknown"),
|
|
})
|
|
except Exception as notify_err:
|
|
print(f"Client agent notification failed: {notify_err}")
|
|
|
|
from urllib.parse import quote
|
|
success_msg = quote(f"Agent '{agent_name}' registered successfully!")
|
|
return RedirectResponse(
|
|
url=get_app_url(f"agent-management?success={success_msg}"),
|
|
status_code=303,
|
|
)
|
|
|
|
except ValueError as e:
|
|
# Handle duplicate agent name error
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse(
|
|
"agent_register.html",
|
|
get_template_context(request, current_user, error=str(e))
|
|
)
|
|
except Exception as e:
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse(
|
|
"agent_register.html",
|
|
get_template_context(request, current_user, error=f"Agent registration failed: {str(e)}")
|
|
)
|
|
|
|
# ─── Completion flow for incomplete (e.g. LibreChat-synced) agents ──────────
|
|
KNOWN_CAPABILITIES = ["RAG", "Web", "API/MCP", "Image Gen", "Code Execution",
|
|
"File Operations", "Email/Calendar", "Multi-Agent"]
|
|
|
|
|
|
def _user_can_complete(agent: dict, current_user: dict) -> bool:
|
|
if current_user.get("is_admin") or current_user.get("role") == "admin":
|
|
return True
|
|
user_id = str(current_user["_id"])
|
|
if agent.get("created_by") == user_id:
|
|
return True
|
|
if agent.get("created_by") == "agent_collector_api":
|
|
agent_email = (agent.get("agent_contact_person") or "").lower()
|
|
user_email = (current_user.get("email") or "").lower()
|
|
return bool(agent_email) and agent_email == user_email
|
|
return False
|
|
|
|
|
|
def _build_completion_context(agent: dict) -> dict:
|
|
"""Compute pre-fill values for the completion form template."""
|
|
caps = agent.get("agent_capabilities") or []
|
|
known = [c for c in caps if c in KNOWN_CAPABILITIES]
|
|
other = ", ".join(c for c in caps if c not in KNOWN_CAPABILITIES)
|
|
|
|
# Derive client_scope: explicit field wins, else derive from legacy `client`
|
|
scope = agent.get("client_scope")
|
|
if not scope:
|
|
if agent.get("client") == "yes":
|
|
scope = "specific"
|
|
elif agent.get("client") == "no":
|
|
scope = "internal"
|
|
|
|
pii = agent.get("pii") or {}
|
|
safety = agent.get("safety") or {}
|
|
|
|
return {
|
|
"agent": agent,
|
|
"mode": "complete",
|
|
"prefilled_known_capabilities": known,
|
|
"prefilled_other_capabilities": other,
|
|
"prefilled_client_scope": scope or "",
|
|
"prefilled_pii_handles": bool(pii.get("handles_pii")),
|
|
"prefilled_off_switch": bool(safety.get("off_switch_confirmed")),
|
|
"prefilled_access_rights": bool(safety.get("access_rights_confirmed")),
|
|
}
|
|
|
|
|
|
@app.get("/api/agents/incomplete")
|
|
async def list_incomplete_agents(request: Request):
|
|
"""Return current user's incomplete agents (admins see all)."""
|
|
current_user = await get_current_user_from_cookie(request)
|
|
agents = await crud.get_incomplete_agents_for_user(
|
|
user_email=current_user.get("email") or "",
|
|
user_id=str(current_user["_id"]),
|
|
is_admin=bool(current_user.get("is_admin") or current_user.get("role") == "admin"),
|
|
)
|
|
# Convert ObjectIds to strings for JSON
|
|
for a in agents:
|
|
a["_id"] = str(a["_id"])
|
|
a["agent_id"] = a["_id"]
|
|
return agents
|
|
|
|
|
|
@app.get("/agent-complete/{agent_id}", response_class=HTMLResponse)
|
|
async def agent_complete_page(request: Request, agent_id: str):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
if not _user_can_complete(agent, current_user):
|
|
raise HTTPException(status_code=403, detail="Not authorised to complete this registration")
|
|
|
|
context = get_template_context(request, current_user)
|
|
context.update(_build_completion_context(agent))
|
|
return templates.TemplateResponse("agent_register.html", context)
|
|
|
|
|
|
@app.post("/agent-complete/{agent_id}", response_class=HTMLResponse)
|
|
async def agent_complete_submit(
|
|
agent_id: str,
|
|
request: Request,
|
|
# Identity
|
|
agent_name: str = Form(...),
|
|
business_entity: str = Form(...),
|
|
agent_status: str = Form("Development"),
|
|
client_scope: str = Form(...),
|
|
client_name: str = Form(None),
|
|
studio_dept: str = Form(None),
|
|
agent_location: str = Form(None),
|
|
# Classification & Purpose
|
|
agent_classification: str = Form(...),
|
|
discipline: str = Form(...),
|
|
agent_description: str = Form(None),
|
|
agent_purpose: str = Form(None),
|
|
agent_tags: str = Form(None),
|
|
agent_userbase: str = Form(None),
|
|
# Autonomy & Safety
|
|
autonomy_level: str = Form(...),
|
|
off_switch_confirmed: bool = Form(False),
|
|
access_rights_confirmed: bool = Form(False),
|
|
# IP
|
|
ip_ownership: str = Form(...),
|
|
# Tech Stack
|
|
foundation_model: str = Form(None),
|
|
agent_tool: str = Form(...),
|
|
agent_version: str = Form(None),
|
|
capabilities: List[str] = Form(default=[]),
|
|
capabilities_other: str = Form(None),
|
|
# Data Safety
|
|
pii_check: str = Form("No"),
|
|
pii_legal_ref: str = Form(None),
|
|
pii_data_types: str = Form(None),
|
|
pii_consent: bool = Form(False),
|
|
# Performance & Testing
|
|
validated_by: str = Form(None),
|
|
validation_date: str = Form(None),
|
|
evals_method: str = Form(None),
|
|
# Declarations
|
|
decl_governance: bool = Form(False),
|
|
decl_accuracy: bool = Form(False),
|
|
decl_upkeep: bool = Form(False),
|
|
):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
if not _user_can_complete(agent, current_user):
|
|
raise HTTPException(status_code=403, detail="Not authorised to complete this registration")
|
|
|
|
# Re-render the form on validation failure.
|
|
def _rerender_with_error(msg: str):
|
|
context = get_template_context(request, current_user, error=msg)
|
|
context.update(_build_completion_context(agent))
|
|
return templates.TemplateResponse("agent_register.html", context)
|
|
|
|
if client_scope == "specific" and not (client_name and client_name.strip()):
|
|
return _rerender_with_error("Client Name is required when Client Scope is 'Built for a specific client'.")
|
|
|
|
if not (decl_governance and decl_accuracy and decl_upkeep):
|
|
return _rerender_with_error("All three declarations must be agreed to before submitting.")
|
|
|
|
legacy_client = "yes" if client_scope == "specific" else "no"
|
|
|
|
cap_list = list(capabilities or [])
|
|
if capabilities_other:
|
|
cap_list.extend([c.strip() for c in capabilities_other.split(",") if c.strip()])
|
|
|
|
handles_pii = pii_check == "Yes"
|
|
|
|
completion_data = {
|
|
"agent_name": agent_name,
|
|
"agent_tool": agent_tool,
|
|
"agent_description": agent_description,
|
|
"agent_purpose": agent_purpose,
|
|
"agent_version": agent_version,
|
|
"agent_status": agent_status,
|
|
"agent_location": agent_location,
|
|
"discipline": discipline,
|
|
"client": legacy_client,
|
|
"client_scope": client_scope,
|
|
"studio_name": studio_dept,
|
|
"business_entity": business_entity,
|
|
"agent_classification": agent_classification,
|
|
"autonomy_level": autonomy_level,
|
|
"ip_ownership": ip_ownership,
|
|
"foundation_model": foundation_model,
|
|
"validated_by": validated_by,
|
|
"validation_date": validation_date,
|
|
"evals_method": evals_method,
|
|
"safety": {
|
|
"off_switch_confirmed": off_switch_confirmed,
|
|
"access_rights_confirmed": access_rights_confirmed,
|
|
},
|
|
"pii": {
|
|
"handles_pii": handles_pii,
|
|
"legal_ref": pii_legal_ref if handles_pii else None,
|
|
"data_types": pii_data_types if handles_pii else None,
|
|
"consent_recorded": pii_consent if handles_pii else False,
|
|
},
|
|
"declarations": {
|
|
"governance": decl_governance,
|
|
"accuracy": decl_accuracy,
|
|
"upkeep": decl_upkeep,
|
|
},
|
|
}
|
|
|
|
if cap_list:
|
|
completion_data["agent_capabilities"] = cap_list
|
|
|
|
if legacy_client == "yes":
|
|
completion_data["client_name"] = client_name
|
|
# Only set verification_status if not already verified (don't reset prior approvals)
|
|
if agent.get("verification_status") != "verified":
|
|
completion_data["verification_status"] = "needs_verification"
|
|
|
|
if agent_tags:
|
|
completion_data["agent_tags"] = [t.strip() for t in agent_tags.split(",") if t.strip()]
|
|
if agent_userbase:
|
|
completion_data["agent_userbase"] = [u.strip() for u in agent_userbase.split(",") if u.strip()]
|
|
|
|
completion_data = {k: v for k, v in completion_data.items() if v is not None}
|
|
|
|
try:
|
|
await crud.complete_agent_registration(
|
|
agent_id,
|
|
completion_data,
|
|
completing_user_id=str(current_user["_id"]),
|
|
completing_user_email=current_user.get("email", ""),
|
|
)
|
|
except Exception as e:
|
|
return _rerender_with_error(f"Failed to save: {str(e)}")
|
|
|
|
from urllib.parse import quote
|
|
success_msg = quote(f"Registration completed for '{agent_name}'.")
|
|
return RedirectResponse(
|
|
url=get_app_url(f"agent-management?view=my&success={success_msg}"),
|
|
status_code=303,
|
|
)
|
|
|
|
|
|
@app.get("/dashboard", response_class=HTMLResponse)
|
|
async def dashboard(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
return templates.TemplateResponse("admin/dashboard.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/user-management", response_class=HTMLResponse)
|
|
async def user_management_page(request: Request):
|
|
return templates.TemplateResponse("user_management.html", get_template_context(request))
|
|
|
|
@app.get("/logout", response_class=HTMLResponse)
|
|
async def logout(request: Request):
|
|
"""Logout user and clear all session data"""
|
|
# Clear MSAL session data if present
|
|
request.session.clear()
|
|
|
|
response = RedirectResponse(url=get_app_url(""), status_code=303)
|
|
response.delete_cookie(key="access_token")
|
|
return response
|
|
|
|
@app.get("/profile", response_class=HTMLResponse)
|
|
async def profile_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
return templates.TemplateResponse("profile.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/agent-management", response_class=HTMLResponse)
|
|
async def agent_management_page(request: Request, view: Optional[str] = Query(None), success: Optional[str] = Query(None), error: Optional[str] = Query(None)):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
|
|
# Default to "all" view for regular users, "my" view can be specified via query param
|
|
if view == "my":
|
|
# Get user's agents (owned + contact person)
|
|
agents = await crud.get_agents_by_user(str(current_user["_id"]), user_email=current_user.get("email"))
|
|
current_view = "my"
|
|
page_title = "My Agents Dashboard"
|
|
page_description = f"{len(agents)} agents in your portfolio"
|
|
else:
|
|
# Get all agents (default view for regular users)
|
|
agents = await crud.get_all_agents()
|
|
current_view = "all"
|
|
page_title = "All Agents"
|
|
page_description = f"{len(agents)} agents in the system"
|
|
|
|
return templates.TemplateResponse("agent_management.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
agents=agents,
|
|
agent_count=len(agents),
|
|
current_view=current_view,
|
|
page_title=page_title,
|
|
page_description=page_description,
|
|
success=success,
|
|
error=error
|
|
))
|
|
|
|
@app.get("/admin", response_class=HTMLResponse)
|
|
async def admin_dashboard(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
if not current_user.get("is_admin") and current_user.get("role") != "readonly_admin":
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
|
|
# Get statistics
|
|
all_users = await crud.get_all_users()
|
|
all_agents = await crud.get_all_agents()
|
|
|
|
# Calculate stats
|
|
total_users = len(all_users)
|
|
admin_users = len([u for u in all_users if u.get("is_admin")])
|
|
regular_users = total_users - admin_users
|
|
total_agents = len(all_agents)
|
|
active_agents = len([a for a in all_agents if a.get("agent_status") == "Active"])
|
|
|
|
return templates.TemplateResponse("admin/dashboard.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
stats={
|
|
"total_users": total_users,
|
|
"admin_users": admin_users,
|
|
"regular_users": regular_users,
|
|
"total_agents": total_agents,
|
|
"active_agents": active_agents,
|
|
"inactive_agents": total_agents - active_agents
|
|
},
|
|
users=all_users,
|
|
agents=all_agents
|
|
))
|
|
|
|
# New enhanced endpoints
|
|
@app.get("/search", response_class=HTMLResponse)
|
|
async def search_page(request: Request, q: Optional[str] = Query(None)):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
|
|
search_results = {"agents": [], "users": []}
|
|
if q:
|
|
# Search agents using proper search function
|
|
# Regular users can search all agents (consistent with agent management page)
|
|
search_results["agents"] = await crud.search_agents(q)
|
|
|
|
# Search users (admin only)
|
|
if current_user.get("is_admin"):
|
|
all_users = await crud.get_all_users()
|
|
search_results["users"] = [
|
|
user for user in all_users
|
|
if q.lower() in user.get("email", "").lower() or
|
|
q.lower() in user.get("full_name", "").lower()
|
|
]
|
|
|
|
return templates.TemplateResponse("search.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
query=q,
|
|
results=search_results
|
|
))
|
|
|
|
@app.post("/agent/{agent_id}/edit", response_class=HTMLResponse)
|
|
async def edit_agent_form(request: Request, agent_id: str):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# Check if user can edit this agent (ownership, contact person, or admin)
|
|
if not can_user_edit_agent(agent, current_user):
|
|
raise HTTPException(status_code=403, detail="Not authorized to edit this agent")
|
|
|
|
return templates.TemplateResponse("edit_agent.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
agent=agent
|
|
))
|
|
|
|
@app.post("/agent/{agent_id}/delete", response_class=HTMLResponse)
|
|
async def delete_agent_form(request: Request, agent_id: str):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=get_app_url("login"), status_code=303)
|
|
|
|
# Check permission and delete
|
|
user_id = str(current_user["_id"]) if not current_user.get("is_admin") else None
|
|
deleted = await crud.delete_agent(agent_id, user_id)
|
|
|
|
if deleted:
|
|
return RedirectResponse(url=get_app_url("agent-management?success=Agent deleted successfully"), status_code=303)
|
|
else:
|
|
return RedirectResponse(url=get_app_url("agent-management?error=Failed to delete agent"), status_code=303)
|
|
|
|
# Agent API endpoints
|
|
@app.get("/api/agents/all", response_model=List[models.AiAgentResponse])
|
|
async def get_all_agents_for_users(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Get all agents for regular users (read-only access)"""
|
|
agents = await crud.get_all_agents()
|
|
return [
|
|
create_agent_response(agent) for agent in agents
|
|
]
|
|
|
|
@app.post("/api/agents", response_model=models.AiAgentResponse)
|
|
async def create_agent(agent: models.AiAgentCreate, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agent_data = agent.model_dump()
|
|
created_agent = await crud.create_agent(agent_data, str(current_user["_id"]))
|
|
return create_agent_response(created_agent)
|
|
|
|
@app.get("/api/agents", response_model=List[models.AiAgentResponse])
|
|
async def get_user_agents(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agents = await crud.get_agents_by_user(str(current_user["_id"]), user_email=current_user.get("email"))
|
|
return [
|
|
create_agent_response(agent) for agent in agents
|
|
]
|
|
|
|
# MSAL routes no longer needed - using popup authentication
|
|
|
|
@app.get("/api/agents/{agent_id}", response_model=models.AiAgentResponse)
|
|
async def get_agent(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# All authenticated users can view agent details
|
|
# Edit/delete permissions are enforced at those respective endpoints
|
|
return create_agent_response(agent)
|
|
|
|
@app.put("/api/agents/{agent_id}", response_model=models.AiAgentResponse)
|
|
async def update_agent(agent_id: str, agent: models.AiAgentCreate, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
# First check if agent exists
|
|
existing_agent = await crud.get_agent_by_id(agent_id)
|
|
if not existing_agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# Check if user can edit this agent (ownership, contact person, or admin)
|
|
if not can_user_edit_agent(existing_agent, current_user):
|
|
raise HTTPException(status_code=403, detail="Not authorized to edit this agent")
|
|
|
|
# Since we've already verified authorization, always pass None to allow the update
|
|
user_id_filter = None
|
|
|
|
# Prepare admin info for Quality Audit updates
|
|
admin_user_info = None
|
|
agent_data = agent.model_dump()
|
|
|
|
# Validate Risk Factor when Quality Audit is checked (admin only)
|
|
if current_user.get("is_admin") and agent_data.get("quality_audit_status"):
|
|
if agent_data.get("risk_factor") is None or not (1 <= agent_data.get("risk_factor", 0) <= 5):
|
|
raise HTTPException(
|
|
status_code=422,
|
|
detail="Risk Factor (1-5) is required when Quality Audit is checked."
|
|
)
|
|
|
|
if current_user.get("is_admin"):
|
|
admin_user_info = {
|
|
"user_id": str(current_user["_id"]),
|
|
"user_name": current_user.get("full_name", current_user.get("email")),
|
|
"email": current_user.get("email")
|
|
}
|
|
|
|
updated_agent = await crud.update_agent(agent_id, agent_data, user_id_filter, admin_user_info, current_user.get("email"))
|
|
|
|
if not updated_agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found or not authorized")
|
|
|
|
return create_agent_response(updated_agent)
|
|
|
|
@app.put("/api/agents/{agent_id}/rating")
|
|
async def update_agent_rating(agent_id: str, request: Request, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Submit a per-user star rating for an agent (any authenticated user can rate)"""
|
|
existing_agent = await crud.get_agent_by_id(agent_id)
|
|
if not existing_agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
body = await request.json()
|
|
rating_value = body.get("rating")
|
|
|
|
if rating_value is None or not (1 <= float(rating_value) <= 5):
|
|
raise HTTPException(status_code=422, detail="Rating must be between 1 and 5")
|
|
|
|
user_id = str(current_user["_id"])
|
|
await crud.upsert_agent_rating(agent_id, user_id, float(rating_value))
|
|
stats = await crud.update_agent_average_rating(agent_id)
|
|
|
|
return {
|
|
"message": "Rating updated",
|
|
"rating": stats["avg_rating"],
|
|
"rating_count": stats["count"],
|
|
"user_rating": float(rating_value)
|
|
}
|
|
|
|
@app.get("/api/agents/{agent_id}/my-rating")
|
|
async def get_my_agent_rating(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Get the current user's rating for an agent plus the average"""
|
|
existing_agent = await crud.get_agent_by_id(agent_id)
|
|
if not existing_agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
user_id = str(current_user["_id"])
|
|
user_rating = await crud.get_user_rating_for_agent(agent_id, user_id)
|
|
stats = await crud.get_agent_rating_stats(agent_id)
|
|
|
|
return {
|
|
"user_rating": user_rating,
|
|
"rating": stats["avg_rating"],
|
|
"rating_count": stats["count"]
|
|
}
|
|
|
|
@app.delete("/api/agents/{agent_id}")
|
|
async def delete_agent(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
print(f"🗑️ DELETE attempt - Agent ID: {agent_id}, User ID: {current_user['_id']}")
|
|
|
|
# First check if agent exists
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
print(f"🗑️ Agent {agent_id} not found in database")
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
print(f"🗑️ Agent found - Created by: {agent.get('created_by')}, User is admin: {current_user.get('is_admin', False)}")
|
|
|
|
# Check permission
|
|
user_id = str(current_user["_id"])
|
|
if agent["created_by"] != user_id and not current_user.get("is_admin"):
|
|
print(f"🗑️ Permission denied - Agent owned by {agent['created_by']}, current user: {user_id}")
|
|
raise HTTPException(status_code=403, detail="Not authorized to delete this agent")
|
|
|
|
deleted = await crud.delete_agent(agent_id, user_id if not current_user.get("is_admin") else None)
|
|
if not deleted:
|
|
print(f"🗑️ Delete operation failed")
|
|
raise HTTPException(status_code=500, detail="Failed to delete agent")
|
|
|
|
print(f"🗑️ Agent {agent_id} deleted successfully")
|
|
return {"message": "Agent deleted successfully"}
|
|
|
|
# Admin view toggle endpoint
|
|
@app.post("/api/admin/toggle-view")
|
|
async def toggle_admin_view(request: Request, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Toggle between admin and regular user view (admin only)"""
|
|
# Check if user is actually an admin (using actual_is_admin if present, or is_admin)
|
|
is_actual_admin = current_user.get("actual_is_admin") or current_user.get("is_admin")
|
|
|
|
if not is_actual_admin:
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
# Toggle view mode
|
|
current_view = request.session.get("view_mode", "admin")
|
|
new_view = "user" if current_view == "admin" else "admin"
|
|
request.session["view_mode"] = new_view
|
|
|
|
return {
|
|
"success": True,
|
|
"view_mode": new_view,
|
|
"message": f"Switched to {'regular user' if new_view == 'user' else 'admin'} view"
|
|
}
|
|
|
|
# Admin endpoints
|
|
@app.get("/api/admin/users", response_model=List[models.UserResponse])
|
|
async def get_all_users(current_user: dict = Depends(require_admin_or_readonly)):
|
|
|
|
users = await crud.get_all_users()
|
|
return [
|
|
models.UserResponse(
|
|
email=user["email"],
|
|
full_name=user.get("full_name"),
|
|
is_active=user["is_active"],
|
|
is_admin=user["is_admin"],
|
|
role=user.get("role", "admin" if user.get("is_admin") else "user"),
|
|
auth_provider=user.get("auth_provider", "local")
|
|
) for user in users
|
|
]
|
|
|
|
@app.post("/api/admin/users", response_model=models.UserResponse)
|
|
async def admin_create_user(
|
|
user_data: models.AdminUserCreate,
|
|
current_user: dict = Depends(require_admin)
|
|
):
|
|
"""Create a new local user (admin only)"""
|
|
try:
|
|
created_user = await crud.admin_create_local_user(
|
|
email=user_data.email,
|
|
password=user_data.password,
|
|
full_name=user_data.full_name,
|
|
is_admin=user_data.is_admin
|
|
)
|
|
return models.UserResponse(
|
|
email=created_user["email"],
|
|
full_name=created_user.get("full_name"),
|
|
is_active=created_user["is_active"],
|
|
is_admin=created_user["is_admin"],
|
|
role=created_user.get("role", "admin" if created_user["is_admin"] else "user"),
|
|
auth_provider=created_user.get("auth_provider", "local")
|
|
)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
@app.put("/api/admin/users/{email}", response_model=models.UserResponse)
|
|
async def update_user(email: str, user_update: models.UserUpdate, current_user: dict = Depends(require_admin)):
|
|
# Get the user by email first
|
|
existing_user = await crud.get_user_by_email(email)
|
|
if not existing_user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
# Update the user
|
|
update_data = user_update.model_dump(exclude_unset=True)
|
|
|
|
# Sync role and is_admin fields
|
|
if "role" in update_data:
|
|
role = update_data["role"]
|
|
if role == "admin":
|
|
update_data["is_admin"] = True
|
|
elif role == "readonly_admin":
|
|
update_data["is_admin"] = False
|
|
elif role == "user":
|
|
update_data["is_admin"] = False
|
|
elif "is_admin" in update_data:
|
|
if update_data["is_admin"]:
|
|
update_data["role"] = "admin"
|
|
else:
|
|
# Only reset to user if not already readonly_admin
|
|
if existing_user.get("role") != "readonly_admin":
|
|
update_data["role"] = "user"
|
|
|
|
updated_user = await crud.update_user(str(existing_user["_id"]), update_data)
|
|
|
|
if not updated_user:
|
|
raise HTTPException(status_code=500, detail="Failed to update user")
|
|
|
|
return models.UserResponse(
|
|
email=updated_user["email"],
|
|
full_name=updated_user.get("full_name"),
|
|
is_active=updated_user["is_active"],
|
|
is_admin=updated_user["is_admin"],
|
|
role=updated_user.get("role", "admin" if updated_user["is_admin"] else "user"),
|
|
auth_provider=updated_user.get("auth_provider", "local")
|
|
)
|
|
|
|
@app.post("/api/admin/users/{email}/reset-password")
|
|
async def admin_reset_password(
|
|
email: str,
|
|
password_data: models.AdminPasswordReset,
|
|
current_user: dict = Depends(require_admin)
|
|
):
|
|
"""Reset password for a local user (admin only)"""
|
|
user = await crud.get_user_by_email(email)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
if user.get("auth_provider") != "local":
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot reset password for SSO users. This user authenticates via Microsoft."
|
|
)
|
|
|
|
try:
|
|
success = await crud.admin_reset_user_password(str(user["_id"]), password_data.new_password)
|
|
if success:
|
|
return {"message": "Password reset successfully"}
|
|
raise HTTPException(status_code=500, detail="Failed to reset password")
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
@app.post("/api/users/change-password")
|
|
async def change_password(
|
|
password_data: models.PasswordChange,
|
|
current_user: dict = Depends(get_current_user_from_cookie)
|
|
):
|
|
"""Change password for current logged-in user"""
|
|
if current_user.get("auth_provider") != "local":
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot change password for SSO users. Your account uses Microsoft authentication."
|
|
)
|
|
|
|
try:
|
|
success = await crud.change_user_password(
|
|
str(current_user["_id"]),
|
|
password_data.current_password,
|
|
password_data.new_password
|
|
)
|
|
if success:
|
|
return {"message": "Password changed successfully"}
|
|
raise HTTPException(status_code=500, detail="Failed to change password")
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
@app.get("/api/admin/agents/pending-verification")
|
|
async def get_pending_verification(current_user: dict = Depends(require_admin_or_readonly)):
|
|
"""Get agents pending verification"""
|
|
from database import agents_collection as ac
|
|
cursor = ac.find(
|
|
{"verification_status": {"$in": ["needs_verification", "verified"]}}
|
|
).sort("created_at", -1)
|
|
agents = await cursor.to_list(length=None)
|
|
result = []
|
|
for agent in agents:
|
|
# Resolve creator email
|
|
created_by = agent.get("created_by", "")
|
|
created_by_email = created_by
|
|
if created_by and created_by != "agent_collector_api":
|
|
try:
|
|
from bson import ObjectId
|
|
creator = await crud.get_user_by_id(created_by)
|
|
if creator:
|
|
created_by_email = creator.get("email", created_by)
|
|
except Exception:
|
|
pass
|
|
result.append({
|
|
"agent_id": str(agent["_id"]),
|
|
"agent_name": agent.get("agent_name"),
|
|
"client_name": agent.get("client_name"),
|
|
"studio_name": agent.get("studio_name"),
|
|
"created_by": created_by_email,
|
|
"created_at": agent["created_at"].isoformat() if agent.get("created_at") else None,
|
|
"verification_status": agent.get("verification_status"),
|
|
"verified_by": agent.get("verified_by"),
|
|
"verified_date": agent.get("verified_date"),
|
|
})
|
|
return result
|
|
|
|
@app.put("/api/admin/agents/{agent_id}/verify")
|
|
async def verify_agent(agent_id: str, current_user: dict = Depends(require_admin)):
|
|
"""Approve/verify an agent (admin only)"""
|
|
from bson import ObjectId
|
|
from database import agents_collection as ac
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
result = await ac.update_one(
|
|
{"_id": ObjectId(agent_id)},
|
|
{"$set": {
|
|
"verification_status": "verified",
|
|
"verified_by": current_user.get("email", str(current_user["_id"])),
|
|
"verified_date": datetime.utcnow().isoformat(),
|
|
"updated_at": datetime.utcnow(),
|
|
}}
|
|
)
|
|
if result.modified_count:
|
|
return {"message": "Agent verified successfully"}
|
|
raise HTTPException(status_code=500, detail="Failed to verify agent")
|
|
|
|
@app.post("/api/admin/digest/send")
|
|
async def trigger_weekly_digest(current_user: dict = Depends(require_admin)):
|
|
"""Manually trigger the weekly agent digest email"""
|
|
try:
|
|
await notifications.send_weekly_agent_digest()
|
|
return {"message": "Weekly digest sent successfully"}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to send digest: {str(e)}")
|
|
|
|
|
|
@app.get("/api/admin/agents/unresolved-owner")
|
|
async def list_unresolved_owner_agents(current_user: dict = Depends(require_admin)):
|
|
"""List collector-created agents whose contact_person doesn't match any active user.
|
|
|
|
These agents won't be picked up by the completion-reminder flow until an admin
|
|
reassigns ownership via PUT /api/admin/agents/{id}/reassign-owner.
|
|
"""
|
|
agents = await crud.get_unresolved_owner_agents()
|
|
return [
|
|
{
|
|
"agent_id": str(a["_id"]),
|
|
"agent_name": a.get("agent_name"),
|
|
"agent_contact_person": a.get("agent_contact_person"),
|
|
"registration_complete": a.get("registration_complete", False),
|
|
"created_at": a["created_at"].isoformat() if a.get("created_at") else None,
|
|
}
|
|
for a in agents
|
|
]
|
|
|
|
|
|
class ReassignOwnerRequest(BaseModel):
|
|
new_contact_email: str
|
|
new_owner_user_id: Optional[str] = None # if omitted, looked up by email
|
|
|
|
|
|
@app.put("/api/admin/agents/{agent_id}/reassign-owner")
|
|
async def reassign_owner(
|
|
agent_id: str,
|
|
body: ReassignOwnerRequest,
|
|
current_user: dict = Depends(require_admin),
|
|
):
|
|
"""Admin: reassign a collector-created agent to a real user (by email).
|
|
|
|
Resolves the user by email if `new_owner_user_id` not supplied. After this,
|
|
the user will start receiving completion-reminder emails on the next daily run.
|
|
"""
|
|
target_email = body.new_contact_email.strip().lower()
|
|
if "@" not in target_email:
|
|
raise HTTPException(status_code=400, detail="new_contact_email must be a valid email")
|
|
|
|
new_owner_id = body.new_owner_user_id
|
|
if not new_owner_id:
|
|
# Look up by email (case-insensitive)
|
|
user = await users_collection.find_one(
|
|
{"email": {"$regex": f"^{re.escape(target_email)}$", "$options": "i"}}
|
|
)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail=f"No user found with email {target_email}")
|
|
new_owner_id = str(user["_id"])
|
|
|
|
updated = await crud.reassign_agent_owner(agent_id, new_owner_id, body.new_contact_email)
|
|
if not updated:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
return {
|
|
"status": "ok",
|
|
"agent_id": agent_id,
|
|
"new_owner_user_id": new_owner_id,
|
|
"new_contact_email": body.new_contact_email,
|
|
}
|
|
|
|
|
|
@app.post("/api/admin/completion-reminders/send")
|
|
async def trigger_completion_reminders(
|
|
force: bool = Query(False, description="Bypass per-user cooldown and nudge cap"),
|
|
current_user: dict = Depends(require_admin),
|
|
):
|
|
"""Manually trigger the completion-reminder digest email run.
|
|
|
|
Use `?force=true` to ignore the cooldown / max-nudges cap (e.g., for testing or
|
|
after an outage where users haven't been emailed for a while).
|
|
"""
|
|
try:
|
|
summary = await notifications.send_completion_reminders(force=force)
|
|
return summary
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to send completion reminders: {str(e)}")
|
|
|
|
# Prompt Audit Endpoints
|
|
async def _audit_batch_runner(unclassified_only: bool, agent_id: str):
|
|
"""Background wrapper around run_audit_batch that flips _batch_state.running off when done."""
|
|
try:
|
|
await audit_analyzer.run_audit_batch(
|
|
unclassified_only=unclassified_only,
|
|
single_agent_id=agent_id
|
|
)
|
|
except Exception as e:
|
|
audit_analyzer._batch_state["error"] = str(e)
|
|
audit_analyzer.logger.exception("Audit batch crashed")
|
|
finally:
|
|
audit_analyzer._batch_state["running"] = False
|
|
audit_analyzer._batch_state["completed_at"] = datetime.utcnow().isoformat()
|
|
audit_analyzer._batch_state["current_agent"] = None
|
|
|
|
|
|
@app.post("/api/admin/audit/run")
|
|
async def run_audit(
|
|
request: Request,
|
|
current_user: dict = Depends(require_admin)
|
|
):
|
|
"""Kick off a Gemini audit batch in the background. Optional body: {agent_id, unclassified_only}.
|
|
|
|
Returns immediately with {status: started}. Poll /api/admin/audit/status for progress.
|
|
"""
|
|
if not audit_analyzer.is_gemini_configured():
|
|
raise HTTPException(status_code=503, detail="GOOGLE_API_KEY not configured")
|
|
|
|
if audit_analyzer._batch_state.get("running"):
|
|
raise HTTPException(status_code=409, detail="An audit batch is already running")
|
|
|
|
body = {}
|
|
try:
|
|
body = await request.json()
|
|
except Exception:
|
|
pass
|
|
|
|
agent_id = body.get("agent_id")
|
|
unclassified_only = body.get("unclassified_only", False)
|
|
|
|
if agent_id:
|
|
mode = "single"
|
|
elif unclassified_only:
|
|
mode = "unclassified"
|
|
else:
|
|
mode = "full"
|
|
|
|
audit_analyzer._reset_batch_state(mode=mode, started_by=current_user.get("email"))
|
|
|
|
asyncio.create_task(_audit_batch_runner(unclassified_only=unclassified_only, agent_id=agent_id))
|
|
|
|
return {"status": "started", "mode": mode}
|
|
|
|
|
|
@app.get("/api/admin/audit/status")
|
|
async def audit_status(current_user: dict = Depends(require_admin_or_readonly)):
|
|
"""Current state of the audit batch (poll while running)."""
|
|
return audit_analyzer.get_batch_state()
|
|
|
|
@app.get("/api/admin/audit/results")
|
|
async def get_audit_results(current_user: dict = Depends(require_admin_or_readonly)):
|
|
"""Get all agents with audit data for the Prompt Audit tab"""
|
|
agents = await audit_analyzer.get_all_audit_results()
|
|
return {
|
|
"agents": agents,
|
|
"config_status": {
|
|
"gemini_configured": audit_analyzer.is_gemini_configured()
|
|
}
|
|
}
|
|
|
|
@app.put("/api/admin/audit/{agent_id}/review")
|
|
async def review_audit(
|
|
agent_id: str,
|
|
review: models.AuditReviewRequest,
|
|
current_user: dict = Depends(require_admin)
|
|
):
|
|
"""Mark an agent's audit as reviewed/cleared"""
|
|
await audit_analyzer.update_audit_review(
|
|
agent_id=agent_id,
|
|
status=review.audit_status,
|
|
notes=review.reviewer_notes or "",
|
|
reviewer_email=current_user["email"]
|
|
)
|
|
return {"message": f"Audit status updated to {review.audit_status}"}
|
|
|
|
@app.get("/api/admin/analytics")
|
|
async def get_admin_analytics(
|
|
status: Optional[str] = None,
|
|
discipline: Optional[str] = None,
|
|
days: int = 90,
|
|
current_user: dict = Depends(require_admin_or_readonly),
|
|
):
|
|
"""Get analytics data for admin dashboard"""
|
|
(
|
|
summary,
|
|
status_breakdown,
|
|
discipline_breakdown,
|
|
usage_timeline,
|
|
top_by_messages,
|
|
top_by_tokens,
|
|
recently_active,
|
|
all_users,
|
|
) = await asyncio.gather(
|
|
crud.get_analytics_summary(status_filter=status, discipline_filter=discipline),
|
|
crud.get_agent_stats(discipline_filter=discipline),
|
|
crud.get_discipline_breakdown(status_filter=status),
|
|
crud.get_aggregated_usage_timeline(days, status_filter=status, discipline_filter=discipline),
|
|
crud.get_top_agents("total_messages", 5, status_filter=status, discipline_filter=discipline),
|
|
crud.get_top_agents("total_tokens", 5, status_filter=status, discipline_filter=discipline),
|
|
crud.get_recently_active_agents(10, status_filter=status, discipline_filter=discipline),
|
|
crud.get_all_users(),
|
|
)
|
|
|
|
# Rating distribution: count agents in each star bucket (1-5)
|
|
all_agents = await crud.get_all_agents(status_filter=status)
|
|
filtered_agents = all_agents
|
|
if discipline:
|
|
filtered_agents = [a for a in all_agents if a.get("discipline") == discipline]
|
|
rating_dist = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
|
|
for agent in filtered_agents:
|
|
r = agent.get("rating")
|
|
if r is not None:
|
|
bucket = max(1, min(5, round(r)))
|
|
rating_dist[bucket] += 1
|
|
|
|
total_users = len(all_users)
|
|
admin_users = sum(1 for u in all_users if u.get("is_admin"))
|
|
active_users = sum(1 for u in all_users if u.get("is_active"))
|
|
|
|
return {
|
|
"summary": summary,
|
|
"status_breakdown": status_breakdown,
|
|
"discipline_breakdown": discipline_breakdown,
|
|
"usage_timeline": usage_timeline,
|
|
"top_by_messages": top_by_messages,
|
|
"top_by_tokens": top_by_tokens,
|
|
"recently_active": recently_active,
|
|
"rating_distribution": rating_dist,
|
|
"total_users": total_users,
|
|
"admin_users": admin_users,
|
|
"active_users": active_users,
|
|
"total_agents": len(filtered_agents),
|
|
}
|
|
|
|
@app.get("/api/admin/agents", response_model=List[models.AiAgentResponse])
|
|
async def get_all_agents_admin(current_user: dict = Depends(require_admin_or_readonly)):
|
|
|
|
agents = await crud.get_all_agents()
|
|
return [
|
|
create_agent_response(agent) for agent in agents
|
|
]
|
|
|
|
EXPORT_FIELDNAMES = [
|
|
"agent_id",
|
|
"agent_name",
|
|
"agent_tool",
|
|
"agent_description",
|
|
"agent_purpose",
|
|
"agent_version",
|
|
"agent_status",
|
|
"agent_location",
|
|
"agent_department",
|
|
"agent_contact_person",
|
|
"agent_created_at",
|
|
"agent_updated_at",
|
|
"agent_tags",
|
|
"agent_metadata",
|
|
"agent_userbase",
|
|
"agent_capabilities",
|
|
"url",
|
|
"quality_audit_status",
|
|
"quality_audit_updated_by",
|
|
"quality_audit_updated_at",
|
|
"quality_audit_updated_by_name",
|
|
"risk_factor",
|
|
"last_edited_by",
|
|
"created_by",
|
|
"total_tokens",
|
|
"prompt_tokens",
|
|
"completion_tokens",
|
|
"discipline",
|
|
"rating",
|
|
"rating_count",
|
|
"instructions",
|
|
"business_entity",
|
|
"client",
|
|
"client_scope",
|
|
"client_name",
|
|
"studio_name",
|
|
"agent_classification",
|
|
"autonomy_level",
|
|
"ip_ownership",
|
|
"foundation_model",
|
|
"validated_by",
|
|
"validation_date",
|
|
"evals_method",
|
|
"registration_complete",
|
|
"safety_off_switch_confirmed",
|
|
"safety_access_rights_confirmed",
|
|
"pii_handles_pii",
|
|
"pii_legal_ref",
|
|
"pii_data_types",
|
|
"pii_consent_recorded",
|
|
"decl_governance",
|
|
"decl_accuracy",
|
|
"decl_upkeep",
|
|
]
|
|
|
|
|
|
def _export_row(agent: dict) -> dict:
|
|
return {
|
|
"agent_id": str(agent["_id"]),
|
|
"agent_name": agent.get("agent_name", ""),
|
|
"agent_tool": agent.get("agent_tool", ""),
|
|
"agent_description": agent.get("agent_description", ""),
|
|
"agent_purpose": agent.get("agent_purpose", ""),
|
|
"agent_version": agent.get("agent_version", ""),
|
|
"agent_status": agent.get("agent_status", ""),
|
|
"agent_location": agent.get("agent_location", ""),
|
|
"agent_department": agent.get("agent_department", ""),
|
|
"agent_contact_person": agent.get("agent_contact_person", ""),
|
|
"agent_created_at": agent["created_at"].isoformat() if agent.get("created_at") else "",
|
|
"agent_updated_at": agent["updated_at"].isoformat() if agent.get("updated_at") else "",
|
|
"agent_tags": "|".join(agent.get("agent_tags", [])) if agent.get("agent_tags") else "",
|
|
"agent_userbase": "|".join(agent.get("agent_userbase", [])) if agent.get("agent_userbase") else "",
|
|
"agent_capabilities": "|".join(agent.get("agent_capabilities", [])) if agent.get("agent_capabilities") else "",
|
|
"agent_metadata": json.dumps(sanitize_metadata(agent.get("agent_metadata"))) if agent.get("agent_metadata") else "",
|
|
"url": agent.get("url", ""),
|
|
"quality_audit_status": str(agent.get("quality_audit_status", False)),
|
|
"quality_audit_updated_by": agent.get("quality_audit_updated_by", ""),
|
|
"quality_audit_updated_at": agent.get("quality_audit_updated_at", ""),
|
|
"quality_audit_updated_by_name": agent.get("quality_audit_updated_by_name", ""),
|
|
"risk_factor": str(agent.get("risk_factor", "")) if agent.get("risk_factor") is not None else "",
|
|
"last_edited_by": agent.get("last_edited_by", ""),
|
|
"created_by": agent.get("created_by", ""),
|
|
"total_tokens": str(agent.get("total_tokens", "")) if agent.get("total_tokens") is not None else "",
|
|
"prompt_tokens": str(agent.get("prompt_tokens", "")) if agent.get("prompt_tokens") is not None else "",
|
|
"completion_tokens": str(agent.get("completion_tokens", "")) if agent.get("completion_tokens") is not None else "",
|
|
"discipline": agent.get("discipline", ""),
|
|
"rating": str(agent.get("rating", "")) if agent.get("rating") is not None else "",
|
|
"rating_count": str(agent.get("rating_count", "")) if agent.get("rating_count") is not None else "",
|
|
"instructions": agent.get("instructions", ""),
|
|
"business_entity": agent.get("business_entity", ""),
|
|
"client": agent.get("client", ""),
|
|
"client_scope": agent.get("client_scope", ""),
|
|
"client_name": agent.get("client_name", ""),
|
|
"studio_name": agent.get("studio_name", ""),
|
|
"agent_classification": agent.get("agent_classification", ""),
|
|
"autonomy_level": agent.get("autonomy_level", ""),
|
|
"ip_ownership": agent.get("ip_ownership", ""),
|
|
"foundation_model": agent.get("foundation_model", ""),
|
|
"validated_by": agent.get("validated_by", ""),
|
|
"validation_date": agent.get("validation_date", ""),
|
|
"evals_method": agent.get("evals_method", ""),
|
|
"registration_complete": str(agent.get("registration_complete", "")) if agent.get("registration_complete") is not None else "",
|
|
"safety_off_switch_confirmed": str((agent.get("safety") or {}).get("off_switch_confirmed", "")) if agent.get("safety") else "",
|
|
"safety_access_rights_confirmed": str((agent.get("safety") or {}).get("access_rights_confirmed", "")) if agent.get("safety") else "",
|
|
"pii_handles_pii": str((agent.get("pii") or {}).get("handles_pii", "")) if agent.get("pii") else "",
|
|
"pii_legal_ref": (agent.get("pii") or {}).get("legal_ref", "") if agent.get("pii") else "",
|
|
"pii_data_types": (agent.get("pii") or {}).get("data_types", "") if agent.get("pii") else "",
|
|
"pii_consent_recorded": str((agent.get("pii") or {}).get("consent_recorded", "")) if agent.get("pii") else "",
|
|
"decl_governance": str((agent.get("declarations") or {}).get("governance", "")) if agent.get("declarations") else "",
|
|
"decl_accuracy": str((agent.get("declarations") or {}).get("accuracy", "")) if agent.get("declarations") else "",
|
|
"decl_upkeep": str((agent.get("declarations") or {}).get("upkeep", "")) if agent.get("declarations") else "",
|
|
}
|
|
|
|
|
|
def _matches_export_filters(
|
|
agent: dict,
|
|
status: str,
|
|
discipline: str,
|
|
audit: str,
|
|
business_entity: str,
|
|
agent_classification: str,
|
|
autonomy_level: str,
|
|
risks_only: bool,
|
|
search: str,
|
|
) -> bool:
|
|
if status and agent.get("agent_status") != status:
|
|
return False
|
|
if discipline and agent.get("discipline") != discipline:
|
|
return False
|
|
if audit == "audited" and not agent.get("quality_audit_status"):
|
|
return False
|
|
if audit == "not_audited" and agent.get("quality_audit_status"):
|
|
return False
|
|
if business_entity and agent.get("business_entity") != business_entity:
|
|
return False
|
|
if agent_classification and agent.get("agent_classification") != agent_classification:
|
|
return False
|
|
if autonomy_level and agent.get("autonomy_level") != autonomy_level:
|
|
return False
|
|
if risks_only:
|
|
pii_yes = bool((agent.get("pii") or {}).get("handles_pii"))
|
|
ip_shared = agent.get("ip_ownership") == "Shared/TBD"
|
|
autopilot = agent.get("autonomy_level") == "Autopilot"
|
|
if not (pii_yes or ip_shared or autopilot):
|
|
return False
|
|
if search:
|
|
needle = search.lower()
|
|
haystack = " ".join(
|
|
str(agent.get(f) or "") for f in (
|
|
"agent_name", "agent_department", "agent_purpose",
|
|
"agent_description", "agent_contact_person", "discipline",
|
|
)
|
|
).lower()
|
|
if needle not in haystack:
|
|
return False
|
|
return True
|
|
|
|
|
|
@app.get("/api/admin/agents/export/xlsx")
|
|
async def export_agents_xlsx(
|
|
current_user: dict = Depends(require_admin),
|
|
status: str = Query("", description="Filter by agent_status"),
|
|
discipline: str = Query("", description="Filter by discipline (Strategy | Creative | ...)"),
|
|
audit: str = Query("", description="audited | not_audited"),
|
|
business_entity: str = Query("", description="Filter by business_entity"),
|
|
agent_classification: str = Query("", description="Utility | Functional | Supervisory | Guardian"),
|
|
autonomy_level: str = Query("", description="Human-Led | Hybrid | Autopilot"),
|
|
risks_only: bool = Query(False, description="Only PII=Yes, IP=Shared/TBD, or Autopilot"),
|
|
search: str = Query("", description="Case-insensitive substring across name/department/purpose/description/contact/discipline"),
|
|
):
|
|
"""Export agent data as a real .xlsx file. Multi-line instructions stay in one cell.
|
|
Optional filters mirror the admin dashboard Agent Management tab; with no params,
|
|
every agent is exported."""
|
|
|
|
agents = await crud.get_all_agents()
|
|
agents = [
|
|
a for a in agents if _matches_export_filters(
|
|
a, status, discipline, audit, business_entity, agent_classification,
|
|
autonomy_level, risks_only, search,
|
|
)
|
|
]
|
|
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws.title = "agents"
|
|
ws.append(EXPORT_FIELDNAMES)
|
|
|
|
header_font = Font(bold=True)
|
|
for cell in ws[1]:
|
|
cell.font = header_font
|
|
|
|
wrap = Alignment(wrap_text=True, vertical="top")
|
|
for agent in agents:
|
|
row = _export_row(agent)
|
|
ws.append([row[name] for name in EXPORT_FIELDNAMES])
|
|
for cell in ws[ws.max_row]:
|
|
cell.alignment = wrap
|
|
|
|
ws.freeze_panes = "A2"
|
|
|
|
# Width heuristic: clamp to the longest value in the column, capped at 60 chars
|
|
# so the instructions column stays readable instead of stretching to thousands.
|
|
for col_idx, name in enumerate(EXPORT_FIELDNAMES, start=1):
|
|
max_len = len(name)
|
|
for row in ws.iter_rows(min_row=2, min_col=col_idx, max_col=col_idx, values_only=True):
|
|
v = row[0]
|
|
if v is None:
|
|
continue
|
|
# Only consider the first line for width — multi-line cells wrap.
|
|
first_line = str(v).split("\n", 1)[0]
|
|
if len(first_line) > max_len:
|
|
max_len = len(first_line)
|
|
ws.column_dimensions[ws.cell(row=1, column=col_idx).column_letter].width = min(max_len + 2, 60)
|
|
|
|
buf = io.BytesIO()
|
|
wb.save(buf)
|
|
buf.seek(0)
|
|
|
|
return StreamingResponse(
|
|
buf,
|
|
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
headers={
|
|
"Content-Disposition": f"attachment; filename=agents_export_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.xlsx"
|
|
},
|
|
)
|
|
|
|
@app.post("/api/admin/agents/import/csv")
|
|
async def import_agents_csv(
|
|
request: Request,
|
|
file: UploadFile = File(...),
|
|
current_user: dict = Depends(require_admin)
|
|
):
|
|
"""Import agents from CSV file"""
|
|
if not file.filename.endswith('.csv'):
|
|
raise HTTPException(status_code=400, detail="File must be a CSV")
|
|
|
|
try:
|
|
contents = await file.read()
|
|
decoded = contents.decode('utf-8')
|
|
csv_reader = csv.DictReader(io.StringIO(decoded))
|
|
|
|
success_count = 0
|
|
skipped_count = 0
|
|
error_count = 0
|
|
errors = []
|
|
|
|
user_id = str(current_user["_id"])
|
|
|
|
for row_num, row in enumerate(csv_reader, start=1):
|
|
try:
|
|
agent_name_from_csv = row.get("agent_name")
|
|
if not agent_name_from_csv:
|
|
skipped_count += 1
|
|
continue
|
|
|
|
def _csv_bool(val):
|
|
"""Parse CSV boolean cell; empty / unrecognised → None so we don't overwrite with False."""
|
|
if val is None or val == "":
|
|
return None
|
|
return val.strip().lower() in ("true", "1", "yes", "y")
|
|
|
|
# Parse fields into agent_data first
|
|
agent_data = {
|
|
"agent_name": agent_name_from_csv,
|
|
"agent_tool": row.get("agent_tool", ""),
|
|
"agent_description": row.get("agent_description"),
|
|
"agent_purpose": row.get("agent_purpose"),
|
|
"agent_version": row.get("agent_version"),
|
|
"agent_status": row.get("agent_status") or "Development",
|
|
"agent_location": row.get("agent_location"),
|
|
"agent_department": row.get("agent_department"),
|
|
"agent_contact_person": row.get("agent_contact_person"),
|
|
"url": row.get("url"),
|
|
"quality_audit_status": row.get("quality_audit_status", "False").lower() == "true",
|
|
"risk_factor": int(row.get("risk_factor")) if row.get("risk_factor") else None,
|
|
"discipline": row.get("discipline") or None,
|
|
"rating": float(row.get("rating")) if row.get("rating") else None,
|
|
"total_tokens": int(row.get("total_tokens")) if row.get("total_tokens") else None,
|
|
# Governance / registration fields
|
|
"business_entity": row.get("business_entity") or None,
|
|
"client": row.get("client") or None,
|
|
"client_scope": row.get("client_scope") or None,
|
|
"client_name": row.get("client_name") or None,
|
|
"studio_name": row.get("studio_name") or None,
|
|
"agent_classification": row.get("agent_classification") or None,
|
|
"autonomy_level": row.get("autonomy_level") or None,
|
|
"ip_ownership": row.get("ip_ownership") or None,
|
|
"foundation_model": row.get("foundation_model") or None,
|
|
"validated_by": row.get("validated_by") or None,
|
|
"validation_date": row.get("validation_date") or None,
|
|
"evals_method": row.get("evals_method") or None,
|
|
"registration_complete": _csv_bool(row.get("registration_complete")),
|
|
}
|
|
|
|
# Build nested safety / pii / declarations objects only when at least one
|
|
# cell is populated, so empty CSV rows don't clobber existing nested data.
|
|
safety_off = _csv_bool(row.get("safety_off_switch_confirmed"))
|
|
safety_access = _csv_bool(row.get("safety_access_rights_confirmed"))
|
|
if safety_off is not None or safety_access is not None:
|
|
agent_data["safety"] = {
|
|
"off_switch_confirmed": safety_off,
|
|
"access_rights_confirmed": safety_access,
|
|
}
|
|
|
|
pii_handles = _csv_bool(row.get("pii_handles_pii"))
|
|
pii_legal = row.get("pii_legal_ref") or None
|
|
pii_data = row.get("pii_data_types") or None
|
|
pii_consent = _csv_bool(row.get("pii_consent_recorded"))
|
|
if any(v is not None for v in (pii_handles, pii_legal, pii_data, pii_consent)):
|
|
agent_data["pii"] = {
|
|
"handles_pii": pii_handles,
|
|
"legal_ref": pii_legal,
|
|
"data_types": pii_data,
|
|
"consent_recorded": pii_consent,
|
|
}
|
|
|
|
decl_gov = _csv_bool(row.get("decl_governance"))
|
|
decl_acc = _csv_bool(row.get("decl_accuracy"))
|
|
decl_upk = _csv_bool(row.get("decl_upkeep"))
|
|
if any(v is not None for v in (decl_gov, decl_acc, decl_upk)):
|
|
agent_data["declarations"] = {
|
|
"governance": decl_gov,
|
|
"accuracy": decl_acc,
|
|
"upkeep": decl_upk,
|
|
}
|
|
|
|
# Handle lists (pipe separated)
|
|
if row.get("agent_tags"):
|
|
agent_data["agent_tags"] = [t.strip() for t in row.get("agent_tags").split("|") if t.strip()]
|
|
if row.get("agent_userbase"):
|
|
agent_data["agent_userbase"] = [u.strip() for u in row.get("agent_userbase").split("|") if u.strip()]
|
|
if row.get("agent_capabilities"):
|
|
agent_data["agent_capabilities"] = [c.strip() for c in row.get("agent_capabilities").split("|") if c.strip()]
|
|
|
|
# Handle metadata (JSON string)
|
|
if row.get("agent_metadata"):
|
|
try:
|
|
agent_data["agent_metadata"] = json.loads(row.get("agent_metadata"))
|
|
except:
|
|
pass # Ignore invalid metadata JSON
|
|
|
|
# Check for existing agent with same name
|
|
existing_agent = await crud.get_agent_by_name(agent_data["agent_name"])
|
|
if existing_agent:
|
|
# If duplicate, append location to name
|
|
location = agent_data.get("agent_location")
|
|
if location and location.strip():
|
|
agent_data["agent_name"] = f"{agent_data['agent_name']} - {location.strip()}"
|
|
else:
|
|
# Fallback if no location
|
|
agent_data["agent_name"] = f"{agent_data['agent_name']} - Imported"
|
|
|
|
# Create agent
|
|
await crud.create_agent(agent_data, user_id, skip_time_check=True)
|
|
success_count += 1
|
|
|
|
except Exception as e:
|
|
error_count += 1
|
|
errors.append(f"Row {row_num}: {str(e)}")
|
|
|
|
return JSONResponse({
|
|
"success": True,
|
|
"imported": success_count,
|
|
"skipped": skipped_count,
|
|
"errors": error_count,
|
|
"error_details": errors[:10] # Limit error details
|
|
})
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}")
|
|
|
|
@app.post("/api/admin/agents/delete/csv")
|
|
async def delete_agents_csv(
|
|
request: Request,
|
|
file: UploadFile = File(...),
|
|
current_user: dict = Depends(require_admin)
|
|
):
|
|
"""Delete agents based on CSV file"""
|
|
if not file.filename.endswith('.csv'):
|
|
raise HTTPException(status_code=400, detail="File must be a CSV")
|
|
|
|
try:
|
|
contents = await file.read()
|
|
decoded = contents.decode('utf-8')
|
|
csv_reader = csv.DictReader(io.StringIO(decoded))
|
|
|
|
deleted_count = 0
|
|
not_found_count = 0
|
|
error_count = 0
|
|
errors = []
|
|
|
|
for row_num, row in enumerate(csv_reader, start=1):
|
|
try:
|
|
agent_id = row.get("agent_id")
|
|
agent_name = row.get("agent_name")
|
|
|
|
if agent_id and agent_id.strip():
|
|
# Delete by ID
|
|
success = await crud.delete_agent(agent_id.strip())
|
|
if success:
|
|
deleted_count += 1
|
|
else:
|
|
not_found_count += 1
|
|
elif agent_name and agent_name.strip():
|
|
# Delete by Name (All matches)
|
|
# We need to find them first
|
|
# Using direct collection access here for efficiency/custom logic not in crud
|
|
from database import agents_collection
|
|
result = await agents_collection.delete_many({"agent_name": agent_name.strip()})
|
|
if result.deleted_count > 0:
|
|
deleted_count += result.deleted_count
|
|
else:
|
|
not_found_count += 1
|
|
else:
|
|
# Skip empty rows
|
|
continue
|
|
|
|
except Exception as e:
|
|
error_count += 1
|
|
errors.append(f"Row {row_num}: {str(e)}")
|
|
|
|
return JSONResponse({
|
|
"success": True,
|
|
"deleted": deleted_count,
|
|
"not_found": not_found_count,
|
|
"errors": error_count,
|
|
"error_details": errors[:10]
|
|
})
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}")
|
|
|
|
# Agent Collector API Endpoints (for compatibility with agent_collector app)
|
|
@app.post("/agents")
|
|
async def create_agent_collector(
|
|
agent: models.AgentCollectorCreate,
|
|
request: Request,
|
|
api_key_valid: bool = Depends(verify_agent_collector_api_key)
|
|
):
|
|
"""Agent collector API endpoint - handles both new registrations and usage tracking"""
|
|
try:
|
|
# Check content type
|
|
content_type = request.headers.get("content-type", "")
|
|
if not content_type.startswith("application/json"):
|
|
return JSONResponse(
|
|
status_code=415,
|
|
content={
|
|
"error": "Unsupported Media Type",
|
|
"message": "Request must be JSON"
|
|
}
|
|
)
|
|
|
|
# Check if agent already exists by name
|
|
existing_agent = await crud.get_agent_by_name(agent.name)
|
|
|
|
if existing_agent:
|
|
# Agent exists - log usage AND update agent document with new usage data
|
|
internal_data = map_agent_collector_to_internal(agent)
|
|
print(f"🔗 URL DEBUG: Agent '{agent.name}' - URL in internal_data: {internal_data.get('url')}")
|
|
|
|
# Create usage record for fallback purposes (existing system)
|
|
await crud.create_agent_usage_record(agent.name, internal_data)
|
|
|
|
# Update agent document with new usage data (replace strategy)
|
|
update_fields = {
|
|
"url": internal_data.get("url"),
|
|
"discipline": internal_data.get("discipline"),
|
|
"instructions": internal_data.get("instructions"),
|
|
"usage_timeline": internal_data.get("usage_timeline"),
|
|
"conversation_count": internal_data.get("conversation_count"),
|
|
"unique_users": internal_data.get("unique_users"),
|
|
"total_messages": internal_data.get("total_messages"),
|
|
"first_used": internal_data.get("first_used"),
|
|
"last_used": internal_data.get("last_used"),
|
|
"total_tokens": internal_data.get("total_tokens"),
|
|
"prompt_tokens": internal_data.get("prompt_tokens"),
|
|
"completion_tokens": internal_data.get("completion_tokens"),
|
|
"updated_at": datetime.utcnow()
|
|
}
|
|
|
|
# Only update non-None usage fields
|
|
update_fields = {k: v for k, v in update_fields.items() if k == "updated_at" or v is not None}
|
|
print(f"🔗 URL DEBUG: Agent '{agent.name}' - update_fields after filter: {update_fields}")
|
|
print(f"🔗 URL DEBUG: Agent '{agent.name}' - URL in update_fields: {update_fields.get('url')}")
|
|
|
|
if len(update_fields) > 1: # More than just updated_at
|
|
from database import agents_collection
|
|
result = await agents_collection.update_one(
|
|
{"agent_name": agent.name},
|
|
{"$set": update_fields}
|
|
)
|
|
print(f"🔗 URL DEBUG: Agent '{agent.name}' - Update executed: matched={result.matched_count}, modified={result.modified_count}")
|
|
|
|
# Check weekly token threshold and send notification if needed (non-blocking)
|
|
try:
|
|
await notifications.check_and_notify_threshold(agent.name)
|
|
except Exception as notify_err:
|
|
print(f"Notification check failed for '{agent.name}': {notify_err}")
|
|
|
|
# Auto-classify with Gemini in background (non-blocking)
|
|
if audit_analyzer.is_gemini_configured() and internal_data.get("instructions"):
|
|
asyncio.create_task(audit_analyzer.classify_single_agent(str(existing_agent["_id"])))
|
|
|
|
return models.AgentUsageTrackingResponse(
|
|
status="usage_logged",
|
|
message="Agent already exists, usage tracked",
|
|
agent_name=agent.name
|
|
)
|
|
else:
|
|
# Agent doesn't exist - create new registration
|
|
internal_data = map_agent_collector_to_internal(agent)
|
|
|
|
# Handle datetime fields if provided
|
|
if agent.creation_date:
|
|
internal_data["agent_created_at"] = agent.creation_date
|
|
if agent.last_updated:
|
|
internal_data["agent_updated_at"] = agent.last_updated
|
|
|
|
# Create agent using collector-specific function
|
|
created_agent = await crud.create_agent_from_collector(internal_data)
|
|
|
|
# Check weekly token threshold and send notification if needed (non-blocking)
|
|
try:
|
|
await notifications.check_and_notify_threshold(agent.name)
|
|
except Exception as notify_err:
|
|
print(f"Notification check failed for '{agent.name}': {notify_err}")
|
|
|
|
# Auto-classify with Gemini in background (non-blocking)
|
|
if audit_analyzer.is_gemini_configured() and internal_data.get("instructions"):
|
|
asyncio.create_task(audit_analyzer.classify_single_agent(str(created_agent["_id"])))
|
|
|
|
return models.AgentCollectorResponse(
|
|
status="success",
|
|
message="Agent data collected successfully",
|
|
agent_id=str(created_agent["_id"])
|
|
)
|
|
|
|
except Exception as e:
|
|
# Check if it's a database connectivity issue
|
|
from database import check_database_health
|
|
try:
|
|
db_health = await check_database_health()
|
|
if not db_health.get("healthy"):
|
|
return JSONResponse(
|
|
status_code=503,
|
|
content={
|
|
"error": "Database Unavailable",
|
|
"message": "MongoDB connection is not available. Please check the database setup.",
|
|
"agent_data": agent.model_dump()
|
|
}
|
|
)
|
|
except:
|
|
pass
|
|
|
|
# General database error
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content={
|
|
"error": "Database Error",
|
|
"message": "Failed to store agent data. MongoDB may be unavailable or there was an error processing the request.",
|
|
"agent_data": agent.model_dump()
|
|
}
|
|
)
|
|
|
|
# Agent Usage API Endpoints
|
|
@app.get("/api/agents/{agent_name}/usage", response_model=models.AgentUsageStatsResponse)
|
|
async def get_agent_usage(
|
|
agent_name: str,
|
|
start_date: Optional[str] = Query(None),
|
|
end_date: Optional[str] = Query(None),
|
|
current_user: dict = Depends(get_current_user_from_cookie)
|
|
):
|
|
"""Get usage statistics for a specific agent (hybrid: pre-computed data with fallback to calculated)"""
|
|
try:
|
|
# Get agent by name to check for pre-computed usage data
|
|
agent = await crud.get_agent_by_name(agent_name)
|
|
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# Check if agent has pre-computed usage data
|
|
has_usage_data = agent.get("usage_timeline") is not None
|
|
|
|
if has_usage_data:
|
|
# USE PRE-COMPUTED DATA (new system)
|
|
usage_timeline = agent.get("usage_timeline", [])
|
|
|
|
# Convert timeline to usage_by_period format for response compatibility
|
|
usage_by_period = {entry["date"]: entry["message_count"] for entry in usage_timeline}
|
|
|
|
return models.AgentUsageStatsResponse(
|
|
agent_name=agent_name,
|
|
total_usage_count=agent.get("total_messages", 0),
|
|
first_usage=agent.get("first_used"),
|
|
last_usage=agent.get("last_used"),
|
|
usage_by_period=usage_by_period,
|
|
conversation_count=agent.get("conversation_count"),
|
|
unique_users=agent.get("unique_users"),
|
|
total_tokens=agent.get("total_tokens"),
|
|
prompt_tokens=agent.get("prompt_tokens"),
|
|
completion_tokens=agent.get("completion_tokens")
|
|
)
|
|
else:
|
|
# FALLBACK TO CALCULATED DATA (old system)
|
|
# Parse date strings if provided
|
|
start_dt = None
|
|
end_dt = None
|
|
if start_date:
|
|
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
|
|
if end_date:
|
|
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
|
|
|
|
# Get usage stats from agent_usage_collection
|
|
stats = await crud.get_agent_usage_stats(agent_name, start_dt, end_dt)
|
|
|
|
# Get usage by period for the response
|
|
usage_by_period = await crud.get_agent_usage_by_period(agent_name, "daily", start_dt, end_dt)
|
|
|
|
return models.AgentUsageStatsResponse(
|
|
agent_name=agent_name,
|
|
total_usage_count=stats["total_usage_count"],
|
|
first_usage=stats["first_usage"].isoformat() if stats["first_usage"] else None,
|
|
last_usage=stats["last_usage"].isoformat() if stats["last_usage"] else None,
|
|
usage_by_period=usage_by_period,
|
|
conversation_count=None, # Old system doesn't track this
|
|
unique_users=None, # Old system doesn't track this
|
|
total_tokens=None, # Old system doesn't track this
|
|
prompt_tokens=None,
|
|
completion_tokens=None
|
|
)
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to get usage stats: {str(e)}")
|
|
|
|
@app.get("/api/agents/{agent_name}/usage/chart")
|
|
async def get_agent_usage_chart(
|
|
agent_name: str,
|
|
period: str = Query("daily", regex="^(daily|weekly|monthly)$"),
|
|
start_date: Optional[str] = Query(None),
|
|
end_date: Optional[str] = Query(None),
|
|
current_user: dict = Depends(get_current_user_from_cookie)
|
|
):
|
|
"""Get usage chart data for a specific agent (hybrid: pre-computed data with fallback to calculated)"""
|
|
try:
|
|
# Get agent by name to check for pre-computed usage data
|
|
agent = await crud.get_agent_by_name(agent_name)
|
|
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# Check if agent has pre-computed usage timeline
|
|
has_usage_timeline = agent.get("usage_timeline") is not None
|
|
|
|
if has_usage_timeline:
|
|
# USE PRE-COMPUTED TIMELINE (new system)
|
|
usage_timeline = agent.get("usage_timeline", [])
|
|
|
|
# Format for Chart.js with dual datasets
|
|
datasets = [{
|
|
"label": "Message Count",
|
|
"data": [entry["message_count"] for entry in usage_timeline],
|
|
"backgroundColor": "rgba(54, 162, 235, 0.2)",
|
|
"borderColor": "rgba(54, 162, 235, 1)",
|
|
"borderWidth": 1,
|
|
"yAxisID": "y"
|
|
}]
|
|
|
|
# Add token dataset if any entries have token data
|
|
token_data = [entry.get("token_count", 0) for entry in usage_timeline]
|
|
if any(t > 0 for t in token_data):
|
|
datasets.append({
|
|
"label": "Token Count",
|
|
"data": token_data,
|
|
"backgroundColor": "rgba(255, 193, 7, 0.2)",
|
|
"borderColor": "rgba(255, 193, 7, 1)",
|
|
"borderWidth": 1,
|
|
"yAxisID": "y1"
|
|
})
|
|
|
|
chart_data = {
|
|
"labels": [entry["date"] for entry in usage_timeline],
|
|
"datasets": datasets
|
|
}
|
|
|
|
return chart_data
|
|
else:
|
|
# FALLBACK TO CALCULATED DATA (old system)
|
|
# Parse date strings if provided
|
|
start_dt = None
|
|
end_dt = None
|
|
if start_date:
|
|
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
|
|
if end_date:
|
|
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
|
|
|
|
# Get usage data grouped by period from agent_usage_collection
|
|
usage_data = await crud.get_agent_usage_by_period(agent_name, period, start_dt, end_dt)
|
|
|
|
# Format for Chart.js
|
|
chart_data = {
|
|
"labels": list(usage_data.keys()),
|
|
"datasets": [{
|
|
"label": f"Usage Count ({period})",
|
|
"data": list(usage_data.values()),
|
|
"backgroundColor": "rgba(54, 162, 235, 0.2)",
|
|
"borderColor": "rgba(54, 162, 235, 1)",
|
|
"borderWidth": 1
|
|
}]
|
|
}
|
|
|
|
return chart_data
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to get chart data: {str(e)}")
|
|
|
|
# MSAL routes no longer needed - using popup authentication
|