hp-studios-ai-content-agent/backend/app/api/generations.py
DJP 7530f60007 Add deploy script, prod compose override, and Apache subpath proxy
For deployment to optical-dev.oliver.solutions under /hp-content-agent/.

- deploy/deploy.sh       idempotent bootstrap: secrets check, free-port
                         picker, build+up, migrations+seed, Apache Include
                         install + reload, UFW allow
- deploy/apache/*.conf.template   reverse-proxy snippet (API before SPA)
- deploy/README.md       runbook for first-time + re-deploy
- docker-compose.prod.yml  prod overrides: frontend target=prod (nginx),
                         uvicorn --workers 2, drops dev volume mounts
- docker-compose.yml     pinned project name (required on the shared
                         server per CLAUDE.md)
- frontend/nginx.conf    SPA fallback + asset caching, health endpoint
- frontend/Dockerfile    VITE_BASE_PATH build arg for subpath deploys
- frontend/vite.config.ts  reads VITE_BASE_PATH

Public: https://optical-dev.oliver.solutions/hp-content-agent/

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 15:23:51 -04:00

340 lines
12 KiB
Python

from __future__ import annotations

import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.core.deps import get_current_user
from app.db.models import Brief, DeliverableType, Export, Generation, User
from app.db.session import get_db
# Router that collects the endpoints declared in this module; every route
# registered on it carries the "generations" tag.
router = APIRouter(tags=["generations"])
# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
# Request body for creating generations on a brief.
class GenerationsCreateRequest(BaseModel):
    # Deliverable-type slugs to generate; one Generation row is created per
    # entry, in the order given.
    deliverable_types: List[str]
# Response schema describing a single generation.
class GenerationOut(BaseModel):
    # Identifiers are exposed as strings (the ORM values are passed through
    # str() when this model is built by _gen_out).
    id: str
    brief_id: str
    # Slug of the deliverable type this generation belongs to.
    deliverable_type: str
    # Lifecycle state; this module itself only ever writes "queued".
    status: str
    # Generated payload; may be null.
    structured_content: Optional[Dict[str, Any]]
    tokens_used: Optional[int]
    # Token/cost accounting fields; all default to None when not supplied.
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    cache_read_tokens: Optional[int] = None
    cache_write_tokens: Optional[int] = None
    model: Optional[str] = None
    cost_usd: Optional[float] = None
    # Error text recorded for the generation, if any.
    error: Optional[str]
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
    # from_attributes lets the model be populated from attribute-bearing
    # objects; protected_namespaces is emptied, which turns off pydantic's
    # reserved-prefix check on field names.
    model_config = {"from_attributes": True, "protected_namespaces": ()}
# Request body for PATCH /generations/{generation_id}.
class GenerationPatch(BaseModel):
    # Replacement content; it is validated by patch_generation before being
    # stored on the generation.
    structured_content: Dict[str, Any]
# Response body returned after exporting a generation.
class ExportOut(BaseModel):
    # Path of the download endpoint for the created export record.
    download_url: str
def _gen_out(g: Generation) -> GenerationOut:
    """Convert a ``Generation`` ORM row into its API response model.

    UUID-valued identifiers are rendered as strings. The token-accounting,
    model and cost attributes are all read with ``getattr(..., None)`` so the
    conversion keeps working against a ``Generation`` object that does not
    define them; the corresponding response fields are then ``None``.

    Args:
        g: The generation row to serialise.

    Returns:
        A populated ``GenerationOut``.
    """
    # Read cost_usd the same tolerant way as the other accounting columns
    # instead of by direct attribute access.
    cost_usd = getattr(g, "cost_usd", None)
    return GenerationOut(
        id=str(g.id),
        brief_id=str(g.brief_id),
        deliverable_type=g.deliverable_type,
        status=g.status,
        structured_content=g.structured_content,
        tokens_used=g.tokens_used,
        input_tokens=getattr(g, "input_tokens", None),
        output_tokens=getattr(g, "output_tokens", None),
        cache_read_tokens=getattr(g, "cache_read_tokens", None),
        cache_write_tokens=getattr(g, "cache_write_tokens", None),
        model=getattr(g, "model", None),
        # Coerce to a plain float so the response field receives a float.
        cost_usd=float(cost_usd) if cost_usd is not None else None,
        error=g.error,
        started_at=g.started_at,
        completed_at=g.completed_at,
    )
def _get_brief_or_403(brief_id: str, db: Session, current_user: User) -> Brief:
    """Load a brief by id and check that the caller is allowed to use it.

    Args:
        brief_id: Brief identifier as received in the request path.
        db: Active database session.
        current_user: The authenticated user making the request.

    Returns:
        The matching ``Brief``.

    Raises:
        HTTPException: 404 if ``brief_id`` is not a well-formed UUID or no
            brief has that id; 403 if the caller is neither an admin nor the
            user who created the brief.
    """
    try:
        brief_uuid = uuid.UUID(brief_id)
    except ValueError:
        # A malformed id cannot match any row; answer 404 rather than letting
        # the ValueError surface as an internal server error.
        raise HTTPException(status_code=404, detail="Brief not found") from None

    brief: Optional[Brief] = db.query(Brief).filter(Brief.id == brief_uuid).first()
    if brief is None:
        raise HTTPException(status_code=404, detail="Brief not found")
    # Compare as strings so differing id representations still match.
    if current_user.role != "admin" and str(brief.created_by) != str(current_user.id):
        raise HTTPException(status_code=403, detail="Not authorised")
    return brief
def _get_generation_or_403(gen_id: str, db: Session, current_user: User) -> Generation:
    """Load a generation by id and check the caller's access via its brief.

    Args:
        gen_id: Generation identifier as received in the request path.
        db: Active database session.
        current_user: The authenticated user making the request.

    Returns:
        The matching ``Generation``.

    Raises:
        HTTPException: 404 if ``gen_id`` is not a well-formed UUID or no
            generation has that id; 403 if the caller is not an admin and
            either does not own the parent brief or the parent brief cannot
            be found.
    """
    try:
        gen_uuid = uuid.UUID(gen_id)
    except ValueError:
        # A malformed id cannot match any row; answer 404 rather than letting
        # the ValueError surface as an internal server error.
        raise HTTPException(status_code=404, detail="Generation not found") from None

    gen: Optional[Generation] = db.query(Generation).filter(Generation.id == gen_uuid).first()
    if gen is None:
        raise HTTPException(status_code=404, detail="Generation not found")

    # Ownership is derived from the parent brief. Admins bypass the check.
    if current_user.role != "admin":
        brief: Optional[Brief] = db.query(Brief).filter(Brief.id == gen.brief_id).first()
        # Fail closed: a generation whose brief is missing has no provable
        # owner, so a non-admin caller must not be granted access to it.
        if brief is None or str(brief.created_by) != str(current_user.id):
            raise HTTPException(status_code=403, detail="Not authorised")
    return gen
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@router.post(
    "/briefs/{brief_id}/generations",
    response_model=List[GenerationOut],
    status_code=status.HTTP_201_CREATED,
)
def create_generations(
    brief_id: str,
    body: GenerationsCreateRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> Any:
    """Create one queued generation per requested deliverable type.

    The requested slugs are validated against the active rows of the
    deliverable-types table. If that lookup fails or returns nothing, a
    hardcoded set of built-in slugs is used instead. After the rows are
    committed, each generation is handed to the worker queue on a best-effort
    basis: an enqueue failure is logged and the row is left in ``queued``.

    Raises:
        HTTPException: 404/403 from the brief lookup; 400 if any requested
            deliverable type is unknown or inactive.
    """
    log = logging.getLogger(__name__)
    brief = _get_brief_or_403(brief_id, db, current_user)
    # Capture the key now: a rollback below would expire the loaded instance.
    brief_pk = brief.id

    # Used when the deliverable_types table doesn't exist yet
    # (e.g. before migration 002 has run).
    _FALLBACK_VALID = {
        "leadership_themes",
        "regional_enrichment",
        "linkedin_posts",
        "webinar_spec",
        "infographic_specs",
        "abm_enablement",
    }

    # Build a slug -> DeliverableType map from the DB.
    db_type_map: dict[str, DeliverableType] = {}
    try:
        active_types = (
            db.query(DeliverableType)
            .filter(DeliverableType.is_active.is_(True))
            .all()
        )
        db_type_map = {dt.slug: dt for dt in active_types}
    except Exception:  # noqa: BLE001
        # Table not yet created — fall back to the hardcoded set. The failed
        # statement may have left the transaction aborted, so roll back before
        # the session is used again for the inserts below.
        log.warning(
            "Could not load active deliverable types; using built-in fallback set",
            exc_info=True,
        )
        db.rollback()

    if db_type_map:
        invalid = [t for t in body.deliverable_types if t not in db_type_map]
        if invalid:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown or inactive deliverable_type(s): {invalid}",
            )
    else:
        # Fallback: use hardcoded set
        invalid = [t for t in body.deliverable_types if t not in _FALLBACK_VALID]
        if invalid:
            raise HTTPException(status_code=400, detail=f"Invalid deliverable_type(s): {invalid}")

    generations = []
    for dt_slug in body.deliverable_types:
        dt_row = db_type_map.get(dt_slug)
        generations.append(
            Generation(
                brief_id=brief_pk,
                deliverable_type=dt_slug,
                deliverable_type_id=dt_row.id if dt_row else None,
                status="queued",
            )
        )
    # commit() flushes the pending rows itself, so no per-row flush is needed.
    db.add_all(generations)
    db.commit()

    # Enqueue tasks (lazy import to tolerate missing worker deps at boot).
    try:
        from app.workers.tasks import enqueue_generate  # noqa: PLC0415
    except Exception:  # noqa: BLE001
        log.exception(
            "Worker tasks unavailable; %d generation(s) left queued without a job",
            len(generations),
        )
        enqueue_generate = None

    if enqueue_generate is not None:
        for gen in generations:
            try:
                enqueue_generate(str(gen.id))
            except Exception:  # noqa: BLE001
                # Best effort: keep enqueuing the rest and still return 201.
                log.exception("Failed to enqueue generation %s", gen.id)

    return [_gen_out(g) for g in generations]
@router.get("/briefs/{brief_id}/generations", response_model=List[GenerationOut])
def list_generations(
    brief_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> Any:
    # Resolve the brief first: this enforces the 404/403 access rules before
    # any generation is read.
    owned_brief = _get_brief_or_403(brief_id, db, current_user)

    # All generations of that brief, sorted by deliverable-type slug.
    query = db.query(Generation).filter(Generation.brief_id == owned_brief.id)
    rows = query.order_by(Generation.deliverable_type).all()

    serialised = []
    for row in rows:
        serialised.append(_gen_out(row))
    return serialised
@router.post("/generations/{generation_id}/rerun", response_model=GenerationOut)
def rerun_generation(
    generation_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> Any:
    """Re-enqueue a generation. Works for any status (failed, complete, queued,
    running — useful for stuck jobs). Keeps existing structured_content until
    the worker writes new output, so rerun failures don't lose data.

    Enqueuing is best-effort: if the worker task cannot be imported or the
    enqueue call fails, the failure is logged and the generation is still
    returned in the ``queued`` state.

    Raises:
        HTTPException: 404/403 from the generation lookup.
    """
    gen = _get_generation_or_403(generation_id, db, current_user)

    # Reset the lifecycle fields so the row looks freshly queued.
    gen.status = "queued"
    gen.error = None
    gen.started_at = None
    gen.completed_at = None
    # Deliberately do NOT clear structured_content — preserve last-known-good
    # until the worker overwrites it.
    db.commit()
    db.refresh(gen)

    try:
        # Lazy import to tolerate missing worker deps at boot.
        from app.workers.tasks import enqueue_generate  # noqa: PLC0415

        enqueue_generate(str(gen.id))
    except Exception:  # noqa: BLE001
        # Do not fail the request, but leave a trace: otherwise the row sits
        # in "queued" with no job and nothing explains why.
        logging.getLogger(__name__).exception(
            "Failed to enqueue rerun of generation %s", gen.id
        )
    return _gen_out(gen)
@router.get("/generations/{generation_id}", response_model=GenerationOut)
def get_generation(
    generation_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> Any:
    # The lookup helper raises 404/403 itself, so reaching the return means
    # the caller may see this generation.
    generation = _get_generation_or_403(generation_id, db, current_user)
    return _gen_out(generation)
@router.patch("/generations/{generation_id}", response_model=GenerationOut)
def patch_generation(
    generation_id: str,
    body: GenerationPatch,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> Any:
    """Replace a generation's structured content after validating it.

    Validation runs in two layers:

    1. JSON Schema validation against the deliverable type's stored
       ``schema_json`` (works for all types, including non-built-in ones).
    2. Legacy pydantic validation for built-in types. It hard-blocks only when
       the JSON Schema layer did not actually run: no stored schema, or the
       ``jsonschema`` package is not installed.

    Raises:
        HTTPException: 404/403 from the generation lookup; 422 if the content
            fails validation.
    """
    gen = _get_generation_or_403(generation_id, db, current_user)

    # Resolve the deliverable type: prefer the id link, fall back to the slug.
    dt_row: Optional[DeliverableType] = None
    if gen.deliverable_type_id is not None:
        dt_row = db.query(DeliverableType).filter(
            DeliverableType.id == gen.deliverable_type_id
        ).first()
    if dt_row is None and gen.deliverable_type:
        dt_row = db.query(DeliverableType).filter(
            DeliverableType.slug == gen.deliverable_type
        ).first()

    json_schema_checked = False
    if dt_row is not None and dt_row.schema_json:
        json_schema_checked = _validate_against_json_schema(
            body.structured_content, dt_row.schema_json
        )

    # If the JSON Schema check could not run, the legacy check is the only
    # safety net left and must be enforced.
    _validate_against_legacy_schema(
        gen.deliverable_type,
        body.structured_content,
        enforce=not json_schema_checked,
    )

    gen.structured_content = body.structured_content
    db.commit()
    return _gen_out(gen)


def _validate_against_json_schema(content: Dict[str, Any], schema: Any) -> bool:
    """Validate *content* against a stored JSON Schema.

    Returns:
        ``True`` if validation ran and passed, ``False`` if it was skipped
        because the ``jsonschema`` package is not installed.

    Raises:
        HTTPException: 422 describing the first validation failure.
    """
    # Only the import is guarded, so an ImportError raised during validation
    # itself is not mistaken for "jsonschema is not installed".
    try:
        import jsonschema  # noqa: PLC0415
    except ImportError:
        return False  # jsonschema not installed — skip validation

    try:
        jsonschema.validate(instance=content, schema=schema)
    except jsonschema.ValidationError as exc:
        path = " -> ".join(str(p) for p in exc.absolute_path) or "(root)"
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail={
                "msg": "Content failed JSON Schema validation",
                "path": path,
                "error": exc.message,
                "schema_path": " -> ".join(
                    str(p) for p in exc.absolute_schema_path
                ),
            },
        ) from exc
    return True


def _validate_against_legacy_schema(
    deliverable_type: str, content: Dict[str, Any], *, enforce: bool
) -> None:
    """Run the legacy pydantic schema for built-in deliverable types.

    Args:
        deliverable_type: Slug used to look up the schema class.
        content: Candidate structured content.
        enforce: When true a failure raises HTTP 422; when false it is only
            logged.

    Raises:
        HTTPException: 422 if validation fails and ``enforce`` is true.
    """
    try:
        from app.schemas.registry import SCHEMA_BY_TYPE  # noqa: PLC0415
    except ImportError:
        return  # schemas not yet built — accept any dict

    schema_cls = SCHEMA_BY_TYPE.get(deliverable_type)
    if schema_cls is None:
        return  # not a built-in type

    try:
        schema_cls(**content)
    except Exception as exc:  # noqa: BLE001
        if enforce:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Content validation failed: {exc}",
            ) from exc
        # JSON Schema validation already passed, so this is not a hard block;
        # log it so a divergence between the two schemas is visible.
        logging.getLogger(__name__).warning(
            "Legacy schema validation failed for %s after JSON Schema passed: %s",
            deliverable_type,
            exc,
        )
@router.post("/generations/{generation_id}/export", response_model=ExportOut)
def export_generation(
    generation_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> Any:
    """Render a generation to a .docx file and record it as an export.

    The document is rendered from the generation's structured content, written
    under ``settings.EXPORT_DIR`` with a random file name, and an ``Export``
    row pointing at it is committed.

    Returns:
        ``ExportOut`` whose ``download_url`` points at the new export.

    Raises:
        HTTPException: 404/403 from the generation lookup; 400 if the
            generation has no content; 503 if the render module cannot be
            imported.
    """
    from pathlib import Path  # noqa: PLC0415

    from app.core.config import settings  # noqa: PLC0415

    gen = _get_generation_or_403(generation_id, db, current_user)
    if gen.structured_content is None:
        raise HTTPException(status_code=400, detail="Generation has no content to export")

    # Resolve the deliverable type the same way patch_generation does: prefer
    # the id link and fall back to the slug.
    dt_row: Optional[DeliverableType] = None
    if gen.deliverable_type_id is not None:
        dt_row = (
            db.query(DeliverableType)
            .filter(DeliverableType.id == gen.deliverable_type_id)
            .first()
        )
    if dt_row is None:
        dt_row = (
            db.query(DeliverableType)
            .filter(DeliverableType.slug == gen.deliverable_type)
            .first()
        )

    # Render to docx bytes. Prefer the DB-stored template_json (works for
    # both built-ins and admin-authored custom types); fall back to the legacy
    # hardcoded renderer map if no template is stored.
    template_json = dt_row.template_json if dt_row else None
    try:
        from app.hp_branding.render import render_to_bytes  # noqa: PLC0415

        docx_bytes = render_to_bytes(
            gen.deliverable_type, gen.structured_content, template_json=template_json
        )
    except ImportError as exc:
        raise HTTPException(status_code=503, detail="Render module not yet available") from exc

    export_dir = Path(settings.EXPORT_DIR)
    export_dir.mkdir(parents=True, exist_ok=True)
    file_name = f"{uuid.uuid4()}.docx"
    dest = export_dir / file_name
    dest.write_bytes(docx_bytes)

    export = Export(generation_id=gen.id, docx_path=str(dest))
    db.add(export)
    try:
        db.commit()
    except Exception:
        # No row references the file, so remove it instead of leaving an
        # orphan on disk, then let the original error propagate.
        db.rollback()
        dest.unlink(missing_ok=True)
        raise
    return ExportOut(download_url=f"/exports/{export.id}/download")