from datetime import datetime, timedelta from typing import Optional from bson import ObjectId from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from motor.motor_asyncio import AsyncIOMotorDatabase from ...core.database import get_database from ...core.dependencies import get_current_user, require_roles from ...core.logging import get_logger from ...core.security import get_password_hash, verify_password from ...models.user import User, UserRole from ...models.audit_log import AuditAction, AuditLogQuery, AuditLogResponse from ...schemas.auth import ( AdminStatsResponse, ChangePasswordRequest, CreateUserRequest, ResetPasswordRequest, UpdateUserRequest, UserListResponse, UserResponse, ) from ...services.audit_logger import audit_logger, log_user_management, log_security_event from ...telemetry import app_metrics logger = get_logger(__name__) router = APIRouter(prefix="/admin", tags=["admin"]) @router.get("/users", response_model=UserListResponse) async def list_users( page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), role: Optional[str] = Query(None), active_only: bool = Query(True), current_user: User = Depends(require_roles(UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """List users with filtering and pagination (admin only)""" query = {} if role: query["role"] = role if active_only: query["is_active"] = True # Get total count total = await db.users.count_documents(query) # Get paginated results skip = (page - 1) * size cursor = db.users.find(query, {"hashed_password": 0}).sort("created_at", -1).skip(skip).limit(size) users = await cursor.to_list(length=size) user_responses = [] for user_doc in users: user_responses.append(UserResponse( id=str(user_doc["_id"]), email=user_doc["email"], full_name=user_doc["full_name"], role=user_doc["role"], auth_provider=user_doc.get("auth_provider", "local"), is_active=user_doc["is_active"], created_at=user_doc.get("created_at", datetime.utcnow()).isoformat(), pm_client_ids=user_doc.get("pm_client_ids", []), )) return UserListResponse( users=user_responses, total=total, page=page, size=size ) @router.get("/users/{user_id}", response_model=UserResponse) async def get_user( user_id: str, current_user: User = Depends(require_roles(UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Get user details by ID (admin only)""" user_doc = await db.users.find_one({"_id": user_id}, {"hashed_password": 0}) if not user_doc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) return UserResponse( id=str(user_doc["_id"]), email=user_doc["email"], full_name=user_doc["full_name"], role=user_doc["role"], auth_provider=user_doc.get("auth_provider", "local"), is_active=user_doc["is_active"], created_at=user_doc.get("created_at", datetime.utcnow()).isoformat(), pm_client_ids=user_doc.get("pm_client_ids", []), ) @router.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED) async def create_user( user_data: CreateUserRequest, request: Request, current_user: User = Depends(require_roles(UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Create a new user (admin only)""" # Check if user already exists existing_user = await db.users.find_one({"email": user_data.email}) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="User with this email already exists" ) # Create user document user_id = str(ObjectId()) user_doc = { "_id": user_id, "email": user_data.email, "hashed_password": get_password_hash(user_data.password), "full_name": user_data.full_name, "role": user_data.role.value, "auth_provider": "local", "is_active": True, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow() } await db.users.insert_one(user_doc) # Record metrics app_metrics.record_auth_attempt("user_created", user_data.role.value) logger.info(f"Admin {current_user.id} created user {user_id} with role {user_data.role.value}") await log_user_management( AuditAction.USER_CREATE, user_id, current_user, request, details={"email": user_data.email, "role": user_data.role.value}, ) return UserResponse( id=user_id, email=user_data.email, full_name=user_data.full_name, role=user_data.role, auth_provider="local", is_active=True, created_at=user_doc["created_at"].isoformat(), pm_client_ids=[], ) @router.patch("/users/{user_id}", response_model=UserResponse) async def update_user( user_id: str, user_update: UpdateUserRequest, request: Request, current_user: User = Depends(require_roles(UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Update user details (admin only)""" # Check if user exists user_doc = await db.users.find_one({"_id": user_id}) if not user_doc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Check if email is being changed and doesn't conflict if user_update.email and user_update.email != user_doc["email"]: existing_user = await db.users.find_one({"email": user_update.email, "_id": {"$ne": user_id}}) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already in use by another user" ) # Build update document update_data = {"updated_at": datetime.utcnow()} if user_update.email: update_data["email"] = user_update.email if user_update.full_name: update_data["full_name"] = user_update.full_name if user_update.role: update_data["role"] = user_update.role.value if user_update.is_active is not None: update_data["is_active"] = user_update.is_active # Update user result = await db.users.find_one_and_update( {"_id": user_id}, {"$set": update_data}, return_document=True ) logger.info(f"Admin {current_user.id} updated user {user_id}") action = AuditAction.USER_ROLE_CHANGE if user_update.role else AuditAction.USER_UPDATE await log_user_management( action, user_id, current_user, request, details={k: v for k, v in user_update.dict(exclude_none=True).items()}, ) return UserResponse( id=str(result["_id"]), email=result["email"], full_name=result["full_name"], role=result["role"], auth_provider=result.get("auth_provider", "local"), is_active=result["is_active"], created_at=result.get("created_at", datetime.utcnow()).isoformat(), pm_client_ids=result.get("pm_client_ids", []), ) @router.delete("/users/{user_id}") async def deactivate_user( user_id: str, request: Request, current_user: User = Depends(require_roles(UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Deactivate user account (admin only) - soft delete""" if str(current_user.id) == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot deactivate your own account" ) result = await db.users.update_one( {"_id": user_id}, { "$set": { "is_active": False, "updated_at": datetime.utcnow() } } ) if result.matched_count == 0: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) logger.info(f"Admin {current_user.id} deactivated user {user_id}") await log_user_management(AuditAction.USER_DEACTIVATE, user_id, current_user, request) return {"message": "User deactivated successfully"} @router.post("/users/{user_id}/reset-password") async def admin_reset_password( user_id: str, reset_request: ResetPasswordRequest, current_user: User = Depends(require_roles(UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Reset user password (admin only)""" # Generate temporary password import secrets import string temp_password = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(12)) hashed_password = get_password_hash(temp_password) result = await db.users.update_one( {"_id": user_id}, { "$set": { "hashed_password": hashed_password, "updated_at": datetime.utcnow() } } ) if result.matched_count == 0: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) logger.info(f"Admin {current_user.id} reset password for user {user_id}") # In production, send email with temp password instead of returning it return { "message": "Password reset successfully", "temporary_password": temp_password # Remove this in production, send via email } @router.get("/stats", response_model=AdminStatsResponse) async def get_admin_stats( current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Get system statistics (production/admin only)""" # Get user count total_users = await db.users.count_documents({"is_active": True}) # Get job counts total_jobs = await db.jobs.count_documents({}) # Get jobs by status pipeline = [ {"$group": {"_id": "$status", "count": {"$sum": 1}}} ] status_counts = await db.jobs.aggregate(pipeline).to_list(None) jobs_by_status = {item["_id"]: item["count"] for item in status_counts} # Get jobs created today today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) active_jobs_today = await db.jobs.count_documents({ "created_at": {"$gte": today_start} }) # Calculate average processing time for completed jobs avg_processing_pipeline = [ {"$match": {"status": "completed", "created_at": {"$exists": True}, "updated_at": {"$exists": True}}}, { "$project": { "processing_time_hours": { "$divide": [ {"$subtract": ["$updated_at", "$created_at"]}, 3600000 # Convert milliseconds to hours ] } } }, { "$group": { "_id": None, "avg_processing_time": {"$avg": "$processing_time_hours"} } } ] avg_result = await db.jobs.aggregate(avg_processing_pipeline).to_list(None) avg_processing_time = avg_result[0]["avg_processing_time"] if avg_result else 0.0 return AdminStatsResponse( total_users=total_users, total_jobs=total_jobs, jobs_by_status=jobs_by_status, active_jobs_today=active_jobs_today, avg_processing_time_hours=round(avg_processing_time, 2) ) @router.get("/health/detailed") async def detailed_health_check( current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Detailed health check with system component status (reviewer/production/admin only)""" health_status = { "status": "healthy", "timestamp": datetime.utcnow().isoformat(), "components": {} } # Check MongoDB try: await db.command("ping") health_status["components"]["mongodb"] = {"status": "healthy"} except Exception as e: health_status["components"]["mongodb"] = {"status": "unhealthy", "error": str(e)} health_status["status"] = "degraded" # Check Redis (via import to avoid circular dependency) try: from ...core.redis import redis_client if redis_client: await redis_client.ping() health_status["components"]["redis"] = {"status": "healthy"} else: health_status["components"]["redis"] = {"status": "not_configured"} except Exception as e: health_status["components"]["redis"] = {"status": "unhealthy", "error": str(e)} health_status["status"] = "degraded" # Check GCS (basic check) try: from ...services.gcs import gcs_service # Simple check to see if bucket is accessible bucket_exists = await gcs_service.file_exists("health_check_dummy") # This will return False but won't error if bucket accessible health_status["components"]["gcs"] = {"status": "healthy"} except Exception as e: health_status["components"]["gcs"] = {"status": "unhealthy", "error": str(e)} health_status["status"] = "degraded" # Check job queue health try: from ...tasks import celery_app inspect = celery_app.control.inspect() active_tasks = inspect.active() if active_tasks: total_active = sum(len(tasks) for tasks in active_tasks.values()) health_status["components"]["celery"] = { "status": "healthy", "active_tasks": total_active, "workers": len(active_tasks) } else: health_status["components"]["celery"] = { "status": "no_workers", "active_tasks": 0, "workers": 0 } except Exception as e: health_status["components"]["celery"] = {"status": "unhealthy", "error": str(e)} health_status["status"] = "degraded" return health_status @router.get("/jobs/stats") async def get_job_statistics( days: int = Query(7, ge=1, le=90), current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Get job processing statistics (reviewer/production/admin only)""" since_date = datetime.utcnow() - timedelta(days=days) # Jobs created in period jobs_in_period = await db.jobs.count_documents({ "created_at": {"$gte": since_date} }) # Jobs completed in period jobs_completed = await db.jobs.count_documents({ "status": "completed", "updated_at": {"$gte": since_date} }) # Average processing time for completed jobs avg_pipeline = [ { "$match": { "status": "completed", "created_at": {"$gte": since_date}, "updated_at": {"$exists": True} } }, { "$project": { "processing_time_hours": { "$divide": [ {"$subtract": ["$updated_at", "$created_at"]}, 3600000 ] } } }, { "$group": { "_id": None, "avg_time": {"$avg": "$processing_time_hours"}, "min_time": {"$min": "$processing_time_hours"}, "max_time": {"$max": "$processing_time_hours"} } } ] avg_result = await db.jobs.aggregate(avg_pipeline).to_list(None) processing_stats = avg_result[0] if avg_result else { "avg_time": 0, "min_time": 0, "max_time": 0 } # Current queue status current_queue_stats = {} pipeline = [ {"$group": {"_id": "$status", "count": {"$sum": 1}}} ] status_counts = await db.jobs.aggregate(pipeline).to_list(None) for item in status_counts: current_queue_stats[item["_id"]] = item["count"] return { "period_days": days, "jobs_created": jobs_in_period, "jobs_completed": jobs_completed, "completion_rate": round(jobs_completed / max(jobs_in_period, 1) * 100, 2), "avg_processing_time_hours": round(processing_stats["avg_time"], 2), "min_processing_time_hours": round(processing_stats["min_time"], 2), "max_processing_time_hours": round(processing_stats["max_time"], 2), "current_queue_status": current_queue_stats } @router.post("/users/{user_id}/password/reset") async def admin_force_password_reset( user_id: str, current_user: User = Depends(require_roles(UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Force password reset for user (admin only)""" if str(current_user.id) == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot reset your own password this way" ) # Check if user exists user_doc = await db.users.find_one({"_id": user_id}) if not user_doc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Generate secure temporary password import secrets import string temp_password = ''.join(secrets.choice( string.ascii_letters + string.digits + "!@#$%" ) for _ in range(16)) # Update password await db.users.update_one( {"_id": user_id}, { "$set": { "hashed_password": get_password_hash(temp_password), "updated_at": datetime.utcnow() } } ) # TODO: In production, send via secure email instead of returning password logger.info(f"Admin {current_user.id} reset password for user {user_id}") return { "message": "Password reset successfully", "temporary_password": temp_password, "note": "User should change this password immediately" } @router.get("/audit-logs") async def get_audit_logs( job_id: Optional[str] = Query(None), action: Optional[str] = Query(None), days: int = Query(7, ge=1, le=90), page: int = Query(1, ge=1), size: int = Query(50, ge=1, le=200), current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Get audit logs with filtering (production/admin only)""" query = { "when": {"$gte": datetime.utcnow() - timedelta(days=days)} } if job_id: query["job_id"] = job_id if action: query["action"] = action # Get total count total = await db.audit_logs.count_documents(query) # Get paginated results skip = (page - 1) * size cursor = ( db.audit_logs.find(query) .sort("when", -1) .skip(skip) .limit(size) ) logs = await cursor.to_list(length=size) return { "logs": logs, "total": total, "page": page, "size": size, "period_days": days } @router.post("/maintenance/reprocess-job/{job_id}") async def reprocess_job( job_id: str, current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Force reprocessing of a job (production/admin emergency function)""" # Check if job exists job_doc = await db.jobs.find_one({"_id": job_id}) if not job_doc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Job not found" ) # Reset job to created status for reprocessing await db.jobs.update_one( {"_id": job_id}, { "$set": { "status": "created", "error": None, "updated_at": datetime.utcnow() }, "$push": { "review.history": { "at": datetime.utcnow(), "status": "reprocessing", "by": str(current_user.id), "notes": "Admin-triggered reprocessing" } } } ) # Broadcast status update try: from ...services.websocket import connection_manager await connection_manager.broadcast_job_status_update( job_id=job_id, status="created", job_title=job_doc.get("title"), message=f"{job_doc.get('title', 'Job')} has been reset and is queued for reprocessing" ) except Exception as e: logger.warning(f"Failed to broadcast status update for job reset {job_id}: {e}") # Trigger ingestion task from ...tasks.ingest_and_ai import ingest_and_ai_task ingest_and_ai_task.delay(job_id) logger.warning(f"Admin {current_user.id} triggered reprocessing for job {job_id}") return {"message": f"Job {job_id} queued for reprocessing"} @router.get("/audit-logs", response_model=AuditLogResponse) async def get_audit_logs_detailed( # Time range start_date: Optional[datetime] = Query(None, description="Start date for audit logs"), end_date: Optional[datetime] = Query(None, description="End date for audit logs"), # Filters action: Optional[str] = Query(None, description="Filter by action type"), severity: Optional[str] = Query(None, description="Filter by severity level"), user_email: Optional[str] = Query(None, description="Filter by user email"), resource_type: Optional[str] = Query(None, description="Filter by resource type"), resource_id: Optional[str] = Query(None, description="Filter by resource ID"), success: Optional[bool] = Query(None, description="Filter by success status"), # Search search: Optional[str] = Query(None, description="Search in description and details"), # Pagination page: int = Query(1, ge=1, description="Page number"), size: int = Query(50, ge=1, le=500, description="Page size"), # Sorting sort_by: str = Query("timestamp", description="Field to sort by"), sort_order: int = Query(-1, ge=-1, le=1, description="Sort order (-1 desc, 1 asc)"), current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), request: Request = None, ): """Get audit logs with filtering and pagination (production/admin only)""" # Log audit log access await audit_logger.log_action( action="admin.audit.access", description=f"Admin {current_user.email} accessed audit logs", user=current_user, request=request, details={ "filters": { "start_date": start_date.isoformat() if start_date else None, "end_date": end_date.isoformat() if end_date else None, "action": action, "severity": severity, "user_email": user_email, "resource_type": resource_type, "search": search } } ) # Build query query = AuditLogQuery( start_date=start_date, end_date=end_date, action=action, severity=severity, user_email=user_email, resource_type=resource_type, resource_id=resource_id, success=success, search=search, skip=(page - 1) * size, limit=size, sort_by=sort_by, sort_order=sort_order ) return await audit_logger.query_logs(query) @router.get("/audit-logs/user/{user_id}") async def get_user_audit_logs( user_id: str, days: int = Query(30, ge=1, le=365, description="Number of days to look back"), current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), request: Request = None, ): """Get audit logs for a specific user (production/admin only)""" # Validate user_id try: ObjectId(user_id) except Exception: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid user ID format" ) # Log access to user audit logs await audit_logger.log_action( action="admin.audit.access", description=f"Admin {current_user.email} accessed user audit logs for {user_id}", user=current_user, request=request, resource_type="user", resource_id=user_id, details={"days_requested": days} ) logs = await audit_logger.get_user_activity(user_id, days) return {"logs": logs, "user_id": user_id, "days": days} @router.get("/audit-logs/security") async def get_security_events( hours: int = Query(24, ge=1, le=168, description="Number of hours to look back"), current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), request: Request = None, ): """Get recent security events (production/admin only)""" # Log access to security events await audit_logger.log_action( action="admin.audit.access", description=f"Admin {current_user.email} accessed security events", user=current_user, request=request, details={"hours_requested": hours} ) logs = await audit_logger.get_security_events(hours) return {"logs": logs, "hours": hours} @router.delete("/audit-logs/cleanup") async def cleanup_audit_logs( retention_days: int = Query(365, ge=30, le=2555, description="Retention period in days"), current_user: User = Depends(require_roles(UserRole.ADMIN)), request: Request = None, ): """Clean up old audit logs (admin only)""" # Log audit cleanup action await audit_logger.log_action( action="admin.system.action", description=f"Admin {current_user.email} initiated audit log cleanup", user=current_user, request=request, details={"retention_days": retention_days}, severity="warning" ) deleted_count = await audit_logger.cleanup_old_logs(retention_days) # Log cleanup completion await audit_logger.log_action( action="admin.system.action", description=f"Audit log cleanup completed: {deleted_count} logs deleted", user=current_user, request=request, details={ "retention_days": retention_days, "deleted_count": deleted_count } ) return { "message": f"Deleted {deleted_count} audit logs older than {retention_days} days", "deleted_count": deleted_count, "retention_days": retention_days }