1013 lines
42 KiB
Python
1013 lines
42 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, Request, Form, Query
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from typing import List, Optional
|
|
import crud
|
|
import models
|
|
import auth
|
|
import config
|
|
import msal_auth
|
|
from datetime import datetime
|
|
import os
|
|
from dotenv import load_dotenv
|
|
from fastapi import Header
|
|
|
|
load_dotenv()
|
|
|
|
app = FastAPI(
|
|
title="AgentHub",
|
|
description="AI Agent Management System with comprehensive CRUD operations",
|
|
version="1.0.0",
|
|
root_path=config.get_base_path()
|
|
)
|
|
|
|
# Add session middleware for MSAL state management
|
|
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SECRET_KEY", "your-session-secret-key"))
|
|
|
|
# Mount static files with explicit path handling
|
|
from fastapi.responses import FileResponse
|
|
import os as path_os
|
|
|
|
@app.get("/static/{filename:path}")
|
|
async def serve_static(filename: str):
|
|
"""Serve static files with proper path handling"""
|
|
file_path = path_os.path.join("static", filename)
|
|
if path_os.path.exists(file_path) and path_os.path.isfile(file_path):
|
|
return FileResponse(file_path)
|
|
raise HTTPException(status_code=404, detail="Static file not found")
|
|
|
|
# Also mount the traditional way as backup
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
|
|
|
|
def get_app_url(path: str) -> str:
|
|
"""Get URL with proper base path - production fix"""
|
|
base_path = os.getenv("BASE_PATH", "").rstrip("/")
|
|
path = path.lstrip("/")
|
|
if base_path:
|
|
return f"{base_path}/{path}"
|
|
return f"/{path}" if path else "/"
|
|
|
|
# Debug route to check configuration
|
|
@app.get("/debug/config")
|
|
async def debug_config():
|
|
return {
|
|
"base_path": config.get_base_path(),
|
|
"env_base_path": os.getenv("BASE_PATH"),
|
|
"root_path_from_app": app.root_path,
|
|
"get_app_url_test": get_app_url("agent-management")
|
|
}
|
|
|
|
def get_template_context(request: Request, current_user=None, **kwargs):
|
|
"""Get standard template context with base path and MSAL info"""
|
|
return {
|
|
"request": request,
|
|
"current_user": current_user,
|
|
"base_path": config.get_base_path(),
|
|
"msal_enabled": msal_auth.is_msal_available(),
|
|
**kwargs
|
|
}
|
|
|
|
async def get_current_user(token: str = Depends(oauth2_scheme)):
|
|
payload = auth.decode_access_token(token)
|
|
if not payload:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
return user
|
|
|
|
async def get_current_user_optional(request: Request):
|
|
"""Get current user from cookie if available"""
|
|
token = request.cookies.get("access_token")
|
|
if token:
|
|
try:
|
|
payload = auth.decode_access_token(token)
|
|
if payload:
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
return user
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
async def get_current_user_from_cookie(request: Request):
|
|
"""Get current user from cookie for API endpoints"""
|
|
token = request.cookies.get("access_token")
|
|
if not token:
|
|
raise HTTPException(status_code=401, detail="Not authenticated")
|
|
|
|
payload = auth.decode_access_token(token)
|
|
if not payload:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
user = await crud.get_user_by_id(payload["sub"])
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
return user
|
|
|
|
async def require_admin(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Require admin access"""
|
|
if not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
return current_user
|
|
|
|
async def verify_agent_collector_api_key(x_api_key: str = Header(alias="X-API-Key")):
|
|
"""Verify static API key for agent collector endpoints"""
|
|
expected_key = os.getenv("AGENT_COLLECTOR_API_KEY")
|
|
if not expected_key:
|
|
raise HTTPException(status_code=500, detail="API key not configured")
|
|
if x_api_key != expected_key:
|
|
raise HTTPException(status_code=401, detail="Invalid API key")
|
|
return True
|
|
|
|
def map_agent_collector_to_internal(collector_data: models.AgentCollectorCreate) -> dict:
|
|
"""Map agent collector field names to internal schema"""
|
|
# Normalize status to Title Case for internal storage
|
|
status = collector_data.status
|
|
if status:
|
|
status_map = {
|
|
"active": "Active",
|
|
"inactive": "Inactive",
|
|
"deprecated": "Deprecated",
|
|
"development": "Development"
|
|
}
|
|
status = status_map.get(status.lower(), status)
|
|
|
|
return {
|
|
"agent_name": collector_data.name,
|
|
"agent_description": collector_data.description,
|
|
"agent_purpose": collector_data.purpose,
|
|
"agent_location": collector_data.location,
|
|
"agent_userbase": collector_data.userbase,
|
|
"agent_version": collector_data.version,
|
|
"agent_capabilities": collector_data.capabilities,
|
|
"agent_status": status,
|
|
"agent_department": collector_data.department,
|
|
"agent_contact_person": collector_data.contact_person,
|
|
"agent_tags": collector_data.tags,
|
|
"agent_metadata": collector_data.metadata,
|
|
}
|
|
|
|
|
|
|
|
@app.get("/me", response_model=models.UserResponse)
|
|
async def me(current_user: dict = Depends(get_current_user)):
|
|
return {
|
|
"email": current_user["email"],
|
|
"full_name": current_user.get("full_name"),
|
|
"is_active": current_user["is_active"],
|
|
"is_admin": current_user["is_admin"]
|
|
}
|
|
|
|
# HTML Routes
|
|
@app.get("/")
|
|
async def home(request: Request):
|
|
# Check if this is a JSON API request (for health check)
|
|
accept_header = request.headers.get("accept", "")
|
|
if "application/json" in accept_header or request.headers.get("content-type") == "application/json":
|
|
# Return health check JSON response
|
|
from database import check_database_health
|
|
db_health = await check_database_health()
|
|
return models.HealthCheckResponse(
|
|
status="healthy",
|
|
message="Agent collector API is running",
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
database=db_health
|
|
)
|
|
|
|
# Otherwise handle as HTML request for web interface
|
|
current_user = await get_current_user_optional(request)
|
|
if current_user:
|
|
# Redirect logged-in users appropriately
|
|
if current_user.get("is_admin"):
|
|
return RedirectResponse(url=get_app_url("admin"), status_code=303)
|
|
else:
|
|
return RedirectResponse(url=get_app_url("agent-management"), status_code=303)
|
|
else:
|
|
# Show landing page for non-authenticated users
|
|
return templates.TemplateResponse("index.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/register", response_class=HTMLResponse)
|
|
async def register_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse("register.html", get_template_context(request, current_user))
|
|
|
|
@app.post("/register", response_class=HTMLResponse)
|
|
async def register_form(
|
|
request: Request,
|
|
email: str = Form(..., alias="user-email"),
|
|
password: str = Form(..., alias="user-password"),
|
|
first_name: str = Form(..., alias="first-name"),
|
|
last_name: str = Form(..., alias="last-name")
|
|
):
|
|
try:
|
|
full_name = f"{first_name} {last_name}".strip()
|
|
existing = await crud.get_user_by_email(email)
|
|
if existing:
|
|
return templates.TemplateResponse(
|
|
"register.html",
|
|
{"request": request, "error": "Email already registered"}
|
|
)
|
|
|
|
user = models.UserCreate(email=email, password=password, full_name=full_name)
|
|
await crud.create_user(user.email, user.password, user.full_name)
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "success": "Registration successful! Please login."}
|
|
)
|
|
except Exception as e:
|
|
return templates.TemplateResponse(
|
|
"register.html",
|
|
{"request": request, "error": f"Registration failed: {str(e)}"}
|
|
)
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
|
|
async def login_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse("login.html", get_template_context(request, current_user))
|
|
|
|
@app.post("/login", response_class=HTMLResponse)
|
|
async def login_form(
|
|
request: Request,
|
|
email: str = Form(...),
|
|
password: str = Form(...)
|
|
):
|
|
try:
|
|
print(f"🔍 Login attempt - Email: {email}")
|
|
|
|
# Check if user exists first
|
|
user_exists = await crud.get_user_by_email(email)
|
|
print(f"🔍 User exists in database: {user_exists is not None}")
|
|
|
|
if user_exists:
|
|
print(f"🔍 User auth provider: {user_exists.get('auth_provider', 'local')}")
|
|
print(f"🔍 User has hashed_password: {'hashed_password' in user_exists}")
|
|
print(f"🔍 User is_active: {user_exists.get('is_active', False)}")
|
|
|
|
user = await crud.authenticate_user(email, password)
|
|
print(f"🔍 Authentication result: {user is not None}")
|
|
|
|
if not user:
|
|
error_msg = "Invalid email or password"
|
|
if not user_exists:
|
|
error_msg += " (User not found)"
|
|
elif user_exists.get('auth_provider') != 'local':
|
|
error_msg += " (Not a local user - use Microsoft login)"
|
|
elif 'hashed_password' not in user_exists:
|
|
error_msg += " (No password set for this user)"
|
|
|
|
print(f"❌ Login failed: {error_msg}")
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
get_template_context(request, error=error_msg)
|
|
)
|
|
|
|
# Create token (you can store this in session/cookie in a real app)
|
|
token = auth.create_access_token({"sub": str(user["_id"])})
|
|
|
|
# Check if user is admin
|
|
if user.get("is_admin"):
|
|
# Admin goes to admin dashboard
|
|
response = RedirectResponse(url=get_app_url("admin"), status_code=303)
|
|
else:
|
|
# Regular user goes to all agents page
|
|
response = RedirectResponse(url=get_app_url("agent-management"), status_code=303)
|
|
|
|
# Set token in cookie (for demo purposes)
|
|
response.set_cookie(key="access_token", value=token)
|
|
return response
|
|
|
|
except Exception as e:
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": f"Login failed: {str(e)}"}
|
|
)
|
|
|
|
# Azure AD/MSAL Authentication Routes
|
|
async def azure_login(request: Request):
|
|
"""Initiate Azure AD login with PKCE"""
|
|
try:
|
|
msal_instance = msal_auth.get_msal_instance()
|
|
auth_data = msal_instance.get_auth_url()
|
|
|
|
# Store PKCE parameters in session
|
|
request.session["msal_state"] = auth_data["state"]
|
|
request.session["msal_code_verifier"] = auth_data["code_verifier"]
|
|
|
|
# Redirect to Azure AD
|
|
return RedirectResponse(url=auth_data["auth_url"], status_code=302)
|
|
|
|
except Exception as e:
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": f"Azure login failed: {str(e)}"}
|
|
)
|
|
|
|
async def azure_callback(request: Request):
|
|
"""Handle Azure AD callback and complete authentication"""
|
|
try:
|
|
# Get authorization code and state from callback
|
|
auth_code = request.query_params.get("code")
|
|
state = request.query_params.get("state")
|
|
error = request.query_params.get("error")
|
|
|
|
if error:
|
|
error_description = request.query_params.get("error_description", "Unknown error")
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": f"Azure AD error: {error_description}"}
|
|
)
|
|
|
|
if not auth_code:
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": "No authorization code received from Azure AD"}
|
|
)
|
|
|
|
# Validate state parameter (CSRF protection)
|
|
session_state = request.session.get("msal_state")
|
|
code_verifier = request.session.get("msal_code_verifier")
|
|
|
|
if not session_state or not code_verifier:
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": "Session expired. Please try logging in again."}
|
|
)
|
|
|
|
msal_instance = msal_auth.get_msal_instance()
|
|
if not msal_instance.validate_state(state, session_state):
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": "Invalid state parameter. Possible CSRF attack."}
|
|
)
|
|
|
|
# Exchange authorization code for tokens
|
|
token_result = msal_instance.acquire_token_by_auth_code(auth_code, code_verifier)
|
|
if not token_result:
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": "Failed to acquire token from Azure AD"}
|
|
)
|
|
|
|
# Extract user profile
|
|
user_profile = msal_instance.get_user_profile(token_result)
|
|
if not user_profile:
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": "Failed to get user profile from Azure AD"}
|
|
)
|
|
|
|
# Create or update user in local database
|
|
user = await crud.create_or_update_azure_user(user_profile)
|
|
|
|
# Create JWT token for local session management
|
|
jwt_token = auth.create_access_token({"sub": str(user["_id"])})
|
|
|
|
# Clear MSAL session data
|
|
request.session.pop("msal_state", None)
|
|
request.session.pop("msal_code_verifier", None)
|
|
|
|
# Redirect based on user role
|
|
if user.get("is_admin"):
|
|
response = RedirectResponse(url=get_app_url("admin"), status_code=303)
|
|
else:
|
|
response = RedirectResponse(url=get_app_url("agent-management"), status_code=303)
|
|
|
|
# Set JWT token in cookie
|
|
response.set_cookie(key="access_token", value=jwt_token)
|
|
return response
|
|
|
|
except Exception as e:
|
|
# Clear session on error
|
|
request.session.pop("msal_state", None)
|
|
request.session.pop("msal_code_verifier", None)
|
|
|
|
return templates.TemplateResponse(
|
|
"login.html",
|
|
{"request": request, "error": f"Authentication failed: {str(e)}"}
|
|
)
|
|
|
|
@app.get("/agent-register", response_class=HTMLResponse)
|
|
async def agent_register_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
return templates.TemplateResponse("agent_register.html", get_template_context(request, current_user))
|
|
|
|
@app.post("/agent-register", response_class=HTMLResponse)
|
|
async def agent_register_form(
|
|
request: Request,
|
|
agent_name: str = Form(...),
|
|
agent_description: str = Form(None),
|
|
agent_purpose: str = Form(None),
|
|
agent_version: str = Form(None),
|
|
agent_status: str = Form("Development"),
|
|
agent_location: str = Form(None),
|
|
agent_department: str = Form(None),
|
|
agent_contact_person: str = Form(None),
|
|
agent_tags: str = Form(None),
|
|
agent_userbase: str = Form(None),
|
|
agent_capabilities: str = Form(None)
|
|
):
|
|
try:
|
|
# Get user from cookie - require authentication
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
user_id = str(current_user["_id"])
|
|
|
|
# Prepare agent data
|
|
agent_data = {
|
|
"agent_name": agent_name,
|
|
"agent_description": agent_description,
|
|
"agent_purpose": agent_purpose,
|
|
"agent_version": agent_version,
|
|
"agent_status": agent_status,
|
|
"agent_location": agent_location,
|
|
"agent_department": agent_department,
|
|
"agent_contact_person": agent_contact_person,
|
|
}
|
|
|
|
# Process tags, userbase, and capabilities (convert comma-separated to lists)
|
|
if agent_tags:
|
|
agent_data["agent_tags"] = [tag.strip() for tag in agent_tags.split(',') if tag.strip()]
|
|
if agent_userbase:
|
|
agent_data["agent_userbase"] = [user.strip() for user in agent_userbase.split(',') if user.strip()]
|
|
if agent_capabilities:
|
|
agent_data["agent_capabilities"] = [cap.strip() for cap in agent_capabilities.split(',') if cap.strip()]
|
|
|
|
# Remove None values
|
|
agent_data = {k: v for k, v in agent_data.items() if v is not None}
|
|
|
|
# Create agent in database
|
|
created_agent = await crud.create_agent(agent_data, user_id)
|
|
|
|
# Redirect to agent management with success message
|
|
from urllib.parse import quote
|
|
success_msg = quote(f"Agent '{agent_name}' registered successfully!")
|
|
redirect_url = get_app_url(f"agent-management?success={success_msg}")
|
|
print(f"DEBUG: Redirecting to: {redirect_url}") # Debug line
|
|
return RedirectResponse(
|
|
url=redirect_url,
|
|
status_code=303
|
|
)
|
|
|
|
except ValueError as e:
|
|
# Handle duplicate agent name error
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse(
|
|
"agent_register.html",
|
|
get_template_context(request, current_user, error=str(e))
|
|
)
|
|
except Exception as e:
|
|
current_user = await get_current_user_optional(request)
|
|
return templates.TemplateResponse(
|
|
"agent_register.html",
|
|
get_template_context(request, current_user, error=f"Agent registration failed: {str(e)}")
|
|
)
|
|
|
|
@app.get("/dashboard", response_class=HTMLResponse)
|
|
async def dashboard(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
return templates.TemplateResponse("admin/dashboard.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/user-management", response_class=HTMLResponse)
|
|
async def user_management_page(request: Request):
|
|
return templates.TemplateResponse("user_management.html", get_template_context(request))
|
|
|
|
@app.get("/logout", response_class=HTMLResponse)
|
|
async def logout(request: Request):
|
|
"""Logout user and clear all session data"""
|
|
# Clear MSAL session data if present
|
|
request.session.clear()
|
|
|
|
response = RedirectResponse(url=config.get_full_url(""), status_code=303)
|
|
response.delete_cookie(key="access_token")
|
|
return response
|
|
|
|
@app.get("/profile", response_class=HTMLResponse)
|
|
async def profile_page(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
return templates.TemplateResponse("profile.html", get_template_context(request, current_user))
|
|
|
|
@app.get("/agent-management", response_class=HTMLResponse)
|
|
async def agent_management_page(request: Request, view: Optional[str] = Query(None), success: Optional[str] = Query(None), error: Optional[str] = Query(None)):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
# Default to "all" view for regular users, "my" view can be specified via query param
|
|
if view == "my":
|
|
# Get user's agents only
|
|
agents = await crud.get_agents_by_user(str(current_user["_id"]))
|
|
current_view = "my"
|
|
page_title = "My Agents Dashboard"
|
|
page_description = f"{len(agents)} agents in your portfolio"
|
|
else:
|
|
# Get all agents (default view for regular users)
|
|
agents = await crud.get_all_agents()
|
|
current_view = "all"
|
|
page_title = "All Agents"
|
|
page_description = f"{len(agents)} agents in the system"
|
|
|
|
return templates.TemplateResponse("agent_management.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
agents=agents,
|
|
agent_count=len(agents),
|
|
current_view=current_view,
|
|
page_title=page_title,
|
|
page_description=page_description,
|
|
success=success,
|
|
error=error
|
|
))
|
|
|
|
@app.get("/admin", response_class=HTMLResponse)
|
|
async def admin_dashboard(request: Request):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user or not current_user.get("is_admin"):
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
# Get statistics
|
|
all_users = await crud.get_all_users()
|
|
all_agents = await crud.get_all_agents()
|
|
|
|
# Calculate stats
|
|
total_users = len(all_users)
|
|
admin_users = len([u for u in all_users if u.get("is_admin")])
|
|
regular_users = total_users - admin_users
|
|
total_agents = len(all_agents)
|
|
active_agents = len([a for a in all_agents if a.get("agent_status") == "Active"])
|
|
|
|
return templates.TemplateResponse("admin/dashboard.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
stats={
|
|
"total_users": total_users,
|
|
"admin_users": admin_users,
|
|
"regular_users": regular_users,
|
|
"total_agents": total_agents,
|
|
"active_agents": active_agents,
|
|
"inactive_agents": total_agents - active_agents
|
|
},
|
|
users=all_users,
|
|
agents=all_agents
|
|
))
|
|
|
|
# New enhanced endpoints
|
|
@app.get("/search", response_class=HTMLResponse)
|
|
async def search_page(request: Request, q: Optional[str] = Query(None)):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
search_results = {"agents": [], "users": []}
|
|
if q:
|
|
# Search agents using proper search function
|
|
# Regular users can search all agents (consistent with agent management page)
|
|
search_results["agents"] = await crud.search_agents(q)
|
|
|
|
# Search users (admin only)
|
|
if current_user.get("is_admin"):
|
|
all_users = await crud.get_all_users()
|
|
search_results["users"] = [
|
|
user for user in all_users
|
|
if q.lower() in user.get("email", "").lower() or
|
|
q.lower() in user.get("full_name", "").lower()
|
|
]
|
|
|
|
return templates.TemplateResponse("search.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
query=q,
|
|
results=search_results
|
|
))
|
|
|
|
@app.post("/agent/{agent_id}/edit", response_class=HTMLResponse)
|
|
async def edit_agent_form(request: Request, agent_id: str):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# Check ownership: only allow users to edit their own agents (admins can edit any)
|
|
if agent["created_by"] != str(current_user["_id"]) and not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Not authorized to edit this agent")
|
|
|
|
return templates.TemplateResponse("edit_agent.html", get_template_context(
|
|
request,
|
|
current_user,
|
|
agent=agent
|
|
))
|
|
|
|
@app.post("/agent/{agent_id}/delete", response_class=HTMLResponse)
|
|
async def delete_agent_form(request: Request, agent_id: str):
|
|
current_user = await get_current_user_optional(request)
|
|
if not current_user:
|
|
return RedirectResponse(url=config.get_full_url("login"), status_code=303)
|
|
|
|
# Check permission and delete
|
|
user_id = str(current_user["_id"]) if not current_user.get("is_admin") else None
|
|
deleted = await crud.delete_agent(agent_id, user_id)
|
|
|
|
if deleted:
|
|
return RedirectResponse(url=get_app_url("agent-management?success=Agent deleted successfully"), status_code=303)
|
|
else:
|
|
return RedirectResponse(url=get_app_url("agent-management?error=Failed to delete agent"), status_code=303)
|
|
|
|
# Agent API endpoints
|
|
@app.get("/api/agents/all", response_model=List[models.AiAgentResponse])
|
|
async def get_all_agents_for_users(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
"""Get all agents for regular users (read-only access)"""
|
|
agents = await crud.get_all_agents()
|
|
return [
|
|
models.AiAgentResponse(
|
|
agent_id=str(agent["_id"]),
|
|
agent_name=agent["agent_name"],
|
|
agent_description=agent.get("agent_description"),
|
|
agent_purpose=agent.get("agent_purpose"),
|
|
agent_version=agent.get("agent_version"),
|
|
agent_status=agent.get("agent_status"),
|
|
agent_location=agent.get("agent_location"),
|
|
agent_department=agent.get("agent_department"),
|
|
agent_contact_person=agent.get("agent_contact_person"),
|
|
agent_created_at=agent["created_at"].isoformat() if agent.get("created_at") else None,
|
|
agent_updated_at=agent["updated_at"].isoformat() if agent.get("updated_at") else None,
|
|
agent_tags=agent.get("agent_tags"),
|
|
agent_metadata=agent.get("agent_metadata"),
|
|
agent_userbase=agent.get("agent_userbase"),
|
|
agent_capabilities=agent.get("agent_capabilities"),
|
|
created_by=agent["created_by"]
|
|
) for agent in agents
|
|
]
|
|
|
|
@app.post("/api/agents", response_model=models.AiAgentResponse)
|
|
async def create_agent(agent: models.AiAgentCreate, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agent_data = agent.model_dump()
|
|
created_agent = await crud.create_agent(agent_data, str(current_user["_id"]))
|
|
return models.AiAgentResponse(
|
|
agent_id=str(created_agent["_id"]),
|
|
agent_name=created_agent["agent_name"],
|
|
agent_description=created_agent.get("agent_description"),
|
|
agent_purpose=created_agent.get("agent_purpose"),
|
|
agent_version=created_agent.get("agent_version"),
|
|
agent_status=created_agent.get("agent_status"),
|
|
agent_location=created_agent.get("agent_location"),
|
|
agent_department=created_agent.get("agent_department"),
|
|
agent_contact_person=created_agent.get("agent_contact_person"),
|
|
agent_created_at=created_agent["created_at"].isoformat(),
|
|
agent_updated_at=created_agent["updated_at"].isoformat(),
|
|
agent_tags=created_agent.get("agent_tags"),
|
|
agent_metadata=created_agent.get("agent_metadata"),
|
|
agent_userbase=created_agent.get("agent_userbase"),
|
|
agent_capabilities=created_agent.get("agent_capabilities"),
|
|
created_by=created_agent["created_by"]
|
|
)
|
|
|
|
@app.get("/api/agents", response_model=List[models.AiAgentResponse])
|
|
async def get_user_agents(current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agents = await crud.get_agents_by_user(str(current_user["_id"]))
|
|
return [
|
|
models.AiAgentResponse(
|
|
agent_id=str(agent["_id"]),
|
|
agent_name=agent["agent_name"],
|
|
agent_description=agent.get("agent_description"),
|
|
agent_purpose=agent.get("agent_purpose"),
|
|
agent_version=agent.get("agent_version"),
|
|
agent_status=agent.get("agent_status"),
|
|
agent_location=agent.get("agent_location"),
|
|
agent_department=agent.get("agent_department"),
|
|
agent_contact_person=agent.get("agent_contact_person"),
|
|
agent_created_at=agent["created_at"].isoformat() if agent.get("created_at") else None,
|
|
agent_updated_at=agent["updated_at"].isoformat() if agent.get("updated_at") else None,
|
|
agent_tags=agent.get("agent_tags"),
|
|
agent_metadata=agent.get("agent_metadata"),
|
|
agent_userbase=agent.get("agent_userbase"),
|
|
agent_capabilities=agent.get("agent_capabilities"),
|
|
created_by=agent["created_by"]
|
|
) for agent in agents
|
|
]
|
|
|
|
# Conditionally register MSAL routes if available
|
|
if msal_auth.is_msal_available():
|
|
app.get("/auth/azure/login")(azure_login)
|
|
app.get("/auth/azure/callback")(azure_callback)
|
|
|
|
@app.get("/api/agents/{agent_id}", response_model=models.AiAgentResponse)
|
|
async def get_agent(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
if agent["created_by"] != str(current_user["_id"]) and not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Not authorized to view this agent")
|
|
|
|
return models.AiAgentResponse(
|
|
agent_id=str(agent["_id"]),
|
|
agent_name=agent["agent_name"],
|
|
agent_description=agent.get("agent_description"),
|
|
agent_purpose=agent.get("agent_purpose"),
|
|
agent_version=agent.get("agent_version"),
|
|
agent_status=agent.get("agent_status"),
|
|
agent_location=agent.get("agent_location"),
|
|
agent_department=agent.get("agent_department"),
|
|
agent_contact_person=agent.get("agent_contact_person"),
|
|
agent_created_at=agent["created_at"].isoformat() if agent.get("created_at") else None,
|
|
agent_updated_at=agent["updated_at"].isoformat() if agent.get("updated_at") else None,
|
|
agent_tags=agent.get("agent_tags"),
|
|
agent_metadata=agent.get("agent_metadata"),
|
|
agent_userbase=agent.get("agent_userbase"),
|
|
created_by=agent["created_by"]
|
|
)
|
|
|
|
@app.put("/api/agents/{agent_id}", response_model=models.AiAgentResponse)
|
|
async def update_agent(agent_id: str, agent: models.AiAgentCreate, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
# First check if agent exists
|
|
existing_agent = await crud.get_agent_by_id(agent_id)
|
|
if not existing_agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
# Check ownership: only allow users to edit their own agents (admins can edit any)
|
|
if existing_agent["created_by"] != str(current_user["_id"]) and not current_user.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Not authorized to edit this agent")
|
|
|
|
# For regular users, pass user_id to enforce ownership at DB level
|
|
# For admins, pass None to allow editing any agent
|
|
user_id_filter = str(current_user["_id"]) if not current_user.get("is_admin") else None
|
|
updated_agent = await crud.update_agent(agent_id, agent.model_dump(), user_id_filter)
|
|
|
|
if not updated_agent:
|
|
raise HTTPException(status_code=404, detail="Agent not found or not authorized")
|
|
|
|
return models.AiAgentResponse(
|
|
agent_id=str(updated_agent["_id"]),
|
|
agent_name=updated_agent["agent_name"],
|
|
agent_description=updated_agent.get("agent_description"),
|
|
agent_purpose=updated_agent.get("agent_purpose"),
|
|
agent_version=updated_agent.get("agent_version"),
|
|
agent_status=updated_agent.get("agent_status"),
|
|
agent_location=updated_agent.get("agent_location"),
|
|
agent_department=updated_agent.get("agent_department"),
|
|
agent_contact_person=updated_agent.get("agent_contact_person"),
|
|
agent_created_at=updated_agent["created_at"].isoformat() if updated_agent.get("created_at") else None,
|
|
agent_updated_at=updated_agent["updated_at"].isoformat() if updated_agent.get("updated_at") else None,
|
|
agent_tags=updated_agent.get("agent_tags"),
|
|
agent_metadata=updated_agent.get("agent_metadata"),
|
|
agent_userbase=updated_agent.get("agent_userbase"),
|
|
created_by=updated_agent["created_by"]
|
|
)
|
|
|
|
@app.delete("/api/agents/{agent_id}")
|
|
async def delete_agent(agent_id: str, current_user: dict = Depends(get_current_user_from_cookie)):
|
|
print(f"🗑️ DELETE attempt - Agent ID: {agent_id}, User ID: {current_user['_id']}")
|
|
|
|
# First check if agent exists
|
|
agent = await crud.get_agent_by_id(agent_id)
|
|
if not agent:
|
|
print(f"🗑️ Agent {agent_id} not found in database")
|
|
raise HTTPException(status_code=404, detail="Agent not found")
|
|
|
|
print(f"🗑️ Agent found - Created by: {agent.get('created_by')}, User is admin: {current_user.get('is_admin', False)}")
|
|
|
|
# Check permission
|
|
user_id = str(current_user["_id"])
|
|
if agent["created_by"] != user_id and not current_user.get("is_admin"):
|
|
print(f"🗑️ Permission denied - Agent owned by {agent['created_by']}, current user: {user_id}")
|
|
raise HTTPException(status_code=403, detail="Not authorized to delete this agent")
|
|
|
|
deleted = await crud.delete_agent(agent_id, user_id if not current_user.get("is_admin") else None)
|
|
if not deleted:
|
|
print(f"🗑️ Delete operation failed")
|
|
raise HTTPException(status_code=500, detail="Failed to delete agent")
|
|
|
|
print(f"🗑️ Agent {agent_id} deleted successfully")
|
|
return {"message": "Agent deleted successfully"}
|
|
|
|
# Admin endpoints
|
|
@app.get("/api/admin/users", response_model=List[models.UserResponse])
|
|
async def get_all_users(current_user: dict = Depends(require_admin)):
|
|
|
|
users = await crud.get_all_users()
|
|
return [
|
|
models.UserResponse(
|
|
email=user["email"],
|
|
full_name=user.get("full_name"),
|
|
is_active=user["is_active"],
|
|
is_admin=user["is_admin"]
|
|
) for user in users
|
|
]
|
|
|
|
@app.put("/api/admin/users/{email}", response_model=models.UserResponse)
|
|
async def update_user(email: str, user_update: models.UserUpdate, current_user: dict = Depends(require_admin)):
|
|
# Get the user by email first
|
|
existing_user = await crud.get_user_by_email(email)
|
|
if not existing_user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
# Update the user
|
|
update_data = user_update.model_dump(exclude_unset=True)
|
|
updated_user = await crud.update_user(str(existing_user["_id"]), update_data)
|
|
|
|
if not updated_user:
|
|
raise HTTPException(status_code=500, detail="Failed to update user")
|
|
|
|
return models.UserResponse(
|
|
email=updated_user["email"],
|
|
full_name=updated_user.get("full_name"),
|
|
is_active=updated_user["is_active"],
|
|
is_admin=updated_user["is_admin"]
|
|
)
|
|
|
|
@app.get("/api/admin/agents", response_model=List[models.AiAgentResponse])
|
|
async def get_all_agents_admin(current_user: dict = Depends(require_admin)):
|
|
|
|
agents = await crud.get_all_agents()
|
|
return [
|
|
models.AiAgentResponse(
|
|
agent_id=str(agent["_id"]),
|
|
agent_name=agent["agent_name"],
|
|
agent_description=agent.get("agent_description"),
|
|
agent_purpose=agent.get("agent_purpose"),
|
|
agent_version=agent.get("agent_version"),
|
|
agent_status=agent.get("agent_status"),
|
|
agent_location=agent.get("agent_location"),
|
|
agent_department=agent.get("agent_department"),
|
|
agent_contact_person=agent.get("agent_contact_person"),
|
|
agent_created_at=agent["created_at"].isoformat() if agent.get("created_at") else None,
|
|
agent_updated_at=agent["updated_at"].isoformat() if agent.get("updated_at") else None,
|
|
agent_tags=agent.get("agent_tags"),
|
|
agent_metadata=agent.get("agent_metadata"),
|
|
agent_userbase=agent.get("agent_userbase"),
|
|
agent_capabilities=agent.get("agent_capabilities"),
|
|
created_by=agent["created_by"]
|
|
) for agent in agents
|
|
]
|
|
|
|
# Agent Collector API Endpoints (for compatibility with agent_collector app)
|
|
@app.post("/agents")
|
|
async def create_agent_collector(
|
|
agent: models.AgentCollectorCreate,
|
|
request: Request,
|
|
api_key_valid: bool = Depends(verify_agent_collector_api_key)
|
|
):
|
|
"""Agent collector API endpoint - handles both new registrations and usage tracking"""
|
|
try:
|
|
# Check content type
|
|
content_type = request.headers.get("content-type", "")
|
|
if not content_type.startswith("application/json"):
|
|
return JSONResponse(
|
|
status_code=415,
|
|
content={
|
|
"error": "Unsupported Media Type",
|
|
"message": "Request must be JSON"
|
|
}
|
|
)
|
|
|
|
# Check if agent already exists by name
|
|
existing_agent = await crud.get_agent_by_name(agent.name)
|
|
|
|
if existing_agent:
|
|
# Agent exists - log usage instead of creating duplicate
|
|
internal_data = map_agent_collector_to_internal(agent)
|
|
await crud.create_agent_usage_record(agent.name, internal_data)
|
|
|
|
return models.AgentUsageTrackingResponse(
|
|
status="usage_logged",
|
|
message="Agent already exists, usage tracked",
|
|
agent_name=agent.name
|
|
)
|
|
else:
|
|
# Agent doesn't exist - create new registration
|
|
internal_data = map_agent_collector_to_internal(agent)
|
|
|
|
# Handle datetime fields if provided
|
|
if agent.creation_date:
|
|
internal_data["agent_created_at"] = agent.creation_date
|
|
if agent.last_updated:
|
|
internal_data["agent_updated_at"] = agent.last_updated
|
|
|
|
# Create agent using collector-specific function
|
|
created_agent = await crud.create_agent_from_collector(internal_data)
|
|
|
|
return models.AgentCollectorResponse(
|
|
status="success",
|
|
message="Agent data collected successfully",
|
|
agent_id=str(created_agent["_id"])
|
|
)
|
|
|
|
except Exception as e:
|
|
# Check if it's a database connectivity issue
|
|
from database import check_database_health
|
|
try:
|
|
db_health = await check_database_health()
|
|
if not db_health.get("healthy"):
|
|
return JSONResponse(
|
|
status_code=503,
|
|
content={
|
|
"error": "Database Unavailable",
|
|
"message": "MongoDB connection is not available. Please check the database setup.",
|
|
"agent_data": agent.model_dump()
|
|
}
|
|
)
|
|
except:
|
|
pass
|
|
|
|
# General database error
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content={
|
|
"error": "Database Error",
|
|
"message": "Failed to store agent data. MongoDB may be unavailable or there was an error processing the request.",
|
|
"agent_data": agent.model_dump()
|
|
}
|
|
)
|
|
|
|
# Agent Usage API Endpoints
|
|
@app.get("/api/agents/{agent_name}/usage", response_model=models.AgentUsageStatsResponse)
|
|
async def get_agent_usage(
|
|
agent_name: str,
|
|
start_date: Optional[str] = Query(None),
|
|
end_date: Optional[str] = Query(None),
|
|
current_user: dict = Depends(get_current_user_from_cookie)
|
|
):
|
|
"""Get usage statistics for a specific agent"""
|
|
try:
|
|
# Parse date strings if provided
|
|
start_dt = None
|
|
end_dt = None
|
|
if start_date:
|
|
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
|
|
if end_date:
|
|
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
|
|
|
|
# Get usage stats
|
|
stats = await crud.get_agent_usage_stats(agent_name, start_dt, end_dt)
|
|
|
|
# Get usage by period for the response
|
|
usage_by_period = await crud.get_agent_usage_by_period(agent_name, "daily", start_dt, end_dt)
|
|
|
|
return models.AgentUsageStatsResponse(
|
|
agent_name=agent_name,
|
|
total_usage_count=stats["total_usage_count"],
|
|
first_usage=stats["first_usage"].isoformat() if stats["first_usage"] else None,
|
|
last_usage=stats["last_usage"].isoformat() if stats["last_usage"] else None,
|
|
usage_by_period=usage_by_period
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to get usage stats: {str(e)}")
|
|
|
|
@app.get("/api/agents/{agent_name}/usage/chart")
|
|
async def get_agent_usage_chart(
|
|
agent_name: str,
|
|
period: str = Query("daily", regex="^(daily|weekly|monthly)$"),
|
|
start_date: Optional[str] = Query(None),
|
|
end_date: Optional[str] = Query(None),
|
|
current_user: dict = Depends(get_current_user_from_cookie)
|
|
):
|
|
"""Get usage chart data for a specific agent"""
|
|
try:
|
|
# Parse date strings if provided
|
|
start_dt = None
|
|
end_dt = None
|
|
if start_date:
|
|
start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
|
|
if end_date:
|
|
end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
|
|
|
|
# Get usage data grouped by period
|
|
usage_data = await crud.get_agent_usage_by_period(agent_name, period, start_dt, end_dt)
|
|
|
|
# Format for Chart.js
|
|
chart_data = {
|
|
"labels": list(usage_data.keys()),
|
|
"datasets": [{
|
|
"label": f"Usage Count ({period})",
|
|
"data": list(usage_data.values()),
|
|
"backgroundColor": "rgba(54, 162, 235, 0.2)",
|
|
"borderColor": "rgba(54, 162, 235, 1)",
|
|
"borderWidth": 1
|
|
}]
|
|
}
|
|
|
|
return chart_data
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to get chart data: {str(e)}")
|
|
|
|
# Conditionally register MSAL routes if available
|
|
if msal_auth.is_msal_available():
|
|
app.get("/auth/azure/login")(azure_login)
|
|
app.get("/auth/azure/callback")(azure_callback)
|