In layouts mode, screenshots were matched by array index (0,1,2...) which broke when PPTX had more slideLayouts than actual slides. Now builds an explicit mapping from slideLayout filename to the first slide that uses it, so each layout gets the correct screenshot. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
738 lines
28 KiB
Python
738 lines
28 KiB
Python
"""Master deck parser: extract layouts from a PPTX master/template file.
|
|
|
|
Pipeline per layout:
|
|
1. Unzip PPTX → extract slide layout XMLs and slide master XML
|
|
2. Convert to PDF via LibreOffice → screenshots per layout
|
|
3. Extract theme colors, fonts from OXML
|
|
4. For each slide layout: generate HTML → generate React code via LLM
|
|
5. Auto-classify layout type via LLM
|
|
6. Persist results to MasterDeckModel
|
|
"""
|
|
import asyncio
|
|
import base64
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import traceback
|
|
import uuid
|
|
import zipfile
|
|
import xml.etree.ElementTree as ET
|
|
from typing import List, Optional
|
|
|
|
from services.database import async_session_maker
|
|
|
|
# Reuse existing extraction utilities
|
|
from api.v1.ppt.endpoints.pptx_slides import (
|
|
_extract_slide_xmls,
|
|
_convert_pptx_to_pdf,
|
|
extract_fonts_from_oxml,
|
|
normalize_font_family_name,
|
|
)
|
|
from api.v1.ppt.endpoints.prompts import (
|
|
GENERATE_HTML_SYSTEM_PROMPT,
|
|
HTML_TO_REACT_SYSTEM_PROMPT,
|
|
)
|
|
from services.documents_loader import DocumentsLoader
|
|
|
|
# OXML namespaces
|
|
NS = {
|
|
"a": "http://schemas.openxmlformats.org/drawingml/2006/main",
|
|
"p": "http://schemas.openxmlformats.org/presentationml/2006/main",
|
|
"r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
|
|
}
|
|
|
|
LAYOUT_TYPE_HINTS = {
|
|
"title": "title_slide",
|
|
"section": "section_header",
|
|
"two content": "two_column",
|
|
"comparison": "comparison",
|
|
"content": "content",
|
|
"blank": "blank",
|
|
"picture": "picture",
|
|
"caption": "caption",
|
|
}
|
|
|
|
|
|
def _build_layout_to_slide_map(pptx_path: str, temp_dir: str) -> dict:
|
|
"""Build mapping from slideLayout filename → first slide index that uses it.
|
|
|
|
Returns e.g. {"slideLayout2.xml": 0, "slideLayout5.xml": 3, ...}
|
|
This lets us pick the right screenshot for each layout in 'layouts' mode.
|
|
"""
|
|
extract_dir = os.path.join(temp_dir, "pptx_extract")
|
|
if not os.path.exists(extract_dir):
|
|
with zipfile.ZipFile(pptx_path, "r") as zf:
|
|
zf.extractall(extract_dir)
|
|
|
|
slides_dir = os.path.join(extract_dir, "ppt", "slides")
|
|
if not os.path.exists(slides_dir):
|
|
return {}
|
|
|
|
slide_files = sorted(
|
|
[f for f in os.listdir(slides_dir) if f.startswith("slide") and f.endswith(".xml")],
|
|
key=lambda x: int(x.replace("slide", "").replace(".xml", "")),
|
|
)
|
|
|
|
RELS_NS = {"r": "http://schemas.openxmlformats.org/package/2006/relationships"}
|
|
layout_to_slide: dict = {} # slideLayout filename → first slide index
|
|
|
|
for slide_idx, sf in enumerate(slide_files):
|
|
rels_path = os.path.join(slides_dir, "_rels", sf + ".rels")
|
|
if not os.path.exists(rels_path):
|
|
continue
|
|
try:
|
|
with open(rels_path, "r", encoding="utf-8") as f:
|
|
rels_root = ET.fromstring(f.read())
|
|
# Try with namespace first, then without
|
|
for rel in list(rels_root.findall("r:Relationship", RELS_NS)) + list(rels_root.iter()):
|
|
target = rel.get("Target", "")
|
|
if "slideLayout" in target:
|
|
layout_file = target.split("/")[-1]
|
|
# Only store the FIRST slide that uses this layout
|
|
if layout_file not in layout_to_slide:
|
|
layout_to_slide[layout_file] = slide_idx
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
return layout_to_slide
|
|
|
|
|
|
def _extract_slide_layout_xmls(pptx_path: str, temp_dir: str) -> List[dict]:
|
|
"""Extract slide layout XMLs from ppt/slideLayouts/ and return metadata."""
|
|
extract_dir = os.path.join(temp_dir, "pptx_extract")
|
|
if not os.path.exists(extract_dir):
|
|
with zipfile.ZipFile(pptx_path, "r") as zf:
|
|
zf.extractall(extract_dir)
|
|
|
|
layouts_dir = os.path.join(extract_dir, "ppt", "slideLayouts")
|
|
if not os.path.exists(layouts_dir):
|
|
return []
|
|
|
|
layout_files = sorted(
|
|
[f for f in os.listdir(layouts_dir) if f.endswith(".xml")],
|
|
key=lambda x: int("".join(c for c in x if c.isdigit()) or "0"),
|
|
)
|
|
|
|
layouts = []
|
|
for lf in layout_files:
|
|
path = os.path.join(layouts_dir, lf)
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
xml_content = f.read()
|
|
|
|
# Try to extract layout name from OXML
|
|
layout_name = lf.replace(".xml", "")
|
|
try:
|
|
root = ET.fromstring(xml_content)
|
|
cSld = root.find("p:cSld", NS)
|
|
if cSld is not None and cSld.get("name"):
|
|
layout_name = cSld.get("name")
|
|
except Exception:
|
|
pass
|
|
|
|
layouts.append({
|
|
"filename": lf,
|
|
"layout_name": layout_name,
|
|
"xml_content": xml_content,
|
|
})
|
|
|
|
return layouts
|
|
|
|
|
|
def _extract_slides_with_layout_info(pptx_path: str, temp_dir: str) -> List[dict]:
|
|
"""Extract actual slide XMLs with their associated layout name.
|
|
|
|
Each slide in ppt/slides/ has a .rels file that references which
|
|
slideLayout it uses. This gives us 1:1 mapping between slides and
|
|
screenshots (since screenshots are generated from actual slides).
|
|
"""
|
|
extract_dir = os.path.join(temp_dir, "pptx_extract")
|
|
if not os.path.exists(extract_dir):
|
|
with zipfile.ZipFile(pptx_path, "r") as zf:
|
|
zf.extractall(extract_dir)
|
|
|
|
slides_dir = os.path.join(extract_dir, "ppt", "slides")
|
|
if not os.path.exists(slides_dir):
|
|
return []
|
|
|
|
slide_files = sorted(
|
|
[f for f in os.listdir(slides_dir) if f.startswith("slide") and f.endswith(".xml")],
|
|
key=lambda x: int(x.replace("slide", "").replace(".xml", "")),
|
|
)
|
|
|
|
# Pre-load slideLayout names by filename for fast lookup
|
|
layout_names_by_file = {}
|
|
layouts_dir = os.path.join(extract_dir, "ppt", "slideLayouts")
|
|
if os.path.exists(layouts_dir):
|
|
for lf in os.listdir(layouts_dir):
|
|
if not lf.endswith(".xml"):
|
|
continue
|
|
path = os.path.join(layouts_dir, lf)
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
root = ET.fromstring(f.read())
|
|
cSld = root.find("p:cSld", NS)
|
|
if cSld is not None and cSld.get("name"):
|
|
layout_names_by_file[lf] = cSld.get("name")
|
|
else:
|
|
layout_names_by_file[lf] = lf.replace(".xml", "")
|
|
except Exception:
|
|
layout_names_by_file[lf] = lf.replace(".xml", "")
|
|
|
|
RELS_NS = {"r": "http://schemas.openxmlformats.org/package/2006/relationships"}
|
|
|
|
slides = []
|
|
for sf in slide_files:
|
|
slide_path = os.path.join(slides_dir, sf)
|
|
with open(slide_path, "r", encoding="utf-8") as f:
|
|
xml_content = f.read()
|
|
|
|
# Resolve layout name from .rels file
|
|
layout_name = sf.replace(".xml", "")
|
|
rels_path = os.path.join(slides_dir, "_rels", sf + ".rels")
|
|
if os.path.exists(rels_path):
|
|
try:
|
|
with open(rels_path, "r", encoding="utf-8") as f:
|
|
rels_root = ET.fromstring(f.read())
|
|
for rel in rels_root.findall("r:Relationship", RELS_NS):
|
|
# Fallback: try without namespace
|
|
target = rel.get("Target", "")
|
|
rel_type = rel.get("Type", "")
|
|
if "slideLayout" in rel_type or "slideLayout" in target:
|
|
# Target is like "../slideLayouts/slideLayout2.xml"
|
|
layout_file = target.split("/")[-1]
|
|
if layout_file in layout_names_by_file:
|
|
layout_name = layout_names_by_file[layout_file]
|
|
else:
|
|
layout_name = layout_file.replace(".xml", "")
|
|
break
|
|
# If namespace didn't match, try without namespace
|
|
if layout_name == sf.replace(".xml", ""):
|
|
for rel in rels_root.iter():
|
|
target = rel.get("Target", "")
|
|
if "slideLayout" in target:
|
|
layout_file = target.split("/")[-1]
|
|
if layout_file in layout_names_by_file:
|
|
layout_name = layout_names_by_file[layout_file]
|
|
else:
|
|
layout_name = layout_file.replace(".xml", "")
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
slides.append({
|
|
"filename": sf,
|
|
"layout_name": layout_name,
|
|
"xml_content": xml_content,
|
|
})
|
|
|
|
return slides
|
|
|
|
|
|
def _extract_theme_info(pptx_path: str, temp_dir: str) -> dict:
|
|
"""Extract theme colors and font scheme from the PPTX theme XML."""
|
|
extract_dir = os.path.join(temp_dir, "pptx_extract")
|
|
if not os.path.exists(extract_dir):
|
|
with zipfile.ZipFile(pptx_path, "r") as zf:
|
|
zf.extractall(extract_dir)
|
|
|
|
theme_dir = os.path.join(extract_dir, "ppt", "theme")
|
|
if not os.path.exists(theme_dir):
|
|
return {"colors": [], "fonts": {}}
|
|
|
|
theme_files = [f for f in os.listdir(theme_dir) if f.endswith(".xml")]
|
|
if not theme_files:
|
|
return {"colors": [], "fonts": {}}
|
|
|
|
theme_path = os.path.join(theme_dir, theme_files[0])
|
|
with open(theme_path, "r", encoding="utf-8") as f:
|
|
theme_xml = f.read()
|
|
|
|
colors = []
|
|
fonts_info = {}
|
|
|
|
try:
|
|
root = ET.fromstring(theme_xml)
|
|
|
|
# Extract color scheme
|
|
clrScheme = root.find(".//a:clrScheme", NS)
|
|
if clrScheme is not None:
|
|
for child in clrScheme:
|
|
tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
|
|
# Look for srgbClr or sysClr
|
|
srgb = child.find("a:srgbClr", NS)
|
|
if srgb is not None:
|
|
colors.append({"name": tag, "hex": f"#{srgb.get('val', '')}"})
|
|
else:
|
|
sys_clr = child.find("a:sysClr", NS)
|
|
if sys_clr is not None:
|
|
last_clr = sys_clr.get("lastClr", "")
|
|
colors.append({"name": tag, "hex": f"#{last_clr}"})
|
|
|
|
# Extract font scheme
|
|
majorFont = root.find(".//a:majorFont/a:latin", NS)
|
|
minorFont = root.find(".//a:minorFont/a:latin", NS)
|
|
if majorFont is not None:
|
|
fonts_info["heading"] = majorFont.get("typeface", "")
|
|
if minorFont is not None:
|
|
fonts_info["body"] = minorFont.get("typeface", "")
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
return {"colors": colors, "fonts": fonts_info}
|
|
|
|
|
|
def _guess_layout_type(layout_name: str) -> str:
|
|
"""Heuristic layout type guess from layout name."""
|
|
name_lower = layout_name.lower()
|
|
for hint, layout_type in LAYOUT_TYPE_HINTS.items():
|
|
if hint in name_lower:
|
|
return layout_type
|
|
return "custom"
|
|
|
|
|
|
def _detect_llm_provider() -> Optional[dict]:
|
|
"""Detect which LLM provider is available for vision calls."""
|
|
openai_key = os.getenv("OPENAI_API_KEY")
|
|
if openai_key:
|
|
return {"provider": "openai", "api_key": openai_key}
|
|
|
|
anthropic_key = os.getenv("ANTHROPIC_API_KEY")
|
|
if anthropic_key:
|
|
model = os.getenv("ANTHROPIC_MODEL", "claude-sonnet-4-5-20250514")
|
|
return {"provider": "anthropic", "api_key": anthropic_key, "model": model}
|
|
|
|
google_key = os.getenv("GOOGLE_API_KEY")
|
|
if google_key:
|
|
return {"provider": "google", "api_key": google_key}
|
|
|
|
return None
|
|
|
|
|
|
async def _llm_generate_html(
|
|
provider: dict, img_b64: str, xml_content: str, fonts: Optional[List[str]]
|
|
) -> str:
|
|
"""Generate HTML from slide screenshot + OXML using the available LLM provider."""
|
|
fonts_text = (
|
|
f"\nFONTS (Normalized root families used in this slide, use where required): {', '.join(fonts)}"
|
|
if fonts else ""
|
|
)
|
|
user_text = f"OXML:\n{xml_content[:4000]}\n{fonts_text}"
|
|
|
|
def _call_openai():
|
|
from openai import OpenAI
|
|
client = OpenAI(api_key=provider["api_key"])
|
|
data_url = f"data:image/png;base64,{img_b64}"
|
|
response = client.responses.create(
|
|
model="gpt-5",
|
|
input=[
|
|
{"role": "system", "content": GENERATE_HTML_SYSTEM_PROMPT},
|
|
{"role": "user", "content": [
|
|
{"type": "input_image", "image_url": data_url},
|
|
{"type": "input_text", "text": user_text},
|
|
]},
|
|
],
|
|
reasoning={"effort": "high"},
|
|
text={"verbosity": "low"},
|
|
)
|
|
return getattr(response, "output_text", "") or getattr(response, "text", "") or ""
|
|
|
|
def _call_anthropic():
|
|
import anthropic
|
|
client = anthropic.Anthropic(api_key=provider["api_key"])
|
|
response = client.messages.create(
|
|
model=provider.get("model", "claude-sonnet-4-5-20250514"),
|
|
max_tokens=8192,
|
|
system=GENERATE_HTML_SYSTEM_PROMPT,
|
|
messages=[{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "image", "source": {
|
|
"type": "base64", "media_type": "image/png", "data": img_b64,
|
|
}},
|
|
{"type": "text", "text": user_text},
|
|
],
|
|
}],
|
|
)
|
|
return response.content[0].text if response.content else ""
|
|
|
|
def _call_google():
|
|
import google.genai as genai
|
|
client = genai.Client(api_key=provider["api_key"])
|
|
response = client.models.generate_content(
|
|
model="gemini-2.0-flash",
|
|
contents=[
|
|
GENERATE_HTML_SYSTEM_PROMPT,
|
|
{"inline_data": {"mime_type": "image/png", "data": img_b64}},
|
|
user_text,
|
|
],
|
|
)
|
|
return response.text or ""
|
|
|
|
if provider["provider"] == "openai":
|
|
return await asyncio.to_thread(_call_openai)
|
|
elif provider["provider"] == "anthropic":
|
|
return await asyncio.to_thread(_call_anthropic)
|
|
elif provider["provider"] == "google":
|
|
return await asyncio.to_thread(_call_google)
|
|
|
|
raise ValueError(f"Unknown provider: {provider['provider']}")
|
|
|
|
|
|
async def _llm_generate_react(
|
|
provider: dict, html_content: str, img_b64: str
|
|
) -> str:
|
|
"""Convert HTML to React TSX component using the available LLM provider."""
|
|
user_text = f"HTML INPUT:\n{html_content}"
|
|
|
|
def _call_openai():
|
|
from openai import OpenAI
|
|
client = OpenAI(api_key=provider["api_key"])
|
|
data_url = f"data:image/png;base64,{img_b64}"
|
|
response = client.responses.create(
|
|
model="gpt-5",
|
|
input=[
|
|
{"role": "system", "content": HTML_TO_REACT_SYSTEM_PROMPT},
|
|
{"role": "user", "content": [
|
|
{"type": "input_image", "image_url": data_url},
|
|
{"type": "input_text", "text": user_text},
|
|
]},
|
|
],
|
|
reasoning={"effort": "minimal"},
|
|
text={"verbosity": "low"},
|
|
)
|
|
return getattr(response, "output_text", "") or getattr(response, "text", "") or ""
|
|
|
|
def _call_anthropic():
|
|
import anthropic
|
|
client = anthropic.Anthropic(api_key=provider["api_key"])
|
|
response = client.messages.create(
|
|
model=provider.get("model", "claude-sonnet-4-5-20250514"),
|
|
max_tokens=8192,
|
|
system=HTML_TO_REACT_SYSTEM_PROMPT,
|
|
messages=[{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "image", "source": {
|
|
"type": "base64", "media_type": "image/png", "data": img_b64,
|
|
}},
|
|
{"type": "text", "text": user_text},
|
|
],
|
|
}],
|
|
)
|
|
return response.content[0].text if response.content else ""
|
|
|
|
def _call_google():
|
|
import google.genai as genai
|
|
client = genai.Client(api_key=provider["api_key"])
|
|
response = client.models.generate_content(
|
|
model="gemini-2.0-flash",
|
|
contents=[
|
|
HTML_TO_REACT_SYSTEM_PROMPT,
|
|
{"inline_data": {"mime_type": "image/png", "data": img_b64}},
|
|
user_text,
|
|
],
|
|
)
|
|
return response.text or ""
|
|
|
|
if provider["provider"] == "openai":
|
|
result = await asyncio.to_thread(_call_openai)
|
|
elif provider["provider"] == "anthropic":
|
|
result = await asyncio.to_thread(_call_anthropic)
|
|
elif provider["provider"] == "google":
|
|
result = await asyncio.to_thread(_call_google)
|
|
else:
|
|
raise ValueError(f"Unknown provider: {provider['provider']}")
|
|
|
|
# Clean up: remove markdown fences and import/export lines
|
|
result = (
|
|
result.replace("```tsx", "").replace("```typescript", "")
|
|
.replace("```javascript", "").replace("```", "")
|
|
)
|
|
filtered_lines = []
|
|
for line in result.split("\n"):
|
|
stripped = line.strip()
|
|
if not (stripped.startswith("import ") or stripped.startswith("export ")):
|
|
filtered_lines.append(line)
|
|
return "\n".join(filtered_lines)
|
|
|
|
|
|
async def parse_master_deck(deck_id: uuid.UUID) -> None:
|
|
"""Parse a master deck PPTX asynchronously. Updates DB on completion/failure."""
|
|
async with async_session_maker() as session:
|
|
from models.sql.master_deck import MasterDeckModel
|
|
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
return
|
|
|
|
deck.parse_status = "processing"
|
|
await session.commit()
|
|
|
|
try:
|
|
result = await _do_parse(deck_id)
|
|
async with async_session_maker() as session:
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
return
|
|
deck.parsed_config = result["parsed_config"]
|
|
deck.layouts = result["layouts"]
|
|
deck.thumbnail_path = result.get("thumbnail_path")
|
|
deck.parse_status = "completed"
|
|
await session.commit()
|
|
|
|
# Bridge: register parsed layouts as a custom template
|
|
await _register_as_template(deck_id, deck.name, result["layouts"], session)
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
async with async_session_maker() as session:
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
return
|
|
deck.parse_status = "failed"
|
|
deck.parsed_config = {"error": str(e)}
|
|
await session.commit()
|
|
|
|
|
|
async def _do_parse(deck_id: uuid.UUID) -> dict:
|
|
"""Core parsing logic. Returns dict with parsed_config, layouts, thumbnail_path."""
|
|
async with async_session_maker() as session:
|
|
from models.sql.master_deck import MasterDeckModel
|
|
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
raise ValueError("Deck not found")
|
|
pptx_path = deck.original_file_path
|
|
client_id = deck.client_id
|
|
parse_mode = getattr(deck, "parse_mode", None) or "layouts"
|
|
|
|
if not os.path.exists(pptx_path):
|
|
raise FileNotFoundError(f"PPTX file not found: {pptx_path}")
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
# 1. Extract slide XMLs (actual slides) — always needed for font collection
|
|
slide_xmls = _extract_slide_xmls(pptx_path, temp_dir)
|
|
|
|
# 2. Choose primary source based on parse_mode
|
|
# Build layout→slide map so we can match screenshots to layouts
|
|
layout_to_slide_map = _build_layout_to_slide_map(pptx_path, temp_dir)
|
|
print(f"[MasterDeckParser] Layout→slide mapping: {layout_to_slide_map}")
|
|
|
|
if parse_mode == "layouts":
|
|
# Unique slideLayout XMLs from ppt/slideLayouts/
|
|
primary_metas = _extract_slide_layout_xmls(pptx_path, temp_dir)
|
|
print(f"[MasterDeckParser] Mode=layouts: {len(primary_metas)} slideLayouts")
|
|
else:
|
|
# "slides" mode: actual slides with layout name resolution
|
|
primary_metas = _extract_slides_with_layout_info(pptx_path, temp_dir)
|
|
print(f"[MasterDeckParser] Mode=slides: {len(primary_metas)} actual slides")
|
|
|
|
# Also get layout XMLs for font extraction even in slides mode
|
|
layout_metas_for_fonts = _extract_slide_layout_xmls(pptx_path, temp_dir)
|
|
|
|
# 3. Extract theme info
|
|
theme_info = _extract_theme_info(pptx_path, temp_dir)
|
|
|
|
# 4. Convert to PDF → screenshots (for slides, used as layout previews)
|
|
screenshots = []
|
|
thumbnail_path = None
|
|
try:
|
|
pdf_path = await _convert_pptx_to_pdf(pptx_path, temp_dir)
|
|
screenshot_paths = await DocumentsLoader.get_page_images_from_pdf_async(
|
|
pdf_path, temp_dir
|
|
)
|
|
# Copy screenshots to permanent location
|
|
app_data = os.environ.get("APP_DATA_DIRECTORY", os.path.join(os.path.dirname(__file__), "..", "data"))
|
|
deck_dir = os.path.join(
|
|
app_data, "clients",
|
|
str(client_id), "master_decks", str(deck_id), "screenshots"
|
|
)
|
|
os.makedirs(deck_dir, exist_ok=True)
|
|
|
|
for i, sp in enumerate(screenshot_paths):
|
|
if os.path.exists(sp) and os.path.getsize(sp) > 0:
|
|
dest = os.path.join(deck_dir, f"slide_{i + 1}.png")
|
|
shutil.copy2(sp, dest)
|
|
screenshots.append(dest)
|
|
if i == 0:
|
|
thumbnail_path = dest
|
|
|
|
except Exception as e:
|
|
print(f"Screenshot generation failed (non-fatal): {e}")
|
|
|
|
# 5. Collect all fonts used
|
|
all_fonts = set()
|
|
for lm in layout_metas_for_fonts:
|
|
raw = extract_fonts_from_oxml(lm["xml_content"])
|
|
all_fonts.update(normalize_font_family_name(f) for f in raw if f)
|
|
for sx in slide_xmls:
|
|
raw = extract_fonts_from_oxml(sx)
|
|
all_fonts.update(normalize_font_family_name(f) for f in raw if f)
|
|
|
|
# 6. Process each item through LLM pipeline
|
|
llm_provider = _detect_llm_provider()
|
|
layouts_result = []
|
|
|
|
# Build per-layout screenshot mapping
|
|
# In "slides" mode: screenshots[idx] maps directly (1:1)
|
|
# In "layouts" mode: use layout_to_slide_map to find the right screenshot
|
|
layout_screenshot_map: dict = {} # layout index → screenshot path
|
|
if parse_mode == "layouts":
|
|
for idx, lm in enumerate(primary_metas):
|
|
layout_filename = lm.get("filename", "")
|
|
slide_idx = layout_to_slide_map.get(layout_filename)
|
|
if slide_idx is not None and slide_idx < len(screenshots):
|
|
layout_screenshot_map[idx] = screenshots[slide_idx]
|
|
else:
|
|
for idx in range(min(len(primary_metas), len(screenshots))):
|
|
layout_screenshot_map[idx] = screenshots[idx]
|
|
|
|
llm_count = len(layout_screenshot_map)
|
|
print(f"[MasterDeckParser] LLM provider: {llm_provider['provider'] if llm_provider else 'NONE'}")
|
|
print(f"[MasterDeckParser] Processing {len(primary_metas)} items, {llm_count} with screenshots for LLM")
|
|
|
|
# Optional: LayoutParser region detection for better classification
|
|
from services.layout_analysis_service import (
|
|
analyze_slide_layout,
|
|
classify_layout_from_regions,
|
|
regions_to_description,
|
|
)
|
|
|
|
for idx, lm in enumerate(primary_metas):
|
|
screenshot_path = layout_screenshot_map.get(idx)
|
|
|
|
# Try LayoutParser classification if a screenshot is available
|
|
lp_layout_type = None
|
|
lp_region_desc = ""
|
|
if screenshot_path and os.path.exists(screenshot_path):
|
|
try:
|
|
regions = await asyncio.to_thread(analyze_slide_layout, screenshot_path)
|
|
if regions:
|
|
lp_layout_type = classify_layout_from_regions(regions)
|
|
lp_region_desc = regions_to_description(regions)
|
|
except Exception as lp_err:
|
|
print(f"[MasterDeckParser] LayoutParser skipped for {idx}: {lp_err}")
|
|
|
|
layout_entry = {
|
|
"index": idx,
|
|
"layout_name": lm["layout_name"],
|
|
"layout_type": lp_layout_type or _guess_layout_type(lm["layout_name"]),
|
|
"xml_snippet": lm["xml_content"][:2000],
|
|
"fonts": list(
|
|
{normalize_font_family_name(f) for f in extract_fonts_from_oxml(lm["xml_content"]) if f}
|
|
),
|
|
"html": None,
|
|
"react_code": None,
|
|
"screenshot_path": screenshot_path,
|
|
}
|
|
|
|
# Run LLM pipeline if provider available and we have a screenshot
|
|
if llm_provider and screenshot_path and os.path.exists(screenshot_path):
|
|
try:
|
|
print(f"[MasterDeckParser] Layout {idx + 1}/{llm_count}: {lm['layout_name']} — generating HTML...")
|
|
with open(screenshot_path, "rb") as img_f:
|
|
img_b64 = base64.b64encode(img_f.read()).decode("utf-8")
|
|
|
|
# Include LayoutParser region info in LLM context
|
|
xml_context = lm["xml_content"]
|
|
if lp_region_desc:
|
|
xml_context = f"{lp_region_desc}\n\n---\n\n{xml_context}"
|
|
|
|
html = await _llm_generate_html(
|
|
llm_provider, img_b64, xml_context,
|
|
layout_entry["fonts"] or None,
|
|
)
|
|
html = html.replace("```html", "").replace("```", "")
|
|
layout_entry["html"] = html
|
|
|
|
print(f"[MasterDeckParser] Layout {idx + 1}/{llm_count}: {lm['layout_name']} — generating React...")
|
|
react_code = await _llm_generate_react(
|
|
llm_provider, html, img_b64,
|
|
)
|
|
react_code = react_code.replace("```tsx", "").replace("```", "")
|
|
layout_entry["react_code"] = react_code
|
|
print(f"[MasterDeckParser] Layout {idx + 1}/{llm_count}: {lm['layout_name']} — done ({len(react_code)} chars)")
|
|
|
|
except Exception as e:
|
|
print(f"[MasterDeckParser] LLM FAILED for layout {idx} ({lm['layout_name']}): {e}")
|
|
traceback.print_exc()
|
|
layout_entry["html"] = None
|
|
layout_entry["react_code"] = None
|
|
|
|
layouts_result.append(layout_entry)
|
|
|
|
parsed_config = {
|
|
"theme": theme_info,
|
|
"total_slides": len(slide_xmls),
|
|
"total_layouts": len(layout_metas_for_fonts),
|
|
"parse_mode": parse_mode,
|
|
"fonts": sorted(all_fonts),
|
|
}
|
|
|
|
return {
|
|
"parsed_config": parsed_config,
|
|
"layouts": layouts_result,
|
|
"thumbnail_path": thumbnail_path,
|
|
}
|
|
|
|
|
|
async def _register_as_template(
|
|
deck_id: uuid.UUID,
|
|
deck_name: str,
|
|
layouts: list,
|
|
session,
|
|
) -> None:
|
|
"""Bridge master deck layouts into the custom template system.
|
|
|
|
Creates a TemplateModel and PresentationLayoutCodeModel records
|
|
so the parsed layouts appear in the template picker during generation.
|
|
"""
|
|
from models.sql.template import TemplateModel
|
|
from models.sql.presentation_layout_code import PresentationLayoutCodeModel
|
|
from sqlalchemy import select, delete
|
|
|
|
try:
|
|
# Upsert TemplateModel — use deck_id as template id
|
|
existing = await session.get(TemplateModel, deck_id)
|
|
if existing:
|
|
existing.name = deck_name or "Custom Template"
|
|
else:
|
|
template = TemplateModel(
|
|
id=deck_id,
|
|
name=deck_name or "Custom Template",
|
|
description=f"Auto-generated from master deck: {deck_name}",
|
|
)
|
|
session.add(template)
|
|
|
|
# Remove old layout codes for this deck (reparse case)
|
|
await session.execute(
|
|
delete(PresentationLayoutCodeModel).where(
|
|
PresentationLayoutCodeModel.presentation == deck_id
|
|
)
|
|
)
|
|
|
|
# Create PresentationLayoutCodeModel for each layout with react_code
|
|
for idx, layout in enumerate(layouts):
|
|
react_code = layout.get("react_code")
|
|
if not react_code:
|
|
continue
|
|
|
|
layout_code = PresentationLayoutCodeModel(
|
|
presentation=deck_id,
|
|
layout_id=f"layout-{idx}",
|
|
layout_name=layout.get("layout_name", f"Layout {idx + 1}"),
|
|
layout_code=react_code,
|
|
fonts=layout.get("fonts"),
|
|
)
|
|
session.add(layout_code)
|
|
|
|
await session.commit()
|
|
print(f"Registered master deck {deck_id} as custom template with "
|
|
f"{sum(1 for l in layouts if l.get('react_code'))} layouts")
|
|
|
|
except Exception as e:
|
|
print(f"Failed to register master deck as template: {e}")
|
|
# Don't fail the entire parse — template registration is non-critical
|
|
await session.rollback()
|