ppt-tool/backend/services/master_deck_parser_service.py
Vadym Samoilenko d3d1667a79 Phase 2: Admin panel, analytics, storage, template pipeline, multi-provider LLM
- Fix admin sidebar: remove duplicate Teams, add Storage nav item
- Analytics: client-scoped queries, super_admin sees all (including NULL client_id)
- Storage management: list/download/delete presentations with file metadata
- Settings page with brand config router
- AI usage tracking: new AIUsageModel, ai_usage_service, analytics endpoint
- Master deck → template bridge: _register_as_template creates TemplateModel
  + PresentationLayoutCodeModel so parsed layouts appear in template picker
- Multi-provider LLM vision in parser: Anthropic/Google/OpenAI with asyncio.to_thread
- Fix PPTX upload 400: accept by .pptx extension (browser sends octet-stream)
- Fix reparse FK violation: presentation_id=None for parse_master_deck jobs
- Worker job_timeout increased to 1800s for LLM-heavy master deck parsing
- PYTHONUNBUFFERED=1 in docker-compose worker for real-time log output
- Auth: clientId in /me response, dev-login cookie improvements
- Frontend: auth slice clientId, master-deck thumbnails, storage page

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 23:39:34 +00:00

546 lines
20 KiB
Python

"""Master deck parser: extract layouts from a PPTX master/template file.
Pipeline:
1. Unzip PPTX → extract slide layout XMLs and slide master XML
2. Convert to PDF via LibreOffice → one screenshot per rendered slide (used as layout previews)
3. Extract theme colors, fonts from OXML
4. For each slide layout that has a screenshot: generate HTML → generate React code via LLM
5. Classify layout type (currently a name-based heuristic, see _guess_layout_type)
6. Persist results to MasterDeckModel and register the layouts as a custom template
"""
import asyncio
import base64
import os
import shutil
import tempfile
import traceback
import uuid
import zipfile
import xml.etree.ElementTree as ET
from typing import List, Optional
from services.database import async_session_maker
# Reuse existing extraction utilities
from api.v1.ppt.endpoints.pptx_slides import (
_extract_slide_xmls,
_convert_pptx_to_pdf,
extract_fonts_from_oxml,
normalize_font_family_name,
)
from api.v1.ppt.endpoints.prompts import (
GENERATE_HTML_SYSTEM_PROMPT,
HTML_TO_REACT_SYSTEM_PROMPT,
)
from services.documents_loader import DocumentsLoader
# OXML namespaces
# Prefix -> namespace URI map handed to ElementTree ``find`` calls in this module so
# that prefixed paths such as "p:cSld" or "a:clrScheme" can be resolved.
NS = {
    "a": "http://schemas.openxmlformats.org/drawingml/2006/main",  # DrawingML
    "p": "http://schemas.openxmlformats.org/presentationml/2006/main",  # PresentationML
    "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",  # relationships
}
# Lower-case substrings of a layout name mapped to the layout type they suggest.
# _guess_layout_type() matches these keys against the lower-cased layout name.
# The lookup depends on insertion order, so keep more specific hints
# (e.g. "two content") ahead of the generic ones they contain ("content").
LAYOUT_TYPE_HINTS = {
    "title": "title_slide",
    "section": "section_header",
    "two content": "two_column",
    "comparison": "comparison",
    "content": "content",
    "blank": "blank",
    "picture": "picture",
    "caption": "caption",
}
def _extract_slide_layout_xmls(pptx_path: str, temp_dir: str) -> List[dict]:
    """Read every slide layout part of a PPTX and describe it.

    The archive is unpacked into ``<temp_dir>/pptx_extract`` unless that directory
    already exists, so several extractors can share one unpacked copy.

    Returns:
        One dict per ``ppt/slideLayouts/*.xml`` file, ordered by the number embedded
        in the file name, with the keys ``filename``, ``layout_name`` and
        ``xml_content``. An empty list when the deck has no slideLayouts folder.
    """
    unpacked_root = os.path.join(temp_dir, "pptx_extract")
    if not os.path.exists(unpacked_root):
        with zipfile.ZipFile(pptx_path, "r") as archive:
            archive.extractall(unpacked_root)

    layouts_root = os.path.join(unpacked_root, "ppt", "slideLayouts")
    if not os.path.exists(layouts_root):
        return []

    def _numeric_order(filename: str) -> int:
        # "slideLayout10.xml" -> 10, so that 10 sorts after 2; no digits -> 0.
        digits = "".join(ch for ch in filename if ch.isdigit())
        return int(digits or "0")

    def _resolve_name(filename: str, xml_text: str) -> str:
        # Prefer the name attribute of <p:cSld>; fall back to the file stem when
        # the attribute is missing/empty or the XML cannot be processed.
        fallback = filename.replace(".xml", "")
        try:
            common_slide = ET.fromstring(xml_text).find("p:cSld", NS)
            if common_slide is None:
                return fallback
            return common_slide.get("name") or fallback
        except Exception:
            return fallback

    xml_names = [entry for entry in os.listdir(layouts_root) if entry.endswith(".xml")]
    xml_names.sort(key=_numeric_order)

    collected = []
    for filename in xml_names:
        with open(os.path.join(layouts_root, filename), "r", encoding="utf-8") as handle:
            xml_text = handle.read()
        collected.append(
            {
                "filename": filename,
                "layout_name": _resolve_name(filename, xml_text),
                "xml_content": xml_text,
            }
        )
    return collected
def _extract_theme_info(pptx_path: str, temp_dir: str) -> dict:
    """Extract the color scheme and heading/body fonts from the PPTX theme XML.

    The archive is unpacked into ``<temp_dir>/pptx_extract`` unless that directory
    already exists. When several theme parts are present, the one with the lowest
    number in its file name is used.

    Args:
        pptx_path: Path to the .pptx archive (only opened when it still has to be
            unpacked).
        temp_dir: Working directory that holds (or receives) the unpacked archive.

    Returns:
        ``{"colors": [{"name": <scheme slot>, "hex": "#<value>"}, ...],
        "fonts": {"heading": <typeface>, "body": <typeface>}}``. Both containers
        are empty when the deck has no theme or the theme XML is malformed.
    """
    extract_dir = os.path.join(temp_dir, "pptx_extract")
    if not os.path.exists(extract_dir):
        with zipfile.ZipFile(pptx_path, "r") as zf:
            zf.extractall(extract_dir)

    theme_dir = os.path.join(extract_dir, "ppt", "theme")
    if not os.path.exists(theme_dir):
        return {"colors": [], "fonts": {}}

    def _theme_order(filename: str):
        # "theme10.xml" -> (10, "theme10.xml"); the name breaks numeric ties.
        digits = "".join(ch for ch in filename if ch.isdecimal())
        return (int(digits or "0"), filename)

    # os.listdir() returns entries in arbitrary order, so sort before picking one.
    # NOTE(review): the lowest-numbered part (theme1.xml) is normally the slide
    # master's theme, higher numbers belonging to notes/handout masters; confirm
    # if decks with several slide masters need to be supported.
    theme_files = sorted(
        (f for f in os.listdir(theme_dir) if f.endswith(".xml")),
        key=_theme_order,
    )
    if not theme_files:
        return {"colors": [], "fonts": {}}

    theme_path = os.path.join(theme_dir, theme_files[0])
    with open(theme_path, "r", encoding="utf-8") as f:
        theme_xml = f.read()

    try:
        root = ET.fromstring(theme_xml)
    except ET.ParseError as exc:
        # Theme data is optional for the caller: report the problem and carry on.
        print(f"[MasterDeckParser] Could not parse theme XML {theme_files[0]}: {exc}")
        return {"colors": [], "fonts": {}}

    colors = []
    clr_scheme = root.find(".//a:clrScheme", NS)
    if clr_scheme is not None:
        for child in clr_scheme:
            # Drop the "{namespace}" prefix ElementTree puts in front of tag names.
            tag = child.tag.rsplit("}", 1)[-1]
            srgb = child.find("a:srgbClr", NS)
            if srgb is not None:
                colors.append({"name": tag, "hex": f"#{srgb.get('val', '')}"})
                continue
            # System colors carry their concrete value in the lastClr attribute.
            sys_clr = child.find("a:sysClr", NS)
            if sys_clr is not None:
                colors.append({"name": tag, "hex": f"#{sys_clr.get('lastClr', '')}"})

    fonts_info = {}
    font_paths = (
        ("heading", ".//a:majorFont/a:latin"),
        ("body", ".//a:minorFont/a:latin"),
    )
    for role, path in font_paths:
        node = root.find(path, NS)
        if node is not None:
            fonts_info[role] = node.get("typeface", "")

    return {"colors": colors, "fonts": fonts_info}
def _guess_layout_type(layout_name: str) -> str:
    """Guess a layout type from the layout's display name.

    Every key of LAYOUT_TYPE_HINTS is looked up as a substring of the lower-cased
    name. When several hints match, the longest (most specific) one wins, so
    "Title and Content" is classified as "content" rather than "title_slide".
    Hints of equal length are resolved by their order in LAYOUT_TYPE_HINTS.

    Args:
        layout_name: Layout name as found in the deck; may be empty or None.

    Returns:
        The mapped layout type, or "custom" when the name is empty or no hint
        matches.
    """
    if not layout_name:
        return "custom"

    name_lower = layout_name.lower()
    matches = [hint for hint in LAYOUT_TYPE_HINTS if hint in name_lower]
    if not matches:
        return "custom"
    # max() keeps the first of several equally long hints, i.e. dict order.
    return LAYOUT_TYPE_HINTS[max(matches, key=len)]
def _detect_llm_provider() -> Optional[dict]:
    """Pick the LLM provider to use for vision calls from environment variables.

    Providers are checked in the order OpenAI, Anthropic, Google; the first one
    with a non-empty API key wins.

    Returns:
        ``{"provider": <name>, "api_key": <key>}`` (plus ``"model"`` for
        Anthropic, taken from ANTHROPIC_MODEL when that is set and non-empty),
        or None when no API key is configured.
    """
    openai_key = os.getenv("OPENAI_API_KEY")
    if openai_key:
        return {"provider": "openai", "api_key": openai_key}

    anthropic_key = os.getenv("ANTHROPIC_API_KEY")
    if anthropic_key:
        # `or` instead of a getenv default, so that an ANTHROPIC_MODEL that is set
        # but empty also falls back. The fallback has to be a published model ID:
        # the Claude Sonnet 4.5 snapshot is dated 20250929.
        model = os.getenv("ANTHROPIC_MODEL") or "claude-sonnet-4-5-20250929"
        return {"provider": "anthropic", "api_key": anthropic_key, "model": model}

    google_key = os.getenv("GOOGLE_API_KEY")
    if google_key:
        return {"provider": "google", "api_key": google_key}

    return None
async def _llm_generate_html(
    provider: dict, img_b64: str, xml_content: str, fonts: Optional[List[str]]
) -> str:
    """Generate HTML for one layout from its screenshot and OXML via an LLM.

    Args:
        provider: Dict with "provider" ("openai", "anthropic" or "google") and
            "api_key"; an optional "model" is honoured for Anthropic only.
        img_b64: Base64-encoded PNG screenshot.
        xml_content: OXML of the layout; only the first 4000 characters are sent.
        fonts: Normalized font family names to mention in the prompt, or None.

    Returns:
        The model's text response as returned by the SDK (markdown fences are not
        removed here), or "" when the response carries no text.

    Raises:
        ValueError: If ``provider["provider"]`` is not one of the supported names.
    """
    fonts_text = (
        f"\nFONTS (Normalized root families used in this slide, use where required): {', '.join(fonts)}"
        if fonts else ""
    )
    user_text = f"OXML:\n{xml_content[:4000]}\n{fonts_text}"

    def _call_openai() -> str:
        from openai import OpenAI

        client = OpenAI(api_key=provider["api_key"])
        data_url = f"data:image/png;base64,{img_b64}"
        response = client.responses.create(
            model="gpt-5",
            input=[
                {"role": "system", "content": GENERATE_HTML_SYSTEM_PROMPT},
                {"role": "user", "content": [
                    {"type": "input_image", "image_url": data_url},
                    {"type": "input_text", "text": user_text},
                ]},
            ],
            reasoning={"effort": "high"},
            text={"verbosity": "low"},
        )
        return getattr(response, "output_text", "") or getattr(response, "text", "") or ""

    def _call_anthropic() -> str:
        import anthropic

        client = anthropic.Anthropic(api_key=provider["api_key"])
        response = client.messages.create(
            # Fallback for provider dicts without a (non-empty) model; it has to be
            # a published model ID, and the Sonnet 4.5 snapshot is dated 20250929.
            model=provider.get("model") or "claude-sonnet-4-5-20250929",
            max_tokens=8192,
            system=GENERATE_HTML_SYSTEM_PROMPT,
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image", "source": {
                        "type": "base64", "media_type": "image/png", "data": img_b64,
                    }},
                    {"type": "text", "text": user_text},
                ],
            }],
        )
        return response.content[0].text if response.content else ""

    def _call_google() -> str:
        import google.genai as genai

        client = genai.Client(api_key=provider["api_key"])
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[
                GENERATE_HTML_SYSTEM_PROMPT,
                {"inline_data": {"mime_type": "image/png", "data": img_b64}},
                user_text,
            ],
        )
        return response.text or ""

    callers = {
        "openai": _call_openai,
        "anthropic": _call_anthropic,
        "google": _call_google,
    }
    call = callers.get(provider["provider"])
    if call is None:
        raise ValueError(f"Unknown provider: {provider['provider']}")
    # The SDK calls above are synchronous; run them in a worker thread so the
    # event loop is not blocked while the request is in flight.
    return await asyncio.to_thread(call)
async def _llm_generate_react(
    provider: dict, html_content: str, img_b64: str
) -> str:
    """Convert layout HTML into React/TSX component code via an LLM.

    Args:
        provider: Dict with "provider" ("openai", "anthropic" or "google") and
            "api_key"; an optional "model" is honoured for Anthropic only.
        html_content: HTML to convert.
        img_b64: Base64-encoded PNG screenshot sent alongside the HTML.

    Returns:
        The model's response with markdown code fences removed and every line that
        starts with "import " or "export " (after stripping whitespace) dropped.

    Raises:
        ValueError: If ``provider["provider"]`` is not one of the supported names.
    """
    user_text = f"HTML INPUT:\n{html_content}"

    def _call_openai() -> str:
        from openai import OpenAI

        client = OpenAI(api_key=provider["api_key"])
        data_url = f"data:image/png;base64,{img_b64}"
        response = client.responses.create(
            model="gpt-5",
            input=[
                {"role": "system", "content": HTML_TO_REACT_SYSTEM_PROMPT},
                {"role": "user", "content": [
                    {"type": "input_image", "image_url": data_url},
                    {"type": "input_text", "text": user_text},
                ]},
            ],
            reasoning={"effort": "minimal"},
            text={"verbosity": "low"},
        )
        return getattr(response, "output_text", "") or getattr(response, "text", "") or ""

    def _call_anthropic() -> str:
        import anthropic

        client = anthropic.Anthropic(api_key=provider["api_key"])
        response = client.messages.create(
            # Fallback for provider dicts without a (non-empty) model; it has to be
            # a published model ID, and the Sonnet 4.5 snapshot is dated 20250929.
            model=provider.get("model") or "claude-sonnet-4-5-20250929",
            max_tokens=8192,
            system=HTML_TO_REACT_SYSTEM_PROMPT,
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image", "source": {
                        "type": "base64", "media_type": "image/png", "data": img_b64,
                    }},
                    {"type": "text", "text": user_text},
                ],
            }],
        )
        return response.content[0].text if response.content else ""

    def _call_google() -> str:
        import google.genai as genai

        client = genai.Client(api_key=provider["api_key"])
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[
                HTML_TO_REACT_SYSTEM_PROMPT,
                {"inline_data": {"mime_type": "image/png", "data": img_b64}},
                user_text,
            ],
        )
        return response.text or ""

    callers = {
        "openai": _call_openai,
        "anthropic": _call_anthropic,
        "google": _call_google,
    }
    call = callers.get(provider["provider"])
    if call is None:
        raise ValueError(f"Unknown provider: {provider['provider']}")
    # The SDK calls above are synchronous; run them in a worker thread so the
    # event loop is not blocked while the request is in flight.
    result = await asyncio.to_thread(call)

    # Clean up: remove markdown fences (language-tagged ones first, so the bare
    # fence does not leave the tag behind) and import/export lines.
    for fence in ("```tsx", "```typescript", "```javascript", "```"):
        result = result.replace(fence, "")
    return "\n".join(
        line
        for line in result.split("\n")
        if not line.strip().startswith(("import ", "export "))
    )
async def parse_master_deck(deck_id: uuid.UUID) -> None:
    """Parse a master deck PPTX asynchronously. Updates DB on completion/failure.

    Marks the deck as "processing", runs ``_do_parse`` and then stores either the
    results (status "completed", followed by template registration) or the error
    text (status "failed"). Returns silently when the deck row does not exist.
    Exceptions raised while parsing are printed and recorded on the deck instead
    of being propagated.

    Args:
        deck_id: Primary key of the MasterDeckModel row to parse.
    """
    from models.sql.master_deck import MasterDeckModel

    async with async_session_maker() as session:
        deck = await session.get(MasterDeckModel, deck_id)
        if not deck:
            return
        deck.parse_status = "processing"
        await session.commit()

    try:
        result = await _do_parse(deck_id)
        async with async_session_maker() as session:
            deck = await session.get(MasterDeckModel, deck_id)
            if not deck:
                return
            # Read the name before committing: commit() expires loaded attributes
            # unless the session factory disables expire_on_commit, and touching
            # an expired attribute afterwards would need implicit IO. A failure
            # there would land in the except branch below and mark an already
            # stored, successful parse as failed.
            deck_name = deck.name
            deck.parsed_config = result["parsed_config"]
            deck.layouts = result["layouts"]
            deck.thumbnail_path = result.get("thumbnail_path")
            deck.parse_status = "completed"
            await session.commit()
            # Bridge: register parsed layouts as a custom template
            await _register_as_template(deck_id, deck_name, result["layouts"], session)
    except Exception as e:
        traceback.print_exc()
        async with async_session_maker() as session:
            deck = await session.get(MasterDeckModel, deck_id)
            if not deck:
                return
            deck.parse_status = "failed"
            deck.parsed_config = {"error": str(e)}
            await session.commit()
async def _do_parse(deck_id: uuid.UUID) -> dict:
    """Core parsing logic. Returns dict with parsed_config, layouts, thumbnail_path.

    Steps: load the deck row, unpack the PPTX, render screenshots (best effort),
    collect fonts, then run the HTML -> React LLM pipeline for every layout that
    has a screenshot and an available provider. A failure of the screenshot step
    or of a single layout's LLM calls is printed and does not abort the parse.

    NOTE(review): screenshots are rendered from the deck's slides, and layout N is
    paired with the image of slide N purely by position; confirm that master
    decks are authored with one slide per layout, in layout order.

    Args:
        deck_id: Primary key of the MasterDeckModel row to parse.

    Returns:
        ``{"parsed_config": {...}, "layouts": [...], "thumbnail_path": str | None}``.

    Raises:
        ValueError: If the deck row does not exist.
        FileNotFoundError: If the deck has no file path or the file is missing.
    """
    from models.sql.master_deck import MasterDeckModel

    async with async_session_maker() as session:
        deck = await session.get(MasterDeckModel, deck_id)
        if not deck:
            raise ValueError("Deck not found")
        pptx_path = deck.original_file_path
        client_id = deck.client_id

    # A missing path is reported like a missing file instead of failing inside
    # os.path.exists() with a TypeError.
    if not pptx_path or not os.path.exists(pptx_path):
        raise FileNotFoundError(f"PPTX file not found: {pptx_path}")

    with tempfile.TemporaryDirectory() as temp_dir:
        # 1. Extract slide XMLs (actual slides, not layouts) for screenshots
        slide_xmls = _extract_slide_xmls(pptx_path, temp_dir)
        # 2. Extract slide layout XMLs from ppt/slideLayouts/
        layout_metas = _extract_slide_layout_xmls(pptx_path, temp_dir)
        # 3. Extract theme info
        theme_info = _extract_theme_info(pptx_path, temp_dir)

        # 4. Convert to PDF → screenshots (for slides, used as layout previews)
        # screenshots[i] belongs to page i; pages without a usable image keep a
        # None placeholder so that later entries do not shift onto other layouts.
        screenshots: List[Optional[str]] = []
        thumbnail_path = None
        try:
            pdf_path = await _convert_pptx_to_pdf(pptx_path, temp_dir)
            screenshot_paths = await DocumentsLoader.get_page_images_from_pdf_async(
                pdf_path, temp_dir
            )
            # Copy screenshots to permanent location
            app_data = os.environ.get(
                "APP_DATA_DIRECTORY",
                os.path.join(os.path.dirname(__file__), "..", "data"),
            )
            deck_dir = os.path.join(
                app_data, "clients",
                str(client_id), "master_decks", str(deck_id), "screenshots"
            )
            os.makedirs(deck_dir, exist_ok=True)
            for i, sp in enumerate(screenshot_paths):
                dest = None
                if os.path.exists(sp) and os.path.getsize(sp) > 0:
                    dest = os.path.join(deck_dir, f"slide_{i + 1}.png")
                    shutil.copy2(sp, dest)
                    if i == 0:
                        thumbnail_path = dest
                screenshots.append(dest)
        except Exception as e:
            print(f"Screenshot generation failed (non-fatal): {e}")

        # 5. Collect all fonts used (per layout once, reused for the entries below)
        layout_fonts = [
            {
                normalize_font_family_name(f)
                for f in extract_fonts_from_oxml(lm["xml_content"])
                if f
            }
            for lm in layout_metas
        ]
        all_fonts = set().union(*layout_fonts)
        for sx in slide_xmls:
            raw = extract_fonts_from_oxml(sx)
            all_fonts.update(normalize_font_family_name(f) for f in raw if f)

        # 6. Process each slide layout through LLM pipeline
        llm_provider = _detect_llm_provider()
        layouts_result = []
        llm_layouts = sum(1 for shot in screenshots[: len(layout_metas)] if shot)
        print(f"[MasterDeckParser] LLM provider: {llm_provider['provider'] if llm_provider else 'NONE'}")
        print(f"[MasterDeckParser] Processing {len(layout_metas)} layouts, {llm_layouts} with screenshots for LLM")

        for idx, lm in enumerate(layout_metas):
            screenshot = screenshots[idx] if idx < len(screenshots) else None
            layout_entry = {
                "index": idx,
                "layout_name": lm["layout_name"],
                "layout_type": _guess_layout_type(lm["layout_name"]),
                "xml_snippet": lm["xml_content"][:2000],
                # Sorted so that the stored list does not depend on set ordering.
                "fonts": sorted(layout_fonts[idx]),
                "html": None,
                "react_code": None,
                "screenshot_path": screenshot,
            }

            # Run LLM pipeline if provider available and we have a screenshot
            if llm_provider and screenshot and os.path.exists(screenshot):
                try:
                    print(f"[MasterDeckParser] Layout {idx + 1}/{llm_layouts}: {lm['layout_name']} — generating HTML...")
                    with open(screenshot, "rb") as img_f:
                        img_b64 = base64.b64encode(img_f.read()).decode("utf-8")
                    html = await _llm_generate_html(
                        llm_provider, img_b64, lm["xml_content"],
                        layout_entry["fonts"] or None,
                    )
                    html = html.replace("```html", "").replace("```", "")
                    layout_entry["html"] = html
                    print(f"[MasterDeckParser] Layout {idx + 1}/{llm_layouts}: {lm['layout_name']} — generating React...")
                    react_code = await _llm_generate_react(
                        llm_provider, html, img_b64,
                    )
                    react_code = react_code.replace("```tsx", "").replace("```", "")
                    layout_entry["react_code"] = react_code
                    print(f"[MasterDeckParser] Layout {idx + 1}/{llm_layouts}: {lm['layout_name']} — done ({len(react_code)} chars)")
                except Exception as e:
                    print(f"[MasterDeckParser] LLM FAILED for layout {idx} ({lm['layout_name']}): {e}")
                    traceback.print_exc()
                    # Keep the entry, but without partial LLM output.
                    layout_entry["html"] = None
                    layout_entry["react_code"] = None

            layouts_result.append(layout_entry)

        parsed_config = {
            "theme": theme_info,
            "total_slides": len(slide_xmls),
            "total_layouts": len(layout_metas),
            "fonts": sorted(all_fonts),
        }
        return {
            "parsed_config": parsed_config,
            "layouts": layouts_result,
            "thumbnail_path": thumbnail_path,
        }
async def _register_as_template(
    deck_id: uuid.UUID,
    deck_name: str,
    layouts: list,
    session,
) -> None:
    """Expose a parsed master deck as a custom template.

    Creates (or renames) the TemplateModel whose id equals ``deck_id``, deletes
    the PresentationLayoutCodeModel rows previously stored for that id and adds
    one row for every layout that carries ``react_code``. Everything is committed
    on the given session; on any error the problem is printed and the session is
    rolled back, so the caller's parse is not failed by this step.
    """
    from models.sql.template import TemplateModel
    from models.sql.presentation_layout_code import PresentationLayoutCodeModel
    from sqlalchemy import select, delete

    template_name = deck_name or "Custom Template"
    try:
        # The deck id doubles as the template id, which makes this an upsert.
        template_row = await session.get(TemplateModel, deck_id)
        if not template_row:
            session.add(
                TemplateModel(
                    id=deck_id,
                    name=template_name,
                    description=f"Auto-generated from master deck: {deck_name}",
                )
            )
        else:
            template_row.name = template_name

        # Reparse case: clear out whatever was registered for this deck before.
        stale_rows = delete(PresentationLayoutCodeModel).where(
            PresentationLayoutCodeModel.presentation == deck_id
        )
        await session.execute(stale_rows)

        # Only layouts for which React code was generated become template layouts.
        registered = 0
        for position, layout in enumerate(layouts):
            code = layout.get("react_code")
            if code:
                session.add(
                    PresentationLayoutCodeModel(
                        presentation=deck_id,
                        layout_id=f"layout-{position}",
                        layout_name=layout.get("layout_name", f"Layout {position + 1}"),
                        layout_code=code,
                        fonts=layout.get("fonts"),
                    )
                )
                registered += 1

        await session.commit()
        print(f"Registered master deck {deck_id} as custom template with "
              f"{registered} layouts")
    except Exception as e:
        print(f"Failed to register master deck as template: {e}")
        # Template registration is non-critical, so undo and carry on.
        await session.rollback()