"""Audit logging service for tracking sensitive operations."""

import logging
import re
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from fastapi import Request
from motor.motor_asyncio import AsyncIOMotorCollection

from app.core.database import get_database
from app.core.config import get_settings
from app.models.audit_log import (
    AuditLog,
    AuditLogCreate,
    AuditLogQuery,
    AuditLogResponse,
    AuditAction,
    AuditLogSeverity,
)
from app.models.user import User
from app.telemetry.tracing import trace_async_operation

logger = logging.getLogger(__name__)


def _client_ip(request: Request) -> Optional[str]:
    """Return the caller's IP address, preferring the first X-Forwarded-For hop."""
    # NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
    # proxy overwrites it -- confirm the deployment strips untrusted values.
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        first_hop = forwarded_for.split(",")[0].strip()
        if first_hop:
            return first_hop
    # Fall back to the socket peer when the header is absent or its first
    # entry is empty (e.g. a malformed ", 10.0.0.1" value).
    if request.client:
        return request.client.host
    return None


class AuditLogger:
    """Service for managing audit logs."""

    def __init__(self):
        self.settings = get_settings()
        # Resolved lazily on first use by _get_collection().
        self.collection: Optional[AsyncIOMotorCollection] = None

    async def _get_collection(self) -> AsyncIOMotorCollection:
        """Get the audit logs collection, resolving and caching it on first use."""
        # Compare with None explicitly: PyMongo/Motor collection objects do not
        # support truth-value testing, so `not self.collection` would fail once
        # the collection has been cached.
        if self.collection is None:
            db = await get_database()
            self.collection = db.audit_logs
        return self.collection

    @staticmethod
    def _documents_to_logs(documents: List[Dict[str, Any]]) -> List[AuditLog]:
        """Convert raw documents to AuditLog models, skipping invalid ones."""
        logs: List[AuditLog] = []
        for doc in documents:
            try:
                logs.append(AuditLog(**doc))
            except Exception as exc:
                # Best effort: one malformed document must not hide the rest,
                # but the failure is recorded instead of being swallowed.
                logger.warning(
                    "Error converting audit log document %s: %s",
                    doc.get("_id"),
                    exc,
                )
        return logs

    @trace_async_operation("audit_logger.log_action")
    async def log_action(
        self,
        action: AuditAction,
        description: str,
        user: Optional[User] = None,
        request: Optional[Request] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        resource_name: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
        severity: AuditLogSeverity = AuditLogSeverity.INFO,
        success: bool = True,
        error_message: Optional[str] = None,
    ) -> str:
        """
        Log an audit event.

        Args:
            action: The audited action.
            description: Human-readable description of the event.
            user: Acting user, if any; its id, email and role are recorded.
            request: Originating request, if any; used for the client IP,
                User-Agent and request id.
            resource_type: Type of the affected resource.
            resource_id: Identifier of the affected resource.
            resource_name: Display name of the affected resource.
            details: Extra structured data; stored as ``{}`` when omitted.
            severity: Severity recorded with the entry.
            success: Whether the audited operation succeeded.
            error_message: Error text for failed operations.

        Returns:
            The ID of the created audit log entry.
        """
        # Extract request context
        ip_address = None
        user_agent = None
        request_id = None

        if request:
            ip_address = _client_ip(request)
            user_agent = request.headers.get("User-Agent")
            # Generate an id when the header is missing or empty.
            request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())

        # Create audit log entry
        audit_log = AuditLog(
            action=action,
            severity=severity,
            description=description,
            user_id=user.id if user else None,
            user_email=user.email if user else None,
            user_role=user.role.value if user else None,
            ip_address=ip_address,
            user_agent=user_agent,
            request_id=request_id,
            resource_type=resource_type,
            resource_id=resource_id,
            resource_name=resource_name,
            details=details or {},
            success=success,
            error_message=error_message,
            environment=self.settings.app_env,
            service_name="accessible-video-api",
            api_version="v1",
        )

        # Save to database
        collection = await self._get_collection()
        result = await collection.insert_one(audit_log.dict(by_alias=True))

        return str(result.inserted_id)

    @trace_async_operation("audit_logger.query_logs")
    async def query_logs(self, query: AuditLogQuery) -> AuditLogResponse:
        """Query audit logs with filtering and pagination.

        Args:
            query: Filters (time range, exact-match fields, free-text search),
                sort field/direction and skip/limit pagination.

        Returns:
            The matching page of logs together with the total match count,
            the 1-based page number and a ``has_more`` flag.
        """
        collection = await self._get_collection()

        # Build MongoDB query
        mongo_query: Dict[str, Any] = {}

        # Time range filter
        if query.start_date or query.end_date:
            timestamp_filter = {}
            if query.start_date:
                timestamp_filter["$gte"] = query.start_date
            if query.end_date:
                timestamp_filter["$lte"] = query.end_date
            mongo_query["timestamp"] = timestamp_filter

        # Exact match filters
        if query.action:
            mongo_query["action"] = query.action
        if query.severity:
            mongo_query["severity"] = query.severity
        if query.user_id:
            mongo_query["user_id"] = query.user_id
        if query.user_email:
            mongo_query["user_email"] = query.user_email
        if query.resource_type:
            mongo_query["resource_type"] = query.resource_type
        if query.resource_id:
            mongo_query["resource_id"] = query.resource_id
        if query.success is not None:
            mongo_query["success"] = query.success

        # Text search
        if query.search:
            # Escape the term so it is matched literally: passing raw user
            # input to $regex allows pattern injection and pathological
            # (catastrophic-backtracking) expressions.
            pattern = re.escape(query.search)
            mongo_query["$or"] = [
                {"description": {"$regex": pattern, "$options": "i"}},
                {"details": {"$regex": pattern, "$options": "i"}},
                {"error_message": {"$regex": pattern, "$options": "i"}},
            ]

        # Get total count
        total_count = await collection.count_documents(mongo_query)

        # Execute query with sorting and pagination
        cursor = (
            collection.find(mongo_query)
            .sort(query.sort_by, query.sort_order)
            .skip(query.skip)
            .limit(query.limit)
        )
        documents = await cursor.to_list(length=query.limit)

        # Convert to Pydantic models
        logs = self._documents_to_logs(documents)

        # Calculate pagination info
        # Guard the division in case a zero limit reaches this point.
        page = (query.skip // query.limit) + 1 if query.limit else 1
        # Count fetched documents, not converted models: a document that failed
        # conversion was still consumed from the result set, so using
        # len(logs) would report more pages than actually exist.
        has_more = (query.skip + len(documents)) < total_count

        return AuditLogResponse(
            logs=logs,
            total_count=total_count,
            page=page,
            page_size=len(logs),
            has_more=has_more,
        )

    async def get_user_activity(self, user_id: str, days: int = 30) -> List[AuditLog]:
        """Get recent activity for a specific user.

        Returns up to 1000 entries, newest first, starting from midnight
        (per ``datetime.utcnow()``) ``days`` days ago.
        """
        from_date = datetime.utcnow().replace(
            hour=0, minute=0, second=0, microsecond=0
        ) - timedelta(days=days)

        query = AuditLogQuery(
            user_id=user_id,
            start_date=from_date,
            limit=1000,
            sort_by="timestamp",
            sort_order=-1,
        )

        response = await self.query_logs(query)
        return response.logs

    async def get_security_events(self, hours: int = 24) -> List[AuditLog]:
        """Get recent security-related events.

        Returns up to 1000 entries from the last ``hours`` hours, newest
        first, whose action is one of the security actions listed below.
        """
        from_date = datetime.utcnow() - timedelta(hours=hours)

        security_actions = [
            AuditAction.LOGIN_FAILURE,
            AuditAction.RATE_LIMIT_EXCEEDED,
            AuditAction.VALIDATION_FAILURE,
            AuditAction.UNAUTHORIZED_ACCESS,
            AuditAction.SUSPICIOUS_ACTIVITY,
        ]

        collection = await self._get_collection()

        query = {
            "timestamp": {"$gte": from_date},
            "action": {"$in": security_actions},
        }

        cursor = collection.find(query).sort("timestamp", -1).limit(1000)
        documents = await cursor.to_list(length=1000)

        return self._documents_to_logs(documents)

    async def cleanup_old_logs(self, retention_days: int = 365) -> int:
        """Clean up audit logs older than retention period.

        Args:
            retention_days: Number of days of logs to keep; must not be
                negative.

        Returns:
            The number of deleted log entries.

        Raises:
            ValueError: If ``retention_days`` is negative.
        """
        # A negative retention would place the cutoff in the future and
        # delete every audit log, including ones written moments ago.
        if retention_days < 0:
            raise ValueError("retention_days must not be negative")

        cutoff_date = datetime.utcnow().replace(
            hour=0, minute=0, second=0, microsecond=0
        ) - timedelta(days=retention_days)

        collection = await self._get_collection()
        result = await collection.delete_many({"timestamp": {"$lt": cutoff_date}})

        return result.deleted_count


# Global audit logger instance
audit_logger = AuditLogger()


# Convenience functions for common audit operations
async def log_auth_success(user: User, request: Request):
    """Log successful authentication."""
    await audit_logger.log_action(
        action=AuditAction.LOGIN_SUCCESS,
        description=f"User {user.email} logged in successfully",
        user=user,
        request=request,
        severity=AuditLogSeverity.INFO,
    )


async def log_auth_failure(email: str, request: Request, reason: str):
    """Log failed authentication attempt.

    No user is attached to the entry; the attempted email is stored in
    ``details`` and ``reason`` is stored as the error message.
    """
    await audit_logger.log_action(
        action=AuditAction.LOGIN_FAILURE,
        description=f"Failed login attempt for {email}: {reason}",
        request=request,
        severity=AuditLogSeverity.WARNING,
        success=False,
        error_message=reason,
        details={"attempted_email": email},
    )


async def log_job_action(
    action: AuditAction,
    job_id: str,
    user: User,
    request: Request,
    details: Optional[Dict] = None,
):
    """Log job-related actions.

    Actions without a known description fall back to ``str(action)``.
    """
    action_descriptions = {
        AuditAction.JOB_CREATE: "Job created",
        AuditAction.JOB_APPROVE: "Job approved",
        AuditAction.JOB_REJECT: "Job rejected",
        AuditAction.JOB_CANCEL: "Job cancelled",
        AuditAction.JOB_UPDATE: "Job updated",
    }

    await audit_logger.log_action(
        action=action,
        description=f"{action_descriptions.get(action, str(action))} by {user.email}",
        user=user,
        request=request,
        resource_type="job",
        resource_id=job_id,
        details=details,
    )


async def log_user_management(
    action: AuditAction,
    target_user_id: str,
    admin_user: User,
    request: Request,
    details: Optional[Dict] = None,
):
    """Log user management actions.

    The acting admin is recorded as the user; the affected account is
    recorded as the resource. Actions without a known description fall
    back to ``str(action)``.
    """
    action_descriptions = {
        AuditAction.USER_CREATE: "User created",
        AuditAction.USER_UPDATE: "User updated",
        AuditAction.USER_DELETE: "User deleted",
        AuditAction.USER_ROLE_CHANGE: "User role changed",
        AuditAction.USER_ACTIVATE: "User activated",
        AuditAction.USER_DEACTIVATE: "User deactivated",
    }

    await audit_logger.log_action(
        action=action,
        description=f"{action_descriptions.get(action, str(action))} by admin {admin_user.email}",
        user=admin_user,
        request=request,
        resource_type="user",
        resource_id=target_user_id,
        details=details,
        severity=AuditLogSeverity.INFO,
    )


async def log_security_event(
    action: AuditAction,
    description: str,
    request: Request,
    user: Optional[User] = None,
    details: Optional[Dict] = None,
):
    """Log security-related events.

    Entries are always recorded with ``success=False``. Severity is
    CRITICAL for SUSPICIOUS_ACTIVITY and WARNING for every other action.
    """
    if action == AuditAction.SUSPICIOUS_ACTIVITY:
        severity = AuditLogSeverity.CRITICAL
    else:
        severity = AuditLogSeverity.WARNING

    await audit_logger.log_action(
        action=action,
        description=description,
        user=user,
        request=request,
        severity=severity,
        success=False,
        details=details,
    )