- Step 10: Extended file upload for Excel/CSV/images/URLs (openpyxl, trafilatura) - Step 11: Content intelligence service with rule-based + LLM classification - Step 12: Slide mapping engine mapping content blocks to master deck layouts - Step 13: Chart data extractor, native PPTX chart service (bar/line/pie/gantt/waterfall), ChartDataEditor skeleton Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
430 lines
15 KiB
Python
430 lines
15 KiB
Python
"""Content Intelligence Service: classify brief content into typed blocks for slide mapping."""
|
|
import re
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from models.content_models import (
|
|
ClassifiedContent,
|
|
ContentBlock,
|
|
ContentBlockType,
|
|
)
|
|
from models.llm_message import LLMSystemMessage, LLMUserMessage
|
|
from services.attachment_parser_service import ImageInfo, TableData
|
|
from services.llm_client import LLMClient
|
|
from services.score_based_chunker import ScoreBasedChunker
|
|
from utils.llm_provider import get_model
|
|
|
|
# --- Regex patterns for rule-based classification ---
|
|
|
|
_METRIC_RE = re.compile(
|
|
r"""
|
|
(?: # value-first: $2.3M, 45%, 1,200 units
|
|
[\$€£¥]\s?\d[\d,.]*[KMBTkmbt%]? |
|
|
\d[\d,.]*\s?% |
|
|
\d[\d,.]*\s?[KMBTkmbt]\b
|
|
)
|
|
|
|
|
(?: # "grew 45%", "increased by $2M"
|
|
(?:grew|growth|increased?|decreased?|rose|fell|dropped|declined|revenue|profit|margin|roi|cagr|arpu)
|
|
.{0,30}?
|
|
[\$€£¥]?\d[\d,.]*[KMBTkmbt%]?
|
|
)
|
|
""",
|
|
re.IGNORECASE | re.VERBOSE,
|
|
)
|
|
|
|
_QUOTE_RE = re.compile(
|
|
r'["\u201c\u201d].{15,300}?["\u201c\u201d]' # 15-300 chars inside quotes
|
|
r"(?:\s*[-\u2014\u2013]\s*.{2,60})?", # optional attribution
|
|
re.DOTALL,
|
|
)
|
|
|
|
_TABLE_RE = re.compile(r"^\|.+\|$", re.MULTILINE)
|
|
|
|
_TIMELINE_RE = re.compile(
|
|
r"(?:(?:19|20)\d{2}|Q[1-4]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\w*\s+\d{4})",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
_COMPARISON_RE = re.compile(
|
|
r"\b(?:vs\.?|versus|compared?\s+to|in\s+contrast|on\s+the\s+other\s+hand|whereas|alternatively)\b",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
_LIST_RE = re.compile(r"^[\s]*[-*•]\s+.+", re.MULTILINE)
|
|
_NUMBERED_LIST_RE = re.compile(r"^[\s]*\d+[.)]\s+.+", re.MULTILINE)
|
|
|
|
_IMAGE_REF_RE = re.compile(
|
|
r"(?:!\[|see\s+(?:figure|image|diagram|chart|photo)|attached\s+image|\.(?:png|jpg|jpeg|gif|webp|svg)\b)",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
_CTA_RE = re.compile(
|
|
r"\b(?:contact\s+us|get\s+started|sign\s+up|learn\s+more|next\s+steps|action\s+items|call\s+to\s+action|let's\s+(?:discuss|connect|talk))\b",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
# Priority map: higher = more important for presentation.
# Looked up per block type when blocks are built; types missing from the map
# are given 4 by the callers that use ``.get(..., 4)``.
_PRIORITY_MAP: Dict[ContentBlockType, int] = {
    # Headline material
    ContentBlockType.metric: 8,
    ContentBlockType.quote: 7,
    # Structured content
    ContentBlockType.table: 6,
    ContentBlockType.timeline: 6,
    ContentBlockType.comparison: 6,
    # Closing prompt, ranked alongside quotes
    ContentBlockType.call_to_action: 7,
    # Supporting content
    ContentBlockType.list_items: 5,
    ContentBlockType.image_reference: 5,
    # Plain prose
    ContentBlockType.narrative: 4,
}
|
|
|
|
|
|
class ContentIntelligenceService:
|
|
|
|
def __init__(self) -> None:
    """Create the service together with the chunker it uses to split briefs."""
    self._chunker = ScoreBasedChunker()
|
|
|
|
async def classify(
    self,
    markdown: str,
    tables: Optional[List[TableData]] = None,
    images: Optional[List[ImageInfo]] = None,
) -> ClassifiedContent:
    """Classify markdown content into typed content blocks.

    Steps: extract a title, chunk the markdown, classify each chunk with the
    regex rules, send the chunks the rules could not decide to the LLM in a
    single batch, append one block per attached table and image, sort the
    blocks by priority and generate a short summary.

    Args:
        markdown: The brief as markdown text.
        tables: Attachment tables; each one is appended as a ``table`` block.
        images: Attachment images; each one is appended as an
            ``image_reference`` block.

    Returns:
        A ClassifiedContent holding the title, the priority-sorted blocks,
        the tables and images that were passed in, and the summary.
    """
    tables = tables or []
    images = images or []

    def section_name(heading: str) -> str:
        # Remove only the leading markdown marker ("## "). The previous
        # ``lstrip("# ")`` treated its argument as a character set and also
        # ate a '#' that belongs to the heading text ("## #1 Goal" -> "1 Goal").
        return re.sub(r"^\s*#*\s*", "", heading).strip()

    # 1. Extract a title (first heading, if any)
    title = self._extract_title(markdown)

    # 2. Chunk the content
    chunks = await self._chunk_content(markdown)

    # 3. Rule-based classification per chunk
    blocks: List[ContentBlock] = []
    ambiguous_chunks: List[tuple] = []  # (index into blocks, text) for the LLM

    for chunk in chunks:
        text = f"{chunk.heading}\n{chunk.content}".strip()
        section = section_name(chunk.heading)
        block_type = self._classify_by_rules(text)

        if block_type:
            blocks.append(
                ContentBlock(
                    type=block_type,
                    raw_text=text,
                    extracted_data=self._extract_data(block_type, text),
                    source_section=section,
                    priority=_PRIORITY_MAP.get(block_type, 4),
                )
            )
        else:
            # Remember where the placeholder sits so it can be replaced in
            # place once the LLM has classified it; this keeps document order.
            ambiguous_chunks.append((len(blocks), text))
            blocks.append(
                ContentBlock(
                    type=ContentBlockType.narrative,
                    raw_text=text,
                    source_section=section,
                    priority=4,
                )
            )

    # 4. LLM batch classification for ambiguous blocks
    if ambiguous_chunks:
        llm_types = await self._llm_classify_batch(
            [text for _, text in ambiguous_chunks]
        )
        for (idx, text), btype in zip(ambiguous_chunks, llm_types):
            blocks[idx] = ContentBlock(
                type=btype,
                raw_text=text,
                extracted_data=self._extract_data(btype, text),
                source_section=blocks[idx].source_section,
                priority=_PRIORITY_MAP.get(btype, 4),
            )

    # 5. Merge attachment data
    for td in tables:
        table_name = td.title or td.sheet_name
        blocks.append(
            ContentBlock(
                type=ContentBlockType.table,
                raw_text=f"Table: {table_name or 'Data'}\n"
                f"Headers: {', '.join(td.headers)}\n"
                f"Rows: {len(td.rows)}",
                extracted_data={"headers": td.headers, "row_count": len(td.rows)},
                source_section=table_name,
                priority=_PRIORITY_MAP[ContentBlockType.table],
            )
        )

    for img in images:
        blocks.append(
            ContentBlock(
                type=ContentBlockType.image_reference,
                raw_text=f"Image: {img.filename}",
                extracted_data={
                    "file_path": img.file_path,
                    "width": img.width,
                    "height": img.height,
                },
                source_section=None,
                priority=_PRIORITY_MAP[ContentBlockType.image_reference],
            )
        )

    # 6. Sort by priority (descending). list.sort is stable, so blocks with
    #    equal priority keep their current relative order.
    blocks.sort(key=lambda b: -b.priority)

    # 7. Generate summary
    summary = await self._generate_summary(markdown, blocks)

    return ClassifiedContent(
        title=title,
        blocks=blocks,
        tables=tables,
        images=images,
        summary=summary,
    )
|
|
|
|
async def ask_followup_questions(
    self, content: ClassifiedContent
) -> Optional[List[str]]:
    """Ask the LLM for follow-up questions when the brief looks too thin.

    A brief counts as thin when its blocks hold fewer than 200 words in
    total or when there are fewer than 3 blocks.

    Args:
        content: Previously classified content; only ``blocks`` is read
            (each block's ``raw_text`` and ``type.value``).

    Returns:
        None when the brief is not thin or when the LLM call fails;
        otherwise the ``"questions"`` entry of the structured response
        (an empty list if that key is absent).
    """
    blocks = content.blocks
    total_words = sum(len(block.raw_text.split()) for block in blocks)

    is_thin = total_words < 200 or len(blocks) < 3
    if not is_thin:
        return None

    client = LLMClient()
    model = get_model()

    # Show the model at most the first 10 blocks, 100 characters each.
    preview_lines = []
    for block in blocks[:10]:
        preview_lines.append(f"- [{block.type.value}] {block.raw_text[:100]}...")
    block_summary = "\n".join(preview_lines)

    system_prompt = (
        "You help identify missing information for a presentation brief. "
        "Return a JSON array of 2-4 short questions that would help create a more complete presentation."
    )
    user_prompt = (
        f"The user provided a brief with {total_words} words and {len(blocks)} content blocks:\n\n"
        f"{block_summary}\n\n"
        "What additional information would be helpful?"
    )
    messages = [
        LLMSystemMessage(content=system_prompt),
        LLMUserMessage(content=user_prompt),
    ]

    # JSON schema handed to the client as the structured response format.
    questions_schema = {
        "type": "array",
        "items": {"type": "string"},
        "minItems": 2,
        "maxItems": 4,
    }
    schema = {
        "type": "object",
        "properties": {"questions": questions_schema},
        "required": ["questions"],
    }

    try:
        response = await client.generate_structured(
            model=model,
            messages=messages,
            response_format=schema,
        )
        questions = response.get("questions", [])
    except Exception:
        # Best effort: a failed call means "no follow-up questions".
        return None
    return questions
|
|
|
|
# --- Internal methods ---
|
|
|
|
def _extract_title(self, markdown: str) -> Optional[str]:
|
|
for line in markdown.split("\n"):
|
|
stripped = line.strip()
|
|
if stripped.startswith("# ") and not stripped.startswith("## "):
|
|
return stripped.lstrip("# ").strip()
|
|
return None
|
|
|
|
async def _chunk_content(self, markdown: str):
|
|
"""Chunk using ScoreBasedChunker. Fall back to paragraph splitting."""
|
|
try:
|
|
headings = self._chunker.extract_headings(markdown)
|
|
if len(headings) >= 2:
|
|
scores = self._chunker.score_headings(headings)
|
|
chunks = self._chunker.get_chunks_from_headings(
|
|
markdown, headings, scores, top_k=30
|
|
)
|
|
if chunks:
|
|
return chunks
|
|
except Exception:
|
|
pass
|
|
|
|
# Fallback: split by double newlines (paragraph-based)
|
|
from models.document_chunk import DocumentChunk
|
|
|
|
paragraphs = [p.strip() for p in re.split(r"\n{2,}", markdown) if p.strip()]
|
|
return [
|
|
DocumentChunk(
|
|
heading=f"Section {i + 1}",
|
|
content=p,
|
|
heading_index=i,
|
|
score=5.0,
|
|
)
|
|
for i, p in enumerate(paragraphs)
|
|
if len(p) > 20
|
|
]
|
|
|
|
def _classify_by_rules(self, text: str) -> Optional[ContentBlockType]:
    """Classify *text* with the regex rules; None means "ambiguous".

    Rules are tried in a fixed precedence order and the first hit wins:
    quote, table, image reference, call to action, two or more metric
    matches, two or more timeline matches, comparison phrase, three or more
    bullet or numbered list lines, and finally a single metric match.
    """
    # Patterns whose mere presence decides the type, most specific first.
    presence_rules = (
        (_QUOTE_RE, ContentBlockType.quote),
        (_TABLE_RE, ContentBlockType.table),
        (_IMAGE_REF_RE, ContentBlockType.image_reference),
        (_CTA_RE, ContentBlockType.call_to_action),
    )
    for pattern, kind in presence_rules:
        if pattern.search(text):
            return kind

    metric_hits = len(_METRIC_RE.findall(text))
    if metric_hits >= 2:
        return ContentBlockType.metric

    if len(_TIMELINE_RE.findall(text)) >= 2:
        return ContentBlockType.timeline

    if _COMPARISON_RE.search(text):
        return ContentBlockType.comparison

    bullet_hits = len(_LIST_RE.findall(text))
    numbered_hits = len(_NUMBERED_LIST_RE.findall(text))
    if max(bullet_hits, numbered_hits) >= 3:
        return ContentBlockType.list_items

    # A lone metric only counts once every stronger signal has been ruled
    # out; with no metric at all the chunk is ambiguous and left to the LLM.
    return ContentBlockType.metric if metric_hits else None
|
|
|
|
def _extract_data(
    self, block_type: ContentBlockType, text: str
) -> Optional[Dict[str, Any]]:
    """Run the type-specific extractor for *block_type* over *text*.

    Only ``metric`` and ``quote`` blocks have an extractor; every other
    type yields None.
    """
    extractor = None
    if block_type == ContentBlockType.metric:
        extractor = self._extract_metric_data
    elif block_type == ContentBlockType.quote:
        extractor = self._extract_quote_data

    if extractor is None:
        return None
    return extractor(text)
|
|
|
|
def _extract_metric_data(self, text: str) -> Dict[str, Any]:
|
|
"""Extract numeric values and labels from metric text."""
|
|
metrics = []
|
|
# Pattern: label ... value
|
|
for match in re.finditer(
|
|
r"([\w\s]+?)\s*(?::|is|was|reached|hit|grew\s+to|of)\s*"
|
|
r"([\$€£¥]?\s?\d[\d,.]*\s?[KMBTkmbt%]*)",
|
|
text,
|
|
re.IGNORECASE,
|
|
):
|
|
label = match.group(1).strip()
|
|
value = match.group(2).strip()
|
|
if len(label) < 50:
|
|
metrics.append({"label": label, "value": value})
|
|
|
|
# Fallback: just extract all numbers with context
|
|
if not metrics:
|
|
for match in _METRIC_RE.finditer(text):
|
|
metrics.append({"value": match.group().strip()})
|
|
|
|
return {"metrics": metrics[:10]}
|
|
|
|
def _extract_quote_data(self, text: str) -> Dict[str, Any]:
|
|
"""Extract quote text and attribution."""
|
|
match = _QUOTE_RE.search(text)
|
|
if match:
|
|
full = match.group()
|
|
# Try to split attribution
|
|
parts = re.split(r"\s*[-\u2014\u2013]\s*", full, maxsplit=1)
|
|
quote_text = parts[0].strip().strip('"\u201c\u201d')
|
|
attribution = parts[1].strip() if len(parts) > 1 else None
|
|
return {"quote": quote_text, "attribution": attribution}
|
|
return {}
|
|
|
|
async def _llm_classify_batch(
|
|
self, texts: List[str]
|
|
) -> List[ContentBlockType]:
|
|
"""Use LLM to classify a batch of ambiguous text chunks."""
|
|
if not texts:
|
|
return []
|
|
|
|
client = LLMClient()
|
|
model = get_model()
|
|
|
|
types_list = ", ".join(t.value for t in ContentBlockType)
|
|
numbered = "\n".join(f"{i + 1}. {t[:300]}" for i, t in enumerate(texts))
|
|
|
|
messages = [
|
|
LLMSystemMessage(
|
|
content=f"Classify each numbered text chunk into one of these content types: {types_list}.\n"
|
|
"Return a JSON object with a 'classifications' array of strings, one per chunk, in order."
|
|
),
|
|
LLMUserMessage(content=numbered),
|
|
]
|
|
|
|
schema = {
|
|
"type": "object",
|
|
"properties": {
|
|
"classifications": {
|
|
"type": "array",
|
|
"items": {"type": "string", "enum": [t.value for t in ContentBlockType]},
|
|
}
|
|
},
|
|
"required": ["classifications"],
|
|
}
|
|
|
|
try:
|
|
result = await client.generate_structured(
|
|
model=model,
|
|
messages=messages,
|
|
response_format=schema,
|
|
)
|
|
classifications = result.get("classifications", [])
|
|
output = []
|
|
for i, text in enumerate(texts):
|
|
if i < len(classifications):
|
|
try:
|
|
output.append(ContentBlockType(classifications[i]))
|
|
except ValueError:
|
|
output.append(ContentBlockType.narrative)
|
|
else:
|
|
output.append(ContentBlockType.narrative)
|
|
return output
|
|
except Exception:
|
|
return [ContentBlockType.narrative] * len(texts)
|
|
|
|
async def _generate_summary(
|
|
self, markdown: str, blocks: List[ContentBlock]
|
|
) -> str:
|
|
"""Generate a brief summary of the content."""
|
|
client = LLMClient()
|
|
model = get_model()
|
|
|
|
messages = [
|
|
LLMSystemMessage(
|
|
content="Summarize the following content in 1-2 sentences for use as a presentation overview."
|
|
),
|
|
LLMUserMessage(content=markdown[:3000]),
|
|
]
|
|
|
|
try:
|
|
result = await client.generate(model=model, messages=messages)
|
|
return result.strip()[:500]
|
|
except Exception:
|
|
# Fallback: first 200 chars
|
|
return markdown[:200].strip() + "..."
|