video-accessibility/backend/app/models/audit_log.py
2025-08-24 16:28:33 -05:00

175 lines
4.9 KiB
Python

"""Audit log model for tracking sensitive operations."""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
from bson import ObjectId
from pydantic import BaseModel, Field
from .user import PyObjectId
class AuditAction(str, Enum):
"""Enumeration of auditable actions."""
# Authentication actions
LOGIN_SUCCESS = "auth.login.success"
LOGIN_FAILURE = "auth.login.failure"
LOGOUT = "auth.logout"
TOKEN_REFRESH = "auth.token.refresh"
PASSWORD_CHANGE = "auth.password.change"
PASSWORD_RESET = "auth.password.reset"
# User management actions
USER_CREATE = "user.create"
USER_UPDATE = "user.update"
USER_DELETE = "user.delete"
USER_ROLE_CHANGE = "user.role.change"
USER_ACTIVATE = "user.activate"
USER_DEACTIVATE = "user.deactivate"
# Job management actions
JOB_CREATE = "job.create"
JOB_UPDATE = "job.update"
JOB_DELETE = "job.delete"
JOB_APPROVE = "job.approve"
JOB_REJECT = "job.reject"
JOB_CANCEL = "job.cancel"
JOB_STATUS_CHANGE = "job.status.change"
# File operations
FILE_UPLOAD = "file.upload"
FILE_DOWNLOAD = "file.download"
FILE_DELETE = "file.delete"
FILE_ACCESS = "file.access"
# VTT editing actions
VTT_EDIT = "vtt.edit"
VTT_APPROVE = "vtt.approve"
VTT_REJECT = "vtt.reject"
# Admin actions
ADMIN_CONFIG_CHANGE = "admin.config.change"
ADMIN_SYSTEM_ACTION = "admin.system.action"
ADMIN_DATA_EXPORT = "admin.data.export"
ADMIN_AUDIT_ACCESS = "admin.audit.access"
# Security events
RATE_LIMIT_EXCEEDED = "security.rate_limit.exceeded"
VALIDATION_FAILURE = "security.validation.failure"
UNAUTHORIZED_ACCESS = "security.unauthorized.access"
SUSPICIOUS_ACTIVITY = "security.suspicious.activity"
class AuditLogSeverity(str, Enum):
"""Severity levels for audit events."""
INFO = "info" # Normal operations
WARNING = "warning" # Suspicious but not critical
ERROR = "error" # Failed operations
CRITICAL = "critical" # Security incidents
class AuditLog(BaseModel):
"""Audit log entry model."""
id: Optional[PyObjectId] = Field(default_factory=PyObjectId, alias="_id")
# Core audit fields
timestamp: datetime = Field(default_factory=datetime.utcnow)
action: AuditAction
severity: AuditLogSeverity = AuditLogSeverity.INFO
# Actor information
user_id: Optional[PyObjectId] = None
user_email: Optional[str] = None
user_role: Optional[str] = None
# Request context
ip_address: Optional[str] = None
user_agent: Optional[str] = None
request_id: Optional[str] = None
session_id: Optional[str] = None
# Resource information
resource_type: Optional[str] = None # e.g., "job", "user", "file"
resource_id: Optional[str] = None
resource_name: Optional[str] = None
# Action details
description: str
details: Dict[str, Any] = Field(default_factory=dict)
# Outcome
success: bool = True
error_message: Optional[str] = None
# Additional metadata
environment: str = "prod"
service_name: str = "accessible-video-api"
api_version: str = "v1"
class Config:
populate_by_name = True
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}
class AuditLogCreate(BaseModel):
"""Schema for creating audit log entries."""
action: AuditAction
severity: AuditLogSeverity = AuditLogSeverity.INFO
description: str
# Optional fields that can be provided
user_id: Optional[PyObjectId] = None
user_email: Optional[str] = None
user_role: Optional[str] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
request_id: Optional[str] = None
resource_type: Optional[str] = None
resource_id: Optional[str] = None
resource_name: Optional[str] = None
details: Dict[str, Any] = Field(default_factory=dict)
success: bool = True
error_message: Optional[str] = None
class AuditLogQuery(BaseModel):
"""Schema for querying audit logs."""
# Time range
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
# Filters
action: Optional[AuditAction] = None
severity: Optional[AuditLogSeverity] = None
user_id: Optional[PyObjectId] = None
user_email: Optional[str] = None
resource_type: Optional[str] = None
resource_id: Optional[str] = None
success: Optional[bool] = None
# Search
search: Optional[str] = None # Full-text search in description and details
# Pagination
skip: int = 0
limit: int = 100
# Sorting
sort_by: str = "timestamp"
sort_order: int = -1 # -1 for descending, 1 for ascending
class AuditLogResponse(BaseModel):
"""Response schema for audit log queries."""
logs: list[AuditLog]
total_count: int
page: int
page_size: int
has_more: bool