video-accessibility/backend/app/services/audit_logger.py
2025-08-24 16:28:33 -05:00

331 lines
No EOL
11 KiB
Python

"""Audit logging service for tracking sensitive operations."""
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from fastapi import Request
from motor.motor_asyncio import AsyncIOMotorCollection
from app.core.database import get_database
from app.core.config import get_settings
from app.models.audit_log import (
AuditLog,
AuditLogCreate,
AuditLogQuery,
AuditLogResponse,
AuditAction,
AuditLogSeverity
)
from app.models.user import User
from app.telemetry.tracing import trace_async_operation
class AuditLogger:
"""Service for managing audit logs."""
def __init__(self):
self.settings = get_settings()
self.collection: Optional[AsyncIOMotorCollection] = None
async def _get_collection(self) -> AsyncIOMotorCollection:
"""Get the audit logs collection."""
if not self.collection:
db = await get_database()
self.collection = db.audit_logs
return self.collection
@trace_async_operation("audit_logger.log_action")
async def log_action(
self,
action: AuditAction,
description: str,
user: Optional[User] = None,
request: Optional[Request] = None,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
resource_name: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
severity: AuditLogSeverity = AuditLogSeverity.INFO,
success: bool = True,
error_message: Optional[str] = None
) -> str:
"""
Log an audit event.
Returns:
The ID of the created audit log entry.
"""
# Extract request context
ip_address = None
user_agent = None
request_id = None
if request:
# Get IP address (handle forwarded headers)
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
ip_address = forwarded_for.split(',')[0].strip()
elif request.client:
ip_address = request.client.host
user_agent = request.headers.get("User-Agent")
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
# Create audit log entry
audit_log = AuditLog(
action=action,
severity=severity,
description=description,
user_id=user.id if user else None,
user_email=user.email if user else None,
user_role=user.role.value if user else None,
ip_address=ip_address,
user_agent=user_agent,
request_id=request_id,
resource_type=resource_type,
resource_id=resource_id,
resource_name=resource_name,
details=details or {},
success=success,
error_message=error_message,
environment=self.settings.app_env,
service_name="accessible-video-api",
api_version="v1"
)
# Save to database
collection = await self._get_collection()
result = await collection.insert_one(audit_log.dict(by_alias=True))
return str(result.inserted_id)
@trace_async_operation("audit_logger.query_logs")
async def query_logs(self, query: AuditLogQuery) -> AuditLogResponse:
"""Query audit logs with filtering and pagination."""
collection = await self._get_collection()
# Build MongoDB query
mongo_query = {}
# Time range filter
if query.start_date or query.end_date:
timestamp_filter = {}
if query.start_date:
timestamp_filter["$gte"] = query.start_date
if query.end_date:
timestamp_filter["$lte"] = query.end_date
mongo_query["timestamp"] = timestamp_filter
# Exact match filters
if query.action:
mongo_query["action"] = query.action
if query.severity:
mongo_query["severity"] = query.severity
if query.user_id:
mongo_query["user_id"] = query.user_id
if query.user_email:
mongo_query["user_email"] = query.user_email
if query.resource_type:
mongo_query["resource_type"] = query.resource_type
if query.resource_id:
mongo_query["resource_id"] = query.resource_id
if query.success is not None:
mongo_query["success"] = query.success
# Text search
if query.search:
mongo_query["$or"] = [
{"description": {"$regex": query.search, "$options": "i"}},
{"details": {"$regex": query.search, "$options": "i"}},
{"error_message": {"$regex": query.search, "$options": "i"}}
]
# Get total count
total_count = await collection.count_documents(mongo_query)
# Execute query with pagination and sorting
cursor = collection.find(mongo_query)
# Apply sorting
sort_direction = query.sort_order
cursor = cursor.sort(query.sort_by, sort_direction)
# Apply pagination
cursor = cursor.skip(query.skip).limit(query.limit)
# Execute query
documents = await cursor.to_list(length=query.limit)
# Convert to Pydantic models
logs = []
for doc in documents:
try:
logs.append(AuditLog(**doc))
except Exception as e:
# Log conversion error but continue
print(f"Error converting audit log document: {e}")
continue
# Calculate pagination info
page = (query.skip // query.limit) + 1
has_more = (query.skip + len(logs)) < total_count
return AuditLogResponse(
logs=logs,
total_count=total_count,
page=page,
page_size=len(logs),
has_more=has_more
)
async def get_user_activity(self, user_id: str, days: int = 30) -> List[AuditLog]:
"""Get recent activity for a specific user."""
from_date = datetime.utcnow().replace(
hour=0, minute=0, second=0, microsecond=0
) - timedelta(days=days)
query = AuditLogQuery(
user_id=user_id,
start_date=from_date,
limit=1000,
sort_by="timestamp",
sort_order=-1
)
response = await self.query_logs(query)
return response.logs
async def get_security_events(self, hours: int = 24) -> List[AuditLog]:
"""Get recent security-related events."""
from_date = datetime.utcnow() - timedelta(hours=hours)
security_actions = [
AuditAction.LOGIN_FAILURE,
AuditAction.RATE_LIMIT_EXCEEDED,
AuditAction.VALIDATION_FAILURE,
AuditAction.UNAUTHORIZED_ACCESS,
AuditAction.SUSPICIOUS_ACTIVITY
]
collection = await self._get_collection()
query = {
"timestamp": {"$gte": from_date},
"action": {"$in": security_actions}
}
cursor = collection.find(query).sort("timestamp", -1).limit(1000)
documents = await cursor.to_list(length=1000)
logs = []
for doc in documents:
try:
logs.append(AuditLog(**doc))
except Exception:
continue
return logs
async def cleanup_old_logs(self, retention_days: int = 365) -> int:
"""Clean up audit logs older than retention period."""
cutoff_date = datetime.utcnow().replace(
hour=0, minute=0, second=0, microsecond=0
) - timedelta(days=retention_days)
collection = await self._get_collection()
result = await collection.delete_many({
"timestamp": {"$lt": cutoff_date}
})
return result.deleted_count
# Global audit logger instance
audit_logger = AuditLogger()
# Convenience functions for common audit operations
async def log_auth_success(user: User, request: Request):
"""Log successful authentication."""
await audit_logger.log_action(
action=AuditAction.LOGIN_SUCCESS,
description=f"User {user.email} logged in successfully",
user=user,
request=request,
severity=AuditLogSeverity.INFO
)
async def log_auth_failure(email: str, request: Request, reason: str):
"""Log failed authentication attempt."""
await audit_logger.log_action(
action=AuditAction.LOGIN_FAILURE,
description=f"Failed login attempt for {email}: {reason}",
request=request,
severity=AuditLogSeverity.WARNING,
success=False,
error_message=reason,
details={"attempted_email": email}
)
async def log_job_action(action: AuditAction, job_id: str, user: User, request: Request, details: Optional[Dict] = None):
"""Log job-related actions."""
action_descriptions = {
AuditAction.JOB_CREATE: "Job created",
AuditAction.JOB_APPROVE: "Job approved",
AuditAction.JOB_REJECT: "Job rejected",
AuditAction.JOB_CANCEL: "Job cancelled",
AuditAction.JOB_UPDATE: "Job updated"
}
await audit_logger.log_action(
action=action,
description=f"{action_descriptions.get(action, str(action))} by {user.email}",
user=user,
request=request,
resource_type="job",
resource_id=job_id,
details=details
)
async def log_user_management(action: AuditAction, target_user_id: str, admin_user: User, request: Request, details: Optional[Dict] = None):
"""Log user management actions."""
action_descriptions = {
AuditAction.USER_CREATE: "User created",
AuditAction.USER_UPDATE: "User updated",
AuditAction.USER_DELETE: "User deleted",
AuditAction.USER_ROLE_CHANGE: "User role changed",
AuditAction.USER_ACTIVATE: "User activated",
AuditAction.USER_DEACTIVATE: "User deactivated"
}
await audit_logger.log_action(
action=action,
description=f"{action_descriptions.get(action, str(action))} by admin {admin_user.email}",
user=admin_user,
request=request,
resource_type="user",
resource_id=target_user_id,
details=details,
severity=AuditLogSeverity.INFO
)
async def log_security_event(action: AuditAction, description: str, request: Request, user: Optional[User] = None, details: Optional[Dict] = None):
"""Log security-related events."""
await audit_logger.log_action(
action=action,
description=description,
user=user,
request=request,
severity=AuditLogSeverity.WARNING if action != AuditAction.SUSPICIOUS_ACTIVITY else AuditLogSeverity.CRITICAL,
success=False,
details=details
)