175 lines
4.9 KiB
Python
175 lines
4.9 KiB
Python
"""Audit log model for tracking sensitive operations."""
|
|
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Any, Dict, Optional
|
|
from bson import ObjectId
|
|
from pydantic import BaseModel, Field
|
|
|
|
from .user import PyObjectId
|
|
|
|
|
|
class AuditAction(str, Enum):
|
|
"""Enumeration of auditable actions."""
|
|
|
|
# Authentication actions
|
|
LOGIN_SUCCESS = "auth.login.success"
|
|
LOGIN_FAILURE = "auth.login.failure"
|
|
LOGOUT = "auth.logout"
|
|
TOKEN_REFRESH = "auth.token.refresh"
|
|
PASSWORD_CHANGE = "auth.password.change"
|
|
PASSWORD_RESET = "auth.password.reset"
|
|
|
|
# User management actions
|
|
USER_CREATE = "user.create"
|
|
USER_UPDATE = "user.update"
|
|
USER_DELETE = "user.delete"
|
|
USER_ROLE_CHANGE = "user.role.change"
|
|
USER_ACTIVATE = "user.activate"
|
|
USER_DEACTIVATE = "user.deactivate"
|
|
|
|
# Job management actions
|
|
JOB_CREATE = "job.create"
|
|
JOB_UPDATE = "job.update"
|
|
JOB_DELETE = "job.delete"
|
|
JOB_APPROVE = "job.approve"
|
|
JOB_REJECT = "job.reject"
|
|
JOB_CANCEL = "job.cancel"
|
|
JOB_STATUS_CHANGE = "job.status.change"
|
|
|
|
# File operations
|
|
FILE_UPLOAD = "file.upload"
|
|
FILE_DOWNLOAD = "file.download"
|
|
FILE_DELETE = "file.delete"
|
|
FILE_ACCESS = "file.access"
|
|
|
|
# VTT editing actions
|
|
VTT_EDIT = "vtt.edit"
|
|
VTT_APPROVE = "vtt.approve"
|
|
VTT_REJECT = "vtt.reject"
|
|
|
|
# Admin actions
|
|
ADMIN_CONFIG_CHANGE = "admin.config.change"
|
|
ADMIN_SYSTEM_ACTION = "admin.system.action"
|
|
ADMIN_DATA_EXPORT = "admin.data.export"
|
|
ADMIN_AUDIT_ACCESS = "admin.audit.access"
|
|
|
|
# Security events
|
|
RATE_LIMIT_EXCEEDED = "security.rate_limit.exceeded"
|
|
VALIDATION_FAILURE = "security.validation.failure"
|
|
UNAUTHORIZED_ACCESS = "security.unauthorized.access"
|
|
SUSPICIOUS_ACTIVITY = "security.suspicious.activity"
|
|
|
|
|
|
class AuditLogSeverity(str, Enum):
|
|
"""Severity levels for audit events."""
|
|
|
|
INFO = "info" # Normal operations
|
|
WARNING = "warning" # Suspicious but not critical
|
|
ERROR = "error" # Failed operations
|
|
CRITICAL = "critical" # Security incidents
|
|
|
|
|
|
class AuditLog(BaseModel):
|
|
"""Audit log entry model."""
|
|
|
|
id: Optional[PyObjectId] = Field(default_factory=PyObjectId, alias="_id")
|
|
|
|
# Core audit fields
|
|
timestamp: datetime = Field(default_factory=datetime.utcnow)
|
|
action: AuditAction
|
|
severity: AuditLogSeverity = AuditLogSeverity.INFO
|
|
|
|
# Actor information
|
|
user_id: Optional[PyObjectId] = None
|
|
user_email: Optional[str] = None
|
|
user_role: Optional[str] = None
|
|
|
|
# Request context
|
|
ip_address: Optional[str] = None
|
|
user_agent: Optional[str] = None
|
|
request_id: Optional[str] = None
|
|
session_id: Optional[str] = None
|
|
|
|
# Resource information
|
|
resource_type: Optional[str] = None # e.g., "job", "user", "file"
|
|
resource_id: Optional[str] = None
|
|
resource_name: Optional[str] = None
|
|
|
|
# Action details
|
|
description: str
|
|
details: Dict[str, Any] = Field(default_factory=dict)
|
|
|
|
# Outcome
|
|
success: bool = True
|
|
error_message: Optional[str] = None
|
|
|
|
# Additional metadata
|
|
environment: str = "prod"
|
|
service_name: str = "accessible-video-api"
|
|
api_version: str = "v1"
|
|
|
|
class Config:
|
|
populate_by_name = True
|
|
arbitrary_types_allowed = True
|
|
json_encoders = {ObjectId: str}
|
|
|
|
|
|
class AuditLogCreate(BaseModel):
|
|
"""Schema for creating audit log entries."""
|
|
|
|
action: AuditAction
|
|
severity: AuditLogSeverity = AuditLogSeverity.INFO
|
|
description: str
|
|
|
|
# Optional fields that can be provided
|
|
user_id: Optional[PyObjectId] = None
|
|
user_email: Optional[str] = None
|
|
user_role: Optional[str] = None
|
|
ip_address: Optional[str] = None
|
|
user_agent: Optional[str] = None
|
|
request_id: Optional[str] = None
|
|
resource_type: Optional[str] = None
|
|
resource_id: Optional[str] = None
|
|
resource_name: Optional[str] = None
|
|
details: Dict[str, Any] = Field(default_factory=dict)
|
|
success: bool = True
|
|
error_message: Optional[str] = None
|
|
|
|
|
|
class AuditLogQuery(BaseModel):
|
|
"""Schema for querying audit logs."""
|
|
|
|
# Time range
|
|
start_date: Optional[datetime] = None
|
|
end_date: Optional[datetime] = None
|
|
|
|
# Filters
|
|
action: Optional[AuditAction] = None
|
|
severity: Optional[AuditLogSeverity] = None
|
|
user_id: Optional[PyObjectId] = None
|
|
user_email: Optional[str] = None
|
|
resource_type: Optional[str] = None
|
|
resource_id: Optional[str] = None
|
|
success: Optional[bool] = None
|
|
|
|
# Search
|
|
search: Optional[str] = None # Full-text search in description and details
|
|
|
|
# Pagination
|
|
skip: int = 0
|
|
limit: int = 100
|
|
|
|
# Sorting
|
|
sort_by: str = "timestamp"
|
|
sort_order: int = -1 # -1 for descending, 1 for ascending
|
|
|
|
|
|
class AuditLogResponse(BaseModel):
|
|
"""Response schema for audit log queries."""
|
|
|
|
logs: list[AuditLog]
|
|
total_count: int
|
|
page: int
|
|
page_size: int
|
|
has_more: bool
|