ppt-tool/backend/workers/presentation_worker.py
Vadym Samoilenko 69a8829750 Phase 3: Bug fixes, feature enhancements, and polish
P0 Critical: presentation isolation (client scoping), storage super_admin fix,
template selection in worker, IMAGE_PROVIDERS list fix.

P1 High: template layout management UI (delete/filter/bulk), slide-based parsing
mode, LLM model listing & connection test, settings persistence to DB (Fernet
encryption), logout button.

P2 Polish: storage improvements (master deck files, per-client breakdown, bulk
delete, hard purge, client selector), image generation error visibility
(__image_error__ badge), hamster wheel loading animation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 12:58:52 +00:00

250 lines
10 KiB
Python

"""ARQ worker task: generate a presentation end-to-end."""
import asyncio
import math
import random
import traceback
import uuid
from datetime import datetime
from typing import List
import dirtyjson
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from models.sql.presentation import PresentationModel
from models.presentation_outline_model import PresentationOutlineModel, SlideOutlineModel
from models.presentation_structure_model import PresentationStructureModel
from models.sql.slide import SlideModel
from models.sql.job import JobModel
from services.brand_enforcement_service import BrandEnforcementService
from services.content_intelligence_service import ContentIntelligenceService
from services.database import async_session_maker
from services.image_generation_service import ImageGenerationService
from services.redis_service import publish_job_progress
from services.slide_mapping_engine import SlideMappingEngine
from utils.asset_directory_utils import get_images_directory
from utils.export_utils import export_presentation
from utils.get_layout_by_name import get_layout_by_name
from utils.llm_calls.generate_presentation_outlines import generate_ppt_outline
from utils.llm_calls.generate_presentation_structure import generate_presentation_structure
from utils.llm_calls.generate_slide_content import get_slide_content_from_type_and_outline
from utils.ppt_utils import get_presentation_title_from_outlines
from utils.process_slides import process_slide_and_fetch_assets
async def generate_presentation_task(ctx: dict, job_id: str) -> None:
"""ARQ task: full presentation generation pipeline."""
job_uuid = uuid.UUID(job_id)
async with async_session_maker() as session:
job = await session.get(JobModel, job_uuid)
if not job:
return
try:
job.status = "processing"
job.started_at = datetime.utcnow()
job.progress = 0
job.progress_message = "Starting generation"
await session.commit()
await _publish(job_uuid, 0, "Starting generation")
# Load the stored request data from the presentation record
presentation = await session.get(PresentationModel, job.presentation_id)
if not presentation:
raise ValueError("Presentation record not found")
# Extract request parameters from the stored presentation
content = presentation.content or ""
n_slides = presentation.n_slides or 10
language = presentation.language or "en"
tone = presentation.tone or "professional"
verbosity = presentation.verbosity or "standard"
instructions = presentation.instructions
template = getattr(presentation, "template_name", None) or "general"
include_title_slide = getattr(presentation, "include_title_slide", True)
# --- Step 1: Brand context ---
brand_context = ""
if job.client_id:
brand_svc = BrandEnforcementService()
brand_context = await brand_svc.get_brand_context_for_llm(
job.client_id, session
)
await _update_job(session, job, 5, "Analyzing content")
# --- Step 2: Content intelligence (if raw content provided) ---
content_summary = None
if content and len(content) > 100:
ci_service = ContentIntelligenceService()
classified = await ci_service.classify(content)
content_summary = classified.summary
await _update_job(session, job, 10, "Generating outlines")
# --- Step 3: Generate outlines ---
presentation_outlines_text = ""
async for chunk in generate_ppt_outline(
content,
n_slides,
language,
None, # additional_context
tone,
verbosity,
instructions,
include_title_slide,
False, # web_search
brand_context=brand_context,
content_summary=content_summary,
):
if isinstance(chunk, HTTPException):
raise chunk
presentation_outlines_text += chunk
try:
outlines_json = dict(dirtyjson.loads(presentation_outlines_text))
except Exception:
raise ValueError("Failed to parse generated outlines")
presentation_outlines = PresentationOutlineModel(**outlines_json)
total_outlines = n_slides
await _update_job(session, job, 25, "Selecting layouts")
# --- Step 4: Layout selection ---
layout_model = await get_layout_by_name(template)
total_slide_layouts = len(layout_model.slides)
if layout_model.ordered:
presentation_structure = layout_model.to_presentation_structure()
else:
presentation_structure = await generate_presentation_structure(
presentation_outlines, layout_model, instructions
)
presentation_structure.slides = presentation_structure.slides[:total_outlines]
for index in range(total_outlines):
random_slide_index = random.randint(0, total_slide_layouts - 1)
if index >= len(presentation_structure.slides):
presentation_structure.slides.append(random_slide_index)
elif presentation_structure.slides[index] >= total_slide_layouts:
presentation_structure.slides[index] = random_slide_index
# Update presentation model with outlines & structure
presentation.title = get_presentation_title_from_outlines(presentation_outlines)
presentation.outlines = presentation_outlines.model_dump()
presentation.layout = layout_model.model_dump()
presentation.structure = presentation_structure.model_dump()
await session.commit()
await _update_job(session, job, 35, "Generating slides")
# --- Step 5: Generate slide content ---
image_generation_service = ImageGenerationService(get_images_directory())
async_assets_generation_tasks = []
slides: List[SlideModel] = []
slide_layout_indices = presentation_structure.slides
slide_layouts = [layout_model.slides[idx] for idx in slide_layout_indices]
batch_size = 10
for start in range(0, len(slide_layouts), batch_size):
end = min(start + batch_size, len(slide_layouts))
content_tasks = [
get_slide_content_from_type_and_outline(
slide_layouts[i],
presentation_outlines.slides[i],
language,
tone,
verbosity,
instructions,
brand_context=brand_context,
)
for i in range(start, end)
]
batch_contents = await asyncio.gather(*content_tasks)
batch_slides = []
for offset, slide_content in enumerate(batch_contents):
i = start + offset
slide = SlideModel(
presentation=job.presentation_id,
layout_group=layout_model.name,
layout=slide_layouts[i].id,
index=i,
speaker_note=slide_content.get("__speaker_note__"),
content=slide_content,
)
slides.append(slide)
batch_slides.append(slide)
asset_tasks = [
process_slide_and_fetch_assets(image_generation_service, slide)
for slide in batch_slides
]
async_assets_generation_tasks.extend(asset_tasks)
pct = 35 + int((end / len(slide_layouts)) * 40)
await _update_job(session, job, pct, f"Generating slide {end}/{len(slide_layouts)}")
await _update_job(session, job, 80, "Fetching assets")
# --- Step 6: Fetch assets ---
generated_assets_list = await asyncio.gather(*async_assets_generation_tasks)
generated_assets = []
for assets_list in generated_assets_list:
generated_assets.extend(assets_list)
await _update_job(session, job, 90, "Saving presentation")
# --- Step 7: Save ---
session.add(presentation)
session.add_all(slides)
session.add_all(generated_assets)
await session.commit()
await _update_job(session, job, 95, "Exporting PPTX")
# --- Step 8: Export ---
await export_presentation(
job.presentation_id,
presentation.title or str(uuid.uuid4()),
"pptx",
)
# --- Done ---
job.status = "completed"
job.progress = 100
job.progress_message = "Generation complete"
job.completed_at = datetime.utcnow()
await session.commit()
await _publish(job_uuid, 100, "Generation complete", "completed")
except Exception as e:
traceback.print_exc()
job.status = "failed"
job.error_message = str(e)[:500]
job.progress_message = "Generation failed"
job.completed_at = datetime.utcnow()
await session.commit()
await _publish(job_uuid, job.progress, "Generation failed", "failed")
async def _update_job(
session: AsyncSession, job: JobModel, progress: int, message: str
) -> None:
job.progress = progress
job.progress_message = message
await session.commit()
await publish_job_progress(job.id, progress, message)
async def _publish(
job_id: uuid.UUID, progress: int, message: str, status: str = "processing"
) -> None:
try:
await publish_job_progress(job_id, progress, message, status)
except Exception:
pass # Redis unavailable is not fatal