Barclays-banner-builder/backend/app/workers/tasks.py
Vadym Samoilenko 7e82a535a9 Add conversational brief interface (AC1 chat-style)
Replaces the one-shot form with a Copygen-style chat at
/conversations/:id. Each turn is classified as generate / refine /
clarify by a lightweight LLM intent router; generate and refine intents
enqueue an RQ job that produces a new BannerSet, while clarify returns
an inline reply without touching banners.

New backend:
- Conversation + ConversationMessage models + migration 0005
- intent_router service (chat_structured, 3-intent schema)
- chat_turn RQ task with _persist_banner_set helper extracted from
  _generate_copy_async for reuse
- /api/conversations CRUD + POST /messages endpoint
- JobType.CHAT_TURN added

New frontend:
- ChatBrief page: message bubbles, inline BannerPreview cards with
  checkbox selection and "Open banner editor" CTA (same Medium+Large
  validation rule as VariantsGrid)
- ConversationLanding: /conversations/new creates and redirects
- conversationId added to journey store
- "New Brief" nav now points to /conversations/new
- Default route redirects to /conversations/new

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 21:53:17 +01:00

310 lines
12 KiB
Python

"""RQ worker tasks.
Each task is a synchronous entry point (RQ calls plain functions) that runs
its async implementation via asyncio.run(); that implementation opens its own
DB session.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
import redis
from rq import Queue
from app.config import get_settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Settings are resolved once, when this module is imported.
settings = get_settings()
# Redis connection built from settings.redis_url; used by the queue below.
_redis_conn = redis.from_url(settings.redis_url)
# RQ queue named "banners", bound to the connection above.
queue = Queue("banners", connection=_redis_conn)
def _run(coro):
    """Drive *coro* to completion with asyncio.run() and hand back its result."""
    outcome = asyncio.run(coro)
    return outcome
async def _persist_banner_set(brief_id, copy_variants, db):
    """Add a new BannerSet for *brief_id* plus a Medium/Large pair per copy variant.

    The BannerSet is added and flushed first so its id can be used by the
    variants. For every item in *copy_variants* two BannerVariant rows are
    added to the session: one "Medium" and one "Large", sharing a fresh
    pair_id, the theme, the matched icon and the recommended DAM asset.
    The variants are only added to the session here; they are not flushed
    or committed by this helper.

    Returns the new BannerSet.
    """
    from uuid import uuid4

    from app.config import get_settings as _get_settings
    from app.models.banner import BannerSet, BannerVariant
    from app.services.adobe_dam_client import get_dam_client
    from app.services.icon_matcher import match_icon

    dam_client = get_dam_client(_get_settings())

    new_set = BannerSet(brief_id=brief_id)
    db.add(new_set)
    await db.flush()

    for copy in copy_variants:
        pair_id = uuid4()

        matched_icon = await match_icon(copy.icon_keyword, db)
        icon_id = None if not matched_icon else matched_icon.id

        # NOTE(review): the DAM recommendation is requested with an empty
        # string for every variant -- confirm this is intended.
        asset = await dam_client.recommend_for_brief("")
        if asset:
            asset_ref, asset_url = asset["id"], asset["url"]
        else:
            asset_ref, asset_url = None, None

        # Fields that are identical for both sizes of the pair.
        shared = {
            "banner_set_id": new_set.id,
            "pair_id": pair_id,
            "theme": copy.theme,
            "icon_id": icon_id,
            "dam_asset_ref": asset_ref,
            "dam_asset_url": asset_url,
        }

        medium_row = BannerVariant(
            aspect_ratio="Medium",
            short_title=copy.medium.short_title,
            long_body=copy.medium.long_body,
            cta=copy.medium.cta,
            cta_secondary=None,
            **shared,
        )
        db.add(medium_row)

        large_row = BannerVariant(
            aspect_ratio="Large",
            short_title=copy.large.short_title,
            long_body=copy.large.long_body,
            cta=copy.large.cta,
            cta_secondary=copy.large.cta_secondary or None,
            **shared,
        )
        db.add(large_row)

    return new_set
def generate_copy(job_id: str, brief_id: str) -> None:
    """RQ entry point: run the async copy-generation job for a brief to completion."""
    pending = _generate_copy_async(job_id, brief_id)
    _run(pending)
async def _generate_copy_async(job_id: str, brief_id: str) -> None:
    """Generate copy variants for a brief and record the outcome on the Job row.

    Marks the job RUNNING (committed immediately), loads the brief, generates
    copy using the ``n_variants`` / ``aspect_ratios`` values from the job
    payload (defaults: 4 and ["Medium", "Large"]), persists a BannerSet via
    ``_persist_banner_set`` and marks the job DONE with the new banner set id.

    Args:
        job_id: String form of the Job's UUID.
        brief_id: String form of the Brief's UUID.

    Raises:
        Exception: any failure is logged, recorded on the job (status FAILED,
            ``error`` set to ``str(exc)``) and then re-raised.
    """
    from uuid import UUID

    from sqlalchemy import select

    from app.database import AsyncSessionLocal
    from app.models.brief import Brief
    from app.models.job import Job, JobStatus
    from app.services.copy_generation import generate_copy as _gen

    async with AsyncSessionLocal() as db:
        job_result = await db.execute(select(Job).where(Job.id == UUID(job_id)))
        job = job_result.scalar_one()
        job.status = JobStatus.RUNNING
        await db.commit()
        try:
            brief_result = await db.execute(select(Brief).where(Brief.id == UUID(brief_id)))
            brief = brief_result.scalar_one()

            payload = job.payload or {}
            n_variants = payload.get("n_variants", 4)
            aspect_ratios = payload.get("aspect_ratios", ["Medium", "Large"])

            copy_variants = await _gen(brief.text, aspect_ratios, n_variants, db)
            banner_set = await _persist_banner_set(brief.id, copy_variants, db)
            # The former unused `total_rows` local was dropped: it read
            # `banner_set.variants`, which (if that is a lazily loaded
            # relationship) would attempt implicit IO under AsyncSession.

            job.status = JobStatus.DONE
            # NOTE(review): variant_count reflects the requested count
            # (two rows per variant), not a count of rows actually persisted.
            job.result = {"banner_set_id": str(banner_set.id), "variant_count": n_variants * 2}
            job.finished_at = datetime.now(timezone.utc)
            await db.commit()
            logger.info("generate_copy done: job=%s banner_set=%s", job_id, banner_set.id)
        except Exception as exc:
            # Log first so the traceback survives even if recording the
            # failure below also fails.
            logger.exception("generate_copy failed: job=%s", job_id)
            # Discard any partially persisted banner set; a rollback is also
            # required before the session can be used after a failed flush.
            await db.rollback()
            # Rollback expires loaded instances; reload the job explicitly.
            await db.refresh(job)
            job.status = JobStatus.FAILED
            job.error = str(exc)
            job.finished_at = datetime.now(timezone.utc)
            await db.commit()
            raise
def chat_turn(job_id: str, conversation_id: str) -> None:
    """RQ entry point: run one chat turn (generate or refine intent) to completion."""
    pending = _chat_turn_async(job_id, conversation_id)
    _run(pending)
async def _chat_turn_async(job_id: str, conversation_id: str) -> None:
    """Process one chat turn and record the outcome on the Job row.

    Marks the job RUNNING (committed immediately), loads the conversation
    with its messages and the conversation's brief, classifies the latest
    user message (``message_text`` / ``message_id`` from the job payload)
    and then acts on the intent:

    * ``generate`` -- generates copy from the refined brief, the latest
      message or the stored brief text (first non-empty) and persists a new
      BannerSet.
    * ``refine`` -- rewrites every variant of the brief's most recent
      BannerSet into a new BannerSet using the feedback; if the brief has
      no BannerSet yet, a fresh set of 4 variants is generated instead.

    In both cases an assistant ConversationMessage referencing the new
    BannerSet is added, the triggering user message (when found) is linked
    to the job, and the job is marked DONE.

    Args:
        job_id: String form of the Job's UUID.
        conversation_id: String form of the Conversation's UUID.

    Raises:
        ValueError: the classifier returned an intent other than
            ``generate`` or ``refine``.
        Exception: any failure is logged, recorded on the job (status
            FAILED, ``error`` set to ``str(exc)``) and then re-raised.
    """
    from uuid import UUID

    from sqlalchemy import select
    from sqlalchemy.orm import selectinload

    from app.database import AsyncSessionLocal
    from app.models.banner import BannerSet, BannerVariant
    from app.models.brief import Brief
    from app.models.conversation import Conversation, ConversationMessage
    from app.models.job import Job, JobStatus
    from app.services.copy_generation import (
        BannerCopy,
        generate_copy as _gen,
        refine_variant_copy,
    )
    from app.services.intent_router import classify_turn

    async with AsyncSessionLocal() as db:
        job_result = await db.execute(select(Job).where(Job.id == UUID(job_id)))
        job = job_result.scalar_one()
        job.status = JobStatus.RUNNING
        await db.commit()
        try:
            conv_result = await db.execute(
                select(Conversation)
                .where(Conversation.id == UUID(conversation_id))
                .options(selectinload(Conversation.messages))
            )
            conversation = conv_result.scalar_one()
            brief_result = await db.execute(select(Brief).where(Brief.id == conversation.brief_id))
            brief = brief_result.scalar_one()

            payload = job.payload or {}
            latest_text = payload.get("message_text", "")
            message_id = UUID(payload["message_id"])

            # Prior user/assistant messages, excluding the one being processed.
            history = [
                {"role": m.role, "content": m.content}
                for m in conversation.messages
                if str(m.id) != payload["message_id"] and m.role in ("user", "assistant")
            ]
            intent = await classify_turn(brief.text, history, latest_text)

            if intent.intent == "generate":
                effective_brief = intent.refined_brief or latest_text or brief.text
                copy_variants = await _gen(effective_brief, ["Medium", "Large"], intent.n_variants, db)
                banner_set = await _persist_banner_set(brief.id, copy_variants, db)
                await db.flush()
                assistant_msg = ConversationMessage(
                    conversation_id=conversation.id,
                    role="assistant",
                    kind="generation",
                    content=f"Generated {intent.n_variants} variants.",
                    banner_set_id=banner_set.id,
                    job_id=UUID(job_id),
                )
                db.add(assistant_msg)
            elif intent.intent == "refine":
                latest_set_result = await db.execute(
                    select(BannerSet)
                    .where(BannerSet.brief_id == brief.id)
                    .order_by(BannerSet.created_at.desc())
                    .limit(1)
                )
                latest_set = latest_set_result.scalar_one_or_none()
                if latest_set:
                    variants_result = await db.execute(
                        select(BannerVariant).where(BannerVariant.banner_set_id == latest_set.id)
                    )
                    existing_variants = variants_result.scalars().all()

                    new_banner_set = BannerSet(brief_id=brief.id)
                    db.add(new_banner_set)
                    await db.flush()

                    feedback = intent.feedback or latest_text
                    for v in existing_variants:
                        current_copy = BannerCopy(
                            short_title=v.short_title,
                            long_body=v.long_body,
                            cta=v.cta,
                            cta_secondary=v.cta_secondary or "",
                        )
                        refined = await refine_variant_copy(v.aspect_ratio, current_copy, feedback, db)
                        # Everything except the text is carried over from
                        # the variant being refined.
                        db.add(BannerVariant(
                            banner_set_id=new_banner_set.id,
                            aspect_ratio=v.aspect_ratio,
                            pair_id=v.pair_id,
                            theme=v.theme,
                            short_title=refined.short_title,
                            long_body=refined.long_body,
                            cta=refined.cta,
                            cta_secondary=refined.cta_secondary or None,
                            icon_id=v.icon_id,
                            dam_asset_ref=v.dam_asset_ref,
                            dam_asset_url=v.dam_asset_url,
                        ))
                    assistant_msg = ConversationMessage(
                        conversation_id=conversation.id,
                        role="assistant",
                        kind="refinement",
                        content="Here's the refined copy based on your feedback.",
                        banner_set_id=new_banner_set.id,
                        job_id=UUID(job_id),
                    )
                    db.add(assistant_msg)
                else:
                    # Nothing to refine yet: fall back to a fresh generation.
                    copy_variants = await _gen(brief.text, ["Medium", "Large"], 4, db)
                    banner_set = await _persist_banner_set(brief.id, copy_variants, db)
                    await db.flush()
                    assistant_msg = ConversationMessage(
                        conversation_id=conversation.id,
                        role="assistant",
                        kind="generation",
                        content="No previous variants found — here's a fresh set.",
                        banner_set_id=banner_set.id,
                        job_id=UUID(job_id),
                    )
                    db.add(assistant_msg)
            else:
                # Without this guard `assistant_msg` would be unbound below
                # and the job would fail with an opaque UnboundLocalError.
                raise ValueError(f"Unsupported intent for chat_turn: {intent.intent!r}")

            user_msg_result = await db.execute(
                select(ConversationMessage).where(ConversationMessage.id == message_id)
            )
            user_msg = user_msg_result.scalar_one_or_none()
            if user_msg:
                user_msg.job_id = UUID(job_id)

            # Flush explicitly so the assistant message is written before its
            # id is recorded, rather than relying on session autoflush.
            await db.flush()

            conversation.updated_at = datetime.now(timezone.utc)
            job.status = JobStatus.DONE
            job.result = {"message_id": str(assistant_msg.id), "conversation_id": conversation_id}
            job.finished_at = datetime.now(timezone.utc)
            await db.commit()
            logger.info("chat_turn done: job=%s conv=%s", job_id, conversation_id)
        except Exception as exc:
            # Log first so the traceback survives even if recording the
            # failure below also fails.
            logger.exception("chat_turn failed: job=%s", job_id)
            # Discard partial work (e.g. a half-refined BannerSet); a
            # rollback is also required before the session can be used
            # after a failed flush.
            await db.rollback()
            # Rollback expires loaded instances; reload the job explicitly.
            await db.refresh(job)
            job.status = JobStatus.FAILED
            job.error = str(exc)
            job.finished_at = datetime.now(timezone.utc)
            await db.commit()
            raise
def render_pdf(job_id: str, banner_set_id: str) -> None:
    """RQ entry point: render the PDF contact sheet for a banner set to completion."""
    pending = _render_pdf_async(job_id, banner_set_id)
    _run(pending)
async def _render_pdf_async(job_id: str, banner_set_id: str) -> None:
    """Render a PDF contact sheet for a banner set and record the outcome on the Job row.

    Marks the job RUNNING (committed immediately), calls
    ``render_contact_sheet_pdf`` with the optional ``variant_ids`` from the
    job payload (an empty or missing value is passed as None), then marks
    the job DONE with the resulting path stored under ``pdf_path``.

    Args:
        job_id: String form of the Job's UUID.
        banner_set_id: String form of the BannerSet's UUID.

    Raises:
        Exception: any failure is logged, recorded on the job (status FAILED,
            ``error`` set to ``str(exc)``) and then re-raised.
    """
    from uuid import UUID

    from sqlalchemy import select

    from app.database import AsyncSessionLocal
    from app.models.job import Job, JobStatus
    from app.services.exporter import render_contact_sheet_pdf

    async with AsyncSessionLocal() as db:
        job_result = await db.execute(select(Job).where(Job.id == UUID(job_id)))
        job = job_result.scalar_one()
        job.status = JobStatus.RUNNING
        await db.commit()
        try:
            variant_ids = (job.payload or {}).get("variant_ids") or None
            pdf_path = await render_contact_sheet_pdf(UUID(banner_set_id), db, variant_ids=variant_ids)
            job.status = JobStatus.DONE
            job.result = {"pdf_path": str(pdf_path)}
            job.finished_at = datetime.now(timezone.utc)
            await db.commit()
        except Exception as exc:
            # Log first so the traceback survives even if recording the
            # failure below also fails.
            logger.exception("render_pdf failed: job=%s", job_id)
            # The renderer shares this session; roll back so a failed
            # statement cannot block recording the failure.
            await db.rollback()
            # Rollback expires loaded instances; reload the job explicitly.
            await db.refresh(job)
            job.status = JobStatus.FAILED
            job.error = str(exc)
            job.finished_at = datetime.now(timezone.utc)
            await db.commit()
            raise