ppt-tool/backend/api/v1/ppt/endpoints/presentation.py
Vadym Samoilenko 25b70af9fb Fix 422 errors: clientSlice bare fetch, prepare schemaJSON null, update SlideModel validation
- clientSlice: fetchMasterDecks + fetchClientPresentations use apiFetch (adds /ppt-tool basePath)
- useCustomTemplates: parsedLayoutToCompiled generates schemaJSON from elements instead of null
  (null schemaJSON caused 422 on /prepare because backend SlideLayoutModel.json_schema: dict is required)
- presentation.py: update_presentation uses SlideInput (plain Pydantic model with extra='ignore')
  instead of SlideModel (table=True SQLModel) to avoid strict validation causing 422 on /update

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 18:26:54 +00:00

1132 lines
42 KiB
Python

import asyncio
from datetime import datetime
import json
import math
import os
import random
import traceback
from typing import Annotated, List, Literal, Optional, Tuple
import dirtyjson
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Path, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy import delete, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select
from constants.presentation import DEFAULT_TEMPLATES
from services.access_service import get_accessible_client_ids
from enums.webhook_event import WebhookEvent
from models.api_error_model import APIErrorModel
from models.generate_presentation_request import GeneratePresentationRequest
from models.presentation_and_path import PresentationPathAndEditPath
from models.presentation_from_template import EditPresentationRequest
from models.presentation_outline_model import (
PresentationOutlineModel,
SlideOutlineModel,
)
from enums.tone import Tone
from enums.verbosity import Verbosity
from models.pptx_models import PptxPresentationModel
from models.presentation_layout import PresentationLayoutModel
from models.presentation_structure_model import PresentationStructureModel
from models.presentation_with_slides import (
PresentationWithSlides,
)
from models.sql.template import TemplateModel
from services.documents_loader import DocumentsLoader
from services.webhook_service import WebhookService
from utils.get_layout_by_name import get_layout_by_name
from services.image_generation_service import ImageGenerationService
from utils.dict_utils import deep_update
from utils.export_utils import export_presentation
from utils.llm_calls.generate_presentation_outlines import generate_ppt_outline
from utils.llm_calls.summarize_brief import summarize_brief
from models.sql.slide import SlideModel
from models.sse_response import SSECompleteResponse, SSEErrorResponse, SSEResponse
from pydantic import BaseModel, ConfigDict
from services.database import get_async_session, async_session_maker
from services.temp_file_service import TEMP_FILE_SERVICE
from services.concurrent_service import CONCURRENT_SERVICE
from api.middlewares.rate_limit_middleware import limiter
from models.sql.presentation import PresentationModel
from models.sql.user import UserModel
from utils.auth_dependencies import get_current_user
from services.pptx_presentation_creator import PptxPresentationCreator
from models.sql.async_presentation_generation_status import (
AsyncPresentationGenerationTaskModel,
)
from utils.asset_directory_utils import get_exports_directory, get_images_directory
from utils.llm_calls.generate_presentation_structure import (
generate_presentation_structure,
)
from utils.llm_calls.generate_slide_content import (
get_slide_content_from_type_and_outline,
)
from utils.ppt_utils import (
get_presentation_title_from_outlines,
select_toc_or_list_slide_layout_index,
)
from utils.process_slides import (
process_slide_add_placeholder_assets,
process_slide_and_fetch_assets,
)
import uuid
class SlideInput(BaseModel):
    """Request-body schema describing one slide.

    This is a plain Pydantic model rather than the ``table=True`` SQLModel
    ``SlideModel``, which avoids that model's validation quirks when slides are
    sent in a request body. Keys that are not declared here are ignored.
    """

    # Drop undeclared keys instead of rejecting the payload.
    model_config = ConfigDict(extra="ignore")

    # Slide identifier; optional in the payload.
    id: Optional[uuid.UUID] = None
    # Identifier of the presentation the slide is sent for.
    presentation: uuid.UUID
    # Name of the layout group and of the individual layout within it.
    layout_group: str
    layout: str
    # Position of the slide within the presentation.
    index: int
    # Slide content payload.
    content: dict
    html_content: Optional[str] = None
    speaker_note: Optional[str] = None
    properties: Optional[dict] = None
    deleted_at: Optional[datetime] = None
# Router that groups every endpoint declared in this module under the
# "/presentation" URL prefix and the "Presentation" tag.
PRESENTATION_ROUTER = APIRouter(prefix="/presentation", tags=["Presentation"])
@PRESENTATION_ROUTER.get("/all", response_model=List[PresentationWithSlides])
async def get_all_presentations(
    client_id: Optional[uuid.UUID] = Query(None),
    current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """List the presentations visible to the current user, newest first.

    Soft-deleted presentations are always excluded. Each returned presentation
    carries only its first slide (index 0); the slide is joined in, so a
    presentation without an index-0 slide does not appear in the result.

    Visibility rules:
      * ``client_id`` given: the user must have access to that client (403
        otherwise) and only that client's presentations are returned.
      * no ``client_id`` and role ``super_admin``: no further restriction.
      * otherwise: presentations of accessible clients plus the user's own.
    """
    conditions = [PresentationModel.deleted_at.is_(None)]

    if client_id:
        allowed_clients = await get_accessible_client_ids(current_user, sql_session)
        if client_id not in allowed_clients:
            raise HTTPException(403, "Access denied to this client")
        conditions.append(PresentationModel.client_id == client_id)
    elif current_user.role != "super_admin":
        allowed_clients = await get_accessible_client_ids(current_user, sql_session)
        owned_by_user = PresentationModel.owner_id == current_user.id
        if allowed_clients:
            conditions.append(
                (PresentationModel.client_id.in_(allowed_clients)) | owned_by_user
            )
        else:
            conditions.append(owned_by_user)

    # Pair every presentation with its first slide only.
    first_slide_clause = (SlideModel.presentation == PresentationModel.id) & (
        SlideModel.index == 0
    )
    statement = (
        select(PresentationModel, SlideModel)
        .join(SlideModel, first_slide_clause)
        .where(and_(*conditions))
        .order_by(PresentationModel.created_at.desc())
    )
    result = await sql_session.execute(statement)

    return [
        PresentationWithSlides(**presentation.model_dump(), slides=[first_slide])
        for presentation, first_slide in result.all()
    ]
@PRESENTATION_ROUTER.get("/{id}", response_model=PresentationWithSlides)
async def get_presentation(
    id: uuid.UUID,
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Return one presentation together with all of its slides ordered by index.

    Raises:
        HTTPException: 404 when no presentation has the given id.
    """
    presentation = await sql_session.get(PresentationModel, id)
    if presentation is None:
        raise HTTPException(404, "Presentation not found")

    slides_statement = (
        select(SlideModel)
        .where(SlideModel.presentation == id)
        .order_by(SlideModel.index)
    )
    slides = await sql_session.scalars(slides_statement)

    return PresentationWithSlides(**presentation.model_dump(), slides=slides)
@PRESENTATION_ROUTER.delete("/{id}", status_code=204)
async def delete_presentation(
    id: uuid.UUID,
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Delete the presentation row with the given id and respond with 204.

    Raises:
        HTTPException: 404 when no presentation has the given id.
    """
    target = await sql_session.get(PresentationModel, id)
    if target is None:
        raise HTTPException(404, "Presentation not found")

    await sql_session.delete(target)
    await sql_session.commit()
@PRESENTATION_ROUTER.post("/create", response_model=PresentationModel)
@limiter.limit("10/minute")
async def create_presentation(
    request: Request,
    content: Annotated[str, Body()],
    n_slides: Annotated[int, Body()],
    language: Annotated[str, Body()],
    file_paths: Annotated[Optional[List[str]], Body()] = None,
    tone: Annotated[Tone, Body()] = Tone.DEFAULT,
    verbosity: Annotated[Verbosity, Body()] = Verbosity.STANDARD,
    instructions: Annotated[Optional[str], Body()] = None,
    include_table_of_contents: Annotated[bool, Body()] = False,
    include_title_slide: Annotated[bool, Body()] = True,
    web_search: Annotated[bool, Body()] = False,
    current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Persist a new presentation record owned by the current user.

    Only the generation settings are stored here; outlines and slides are
    produced by later endpoints. The record is attached to the first client the
    user can access, or to no client when there is none. The endpoint is
    limited to 10 calls per minute; ``request`` is not read in the body and is
    presumably required by the rate limiter.

    Raises:
        HTTPException: 400 when a table of contents is requested for fewer
            than 3 slides.
    """
    too_few_slides_for_toc = include_table_of_contents and n_slides < 3
    if too_few_slides_for_toc:
        raise HTTPException(
            status_code=400,
            detail="Number of slides cannot be less than 3 if table of contents is included",
        )

    # The first accessible client is treated as the user's primary client.
    accessible = await get_accessible_client_ids(current_user, sql_session)
    primary_client_id = accessible[0] if accessible else None

    presentation = PresentationModel(
        id=uuid.uuid4(),
        content=content,
        n_slides=n_slides,
        language=language,
        file_paths=file_paths,
        tone=tone.value,
        verbosity=verbosity.value,
        instructions=instructions,
        include_table_of_contents=include_table_of_contents,
        include_title_slide=include_title_slide,
        web_search=web_search,
        owner_id=current_user.id,
        client_id=primary_client_id,
    )
    sql_session.add(presentation)
    await sql_session.commit()
    return presentation
@PRESENTATION_ROUTER.post("/prepare", response_model=PresentationModel)
async def prepare_presentation(
    presentation_id: Annotated[uuid.UUID, Body()],
    outlines: Annotated[List[SlideOutlineModel], Body()],
    layout: Annotated[PresentationLayoutModel, Body()],
    title: Annotated[Optional[str], Body()] = None,
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Attach outlines, a layout and a per-slide layout structure to a presentation.

    The structure (one layout index per outline) comes from the layout itself
    when it is ordered, otherwise from ``generate_presentation_structure``. It
    is then normalised so that it has exactly one valid layout index per
    outline, and table-of-contents slides are injected when the presentation
    asks for them and the layout offers a suitable slide.

    Args:
        presentation_id: Id of the presentation created earlier.
        outlines: Outline for every content slide; must not be empty.
        layout: Layout (template) the slides will be rendered with.
        title: Optional new title; the stored title is kept when falsy.

    Returns:
        The updated presentation record.

    Raises:
        HTTPException: 400 when ``outlines`` is empty or the layout has no
            slides, 404 when the presentation does not exist.
    """
    if not outlines:
        raise HTTPException(status_code=400, detail="Outlines are required")

    presentation = await sql_session.get(PresentationModel, presentation_id)
    if not presentation:
        raise HTTPException(status_code=404, detail="Presentation not found")

    presentation_outline_model = PresentationOutlineModel(slides=outlines)
    total_slide_layouts = len(layout.slides)
    total_outlines = len(outlines)

    # Without any slide layout there is nothing to pick from; fail with a clear
    # client error instead of letting random.randint(0, -1) raise later.
    if total_slide_layouts == 0:
        raise HTTPException(
            status_code=400, detail="Layout must contain at least one slide"
        )

    if layout.ordered:
        presentation_structure = layout.to_presentation_structure()
    else:
        presentation_structure: PresentationStructureModel = (
            await generate_presentation_structure(
                presentation_outline=presentation_outline_model,
                presentation_layout=layout,
                instructions=presentation.instructions,
            )
        )

    # Normalise the structure: exactly one in-range layout index per outline.
    presentation_structure.slides = presentation_structure.slides[:total_outlines]
    for index in range(total_outlines):
        random_slide_index = random.randint(0, total_slide_layouts - 1)
        # The structure may be shorter than the outline list; pad it. (The
        # previous check compared against total_outlines and could never fire,
        # so a short structure raised IndexError on the next statement.)
        if index >= len(presentation_structure.slides):
            presentation_structure.slides.append(random_slide_index)
            continue
        if presentation_structure.slides[index] >= total_slide_layouts:
            presentation_structure.slides[index] = random_slide_index

    if presentation.include_table_of_contents:
        # Slides not covered by an outline are reserved for the table of contents.
        n_toc_slides = presentation.n_slides - total_outlines
        toc_slide_layout_index = select_toc_or_list_slide_layout_index(layout)
        if toc_slide_layout_index != -1:
            outline_index = 1 if presentation.include_title_slide else 0
            for i in range(n_toc_slides):
                # Each table-of-contents slide lists at most 10 outlines.
                outlines_to = outline_index + 10
                if total_outlines == outlines_to:
                    outlines_to -= 1
                # TOC slides go right after the title slide when there is one.
                insert_at = i + 1 if presentation.include_title_slide else i
                presentation_structure.slides.insert(insert_at, toc_slide_layout_index)

                toc_outline = "Table of Contents\n\n"
                for outline in presentation_outline_model.slides[
                    outline_index:outlines_to
                ]:
                    page_number = (
                        outline_index - i + n_toc_slides + 1
                        if presentation.include_title_slide
                        else outline_index - i + n_toc_slides
                    )
                    toc_outline += f"Slide page number: {page_number}\n Slide Content: {outline.content[:100]}\n\n"
                    outline_index += 1
                # Skip over the TOC outline that is inserted just below.
                outline_index += 1
                presentation_outline_model.slides.insert(
                    insert_at,
                    SlideOutlineModel(
                        content=toc_outline,
                    ),
                )

    sql_session.add(presentation)
    presentation.outlines = presentation_outline_model.model_dump(mode="json")
    presentation.title = title or presentation.title
    presentation.template_name = layout.name
    presentation.set_layout(layout)
    presentation.set_structure(presentation_structure)
    await sql_session.commit()
    return presentation
@PRESENTATION_ROUTER.get("/stream/{id}", response_model=PresentationWithSlides)
async def stream_presentation(
    id: uuid.UUID,
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Generate the slides of a prepared presentation and stream them as SSE.

    Slides are generated one after another and each is emitted as a JSON chunk;
    together the chunks form ``{ "slides": [ ... ] }``. Once every slide has
    been emitted the asset tasks are awaited, the old slides are replaced in
    the database and a final "complete" event carries the whole presentation.
    If content generation raises an ``HTTPException`` an error event is sent
    and the stream ends without writing anything.

    Raises:
        HTTPException: 404 when the presentation does not exist, 400 when it
            has no structure or no outlines yet.
    """
    presentation = await sql_session.get(PresentationModel, id)
    if not presentation:
        raise HTTPException(status_code=404, detail="Presentation not found")
    if not presentation.structure:
        raise HTTPException(
            status_code=400,
            detail="Presentation not prepared for stream",
        )
    if not presentation.outlines:
        raise HTTPException(
            status_code=400,
            detail="Outlines can not be empty",
        )

    image_generation_service = ImageGenerationService(get_images_directory())

    # Snapshot everything the generator needs now: the request-scoped session
    # is closed once this function returns the StreamingResponse.
    target_id = id
    structure = presentation.get_structure()
    layout = presentation.get_layout()
    outline = presentation.get_presentation_outline()
    language = presentation.language
    tone = presentation.tone
    verbosity = presentation.verbosity
    instructions = presentation.instructions

    def chunk_event(text: str) -> str:
        """Wrap a piece of the streamed JSON document in a "response" SSE event."""
        payload = json.dumps({"type": "chunk", "chunk": text})
        return SSEResponse(event="response", data=payload).to_string()

    async def inner():
        # Asset jobs are collected here and awaited together after the last slide.
        pending_asset_jobs = []
        generated_slides: List[SlideModel] = []

        yield chunk_event('{ "slides": [ ')

        for position, layout_index in enumerate(structure.slides):
            slide_layout = layout.slides[layout_index]
            try:
                slide_content = await get_slide_content_from_type_and_outline(
                    slide_layout,
                    outline.slides[position],
                    language,
                    tone,
                    verbosity,
                    instructions,
                )
            except HTTPException as exc:
                yield SSEErrorResponse(detail=exc.detail).to_string()
                return

            slide = SlideModel(
                presentation=target_id,
                layout_group=layout.name,
                layout=slide_layout.id,
                index=position,
                speaker_note=slide_content.get("__speaker_note__", ""),
                content=slide_content,
            )
            generated_slides.append(slide)

            # Both helpers mutate the slide: placeholders first, real assets later.
            process_slide_add_placeholder_assets(slide)
            pending_asset_jobs.append(
                process_slide_and_fetch_assets(image_generation_service, slide)
            )

            yield chunk_event(slide.model_dump_json())

        yield chunk_event(" ] }")

        per_slide_assets = await asyncio.gather(*pending_asset_jobs)
        generated_assets = [
            asset for slide_assets in per_slide_assets for asset in slide_assets
        ]

        # A fresh session is required for the writes because the request-scoped
        # one is already closed while the generator runs.
        async with async_session_maker() as session:
            await session.execute(
                delete(SlideModel).where(SlideModel.presentation == target_id)
            )
            await session.commit()

            stored_presentation = await session.get(PresentationModel, target_id)
            session.add(stored_presentation)
            session.add_all(generated_slides)
            session.add_all(generated_assets)
            await session.commit()

            response = PresentationWithSlides(
                **stored_presentation.model_dump(),
                slides=generated_slides,
            )

        yield SSECompleteResponse(
            key="presentation",
            value=response.model_dump(mode="json"),
        ).to_string()

    return StreamingResponse(inner(), media_type="text/event-stream")
@PRESENTATION_ROUTER.patch("/update", response_model=PresentationWithSlides)
async def update_presentation(
    id: Annotated[uuid.UUID, Body()],
    n_slides: Annotated[Optional[int], Body()] = None,
    title: Annotated[Optional[str], Body()] = None,
    slides: Annotated[Optional[List[SlideInput]], Body()] = None,
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Update a presentation's metadata and, optionally, replace all its slides.

    Args:
        id: Id of the presentation to update.
        n_slides: New slide count; ignored when falsy.
        title: New title; ignored when falsy.
        slides: When non-empty, the presentation's existing slides are deleted
            and replaced by these. Slides without an id receive a new one.

    Returns:
        The presentation with the slides that were written (an empty list when
        no slides were supplied).

    Raises:
        HTTPException: 404 when the presentation does not exist.
    """
    presentation = await sql_session.get(PresentationModel, id)
    if not presentation:
        raise HTTPException(status_code=404, detail="Presentation not found")

    presentation_update_dict = {}
    if n_slides:
        presentation_update_dict["n_slides"] = n_slides
    if title:
        presentation_update_dict["title"] = title
    if presentation_update_dict:
        presentation.sqlmodel_update(presentation_update_dict)

    slide_models: List[SlideModel] = []
    if slides:
        slide_models = [
            SlideModel(
                id=s.id or uuid.uuid4(),
                # Always bind to the presentation being updated. The value in
                # the request body is untrusted: honouring it would delete this
                # presentation's slides while inserting into another one.
                presentation=presentation.id,
                layout_group=s.layout_group,
                layout=s.layout,
                index=s.index,
                content=s.content,
                html_content=s.html_content,
                speaker_note=s.speaker_note,
                properties=s.properties,
                deleted_at=s.deleted_at,
            )
            for s in slides
        ]
        await sql_session.execute(
            delete(SlideModel).where(SlideModel.presentation == presentation.id)
        )
        sql_session.add_all(slide_models)

    # Commit regardless of whether slides were sent so that metadata-only
    # updates are persisted as well.
    await sql_session.commit()

    return PresentationWithSlides(
        **presentation.model_dump(),
        slides=slide_models,
    )
@PRESENTATION_ROUTER.post("/export/pptx", response_model=str)
async def export_presentation_as_pptx(
    pptx_model: Annotated[PptxPresentationModel, Body()],
    _current_user: UserModel = Depends(get_current_user),
):
    """Build a .pptx file from the posted model and return the path it was saved to.

    The file is written into the exports directory. Its name is the final path
    component of ``pptx_model.name``, or a random UUID when no usable name is
    given.
    """
    temp_dir = TEMP_FILE_SERVICE.create_temp_dir()
    pptx_creator = PptxPresentationCreator(pptx_model, temp_dir)
    await pptx_creator.create_ppt()

    # The name comes from the request body; keep only its last path component
    # so values such as "../../x" cannot place the file outside the exports
    # directory.
    requested_name = os.path.basename(pptx_model.name) if pptx_model.name else ""
    file_stem = requested_name or uuid.uuid4()

    export_directory = get_exports_directory()
    pptx_path = os.path.join(export_directory, f"{file_stem}.pptx")
    pptx_creator.save(pptx_path)
    return pptx_path
@PRESENTATION_ROUTER.post("/export", response_model=PresentationPathAndEditPath)
async def export_presentation_as_pptx_or_pdf(
    id: Annotated[uuid.UUID, Body(description="Presentation ID to export")],
    export_as: Annotated[
        Literal["pptx", "pdf"], Body(description="Format to export the presentation as")
    ] = "pptx",
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Export a stored presentation as PPTX or PDF.

    The export is titled with the presentation title, or a random UUID when the
    presentation has none.

    Returns:
        The export result extended with the path used to edit the presentation.

    Raises:
        HTTPException: 404 when the presentation does not exist.
    """
    presentation = await sql_session.get(PresentationModel, id)
    if presentation is None:
        raise HTTPException(status_code=404, detail="Presentation not found")

    export_title = presentation.title or str(uuid.uuid4())
    exported = await export_presentation(id, export_title, export_as)

    return PresentationPathAndEditPath(
        **exported.model_dump(),
        edit_path=f"/presentation?id={id}",
    )
async def check_if_api_request_is_valid(
    request: GeneratePresentationRequest,
    sql_session: AsyncSession = Depends(get_async_session),
) -> Tuple[uuid.UUID,]:
    """Validate a generation request and allocate the id for the new presentation.

    Checks that some input (content, slides markdown or files) is present, that
    the slide count is positive and that the template is either a default one
    or an existing ``custom-<uuid>`` template. A non-default template name is
    lower-cased on the request object as a side effect.

    Returns:
        A one-element tuple holding the freshly generated presentation id.

    Raises:
        HTTPException: 400 for any failed check.
    """
    new_presentation_id = uuid.uuid4()
    print(f"Presentation ID: {new_presentation_id}")

    # At least one source of material is required.
    has_input = request.content or request.slides_markdown or request.files
    if not has_input:
        raise HTTPException(
            status_code=400,
            detail="Either content or slides markdown or files is required to generate presentation",
        )

    if request.n_slides <= 0:
        raise HTTPException(
            status_code=400,
            detail="Number of slides must be greater than 0",
        )

    if request.template in DEFAULT_TEMPLATES:
        return (new_presentation_id,)

    def template_not_found() -> HTTPException:
        return HTTPException(
            status_code=400,
            detail="Template not found. Please use a valid template.",
        )

    # Anything that is not a default template must be a stored custom template.
    request.template = request.template.lower()
    if not request.template.startswith("custom-"):
        raise template_not_found()

    custom_template_id = request.template.replace("custom-", "")
    try:
        # A malformed UUID, a lookup failure and a missing row are all reported
        # the same way.
        stored_template = await sql_session.get(
            TemplateModel, uuid.UUID(custom_template_id)
        )
        if not stored_template:
            raise Exception()
    except Exception:
        raise template_not_found()

    return (new_presentation_id,)
async def _report_generation_progress(
    async_status: Optional[AsyncPresentationGenerationTaskModel],
    sql_session: AsyncSession,
    message: str,
    *,
    commit: bool = True,
) -> None:
    """Store a progress message on the async task row, if one is being tracked."""
    if not async_status:
        return
    async_status.message = message
    async_status.updated_at = datetime.now()
    sql_session.add(async_status)
    if commit:
        await sql_session.commit()


async def generate_presentation_handler(
    request: GeneratePresentationRequest,
    presentation_id: uuid.UUID,
    async_status: Optional[AsyncPresentationGenerationTaskModel],
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Run the full generation pipeline: outlines, structure, slides, assets, export.

    Outlines are taken from ``request.slides_markdown`` when given, otherwise
    they are generated from the content and any loaded documents. A layout is
    then chosen for every outline, slide contents are generated in concurrent
    batches of 10, assets are fetched, everything is saved and the presentation
    is exported. A webhook is triggered on both success and failure.

    Args:
        request: Validated generation request. ``n_slides`` is overwritten with
            the number of markdown slides when ``slides_markdown`` is used.
        presentation_id: Id to give the new presentation.
        async_status: Task row to keep updated with progress, or ``None`` for a
            synchronous call.
        sql_session: Session used for all reads and writes.

    Returns:
        The export path and edit path on success. On failure with an
        ``async_status`` the error is stored on the task row and ``None`` is
        returned.

    Raises:
        HTTPException: On failure when no ``async_status`` is given; unexpected
            errors are reported as a 500.
    """
    try:
        using_slides_markdown = bool(request.slides_markdown)
        if using_slides_markdown:
            request.n_slides = len(request.slides_markdown)

        # Only the outline-generation path produces a brief structure. It must
        # still be bound on the markdown path because slide generation below
        # reads it unconditionally (it used to raise UnboundLocalError there).
        brief_structure = None

        if not using_slides_markdown:
            additional_context = ""
            await _report_generation_progress(
                async_status, sql_session, "Generating presentation outlines"
            )

            if request.files:
                documents_loader = DocumentsLoader(file_paths=request.files)
                await documents_loader.load_documents()
                documents = documents_loader.documents
                if documents:
                    additional_context = "\n\n".join(documents)

            # Pre-process long content into structured sections to prevent LLM
            # "lost middle" problem and enable per-slide section attribution.
            full_content = request.content
            if additional_context:
                # Avoid a literal "None" prefix when only files were supplied.
                full_content = (
                    f"{request.content}\n\n{additional_context}"
                    if request.content
                    else additional_context
                )
            brief_structure = await summarize_brief(full_content)

            # Finding number of slides to generate by considering table of contents
            n_slides_to_generate = request.n_slides
            if request.include_table_of_contents:
                needed_toc_count = math.ceil(
                    (
                        (request.n_slides - 1)
                        if request.include_title_slide
                        else request.n_slides
                    )
                    / 10
                )
                n_slides_to_generate -= math.ceil(
                    (request.n_slides - needed_toc_count) / 10
                )

            presentation_outlines_text = ""
            async for chunk in generate_ppt_outline(
                request.content,
                n_slides_to_generate,
                request.language,
                additional_context,
                request.tone.value,
                request.verbosity.value,
                request.instructions,
                request.include_title_slide,
                request.web_search,
                brief_structure=brief_structure,
            ):
                # The generator reports failures by yielding an exception.
                if isinstance(chunk, HTTPException):
                    raise chunk
                presentation_outlines_text += chunk

            try:
                presentation_outlines_json = dict(
                    dirtyjson.loads(presentation_outlines_text)
                )
            except Exception:
                traceback.print_exc()
                raise HTTPException(
                    status_code=400,
                    detail="Failed to generate presentation outlines. Please try again.",
                )

            presentation_outlines = PresentationOutlineModel(
                **presentation_outlines_json
            )
            total_outlines = n_slides_to_generate
        else:
            # Setting outlines to slides markdown
            presentation_outlines = PresentationOutlineModel(
                slides=[
                    SlideOutlineModel(content=slide)
                    for slide in request.slides_markdown
                ]
            )
            total_outlines = len(request.slides_markdown)

        await _report_generation_progress(
            async_status, sql_session, "Selecting layout for each slide"
        )
        print("-" * 40)
        print(f"Generated {total_outlines} outlines for the presentation")

        # Parse Layouts
        layout_model = await get_layout_by_name(request.template)
        total_slide_layouts = len(layout_model.slides)

        # Generate Structure
        if layout_model.ordered:
            presentation_structure = layout_model.to_presentation_structure()
        else:
            presentation_structure: PresentationStructureModel = (
                await generate_presentation_structure(
                    presentation_outlines,
                    layout_model,
                    request.instructions,
                    using_slides_markdown,
                )
            )

        # Normalise the structure: exactly one in-range layout index per outline.
        presentation_structure.slides = presentation_structure.slides[:total_outlines]
        for index in range(total_outlines):
            random_slide_index = random.randint(0, total_slide_layouts - 1)
            # Pad a structure that is shorter than the outline list. (The old
            # check compared against total_outlines and could never fire, so a
            # short structure raised IndexError on the next statement.)
            if index >= len(presentation_structure.slides):
                presentation_structure.slides.append(random_slide_index)
                continue
            if presentation_structure.slides[index] >= total_slide_layouts:
                presentation_structure.slides[index] = random_slide_index

        # Injecting table of contents to the presentation structure and outlines
        if request.include_table_of_contents and not using_slides_markdown:
            n_toc_slides = request.n_slides - total_outlines
            toc_slide_layout_index = select_toc_or_list_slide_layout_index(layout_model)
            if toc_slide_layout_index != -1:
                outline_index = 1 if request.include_title_slide else 0
                for i in range(n_toc_slides):
                    # Each table-of-contents slide lists at most 10 outlines.
                    outlines_to = outline_index + 10
                    if total_outlines == outlines_to:
                        outlines_to -= 1
                    # TOC slides go right after the title slide when there is one.
                    insert_at = i + 1 if request.include_title_slide else i
                    presentation_structure.slides.insert(
                        insert_at, toc_slide_layout_index
                    )

                    toc_outline = "Table of Contents\n\n"
                    for outline in presentation_outlines.slides[
                        outline_index:outlines_to
                    ]:
                        page_number = (
                            outline_index - i + n_toc_slides + 1
                            if request.include_title_slide
                            else outline_index - i + n_toc_slides
                        )
                        toc_outline += f"Slide page number: {page_number}\n Slide Content: {outline.content[:100]}\n\n"
                        outline_index += 1
                    # Skip over the TOC outline that is inserted just below.
                    outline_index += 1
                    presentation_outlines.slides.insert(
                        insert_at,
                        SlideOutlineModel(
                            content=toc_outline,
                        ),
                    )

        # Create PresentationModel
        presentation = PresentationModel(
            id=presentation_id,
            content=request.content,
            n_slides=request.n_slides,
            language=request.language,
            title=get_presentation_title_from_outlines(presentation_outlines),
            outlines=presentation_outlines.model_dump(),
            layout=layout_model.model_dump(),
            structure=presentation_structure.model_dump(),
            tone=request.tone.value,
            verbosity=request.verbosity.value,
            instructions=request.instructions,
        )

        await _report_generation_progress(async_status, sql_session, "Generating slides")

        image_generation_service = ImageGenerationService(get_images_directory())
        async_assets_generation_tasks = []

        # Generate slide content concurrently (batched), then build slides and
        # fetch assets.
        slides: List[SlideModel] = []
        slide_layout_indices = presentation_structure.slides
        slide_layouts = [layout_model.slides[idx] for idx in slide_layout_indices]

        # Titles of slides generated so far, used for narrative continuity.
        # Only titles from earlier batches are available while a batch is built.
        generated_titles: List[Optional[str]] = []

        batch_size = 10
        for start in range(0, len(slide_layouts), batch_size):
            end = min(start + batch_size, len(slide_layouts))
            print(f"Generating slides from {start} to {end}")

            # Generate contents for this batch concurrently
            content_tasks = []
            for i in range(start, end):
                outline = presentation_outlines.slides[i]
                # Narrative continuity: pass the title of the preceding slide
                prev_title = (
                    generated_titles[i - 1]
                    if i > 0 and i - 1 < len(generated_titles)
                    else None
                )
                # Section attribution: pass targeted brief excerpt when available
                source_text = None
                if brief_structure is not None and outline.source_section_idx is not None:
                    source_text = brief_structure.get_section_text(
                        outline.source_section_idx
                    )
                content_tasks.append(
                    get_slide_content_from_type_and_outline(
                        slide_layouts[i],
                        outline,
                        request.language,
                        request.tone.value,
                        request.verbosity.value,
                        request.instructions,
                        prev_slide_title=prev_title,
                        source_section_text=source_text,
                    )
                )
            batch_contents: List[dict] = await asyncio.gather(*content_tasks)

            # Record titles for next batch's narrative continuity
            generated_titles.extend(
                content_dict.get("title") for content_dict in batch_contents
            )

            # Build slides for this batch
            batch_slides: List[SlideModel] = []
            for offset, slide_content in enumerate(batch_contents):
                i = start + offset
                slide = SlideModel(
                    presentation=presentation_id,
                    layout_group=layout_model.name,
                    layout=slide_layouts[i].id,
                    index=i,
                    speaker_note=slide_content.get("__speaker_note__"),
                    content=slide_content,
                )
                slides.append(slide)
                batch_slides.append(slide)

            # Queue asset fetches for the slides just generated; they are all
            # awaited together after the last batch.
            async_assets_generation_tasks.extend(
                process_slide_and_fetch_assets(image_generation_service, slide)
                for slide in batch_slides
            )

        await _report_generation_progress(
            async_status, sql_session, "Fetching assets for slides"
        )

        generated_assets_list = await asyncio.gather(*async_assets_generation_tasks)
        generated_assets = []
        for assets_list in generated_assets_list:
            generated_assets.extend(assets_list)

        # Save PresentationModel and Slides
        sql_session.add(presentation)
        sql_session.add_all(slides)
        sql_session.add_all(generated_assets)
        await sql_session.commit()

        # This message is not committed on its own; it is written together with
        # the next commit.
        await _report_generation_progress(
            async_status, sql_session, "Exporting presentation", commit=False
        )

        # Export
        presentation_and_path = await export_presentation(
            presentation_id, presentation.title or str(uuid.uuid4()), request.export_as
        )
        response = PresentationPathAndEditPath(
            **presentation_and_path.model_dump(),
            edit_path=f"/presentation?id={presentation_id}",
        )

        if async_status:
            async_status.message = "Presentation generation completed"
            async_status.status = "completed"
            async_status.data = response.model_dump(mode="json")
            async_status.updated_at = datetime.now()
            sql_session.add(async_status)
            await sql_session.commit()

        # Triggering webhook on success
        CONCURRENT_SERVICE.run_task(
            None,
            WebhookService.send_webhook,
            WebhookEvent.PRESENTATION_GENERATION_COMPLETED,
            response.model_dump(mode="json"),
        )
        return response

    except Exception as e:
        # Anything that is not already an HTTP error is logged and reported as
        # a generic 500.
        if not isinstance(e, HTTPException):
            traceback.print_exc()
            e = HTTPException(status_code=500, detail="Presentation generation failed")
        api_error_model = APIErrorModel.from_exception(e)

        # Triggering webhook on failure
        CONCURRENT_SERVICE.run_task(
            None,
            WebhookService.send_webhook,
            WebhookEvent.PRESENTATION_GENERATION_FAILED,
            api_error_model.model_dump(mode="json"),
        )

        if async_status:
            # Async callers poll the task row, so the error is stored there
            # instead of being raised.
            async_status.status = "error"
            async_status.message = "Presentation generation failed"
            async_status.updated_at = datetime.now()
            async_status.error = api_error_model.model_dump(mode="json")
            sql_session.add(async_status)
            await sql_session.commit()
        else:
            raise e
@PRESENTATION_ROUTER.post("/generate", response_model=PresentationPathAndEditPath)
@limiter.limit("10/minute")
async def generate_presentation_sync(
    http_request: Request,
    request: GeneratePresentationRequest,
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Validate the request and generate the presentation within this request.

    Limited to 10 calls per minute.

    Returns:
        The export path and edit path of the generated presentation.

    Raises:
        HTTPException: The validation or generation error as raised (for
            example a 400 for an invalid request), or a 500 for any unexpected
            failure.
    """
    try:
        (presentation_id,) = await check_if_api_request_is_valid(request, sql_session)
        return await generate_presentation_handler(
            request, presentation_id, None, sql_session
        )
    except HTTPException:
        # Keep deliberate HTTP errors (400 validation failures etc.) intact
        # instead of masking them as a 500, as the async endpoint does.
        raise
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(
            status_code=500, detail="Presentation generation failed"
        ) from exc
@PRESENTATION_ROUTER.post(
    "/generate/async", response_model=AsyncPresentationGenerationTaskModel
)
@limiter.limit("10/minute")
async def generate_presentation_async(
    http_request: Request,
    request: GeneratePresentationRequest,
    background_tasks: BackgroundTasks,
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Queue a presentation generation job and return its task record.

    A lightweight presentation row and a "pending" task row are stored, then a
    job is created and enqueued. There is no in-process fallback: when the job
    cannot be enqueued the request fails with 503 (``background_tasks`` is not
    used). Limited to 10 calls per minute.

    Returns:
        The pending task record.

    Raises:
        HTTPException: 400 from request validation, 403 when ``client_id`` is
            not accessible to a non-super-admin user, 503 when the job cannot
            be enqueued, 500 for any other failure.
    """
    try:
        (presentation_id,) = await check_if_api_request_is_valid(request, sql_session)

        # Resolve the client the presentation belongs to: the requested one
        # when the user may use it, otherwise the user's first accessible one.
        accessible = await get_accessible_client_ids(_current_user, sql_session)
        user_client_id = None
        if request.client_id:
            if request.client_id not in accessible and _current_user.role != "super_admin":
                raise HTTPException(403, "Access denied to this client")
            user_client_id = request.client_id
        elif accessible:
            user_client_id = accessible[0]

        # Create a lightweight presentation record so the worker can load it
        presentation = PresentationModel(
            id=presentation_id,
            content=request.content,
            n_slides=request.n_slides,
            language=request.language,
            tone=request.tone.value,
            verbosity=request.verbosity.value,
            instructions=request.instructions,
            include_table_of_contents=request.include_table_of_contents,
            include_title_slide=request.include_title_slide,
            web_search=request.web_search,
            template_name=request.template,
            owner_id=_current_user.id,
            client_id=user_client_id,
        )
        sql_session.add(presentation)

        async_status = AsyncPresentationGenerationTaskModel(
            status="pending",
            message="Queued for generation",
            data=None,
        )
        sql_session.add(async_status)
        await sql_session.commit()

        # Hand the work to the job queue.
        try:
            from models.sql.job import JobModel
            from services.redis_service import enqueue_job

            job = JobModel(
                user_id=_current_user.id,
                client_id=user_client_id,
                presentation_id=presentation_id,
                job_type="generate_presentation",
                status="queued",
                progress=0,
                progress_message="Queued for generation",
            )
            sql_session.add(job)
            await sql_session.commit()
            await enqueue_job("generate_presentation_task", job_id=str(job.id))
        except Exception as queue_error:
            # Log the real cause instead of discarding it, then report the
            # queue as unavailable.
            traceback.print_exc()
            raise HTTPException(
                status_code=503,
                detail="Generation service is currently unavailable. Queue is deeply saturated or Redis is down."
            ) from queue_error

        return async_status
    except Exception as e:
        if not isinstance(e, HTTPException):
            traceback.print_exc()
            e = HTTPException(status_code=500, detail="Presentation generation failed")
        raise e
@PRESENTATION_ROUTER.get(
    "/status/{id}", response_model=AsyncPresentationGenerationTaskModel
)
async def check_async_presentation_generation_status(
    id: str = Path(description="ID of the presentation generation task"),
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Return the stored record of an asynchronous generation task.

    Raises:
        HTTPException: 404 when no task has the given id.
    """
    task = await sql_session.get(AsyncPresentationGenerationTaskModel, id)
    if task:
        return task
    raise HTTPException(
        status_code=404, detail="No presentation generation task found"
    )
@PRESENTATION_ROUTER.post("/edit", response_model=PresentationPathAndEditPath)
async def edit_presentation_with_new_content(
    data: Annotated[EditPresentationRequest, Body()],
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Merge new content into matching slides of a presentation, then export it.

    Every stored slide whose index matches an entry in ``data.slides`` is
    replaced by a new slide whose content is the old content deep-updated with
    the supplied content. Slides without a matching entry are left as they are.

    Returns:
        The export result extended with the path used to edit the presentation.

    Raises:
        HTTPException: 404 when the presentation does not exist.
    """
    presentation = await sql_session.get(PresentationModel, data.presentation_id)
    if presentation is None:
        raise HTTPException(status_code=404, detail="Presentation not found")

    stored_slides = await sql_session.scalars(
        select(SlideModel).where(SlideModel.presentation == data.presentation_id)
    )

    replacement_slides = []
    replaced_slide_ids = []
    for stored_slide in stored_slides:
        # First supplied entry with the same index, if any.
        edit = next(
            (item for item in data.slides if item.index == stored_slide.index),
            None,
        )
        if edit is None:
            continue
        merged_content = deep_update(stored_slide.content, edit.content)
        replacement_slides.append(
            stored_slide.get_new_slide(presentation.id, merged_content)
        )
        replaced_slide_ids.append(stored_slide.id)

    await sql_session.execute(
        delete(SlideModel).where(SlideModel.id.in_(replaced_slide_ids))
    )
    sql_session.add_all(replacement_slides)
    await sql_session.commit()

    export_title = presentation.title or str(uuid.uuid4())
    exported = await export_presentation(presentation.id, export_title, data.export_as)

    return PresentationPathAndEditPath(
        **exported.model_dump(),
        edit_path=f"/presentation?id={presentation.id}",
    )
@PRESENTATION_ROUTER.post("/derive", response_model=PresentationPathAndEditPath)
async def derive_presentation_from_existing_one(
    data: Annotated[EditPresentationRequest, Body()],
    _current_user: UserModel = Depends(get_current_user),
    sql_session: AsyncSession = Depends(get_async_session),
):
    """Copy a presentation into a new one, applying content edits, and export it.

    Every slide of the source presentation is copied to the new presentation.
    A slide whose index matches an entry in ``data.slides`` gets its content
    deep-updated with the supplied content; the others are copied with no
    content override. The source presentation is not modified.

    Returns:
        The export result extended with the path used to edit the new
        presentation.

    Raises:
        HTTPException: 404 when the source presentation does not exist.
    """
    source = await sql_session.get(PresentationModel, data.presentation_id)
    if source is None:
        raise HTTPException(status_code=404, detail="Presentation not found")

    source_slides = await sql_session.scalars(
        select(SlideModel).where(SlideModel.presentation == data.presentation_id)
    )
    new_presentation = source.get_new_presentation()

    copied_slides = []
    for source_slide in source_slides:
        # First supplied entry with the same index, if any.
        edit = next(
            (item for item in data.slides if item.index == source_slide.index),
            None,
        )
        merged_content = (
            deep_update(source_slide.content, edit.content)
            if edit is not None
            else None
        )
        copied_slides.append(
            source_slide.get_new_slide(new_presentation.id, merged_content)
        )

    sql_session.add(new_presentation)
    sql_session.add_all(copied_slides)
    await sql_session.commit()

    export_title = new_presentation.title or str(uuid.uuid4())
    exported = await export_presentation(
        new_presentation.id, export_title, data.export_as
    )

    return PresentationPathAndEditPath(
        **exported.model_dump(),
        edit_path=f"/presentation?id={new_presentation.id}",
    )