- Split deep extraction into two separate functions (pass1 + pass2) so the background task can update DB between them - Progress now shows: "Pass 1/2: Analyzing structure... (this takes 20-40 seconds)" "Pass 1 complete (23s). Pass 2/2: Extracting assets..." "Deep extraction complete (52s total). Found 45 assets." - Live elapsed timer (seconds) shown in the upload spinner - Timer ticks every second so user knows it's not hung Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
287 lines
12 KiB
Python
287 lines
12 KiB
Python
"""Parse uploaded client documents (Word/Excel) to extract asset lists."""
|
||
|
||
import logging
|
||
import io
|
||
from pathlib import Path
|
||
|
||
import openpyxl
|
||
import docx
|
||
|
||
from app.utils.claude_client import call_claude, extract_tool_result, extract_text
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# JSON-schema description of one extracted asset. Every property declared
# here is also listed as required in the tool schema below.
_ASSET_PROPERTIES = {
    "name": {
        "type": "string",
        "description": "The asset/deliverable name as described by the client",
    },
    "description": {
        "type": "string",
        "description": "Description of what this asset involves, including any complexity or format details",
    },
    "complexity_hint": {
        "type": "string",
        "enum": ["simple", "medium", "complex", "unknown"],
        "description": "Estimated complexity based on the brief",
    },
    "volume": {
        "type": "integer",
        "description": "Number of this asset needed (default 1 if not specified)",
    },
    "tier": {
        "type": "string",
        "description": "The client's tier/complexity label if specified (e.g. 'Tier A', 'A', 'Gold', '1', 'Premium'). Leave empty string if no tier is specified.",
    },
}

# Tool definitions handed to call_claude(). The single "extract_assets" tool
# makes the model answer with a structured {"assets": [...]} payload.
EXTRACT_TOOLS = [
    {
        "name": "extract_assets",
        "description": "Extract a structured list of deliverable assets from a client brief or scope document.",
        "input_schema": {
            "type": "object",
            "properties": {
                "assets": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": _ASSET_PROPERTIES,
                        # Same names, same order as the property table.
                        "required": list(_ASSET_PROPERTIES),
                    },
                },
            },
            "required": ["assets"],
        },
    }
]
|
||
|
||
SYSTEM_PROMPT = """You are a creative agency asset specialist who understands production scoping.
|
||
Your job is to extract every distinct deliverable/asset from the client brief or scope document provided.
|
||
|
||
For each asset, provide:
|
||
- name: The asset name as the client describes it (e.g., "Social Media Banner", "TV Commercial Edit", "Brand Book")
|
||
- description: What this asset involves based on the document context. Include format, size, channel, and any other relevant details.
|
||
- complexity_hint: Your best estimate of complexity (simple/medium/complex) based on the description. Use "unknown" if unclear.
|
||
- volume: How many of this asset are needed. Default to 1 if not specified.
|
||
- tier: If the client specifies a tier, grade, or complexity label for this asset (e.g. "Tier A", "A", "Gold", "Premium", "1"), include it exactly as written. If the document has columns like A/B/C or Tier 1/2/3, extract those labels. Leave empty string if no tier is specified.
|
||
|
||
Be thorough - extract every distinct asset type mentioned. If the same asset appears at different tiers or complexity levels, list them as SEPARATE entries with their respective tier labels.
|
||
Do NOT combine different asset types into one entry.
|
||
Do NOT combine different asset types into one entry."""
|
||
|
||
|
||
def extract_text_from_file(
    file_content: bytes,
    filename: str,
    *,
    max_chars: int = 50000,
) -> tuple[str, dict]:
    """Extract plain text from an uploaded document.

    Args:
        file_content: Raw bytes of the uploaded file.
        filename: Original file name. Only its extension (compared
            case-insensitively) is used, to choose the parser.
        max_chars: Maximum number of characters of extracted text to return.
            Longer text is cut at this length and a
            "[Document truncated...]" marker is appended. Defaults to 50000.

    Returns:
        A ``(text, metadata)`` tuple. ``metadata`` contains ``char_count``
        (length of the text *before* truncation), ``sheet_count`` (number of
        sheets for Excel files, 0 otherwise) and ``file_type`` (the
        lower-cased extension, including the dot).

    Raises:
        ValueError: If the extension is not supported, or the extracted text
            is shorter than 20 characters after stripping whitespace.
    """
    ext = Path(filename).suffix.lower()
    sheet_count = 0

    if ext == ".docx":
        text = _extract_docx_text(file_content)
    elif ext in (".xlsx", ".xls"):
        # NOTE(review): openpyxl reads OOXML workbooks only; a genuine legacy
        # binary .xls upload is expected to fail inside load_workbook rather
        # than with the ValueError below - confirm whether .xls should be
        # rejected up front.
        text = _extract_excel_text(file_content)
        # Only the sheet names are needed here, so open the workbook in
        # read-only mode instead of parsing every worksheet a second time.
        # Read-only workbooks keep the archive open and must be closed.
        workbook = openpyxl.load_workbook(io.BytesIO(file_content), read_only=True)
        try:
            sheet_count = len(workbook.sheetnames)
        finally:
            workbook.close()
    elif ext == ".txt":
        # Undecodable bytes become U+FFFD instead of aborting the upload.
        text = file_content.decode("utf-8", errors="replace")
    else:
        raise ValueError(f"Unsupported file type: {ext}. Use .docx, .xlsx, or .txt")

    if not text or len(text.strip()) < 20:
        raise ValueError("Document appears to be empty or too short to extract assets from.")

    # Recorded before truncation so callers can see the original size.
    metadata = {
        "char_count": len(text),
        "sheet_count": sheet_count,
        "file_type": ext,
    }

    # Truncate very long documents to manage token usage
    if len(text) > max_chars:
        text = text[:max_chars] + "\n\n[Document truncated...]"

    return text, metadata
|
||
|
||
|
||
def parse_text_with_ai(text: str) -> tuple[list[dict], dict]:
    """Ask Claude to list the deliverable assets found in *text*.

    The request uses SYSTEM_PROMPT and selects the "extract_assets" tool via
    ``tool_choice``.

    Returns:
        ``(assets, usage_info)``. ``assets`` is the "assets" entry of the tool
        result, or an empty list when no tool result with that key came back.
        ``usage_info`` is the response's ``_usage_info`` attribute, or an
        all-zero dict when the attribute is absent.
    """
    request = {
        "system": SYSTEM_PROMPT,
        "user_message": f"Extract all deliverable assets from this client document:\n\n{text}",
        "tools": EXTRACT_TOOLS,
        "tool_choice": {"type": "tool", "name": "extract_assets"},
        "max_tokens": 16000,
    }
    response = call_claude(**request)

    zero_usage = {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0}
    usage_info = getattr(response, "_usage_info", zero_usage)

    payload = extract_tool_result(response)
    if payload and "assets" in payload:
        return payload["assets"], usage_info

    # No usable tool output: keep whatever text the model sent for debugging.
    logger.warning("Claude did not return structured asset data, response: %s", extract_text(response))
    return [], usage_info
|
||
|
||
|
||
def _extract_docx_text(content: bytes) -> str:
    """Return the text of a .docx file, one line per paragraph or table row.

    Non-blank body paragraphs come first, in document order. Every table row
    that has at least one non-blank cell is then appended as its stripped
    cell texts joined with " | ".
    """
    document = docx.Document(io.BytesIO(content))

    lines = []
    for para in document.paragraphs:
        if para.text.strip():
            # The paragraph text itself is kept unstripped.
            lines.append(para.text)

    # Also extract text from tables
    for table in document.tables:
        for row in table.rows:
            stripped = (cell.text.strip() for cell in row.cells)
            non_empty = [value for value in stripped if value]
            if non_empty:
                lines.append(" | ".join(non_empty))

    return "\n".join(lines)
|
||
|
||
|
||
def _extract_excel_text(content: bytes) -> str:
    """Render an Excel workbook as header-labelled text lines.

    For each sheet that holds enough data the output contains:

    * a ``=== Sheet: <name> (<rows> rows × <cols> cols) ===`` banner,
    * a ``[Headers from row N]: ...`` line (when any header was found),
    * one ``[Row N] ...`` line per data row that has at least one non-blank
      cell, each cell written as ``Header: value`` when its column has a
      header and as the bare value otherwise,
    * a ``[... K more rows not shown]`` line when the sheet has more rows
      than the per-sheet cap.

    A sheet is skipped when it has fewer than 3 rows, fewer than 2 columns,
    or fewer than 3 non-empty cells in its first 10 rows. The header row is
    the row among the first five with the most non-empty cells (the earliest
    such row on ties). Header labels are cut to 50 characters and cell values
    to 100. At most the 199 rows following the header row are rendered.

    Merged cells are NOT expanded: only the cells that actually hold a value
    are emitted, so rows under a merged category cell do not repeat the
    category.

    Args:
        content: Raw bytes of the workbook.

    Returns:
        The rendered lines joined with newlines ("" when every sheet was
        skipped).
    """
    wb = openpyxl.load_workbook(io.BytesIO(content), data_only=True)
    parts = []

    for sheet_name in wb.sheetnames:
        ws = wb[sheet_name]

        # Read the sheet dimensions once; the loops below use them many times.
        max_row = ws.max_row
        max_col = ws.max_column

        # Skip sheets with very little data
        if max_row is None or max_row < 3:
            continue
        if max_col is None or max_col < 2:
            continue

        # Count non-empty cells to skip truly empty sheets
        sample_count = sum(
            1
            for row in ws.iter_rows(min_row=1, max_row=min(10, max_row), values_only=True)
            for c in row
            if c is not None
        )
        if sample_count < 3:
            continue

        parts.append(f"\n=== Sheet: {sheet_name} ({max_row} rows × {max_col} cols) ===")

        # Detect header row (row with most non-empty cells in first 5 rows)
        header_row_idx = 1
        max_cells = 0
        for r in range(1, min(6, max_row + 1)):
            count = sum(
                1 for c in range(1, max_col + 1) if ws.cell(row=r, column=c).value is not None
            )
            if count > max_cells:
                max_cells = count
                header_row_idx = r

        # Read header names. Compare against None (not truthiness) so a
        # numeric header such as 0 is kept, and drop labels that are blank
        # after stripping so they do not produce ": value" fields.
        headers = {}
        for c in range(1, max_col + 1):
            val = ws.cell(row=header_row_idx, column=c).value
            if val is None:
                continue
            label = str(val).strip()[:50]
            if label:
                headers[c] = label

        if headers:
            parts.append(f"[Headers from row {header_row_idx}]: {' | '.join(headers.values())}")

        # Extract data rows with labelled fields (capped per sheet).
        last_row = min(max_row, header_row_idx + 199)
        for row_idx in range(header_row_idx + 1, last_row + 1):
            fields = []
            for c in range(1, max_col + 1):
                val = ws.cell(row=row_idx, column=c).value
                if val is None:
                    continue
                display_val = str(val).strip()
                if not display_val:
                    continue
                if c in headers:
                    fields.append(f"{headers[c]}: {display_val[:100]}")
                else:
                    fields.append(display_val[:100])

            if fields:
                parts.append(f"[Row {row_idx}] {' | '.join(fields)}")

        # Make the row cap visible instead of silently dropping the tail.
        if max_row > last_row:
            parts.append(f"[... {max_row - last_row} more rows not shown]")

    return "\n".join(parts)
|
||
|
||
|
||
# ── Deep Extraction (two-pass AI) ──────────────────────────────────────────
|
||
|
||
STRUCTURE_ANALYSIS_PROMPT = """You are an expert at understanding complex spreadsheet layouts.
|
||
|
||
Analyze this spreadsheet data and describe its structure. Your analysis will be used by another AI to extract deliverable assets accurately.
|
||
|
||
For each sheet with meaningful data, describe:
|
||
1. Which row contains the column headers
|
||
2. What each column represents (asset name, description, tier, volume, status, GMAL code, caveats, questions, etc.)
|
||
3. Where are the actual asset/deliverable names located (which column)?
|
||
4. Are there tier columns (A/B/C, 1/2/3, Gold/Silver/Bronze)? Which columns?
|
||
5. Where are volume numbers?
|
||
6. Are there Q&A columns mixed in? Which ones are questions vs actual data?
|
||
7. Are there merged cells creating category groupings? How is the hierarchy structured?
|
||
8. Which sheets contain deliverables vs metadata/reference data?
|
||
|
||
Be specific — reference actual column names and row numbers."""
|
||
|
||
|
||
def deep_pass1_structure_analysis(text: str) -> tuple[str, dict]:
    """Run pass 1 of deep extraction: have Claude describe the sheet layout.

    Only the first 40000 characters of *text* are sent, together with
    STRUCTURE_ANALYSIS_PROMPT as the system prompt.

    Returns:
        ``(analysis, usage)`` where ``analysis`` is the text of the reply and
        ``usage`` is the response's ``_usage_info`` attribute, or an all-zero
        dict when the attribute is absent.
    """
    logger.info("Deep extraction Pass 1: Analyzing spreadsheet structure...")

    excerpt = text[:40000]
    response = call_claude(
        system=STRUCTURE_ANALYSIS_PROMPT,
        user_message=f"Analyze the structure of this spreadsheet data:\n\n{excerpt}",
        max_tokens=4096,
    )

    analysis = extract_text(response)
    zero_usage = {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0}
    usage = getattr(response, "_usage_info", zero_usage)

    logger.info(f"Deep extraction Pass 1 complete: {len(analysis)} chars of analysis")
    return analysis, usage
|
||
|
||
|
||
def deep_pass2_guided_extraction(text: str, structure_analysis: str) -> tuple[list[dict], dict]:
    """Run pass 2 of deep extraction: extract assets guided by the pass-1 analysis.

    The user message embeds *structure_analysis* followed by the first 45000
    characters of *text*; the request uses SYSTEM_PROMPT and selects the
    "extract_assets" tool via ``tool_choice``.

    Args:
        text: Extracted document text.
        structure_analysis: Layout description produced by pass 1.

    Returns:
        ``(assets, usage)``. ``assets`` is the "assets" entry of the tool
        result, or an empty list when no tool result with that key came back.
        ``usage`` is the response's ``_usage_info`` attribute, or an all-zero
        dict when the attribute is absent.
    """
    logger.info("Deep extraction Pass 2: Extracting assets with structural guidance...")
    guided_prompt = f"""You have been given a structural analysis of a complex client spreadsheet.
Use this understanding to extract every deliverable asset accurately.

STRUCTURAL ANALYSIS:
{structure_analysis}

IMPORTANT GUIDELINES:
- Use the column mapping from the analysis to identify asset names, descriptions, tiers, and volumes
- If the document has tier columns (A/B/C, etc.), extract each asset WITH its tier label
- Skip rows that are questions, metadata, or caveats — those are not deliverables
- If volume is 0 or "No", still extract the asset but set volume to 0
- Carry forward category names from merged cells (the analysis explains the hierarchy)

Now extract all deliverable assets from this data:

{text[:45000]}"""

    response = call_claude(
        system=SYSTEM_PROMPT,
        user_message=guided_prompt,
        tools=EXTRACT_TOOLS,
        tool_choice={"type": "tool", "name": "extract_assets"},
        max_tokens=16000,
    )

    usage = getattr(response, "_usage_info", {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0})

    result = extract_tool_result(response)
    if not result or "assets" not in result:
        # Log the model's text reply as well, matching parse_text_with_ai, so
        # a failed extraction can be diagnosed from the logs.
        logger.warning(
            "Deep extraction Pass 2 returned no assets, response: %s",
            extract_text(response),
        )
        return [], usage

    logger.info("Deep extraction complete: %d assets found", len(result["assets"]))
    return result["assets"], usage
|