modcomms/backend/app/api/routes.py
michael d21036a0de Add 4-tier RBAC backend: auth dependencies, role enforcement, agency filtering
- Add CHECK constraint migration for users.role (super_admin, oversight_admin, agency_admin, basic_user)
- Add get_current_db_user dependency resolving Azure claims to User ORM with agency
- Add require_role() factory and require_write_access() dependency
- Auto-promote dev user to super_admin when DISABLE_AUTH=true
- Add /api/me, PUT /api/users/{id}, POST /api/agencies endpoints
- Apply agency-based data filtering on campaigns, analytics, audit routes
- Block oversight_admin from all mutation routes (campaigns, proofs, flags, resolves)
- Restrict dropdown option mutations to super_admin only
- Add role check in WebSocket handler to block oversight_admin from analysis
- Add CurrentUserResponse, UserUpdate, AgencyCreate schemas

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 08:28:23 -06:00

871 lines
32 KiB
Python
Executable file

"""REST API routes for campaigns, proofs, and audit items."""
import base64
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File, Form
from fastapi.responses import Response
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.schemas import (
CampaignCreate,
CampaignUpdate,
CampaignResponse,
ProofCreate,
ProofResponse,
ProofVersionResponse,
FlaggedItemCreate,
FlaggedItemResponse,
ResolvedItemCreate,
ResolvedItemResponse,
ErrorItemResponse,
AnalyticsResponse,
DropdownOptionsResponse,
AgencyResponse,
AgencyCreate,
CurrentUserResponse,
UserResponse,
UserUpdate,
SupportEmailRequest,
)
from app.dependencies.auth import (
get_current_user,
get_current_db_user,
require_role,
require_write_access,
)
from app.models.database import get_db
from app.models.models import User
from app.repositories import (
CampaignRepository,
ProofRepository,
UserRepository,
AuditRepository,
DropdownRepository,
)
from app.services.storage_service import storage_service
from app.services.email_service import email_service
from app.services.pdf_service import pdf_service
router = APIRouter()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _resolve_agency_filter(current_user: User, agency_id_param: Optional[uuid.UUID] = None) -> Optional[uuid.UUID]:
"""Determine which agency_id to filter by based on user role and query param.
- super_admin / oversight_admin: use agency_id_param if provided, else None (all)
- agency_admin / basic_user: forced to current_user.agency_id
"""
if current_user.role in ("super_admin", "oversight_admin"):
return agency_id_param # None means "all"
return current_user.agency_id # may be None (user not assigned yet)
def _check_campaign_access(current_user: User, campaign) -> None:
"""Raise 404 if the user's role restricts them and the campaign doesn't belong to their agency."""
if current_user.role in ("super_admin", "oversight_admin"):
return
if current_user.agency_id and campaign.agency_id == current_user.agency_id:
return
raise HTTPException(status_code=404, detail="Campaign not found")
# ---------------------------------------------------------------------------
# Current User endpoint
# ---------------------------------------------------------------------------
@router.get("/me", response_model=CurrentUserResponse)
async def get_me(
current_user: User = Depends(get_current_db_user),
):
"""Get the authenticated user's profile."""
return CurrentUserResponse(
id=current_user.id,
email=current_user.email,
name=current_user.name,
role=current_user.role,
agency_id=current_user.agency_id,
agency_name=current_user.agency.name if current_user.agency else None,
)
# ---------------------------------------------------------------------------
# Campaign endpoints
# ---------------------------------------------------------------------------
@router.get("/campaigns", response_model=list[CampaignResponse])
async def list_campaigns(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""List campaigns, filtered by the user's role and optional agency filter."""
effective_agency_id = _resolve_agency_filter(current_user, agency_id)
repo = CampaignRepository(db)
campaigns_with_counts = await repo.get_with_proof_counts(agency_id=effective_agency_id)
return [
CampaignResponse(
id=item["campaign"].id,
name=item["campaign"].name,
workfront_id=item["campaign"].workfront_id,
client_lead=item["campaign"].client_lead,
agency_lead=item["campaign"].agency_lead,
brand_guidelines=item["campaign"].brand_guidelines,
status=item["campaign"].status,
agency=item["campaign"].agency.name if item["campaign"].agency else None,
created_at=item["campaign"].created_at,
updated_at=item["campaign"].updated_at,
proofs=item["proof_count"],
)
for item in campaigns_with_counts
]
@router.post("/campaigns", response_model=CampaignResponse, status_code=201)
async def create_campaign(
data: CampaignCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_write_access),
):
"""Create a new campaign. Blocked for oversight_admin."""
repo = CampaignRepository(db)
# Check if campaign name already exists
existing = await repo.get_by_name(data.name)
if existing:
raise HTTPException(status_code=400, detail="Campaign with this name already exists")
campaign = await repo.create(
name=data.name,
workfront_id=data.workfront_id,
client_lead=data.client_lead,
agency_lead=data.agency_lead,
brand_guidelines=data.brand_guidelines,
agency_id=current_user.agency_id,
created_by=current_user.id,
)
return CampaignResponse(
id=campaign.id,
name=campaign.name,
workfront_id=campaign.workfront_id,
client_lead=campaign.client_lead,
agency_lead=campaign.agency_lead,
brand_guidelines=campaign.brand_guidelines,
status=campaign.status,
agency=current_user.agency.name if current_user.agency else None,
created_at=campaign.created_at,
updated_at=campaign.updated_at,
proofs=0,
)
@router.get("/campaigns/{campaign_id}", response_model=CampaignResponse)
async def get_campaign(
campaign_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
):
"""Get a campaign by ID."""
repo = CampaignRepository(db)
campaign = await repo.get_by_id(campaign_id)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
_check_campaign_access(current_user, campaign)
return CampaignResponse(
id=campaign.id,
name=campaign.name,
workfront_id=campaign.workfront_id,
client_lead=campaign.client_lead,
agency_lead=campaign.agency_lead,
brand_guidelines=campaign.brand_guidelines,
status=campaign.status,
agency=campaign.agency.name if campaign.agency else None,
created_at=campaign.created_at,
updated_at=campaign.updated_at,
proofs=len(campaign.proofs),
)
@router.put("/campaigns/{campaign_id}", response_model=CampaignResponse)
async def update_campaign(
campaign_id: uuid.UUID,
data: CampaignUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_write_access),
):
"""Update a campaign. Blocked for oversight_admin."""
repo = CampaignRepository(db)
# Verify campaign exists and user has access
existing = await repo.get_by_id(campaign_id)
if not existing:
raise HTTPException(status_code=404, detail="Campaign not found")
_check_campaign_access(current_user, existing)
campaign = await repo.update(
campaign_id,
name=data.name,
workfront_id=data.workfront_id,
client_lead=data.client_lead,
agency_lead=data.agency_lead,
brand_guidelines=data.brand_guidelines,
status=data.status,
)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
return CampaignResponse(
id=campaign.id,
name=campaign.name,
workfront_id=campaign.workfront_id,
client_lead=campaign.client_lead,
agency_lead=campaign.agency_lead,
brand_guidelines=campaign.brand_guidelines,
status=campaign.status,
agency=campaign.agency.name if campaign.agency else None,
created_at=campaign.created_at,
updated_at=campaign.updated_at,
proofs=len(campaign.proofs) if campaign.proofs else 0,
)
@router.delete("/campaigns/{campaign_id}", status_code=204)
async def delete_campaign(
campaign_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_write_access),
):
"""Delete a campaign and all associated files. Blocked for oversight_admin."""
repo = CampaignRepository(db)
campaign = await repo.get_by_id(campaign_id)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
_check_campaign_access(current_user, campaign)
# Delete files from storage for all proofs
for proof in campaign.proofs:
for version in proof.versions:
if version.file_storage_key:
await storage_service.delete_file(version.file_storage_key)
await repo.delete(campaign_id)
# ---------------------------------------------------------------------------
# Proof endpoints
# ---------------------------------------------------------------------------
@router.get("/campaigns/{campaign_id}/proofs", response_model=list[ProofResponse])
async def list_proofs(
campaign_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
):
"""List all proofs for a campaign."""
# Verify campaign access
campaign_repo = CampaignRepository(db)
campaign = await campaign_repo.get_by_id(campaign_id)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
_check_campaign_access(current_user, campaign)
repo = ProofRepository(db)
proofs = await repo.list_by_campaign(campaign_id)
return [
ProofResponse(
id=proof.id,
proof_name=proof.proof_name,
channel=proof.channel,
sub_channel=proof.sub_channel,
proof_type=proof.proof_type,
workfront_id=proof.workfront_id,
created_at=proof.created_at,
versions=[
ProofVersionResponse(
id=v.id,
version=v.version,
file_storage_key=v.file_storage_key,
thumbnail_url=v.thumbnail_url,
agent_review=v.agent_review,
overall_status=v.overall_status,
workfront_id=v.workfront_id,
is_identical_file=v.is_identical_file,
created_at=v.created_at,
)
for v in proof.versions
],
)
for proof in proofs
]
@router.get("/proofs/{proof_id}", response_model=ProofResponse)
async def get_proof(
proof_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
):
"""Get a proof by ID."""
repo = ProofRepository(db)
proof = await repo.get_by_id(proof_id)
if not proof:
raise HTTPException(status_code=404, detail="Proof not found")
return ProofResponse(
id=proof.id,
proof_name=proof.proof_name,
channel=proof.channel,
sub_channel=proof.sub_channel,
proof_type=proof.proof_type,
workfront_id=proof.workfront_id,
created_at=proof.created_at,
versions=[
ProofVersionResponse(
id=v.id,
version=v.version,
file_storage_key=v.file_storage_key,
thumbnail_url=v.thumbnail_url,
agent_review=v.agent_review,
overall_status=v.overall_status,
workfront_id=v.workfront_id,
is_identical_file=v.is_identical_file,
created_at=v.created_at,
)
for v in proof.versions
],
)
@router.delete("/proofs/{proof_id}", status_code=204)
async def delete_proof(
proof_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_write_access),
):
"""Delete a proof and its associated files. Blocked for oversight_admin."""
repo = ProofRepository(db)
proof = await repo.get_by_id(proof_id)
if not proof:
raise HTTPException(status_code=404, detail="Proof not found")
# Delete files from storage
for version in proof.versions:
if version.file_storage_key:
await storage_service.delete_file(version.file_storage_key)
await repo.delete(proof_id)
# ---------------------------------------------------------------------------
# Audit endpoints
# ---------------------------------------------------------------------------
@router.post("/proofs/{proof_id}/versions/{version}/flag", response_model=FlaggedItemResponse, status_code=201)
async def flag_proof_version(
proof_id: uuid.UUID,
version: int,
data: FlaggedItemCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_write_access),
):
"""Flag an issue on a proof version. Blocked for oversight_admin."""
proof_repo = ProofRepository(db)
audit_repo = AuditRepository(db)
proof_version = await proof_repo.get_version(proof_id, version)
if not proof_version:
raise HTTPException(status_code=404, detail="Proof version not found")
flagged = await audit_repo.create_flagged_item(
proof_version_id=proof_version.id,
agent_flagged=data.agent_flagged,
comments=data.comments,
submitter_id=current_user.id,
)
proof = await proof_repo.get_by_id(proof_id)
return FlaggedItemResponse(
id=flagged.id,
proof_version_id=flagged.proof_version_id,
agent_flagged=flagged.agent_flagged,
comments=flagged.comments,
submitter_name=current_user.name,
submitter_agency=current_user.agency.name if current_user.agency else None,
campaign_name=proof.campaign.name if proof and proof.campaign else None,
proof_name=proof.proof_name if proof else None,
version=version,
created_at=flagged.created_at,
)
@router.post("/proofs/{proof_id}/versions/{version}/resolve", response_model=ResolvedItemResponse, status_code=201)
async def resolve_proof_version(
proof_id: uuid.UUID,
version: int,
data: ResolvedItemCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_write_access),
):
"""Resolve an issue on a proof version. Blocked for oversight_admin."""
proof_repo = ProofRepository(db)
audit_repo = AuditRepository(db)
proof_version = await proof_repo.get_version(proof_id, version)
if not proof_version:
raise HTTPException(status_code=404, detail="Proof version not found")
resolved = await audit_repo.create_resolved_item(
proof_version_id=proof_version.id,
agent=data.agent,
issue=data.issue,
resolution=data.resolution,
submitter_id=current_user.id,
)
proof = await proof_repo.get_by_id(proof_id)
return ResolvedItemResponse(
id=resolved.id,
proof_version_id=resolved.proof_version_id,
agent=resolved.agent,
issue=resolved.issue,
resolution=resolved.resolution,
submitter_name=current_user.name,
submitter_agency=current_user.agency.name if current_user.agency else None,
campaign_name=proof.campaign.name if proof and proof.campaign else None,
proof_name=proof.proof_name if proof else None,
version=version,
created_at=resolved.created_at,
)
@router.get("/audit/flagged", response_model=list[FlaggedItemResponse])
async def list_flagged_items(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""List flagged items, filtered by role."""
effective_agency_id = _resolve_agency_filter(current_user, agency_id)
audit_repo = AuditRepository(db)
flagged_items = await audit_repo.get_flagged_items(agency_id=effective_agency_id, limit=limit, offset=offset)
return [
FlaggedItemResponse(
id=item.id,
proof_version_id=item.proof_version_id,
agent_flagged=item.agent_flagged,
comments=item.comments,
submitter_name=item.submitter.name if item.submitter else None,
submitter_agency=item.submitter.agency.name if item.submitter and item.submitter.agency else None,
campaign_name=item.proof_version.proof.campaign.name if item.proof_version and item.proof_version.proof and item.proof_version.proof.campaign else None,
proof_name=item.proof_version.proof.proof_name if item.proof_version and item.proof_version.proof else None,
version=item.proof_version.version if item.proof_version else None,
created_at=item.created_at,
)
for item in flagged_items
]
@router.get("/audit/resolved", response_model=list[ResolvedItemResponse])
async def list_resolved_items(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""List resolved items, filtered by role."""
effective_agency_id = _resolve_agency_filter(current_user, agency_id)
audit_repo = AuditRepository(db)
resolved_items = await audit_repo.get_resolved_items(agency_id=effective_agency_id, limit=limit, offset=offset)
return [
ResolvedItemResponse(
id=item.id,
proof_version_id=item.proof_version_id,
agent=item.agent,
issue=item.issue,
resolution=item.resolution,
submitter_name=item.submitter.name if item.submitter else None,
submitter_agency=item.submitter.agency.name if item.submitter and item.submitter.agency else None,
campaign_name=item.proof_version.proof.campaign.name if item.proof_version and item.proof_version.proof and item.proof_version.proof.campaign else None,
proof_name=item.proof_version.proof.proof_name if item.proof_version and item.proof_version.proof else None,
version=item.proof_version.version if item.proof_version else None,
created_at=item.created_at,
)
for item in resolved_items
]
@router.get("/audit/errors", response_model=list[ErrorItemResponse])
async def list_error_items(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""List error items, filtered by role."""
effective_agency_id = _resolve_agency_filter(current_user, agency_id)
audit_repo = AuditRepository(db)
error_items = await audit_repo.get_error_items(agency_id=effective_agency_id, limit=limit, offset=offset)
return [
ErrorItemResponse(
id=item.id,
proof_version_id=item.proof_version_id,
error_summary=item.error_summary,
submitter_name=item.proof_version.proof.created_by_user.name if item.proof_version and item.proof_version.proof and item.proof_version.proof.created_by_user else None,
submitter_agency=item.proof_version.proof.created_by_user.agency.name if item.proof_version and item.proof_version.proof and item.proof_version.proof.created_by_user and item.proof_version.proof.created_by_user.agency else None,
campaign_name=item.proof_version.proof.campaign.name if item.proof_version and item.proof_version.proof and item.proof_version.proof.campaign else None,
proof_name=item.proof_version.proof.proof_name if item.proof_version and item.proof_version.proof else None,
version=item.proof_version.version if item.proof_version else None,
created_at=item.created_at,
)
for item in error_items
]
# ---------------------------------------------------------------------------
# Analytics endpoint
# ---------------------------------------------------------------------------
@router.get("/analytics", response_model=AnalyticsResponse)
async def get_analytics(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
):
"""Get analytics data, filtered by role."""
effective_agency_id = _resolve_agency_filter(current_user, agency_id)
repo = CampaignRepository(db)
analytics = await repo.get_analytics(agency_id=effective_agency_id)
return AnalyticsResponse(**analytics)
# ---------------------------------------------------------------------------
# User Management endpoints
# ---------------------------------------------------------------------------
@router.get("/users", response_model=list[UserResponse])
async def list_users(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""List all users (super_admin only)."""
user_repo = UserRepository(db)
users = await user_repo.list_all()
return [
UserResponse(
id=u.id,
email=u.email,
name=u.name,
role=u.role,
agency=u.agency.name if u.agency else None,
agency_id=u.agency_id,
created_at=u.created_at,
)
for u in users
]
@router.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
user_id: uuid.UUID,
data: UserUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""Update a user's role and/or agency (super_admin only)."""
user_repo = UserRepository(db)
# Build kwargs, using sentinel to distinguish "not provided" from None
kwargs: dict = {}
if data.role is not None:
valid_roles = ("super_admin", "oversight_admin", "agency_admin", "basic_user")
if data.role not in valid_roles:
raise HTTPException(status_code=400, detail=f"Invalid role. Must be one of: {', '.join(valid_roles)}")
kwargs["role"] = data.role
# agency_id can be explicitly set to None (unassign) or a UUID
if "agency_id" in (data.model_fields_set or set()):
kwargs["agency_id"] = data.agency_id
else:
kwargs["agency_id"] = ... # sentinel: "not provided"
user = await user_repo.update_user(user_id, **kwargs)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserResponse(
id=user.id,
email=user.email,
name=user.name,
role=user.role,
agency=user.agency.name if user.agency else None,
agency_id=user.agency_id,
created_at=user.created_at,
)
# ---------------------------------------------------------------------------
# Agency endpoints
# ---------------------------------------------------------------------------
@router.get("/agencies", response_model=list[AgencyResponse])
async def list_agencies(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
):
"""List all agencies."""
from sqlalchemy import select
from app.models.models import Agency
stmt = select(Agency).order_by(Agency.name)
result = await db.execute(stmt)
agencies = result.scalars().all()
return [AgencyResponse(id=a.id, name=a.name) for a in agencies]
@router.post("/agencies", response_model=AgencyResponse, status_code=201)
async def create_agency(
data: AgencyCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""Create a new agency (super_admin only)."""
user_repo = UserRepository(db)
# Check for duplicate name
existing = await user_repo.get_or_create_agency(data.name)
# get_or_create returns existing — check if it was just created
# A simpler approach: try to create and catch unique violation
from sqlalchemy import select
from app.models.models import Agency
result = await db.execute(select(Agency).where(Agency.name == data.name))
agency = result.scalar_one_or_none()
if agency:
return AgencyResponse(id=agency.id, name=agency.name)
agency = await user_repo.create_agency(data.name)
return AgencyResponse(id=agency.id, name=agency.name)
# ---------------------------------------------------------------------------
# Dropdown options endpoints
# ---------------------------------------------------------------------------
@router.get("/dropdown-options", response_model=DropdownOptionsResponse)
async def get_dropdown_options(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
):
"""Get all dropdown options as hierarchical structure."""
import logging
logger = logging.getLogger(__name__)
repo = DropdownRepository(db)
options = await repo.get_all_hierarchical()
channels = options.get("channels", {})
social = channels.get("Social", {})
meta_proof_types = social.get("Meta", [])
logger.info(f"[DEBUG API] Returning dropdown options - Social.Meta has {len(meta_proof_types)} proof types: {meta_proof_types}")
return DropdownOptionsResponse(**options)
@router.post("/dropdown-options/channels", status_code=201)
async def add_channel(
name: str = Query(..., description="Channel name"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""Add a new channel (super_admin only)."""
repo = DropdownRepository(db)
await repo.add_channel(name)
await db.commit()
return {"message": f"Channel '{name}' added successfully"}
@router.post("/dropdown-options/channels/{channel}/sub-channels", status_code=201)
async def add_sub_channel(
channel: str,
name: str = Query(..., description="Sub-channel name"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""Add a sub-channel under a channel (super_admin only)."""
repo = DropdownRepository(db)
result = await repo.add_sub_channel(channel, name)
if not result:
raise HTTPException(status_code=404, detail=f"Channel '{channel}' not found")
await db.commit()
return {"message": f"Sub-channel '{name}' added to '{channel}'"}
@router.post("/dropdown-options/channels/{channel}/sub-channels/{sub_channel}/proof-types", status_code=201)
async def add_proof_type(
channel: str,
sub_channel: str,
name: str = Query(..., description="Proof type name"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""Add a proof type under a sub-channel (super_admin only)."""
repo = DropdownRepository(db)
result = await repo.add_proof_type(channel, sub_channel, name)
if not result:
raise HTTPException(status_code=404, detail=f"Channel '{channel}' or sub-channel '{sub_channel}' not found")
await db.commit()
return {"message": f"Proof type '{name}' added to '{channel}/{sub_channel}'"}
@router.delete("/dropdown-options/channels/{channel}", status_code=204)
async def delete_channel(
channel: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""Delete a channel and all its sub-channels and proof types (super_admin only)."""
repo = DropdownRepository(db)
success = await repo.remove_channel(channel)
if not success:
raise HTTPException(status_code=404, detail=f"Channel '{channel}' not found")
await db.commit()
@router.delete("/dropdown-options/channels/{channel}/sub-channels/{sub_channel}", status_code=204)
async def delete_sub_channel(
channel: str,
sub_channel: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""Delete a sub-channel and all its proof types (super_admin only)."""
repo = DropdownRepository(db)
success = await repo.remove_sub_channel(channel, sub_channel)
if not success:
raise HTTPException(status_code=404, detail=f"Sub-channel '{sub_channel}' not found in channel '{channel}'")
await db.commit()
@router.delete("/dropdown-options/channels/{channel}/sub-channels/{sub_channel}/proof-types/{proof_type}", status_code=204)
async def delete_proof_type(
channel: str,
sub_channel: str,
proof_type: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("super_admin")),
):
"""Delete a proof type (super_admin only)."""
repo = DropdownRepository(db)
success = await repo.remove_proof_type(channel, sub_channel, proof_type)
if not success:
raise HTTPException(status_code=404, detail=f"Proof type '{proof_type}' not found")
await db.commit()
# ---------------------------------------------------------------------------
# File endpoints
# ---------------------------------------------------------------------------
# PDF pages endpoint (must be defined BEFORE the base file endpoint for correct routing)
@router.get("/files/{storage_key:path}/pages")
async def get_pdf_pages(
storage_key: str,
max_pages: int = Query(10, ge=1, le=50),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
):
"""Rasterize a stored PDF and return pages as data URLs."""
if not storage_key.lower().endswith('.pdf'):
raise HTTPException(status_code=400, detail="File is not a PDF")
file_data = await storage_service.get_file(storage_key)
if file_data is None:
raise HTTPException(status_code=404, detail="File not found")
try:
pages = pdf_service.rasterize(file_data, max_pages=max_pages)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {
"pages": [
{
"page": i + 1,
"data_url": f"data:image/png;base64,{base64.b64encode(png_data).decode('utf-8')}",
"width": width,
"height": height,
}
for i, (png_data, width, height) in enumerate(pages)
]
}
# File download endpoint
@router.get("/files/{storage_key:path}")
async def get_file(
storage_key: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_db_user),
):
"""Retrieve a stored file by its storage key."""
file_data = await storage_service.get_file(storage_key)
if file_data is None:
raise HTTPException(status_code=404, detail="File not found")
extension = storage_key.split('.')[-1].lower() if '.' in storage_key else ''
content_types = {
'png': 'image/png',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'gif': 'image/gif',
'webp': 'image/webp',
'svg': 'image/svg+xml',
'pdf': 'application/pdf',
}
return Response(
content=file_data,
media_type=content_types.get(extension, 'application/octet-stream'),
)
# ---------------------------------------------------------------------------
# Support email endpoint (public - no auth required for login page access)
# ---------------------------------------------------------------------------
@router.post("/support/email")
async def send_support_email(
data: SupportEmailRequest,
):
"""Send support email - no auth required (for login page)."""
success = await email_service.send_support_email(
message=data.message,
subject=data.subject,
user_name=data.user_name,
user_email=data.user_email,
)
if not success:
raise HTTPException(status_code=500, detail="Failed to send email")
return {"success": True, "message": "Email sent successfully"}