ppt-tool/backend/services/content_intelligence_service.py
Vadym Samoilenko a2bd4cfefa Phase 3: Content Pipeline — file parsing, content intelligence, slide mapping, native charts
- Step 10: Extended file upload for Excel/CSV/images/URLs (openpyxl, trafilatura)
- Step 11: Content intelligence service with rule-based + LLM classification
- Step 12: Slide mapping engine mapping content blocks to master deck layouts
- Step 13: Chart data extractor, native PPTX chart service (bar/line/pie/gantt/waterfall), ChartDataEditor skeleton

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:54:04 +00:00

430 lines
15 KiB
Python

"""Content Intelligence Service: classify brief content into typed blocks for slide mapping."""
import re
from typing import Any, Dict, List, Optional
from models.content_models import (
ClassifiedContent,
ContentBlock,
ContentBlockType,
)
from models.llm_message import LLMSystemMessage, LLMUserMessage
from services.attachment_parser_service import ImageInfo, TableData
from services.llm_client import LLMClient
from services.score_based_chunker import ScoreBasedChunker
from utils.llm_provider import get_model
# --- Regex patterns for rule-based classification ---
# All patterns below are compiled once at import time and consulted by
# ContentIntelligenceService._classify_by_rules (and the extractors).

# Metric mention. Two alternatives:
#   1. a bare value: currency symbol + number (optional K/M/B/T/% suffix),
#      a number followed by "%", or a number followed by a K/M/B/T suffix;
#   2. a growth/finance keyword followed within 30 characters by a number.
# Compiled with VERBOSE, so whitespace and the "#" notes inside the pattern are
# ignored by the regex engine, and with IGNORECASE.
# NOTE(review): the in-pattern note mentions "1,200 units", but a bare number
# with no currency symbol, "%" or K/M/B/T suffix does not match any branch of
# the first alternative.
_METRIC_RE = re.compile(
r"""
(?: # value-first: $2.3M, 45%, 1,200 units
[\$€£¥]\s?\d[\d,.]*[KMBTkmbt%]? |
\d[\d,.]*\s?% |
\d[\d,.]*\s?[KMBTkmbt]\b
)
|
(?: # "grew 45%", "increased by $2M"
(?:grew|growth|increased?|decreased?|rose|fell|dropped|declined|revenue|profit|margin|roi|cagr|arpu)
.{0,30}?
[\$€£¥]?\d[\d,.]*[KMBTkmbt%]?
)
""",
re.IGNORECASE | re.VERBOSE,
)
# Quotation: text between straight or curly double quotes, optionally followed
# by a dash (hyphen, em dash or en dash) and a short attribution. DOTALL lets
# the quoted span cross line breaks. The pattern has no capturing groups.
_QUOTE_RE = re.compile(
r'["\u201c\u201d].{15,300}?["\u201c\u201d]' # 15-300 chars inside quotes
r"(?:\s*[-\u2014\u2013]\s*.{2,60})?", # optional attribution
re.DOTALL,
)
# A whole line that starts and ends with "|" (pipe-delimited table row).
_TABLE_RE = re.compile(r"^\|.+\|$", re.MULTILINE)
# Date-like token: a 4-digit year beginning with 19 or 20, a quarter marker
# Q1-Q4, or a month name (abbreviated or longer) followed by a 4-digit year.
# There are no word-boundary anchors, so these can also match inside longer
# tokens.
_TIMELINE_RE = re.compile(
r"(?:(?:19|20)\d{2}|Q[1-4]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\w*\s+\d{4})",
re.IGNORECASE,
)
# Comparison phrasing ("vs", "versus", "compared to", "whereas", ...), matched
# as whole words, case-insensitively.
_COMPARISON_RE = re.compile(
r"\b(?:vs\.?|versus|compared?\s+to|in\s+contrast|on\s+the\s+other\s+hand|whereas|alternatively)\b",
re.IGNORECASE,
)
# Bulleted list line: optional leading whitespace, then "-", "*" or "•",
# whitespace, and some text.
_LIST_RE = re.compile(r"^[\s]*[-*•]\s+.+", re.MULTILINE)
# Numbered list line: optional leading whitespace, digits, "." or ")",
# whitespace, and some text.
_NUMBERED_LIST_RE = re.compile(r"^[\s]*\d+[.)]\s+.+", re.MULTILINE)
# Image reference: the "![" opener of markdown image syntax, phrases such as
# "see figure"/"attached image", or a common image file extension.
_IMAGE_REF_RE = re.compile(
r"(?:!\[|see\s+(?:figure|image|diagram|chart|photo)|attached\s+image|\.(?:png|jpg|jpeg|gif|webp|svg)\b)",
re.IGNORECASE,
)
# Call-to-action phrasing ("contact us", "next steps", "let's discuss", ...),
# matched as whole words, case-insensitively.
_CTA_RE = re.compile(
r"\b(?:contact\s+us|get\s+started|sign\s+up|learn\s+more|next\s+steps|action\s+items|call\s+to\s+action|let's\s+(?:discuss|connect|talk))\b",
re.IGNORECASE,
)
# Priority map: higher = more important for presentation
# Keyed by ContentBlockType; the value is stored on ContentBlock.priority and
# used by classify() to order blocks (highest first). classify() looks text
# blocks up with .get(block_type, 4), so a type missing from this map gets the
# same priority as narrative.
_PRIORITY_MAP = {
ContentBlockType.metric: 8,
ContentBlockType.quote: 7,
ContentBlockType.table: 6,
ContentBlockType.timeline: 6,
ContentBlockType.comparison: 6,
ContentBlockType.call_to_action: 7,
ContentBlockType.list_items: 5,
ContentBlockType.image_reference: 5,
ContentBlockType.narrative: 4,
}
class ContentIntelligenceService:
    """Classify brief content into typed content blocks for slide mapping.

    Text chunks are classified with regex rules first; chunks the rules
    cannot decide are sent to an LLM in one batch.
    """

    def __init__(self):
        """Create the service with its own ScoreBasedChunker instance."""
        # The chunker is used by _chunk_content to split markdown by headings.
        self._chunker = ScoreBasedChunker()
async def classify(
    self,
    markdown: str,
    tables: Optional[List[TableData]] = None,
    images: Optional[List[ImageInfo]] = None,
) -> ClassifiedContent:
    """Turn a markdown brief plus parsed attachments into classified blocks.

    Pipeline: extract a title, chunk the markdown, classify every chunk with
    the regex rules, send the undecided chunks to the LLM in a single batch,
    append one block per attached table/image, order everything by priority
    (highest first, ties keep their insertion order) and generate a summary.

    Args:
        markdown: The brief text.
        tables: Parsed tables from attachments; ``None`` means no tables.
        images: Parsed image descriptors; ``None`` means no images.

    Returns:
        A ClassifiedContent holding the title, the ordered blocks, the
        original tables/images and the generated summary.
    """
    tables = tables or []
    images = images or []

    title = self._extract_title(markdown)
    chunks = await self._chunk_content(markdown)

    blocks: List[ContentBlock] = []
    # (slot in ``blocks``, chunk text) for every chunk the rules left open.
    pending: List[tuple] = []

    for chunk in chunks:
        text = f"{chunk.heading}\n{chunk.content}".strip()
        rule_type = self._classify_by_rules(text)

        if not rule_type:
            # Reserve the slot with a narrative placeholder so the block
            # keeps its position; it is overwritten after the LLM pass.
            pending.append((len(blocks), text))
            blocks.append(
                ContentBlock(
                    type=ContentBlockType.narrative,
                    raw_text=text,
                    source_section=chunk.heading.lstrip("# ").strip(),
                    priority=4,
                )
            )
            continue

        rule_data = self._extract_data(rule_type, text)
        blocks.append(
            ContentBlock(
                type=rule_type,
                raw_text=text,
                extracted_data=rule_data,
                source_section=chunk.heading.lstrip("# ").strip(),
                priority=_PRIORITY_MAP.get(rule_type, 4),
            )
        )

    if pending:
        pending_texts = [chunk_text for _, chunk_text in pending]
        llm_types = await self._llm_classify_batch(pending_texts)
        for (slot, chunk_text), llm_type in zip(pending, llm_types):
            llm_data = self._extract_data(llm_type, chunk_text)
            blocks[slot] = ContentBlock(
                type=llm_type,
                raw_text=chunk_text,
                extracted_data=llm_data,
                source_section=blocks[slot].source_section,
                priority=_PRIORITY_MAP.get(llm_type, 4),
            )

    # Attachments become blocks of their own, after all text blocks.
    blocks.extend(
        ContentBlock(
            type=ContentBlockType.table,
            raw_text=f"Table: {td.title or td.sheet_name or 'Data'}\n"
            f"Headers: {', '.join(td.headers)}\n"
            f"Rows: {len(td.rows)}",
            extracted_data={"headers": td.headers, "row_count": len(td.rows)},
            source_section=td.title or td.sheet_name,
            priority=_PRIORITY_MAP[ContentBlockType.table],
        )
        for td in tables
    )
    blocks.extend(
        ContentBlock(
            type=ContentBlockType.image_reference,
            raw_text=f"Image: {img.filename}",
            extracted_data={
                "file_path": img.file_path,
                "width": img.width,
                "height": img.height,
            },
            source_section=None,
            priority=_PRIORITY_MAP[ContentBlockType.image_reference],
        )
        for img in images
    )

    # sorted() is stable, also with reverse=True, so equal priorities keep
    # the order in which the blocks were appended.
    blocks = sorted(blocks, key=lambda block: block.priority, reverse=True)

    summary = await self._generate_summary(markdown, blocks)
    return ClassifiedContent(
        title=title,
        blocks=blocks,
        tables=tables,
        images=images,
        summary=summary,
    )
async def ask_followup_questions(
    self, content: ClassifiedContent
) -> Optional[List[str]]:
    """Ask the LLM for follow-up questions when the brief is too thin.

    A brief counts as thin when its blocks hold fewer than 200 words in
    total or when there are fewer than 3 blocks.

    Returns:
        ``None`` when the brief is not thin or when the LLM call fails;
        otherwise the "questions" list from the structured response (an
        empty list if that key is missing).
    """
    total_words = sum(len(b.raw_text.split()) for b in content.blocks)
    is_thin = total_words < 200 or len(content.blocks) < 3
    if not is_thin:
        return None

    client = LLMClient()
    model = get_model()

    # Only the first 10 blocks are previewed, 100 characters each.
    preview_lines = []
    for b in content.blocks[:10]:
        preview_lines.append(f"- [{b.type.value}] {b.raw_text[:100]}...")
    block_summary = "\n".join(preview_lines)

    system_message = LLMSystemMessage(
        content="You help identify missing information for a presentation brief. "
        "Return a JSON array of 2-4 short questions that would help create a more complete presentation."
    )
    user_message = LLMUserMessage(
        content=f"The user provided a brief with {total_words} words and {len(content.blocks)} content blocks:\n\n"
        f"{block_summary}\n\n"
        "What additional information would be helpful?"
    )
    messages = [system_message, user_message]

    questions_spec = {
        "type": "array",
        "items": {"type": "string"},
        "minItems": 2,
        "maxItems": 4,
    }
    schema = {
        "type": "object",
        "properties": {"questions": questions_spec},
        "required": ["questions"],
    }

    try:
        result = await client.generate_structured(
            model=model,
            messages=messages,
            response_format=schema,
        )
        return result.get("questions", [])
    except Exception:
        # Best effort: a failed call simply means no follow-up questions.
        return None
# --- Internal methods ---
def _extract_title(self, markdown: str) -> Optional[str]:
    """Return the text of the first level-1 markdown heading.

    A level-1 heading is a line that, ignoring surrounding whitespace,
    starts with a single ``#`` followed by a space. Deeper headings
    (``##`` and beyond) are skipped.

    Args:
        markdown: The brief text; may be empty or ``None``.

    Returns:
        The heading text without its ``# `` marker, or ``None`` when the
        text has no level-1 heading.
    """
    if not markdown:
        return None
    for line in markdown.split("\n"):
        stripped = line.strip()
        # "# " as a prefix already excludes "## ...", whose second character
        # is "#" rather than a space.
        if stripped.startswith("# "):
            # Slice off only the marker. str.lstrip("# ") would treat its
            # argument as a character set and also eat a "#" that belongs
            # to the title itself (e.g. "# #1 Priority").
            return stripped[2:].strip()
    return None
async def _chunk_content(self, markdown: str):
    """Split markdown into chunks, preferring heading-based chunking.

    When the chunker finds at least two headings, they are scored and the
    chunker's heading-based chunks (``top_k=30``) are returned if there are
    any. Otherwise, or when the chunker raises, the text is split on blank
    lines and every paragraph longer than 20 characters becomes a
    DocumentChunk with a synthetic "Section N" heading.

    Args:
        markdown: The brief text.

    Returns:
        A list of chunk objects exposing ``heading`` and ``content``.
    """
    import logging

    try:
        headings = self._chunker.extract_headings(markdown)
        if len(headings) >= 2:
            scores = self._chunker.score_headings(headings)
            chunks = self._chunker.get_chunks_from_headings(
                markdown, headings, scores, top_k=30
            )
            if chunks:
                return chunks
    except Exception:
        # Chunking is best effort, so fall through to paragraph splitting,
        # but record the failure instead of hiding it.
        logging.getLogger(__name__).warning(
            "Heading-based chunking failed; falling back to paragraph splitting",
            exc_info=True,
        )

    # Fallback: split by double newlines (paragraph-based).
    # Imported lazily, as in the original, so it is only loaded when needed.
    from models.document_chunk import DocumentChunk

    paragraphs = [p.strip() for p in re.split(r"\n{2,}", markdown) if p.strip()]
    return [
        DocumentChunk(
            # Numbering follows the paragraph's position before the length
            # filter, so section numbers may have gaps.
            heading=f"Section {i + 1}",
            content=p,
            heading_index=i,
            score=5.0,
        )
        for i, p in enumerate(paragraphs)
        if len(p) > 20
    ]
def _classify_by_rules(self, text: str) -> Optional[ContentBlockType]:
    """Classify a chunk with the regex rules; ``None`` means undecided.

    Rules are tried in a fixed order and the first hit wins:
    quote, table, image reference, call to action, two or more metric
    mentions, two or more date-like tokens, comparison phrasing, three or
    more bulleted or numbered list lines, and finally a single metric
    mention. A ``None`` result defers the chunk to the LLM.
    """
    # Rules that fire on a single occurrence, most specific first.
    presence_rules = (
        (_QUOTE_RE, ContentBlockType.quote),
        (_TABLE_RE, ContentBlockType.table),
        (_IMAGE_REF_RE, ContentBlockType.image_reference),
        (_CTA_RE, ContentBlockType.call_to_action),
    )
    for pattern, block_type in presence_rules:
        if pattern.search(text):
            return block_type

    metric_count = len(_METRIC_RE.findall(text))
    if metric_count >= 2:
        return ContentBlockType.metric

    if len(_TIMELINE_RE.findall(text)) >= 2:
        return ContentBlockType.timeline

    if _COMPARISON_RE.search(text):
        return ContentBlockType.comparison

    bullet_count = len(_LIST_RE.findall(text))
    numbered_count = len(_NUMBERED_LIST_RE.findall(text))
    if max(bullet_count, numbered_count) >= 3:
        return ContentBlockType.list_items

    # A lone metric mention only counts once nothing else matched.
    if metric_count:
        return ContentBlockType.metric

    return None
def _extract_data(
    self, block_type: ContentBlockType, text: str
) -> Optional[Dict[str, Any]]:
    """Run the extractor that belongs to ``block_type`` on ``text``.

    Only metric and quote blocks have an extractor; every other type
    yields ``None``.
    """
    extractors = (
        (ContentBlockType.metric, self._extract_metric_data),
        (ContentBlockType.quote, self._extract_quote_data),
    )
    for handled_type, extractor in extractors:
        if block_type == handled_type:
            return extractor(text)
    return None
def _extract_metric_data(self, text: str) -> Dict[str, Any]:
    """Extract numeric values and their labels from metric text.

    The primary pass looks for ``<label> <connector> <value>`` where the
    connector is ``:`` or one of the words is/was/reached/hit/grew to/of.
    If that finds nothing, every ``_METRIC_RE`` match is returned as a
    value without a label.

    Args:
        text: Raw text of a block classified as a metric.

    Returns:
        ``{"metrics": [...]}`` with at most 10 entries. Entries from the
        primary pass have ``label`` and ``value``; fallback entries have
        only ``value``.
    """
    metrics = []
    # Pattern: label ... value
    for match in re.finditer(
        # Word connectors are wrapped in \b so that "is"/"of"/"hit" inside a
        # longer word (e.g. "Analysis 2024") is not taken as a connector.
        r"([\w\s]+?)\s*(?::|\b(?:is|was|reached|hit|grew\s+to|of)\b)\s*"
        # The number must end in a digit (no trailing "." or ","), and a
        # K/M/B/T suffix must end at a word boundary so that the first
        # letter of the next word ("10 teams") is not swallowed.
        r"([\$€£¥]?\s?\d(?:[\d,.]*\d)?(?:\s?%|\s?[KMBTkmbt]\b)?)",
        text,
        re.IGNORECASE,
    ):
        # The label group may span several lines (e.g. a heading line above
        # the metric); only the line closest to the value is the label.
        label = match.group(1).strip().rsplit("\n", 1)[-1].strip()
        value = match.group(2).strip()
        if len(label) < 50:
            metrics.append({"label": label, "value": value})
    # Fallback: just extract all numbers with context
    if not metrics:
        metrics = [
            {"value": found.group().strip()} for found in _METRIC_RE.finditer(text)
        ]
    return {"metrics": metrics[:10]}
def _extract_quote_data(self, text: str) -> Dict[str, Any]:
    """Extract the quoted text and its attribution from ``text``.

    Args:
        text: Raw text of a block classified as a quote.

    Returns:
        ``{"quote": ..., "attribution": ...}`` for the first ``_QUOTE_RE``
        match, where ``attribution`` is ``None`` when no attribution
        follows the closing quote; an empty dict when nothing matches.
    """
    match = _QUOTE_RE.search(text)
    if not match:
        return {}

    full = match.group()
    quote_chars = '"\u201c\u201d'

    # _QUOTE_RE matches the quoted body lazily with a 15-character minimum,
    # so the closing quote is the first quote character at index 16 or
    # later (index 0 is the opening quote). Splitting there, rather than on
    # the first dash, keeps hyphens inside the quote ("state-of-the-art")
    # from being mistaken for the attribution separator.
    closing = next(
        (i for i in range(16, len(full)) if full[i] in quote_chars),
        len(full),
    )
    quote_text = full[1:closing].strip().strip(quote_chars).strip()

    # Whatever follows the closing quote is "<dash> <attribution>" or empty.
    tail = full[closing + 1:]
    attribution = re.sub(r"^\s*[-\u2014\u2013]\s*", "", tail).strip() or None
    return {"quote": quote_text, "attribution": attribution}
async def _llm_classify_batch(
    self, texts: List[str]
) -> List[ContentBlockType]:
    """Classify undecided chunks with a single structured LLM call.

    Each chunk is truncated to 300 characters and sent as a numbered list.

    Returns:
        One ContentBlockType per input text, in order. A missing or
        unrecognised label becomes ``narrative``; if the call itself fails,
        every text is classified as ``narrative``. An empty input returns
        an empty list without calling the LLM.
    """
    if not texts:
        return []

    client = LLMClient()
    model = get_model()

    allowed_values = [t.value for t in ContentBlockType]
    types_list = ", ".join(allowed_values)
    numbered_lines = [f"{i + 1}. {t[:300]}" for i, t in enumerate(texts)]
    numbered = "\n".join(numbered_lines)

    system_message = LLMSystemMessage(
        content=f"Classify each numbered text chunk into one of these content types: {types_list}.\n"
        "Return a JSON object with a 'classifications' array of strings, one per chunk, in order."
    )
    messages = [system_message, LLMUserMessage(content=numbered)]

    schema = {
        "type": "object",
        "properties": {
            "classifications": {
                "type": "array",
                "items": {"type": "string", "enum": allowed_values},
            }
        },
        "required": ["classifications"],
    }

    fallback = ContentBlockType.narrative
    try:
        result = await client.generate_structured(
            model=model,
            messages=messages,
            response_format=schema,
        )
        classifications = result.get("classifications", [])

        output = []
        for position, _ in enumerate(texts):
            resolved = fallback
            # The model may return fewer labels than chunks.
            if position < len(classifications):
                try:
                    resolved = ContentBlockType(classifications[position])
                except ValueError:
                    # Label outside the enum: keep the narrative default.
                    pass
            output.append(resolved)
        return output
    except Exception:
        return [fallback] * len(texts)
async def _generate_summary(
    self, markdown: str, blocks: List[ContentBlock]
) -> str:
    """Generate a 1-2 sentence overview of the brief.

    The first 3000 characters of ``markdown`` are sent to the LLM and the
    reply is capped at 500 characters. If the call fails or the reply is
    empty, the first 200 characters of the brief are used instead, with an
    ellipsis only when the brief was actually cut.

    Args:
        markdown: The brief text.
        blocks: The classified blocks; currently unused, kept for interface
            compatibility.

    Returns:
        The summary text.
    """
    import logging

    client = LLMClient()
    model = get_model()
    messages = [
        LLMSystemMessage(
            content="Summarize the following content in 1-2 sentences for use as a presentation overview."
        ),
        LLMUserMessage(content=markdown[:3000]),
    ]
    try:
        result = await client.generate(model=model, messages=messages)
        summary = result.strip()[:500]
        if summary:
            return summary
    except Exception:
        # Summaries are best effort; record the failure and use the excerpt.
        logging.getLogger(__name__).warning(
            "Summary generation failed; falling back to an excerpt",
            exc_info=True,
        )

    # Fallback: first 200 chars, marked with "..." only if text was dropped.
    excerpt = markdown[:200].strip()
    if len(markdown) > 200:
        return excerpt + "..."
    return excerpt