1004 lines
40 KiB
Python
1004 lines
40 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, Request, Form, Query
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from typing import List, Optional
|
|
import crud
|
|
import models
|
|
import auth
|
|
import config
|
|
import msal_auth
|
|
from datetime import datetime
|
|
import os
|
|
from dotenv import load_dotenv
|
|
from fastapi import Header
|
|
|
|
load_dotenv()
|
|
|
|
app = FastAPI(
|
|
title="AgentHub",
|
|
description="AI Agent Management System with comprehensive CRUD operations",
|
|
version="1.0.0",
|
|
root_path=config.get_base_path()
|
|
)
|
|
|
|
# Add session middleware for MSAL state management
|
|
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SECRET_KEY", "your-session-secret-key"))
|
|
|
|
# Mount static files with explicit path handling
|
|
from fastapi.responses import FileResponse
|
|
import os as path_os
|
|
|
|
@app.get("/static/{filename:path}")
|
|
async def serve_static(filename: str):
|
|
"""Serve static files with proper path handling"""
|
|
file_path = path_os.path.join("static", filename)
|
|
if path_os.path.exists(file_path) and path_os.path.isfile(file_path):
|
|
return FileResponse(file_path)
|
|
raise HTTPException(status_code=404, detail="Static file not found")
|
|
|
|
# Also mount the traditional way as backup
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
|
|
|
|
def get_app_url(path: str) -> str:
|
|
"""Get URL with proper base path - production fix"""
|
|
base_path = os.getenv("BASE_PATH", "").rstrip("/")
|
|
path = path.lstrip("/")
|
|
if base_path:
|
|
return f"{base_path}/{path}"
|
|
return f"/{path}" if path else "/"
|
|
|
|
# Debug route to check configuration
|
|
@app.get("/debug/config")
|
|
async def debug_config():
|
|
return {
|
|
"base_path": config.get_base_path(),
|
|
"env_base_path": os.getenv("BASE_PATH"),
|
|
"root_path_from_app": app.root_path,
|
|
"get_app_url_test": get_app_url("agent-management")
|
|
}
|
|
|
|
def get_template_context(request: Request, current_user=None, **kwargs):
|
|
"""Get standard template context with base path and MSAL info"""
|
|
return {
|
|
"request": request,
|
|
"current_user": current_user,
|
|
"base_path": config.get_base_path(),
|
|
"msal_enabled": msal_auth.is_msal_available(),
|
|
"show_local_login": config.show_local_login(),
|
|
**kwargs
|
|
}
|
|
|
|
async def get_current_user(token: str = Depends(oauth2_scheme)):
|
|
payload = auth.decode_access_token(token)
|
|
if not payload:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
return user
|
|
|
|
async def get_current_user_optional(request: Request):
|
|
"""Get current user from cookie if available"""
|
|
token = request.cookies.get("access_token")
|
|
if token:
|
|
try:
|
|
payload = auth.decode_access_token(token)
|
|
if payload:
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
return user
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
async def get_current_user_from_cookie(request: Request):
|
|
"""Get current user from cookie for API endpoints"""
|
|
token = request.cookies.get("access_token")
|
|
if not token:
|
|
raise HTTPException(status_code=401, detail="Not authenticated")
|
|
|
|
payload = auth.decode_access_token(token)
|
|
if not payload:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
return user
|
|
|
|
async def require_admin(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Require admin access"""
|
|
if not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
return current_user
|
|
|
|
def create_agent_response(agent: dict) -> models.AiAgentResponse:
|
|
"""Helper function to create AiAgentResponse with all fields including Quality Audit"""
|
|
return models.AiAgentResponse(
|
|
agent_id=str(agent["_id"]),
|
|
agent_name=agent["agent_name"],
|
|
agent_tool=agent.get("agent_tool"),
|
|
agent_description=agent.get("agent_description"),
|
|
agent_purpose=agent.get("agent_purpose"),
|
|
agent_version=agent.get("agent_version"),
|
|
agent_status=agent.get("agent_status"),
|
|
agent_location=agent.get("agent_location"),
|
|
agent_department=agent.get("agent_department"),
|
|
agent_contact_person=agent.get("agent_contact_person"),
|
|
agent_created_at=agent["created_at"].isoformat() if agent.get("created_at") else None,
|
|
agent_updated_at=agent["updated_at"].isoformat() if agent.get("updated_at") else None,
|
|
agent_tags=agent.get("agent_tags"),
|
|
agent_metadata=agent.get("agent_metadata"),
|
|
agent_userbase=agent.get("agent_userbase"),
|
|
agent_capabilities=agent.get("agent_capabilities"),
|
|
quality_audit_status=agent.get("quality_audit_status", False),
|
|
quality_audit_updated_by=agent.get("quality_audit_updated_by"),
|
|
quality_audit_updated_at=agent.get("quality_audit_updated_at"),
|
|
quality_audit_updated_by_name=agent.get("quality_audit_updated_by_name"),
|
|
risk_factor=agent.get("risk_factor"),
|
|
created_by=agent["created_by"]
|
|
)
|
|
|
|
async def verify_agent_collector_api_key(x_api_key: str = Header(alias="X-API-Key")):
|
|
"""Verify static API key for agent collector endpoints"""
|
|
expected_key = os.getenv("AGENT_COLLECTOR_API_KEY")
|
|
if not expected_key:
|
|
raise HTTPException(status_code=500, detail="API key not configured")
|
|
if x_api_key != expected_key:
|
|
raise HTTPException(status_code=401, detail="Invalid API key")
|
|
return True
|
|
|
|
def map_agent_collector_to_internal(collector_data: models.AgentCollectorCreate) -> dict:
|
|
"""Map agent collector field names to internal schema"""
|
|
# Normalize status to Title Case for internal storage
|
|
status = collector_data.status
|
|
if status:
|
|
status_map = {
|
|
"active": "Active",
|
|
"inactive": "Inactive",
|
|
"deprecated": "Deprecated",
|
|
"development": "Development"
|
|
}
|
|
status = status_map.get(status.lower(), status)
|
|
|
|
return {
|
|
"agent_name": collector_data.name,
|
|
"agent_tool": collector_data.tool,
|
|
"agent_description": collector_data.description,
|
|
"agent_purpose": collector_data.purpose,
|
|
"agent_location": collector_data.location,
|
|
"agent_userbase": collector_data.userbase,
|
|
"agent_version": collector_data.version,
|
|
"agent_capabilities": collector_data.capabilities,
|
|
"agent_status": status,
|
|
"agent_department": collector_data.department,
|
|
"agent_contact_person": collector_data.contact_person,
|
|
"agent_tags": collector_data.tags,
|
|
"agent_metadata": collector_data.metadata,
|
|
}
|
|
|
|
|
|
|
|
@app.get("/me", response_model=models.UserResponse)
|
|
async def me(current_user: dict = Depends(get_current_user)):
|
|
return {
|
|
"email": current_user["email"],
|
|
"full_name": current_user.get("full_name"),
|
|
"is_active": current_user["is_active"],
|
|
"is_admin": current_user["is_admin"]
|
|
}
|
|
|
|
# HTML Routes
|
|
@app.get("/")
|
|
async def home(request: Request):
|
|
# Check if this is a JSON API request (for health check)
|
|
accept_header = request.headers.get("accept", "")
|
|
if "application/json" in accept_header or request.headers.get("content-type") == "application/json":
|
|
# Return health check JSON response
|
|
from database import check_database_health
|
|
db_health = await check_database_health()
|
|
return models.HealthCheckResponse(
|
|
status="healthy",
|
|
message="Agent collector API is running",
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
database=db_health
|
|
)
|
|
|
|
# Otherwise handle as HTML request for web interface
|
|
current_user = await get_current_user_optional(request)
|
|
if current_user:
|
|
# Redirect logged-in users appropriately
|
|
if current_user.get("is_admin"):
|
|
return RedirectResponse(url=get_app_url("admin"), status_code=303)
|
|
else:
|
|
return RedirectResponse(url=get_app_url("agent-management"), status_code=303)
|
|
else:
|
|
# Show landing page for non-authenticated users
|
|
return templates.TemplateResponse("index.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/register", response_class=HTMLResponse)
|
|
async def register_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse("register.html", get_template_context(request, current_user))
|
|
|
|
@app.post("/register", response_class=HTMLResponse)
|
|
async def register_form(
|
|
request: Request,
|
|
email: str = Form(..., alias="user-email"),
|
|
password: str = Form(..., alias="user-password"),
|
|
first_name: str = Form(..., alias="first-name"),
|
|
last_name: str = Form(..., alias="last-name")
|
|
):
|
|
try:
|
|
full_name = f"{first_name} {last_name}".strip()
|
|
existing = await crud.get_user_by_email(email)
|
|
if existing:
|
|
return templates.TemplateResponse(
|
|
"register.html",
|
|
{"request": request, "error": "Email already registered"}
|
|
)
|
|
|
|
user = models.UserCreate(email=email, password=password, full_name=full_name)
|
|
await crud.create_user(user.email, user.password, user.full_name)
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "success": "Registration successful! Please login."}
|
|
)
|
|
except Exception as e:
|
|
return templates.TemplateResponse(
|
|
"register.html",
|
|
{"request": request, "error": f"Registration failed: {str(e)}"}
|
|
)
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
|
|
async def login_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
|
|
# Add MSAL configuration if enabled
|
|
context = get_template_context(request, current_user)
|
|
if msal_auth.is_msal_available():
|
|
msal_config = config.get_msal_config()
|
|
context.update({
|
|
"client_id": msal_config["client_id"],
|
|
"authority": msal_config["authority"],
|
|
"redirect_uri": msal_config["redirect_uri"]
|
|
})
|
|
|
|
return templates.TemplateResponse("login.html", context)
|
|
|
|
@app.post("/login", response_class=HTMLResponse)
|
|
async def login_form(
|
|
request: Request,
|
|
email: str = Form(...),
|
|
password: str = Form(...)
|
|
):
|
|
try:
|
|
print(f"🔍 Login attempt - Email: {email}")
|
|
|
|
# Check if user exists first
|
|
user_exists = await crud.get_user_by_email(email)
|
|
print(f"🔍 User exists in database: {user_exists is not None}")
|
|
|
|
if user_exists:
|
|
print(f"🔍 User auth provider: {user_exists.get('auth_provider', 'local')}")
|
|
print(f"🔍 User has hashed_password: {'hashed_password' in user_exists}")
|
|
print(f"🔍 User is_active: {user_exists.get('is_active', False)}")
|
|
|
|
user = await crud.authenticate_user(email, password)
|
|
print(f"🔍 Authentication result: {user is not None}")
|
|
|
|
if not user:
|
|
error_msg = "Invalid email or password"
|
|
if not user_exists:
|
|
error_msg += " (User not found)"
|
|
elif user_exists.get('auth_provider') != 'local':
|
|
error_msg += " (Not a local user - use Microsoft login)"
|
|
elif 'hashed_password' not in user_exists:
|
|
error_msg += " (No password set for this user)"
|
|
|
|
print(f"❌ Login failed: {error_msg}")
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
get_template_context(request, error=error_msg)
|
|
)
|
|
|
|
# Create token (you can store this in session/cookie in a real app)
|
|
token = auth.create_access_token({"sub": str(user["_id"])})
|
|
|
|
# Check if user is admin
|
|
if user.get("is_admin"):
|
|
# Admin goes to admin dashboard
|
|
response = RedirectResponse(url=get_app_url("admin"), status_code=303)
|
|
else:
|
|
# Regular user goes to all agents page
|
|
response = RedirectResponse(url=get_app_url("agent-management"), status_code=303)
|
|
|
|
# Set token in cookie with proper attributes
|
|
response.set_cookie(
|
|
key="access_token",
|
|
value=token,
|
|
httponly=True,
|
|
samesite="lax",
|
|
secure=False # Set to True in production with HTTPS
|
|
)
|
|
return response
|
|
|
|
except Exception as e:
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": f"Login failed: {str(e)}"}
|
|
)
|
|
|
|
# Azure AD/MSAL Authentication - Using popup-based authentication per specification
|
|
|
|
|
|
@app.post("/api/auth/azure/token")
|
|
async def azure_token_exchange(request: Request):
|
|
"""Exchange Azure AD token for local JWT token with proper validation"""
|
|
try:
|
|
data = await request.json()
|
|
access_token = data.get("access_token")
|
|
id_token = data.get("id_token")
|
|
|
|
if not id_token:
|
|
raise HTTPException(status_code=400, detail="Missing ID token")
|
|
|
|
# Validate JWT token against Azure AD public keys (as per specification)
|
|
import jwt
|
|
import requests
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
import base64
|
|
|
|
msal_config = config.get_msal_config()
|
|
|
|
# Get Azure AD public keys for token verification
|
|
jwks_uri = f"{msal_config['authority']}/discovery/v2.0/keys"
|
|
jwks_response = requests.get(jwks_uri)
|
|
jwks = jwks_response.json()
|
|
|
|
# Decode token header to get key ID
|
|
unverified_header = jwt.get_unverified_header(id_token)
|
|
kid = unverified_header.get("kid")
|
|
|
|
# Find the matching public key
|
|
public_key = None
|
|
for key in jwks["keys"]:
|
|
if key["kid"] == kid:
|
|
# Convert JWK to PEM format for validation
|
|
n = base64.urlsafe_b64decode(key["n"] + "==")
|
|
e = base64.urlsafe_b64decode(key["e"] + "==")
|
|
|
|
# Convert to RSA public key
|
|
numbers = rsa.RSAPublicNumbers(
|
|
int.from_bytes(e, byteorder="big"),
|
|
int.from_bytes(n, byteorder="big")
|
|
)
|
|
public_key = numbers.public_key()
|
|
break
|
|
|
|
if not public_key:
|
|
raise HTTPException(status_code=400, detail="Unable to verify token signature")
|
|
|
|
# Convert to PEM format for JWT library
|
|
pem_key = public_key.public_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
|
)
|
|
|
|
# Validate JWT token signature and claims
|
|
try:
|
|
id_token_claims = jwt.decode(
|
|
id_token,
|
|
pem_key,
|
|
algorithms=["RS256"],
|
|
audience=msal_config["client_id"],
|
|
issuer=f"{msal_config['authority']}/v2.0"
|
|
)
|
|
except jwt.InvalidTokenError as e:
|
|
raise HTTPException(status_code=400, detail=f"Token validation failed: {str(e)}")
|
|
|
|
# Create user profile from validated ID token claims
|
|
user_profile = {
|
|
"azure_ad_id": id_token_claims.get("oid"),
|
|
"email": id_token_claims.get("email") or id_token_claims.get("preferred_username"),
|
|
"full_name": id_token_claims.get("name"),
|
|
"first_name": id_token_claims.get("given_name"),
|
|
"last_name": id_token_claims.get("family_name"),
|
|
"tenant_id": id_token_claims.get("tid")
|
|
}
|
|
|
|
# Create or update user in local database
|
|
user = await crud.create_or_update_azure_user(user_profile)
|
|
|
|
# Create JWT token for local session management
|
|
jwt_token = auth.create_access_token({"sub": str(user["_id"])})
|
|
|
|
# Return user info and set secure cookie (following specification)
|
|
response = JSONResponse({
|
|
"success": True,
|
|
"is_admin": user.get("is_admin", False),
|
|
"email": user.get("email"),
|
|
"full_name": user.get("full_name")
|
|
})
|
|
|
|
# Set JWT token in httpOnly cookie with security flags (as per specification)
|
|
response.set_cookie(
|
|
key="access_token",
|
|
value=jwt_token,
|
|
httponly=True,
|
|
samesite="lax",
|
|
secure=False, # Set to True in production with HTTPS
|
|
max_age=24 * 60 * 60 # 24 hours as per specification
|
|
)
|
|
|
|
return response
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(status_code=400, detail=f"Token exchange failed: {str(e)}")
|
|
|
|
@app.get("/agent-register", response_class=HTMLResponse)
|
|
async def agent_register_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
return templates.TemplateResponse("agent_register.html", get_template_context(request, current_user))
|
|
|
|
@app.post("/agent-register", response_class=HTMLResponse)
|
|
async def agent_register_form(
|
|
request: Request,
|
|
agent_name: str = Form(...),
|
|
agent_tool: str = Form(...),
|
|
agent_description: str = Form(None),
|
|
agent_purpose: str = Form(None),
|
|
agent_version: str = Form(None),
|
|
agent_status: str = Form("Development"),
|
|
agent_location: str = Form(None),
|
|
agent_department: str = Form(None),
|
|
agent_contact_person: str = Form(None),
|
|
agent_tags: str = Form(None),
|
|
agent_userbase: str = Form(None),
|
|
agent_capabilities: str = Form(None),
|
|
quality_audit_status: bool = Form(False),
|
|
risk_factor: int = Form(None)
|
|
):
|
|
try:
|
|
# Get user from cookie - require authentication
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
user_id = str(current_user["_id"])
|
|
|
|
# Prepare agent data
|
|
agent_data = {
|
|
"agent_name": agent_name,
|
|
"agent_tool": agent_tool,
|
|
"agent_description": agent_description,
|
|
"agent_purpose": agent_purpose,
|
|
"agent_version": agent_version,
|
|
"agent_status": agent_status,
|
|
"agent_location": agent_location,
|
|
"agent_department": agent_department,
|
|
"agent_contact_person": agent_contact_person,
|
|
}
|
|
|
|
# Process tags, userbase, and capabilities (convert comma-separated to lists)
|
|
if agent_tags:
|
|
agent_data["agent_tags"] = [tag.strip() for tag in agent_tags.split(',') if tag.strip()]
|
|
if agent_userbase:
|
|
agent_data["agent_userbase"] = [user.strip() for user in agent_userbase.split(',') if user.strip()]
|
|
if agent_capabilities:
|
|
agent_data["agent_capabilities"] = [cap.strip() for cap in agent_capabilities.split(',') if cap.strip()]
|
|
|
|
# Handle Quality Audit - only admins can set it to True
|
|
if quality_audit_status and current_user.get("is_admin"):
|
|
# Admin is setting quality audit to true
|
|
from datetime import datetime
|
|
agent_data["quality_audit_status"] = True
|
|
agent_data["quality_audit_updated_by"] = user_id
|
|
agent_data["quality_audit_updated_at"] = datetime.utcnow().isoformat()
|
|
agent_data["quality_audit_updated_by_name"] = current_user.get("full_name", current_user.get("email"))
|
|
|
|
# Validate Risk Factor when Quality Audit is checked
|
|
if risk_factor is None or not (1 <= risk_factor <= 5):
|
|
context = get_template_context(request, current_user)
|
|
context["error"] = "Risk Factor (1-5) is required when Quality Audit is checked."
|
|
return templates.TemplateResponse("agent_register.html", context)
|
|
|
|
agent_data["risk_factor"] = risk_factor
|
|
else:
|
|
# Non-admin or quality audit not checked
|
|
agent_data["quality_audit_status"] = False
|
|
agent_data["risk_factor"] = None
|
|
|
|
# Remove None values
|
|
agent_data = {k: v for k, v in agent_data.items() if v is not None}
|
|
|
|
# Create agent in database
|
|
created_agent = await crud.create_agent(agent_data, user_id)
|
|
|
|
# Redirect to agent management with success message
|
|
from urllib.parse import quote
|
|
success_msg = quote(f"Agent '{agent_name}' registered successfully!")
|
|
redirect_url = get_app_url(f"agent-management?success={success_msg}")
|
|
print(f"DEBUG: Redirecting to: {redirect_url}") # Debug line
|
|
return RedirectResponse(
|
|
url=redirect_url,
|
|
status_code=303
|
|
)
|
|
|
|
except ValueError as e:
|
|
# Handle duplicate agent name error
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse(
|
|
"agent_register.html",
|
|
get_template_context(request, current_user, error=str(e))
|
|
)
|
|
except Exception as e:
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse(
|
|
"agent_register.html",
|
|
get_template_context(request, current_user, error=f"Agent registration failed: {str(e)}")
|
|
)
|
|
|
|
@app.get("/dashboard", response_class=HTMLResponse)
|
|
async def dashboard(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
return templates.TemplateResponse("admin/dashboard.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/user-management", response_class=HTMLResponse)
|
|
async def user_management_page(request: Request):
|
|
return templates.TemplateResponse("user_management.html", get_template_context(request))
|
|
|
|
@app.get("/logout", response_class=HTMLResponse)
|
|
async def logout(request: Request):
|
|
"""Logout user and clear all session data"""
|
|
# Clear MSAL session data if present
|
|
request.session.clear()
|
|
|
|
response = RedirectResponse(url=config.get_full_url(""), status_code=303)
|
|
response.delete_cookie(key="access_token")
|
|
return response
|
|
|
|
@app.get("/profile", response_class=HTMLResponse)
|
|
async def profile_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
return templates.TemplateResponse("profile.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/agent-management", response_class=HTMLResponse)
|
|
async def agent_management_page(request: Request, view: Optional[str] = Query(None), success: Optional[str] = Query(None), error: Optional[str] = Query(None)):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
# Default to "all" view for regular users, "my" view can be specified via query param
|
|
if view == "my":
|
|
# Get user's agents only
|
|
agents = await crud.get_agents_by_user(str(current_user["_id"]))
|
|
current_view = "my"
|
|
page_title = "My Agents Dashboard"
|
|
page_description = f"{len(agents)} agents in your portfolio"
|
|
else:
|
|
# Get all agents (default view for regular users)
|
|
agents = await crud.get_all_agents()
|
|
current_view = "all"
|
|
page_title = "All Agents"
|
|
page_description = f"{len(agents)} agents in the system"
|
|
|
|
return templates.TemplateResponse("agent_management.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
agents=agents,
|
|
agent_count=len(agents),
|
|
current_view=current_view,
|
|
page_title=page_title,
|
|
page_description=page_description,
|
|
success=success,
|
|
error=error
|
|
))
|
|
|
|
@app.get("/admin", response_class=HTMLResponse)
|
|
async def admin_dashboard(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user or not current_user.get("is_admin"):
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
# Get statistics
|
|
all_users = await crud.get_all_users()
|
|
all_agents = await crud.get_all_agents()
|
|
|
|
# Calculate stats
|
|
total_users = len(all_users)
|
|
admin_users = len([u for u in all_users if u.get("is_admin")])
|
|
regular_users = total_users - admin_users
|
|
total_agents = len(all_agents)
|
|
active_agents = len([a for a in all_agents if a.get("agent_status") == "Active"])
|
|
|
|
return templates.TemplateResponse("admin/dashboard.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
stats={
|
|
"total_users": total_users,
|
|
"admin_users": admin_users,
|
|
"regular_users": regular_users,
|
|
"total_agents": total_agents,
|
|
"active_agents": active_agents,
|
|
"inactive_agents": total_agents - active_agents
|
|
},
|
|
users=all_users,
|
|
agents=all_agents
|
|
))
|
|
|
|
# New enhanced endpoints
|
|
@app.get("/search", response_class=HTMLResponse)
|
|
async def search_page(request: Request, q: Optional[str] = Query(None)):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
search_results = {"agents": [], "users": []}
|
|
if q:
|
|
# Search agents using proper search function
|
|
# Regular users can search all agents (consistent with agent management page)
|
|
search_results["agents"] = await crud.search_agents(q)
|
|
|
|
# Search users (admin only)
|
|
if current_user.get("is_admin"):
|
|
all_users = await crud.get_all_users()
|
|
search_results["users"] = [
|
|
user for user in all_users
|
|
if q.lower() in user.get("email", "").lower() or
|
|
q.lower() in user.get("full_name", "").lower()
|
|
]
|
|
|
|
return templates.TemplateResponse("search.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
query=q,
|
|
results=search_results
|
|
))
|
|
|
|
@app.post("/agent/{agent_id}/edit", response_class=HTMLResponse)
|
|
async def edit_agent_form(request: Request, agent_id: str):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# Check ownership: only allow users to edit their own agents (admins can edit any)
|
|
if agent["created_by"] != str(current_user["_id"]) and not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Not authorized to edit this agent")
|
|
|
|
return templates.TemplateResponse("edit_agent.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
agent=agent
|
|
))
|
|
|
|
@app.post("/agent/{agent_id}/delete", response_class=HTMLResponse)
|
|
async def delete_agent_form(request: Request, agent_id: str):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
# Check permission and delete
|
|
user_id = str(current_user["_id"]) if not current_user.get("is_admin") else None
|
|
deleted = await crud.delete_agent(agent_id, user_id)
|
|
|
|
if deleted:
|
|
return RedirectResponse(url=get_app_url("agent-management?success=Agent deleted successfully"), status_code=303)
|
|
else:
|
|
return RedirectResponse(url=get_app_url("agent-management?error=Failed to delete agent"), status_code=303)
|
|
|
|
# Agent API endpoints
|
|
@app.get("/api/agents/all", response_model=List[models.AiAgentResponse])
|
|
async def get_all_agents_for_users(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Get all agents for regular users (read-only access)"""
|
|
agents = await crud.get_all_agents()
|
|
return [
|
|
create_agent_response(agent) for agent in agents
|
|
]
|
|
|
|
@app.post("/api/agents", response_model=models.AiAgentResponse)
|
|
async def create_agent(agent: models.AiAgentCreate, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agent_data = agent.model_dump()
|
|
created_agent = await crud.create_agent(agent_data, str(current_user["_id"]))
|
|
return create_agent_response(created_agent)
|
|
|
|
@app.get("/api/agents", response_model=List[models.AiAgentResponse])
|
|
async def get_user_agents(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agents = await crud.get_agents_by_user(str(current_user["_id"]))
|
|
return [
|
|
create_agent_response(agent) for agent in agents
|
|
]
|
|
|
|
# MSAL routes no longer needed - using popup authentication
|
|
|
|
@app.get("/api/agents/{agent_id}", response_model=models.AiAgentResponse)
|
|
async def get_agent(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
if agent["created_by"] != str(current_user["_id"]) and not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Not authorized to view this agent")
|
|
|
|
return create_agent_response(agent)
|
|
|
|
@app.put("/api/agents/{agent_id}", response_model=models.AiAgentResponse)
|
|
async def update_agent(agent_id: str, agent: models.AiAgentCreate, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
# First check if agent exists
|
|
existing_agent = await crud.get_agent_by_id(agent_id)
|
|
if not existing_agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# Check ownership: only allow users to edit their own agents (admins can edit any)
|
|
if existing_agent["created_by"] != str(current_user["_id"]) and not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Not authorized to edit this agent")
|
|
|
|
# For regular users, pass user_id to enforce ownership at DB level
|
|
# For admins, pass None to allow editing any agent
|
|
user_id_filter = str(current_user["_id"]) if not current_user.get("is_admin") else None
|
|
|
|
# Prepare admin info for Quality Audit updates
|
|
admin_user_info = None
|
|
agent_data = agent.model_dump()
|
|
|
|
# Validate Risk Factor when Quality Audit is checked (admin only)
|
|
if current_user.get("is_admin") and agent_data.get("quality_audit_status"):
|
|
if agent_data.get("risk_factor") is None or not (1 <= agent_data.get("risk_factor", 0) <= 5):
|
|
raise HTTPException(
|
|
status_code=422,
|
|
detail="Risk Factor (1-5) is required when Quality Audit is checked."
|
|
)
|
|
|
|
if current_user.get("is_admin"):
|
|
admin_user_info = {
|
|
"user_id": str(current_user["_id"]),
|
|
"user_name": current_user.get("full_name", current_user.get("email")),
|
|
"email": current_user.get("email")
|
|
}
|
|
|
|
updated_agent = await crud.update_agent(agent_id, agent_data, user_id_filter, admin_user_info)
|
|
|
|
if not updated_agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found or not authorized")
|
|
|
|
return create_agent_response(updated_agent)
|
|
|
|
@app.delete("/api/agents/{agent_id}")
|
|
async def delete_agent(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
print(f"🗑️ DELETE attempt - Agent ID: {agent_id}, User ID: {current_user['_id']}")
|
|
|
|
# First check if agent exists
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
print(f"🗑️ Agent {agent_id} not found in database")
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
print(f"🗑️ Agent found - Created by: {agent.get('created_by')}, User is admin: {current_user.get('is_admin', False)}")
|
|
|
|
# Check permission
|
|
user_id = str(current_user["_id"])
|
|
if agent["created_by"] != user_id and not current_user.get("is_admin"):
|
|
print(f"🗑️ Permission denied - Agent owned by {agent['created_by']}, current user: {user_id}")
|
|
raise HTTPException(status_code=403, detail="Not authorized to delete this agent")
|
|
|
|
deleted = await crud.delete_agent(agent_id, user_id if not current_user.get("is_admin") else None)
|
|
if not deleted:
|
|
print(f"🗑️ Delete operation failed")
|
|
raise HTTPException(status_code=500, detail="Failed to delete agent")
|
|
|
|
print(f"🗑️ Agent {agent_id} deleted successfully")
|
|
return {"message": "Agent deleted successfully"}
|
|
|
|
# Admin endpoints
|
|
@app.get("/api/admin/users", response_model=List[models.UserResponse])
|
|
async def get_all_users(current_user: dict = Depends(require_admin)):
|
|
|
|
users = await crud.get_all_users()
|
|
return [
|
|
models.UserResponse(
|
|
email=user["email"],
|
|
full_name=user.get("full_name"),
|
|
is_active=user["is_active"],
|
|
is_admin=user["is_admin"]
|
|
) for user in users
|
|
]
|
|
|
|
@app.put("/api/admin/users/{email}", response_model=models.UserResponse)
|
|
async def update_user(email: str, user_update: models.UserUpdate, current_user: dict = Depends(require_admin)):
|
|
# Get the user by email first
|
|
existing_user = await crud.get_user_by_email(email)
|
|
if not existing_user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
# Update the user
|
|
update_data = user_update.model_dump(exclude_unset=True)
|
|
updated_user = await crud.update_user(str(existing_user["_id"]), update_data)
|
|
|
|
if not updated_user:
|
|
raise HTTPException(status_code=500, detail="Failed to update user")
|
|
|
|
return models.UserResponse(
|
|
email=updated_user["email"],
|
|
full_name=updated_user.get("full_name"),
|
|
is_active=updated_user["is_active"],
|
|
is_admin=updated_user["is_admin"]
|
|
)
|
|
|
|
@app.get("/api/admin/agents", response_model=List[models.AiAgentResponse])
|
|
async def get_all_agents_admin(current_user: dict = Depends(require_admin)):
|
|
|
|
agents = await crud.get_all_agents()
|
|
return [
|
|
create_agent_response(agent) for agent in agents
|
|
]
|
|
|
|
# Agent Collector API Endpoints (for compatibility with agent_collector app)
|
|
@app.post("/agents")
|
|
async def create_agent_collector(
|
|
agent: models.AgentCollectorCreate,
|
|
request: Request,
|
|
api_key_valid: bool = Depends(verify_agent_collector_api_key)
|
|
):
|
|
"""Agent collector API endpoint - handles both new registrations and usage tracking"""
|
|
try:
|
|
# Check content type
|
|
content_type = request.headers.get("content-type", "")
|
|
if not content_type.startswith("application/json"):
|
|
return JSONResponse(
|
|
status_code=415,
|
|
content={
|
|
"error": "Unsupported Media Type",
|
|
"message": "Request must be JSON"
|
|
}
|
|
)
|
|
|
|
# Check if agent already exists by name
|
|
existing_agent = await crud.get_agent_by_name(agent.name)
|
|
|
|
if existing_agent:
|
|
# Agent exists - log usage instead of creating duplicate
|
|
internal_data = map_agent_collector_to_internal(agent)
|
|
await crud.create_agent_usage_record(agent.name, internal_data)
|
|
|
|
return models.AgentUsageTrackingResponse(
|
|
status="usage_logged",
|
|
message="Agent already exists, usage tracked",
|
|
agent_name=agent.name
|
|
)
|
|
else:
|
|
# Agent doesn't exist - create new registration
|
|
internal_data = map_agent_collector_to_internal(agent)
|
|
|
|
# Handle datetime fields if provided
|
|
if agent.creation_date:
|
|
internal_data["agent_created_at"] = agent.creation_date
|
|
if agent.last_updated:
|
|
internal_data["agent_updated_at"] = agent.last_updated
|
|
|
|
# Create agent using collector-specific function
|
|
created_agent = await crud.create_agent_from_collector(internal_data)
|
|
|
|
return models.AgentCollectorResponse(
|
|
status="success",
|
|
message="Agent data collected successfully",
|
|
agent_id=str(created_agent["_id"])
|
|
)
|
|
|
|
except Exception as e:
|
|
# Check if it's a database connectivity issue
|
|
from database import check_database_health
|
|
try:
|
|
db_health = await check_database_health()
|
|
if not db_health.get("healthy"):
|
|
return JSONResponse(
|
|
status_code=503,
|
|
content={
|
|
"error": "Database Unavailable",
|
|
"message": "MongoDB connection is not available. Please check the database setup.",
|
|
"agent_data": agent.model_dump()
|
|
}
|
|
)
|
|
except:
|
|
pass
|
|
|
|
# General database error
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content={
|
|
"error": "Database Error",
|
|
"message": "Failed to store agent data. MongoDB may be unavailable or there was an error processing the request.",
|
|
"agent_data": agent.model_dump()
|
|
}
|
|
)
|
|
|
|
# Agent Usage API Endpoints
|
|
@app.get("/api/agents/{agent_name}/usage", response_model=models.AgentUsageStatsResponse)
|
|
async def get_agent_usage(
|
|
agent_name: str,
|
|
start_date: Optional[str] = Query(None),
|
|
end_date: Optional[str] = Query(None),
|
|
current_user: dict = Depends(get_current_user_from_cookie)
|
|
):
|
|
"""Get usage statistics for a specific agent"""
|
|
try:
|
|
# Parse date strings if provided
|
|
start_dt = None
|
|
end_dt = None
|
|
if start_date:
|
|
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
|
|
if end_date:
|
|
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
|
|
|
|
# Get usage stats
|
|
stats = await crud.get_agent_usage_stats(agent_name, start_dt, end_dt)
|
|
|
|
# Get usage by period for the response
|
|
usage_by_period = await crud.get_agent_usage_by_period(agent_name, "daily", start_dt, end_dt)
|
|
|
|
return models.AgentUsageStatsResponse(
|
|
agent_name=agent_name,
|
|
total_usage_count=stats["total_usage_count"],
|
|
first_usage=stats["first_usage"].isoformat() if stats["first_usage"] else None,
|
|
last_usage=stats["last_usage"].isoformat() if stats["last_usage"] else None,
|
|
usage_by_period=usage_by_period
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to get usage stats: {str(e)}")
|
|
|
|
@app.get("/api/agents/{agent_name}/usage/chart")
|
|
async def get_agent_usage_chart(
|
|
agent_name: str,
|
|
period: str = Query("daily", regex="^(daily|weekly|monthly)$"),
|
|
start_date: Optional[str] = Query(None),
|
|
end_date: Optional[str] = Query(None),
|
|
current_user: dict = Depends(get_current_user_from_cookie)
|
|
):
|
|
"""Get usage chart data for a specific agent"""
|
|
try:
|
|
# Parse date strings if provided
|
|
start_dt = None
|
|
end_dt = None
|
|
if start_date:
|
|
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
|
|
if end_date:
|
|
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
|
|
|
|
# Get usage data grouped by period
|
|
usage_data = await crud.get_agent_usage_by_period(agent_name, period, start_dt, end_dt)
|
|
|
|
# Format for Chart.js
|
|
chart_data = {
|
|
"labels": list(usage_data.keys()),
|
|
"datasets": [{
|
|
"label": f"Usage Count ({period})",
|
|
"data": list(usage_data.values()),
|
|
"backgroundColor": "rgba(54, 162, 235, 0.2)",
|
|
"borderColor": "rgba(54, 162, 235, 1)",
|
|
"borderWidth": 1
|
|
}]
|
|
}
|
|
|
|
return chart_data
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to get chart data: {str(e)}")
|
|
|
|
# MSAL routes no longer needed - using popup authentication
|