hp-studios-ai-content-agent/backend/app/workers/tasks.py
DJP 7530f60007 Add deploy script, prod compose override, and Apache subpath proxy
For deployment to optical-dev.oliver.solutions under /hp-content-agent/.

- deploy/deploy.sh       idempotent bootstrap: secrets check, free-port
                         picker, build+up, migrations+seed, Apache Include
                         install + reload, UFW allow
- deploy/apache/*.conf.template   reverse-proxy snippet (API before SPA)
- deploy/README.md       runbook for first-time + re-deploy
- docker-compose.prod.yml  prod overrides: frontend target=prod (nginx),
                         uvicorn --workers 2, drops dev volume mounts
- docker-compose.yml     pinned project name (required on the shared
                         server per CLAUDE.md)
- frontend/nginx.conf    SPA fallback + asset caching, health endpoint
- frontend/Dockerfile    VITE_BASE_PATH build arg for subpath deploys
- frontend/vite.config.ts  reads VITE_BASE_PATH

Public: https://optical-dev.oliver.solutions/hp-content-agent/

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 15:23:51 -04:00

153 lines
5.8 KiB
Python

from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
import redis as _redis
from rq import Queue
from app.core.config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# RQ queue
# ---------------------------------------------------------------------------
# Process-wide cache: both objects are created on the first _get_queue() call
# and reused afterwards.
_redis_conn: Optional[_redis.Redis] = None
_queue: Optional[Queue] = None


def _get_queue() -> Queue:
    """Return the shared RQ queue named ``"default"``.

    On the first call a Redis connection is built from
    ``settings.REDIS_URL`` and a queue is bound to it; both are stored in
    module globals so later calls return the same ``Queue`` instance.
    """
    global _redis_conn, _queue
    if _queue is not None:
        return _queue

    _redis_conn = _redis.from_url(settings.REDIS_URL)
    _queue = Queue("default", connection=_redis_conn)
    return _queue
# ---------------------------------------------------------------------------
# Enqueue helpers (called by API handlers)
# ---------------------------------------------------------------------------
def enqueue_ingest(document_id: str) -> None:
    """Put an ingestion job for *document_id* on the shared queue.

    The job is registered as a call to ``ingest_document_task`` with
    *document_id* as its only argument; nothing is returned to the caller.
    """
    queue = _get_queue()
    queue.enqueue(ingest_document_task, document_id)
def enqueue_generate(generation_id: str) -> None:
    """Put a generation job for *generation_id* on the shared queue.

    The job is registered as a call to ``generate_deliverable_task`` with
    *generation_id* as its only argument; nothing is returned to the caller.
    """
    queue = _get_queue()
    queue.enqueue(generate_deliverable_task, generation_id)
# ---------------------------------------------------------------------------
# RQ task: document ingestion
# ---------------------------------------------------------------------------
def _mark_document_failed(db, document_id: str) -> None:
    """Best-effort: set ``ingestion_status = "failed"`` on the document.

    The session is rolled back first. If the ingestion error came from a
    failed flush/commit, the session refuses further work until it is rolled
    back, and the status update would otherwise fail as well. The rollback
    also discards any uncommitted partial work left in the session.

    Any error raised here is logged rather than propagated so it can never
    replace the original ingestion error seen by the caller.
    """
    try:
        db.rollback()
        from app.db.models import Document  # noqa: PLC0415

        doc = db.query(Document).filter(Document.id == UUID(document_id)).first()
        if doc:
            doc.ingestion_status = "failed"
            db.commit()
    except Exception:  # noqa: BLE001
        logger.exception("could not mark document %s as failed", document_id)


def ingest_document_task(document_id: str) -> None:
    """RQ task — ingest a single document.

    Opens its own DB session so it can run inside an RQ worker process.
    Lazy-imports the orchestrator to avoid circular imports at boot time.

    Args:
        document_id: String form of the document's UUID.

    Raises:
        Exception: Any non-``ImportError`` failure is logged, the document is
            marked as failed (best effort), and the error is re-raised so the
            job is reported as failed. An ``ImportError`` (orchestrator not
            available) is logged as a warning and the document is marked as
            failed, but the error is not re-raised.
    """
    from app.db.session import SessionLocal  # noqa: PLC0415

    db = SessionLocal()
    try:
        # Lazy import — orchestrator agent module may not exist at boot
        from app.ingestion.orchestrator import ingest_document  # noqa: PLC0415

        ingest_document(UUID(document_id), db)
    except ImportError as exc:
        logger.warning("ingestion orchestrator not available: %s", exc)
        # Mark document as failed so the UI can report the gap
        _mark_document_failed(db, document_id)
    except Exception as exc:  # noqa: BLE001
        logger.exception("ingest_document_task failed for %s: %s", document_id, exc)
        _mark_document_failed(db, document_id)
        raise
    finally:
        db.close()
# ---------------------------------------------------------------------------
# RQ task: deliverable generation
# ---------------------------------------------------------------------------
def _mark_generation_failed(db, gen, message: str) -> None:
    """Persist ``status = "failed"`` and *message* on *gen*.

    The session is rolled back first: after a failed flush/commit it rejects
    further work until rolled back, and the rollback also drops any
    half-written result fields so only the failure state is committed.
    Errors from the rollback or commit propagate to the caller.
    """
    db.rollback()
    gen.status = "failed"
    gen.error = message
    gen.completed_at = datetime.now(timezone.utc)
    db.commit()


def generate_deliverable_task(generation_id: str) -> None:
    """RQ task — run the AI agent to produce a deliverable.

    Sets ``generations.status`` to 'running', calls the agent, then saves
    ``structured_content`` plus usage/cost fields and flips status to
    'complete' (or 'failed').

    Args:
        generation_id: String form of the generation's UUID. If no matching
            row exists an error is logged and the task returns.

    Raises:
        Exception: Any non-``ImportError`` failure is logged, the generation
            is marked as failed (best effort), and the original error is
            re-raised. An ``ImportError`` (agent module not available) marks
            the generation as failed without re-raising.
    """
    from app.db.session import SessionLocal  # noqa: PLC0415
    from app.db.models import Generation  # noqa: PLC0415

    db = SessionLocal()
    gen: Optional[Generation] = None
    try:
        gen = db.query(Generation).filter(Generation.id == UUID(generation_id)).first()
        if gen is None:
            logger.error("Generation %s not found", generation_id)
            return

        gen.status = "running"
        gen.started_at = datetime.now(timezone.utc)
        db.commit()

        # Lazy import — agent module may not exist at boot
        from app.agents.generate import generate_deliverable  # noqa: PLC0415

        result = generate_deliverable(gen.brief_id, gen.deliverable_type, db)

        # result is a GenerationResult dataclass with .content dict + usage
        # stats; a result without .content is stored as-is.
        gen.structured_content = result.content if hasattr(result, "content") else result
        for stat in (
            "tokens_used",
            "input_tokens",
            "output_tokens",
            "cache_read_tokens",
            "cache_write_tokens",
        ):
            setattr(gen, stat, getattr(result, stat, None))

        # Record the model used so cost re-computation survives pricing changes.
        try:
            from app.agents.client import OPUS_MODEL  # noqa: PLC0415

            gen.model = OPUS_MODEL
        except Exception:  # noqa: BLE001
            # Best effort only: a missing model name must not fail the job.
            logger.debug(
                "could not record model for generation %s",
                generation_id,
                exc_info=True,
            )

        cost_estimate = getattr(result, "cost_usd_estimate", None)
        if cost_estimate is not None:
            from decimal import Decimal  # noqa: PLC0415

            # Go through str() so a float estimate keeps its printed value.
            gen.cost_usd = Decimal(str(cost_estimate))

        gen.status = "complete"
        gen.completed_at = datetime.now(timezone.utc)
        db.commit()
    except ImportError as exc:
        logger.warning("agents.generate not available: %s", exc)
        if gen is not None:
            _mark_generation_failed(db, gen, f"Agent module not available: {exc}")
    except Exception as exc:  # noqa: BLE001
        logger.exception("generate_deliverable_task failed for %s: %s", generation_id, exc)
        if gen is not None:
            try:
                _mark_generation_failed(db, gen, str(exc))
            except Exception:  # noqa: BLE001
                # Never let a bookkeeping failure replace the real error.
                logger.exception(
                    "could not mark generation %s as failed", generation_id
                )
        raise
    finally:
        db.close()