amazon-transcreation/backend/app/services/audit_service.py
DJP 98fa16bfc3 feat: complete Phase 1-2 scaffold — backend, frontend, pipeline skeleton
Full-stack Amazon AI Transcreation Platform with:
- FastAPI backend (async, PostgreSQL, Redis, Celery) with 11 DB tables
- JWT auth (SSO-ready abstract provider pattern)
- 6-agent pipeline orchestrator with deterministic modules
- Next.js 14 frontend with Amazon branding (Ember fonts, orange/dark theme)
- Job wizard, monitoring HUD, output review, admin screens
- 154 TM/reference files imported, 12 locales configured
- Docker Compose for all services

Agents 2-5 (TM retrieval, ranker, transcreator, compliance) are stubs
pending Phase 3 LLM integration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-10 12:31:43 -04:00

77 lines
2.3 KiB
Python

from datetime import datetime
from typing import Any
from uuid import UUID
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.audit import AuditLog
class AuditService:
"""Service for audit log creation and retrieval."""
async def log(
self,
db: AsyncSession,
action: str,
entity_type: str,
entity_id: str,
user_id: UUID | None = None,
details: dict[str, Any] | None = None,
ip_address: str | None = None,
) -> AuditLog:
"""Create an audit log entry."""
entry = AuditLog(
user_id=user_id,
action=action,
entity_type=entity_type,
entity_id=entity_id,
details=details,
ip_address=ip_address,
)
db.add(entry)
await db.flush()
return entry
async def list_logs(
self,
db: AsyncSession,
user_id: UUID | None = None,
action: str | None = None,
entity_type: str | None = None,
entity_id: str | None = None,
date_from: datetime | None = None,
date_to: datetime | None = None,
page: int = 1,
page_size: int = 50,
) -> tuple[list[AuditLog], int]:
"""List audit logs with filters and pagination."""
query = select(AuditLog)
if user_id:
query = query.where(AuditLog.user_id == user_id)
if action:
query = query.where(AuditLog.action == action)
if entity_type:
query = query.where(AuditLog.entity_type == entity_type)
if entity_id:
query = query.where(AuditLog.entity_id == entity_id)
if date_from:
query = query.where(AuditLog.timestamp >= date_from)
if date_to:
query = query.where(AuditLog.timestamp <= date_to)
# Count
count_query = select(func.count()).select_from(query.subquery())
total_result = await db.execute(count_query)
total = total_result.scalar() or 0
# Paginate
query = query.order_by(AuditLog.timestamp.desc())
query = query.offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
logs = list(result.scalars().all())
return logs, total