241 lines
7.1 KiB
Python
241 lines
7.1 KiB
Python
"""Audit log model for tracking sensitive operations."""
|
|
|
|
from datetime import datetime, timezone
from enum import StrEnum, unique
from typing import Any

from bson import ObjectId
from pydantic import BaseModel, Field

from .user import PyObjectId
|
|
|
|
|
|
class AuditAction(StrEnum):
|
|
"""Enumeration of auditable actions."""
|
|
|
|
# Authentication actions
|
|
LOGIN_SUCCESS = "auth.login.success"
|
|
LOGIN_FAILURE = "auth.login.failure"
|
|
LOGOUT = "auth.logout"
|
|
TOKEN_REFRESH = "auth.token.refresh"
|
|
PASSWORD_CHANGE = "auth.password.change"
|
|
PASSWORD_RESET = "auth.password.reset"
|
|
|
|
# User management actions
|
|
USER_CREATE = "user.create"
|
|
USER_UPDATE = "user.update"
|
|
USER_DELETE = "user.delete"
|
|
USER_ROLE_CHANGE = "user.role.change"
|
|
USER_ACTIVATE = "user.activate"
|
|
USER_DEACTIVATE = "user.deactivate"
|
|
|
|
# Job management actions
|
|
JOB_CREATE = "job.create"
|
|
JOB_UPDATE = "job.update"
|
|
JOB_DELETE = "job.delete"
|
|
JOB_APPROVE = "job.approve"
|
|
JOB_REJECT = "job.reject"
|
|
JOB_CANCEL = "job.cancel"
|
|
JOB_STATUS_CHANGE = "job.status.change"
|
|
JOB_TASK_FAILED = "job.task.failed"
|
|
JOB_RETRY = "job.retry"
|
|
JOB_BULK_RETRY = "job.bulk_retry"
|
|
|
|
# File operations
|
|
FILE_UPLOAD = "file.upload"
|
|
FILE_DOWNLOAD = "file.download"
|
|
FILE_DELETE = "file.delete"
|
|
FILE_ACCESS = "file.access"
|
|
|
|
# VTT editing actions
|
|
VTT_EDIT = "vtt.edit"
|
|
VTT_APPROVE = "vtt.approve"
|
|
VTT_REJECT = "vtt.reject"
|
|
VTT_RETRANSLATE = "vtt.retranslate"
|
|
|
|
# Per-language QC actions
|
|
LANGUAGE_QC_ASSIGN = "language_qc.assign"
|
|
LANGUAGE_QC_REASSIGN = "language_qc.reassign"
|
|
LANGUAGE_QC_REVIEWER_ASSIGN = "language_qc.reviewer_assign"
|
|
LANGUAGE_QC_REVIEWER_REASSIGN = "language_qc.reviewer_reassign"
|
|
LANGUAGE_QC_SUBMIT = "language_qc.submit"
|
|
LANGUAGE_QC_OPEN_REVIEW = "language_qc.open_review"
|
|
LANGUAGE_QC_APPROVE = "language_qc.approve"
|
|
LANGUAGE_QC_REJECT = "language_qc.reject"
|
|
LANGUAGE_QC_REOPEN = "language_qc.reopen"
|
|
LANGUAGE_QC_COMMENT = "language_qc.comment"
|
|
|
|
# Admin actions
|
|
ADMIN_CONFIG_CHANGE = "admin.config.change"
|
|
ADMIN_SYSTEM_ACTION = "admin.system.action"
|
|
ADMIN_DATA_EXPORT = "admin.data.export"
|
|
ADMIN_AUDIT_ACCESS = "admin.audit.access"
|
|
|
|
# Glossary management
|
|
GLOSSARY_UPLOAD = "glossary.upload"
|
|
GLOSSARY_VERSION_UPLOAD = "glossary.version.upload"
|
|
GLOSSARY_ACTIVATE = "glossary.activate"
|
|
GLOSSARY_ARCHIVE = "glossary.archive"
|
|
|
|
# Client management
|
|
CLIENT_CREATE = "client.create"
|
|
CLIENT_UPDATE = "client.update"
|
|
CLIENT_DEACTIVATE = "client.deactivate"
|
|
CLIENT_PM_ASSIGN = "client.pm_assign"
|
|
CLIENT_PM_REMOVE = "client.pm_remove"
|
|
CLIENT_TEAM_CREATE = "client.team_create"
|
|
CLIENT_TEAM_UPDATE = "client.team_update"
|
|
CLIENT_TEAM_DELETE = "client.team_delete"
|
|
CLIENT_TEAM_MEMBER_ADD = "client.team_member_add"
|
|
CLIENT_TEAM_MEMBER_REMOVE = "client.team_member_remove"
|
|
CLIENT_PROJECT_CREATE = "client.project_create"
|
|
CLIENT_PROJECT_UPDATE = "client.project_update"
|
|
CLIENT_PROJECT_ARCHIVE = "client.project_archive"
|
|
|
|
# Organization management
|
|
ORG_CREATE = "org.create"
|
|
ORG_UPDATE = "org.update"
|
|
ORG_MEMBER_ADD = "org.member_add"
|
|
ORG_MEMBER_UPDATE = "org.member_update"
|
|
ORG_MEMBER_REMOVE = "org.member_remove"
|
|
|
|
# Invitations
|
|
INVITATION_CREATE = "invitation.create"
|
|
INVITATION_REVOKE = "invitation.revoke"
|
|
INVITATION_ACCEPT = "invitation.accept"
|
|
|
|
# Language QC (additional)
|
|
LANGUAGE_QC_BULK_ASSIGN = "language_qc.bulk_assign"
|
|
LANGUAGE_QC_START_WORK = "language_qc.start_work"
|
|
LANGUAGE_QC_MARK_CUE_REVIEWED = "language_qc.mark_cue_reviewed"
|
|
|
|
# Brief management
|
|
BRIEF_CREATE = "brief.create"
|
|
BRIEF_UPDATE = "brief.update"
|
|
BRIEF_SUBMIT = "brief.submit"
|
|
BRIEF_APPROVE = "brief.approve"
|
|
|
|
# Share tokens
|
|
SHARE_TOKEN_CREATE = "share.token_create"
|
|
SHARE_TOKEN_REVOKE = "share.token_revoke"
|
|
SHARE_CLIENT_DECISION = "share.client_decision"
|
|
|
|
# Security events
|
|
RATE_LIMIT_EXCEEDED = "security.rate_limit.exceeded"
|
|
VALIDATION_FAILURE = "security.validation.failure"
|
|
UNAUTHORIZED_ACCESS = "security.unauthorized.access"
|
|
SUSPICIOUS_ACTIVITY = "security.suspicious.activity"
|
|
|
|
|
|
class AuditLogSeverity(StrEnum):
|
|
"""Severity levels for audit events."""
|
|
|
|
INFO = "info" # Normal operations
|
|
WARNING = "warning" # Suspicious but not critical
|
|
ERROR = "error" # Failed operations
|
|
CRITICAL = "critical" # Security incidents
|
|
|
|
|
|
class AuditLog(BaseModel):
    """Audit log entry model.

    Only ``action`` and ``description`` are required; every other field has a
    default. The identifier is exposed as ``id`` and aliased to ``_id``, and
    ``populate_by_name`` lets callers supply it under either name.
    """

    # Defaults to a freshly generated ObjectId rendered as a string.
    id: PyObjectId | None = Field(default_factory=lambda: str(ObjectId()), alias="_id")

    # Core audit fields
    # ``datetime.utcnow`` is deprecated since Python 3.12. The replacement
    # below yields the same naive-UTC value, so timestamps stay comparable
    # with ones produced before this change.
    timestamp: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    action: AuditAction
    severity: AuditLogSeverity = AuditLogSeverity.INFO

    # Actor information
    user_id: PyObjectId | None = None
    user_email: str | None = None
    user_role: str | None = None

    # Request context
    ip_address: str | None = None
    user_agent: str | None = None
    request_id: str | None = None
    session_id: str | None = None

    # Resource information
    resource_type: str | None = None  # e.g., "job", "user", "file"
    resource_id: str | None = None
    resource_name: str | None = None

    # Action details
    description: str
    details: dict[str, Any] = Field(default_factory=dict)

    # Outcome
    success: bool = True
    error_message: str | None = None

    # Additional metadata
    # NOTE(review): these are hard-coded defaults; an entry created without
    # overriding ``environment`` is labelled "prod" -- confirm callers set it.
    environment: str = "prod"
    service_name: str = "accessible-video-api"
    api_version: str = "v1"

    # NOTE(review): class-based ``Config`` and ``json_encoders`` look like the
    # legacy pydantic configuration style; left untouched because the pinned
    # pydantic version is not visible from this file.
    class Config:
        populate_by_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
|
|
|
|
|
|
class AuditLogCreate(BaseModel):
    """Schema for creating audit log entries.

    ``action`` and ``description`` must be supplied; ``severity`` falls back
    to ``AuditLogSeverity.INFO`` and all remaining fields are optional.
    """

    # Required core of the event (severity has a default).
    action: AuditAction
    severity: AuditLogSeverity = AuditLogSeverity.INFO
    description: str

    # Who performed the action.
    user_id: PyObjectId | None = None
    user_email: str | None = None
    user_role: str | None = None

    # Where the request came from.
    # NOTE(review): AuditLog also carries ``session_id``, which has no
    # counterpart here -- confirm whether that omission is intentional.
    ip_address: str | None = None
    user_agent: str | None = None
    request_id: str | None = None

    # What was acted upon.
    resource_type: str | None = None
    resource_id: str | None = None
    resource_name: str | None = None

    # Free-form extra data; each instance gets its own empty dict.
    details: dict[str, Any] = Field(default_factory=dict)

    # How it went.
    success: bool = True
    error_message: str | None = None
|
|
|
|
|
|
class AuditLogQuery(BaseModel):
    """Schema for querying audit logs.

    Every filter defaults to ``None``, so an instance built with no arguments
    carries only the pagination and sorting defaults.
    """

    # Time range
    start_date: datetime | None = None
    end_date: datetime | None = None

    # Filters
    action: AuditAction | None = None
    severity: AuditLogSeverity | None = None
    user_id: PyObjectId | None = None
    user_email: str | None = None
    resource_type: str | None = None
    resource_id: str | None = None
    success: bool | None = None

    # Search
    search: str | None = None  # Full-text search in description and details

    # Pagination
    # NOTE(review): neither value is bounded here (negative or very large
    # numbers pass validation) -- confirm the consumer enforces limits.
    skip: int = 0
    limit: int = 100

    # Sorting
    sort_by: str = "timestamp"
    sort_order: int = -1  # -1 for descending, 1 for ascending
|
|
|
|
|
|
class AuditLogResponse(BaseModel):
    """Response schema for audit log queries.

    All five fields are required; nothing here has a default.
    """

    # The entries returned for this request.
    logs: list[AuditLog]

    # Presumably the number of entries matching the query overall, not just
    # those in ``logs`` -- verify against the code that builds the response.
    total_count: int

    # Paging position and size as reported back to the caller.
    page: int
    page_size: int

    # Presumably True when further entries exist beyond this page -- the
    # computation is not visible in this file.
    has_more: bool
|