For deployment to optical-dev.oliver.solutions under /hp-content-agent/.
- deploy/deploy.sh idempotent bootstrap: secrets check, free-port
picker, build+up, migrations+seed, Apache Include
install + reload, UFW allow
- deploy/apache/*.conf.template reverse-proxy snippet (API before SPA)
- deploy/README.md runbook for first-time + re-deploy
- docker-compose.prod.yml prod overrides: frontend target=prod (nginx),
uvicorn --workers 2, drops dev volume mounts
- docker-compose.yml pinned project name (required on the shared
server per CLAUDE.md)
- frontend/nginx.conf SPA fallback + asset caching, health endpoint
- frontend/Dockerfile VITE_BASE_PATH build arg for subpath deploys
- frontend/vite.config.ts reads VITE_BASE_PATH
Public: https://optical-dev.oliver.solutions/hp-content-agent/
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
153 lines
5.8 KiB
Python
153 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
import redis as _redis
|
|
from rq import Queue
|
|
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RQ queue
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Process-wide singletons; both stay ``None`` until the first _get_queue() call.
_redis_conn: Optional[_redis.Redis] = None
_queue: Optional[Queue] = None


def _get_queue() -> Queue:
    """Return the shared ``"default"`` RQ queue, creating it on first use.

    The Redis connection is built from ``settings.REDIS_URL`` on the first
    call; later calls return the already-constructed queue.
    """
    global _redis_conn, _queue
    if _queue is not None:
        return _queue

    _redis_conn = _redis.from_url(settings.REDIS_URL)
    _queue = Queue("default", connection=_redis_conn)
    return _queue
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Enqueue helpers (called by API handlers)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def enqueue_ingest(document_id: str) -> None:
    """Schedule ``ingest_document_task`` for *document_id* on the shared queue."""
    queue = _get_queue()
    queue.enqueue(ingest_document_task, document_id)
|
|
|
|
|
|
def enqueue_generate(generation_id: str) -> None:
    """Schedule ``generate_deliverable_task`` for *generation_id* on the shared queue."""
    queue = _get_queue()
    queue.enqueue(generate_deliverable_task, generation_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RQ task: document ingestion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _mark_document_failed(db, document_id: str) -> None:
    """Best-effort: set the document's ``ingestion_status`` to "failed"; never raises."""
    try:
        from app.db.models import Document  # noqa: PLC0415

        # The error being reported may have left the session in a failed
        # transaction (e.g. a flush that raised). Roll back first so the
        # session is usable again and no half-finished work gets committed
        # together with the status change.
        db.rollback()
        doc = db.query(Document).filter(Document.id == UUID(document_id)).first()
        if doc:
            doc.ingestion_status = "failed"
            db.commit()
    except Exception:  # noqa: BLE001
        # Still best-effort, but leave a trace instead of failing silently.
        logger.exception("could not mark document %s as failed", document_id)


def ingest_document_task(document_id: str) -> None:
    """RQ task — ingest a single document.

    Opens its own DB session so it can run inside an RQ worker process.
    Lazy-imports the orchestrator to avoid circular imports at boot time.

    Args:
        document_id: String form of the document's UUID.

    Raises:
        Exception: Any non-``ImportError`` failure is logged, the document is
            marked as failed (best-effort), and the original exception is
            re-raised so the worker sees the job as failed. An ``ImportError``
            is logged as a warning and the document is marked as failed, but
            nothing is re-raised.
    """
    from app.db.session import SessionLocal  # noqa: PLC0415

    db = SessionLocal()
    try:
        # Lazy import — orchestrator agent module may not exist at boot
        from app.ingestion.orchestrator import ingest_document  # noqa: PLC0415

        ingest_document(UUID(document_id), db)
    except ImportError as exc:
        logger.warning("ingestion orchestrator not available: %s", exc)
        # Mark document as failed so the UI can report the gap
        _mark_document_failed(db, document_id)
    except Exception as exc:  # noqa: BLE001
        logger.exception("ingest_document_task failed for %s: %s", document_id, exc)
        _mark_document_failed(db, document_id)
        raise
    finally:
        db.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RQ task: deliverable generation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _mark_generation_failed(db, gen, generation_id: str, message: str) -> None:
    """Best-effort: persist status "failed" and *message* on *gen*; never raises."""
    try:
        # A failed flush/commit leaves the session unusable until it is rolled
        # back; rolling back also discards any partially-applied result fields.
        db.rollback()
        gen.status = "failed"
        gen.error = message
        gen.completed_at = datetime.now(timezone.utc)
        db.commit()
    except Exception:  # noqa: BLE001
        # Never let the bookkeeping failure replace the error being reported.
        logger.exception("could not mark generation %s as failed", generation_id)


def _apply_generation_result(gen, result) -> None:
    """Copy the agent output and its usage statistics from *result* onto *gen*."""
    # result is expected to expose ``.content`` plus usage stats; a result
    # without a ``content`` attribute is stored as-is.
    gen.structured_content = result.content if hasattr(result, "content") else result
    for field in (
        "tokens_used",
        "input_tokens",
        "output_tokens",
        "cache_read_tokens",
        "cache_write_tokens",
    ):
        setattr(gen, field, getattr(result, field, None))

    # Record the model used so cost re-computation survives pricing changes.
    try:
        from app.agents.client import OPUS_MODEL  # noqa: PLC0415

        gen.model = OPUS_MODEL
    except Exception:  # noqa: BLE001
        # Optional metadata: keep going, but leave a trace of why it is missing.
        logger.debug("could not record model for generation", exc_info=True)

    cost_estimate = getattr(result, "cost_usd_estimate", None)
    if cost_estimate is not None:
        from decimal import Decimal  # noqa: PLC0415

        # Go through str() so a float estimate does not carry binary noise.
        gen.cost_usd = Decimal(str(cost_estimate))


def generate_deliverable_task(generation_id: str) -> None:
    """RQ task — run the AI agent to produce a deliverable.

    Sets ``generations.status`` to 'running', calls the agent, then saves
    ``structured_content`` and flips status to 'complete' (or 'failed').

    Args:
        generation_id: String form of the generation's UUID. If no matching
            row exists, an error is logged and the task returns.

    Raises:
        Exception: Any non-``ImportError`` failure is logged, the generation is
            marked as failed (best-effort), and the original exception is
            re-raised. An ``ImportError`` is logged as a warning and recorded
            on the generation, but nothing is re-raised.
    """
    from app.db.session import SessionLocal  # noqa: PLC0415
    from app.db.models import Generation  # noqa: PLC0415

    db = SessionLocal()
    gen: Optional[Generation] = None
    try:
        gen = db.query(Generation).filter(Generation.id == UUID(generation_id)).first()
        if gen is None:
            logger.error("Generation %s not found", generation_id)
            return

        # Commit the 'running' state before the agent call so it is persisted
        # while the agent works.
        gen.status = "running"
        gen.started_at = datetime.now(timezone.utc)
        db.commit()

        # Lazy import — agent module may not exist at boot
        from app.agents.generate import generate_deliverable  # noqa: PLC0415

        result = generate_deliverable(gen.brief_id, gen.deliverable_type, db)

        _apply_generation_result(gen, result)
        gen.status = "complete"
        gen.completed_at = datetime.now(timezone.utc)
        db.commit()

    except ImportError as exc:
        logger.warning("agents.generate not available: %s", exc)
        if gen is not None:
            _mark_generation_failed(
                db, gen, generation_id, f"Agent module not available: {exc}"
            )
    except Exception as exc:  # noqa: BLE001
        logger.exception("generate_deliverable_task failed for %s: %s", generation_id, exc)
        if gen is not None:
            _mark_generation_failed(db, gen, generation_id, str(exc))
        raise
    finally:
        db.close()
|