Phase 1 (Foundation): - Project restructure (presenton-main → backend/ + frontend/) - Database schema (8 new models, Alembic config, seed script) - Auth (Azure AD SSO + dev bypass, JWT sessions, AuthMiddleware) - RBAC (access_service, rbac_middleware, admin routers) - Audit logging (fire-and-forget, AuditMiddleware, admin router) - i18n (react-i18next with 5 namespace files) Phase 2 (Admin Panel & Client Management): - Admin panel shell (sidebar layout, role guard, 12 pages) - Redux admin slice with 18 async thunks - User management (role changes, deactivation) - Client management (CRUD, brand config, team management) - Brand config editor (colors, fonts, logos, voice rules) - Master deck upload & parser (PPTX → HTML → React pipeline) - Audit log viewer with filters and CSV/JSON export Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
"""Admin router for querying and exporting audit logs."""
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
import uuid
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from models.sql.user import UserModel
|
|
from services.database import get_async_session
|
|
from services import audit_service
|
|
from services.access_service import get_accessible_client_ids
|
|
from utils.auth_dependencies import require_client_admin
|
|
|
|
# Router for the audit-log admin endpoints: routes registered on it are served
# under the "/audit-log" prefix and tagged "Admin - Audit".
AUDIT_ROUTER = APIRouter(prefix="/audit-log", tags=["Admin - Audit"])
|
|
|
|
|
|
@AUDIT_ROUTER.get("")
async def get_audit_logs(
    admin: UserModel = Depends(require_client_admin),
    user_id: Optional[uuid.UUID] = Query(None),
    action: Optional[str] = Query(None),
    resource_type: Optional[str] = Query(None),
    client_id: Optional[uuid.UUID] = Query(None),
    date_from: Optional[datetime] = Query(None),
    date_to: Optional[datetime] = Query(None),
    offset: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    session: AsyncSession = Depends(get_async_session),
):
    """Return one page of audit log entries matching the given filters.

    Every filter is optional and is forwarded unchanged to
    ``audit_service.query``. ``offset`` and ``limit`` (1-200, default 50)
    select the page.

    Scoping: a caller whose role is not ``"super_admin"`` must name a
    ``client_id`` and that client must be among the ids returned by
    ``get_accessible_client_ids``. Previously an omitted ``client_id`` fell
    through to the query with no client filter at all.

    Returns:
        A list of plain dicts, one per log entry, with UUIDs rendered as
        strings and ``created_at`` rendered via ``isoformat()``.

    Raises:
        HTTPException: 400 when a non-super admin omits ``client_id``;
            403 when the requested client is not accessible to the caller.
    """
    if admin.role != "super_admin":
        # audit_service.query is called with a single client_id, so an
        # omitted filter cannot be narrowed to "all accessible clients".
        # Fail closed instead of running the query without a client filter.
        if client_id is None:
            raise HTTPException(
                status_code=400,
                detail="client_id is required for non-super admins",
            )
        accessible = await get_accessible_client_ids(admin, session)
        if client_id not in accessible:
            raise HTTPException(status_code=403, detail="Access denied to this client's logs")

    logs = await audit_service.query(
        session=session,
        user_id=user_id,
        action=action,
        resource_type=resource_type,
        client_id=client_id,
        date_from=date_from,
        date_to=date_to,
        offset=offset,
        limit=limit,
    )

    # Serialize by hand so UUID and datetime fields become JSON-safe strings;
    # absent values stay None rather than becoming the string "None".
    return [
        {
            "id": str(log.id),
            "user_id": str(log.user_id) if log.user_id else None,
            "action": log.action,
            "resource_type": log.resource_type,
            "resource_id": str(log.resource_id) if log.resource_id else None,
            "client_id": str(log.client_id) if log.client_id else None,
            "details": log.details,
            "ip_address": log.ip_address,
            "created_at": log.created_at.isoformat() if log.created_at else None,
        }
        for log in logs
    ]
|
|
|
|
|
|
@AUDIT_ROUTER.get("/export")
async def export_audit_logs(
    format: str = Query("csv", regex="^(csv|json)$"),
    admin: UserModel = Depends(require_client_admin),
    user_id: Optional[uuid.UUID] = Query(None),
    action: Optional[str] = Query(None),
    resource_type: Optional[str] = Query(None),
    client_id: Optional[uuid.UUID] = Query(None),
    date_from: Optional[datetime] = Query(None),
    date_to: Optional[datetime] = Query(None),
    session: AsyncSession = Depends(get_async_session),
):
    """Export audit log entries matching the given filters as a download.

    ``format`` selects the serializer: ``"csv"`` (default) uses
    ``audit_service.export_csv``; ``"json"`` uses
    ``audit_service.export_json``. The query is issued with ``offset=0`` and
    ``limit=10000``, so at most that many entries are requested.

    Scoping: a caller whose role is not ``"super_admin"`` must name a
    ``client_id`` and that client must be among the ids returned by
    ``get_accessible_client_ids``. Previously this endpoint applied no role
    scoping, so any client admin could export entries for any client.

    Returns:
        A ``StreamingResponse`` carrying the serialized content with a
        ``Content-Disposition: attachment`` header.

    Raises:
        HTTPException: 400 when a non-super admin omits ``client_id``;
            403 when the requested client is not accessible to the caller.
    """
    # NOTE: the parameter name ``format`` shadows the builtin, but it is the
    # public query-parameter name and therefore must stay as is.
    if admin.role != "super_admin":
        # Fail closed: without a client_id the query below would run with no
        # client filter at all.
        if client_id is None:
            raise HTTPException(
                status_code=400,
                detail="client_id is required for non-super admins",
            )
        accessible = await get_accessible_client_ids(admin, session)
        if client_id not in accessible:
            raise HTTPException(status_code=403, detail="Access denied to this client's logs")

    # Fetch up to 10000 entries for export
    logs = await audit_service.query(
        session=session,
        user_id=user_id,
        action=action,
        resource_type=resource_type,
        client_id=client_id,
        date_from=date_from,
        date_to=date_to,
        offset=0,
        limit=10000,
    )

    if format == "csv":
        content = audit_service.export_csv(logs)
        media_type = "text/csv"
        filename = "audit_log.csv"
    else:
        content = audit_service.export_json(logs)
        media_type = "application/json"
        filename = "audit_log.json"

    # The whole export is already in memory; wrap it in a one-item iterator
    # because StreamingResponse expects an iterable body.
    return StreamingResponse(
        iter([content]),
        media_type=media_type,
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
|