P0 Critical: presentation isolation (client scoping), storage super_admin fix, template selection in worker, IMAGE_PROVIDERS list fix. P1 High: template layout management UI (delete/filter/bulk), slide-based parsing mode, LLM model listing & connection test, settings persistence to DB (Fernet encryption), logout button. P2 Polish: storage improvements (master deck files, per-client breakdown, bulk delete, hard purge, client selector), image generation error visibility (__image_error__ badge), hamster wheel loading animation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
250 lines
10 KiB
Python
250 lines
10 KiB
Python
"""ARQ worker task: generate a presentation end-to-end."""
|
|
import asyncio
|
|
import math
|
|
import random
|
|
import traceback
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import List
|
|
|
|
import dirtyjson
|
|
from fastapi import HTTPException
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from models.sql.presentation import PresentationModel
|
|
from models.presentation_outline_model import PresentationOutlineModel, SlideOutlineModel
|
|
from models.presentation_structure_model import PresentationStructureModel
|
|
from models.sql.slide import SlideModel
|
|
from models.sql.job import JobModel
|
|
from services.brand_enforcement_service import BrandEnforcementService
|
|
from services.content_intelligence_service import ContentIntelligenceService
|
|
from services.database import async_session_maker
|
|
from services.image_generation_service import ImageGenerationService
|
|
from services.redis_service import publish_job_progress
|
|
from services.slide_mapping_engine import SlideMappingEngine
|
|
from utils.asset_directory_utils import get_images_directory
|
|
from utils.export_utils import export_presentation
|
|
from utils.get_layout_by_name import get_layout_by_name
|
|
from utils.llm_calls.generate_presentation_outlines import generate_ppt_outline
|
|
from utils.llm_calls.generate_presentation_structure import generate_presentation_structure
|
|
from utils.llm_calls.generate_slide_content import get_slide_content_from_type_and_outline
|
|
from utils.ppt_utils import get_presentation_title_from_outlines
|
|
from utils.process_slides import process_slide_and_fetch_assets
|
|
|
|
|
|
async def generate_presentation_task(ctx: dict, job_id: str) -> None:
|
|
"""ARQ task: full presentation generation pipeline."""
|
|
job_uuid = uuid.UUID(job_id)
|
|
|
|
async with async_session_maker() as session:
|
|
job = await session.get(JobModel, job_uuid)
|
|
if not job:
|
|
return
|
|
|
|
try:
|
|
job.status = "processing"
|
|
job.started_at = datetime.utcnow()
|
|
job.progress = 0
|
|
job.progress_message = "Starting generation"
|
|
await session.commit()
|
|
await _publish(job_uuid, 0, "Starting generation")
|
|
|
|
# Load the stored request data from the presentation record
|
|
presentation = await session.get(PresentationModel, job.presentation_id)
|
|
if not presentation:
|
|
raise ValueError("Presentation record not found")
|
|
|
|
# Extract request parameters from the stored presentation
|
|
content = presentation.content or ""
|
|
n_slides = presentation.n_slides or 10
|
|
language = presentation.language or "en"
|
|
tone = presentation.tone or "professional"
|
|
verbosity = presentation.verbosity or "standard"
|
|
instructions = presentation.instructions
|
|
template = getattr(presentation, "template_name", None) or "general"
|
|
include_title_slide = getattr(presentation, "include_title_slide", True)
|
|
|
|
# --- Step 1: Brand context ---
|
|
brand_context = ""
|
|
if job.client_id:
|
|
brand_svc = BrandEnforcementService()
|
|
brand_context = await brand_svc.get_brand_context_for_llm(
|
|
job.client_id, session
|
|
)
|
|
|
|
await _update_job(session, job, 5, "Analyzing content")
|
|
|
|
# --- Step 2: Content intelligence (if raw content provided) ---
|
|
content_summary = None
|
|
if content and len(content) > 100:
|
|
ci_service = ContentIntelligenceService()
|
|
classified = await ci_service.classify(content)
|
|
content_summary = classified.summary
|
|
|
|
await _update_job(session, job, 10, "Generating outlines")
|
|
|
|
# --- Step 3: Generate outlines ---
|
|
presentation_outlines_text = ""
|
|
async for chunk in generate_ppt_outline(
|
|
content,
|
|
n_slides,
|
|
language,
|
|
None, # additional_context
|
|
tone,
|
|
verbosity,
|
|
instructions,
|
|
include_title_slide,
|
|
False, # web_search
|
|
brand_context=brand_context,
|
|
content_summary=content_summary,
|
|
):
|
|
if isinstance(chunk, HTTPException):
|
|
raise chunk
|
|
presentation_outlines_text += chunk
|
|
|
|
try:
|
|
outlines_json = dict(dirtyjson.loads(presentation_outlines_text))
|
|
except Exception:
|
|
raise ValueError("Failed to parse generated outlines")
|
|
|
|
presentation_outlines = PresentationOutlineModel(**outlines_json)
|
|
total_outlines = n_slides
|
|
|
|
await _update_job(session, job, 25, "Selecting layouts")
|
|
|
|
# --- Step 4: Layout selection ---
|
|
layout_model = await get_layout_by_name(template)
|
|
total_slide_layouts = len(layout_model.slides)
|
|
|
|
if layout_model.ordered:
|
|
presentation_structure = layout_model.to_presentation_structure()
|
|
else:
|
|
presentation_structure = await generate_presentation_structure(
|
|
presentation_outlines, layout_model, instructions
|
|
)
|
|
|
|
presentation_structure.slides = presentation_structure.slides[:total_outlines]
|
|
for index in range(total_outlines):
|
|
random_slide_index = random.randint(0, total_slide_layouts - 1)
|
|
if index >= len(presentation_structure.slides):
|
|
presentation_structure.slides.append(random_slide_index)
|
|
elif presentation_structure.slides[index] >= total_slide_layouts:
|
|
presentation_structure.slides[index] = random_slide_index
|
|
|
|
# Update presentation model with outlines & structure
|
|
presentation.title = get_presentation_title_from_outlines(presentation_outlines)
|
|
presentation.outlines = presentation_outlines.model_dump()
|
|
presentation.layout = layout_model.model_dump()
|
|
presentation.structure = presentation_structure.model_dump()
|
|
await session.commit()
|
|
|
|
await _update_job(session, job, 35, "Generating slides")
|
|
|
|
# --- Step 5: Generate slide content ---
|
|
image_generation_service = ImageGenerationService(get_images_directory())
|
|
async_assets_generation_tasks = []
|
|
slides: List[SlideModel] = []
|
|
|
|
slide_layout_indices = presentation_structure.slides
|
|
slide_layouts = [layout_model.slides[idx] for idx in slide_layout_indices]
|
|
|
|
batch_size = 10
|
|
for start in range(0, len(slide_layouts), batch_size):
|
|
end = min(start + batch_size, len(slide_layouts))
|
|
|
|
content_tasks = [
|
|
get_slide_content_from_type_and_outline(
|
|
slide_layouts[i],
|
|
presentation_outlines.slides[i],
|
|
language,
|
|
tone,
|
|
verbosity,
|
|
instructions,
|
|
brand_context=brand_context,
|
|
)
|
|
for i in range(start, end)
|
|
]
|
|
batch_contents = await asyncio.gather(*content_tasks)
|
|
|
|
batch_slides = []
|
|
for offset, slide_content in enumerate(batch_contents):
|
|
i = start + offset
|
|
slide = SlideModel(
|
|
presentation=job.presentation_id,
|
|
layout_group=layout_model.name,
|
|
layout=slide_layouts[i].id,
|
|
index=i,
|
|
speaker_note=slide_content.get("__speaker_note__"),
|
|
content=slide_content,
|
|
)
|
|
slides.append(slide)
|
|
batch_slides.append(slide)
|
|
|
|
asset_tasks = [
|
|
process_slide_and_fetch_assets(image_generation_service, slide)
|
|
for slide in batch_slides
|
|
]
|
|
async_assets_generation_tasks.extend(asset_tasks)
|
|
|
|
pct = 35 + int((end / len(slide_layouts)) * 40)
|
|
await _update_job(session, job, pct, f"Generating slide {end}/{len(slide_layouts)}")
|
|
|
|
await _update_job(session, job, 80, "Fetching assets")
|
|
|
|
# --- Step 6: Fetch assets ---
|
|
generated_assets_list = await asyncio.gather(*async_assets_generation_tasks)
|
|
generated_assets = []
|
|
for assets_list in generated_assets_list:
|
|
generated_assets.extend(assets_list)
|
|
|
|
await _update_job(session, job, 90, "Saving presentation")
|
|
|
|
# --- Step 7: Save ---
|
|
session.add(presentation)
|
|
session.add_all(slides)
|
|
session.add_all(generated_assets)
|
|
await session.commit()
|
|
|
|
await _update_job(session, job, 95, "Exporting PPTX")
|
|
|
|
# --- Step 8: Export ---
|
|
await export_presentation(
|
|
job.presentation_id,
|
|
presentation.title or str(uuid.uuid4()),
|
|
"pptx",
|
|
)
|
|
|
|
# --- Done ---
|
|
job.status = "completed"
|
|
job.progress = 100
|
|
job.progress_message = "Generation complete"
|
|
job.completed_at = datetime.utcnow()
|
|
await session.commit()
|
|
await _publish(job_uuid, 100, "Generation complete", "completed")
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
job.status = "failed"
|
|
job.error_message = str(e)[:500]
|
|
job.progress_message = "Generation failed"
|
|
job.completed_at = datetime.utcnow()
|
|
await session.commit()
|
|
await _publish(job_uuid, job.progress, "Generation failed", "failed")
|
|
|
|
|
|
async def _update_job(
|
|
session: AsyncSession, job: JobModel, progress: int, message: str
|
|
) -> None:
|
|
job.progress = progress
|
|
job.progress_message = message
|
|
await session.commit()
|
|
await publish_job_progress(job.id, progress, message)
|
|
|
|
|
|
async def _publish(
|
|
job_id: uuid.UUID, progress: int, message: str, status: str = "processing"
|
|
) -> None:
|
|
try:
|
|
await publish_job_progress(job_id, progress, message, status)
|
|
except Exception:
|
|
pass # Redis unavailable is not fatal
|