"""Audit log model for tracking sensitive operations.""" from datetime import datetime from enum import Enum from typing import Any, Dict, Optional from bson import ObjectId from pydantic import BaseModel, Field from .user import PyObjectId class AuditAction(str, Enum): """Enumeration of auditable actions.""" # Authentication actions LOGIN_SUCCESS = "auth.login.success" LOGIN_FAILURE = "auth.login.failure" LOGOUT = "auth.logout" TOKEN_REFRESH = "auth.token.refresh" PASSWORD_CHANGE = "auth.password.change" PASSWORD_RESET = "auth.password.reset" # User management actions USER_CREATE = "user.create" USER_UPDATE = "user.update" USER_DELETE = "user.delete" USER_ROLE_CHANGE = "user.role.change" USER_ACTIVATE = "user.activate" USER_DEACTIVATE = "user.deactivate" # Job management actions JOB_CREATE = "job.create" JOB_UPDATE = "job.update" JOB_DELETE = "job.delete" JOB_APPROVE = "job.approve" JOB_REJECT = "job.reject" JOB_CANCEL = "job.cancel" JOB_STATUS_CHANGE = "job.status.change" # File operations FILE_UPLOAD = "file.upload" FILE_DOWNLOAD = "file.download" FILE_DELETE = "file.delete" FILE_ACCESS = "file.access" # VTT editing actions VTT_EDIT = "vtt.edit" VTT_APPROVE = "vtt.approve" VTT_REJECT = "vtt.reject" # Admin actions ADMIN_CONFIG_CHANGE = "admin.config.change" ADMIN_SYSTEM_ACTION = "admin.system.action" ADMIN_DATA_EXPORT = "admin.data.export" ADMIN_AUDIT_ACCESS = "admin.audit.access" # Security events RATE_LIMIT_EXCEEDED = "security.rate_limit.exceeded" VALIDATION_FAILURE = "security.validation.failure" UNAUTHORIZED_ACCESS = "security.unauthorized.access" SUSPICIOUS_ACTIVITY = "security.suspicious.activity" class AuditLogSeverity(str, Enum): """Severity levels for audit events.""" INFO = "info" # Normal operations WARNING = "warning" # Suspicious but not critical ERROR = "error" # Failed operations CRITICAL = "critical" # Security incidents class AuditLog(BaseModel): """Audit log entry model.""" id: Optional[PyObjectId] = Field(default_factory=PyObjectId, alias="_id") # Core audit fields timestamp: datetime = Field(default_factory=datetime.utcnow) action: AuditAction severity: AuditLogSeverity = AuditLogSeverity.INFO # Actor information user_id: Optional[PyObjectId] = None user_email: Optional[str] = None user_role: Optional[str] = None # Request context ip_address: Optional[str] = None user_agent: Optional[str] = None request_id: Optional[str] = None session_id: Optional[str] = None # Resource information resource_type: Optional[str] = None # e.g., "job", "user", "file" resource_id: Optional[str] = None resource_name: Optional[str] = None # Action details description: str details: Dict[str, Any] = Field(default_factory=dict) # Outcome success: bool = True error_message: Optional[str] = None # Additional metadata environment: str = "prod" service_name: str = "accessible-video-api" api_version: str = "v1" class Config: populate_by_name = True arbitrary_types_allowed = True json_encoders = {ObjectId: str} class AuditLogCreate(BaseModel): """Schema for creating audit log entries.""" action: AuditAction severity: AuditLogSeverity = AuditLogSeverity.INFO description: str # Optional fields that can be provided user_id: Optional[PyObjectId] = None user_email: Optional[str] = None user_role: Optional[str] = None ip_address: Optional[str] = None user_agent: Optional[str] = None request_id: Optional[str] = None resource_type: Optional[str] = None resource_id: Optional[str] = None resource_name: Optional[str] = None details: Dict[str, Any] = Field(default_factory=dict) success: bool = True error_message: Optional[str] = None class AuditLogQuery(BaseModel): """Schema for querying audit logs.""" # Time range start_date: Optional[datetime] = None end_date: Optional[datetime] = None # Filters action: Optional[AuditAction] = None severity: Optional[AuditLogSeverity] = None user_id: Optional[PyObjectId] = None user_email: Optional[str] = None resource_type: Optional[str] = None resource_id: Optional[str] = None success: Optional[bool] = None # Search search: Optional[str] = None # Full-text search in description and details # Pagination skip: int = 0 limit: int = 100 # Sorting sort_by: str = "timestamp" sort_order: int = -1 # -1 for descending, 1 for ascending class AuditLogResponse(BaseModel): """Response schema for audit log queries.""" logs: list[AuditLog] total_count: int page: int page_size: int has_more: bool