Replaces TSX/Babel compilation pipeline with a JSON element model:
- New _do_parse_v2(): 1 LLM call/layout (vs 2) classifies OXML geometry
elements into placeholder types → JSON stored in layout_code
- SlideRenderer.tsx: renders JSON element model as %-positioned divs,
no Babel compilation or runtime errors
- parseLayoutSchema.ts: isJsonLayoutCode() / parseLayoutSchema() /
mergeElementsWithContent() — full JSON schema parsing layer
- useCustomTemplates.ts: transparent dual-format support (JSON + TSX)
via parsedLayoutToCompiled() adapter
Template management improvements:
- PresentationLayoutCodeModel: +is_enabled (bool) +thumbnail_path (str)
- Migration 005: adds both columns to presentation_layout_codes
- DELETE /master-decks/{id}: hard delete (files + TemplateModel +
PresentationLayoutCodeModel rows + MasterDeckModel)
- PATCH /template-management/layouts/{db_id}/toggle-enabled: new endpoint
- LayoutData response: +db_id, +is_enabled, +thumbnail_path
- _register_as_template(): stores thumbnail_path + is_enabled per layout
Admin UI:
- /admin/templates/ — list all custom templates with delete
- /admin/templates/[id]/ — layout grid with screenshots + enable/disable
- AdminSidebar: Templates nav item
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
996 lines
39 KiB
Python
996 lines
39 KiB
Python
"""Master deck parser: extract layouts from a PPTX master/template file.
|
|
|
|
Pipeline per layout:
|
|
1. Unzip PPTX → extract slide layout XMLs and slide master XML
|
|
2. Convert to PDF via LibreOffice → screenshots per layout
|
|
3. Extract theme colors, fonts from OXML
|
|
4. For each slide layout: generate HTML → generate React code via LLM
|
|
5. Auto-classify layout type via LLM
|
|
6. Persist results to MasterDeckModel
|
|
"""
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import traceback
|
|
import uuid
|
|
import zipfile
|
|
import xml.etree.ElementTree as ET
|
|
from typing import List, Optional
|
|
|
|
from services.database import async_session_maker
|
|
|
|
# Reuse existing extraction utilities
|
|
from api.v1.ppt.endpoints.pptx_slides import (
|
|
_extract_slide_xmls,
|
|
_convert_pptx_to_pdf,
|
|
extract_fonts_from_oxml,
|
|
normalize_font_family_name,
|
|
)
|
|
from utils.oxml_geometry import extract_geometry_from_oxml, format_geometry_for_llm
|
|
from api.v1.ppt.endpoints.prompts import (
|
|
GENERATE_HTML_SYSTEM_PROMPT,
|
|
HTML_TO_REACT_SYSTEM_PROMPT,
|
|
)
|
|
from services.documents_loader import DocumentsLoader
|
|
from services.llm_service import UnifiedLLMService, LLMProvider
|
|
|
|
# Use fast lite model for parsing (gemini-2.5-flash-lite)
|
|
def _get_parsing_provider():
|
|
"""Get provider config for master deck parsing with fast lite model."""
|
|
google_key = os.getenv("GOOGLE_API_KEY")
|
|
if not google_key:
|
|
raise ValueError("GOOGLE_API_KEY required for parsing")
|
|
|
|
# Use gemini-2.5-flash-lite: extremely fast, good enough for layout parsing
|
|
parsing_model = os.getenv("PARSING_MODEL", "gemini-2.5-flash-lite")
|
|
return {
|
|
"provider": LLMProvider.GOOGLE,
|
|
"api_key": google_key,
|
|
"model": parsing_model
|
|
}
|
|
|
|
# OXML namespaces
|
|
NS = {
|
|
"a": "http://schemas.openxmlformats.org/drawingml/2006/main",
|
|
"p": "http://schemas.openxmlformats.org/presentationml/2006/main",
|
|
"r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
|
|
}
|
|
|
|
LAYOUT_TYPE_HINTS = {
|
|
"title": "title_slide",
|
|
"section": "section_header",
|
|
"two content": "two_column",
|
|
"comparison": "comparison",
|
|
"content": "content",
|
|
"blank": "blank",
|
|
"picture": "picture",
|
|
"caption": "caption",
|
|
}
|
|
|
|
GEOMETRY_TO_ELEMENTS_PROMPT = """You are analyzing a PowerPoint slide layout to classify its elements.
|
|
|
|
You will receive:
|
|
1. A screenshot of the slide
|
|
2. A JSON array of elements with their pixel positions (x, y, width, height), assuming a 1280x720 slide
|
|
|
|
Your task: Assign a placeholder type to each element.
|
|
|
|
Available placeholder types:
|
|
- "title": Main slide title (large, prominent text)
|
|
- "subtitle": Secondary title or subtitle text
|
|
- "body": Main content (bullet points, paragraphs)
|
|
- "image": Image/picture placeholder
|
|
- "chart": Chart or graph area
|
|
- "shape": Decorative shape or background element (use for purely visual elements)
|
|
- "logo": Company logo area
|
|
- "footer": Footer text
|
|
- "date": Date/time placeholder
|
|
|
|
Also determine:
|
|
- "layoutName": A descriptive name (e.g., "Title Slide", "Two Column Content", "Section Header")
|
|
- "background": The predominant CSS hex background color (e.g., "#1a1a2e" or "#ffffff")
|
|
|
|
Respond with ONLY valid JSON, no markdown fences:
|
|
{
|
|
"layoutName": "Title Slide",
|
|
"background": "#1a1a2e",
|
|
"elements": [
|
|
{"id": "elem-0", "placeholder": "title", "defaultContent": "Slide Title"},
|
|
{"id": "elem-1", "placeholder": "body", "defaultContent": "Content here"}
|
|
]
|
|
}
|
|
|
|
RULES:
|
|
- Element IDs must match input order exactly (elem-0, elem-1, ...)
|
|
- Every input element must have a corresponding output element
|
|
- Use "shape" for decorative elements when purpose is unclear
|
|
- "defaultContent" should be a realistic sample (e.g., "Your Title Here" for title)
|
|
"""
|
|
|
|
|
|
def _build_layout_to_slide_map(pptx_path: str, temp_dir: str) -> dict:
|
|
"""Build mapping from slideLayout filename → first slide index that uses it.
|
|
|
|
Returns e.g. {"slideLayout2.xml": 0, "slideLayout5.xml": 3, ...}
|
|
This lets us pick the right screenshot for each layout in 'layouts' mode.
|
|
"""
|
|
extract_dir = os.path.join(temp_dir, "pptx_extract")
|
|
if not os.path.exists(extract_dir):
|
|
with zipfile.ZipFile(pptx_path, "r") as zf:
|
|
zf.extractall(extract_dir)
|
|
|
|
slides_dir = os.path.join(extract_dir, "ppt", "slides")
|
|
if not os.path.exists(slides_dir):
|
|
return {}
|
|
|
|
slide_files = sorted(
|
|
[f for f in os.listdir(slides_dir) if f.startswith("slide") and f.endswith(".xml")],
|
|
key=lambda x: int(x.replace("slide", "").replace(".xml", "")),
|
|
)
|
|
|
|
RELS_NS = {"r": "http://schemas.openxmlformats.org/package/2006/relationships"}
|
|
layout_to_slide: dict = {} # slideLayout filename → first slide index
|
|
|
|
for slide_idx, sf in enumerate(slide_files):
|
|
rels_path = os.path.join(slides_dir, "_rels", sf + ".rels")
|
|
if not os.path.exists(rels_path):
|
|
continue
|
|
try:
|
|
with open(rels_path, "r", encoding="utf-8") as f:
|
|
rels_root = ET.fromstring(f.read())
|
|
# Try with namespace first, then without
|
|
for rel in list(rels_root.findall("r:Relationship", RELS_NS)) + list(rels_root.iter()):
|
|
target = rel.get("Target", "")
|
|
if "slideLayout" in target:
|
|
layout_file = target.split("/")[-1]
|
|
# Only store the FIRST slide that uses this layout
|
|
if layout_file not in layout_to_slide:
|
|
layout_to_slide[layout_file] = slide_idx
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
return layout_to_slide
|
|
|
|
|
|
def _extract_slide_layout_xmls(pptx_path: str, temp_dir: str) -> List[dict]:
|
|
"""Extract slide layout XMLs from ppt/slideLayouts/ and return metadata."""
|
|
extract_dir = os.path.join(temp_dir, "pptx_extract")
|
|
if not os.path.exists(extract_dir):
|
|
with zipfile.ZipFile(pptx_path, "r") as zf:
|
|
zf.extractall(extract_dir)
|
|
|
|
layouts_dir = os.path.join(extract_dir, "ppt", "slideLayouts")
|
|
if not os.path.exists(layouts_dir):
|
|
return []
|
|
|
|
layout_files = sorted(
|
|
[f for f in os.listdir(layouts_dir) if f.endswith(".xml")],
|
|
key=lambda x: int("".join(c for c in x if c.isdigit()) or "0"),
|
|
)
|
|
|
|
layouts = []
|
|
for lf in layout_files:
|
|
path = os.path.join(layouts_dir, lf)
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
xml_content = f.read()
|
|
|
|
# Try to extract layout name from OXML
|
|
layout_name = lf.replace(".xml", "")
|
|
try:
|
|
root = ET.fromstring(xml_content)
|
|
cSld = root.find("p:cSld", NS)
|
|
if cSld is not None and cSld.get("name"):
|
|
layout_name = cSld.get("name")
|
|
except Exception:
|
|
pass
|
|
|
|
layouts.append({
|
|
"filename": lf,
|
|
"layout_name": layout_name,
|
|
"xml_content": xml_content,
|
|
})
|
|
|
|
return layouts
|
|
|
|
|
|
def _extract_slides_with_layout_info(pptx_path: str, temp_dir: str) -> List[dict]:
|
|
"""Extract actual slide XMLs with their associated layout name.
|
|
|
|
Each slide in ppt/slides/ has a .rels file that references which
|
|
slideLayout it uses. This gives us 1:1 mapping between slides and
|
|
screenshots (since screenshots are generated from actual slides).
|
|
"""
|
|
extract_dir = os.path.join(temp_dir, "pptx_extract")
|
|
if not os.path.exists(extract_dir):
|
|
with zipfile.ZipFile(pptx_path, "r") as zf:
|
|
zf.extractall(extract_dir)
|
|
|
|
slides_dir = os.path.join(extract_dir, "ppt", "slides")
|
|
if not os.path.exists(slides_dir):
|
|
return []
|
|
|
|
slide_files = sorted(
|
|
[f for f in os.listdir(slides_dir) if f.startswith("slide") and f.endswith(".xml")],
|
|
key=lambda x: int(x.replace("slide", "").replace(".xml", "")),
|
|
)
|
|
|
|
# Pre-load slideLayout names by filename for fast lookup
|
|
layout_names_by_file = {}
|
|
layouts_dir = os.path.join(extract_dir, "ppt", "slideLayouts")
|
|
if os.path.exists(layouts_dir):
|
|
for lf in os.listdir(layouts_dir):
|
|
if not lf.endswith(".xml"):
|
|
continue
|
|
path = os.path.join(layouts_dir, lf)
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
root = ET.fromstring(f.read())
|
|
cSld = root.find("p:cSld", NS)
|
|
if cSld is not None and cSld.get("name"):
|
|
layout_names_by_file[lf] = cSld.get("name")
|
|
else:
|
|
layout_names_by_file[lf] = lf.replace(".xml", "")
|
|
except Exception:
|
|
layout_names_by_file[lf] = lf.replace(".xml", "")
|
|
|
|
RELS_NS = {"r": "http://schemas.openxmlformats.org/package/2006/relationships"}
|
|
|
|
slides = []
|
|
for sf in slide_files:
|
|
slide_path = os.path.join(slides_dir, sf)
|
|
with open(slide_path, "r", encoding="utf-8") as f:
|
|
xml_content = f.read()
|
|
|
|
# Resolve layout name from .rels file
|
|
layout_name = sf.replace(".xml", "")
|
|
rels_path = os.path.join(slides_dir, "_rels", sf + ".rels")
|
|
if os.path.exists(rels_path):
|
|
try:
|
|
with open(rels_path, "r", encoding="utf-8") as f:
|
|
rels_root = ET.fromstring(f.read())
|
|
for rel in rels_root.findall("r:Relationship", RELS_NS):
|
|
# Fallback: try without namespace
|
|
target = rel.get("Target", "")
|
|
rel_type = rel.get("Type", "")
|
|
if "slideLayout" in rel_type or "slideLayout" in target:
|
|
# Target is like "../slideLayouts/slideLayout2.xml"
|
|
layout_file = target.split("/")[-1]
|
|
if layout_file in layout_names_by_file:
|
|
layout_name = layout_names_by_file[layout_file]
|
|
else:
|
|
layout_name = layout_file.replace(".xml", "")
|
|
break
|
|
# If namespace didn't match, try without namespace
|
|
if layout_name == sf.replace(".xml", ""):
|
|
for rel in rels_root.iter():
|
|
target = rel.get("Target", "")
|
|
if "slideLayout" in target:
|
|
layout_file = target.split("/")[-1]
|
|
if layout_file in layout_names_by_file:
|
|
layout_name = layout_names_by_file[layout_file]
|
|
else:
|
|
layout_name = layout_file.replace(".xml", "")
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
slides.append({
|
|
"filename": sf,
|
|
"layout_name": layout_name,
|
|
"xml_content": xml_content,
|
|
})
|
|
|
|
return slides
|
|
|
|
|
|
def _extract_theme_info(pptx_path: str, temp_dir: str) -> dict:
|
|
"""Extract theme colors and font scheme from the PPTX theme XML."""
|
|
extract_dir = os.path.join(temp_dir, "pptx_extract")
|
|
if not os.path.exists(extract_dir):
|
|
with zipfile.ZipFile(pptx_path, "r") as zf:
|
|
zf.extractall(extract_dir)
|
|
|
|
theme_dir = os.path.join(extract_dir, "ppt", "theme")
|
|
if not os.path.exists(theme_dir):
|
|
return {"colors": [], "fonts": {}}
|
|
|
|
theme_files = [f for f in os.listdir(theme_dir) if f.endswith(".xml")]
|
|
if not theme_files:
|
|
return {"colors": [], "fonts": {}}
|
|
|
|
theme_path = os.path.join(theme_dir, theme_files[0])
|
|
with open(theme_path, "r", encoding="utf-8") as f:
|
|
theme_xml = f.read()
|
|
|
|
colors = []
|
|
fonts_info = {}
|
|
|
|
try:
|
|
root = ET.fromstring(theme_xml)
|
|
|
|
# Extract color scheme
|
|
clrScheme = root.find(".//a:clrScheme", NS)
|
|
if clrScheme is not None:
|
|
for child in clrScheme:
|
|
tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
|
|
# Look for srgbClr or sysClr
|
|
srgb = child.find("a:srgbClr", NS)
|
|
if srgb is not None:
|
|
colors.append({"name": tag, "hex": f"#{srgb.get('val', '')}"})
|
|
else:
|
|
sys_clr = child.find("a:sysClr", NS)
|
|
if sys_clr is not None:
|
|
last_clr = sys_clr.get("lastClr", "")
|
|
colors.append({"name": tag, "hex": f"#{last_clr}"})
|
|
|
|
# Extract font scheme
|
|
majorFont = root.find(".//a:majorFont/a:latin", NS)
|
|
minorFont = root.find(".//a:minorFont/a:latin", NS)
|
|
if majorFont is not None:
|
|
fonts_info["heading"] = majorFont.get("typeface", "")
|
|
if minorFont is not None:
|
|
fonts_info["body"] = minorFont.get("typeface", "")
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
return {"colors": colors, "fonts": fonts_info}
|
|
|
|
|
|
def _guess_layout_type(layout_name: str) -> str:
|
|
"""Heuristic layout type guess from layout name."""
|
|
name_lower = layout_name.lower()
|
|
for hint, layout_type in LAYOUT_TYPE_HINTS.items():
|
|
if hint in name_lower:
|
|
return layout_type
|
|
return "custom"
|
|
|
|
|
|
def _placeholder_to_type(placeholder: str) -> str:
|
|
"""Map placeholder name to element type."""
|
|
if placeholder in ("title", "subtitle", "body", "footer", "date", "logo"):
|
|
return "text"
|
|
elif placeholder == "image":
|
|
return "image"
|
|
elif placeholder == "chart":
|
|
return "chart"
|
|
else:
|
|
return "shape"
|
|
|
|
|
|
def _get_theme_text_color(theme_info: dict, placeholder: str) -> Optional[str]:
|
|
"""Get text color from theme based on placeholder type."""
|
|
colors = theme_info.get("colors", [])
|
|
if not colors:
|
|
return None
|
|
# Try to find a suitable text color
|
|
# Dark/accent colors first for title, light for body on dark backgrounds
|
|
color_map = {c.get("name", ""): c.get("hex", "") for c in colors}
|
|
if placeholder in ("title", "subtitle"):
|
|
return color_map.get("dk1") or color_map.get("lt1") or colors[0].get("hex") if colors else None
|
|
return color_map.get("lt1") or color_map.get("dk1") or colors[0].get("hex") if colors else None
|
|
|
|
|
|
async def _llm_classify_elements(
|
|
provider: dict, img_b64: str, geometry_elements: List[dict]
|
|
) -> dict:
|
|
"""1 LLM call to classify element placeholder types. Returns dict with layoutName, background, elements."""
|
|
import json
|
|
# Attach IDs to elements
|
|
labeled = [{"id": f"elem-{i}", **e} for i, e in enumerate(geometry_elements)]
|
|
user_text = f"SLIDE ELEMENTS (JSON):\n{json.dumps(labeled, indent=2)}"
|
|
|
|
result = await UnifiedLLMService.generate_vision_completion(
|
|
system_prompt=GEOMETRY_TO_ELEMENTS_PROMPT,
|
|
user_text=user_text,
|
|
image_base64=img_b64,
|
|
provider_override=provider
|
|
)
|
|
cleaned = UnifiedLLMService.clean_llm_code_output(result, ["json"])
|
|
# Strip markdown fences if any
|
|
cleaned = cleaned.strip()
|
|
if cleaned.startswith("```"):
|
|
lines = cleaned.split("\n")
|
|
cleaned = "\n".join(lines[1:-1] if lines[-1] == "```" else lines[1:])
|
|
return json.loads(cleaned)
|
|
|
|
|
|
def _build_element_model(
|
|
idx: int,
|
|
layout_name: str,
|
|
geometry_elements: List[dict],
|
|
llm_result: dict,
|
|
fonts: List[str],
|
|
theme_info: dict,
|
|
) -> dict:
|
|
"""Build the JSON element model from geometry + LLM classification."""
|
|
classified = {e["id"]: e for e in llm_result.get("elements", [])}
|
|
|
|
elements = []
|
|
for i, geom in enumerate(geometry_elements):
|
|
elem_id = f"elem-{i}"
|
|
classification = classified.get(elem_id, {})
|
|
placeholder = classification.get("placeholder", "shape")
|
|
default_content = classification.get("defaultContent", "")
|
|
elem_type = _placeholder_to_type(placeholder)
|
|
|
|
# Build basic style
|
|
style: dict = {}
|
|
if fonts:
|
|
style["fontFamily"] = fonts[0]
|
|
if placeholder == "title":
|
|
style.update({"fontSize": 48, "fontWeight": "bold"})
|
|
elif placeholder == "subtitle":
|
|
style.update({"fontSize": 28})
|
|
elif placeholder == "body":
|
|
style.update({"fontSize": 20})
|
|
elif placeholder == "footer":
|
|
style.update({"fontSize": 14})
|
|
|
|
# Theme colors
|
|
theme_color = _get_theme_text_color(theme_info, placeholder)
|
|
if theme_color:
|
|
style["color"] = theme_color
|
|
|
|
elements.append({
|
|
"id": elem_id,
|
|
"type": elem_type,
|
|
"placeholder": placeholder,
|
|
"x": geom.get("x", 0),
|
|
"y": geom.get("y", 0),
|
|
"w": geom.get("width", geom.get("w", 0)),
|
|
"h": geom.get("height", geom.get("h", 0)),
|
|
"style": style,
|
|
"defaultContent": default_content,
|
|
})
|
|
|
|
return {
|
|
"layoutId": f"layout-{idx}",
|
|
"layoutName": llm_result.get("layoutName", layout_name),
|
|
"slideWidth": 1280,
|
|
"slideHeight": 720,
|
|
"background": llm_result.get("background", "#ffffff"),
|
|
"elements": elements,
|
|
}
|
|
|
|
|
|
|
|
async def _llm_generate_html(
|
|
provider: dict, img_b64: str, xml_content: str, fonts: Optional[List[str]]
|
|
) -> str:
|
|
"""Generate HTML from slide screenshot + OXML JSON geometry using the available LLM provider."""
|
|
|
|
# 1. Format the XML into geometric JSON
|
|
geometric_elements = extract_geometry_from_oxml(xml_content)
|
|
geometry_json = format_geometry_for_llm(geometric_elements)
|
|
|
|
fonts_text = (
|
|
f"\nFONTS (Normalized root families used in this slide, use where required): {', '.join(fonts)}"
|
|
if fonts else ""
|
|
)
|
|
user_text = f"Slide Design Extracted Elements (JSON):\n{geometry_json}\n{fonts_text}"
|
|
|
|
html_content = await UnifiedLLMService.generate_vision_completion(
|
|
system_prompt=GENERATE_HTML_SYSTEM_PROMPT,
|
|
user_text=user_text,
|
|
image_base64=img_b64,
|
|
provider_override=provider
|
|
)
|
|
|
|
return UnifiedLLMService.clean_llm_code_output(html_content, ["html"])
|
|
|
|
|
|
async def _llm_generate_react(
|
|
provider: dict, html_content: str, img_b64: str
|
|
) -> str:
|
|
"""Convert HTML to React TSX component using the available LLM provider."""
|
|
user_text = f"HTML INPUT:\n{html_content}"
|
|
|
|
react_content = await UnifiedLLMService.generate_vision_completion(
|
|
system_prompt=HTML_TO_REACT_SYSTEM_PROMPT,
|
|
user_text=user_text,
|
|
image_base64=img_b64,
|
|
provider_override=provider
|
|
)
|
|
|
|
react_content = UnifiedLLMService.clean_llm_code_output(react_content, ["tsx", "typescript", "javascript"])
|
|
|
|
# Clean up: remove import/export lines (often hallucinated)
|
|
filtered_lines = []
|
|
for line in react_content.split("\n"):
|
|
stripped = line.strip()
|
|
if not (stripped.startswith("import ") or stripped.startswith("export ")):
|
|
filtered_lines.append(line)
|
|
return "\n".join(filtered_lines)
|
|
|
|
|
|
async def parse_master_deck(deck_id: uuid.UUID) -> None:
|
|
"""Parse a master deck PPTX asynchronously. Updates DB on completion/failure."""
|
|
async with async_session_maker() as session:
|
|
from models.sql.master_deck import MasterDeckModel
|
|
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
return
|
|
|
|
deck.parse_status = "processing"
|
|
await session.commit()
|
|
|
|
try:
|
|
result = await _do_parse_v2(deck_id)
|
|
async with async_session_maker() as session:
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
return
|
|
deck.parsed_config = result["parsed_config"]
|
|
deck.layouts = result["layouts"]
|
|
deck.thumbnail_path = result.get("thumbnail_path")
|
|
deck.parse_status = "completed"
|
|
await session.commit()
|
|
|
|
# Bridge: register parsed layouts as a custom template
|
|
await _register_as_template(deck_id, deck.name, result["layouts"], session)
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
async with async_session_maker() as session:
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
return
|
|
deck.parse_status = "failed"
|
|
deck.parsed_config = {"error": str(e)}
|
|
await session.commit()
|
|
|
|
|
|
async def _do_parse(deck_id: uuid.UUID) -> dict:
|
|
"""Core parsing logic. Returns dict with parsed_config, layouts, thumbnail_path."""
|
|
async with async_session_maker() as session:
|
|
from models.sql.master_deck import MasterDeckModel
|
|
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
raise ValueError("Deck not found")
|
|
pptx_path = deck.original_file_path
|
|
client_id = deck.client_id
|
|
parse_mode = getattr(deck, "parse_mode", None) or "layouts"
|
|
|
|
if not os.path.exists(pptx_path):
|
|
raise FileNotFoundError(f"PPTX file not found: {pptx_path}")
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
# 1. Extract slide XMLs (actual slides) — always needed for font collection
|
|
slide_xmls = _extract_slide_xmls(pptx_path, temp_dir)
|
|
|
|
# 2. Choose primary source based on parse_mode
|
|
# Build layout→slide map so we can match screenshots to layouts
|
|
layout_to_slide_map = _build_layout_to_slide_map(pptx_path, temp_dir)
|
|
print(f"[MasterDeckParser] Layout→slide mapping: {layout_to_slide_map}")
|
|
|
|
if parse_mode == "layouts":
|
|
# Unique slideLayout XMLs from ppt/slideLayouts/
|
|
primary_metas = _extract_slide_layout_xmls(pptx_path, temp_dir)
|
|
print(f"[MasterDeckParser] Mode=layouts: {len(primary_metas)} slideLayouts")
|
|
else:
|
|
# "slides" mode: actual slides with layout name resolution
|
|
primary_metas = _extract_slides_with_layout_info(pptx_path, temp_dir)
|
|
print(f"[MasterDeckParser] Mode=slides: {len(primary_metas)} actual slides")
|
|
|
|
# Also get layout XMLs for font extraction even in slides mode
|
|
layout_metas_for_fonts = _extract_slide_layout_xmls(pptx_path, temp_dir)
|
|
|
|
# 3. Extract theme info
|
|
theme_info = _extract_theme_info(pptx_path, temp_dir)
|
|
|
|
# 4. Convert to PDF → screenshots (for slides, used as layout previews)
|
|
screenshots = []
|
|
thumbnail_path = None
|
|
try:
|
|
pdf_path = await _convert_pptx_to_pdf(pptx_path, temp_dir)
|
|
screenshot_paths = await DocumentsLoader.get_page_images_from_pdf_async(
|
|
pdf_path, temp_dir
|
|
)
|
|
# Copy screenshots to permanent location
|
|
app_data = os.environ.get("APP_DATA_DIRECTORY", os.path.join(os.path.dirname(__file__), "..", "data"))
|
|
deck_dir = os.path.join(
|
|
app_data, "clients",
|
|
str(client_id), "master_decks", str(deck_id), "screenshots"
|
|
)
|
|
os.makedirs(deck_dir, exist_ok=True)
|
|
|
|
for i, sp in enumerate(screenshot_paths):
|
|
if os.path.exists(sp) and os.path.getsize(sp) > 0:
|
|
dest = os.path.join(deck_dir, f"slide_{i + 1}.png")
|
|
shutil.copy2(sp, dest)
|
|
screenshots.append(dest)
|
|
if i == 0:
|
|
thumbnail_path = dest
|
|
|
|
except Exception as e:
|
|
print(f"Screenshot generation failed (non-fatal): {e}")
|
|
|
|
# 5. Collect all fonts used
|
|
all_fonts = set()
|
|
for lm in layout_metas_for_fonts:
|
|
raw = extract_fonts_from_oxml(lm["xml_content"])
|
|
all_fonts.update(normalize_font_family_name(f) for f in raw if f)
|
|
for sx in slide_xmls:
|
|
raw = extract_fonts_from_oxml(sx)
|
|
all_fonts.update(normalize_font_family_name(f) for f in raw if f)
|
|
|
|
# 6. Process each item through LLM pipeline
|
|
llm_provider = _get_parsing_provider() # Use fast lite model for parsing
|
|
print(f"[MasterDeckParser] Using {llm_provider['model']} for parsing")
|
|
layouts_result = []
|
|
|
|
# Build per-layout screenshot mapping
|
|
# In "slides" mode: screenshots[idx] maps directly (1:1)
|
|
# In "layouts" mode: use layout_to_slide_map to find the right screenshot
|
|
layout_screenshot_map: dict = {} # layout index → screenshot path
|
|
if parse_mode == "layouts":
|
|
for idx, lm in enumerate(primary_metas):
|
|
layout_filename = lm.get("filename", "")
|
|
slide_idx = layout_to_slide_map.get(layout_filename)
|
|
if slide_idx is not None and slide_idx < len(screenshots):
|
|
layout_screenshot_map[idx] = screenshots[slide_idx]
|
|
else:
|
|
for idx in range(min(len(primary_metas), len(screenshots))):
|
|
layout_screenshot_map[idx] = screenshots[idx]
|
|
|
|
llm_count = len(layout_screenshot_map)
|
|
print(f"[MasterDeckParser] LLM provider: {llm_provider['provider'] if llm_provider else 'NONE'}")
|
|
print(f"[MasterDeckParser] Processing {len(primary_metas)} items, {llm_count} with screenshots for LLM")
|
|
|
|
# Optional: LayoutParser region detection for better classification
|
|
from services.layout_analysis_service import (
|
|
analyze_slide_layout,
|
|
classify_layout_from_regions,
|
|
regions_to_description,
|
|
)
|
|
|
|
# Step 1: Prepare all layout entries with metadata
|
|
layout_entries = []
|
|
for idx, lm in enumerate(primary_metas):
|
|
screenshot_path = layout_screenshot_map.get(idx)
|
|
|
|
# Try LayoutParser classification if a screenshot is available
|
|
lp_layout_type = None
|
|
lp_region_desc = ""
|
|
if screenshot_path and os.path.exists(screenshot_path):
|
|
try:
|
|
regions = await asyncio.to_thread(analyze_slide_layout, screenshot_path)
|
|
if regions:
|
|
lp_layout_type = classify_layout_from_regions(regions)
|
|
lp_region_desc = regions_to_description(regions)
|
|
except Exception as lp_err:
|
|
print(f"[LayoutAnalysis] Detection failed: {lp_err}")
|
|
|
|
layout_entry = {
|
|
"index": idx,
|
|
"layout_name": lm["layout_name"],
|
|
"layout_type": lp_layout_type or _guess_layout_type(lm["layout_name"]),
|
|
"xml_snippet": format_geometry_for_llm(extract_geometry_from_oxml(lm["xml_content"])),
|
|
"fonts": list(
|
|
{normalize_font_family_name(f) for f in extract_fonts_from_oxml(lm["xml_content"]) if f}
|
|
),
|
|
"html": None,
|
|
"react_code": None,
|
|
"screenshot_path": screenshot_path,
|
|
"lp_region_desc": lp_region_desc, # Store for LLM context
|
|
"xml_content": lm["xml_content"],
|
|
}
|
|
layout_entries.append(layout_entry)
|
|
|
|
# Step 2: Parallel HTML generation for all layouts with screenshots
|
|
if llm_provider:
|
|
print(f"[MasterDeckParser] PARALLEL MODE: Generating HTML for {llm_count} layouts...")
|
|
|
|
async def generate_html_for_layout(entry):
|
|
"""Generate HTML for a single layout."""
|
|
if not entry["screenshot_path"] or not os.path.exists(entry["screenshot_path"]):
|
|
return None
|
|
|
|
try:
|
|
with open(entry["screenshot_path"], "rb") as img_f:
|
|
img_b64 = base64.b64encode(img_f.read()).decode("utf-8")
|
|
|
|
xml_context = entry["xml_content"]
|
|
if entry["lp_region_desc"]:
|
|
xml_context = f"{entry['lp_region_desc']}\n\n---\n\n{xml_context}"
|
|
|
|
html = await _llm_generate_html(
|
|
llm_provider, img_b64, xml_context, entry["fonts"] or None
|
|
)
|
|
return html.replace("```html", "").replace("```", "")
|
|
except Exception as e:
|
|
print(f"[MasterDeckParser] HTML gen failed for {entry['layout_name']}: {e}")
|
|
return None
|
|
|
|
# Parallel HTML generation
|
|
html_tasks = [generate_html_for_layout(entry) for entry in layout_entries]
|
|
html_results = await asyncio.gather(*html_tasks, return_exceptions=True)
|
|
|
|
# Assign HTML results
|
|
for entry, html in zip(layout_entries, html_results):
|
|
if html and not isinstance(html, Exception):
|
|
entry["html"] = html
|
|
|
|
print(f"[MasterDeckParser] HTML generation complete. Generating React...")
|
|
|
|
# Step 3: Parallel React generation for layouts with HTML
|
|
async def generate_react_for_layout(entry, idx):
|
|
"""Generate React for a single layout."""
|
|
if not entry["html"] or not entry["screenshot_path"]:
|
|
return None
|
|
|
|
try:
|
|
with open(entry["screenshot_path"], "rb") as img_f:
|
|
img_b64 = base64.b64encode(img_f.read()).decode("utf-8")
|
|
|
|
react_code = await _llm_generate_react(llm_provider, entry["html"], img_b64)
|
|
react_cleaned = react_code.replace("```tsx", "").replace("```", "")
|
|
print(f"[MasterDeckParser] Layout {idx + 1}/{llm_count}: {entry['layout_name']} — done ({len(react_cleaned)} chars)")
|
|
return react_cleaned
|
|
except Exception as e:
|
|
print(f"[MasterDeckParser] React gen failed for {entry['layout_name']}: {e}")
|
|
return None
|
|
|
|
# Parallel React generation
|
|
react_tasks = [
|
|
generate_react_for_layout(entry, idx)
|
|
for idx, entry in enumerate(layout_entries)
|
|
]
|
|
react_results = await asyncio.gather(*react_tasks, return_exceptions=True)
|
|
|
|
# Assign React results
|
|
for entry, react in zip(layout_entries, react_results):
|
|
if react and not isinstance(react, Exception):
|
|
entry["react_code"] = react
|
|
|
|
# Clean up temporary fields
|
|
for entry in layout_entries:
|
|
entry.pop("lp_region_desc", None)
|
|
entry.pop("xml_content", None)
|
|
|
|
layouts_result = layout_entries
|
|
|
|
parsed_config = {
|
|
"theme": theme_info,
|
|
"total_slides": len(slide_xmls),
|
|
"total_layouts": len(layout_metas_for_fonts),
|
|
"parse_mode": parse_mode,
|
|
"fonts": sorted(all_fonts),
|
|
}
|
|
|
|
return {
|
|
"parsed_config": parsed_config,
|
|
"layouts": layouts_result,
|
|
"thumbnail_path": thumbnail_path,
|
|
}
|
|
|
|
|
|
async def _do_parse_v2(deck_id: uuid.UUID) -> dict:
|
|
"""New JSON-based parsing pipeline. 1 LLM call per layout instead of 2.
|
|
|
|
Output layout_code is a JSON element model, not TSX code.
|
|
"""
|
|
import json
|
|
async with async_session_maker() as session:
|
|
from models.sql.master_deck import MasterDeckModel
|
|
deck = await session.get(MasterDeckModel, deck_id)
|
|
if not deck:
|
|
raise ValueError("Deck not found")
|
|
pptx_path = deck.original_file_path
|
|
client_id = deck.client_id
|
|
parse_mode = getattr(deck, "parse_mode", None) or "layouts"
|
|
|
|
if not os.path.exists(pptx_path):
|
|
raise FileNotFoundError(f"PPTX file not found: {pptx_path}")
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
slide_xmls = _extract_slide_xmls(pptx_path, temp_dir)
|
|
layout_to_slide_map = _build_layout_to_slide_map(pptx_path, temp_dir)
|
|
|
|
if parse_mode == "layouts":
|
|
primary_metas = _extract_slide_layout_xmls(pptx_path, temp_dir)
|
|
print(f"[ParserV2] Mode=layouts: {len(primary_metas)} slideLayouts")
|
|
else:
|
|
primary_metas = _extract_slides_with_layout_info(pptx_path, temp_dir)
|
|
print(f"[ParserV2] Mode=slides: {len(primary_metas)} actual slides")
|
|
|
|
layout_metas_for_fonts = _extract_slide_layout_xmls(pptx_path, temp_dir)
|
|
theme_info = _extract_theme_info(pptx_path, temp_dir)
|
|
|
|
# Generate screenshots
|
|
screenshots = []
|
|
thumbnail_path = None
|
|
try:
|
|
pdf_path = await _convert_pptx_to_pdf(pptx_path, temp_dir)
|
|
screenshot_paths = await DocumentsLoader.get_page_images_from_pdf_async(pdf_path, temp_dir)
|
|
app_data = os.environ.get("APP_DATA_DIRECTORY", os.path.join(os.path.dirname(__file__), "..", "data"))
|
|
deck_dir = os.path.join(app_data, "clients", str(client_id), "master_decks", str(deck_id), "screenshots")
|
|
os.makedirs(deck_dir, exist_ok=True)
|
|
for i, sp in enumerate(screenshot_paths):
|
|
if os.path.exists(sp) and os.path.getsize(sp) > 0:
|
|
dest = os.path.join(deck_dir, f"slide_{i + 1}.png")
|
|
shutil.copy2(sp, dest)
|
|
screenshots.append(dest)
|
|
if i == 0:
|
|
thumbnail_path = dest
|
|
except Exception as e:
|
|
print(f"[ParserV2] Screenshot generation failed (non-fatal): {e}")
|
|
|
|
# Collect fonts
|
|
all_fonts = set()
|
|
for lm in layout_metas_for_fonts:
|
|
raw = extract_fonts_from_oxml(lm["xml_content"])
|
|
all_fonts.update(normalize_font_family_name(f) for f in raw if f)
|
|
for sx in slide_xmls:
|
|
raw = extract_fonts_from_oxml(sx)
|
|
all_fonts.update(normalize_font_family_name(f) for f in raw if f)
|
|
|
|
# Screenshot mapping
|
|
layout_screenshot_map: dict = {}
|
|
if parse_mode == "layouts":
|
|
for idx, lm in enumerate(primary_metas):
|
|
layout_filename = lm.get("filename", "")
|
|
slide_idx = layout_to_slide_map.get(layout_filename)
|
|
if slide_idx is not None and slide_idx < len(screenshots):
|
|
layout_screenshot_map[idx] = screenshots[slide_idx]
|
|
else:
|
|
for idx in range(min(len(primary_metas), len(screenshots))):
|
|
layout_screenshot_map[idx] = screenshots[idx]
|
|
|
|
llm_provider = _get_parsing_provider()
|
|
print(f"[ParserV2] Using {llm_provider['model']} for element classification")
|
|
|
|
# Build layout entries
|
|
layout_entries = []
|
|
for idx, lm in enumerate(primary_metas):
|
|
screenshot_path = layout_screenshot_map.get(idx)
|
|
per_layout_fonts = list(
|
|
{normalize_font_family_name(f) for f in extract_fonts_from_oxml(lm["xml_content"]) if f}
|
|
)
|
|
layout_entries.append({
|
|
"index": idx,
|
|
"layout_name": lm["layout_name"],
|
|
"xml_content": lm["xml_content"],
|
|
"fonts": per_layout_fonts,
|
|
"screenshot_path": screenshot_path,
|
|
"element_model": None,
|
|
})
|
|
|
|
# Parallel element classification
|
|
print(f"[ParserV2] Classifying elements for {len(layout_entries)} layouts...")
|
|
|
|
async def classify_layout(entry):
|
|
if not entry["screenshot_path"] or not os.path.exists(entry["screenshot_path"]):
|
|
return None
|
|
try:
|
|
with open(entry["screenshot_path"], "rb") as img_f:
|
|
img_b64 = base64.b64encode(img_f.read()).decode("utf-8")
|
|
geometry = extract_geometry_from_oxml(entry["xml_content"])
|
|
if not geometry:
|
|
return None
|
|
llm_result = await _llm_classify_elements(llm_provider, img_b64, geometry)
|
|
fonts_for_model = entry["fonts"] or list(all_fonts)[:3]
|
|
model = _build_element_model(
|
|
entry["index"], entry["layout_name"],
|
|
geometry, llm_result, fonts_for_model, theme_info
|
|
)
|
|
return model
|
|
except Exception as e:
|
|
print(f"[ParserV2] Classification failed for {entry['layout_name']}: {e}")
|
|
return None
|
|
|
|
tasks = [classify_layout(entry) for entry in layout_entries]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
for entry, result in zip(layout_entries, results):
|
|
if result and not isinstance(result, Exception):
|
|
entry["element_model"] = result
|
|
print(f"[ParserV2] Layout '{entry['layout_name']}' classified: {len(result.get('elements', []))} elements")
|
|
|
|
# Build final layouts list (same structure as old, but layout_code is JSON)
|
|
layouts_result = []
|
|
for entry in layout_entries:
|
|
model = entry.get("element_model")
|
|
if model:
|
|
layouts_result.append({
|
|
"index": entry["index"],
|
|
"layout_name": model["layoutName"],
|
|
"layout_type": _guess_layout_type(model["layoutName"]),
|
|
"react_code": json.dumps(model), # Store JSON as react_code for compat
|
|
"fonts": entry["fonts"],
|
|
"screenshot_path": entry["screenshot_path"],
|
|
})
|
|
else:
|
|
# No screenshot/failed → include without code
|
|
layouts_result.append({
|
|
"index": entry["index"],
|
|
"layout_name": entry["layout_name"],
|
|
"layout_type": _guess_layout_type(entry["layout_name"]),
|
|
"react_code": None,
|
|
"fonts": entry["fonts"],
|
|
"screenshot_path": entry["screenshot_path"],
|
|
})
|
|
|
|
parsed_config = {
|
|
"theme": theme_info,
|
|
"total_slides": len(slide_xmls),
|
|
"total_layouts": len(layout_metas_for_fonts),
|
|
"parse_mode": parse_mode,
|
|
"fonts": sorted(all_fonts),
|
|
"parser_version": "v2", # Mark as new parser
|
|
}
|
|
|
|
return {
|
|
"parsed_config": parsed_config,
|
|
"layouts": layouts_result,
|
|
"thumbnail_path": thumbnail_path,
|
|
}
|
|
|
|
|
|
async def _register_as_template(
|
|
deck_id: uuid.UUID,
|
|
deck_name: str,
|
|
layouts: list,
|
|
session,
|
|
) -> None:
|
|
"""Bridge master deck layouts into the custom template system.
|
|
|
|
Creates a TemplateModel and PresentationLayoutCodeModel records
|
|
so the parsed layouts appear in the template picker during generation.
|
|
"""
|
|
from models.sql.template import TemplateModel
|
|
from models.sql.presentation_layout_code import PresentationLayoutCodeModel
|
|
from sqlalchemy import select, delete
|
|
|
|
try:
|
|
# Upsert TemplateModel — use deck_id as template id
|
|
existing = await session.get(TemplateModel, deck_id)
|
|
if existing:
|
|
existing.name = deck_name or "Custom Template"
|
|
else:
|
|
template = TemplateModel(
|
|
id=deck_id,
|
|
name=deck_name or "Custom Template",
|
|
description=f"Auto-generated from master deck: {deck_name}",
|
|
)
|
|
session.add(template)
|
|
|
|
# Remove old layout codes for this deck (reparse case)
|
|
await session.execute(
|
|
delete(PresentationLayoutCodeModel).where(
|
|
PresentationLayoutCodeModel.presentation == deck_id
|
|
)
|
|
)
|
|
|
|
# Create PresentationLayoutCodeModel for each layout with react_code
|
|
for idx, layout in enumerate(layouts):
|
|
react_code = layout.get("react_code")
|
|
if not react_code:
|
|
continue
|
|
|
|
layout_code = PresentationLayoutCodeModel(
|
|
presentation=deck_id,
|
|
layout_id=f"layout-{idx}",
|
|
layout_name=layout.get("layout_name", f"Layout {idx + 1}"),
|
|
layout_code=react_code,
|
|
fonts=layout.get("fonts"),
|
|
thumbnail_path=layout.get("screenshot_path"),
|
|
is_enabled=True,
|
|
)
|
|
session.add(layout_code)
|
|
|
|
await session.commit()
|
|
print(f"Registered master deck {deck_id} as custom template with "
|
|
f"{sum(1 for l in layouts if l.get('react_code'))} layouts")
|
|
|
|
except Exception as e:
|
|
print(f"Failed to register master deck as template: {e}")
|
|
# Don't fail the entire parse — template registration is non-critical
|
|
await session.rollback()
|