modcomms/backend/app/api/routes.py
Vadym Samoilenko 447c4b2a95 Add CSV export of campaign data for super_admin and oversight_admin
Adds a server-side CSV export covering all campaign, proof, and version
data including agent RAG statuses. The export respects the active agency
filter so oversight admins can scope the download to a single agency.

- backend: `CampaignRepository.get_export_rows()` — flat join across
  Campaign → Proof → ProofVersion with Agency and User, extracts agent
  RAG statuses from the `agent_review` JSONB column
- backend: `GET /api/export/campaigns-csv` endpoint gated to
  super_admin / oversight_admin, streams a dated CSV file
- frontend: `apiService.downloadCampaignsCsv(agencyId?)` — fetches blob
  and triggers browser download
- frontend: threads `selectedAgencyId` prop from App → Campaigns →
  CampaignList so the export uses the active filter
- frontend: Export CSV button in CampaignList header, visible only to
  super_admin / oversight_admin, with spinner while downloading

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-19 11:35:24 +00:00

1020 lines
37 KiB
Python
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""REST API routes for campaigns, proofs, and audit items."""
import base64
import csv
import io
import uuid
from datetime import date
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File, Form
from fastapi.responses import Response, StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.schemas import (
CampaignCreate,
CampaignUpdate,
CampaignResponse,
ProofCreate,
ProofResponse,
ProofVersionResponse,
FlaggedItemCreate,
FlaggedItemResponse,
ResolvedItemCreate,
ResolvedItemResponse,
ErrorItemResponse,
AnalyticsResponse,
AgencyAnalyticsItem,
AgencyAnalyticsResponse,
DropdownOptionsResponse,
AgencyResponse,
AgencyCreate,
CurrentUserResponse,
UserResponse,
UserUpdate,
UserChangeLogResponse,
SupportEmailRequest,
)
from app.dependencies.auth import (
get_current_user,
get_current_db_user,
require_role,
require_write_access,
)
from app.models.database import get_db
from app.models.models import User
from app.repositories import (
CampaignRepository,
ProofRepository,
UserRepository,
AuditRepository,
DropdownRepository,
)
from app.services.storage_service import storage_service
from app.services.email_service import email_service
from app.services.pdf_service import pdf_service
# Router that every endpoint in this module registers itself on.
router = APIRouter()
# Sentinel: distinguishes "no filter (admins see all)" from "user has no agency".
# It is a unique object compared by identity (`is _NO_AGENCY`), so it cannot be
# confused with None or with a real agency UUID.
_NO_AGENCY = object()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _resolve_agency_filter(current_user: User, agency_id_param: Optional[uuid.UUID] = None):
    """Work out which agency a request's results must be restricted to.

    Admin roles (super_admin, oversight_admin) may choose any agency through
    the query parameter. Every other role is pinned to its own agency and the
    query parameter is ignored.

    Returns:
        uuid.UUID    restrict results to that agency
        None         no restriction (an admin who did not pick an agency)
        _NO_AGENCY   non-admin user with no agency; callers should answer
                     with empty results
    """
    is_admin = current_user.role in ("super_admin", "oversight_admin")
    if is_admin:
        # A missing query parameter (None) means "all agencies".
        return agency_id_param

    own_agency = current_user.agency_id
    return _NO_AGENCY if own_agency is None else own_agency
def _check_campaign_access(current_user: User, campaign) -> None:
    """Allow the call to continue only if the user may see ``campaign``.

    super_admin and oversight_admin always pass. Any other user passes only
    when they have an agency and it matches the campaign's agency.

    Raises:
        HTTPException: 404 "Campaign not found" when access is denied. A 404
            (rather than 403) is used so the response is the same as for a
            campaign that does not exist.
    """
    if current_user.role in ("super_admin", "oversight_admin"):
        return

    user_agency = current_user.agency_id
    # The short-circuit keeps unassigned users from ever matching a campaign.
    if user_agency is not None and campaign.agency_id == user_agency:
        return

    raise HTTPException(status_code=404, detail="Campaign not found")
# ---------------------------------------------------------------------------
# Current User endpoint
# ---------------------------------------------------------------------------
@router.get("/me", response_model=CurrentUserResponse)
async def get_me(
    current_user: User = Depends(get_current_db_user),
):
    """Return the profile of the user making the request."""
    agency = current_user.agency
    profile = CurrentUserResponse(
        id=current_user.id,
        email=current_user.email,
        name=current_user.name,
        role=current_user.role,
        agency_id=current_user.agency_id,
        # Users without an agency report no agency name.
        agency_name=agency.name if agency else None,
    )
    return profile
# ---------------------------------------------------------------------------
# Campaign endpoints
# ---------------------------------------------------------------------------
@router.get("/campaigns", response_model=list[CampaignResponse])
async def list_campaigns(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
    agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """Return the campaigns visible to the caller, each with its proof count.

    Admin roles may narrow the list with ``agency_id``; other users only see
    their own agency's campaigns, and users without an agency get an empty
    list.
    """
    # NOTE(review): `limit` and `offset` are validated here but are not passed
    # to the repository call below, so they currently have no effect on the
    # result. Confirm whether pagination is intended for this endpoint.
    agency_filter = _resolve_agency_filter(current_user, agency_id)
    if agency_filter is _NO_AGENCY:
        return []

    entries = await CampaignRepository(db).get_with_proof_counts(agency_id=agency_filter)

    responses = []
    for entry in entries:
        campaign = entry["campaign"]
        owning_agency = campaign.agency
        responses.append(
            CampaignResponse(
                id=campaign.id,
                name=campaign.name,
                workfront_id=campaign.workfront_id,
                client_lead=campaign.client_lead,
                agency_lead=campaign.agency_lead,
                brand_guidelines=campaign.brand_guidelines,
                status=campaign.status,
                agency=owning_agency.name if owning_agency else None,
                created_by=campaign.created_by,
                created_at=campaign.created_at,
                updated_at=campaign.updated_at,
                proofs=entry["proof_count"],
            )
        )
    return responses
@router.post("/campaigns", response_model=CampaignResponse, status_code=201)
async def create_campaign(
    data: CampaignCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_write_access),
):
    """Create a campaign owned by the caller's agency.

    Raises:
        HTTPException: 403 when the caller has no agency and is not a
            super_admin; 400 when a campaign with the same name exists.
    """
    has_no_agency = current_user.agency_id is None
    if has_no_agency and current_user.role != "super_admin":
        raise HTTPException(status_code=403, detail="You must be assigned to an agency before creating campaigns.")

    repo = CampaignRepository(db)

    # Reject the request if the name is already taken.
    if await repo.get_by_name(data.name):
        raise HTTPException(status_code=400, detail="Campaign with this name already exists")

    campaign = await repo.create(
        name=data.name,
        workfront_id=data.workfront_id,
        client_lead=data.client_lead,
        agency_lead=data.agency_lead,
        brand_guidelines=data.brand_guidelines,
        agency_id=current_user.agency_id,
        created_by=current_user.id,
    )

    creator_agency = current_user.agency
    return CampaignResponse(
        id=campaign.id,
        name=campaign.name,
        workfront_id=campaign.workfront_id,
        client_lead=campaign.client_lead,
        agency_lead=campaign.agency_lead,
        brand_guidelines=campaign.brand_guidelines,
        status=campaign.status,
        # The agency name is taken from the creator, whose agency_id was
        # stored on the campaign above.
        agency=creator_agency.name if creator_agency else None,
        created_by=campaign.created_by,
        created_at=campaign.created_at,
        updated_at=campaign.updated_at,
        # A brand-new campaign has no proofs yet.
        proofs=0,
    )
@router.get("/campaigns/{campaign_id}", response_model=CampaignResponse)
async def get_campaign(
    campaign_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
):
    """Fetch a single campaign by ID.

    Raises:
        HTTPException: 404 when the campaign does not exist or the caller is
            not allowed to see it.
    """
    campaign = await CampaignRepository(db).get_by_id(campaign_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")
    _check_campaign_access(current_user, campaign)

    owning_agency = campaign.agency
    return CampaignResponse(
        id=campaign.id,
        name=campaign.name,
        workfront_id=campaign.workfront_id,
        client_lead=campaign.client_lead,
        agency_lead=campaign.agency_lead,
        brand_guidelines=campaign.brand_guidelines,
        status=campaign.status,
        agency=owning_agency.name if owning_agency else None,
        created_by=campaign.created_by,
        created_at=campaign.created_at,
        updated_at=campaign.updated_at,
        proofs=len(campaign.proofs),
    )
@router.put("/campaigns/{campaign_id}", response_model=CampaignResponse)
async def update_campaign(
    campaign_id: uuid.UUID,
    data: CampaignUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_write_access),
):
    """Apply the submitted field values to an existing campaign.

    Raises:
        HTTPException: 404 when the campaign does not exist, the caller may
            not access it, or the repository update returns nothing.
    """
    repo = CampaignRepository(db)

    # The campaign must exist and be visible to the caller before any write.
    current = await repo.get_by_id(campaign_id)
    if not current:
        raise HTTPException(status_code=404, detail="Campaign not found")
    _check_campaign_access(current_user, current)

    changes = dict(
        name=data.name,
        workfront_id=data.workfront_id,
        client_lead=data.client_lead,
        agency_lead=data.agency_lead,
        brand_guidelines=data.brand_guidelines,
        status=data.status,
    )
    campaign = await repo.update(campaign_id, **changes)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    owning_agency = campaign.agency
    attached_proofs = campaign.proofs
    return CampaignResponse(
        id=campaign.id,
        name=campaign.name,
        workfront_id=campaign.workfront_id,
        client_lead=campaign.client_lead,
        agency_lead=campaign.agency_lead,
        brand_guidelines=campaign.brand_guidelines,
        status=campaign.status,
        agency=owning_agency.name if owning_agency else None,
        created_by=campaign.created_by,
        created_at=campaign.created_at,
        updated_at=campaign.updated_at,
        # An empty or missing proofs collection is reported as 0.
        proofs=len(attached_proofs) if attached_proofs else 0,
    )
@router.delete("/campaigns/{campaign_id}", status_code=204)
async def delete_campaign(
    campaign_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_write_access),
):
    """Delete a campaign together with the stored files of all its proofs.

    Raises:
        HTTPException: 404 when the campaign does not exist or the caller is
            not allowed to access it.
    """
    repo = CampaignRepository(db)
    campaign = await repo.get_by_id(campaign_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")
    _check_campaign_access(current_user, campaign)

    # Collect every stored file key across all proof versions; versions
    # without a key have nothing in storage to remove.
    stored_keys = [
        version.file_storage_key
        for proof in campaign.proofs
        for version in proof.versions
        if version.file_storage_key
    ]
    # Files are removed before the database rows.
    for key in stored_keys:
        await storage_service.delete_file(key)

    await repo.delete(campaign_id)
# ---------------------------------------------------------------------------
# Proof endpoints
# ---------------------------------------------------------------------------
@router.get("/campaigns/{campaign_id}/proofs", response_model=list[ProofResponse])
async def list_proofs(
    campaign_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
):
    """Return every proof of a campaign, each with all of its versions.

    Raises:
        HTTPException: 404 when the campaign does not exist or the caller is
            not allowed to see it.
    """
    # Access to proofs is decided by access to the parent campaign.
    campaign = await CampaignRepository(db).get_by_id(campaign_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")
    _check_campaign_access(current_user, campaign)

    proofs = await ProofRepository(db).list_by_campaign(campaign_id)

    payload = []
    for proof in proofs:
        version_models = [
            ProofVersionResponse(
                id=v.id,
                version=v.version,
                file_storage_key=v.file_storage_key,
                thumbnail_url=v.thumbnail_url,
                agent_review=v.agent_review,
                overall_status=v.overall_status,
                workfront_id=v.workfront_id,
                is_identical_file=v.is_identical_file,
                created_at=v.created_at,
            )
            for v in proof.versions
        ]
        payload.append(
            ProofResponse(
                id=proof.id,
                proof_name=proof.proof_name,
                channel=proof.channel,
                sub_channel=proof.sub_channel,
                proof_type=proof.proof_type,
                workfront_id=proof.workfront_id,
                created_at=proof.created_at,
                versions=version_models,
            )
        )
    return payload
@router.get("/proofs/{proof_id}", response_model=ProofResponse)
async def get_proof(
    proof_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
):
    """Get a proof by ID, including all of its versions.

    The caller must be allowed to access the campaign the proof belongs to,
    using the same rule as the campaign endpoints.

    Raises:
        HTTPException: 404 when the proof does not exist or belongs to a
            campaign the caller may not access.
    """
    repo = ProofRepository(db)
    proof = await repo.get_by_id(proof_id)
    if not proof:
        raise HTTPException(status_code=404, detail="Proof not found")

    # A proof inherits its visibility from its campaign. Without this check
    # any authenticated user could read another agency's proof by ID.
    campaign = proof.campaign
    if campaign is None:
        # No campaign means no agency to compare against: admins only.
        if current_user.role not in ("super_admin", "oversight_admin"):
            raise HTTPException(status_code=404, detail="Proof not found")
    else:
        try:
            _check_campaign_access(current_user, campaign)
        except HTTPException as exc:
            # Answer exactly as for an unknown proof ID.
            raise HTTPException(status_code=404, detail="Proof not found") from exc

    return ProofResponse(
        id=proof.id,
        proof_name=proof.proof_name,
        channel=proof.channel,
        sub_channel=proof.sub_channel,
        proof_type=proof.proof_type,
        workfront_id=proof.workfront_id,
        created_at=proof.created_at,
        versions=[
            ProofVersionResponse(
                id=v.id,
                version=v.version,
                file_storage_key=v.file_storage_key,
                thumbnail_url=v.thumbnail_url,
                agent_review=v.agent_review,
                overall_status=v.overall_status,
                workfront_id=v.workfront_id,
                is_identical_file=v.is_identical_file,
                created_at=v.created_at,
            )
            for v in proof.versions
        ],
    )
@router.delete("/proofs/{proof_id}", status_code=204)
async def delete_proof(
    proof_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_write_access),
):
    """Delete a proof and its associated files.

    The caller must be allowed to access the campaign the proof belongs to,
    using the same rule as the campaign endpoints.

    Raises:
        HTTPException: 404 when the proof does not exist or belongs to a
            campaign the caller may not access.
    """
    repo = ProofRepository(db)
    proof = await repo.get_by_id(proof_id)
    if not proof:
        raise HTTPException(status_code=404, detail="Proof not found")

    # Check access before touching storage. Without this check a writer from
    # one agency could delete another agency's proof and files by ID.
    campaign = proof.campaign
    if campaign is None:
        # No campaign means no agency to compare against: admins only.
        if current_user.role not in ("super_admin", "oversight_admin"):
            raise HTTPException(status_code=404, detail="Proof not found")
    else:
        try:
            _check_campaign_access(current_user, campaign)
        except HTTPException as exc:
            # Answer exactly as for an unknown proof ID.
            raise HTTPException(status_code=404, detail="Proof not found") from exc

    # Delete files from storage; versions without a key have nothing stored.
    for version in proof.versions:
        if version.file_storage_key:
            await storage_service.delete_file(version.file_storage_key)

    await repo.delete(proof_id)
# ---------------------------------------------------------------------------
# Audit endpoints
# ---------------------------------------------------------------------------
@router.post("/proofs/{proof_id}/versions/{version}/flag", response_model=FlaggedItemResponse, status_code=201)
async def flag_proof_version(
    proof_id: uuid.UUID,
    version: int,
    data: FlaggedItemCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_write_access),
):
    """Flag an issue on a proof version.

    The caller must be allowed to access the campaign the proof belongs to;
    this is checked before the flagged item is written.

    Raises:
        HTTPException: 404 when the proof or version does not exist, or the
            proof belongs to a campaign the caller may not access.
    """
    proof_repo = ProofRepository(db)
    audit_repo = AuditRepository(db)

    # Load the proof up front so access can be verified before any write.
    # Without this check a writer could flag another agency's proof by ID.
    target_proof = await proof_repo.get_by_id(proof_id)
    if not target_proof:
        raise HTTPException(status_code=404, detail="Proof version not found")
    target_campaign = target_proof.campaign
    if target_campaign is None:
        # No campaign means no agency to compare against: admins only.
        if current_user.role not in ("super_admin", "oversight_admin"):
            raise HTTPException(status_code=404, detail="Proof version not found")
    else:
        try:
            _check_campaign_access(current_user, target_campaign)
        except HTTPException as exc:
            # Answer exactly as for an unknown proof version.
            raise HTTPException(status_code=404, detail="Proof version not found") from exc

    proof_version = await proof_repo.get_version(proof_id, version)
    if not proof_version:
        raise HTTPException(status_code=404, detail="Proof version not found")

    flagged = await audit_repo.create_flagged_item(
        proof_version_id=proof_version.id,
        agent_flagged=data.agent_flagged,
        comments=data.comments,
        submitter_id=current_user.id,
    )

    # Re-read the proof after the write, as the original implementation did,
    # so the response fields do not depend on objects loaded before it.
    proof = await proof_repo.get_by_id(proof_id)
    return FlaggedItemResponse(
        id=flagged.id,
        proof_version_id=flagged.proof_version_id,
        agent_flagged=flagged.agent_flagged,
        comments=flagged.comments,
        submitter_name=current_user.name,
        submitter_agency=current_user.agency.name if current_user.agency else None,
        campaign_name=proof.campaign.name if proof and proof.campaign else None,
        proof_name=proof.proof_name if proof else None,
        version=version,
        created_at=flagged.created_at,
    )
@router.post("/proofs/{proof_id}/versions/{version}/resolve", response_model=ResolvedItemResponse, status_code=201)
async def resolve_proof_version(
    proof_id: uuid.UUID,
    version: int,
    data: ResolvedItemCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_write_access),
):
    """Resolve an issue on a proof version.

    The caller must be allowed to access the campaign the proof belongs to;
    this is checked before the resolved item is written.

    Raises:
        HTTPException: 404 when the proof or version does not exist, or the
            proof belongs to a campaign the caller may not access.
    """
    proof_repo = ProofRepository(db)
    audit_repo = AuditRepository(db)

    # Load the proof up front so access can be verified before any write.
    # Without this check a writer could resolve issues on another agency's
    # proof by ID.
    target_proof = await proof_repo.get_by_id(proof_id)
    if not target_proof:
        raise HTTPException(status_code=404, detail="Proof version not found")
    target_campaign = target_proof.campaign
    if target_campaign is None:
        # No campaign means no agency to compare against: admins only.
        if current_user.role not in ("super_admin", "oversight_admin"):
            raise HTTPException(status_code=404, detail="Proof version not found")
    else:
        try:
            _check_campaign_access(current_user, target_campaign)
        except HTTPException as exc:
            # Answer exactly as for an unknown proof version.
            raise HTTPException(status_code=404, detail="Proof version not found") from exc

    proof_version = await proof_repo.get_version(proof_id, version)
    if not proof_version:
        raise HTTPException(status_code=404, detail="Proof version not found")

    resolved = await audit_repo.create_resolved_item(
        proof_version_id=proof_version.id,
        agent=data.agent,
        issue=data.issue,
        resolution=data.resolution,
        submitter_id=current_user.id,
    )

    # Re-read the proof after the write, as the original implementation did,
    # so the response fields do not depend on objects loaded before it.
    proof = await proof_repo.get_by_id(proof_id)
    return ResolvedItemResponse(
        id=resolved.id,
        proof_version_id=resolved.proof_version_id,
        agent=resolved.agent,
        issue=resolved.issue,
        resolution=resolved.resolution,
        submitter_name=current_user.name,
        submitter_agency=current_user.agency.name if current_user.agency else None,
        campaign_name=proof.campaign.name if proof and proof.campaign else None,
        proof_name=proof.proof_name if proof else None,
        version=version,
        created_at=resolved.created_at,
    )
@router.get("/audit/flagged", response_model=list[FlaggedItemResponse])
async def list_flagged_items(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
    agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """Return one page of flagged items visible to the caller.

    Admin roles may narrow the list with ``agency_id``; users without an
    agency get an empty list.
    """
    agency_filter = _resolve_agency_filter(current_user, agency_id)
    if agency_filter is _NO_AGENCY:
        return []

    flagged_items = await AuditRepository(db).get_flagged_items(
        agency_id=agency_filter, limit=limit, offset=offset
    )

    responses = []
    for item in flagged_items:
        # Walk the relationship chain once; any missing link leaves the
        # fields that depend on it as None.
        submitter = item.submitter
        proof_version = item.proof_version
        proof = proof_version.proof if proof_version else None
        campaign = proof.campaign if proof else None
        responses.append(
            FlaggedItemResponse(
                id=item.id,
                proof_version_id=item.proof_version_id,
                agent_flagged=item.agent_flagged,
                comments=item.comments,
                submitter_name=submitter.name if submitter else None,
                submitter_agency=submitter.agency.name if submitter and submitter.agency else None,
                campaign_name=campaign.name if campaign else None,
                proof_name=proof.proof_name if proof else None,
                version=proof_version.version if proof_version else None,
                created_at=item.created_at,
            )
        )
    return responses
@router.get("/audit/resolved", response_model=list[ResolvedItemResponse])
async def list_resolved_items(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
    agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """Return one page of resolved items visible to the caller.

    Admin roles may narrow the list with ``agency_id``; users without an
    agency get an empty list.
    """
    agency_filter = _resolve_agency_filter(current_user, agency_id)
    if agency_filter is _NO_AGENCY:
        return []

    resolved_items = await AuditRepository(db).get_resolved_items(
        agency_id=agency_filter, limit=limit, offset=offset
    )

    responses = []
    for item in resolved_items:
        # Walk the relationship chain once; any missing link leaves the
        # fields that depend on it as None.
        submitter = item.submitter
        proof_version = item.proof_version
        proof = proof_version.proof if proof_version else None
        campaign = proof.campaign if proof else None
        responses.append(
            ResolvedItemResponse(
                id=item.id,
                proof_version_id=item.proof_version_id,
                agent=item.agent,
                issue=item.issue,
                resolution=item.resolution,
                submitter_name=submitter.name if submitter else None,
                submitter_agency=submitter.agency.name if submitter and submitter.agency else None,
                campaign_name=campaign.name if campaign else None,
                proof_name=proof.proof_name if proof else None,
                version=proof_version.version if proof_version else None,
                created_at=item.created_at,
            )
        )
    return responses
@router.get("/audit/errors", response_model=list[ErrorItemResponse])
async def list_error_items(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
    agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """Return one page of error items visible to the caller.

    Admin roles may narrow the list with ``agency_id``; users without an
    agency get an empty list.
    """
    agency_filter = _resolve_agency_filter(current_user, agency_id)
    if agency_filter is _NO_AGENCY:
        return []

    error_items = await AuditRepository(db).get_error_items(
        agency_id=agency_filter, limit=limit, offset=offset
    )

    responses = []
    for item in error_items:
        proof_version = item.proof_version
        proof = proof_version.proof if proof_version else None
        campaign = proof.campaign if proof else None
        # Error items are attributed to the user who created the proof.
        author = proof.created_by_user if proof else None
        responses.append(
            ErrorItemResponse(
                id=item.id,
                proof_version_id=item.proof_version_id,
                error_summary=item.error_summary,
                submitter_name=author.name if author else None,
                submitter_agency=author.agency.name if author and author.agency else None,
                campaign_name=campaign.name if campaign else None,
                proof_name=proof.proof_name if proof else None,
                version=proof_version.version if proof_version else None,
                created_at=item.created_at,
            )
        )
    return responses
# ---------------------------------------------------------------------------
# Analytics endpoint
# ---------------------------------------------------------------------------
@router.get("/analytics", response_model=AnalyticsResponse)
async def get_analytics(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
    agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
):
    """Return review analytics scoped to what the caller may see.

    Users without an agency receive an all-zero summary.
    """
    agency_filter = _resolve_agency_filter(current_user, agency_id)
    if agency_filter is _NO_AGENCY:
        return AnalyticsResponse(total_reviews=0, passed=0, failed=0, errors=0, legal_review=0)

    stats = await CampaignRepository(db).get_analytics(agency_id=agency_filter)
    return AnalyticsResponse(**stats)
@router.get("/analytics/by-agency", response_model=AgencyAnalyticsResponse)
async def get_analytics_by_agency(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
):
    """Return the per-agency analytics breakdown.

    Only super_admin and oversight_admin receive data; every other role gets
    an empty breakdown rather than an error.
    """
    if current_user.role in ("super_admin", "oversight_admin"):
        rows = await CampaignRepository(db).get_analytics_by_agency()
        items = [AgencyAnalyticsItem(**row) for row in rows]
    else:
        items = []
    return AgencyAnalyticsResponse(agencies=items)
# ---------------------------------------------------------------------------
# User Management endpoints
# ---------------------------------------------------------------------------
@router.get("/users", response_model=list[UserResponse])
async def list_users(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin", "oversight_admin")),
):
    """Return every user (super_admin and oversight_admin only)."""
    all_users = await UserRepository(db).list_all()

    listing = []
    for user in all_users:
        user_agency = user.agency
        listing.append(
            UserResponse(
                id=user.id,
                email=user.email,
                name=user.name,
                role=user.role,
                agency=user_agency.name if user_agency else None,
                agency_id=user.agency_id,
                created_at=user.created_at,
            )
        )
    return listing
@router.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: uuid.UUID,
    data: UserUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin")),
):
    """Change a user's role and/or agency and log each change (super_admin only).

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the
            requested role is not one of the known roles.
    """
    repo = UserRepository(db)

    target = await repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # Snapshot the current values BEFORE the update so the change log can
    # record what they used to be.
    previous_role = target.role
    previous_agency_id = target.agency_id
    previous_agency_name = target.agency.name if target.agency else None

    updates: dict = {}

    requested_role = data.role
    if requested_role is not None:
        valid_roles = ("super_admin", "oversight_admin", "agency_admin", "basic_user")
        if requested_role not in valid_roles:
            raise HTTPException(status_code=400, detail=f"Invalid role. Must be one of: {', '.join(valid_roles)}")
        updates["role"] = requested_role

    # agency_id may legitimately be sent as None (unassign), so "was it sent
    # at all" is read from the set of fields present in the request.
    agency_id_provided = "agency_id" in (data.model_fields_set or set())
    # Ellipsis is passed as the "not provided" marker when the field is absent.
    updates["agency_id"] = data.agency_id if agency_id_provided else ...

    user = await repo.update_user(user_id, **updates)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Record a role change only when the role actually differs.
    role_changed = data.role is not None and data.role != previous_role
    if role_changed:
        await repo.create_change_log(
            user_id=user_id,
            changed_by_id=current_user.id,
            change_type="role_changed",
            field_changed="role",
            old_value=previous_role,
            new_value=data.role,
        )

    # Record an agency change only when one was requested and it differs.
    agency_changed = agency_id_provided and data.agency_id != previous_agency_id
    if agency_changed:
        current_agency_name = user.agency.name if user.agency else None
        await repo.create_change_log(
            user_id=user_id,
            changed_by_id=current_user.id,
            change_type="agency_changed",
            field_changed="agency",
            old_value=previous_agency_name,
            new_value=current_agency_name,
        )

    return UserResponse(
        id=user.id,
        email=user.email,
        name=user.name,
        role=user.role,
        agency=user.agency.name if user.agency else None,
        agency_id=user.agency_id,
        created_at=user.created_at,
    )
@router.get("/users/{user_id}/change-history", response_model=list[UserChangeLogResponse])
async def get_user_change_history(
    user_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin", "oversight_admin")),
):
    """Return the change log of a user (super_admin and oversight_admin only)."""
    entries = await UserRepository(db).get_change_logs(user_id)

    history = []
    for entry in entries:
        actor = entry.changed_by
        history.append(
            UserChangeLogResponse(
                id=entry.id,
                change_type=entry.change_type,
                field_changed=entry.field_changed,
                old_value=entry.old_value,
                new_value=entry.new_value,
                # The user who made the change may be missing.
                changed_by_name=actor.name if actor else None,
                created_at=entry.created_at,
            )
        )
    return history
# ---------------------------------------------------------------------------
# Agency endpoints
# ---------------------------------------------------------------------------
@router.get("/agencies", response_model=list[AgencyResponse])
async def list_agencies(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
):
    """Return all agencies ordered by name."""
    from sqlalchemy import select
    from app.models.models import Agency

    by_name = select(Agency).order_by(Agency.name)
    rows = await db.execute(by_name)
    return [
        AgencyResponse(id=agency.id, name=agency.name)
        for agency in rows.scalars().all()
    ]
@router.post("/agencies", response_model=AgencyResponse, status_code=201)
async def create_agency(
    data: AgencyCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin")),
):
    """Create a new agency, or return the existing one (super_admin only).

    The endpoint is idempotent: if an agency with the requested name already
    exists it is returned instead of an error, still with status 201.

    The previous implementation called ``get_or_create_agency`` and then
    re-queried the table by name and, on a miss, called ``create_agency``.
    Because the first call already ensured the row existed, the re-query was
    a redundant round trip and the ``create_agency`` branch was not expected
    to run. The repository result is now used directly.
    """
    # NOTE(review): assumes get_or_create_agency returns the persisted Agency
    # (the original comment said it "returns existing") - confirm against
    # UserRepository.
    agency = await UserRepository(db).get_or_create_agency(data.name)
    return AgencyResponse(id=agency.id, name=agency.name)
# ---------------------------------------------------------------------------
# Dropdown options endpoints
# ---------------------------------------------------------------------------
@router.get("/dropdown-options", response_model=DropdownOptionsResponse)
async def get_dropdown_options(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
):
    """Return every dropdown option as a channel / sub-channel / proof-type tree."""
    import logging

    log = logging.getLogger(__name__)

    options = await DropdownRepository(db).get_all_hierarchical()

    # Diagnostic only: report which proof types are present under
    # Social -> Meta. Missing levels fall back to empty containers.
    meta_proof_types = options.get("channels", {}).get("Social", {}).get("Meta", [])
    log.info(f"[DEBUG API] Returning dropdown options - Social.Meta has {len(meta_proof_types)} proof types: {meta_proof_types}")

    return DropdownOptionsResponse(**options)
@router.post("/dropdown-options/channels", status_code=201)
async def add_channel(
    name: str = Query(..., description="Channel name"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin")),
):
    """Create a top-level channel option (super_admin only)."""
    await DropdownRepository(db).add_channel(name)
    # This endpoint commits the change explicitly.
    await db.commit()
    confirmation = f"Channel '{name}' added successfully"
    return {"message": confirmation}
@router.post("/dropdown-options/channels/{channel}/sub-channels", status_code=201)
async def add_sub_channel(
    channel: str,
    name: str = Query(..., description="Sub-channel name"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin")),
):
    """Create a sub-channel beneath an existing channel (super_admin only).

    Raises:
        HTTPException: 404 when the repository reports no result, which this
            endpoint treats as the parent channel not existing.
    """
    created = await DropdownRepository(db).add_sub_channel(channel, name)
    if created:
        await db.commit()
        return {"message": f"Sub-channel '{name}' added to '{channel}'"}
    raise HTTPException(status_code=404, detail=f"Channel '{channel}' not found")
@router.post("/dropdown-options/channels/{channel}/sub-channels/{sub_channel}/proof-types", status_code=201)
async def add_proof_type(
    channel: str,
    sub_channel: str,
    name: str = Query(..., description="Proof type name"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin")),
):
    """Create a proof type beneath an existing sub-channel (super_admin only).

    Raises:
        HTTPException: 404 when the repository reports no result, which this
            endpoint treats as the channel or sub-channel not existing.
    """
    created = await DropdownRepository(db).add_proof_type(channel, sub_channel, name)
    if created:
        await db.commit()
        return {"message": f"Proof type '{name}' added to '{channel}/{sub_channel}'"}
    raise HTTPException(status_code=404, detail=f"Channel '{channel}' or sub-channel '{sub_channel}' not found")
@router.delete("/dropdown-options/channels/{channel}", status_code=204)
async def delete_channel(
    channel: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin")),
):
    """Remove a channel along with its sub-channels and proof types (super_admin only).

    Raises:
        HTTPException: 404 when the repository reports that nothing was removed.
    """
    removed = await DropdownRepository(db).remove_channel(channel)
    if removed:
        await db.commit()
        return
    raise HTTPException(status_code=404, detail=f"Channel '{channel}' not found")
@router.delete("/dropdown-options/channels/{channel}/sub-channels/{sub_channel}", status_code=204)
async def delete_sub_channel(
    channel: str,
    sub_channel: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin")),
):
    """Remove a sub-channel along with its proof types (super_admin only).

    Raises:
        HTTPException: 404 when the repository reports that nothing was removed.
    """
    removed = await DropdownRepository(db).remove_sub_channel(channel, sub_channel)
    if removed:
        await db.commit()
        return
    raise HTTPException(status_code=404, detail=f"Sub-channel '{sub_channel}' not found in channel '{channel}'")
@router.delete("/dropdown-options/channels/{channel}/sub-channels/{sub_channel}/proof-types/{proof_type}", status_code=204)
async def delete_proof_type(
    channel: str,
    sub_channel: str,
    proof_type: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin")),
):
    """Remove a single proof type option (super_admin only).

    Raises:
        HTTPException: 404 when the repository reports that nothing was removed.
    """
    removed = await DropdownRepository(db).remove_proof_type(channel, sub_channel, proof_type)
    if removed:
        await db.commit()
        return
    raise HTTPException(status_code=404, detail=f"Proof type '{proof_type}' not found")
# ---------------------------------------------------------------------------
# File endpoints
# ---------------------------------------------------------------------------
# PDF pages endpoint (must be defined BEFORE the base file endpoint for correct routing)
@router.get("/files/{storage_key:path}/pages")
async def get_pdf_pages(
    storage_key: str,
    max_pages: int = Query(10, ge=1, le=50),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
):
    """Render a stored PDF to PNG pages and return them as base64 data URLs.

    Raises:
        HTTPException: 400 when the key does not end in ".pdf" or the PDF
            service rejects the file with a ValueError; 404 when no file is
            stored under the key.
    """
    # The file type is judged from the key's suffix, case-insensitively.
    if not storage_key.lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="File is not a PDF")

    pdf_bytes = await storage_service.get_file(storage_key)
    if pdf_bytes is None:
        raise HTTPException(status_code=404, detail="File not found")

    try:
        pages = pdf_service.rasterize(pdf_bytes, max_pages=max_pages)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    rendered = []
    # Page numbers in the response are 1-based.
    for page_number, (png_data, width, height) in enumerate(pages, start=1):
        encoded = base64.b64encode(png_data).decode('utf-8')
        rendered.append(
            {
                "page": page_number,
                "data_url": f"data:image/png;base64,{encoded}",
                "width": width,
                "height": height,
            }
        )
    return {"pages": rendered}
# File download endpoint
@router.get("/files/{storage_key:path}")
async def get_file(
    storage_key: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_db_user),
):
    """Return the raw bytes stored under ``storage_key``.

    The Content-Type is chosen from the text after the last "." in the key;
    unknown or missing extensions are served as application/octet-stream.

    Raises:
        HTTPException: 404 when no file is stored under the key.
    """
    payload = await storage_service.get_file(storage_key)
    if payload is None:
        raise HTTPException(status_code=404, detail="File not found")

    # `dot` is empty when the key contains no "." at all.
    _, dot, suffix = storage_key.rpartition('.')
    extension = suffix.lower() if dot else ''

    content_types = {
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'gif': 'image/gif',
        'webp': 'image/webp',
        'svg': 'image/svg+xml',
        'pdf': 'application/pdf',
    }
    media_type = content_types.get(extension, 'application/octet-stream')
    return Response(content=payload, media_type=media_type)
# ---------------------------------------------------------------------------
# Export endpoints
# ---------------------------------------------------------------------------
@router.get("/export/campaigns-csv")
async def export_campaigns_csv(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_role("super_admin", "oversight_admin")),
    agency_id: Optional[uuid.UUID] = Query(None, description="Filter by agency"),
):
    """Export campaign data as a CSV file (super_admin and oversight_admin only).

    The export covers every agency unless ``agency_id`` narrows it to one.
    The column order is taken from the first exported row; when there are no
    rows a header-only file with the default columns is returned.

    Returns:
        A ``text/csv`` attachment named ``campaigns_export_<today>.csv``.
    """
    effective_agency_id = _resolve_agency_filter(current_user, agency_id)
    if effective_agency_id is _NO_AGENCY:
        # Fail closed. The role gate above means admins never get the
        # sentinel, but if it ever appears the user is unassigned and must
        # not receive data. The previous code widened the filter to "all
        # agencies" here.
        rows = []
    else:
        rows = await CampaignRepository(db).get_export_rows(agency_id=effective_agency_id)

    if rows:
        fieldnames = list(rows[0].keys())
    else:
        # Default columns for an export with no data rows.
        fieldnames = [
            "Campaign Name", "Campaign Workfront ID", "Client Lead", "Agency Lead",
            "Brand Guidelines", "Campaign Status", "Agency", "Campaign Created",
            "Proof Name", "Channel", "Sub-Channel", "Proof Type", "Proof Workfront ID",
            "Proof Created By", "Proof Created By Email", "Proof Created",
            "Version", "Overall Status", "Version Workfront ID", "Version Created",
            "Legal RAG", "Brand RAG", "Channel Best Practices RAG", "Channel Tech Specs RAG",
            "Lead Agent Summary",
        ]

    # NOTE(review): values are written verbatim. Spreadsheet applications may
    # treat cells starting with "=", "+", "-" or "@" as formulas; consider
    # neutralising them if any exported field can hold user-entered text.
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)  # writes nothing when rows is empty

    filename = f"campaigns_export_{date.today().isoformat()}.csv"
    # getvalue() returns the whole buffer regardless of the stream position,
    # so no seek is needed before reading it.
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
# ---------------------------------------------------------------------------
# Support email endpoint (public - no auth required for login page access)
# ---------------------------------------------------------------------------
@router.post("/support/email")
async def send_support_email(
    data: SupportEmailRequest,
):
    """Forward a support request by email; callable without authentication.

    The endpoint declares no auth dependency so it can be used from the
    login page.

    Raises:
        HTTPException: 500 when the email service reports failure.
    """
    sent = await email_service.send_support_email(
        message=data.message,
        subject=data.subject,
        user_name=data.user_name,
        user_email=data.user_email,
    )
    if sent:
        return {"success": True, "message": "Email sent successfully"}
    raise HTTPException(status_code=500, detail="Failed to send email")