adi-o3-multipass/process_brief_enhanced.py

915 lines
No EOL
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import datetime
import logging
import json
import csv
import re
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
# File Processing Libraries
import pptx
import pandas as pd
import fitz # PyMuPDF
from PIL import Image
import docx
from openpyxl import load_workbook
# AI Libraries
import google.generativeai as genai
import json5
from google.generativeai.types import GenerationConfig
from openai import OpenAI
import base64
# Configuration
# API credentials are read from the environment so that secrets never live in
# source control. SECURITY: literal keys were previously committed at this spot;
# they must be treated as compromised and revoked/rotated with each provider.
# A missing variable yields "" so the existing "key not set" checks still fire.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
# Accept this script's historical name first, then the LLAMA_CLOUD_API_KEY spelling.
LLAMACLOUD_API_KEY = os.environ.get("LLAMACLOUD_API_KEY") or os.environ.get("LLAMA_CLOUD_API_KEY", "")
# OpenAI GPT-5 Pricing (per 1M tokens)
# Keyed by model name; each entry gives the USD rate for fresh input tokens,
# cache-hit input tokens and output tokens.
# NOTE(review): confirm these rates against the provider's current price list.
OPENAI_PRICING = {
    "gpt-5": {
        "input": 2.50,  # Updated for GPT-5
        "cached_input": 1.25,
        "output": 10.00,
    },
}
# Column order of the exported CSV; the names match the asset schema fields.
CSV_HEADERS = [
    "title",
    "status",
    "category",
    "media",
    "asset_type",
    "brand_identifier",
    "format",
    "review_date",
    "live_date",
    "end_date",
    "reference_material",
    "language",
    "country",
    "quantity",
    "page_number",
    "section_context",
    "priority_level",
    "technical_requirements",
    "creative_direction",
    "approval_level",
]
# JSON Schema for structured output (OpenAI format)
# Every asset field is a plain string, so the per-field entries are generated
# from the (name, description) pairs below, in declaration order.
_OPENAI_ASSET_FIELD_DESCRIPTIONS = (
    ("title", "Asset title or name"),
    ("status", "Current status"),
    ("category", "Asset category"),
    ("media", "Media type"),
    ("asset_type", "Specific asset type"),
    ("brand_identifier", "Brand or client"),
    ("format", "Exact dimensions (e.g., '1080x1920', '1920x1080') or descriptive format (e.g., 'Mobile Banner', 'Desktop Hero')"),
    ("review_date", "Review deadline"),
    ("live_date", "Go-live date"),
    ("end_date", "End/expiry date"),
    ("reference_material", "Detailed requirements"),
    ("language", "Target language"),
    ("country", "Target country/region"),
    ("quantity", "Number of assets"),
    ("page_number", "Source page"),
    ("section_context", "Document section"),
    ("priority_level", "Business priority"),
    ("technical_requirements", "Technical specs"),
    ("creative_direction", "Design requirements"),
    ("approval_level", "Required approvals"),
)
OPENAI_ASSET_SCHEMA = {
    "name": "asset_extraction",
    "description": "Extract assets from document analysis",
    "schema": {
        "type": "object",
        "properties": {
            "assets": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        key: {"type": "string", "description": blurb}
                        for key, blurb in _OPENAI_ASSET_FIELD_DESCRIPTIONS
                    },
                    "required": ["title", "format"],
                    "additionalProperties": False,
                },
            },
        },
        "required": ["assets"],
        "additionalProperties": False,
    },
}
# Legacy Gemini Schema (keep for backward compatibility)
# Same string-typed asset fields as the OpenAI schema, without the
# name/description wrapper and without "additionalProperties".
_GEMINI_ASSET_FIELD_DESCRIPTIONS = (
    ("title", "Asset title or name"),
    ("status", "Current status"),
    ("category", "Asset category"),
    ("media", "Media type"),
    ("asset_type", "Specific asset type"),
    ("brand_identifier", "Brand or client"),
    ("format", "Technical format/dimensions"),
    ("review_date", "Review deadline"),
    ("live_date", "Go-live date"),
    ("end_date", "End/expiry date"),
    ("reference_material", "Detailed requirements"),
    ("language", "Target language"),
    ("country", "Target country/region"),
    ("quantity", "Number of assets"),
    ("page_number", "Source page"),
    ("section_context", "Document section"),
    ("priority_level", "Business priority"),
    ("technical_requirements", "Technical specs"),
    ("creative_direction", "Design requirements"),
    ("approval_level", "Required approvals"),
)
GEMINI_ASSET_SCHEMA = {
    "type": "object",
    "properties": {
        "assets": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    key: {"type": "string", "description": blurb}
                    for key, blurb in _GEMINI_ASSET_FIELD_DESCRIPTIONS
                },
                "required": ["title", "format"],
            },
        },
    },
    "required": ["assets"],
}
class DocumentType(Enum):
    """Kinds of input document the analyzer tells apart."""

    # Office formats
    POWERPOINT = "powerpoint"
    WORD = "word"
    # Portable / tabular formats
    PDF = "pdf"
    EXCEL = "excel"
    # Anything with an unrecognised extension
    UNKNOWN = "unknown"
@dataclass
class DocumentSection:
    """One section of a document as reported by the structure-analysis pass."""

    title: str  # section heading
    content: str  # brief summary of the section
    page_number: int  # page on which the section appears
    section_type: str  # e.g. header, briefing, assets, requirements, timeline
    importance_score: float  # 0-1 relevance for asset extraction
@dataclass
class TokenUsage:
    """Running totals of model token consumption plus a cost estimate.

    Counters only ever grow; call ``add_usage`` once per API response.
    """

    input_tokens: int = 0
    cached_input_tokens: int = 0
    output_tokens: int = 0

    def add_usage(self, usage_dict: Dict[str, int]):
        """Add one response's token counts to the running totals.

        Both Chat Completions style keys (``prompt_tokens``,
        ``completion_tokens``, ``prompt_tokens_cached``) and Responses API
        style keys (``input_tokens``, ``output_tokens``,
        ``input_tokens_cached``) are accepted; the first key present wins.
        Missing or ``None`` counters count as 0.
        """
        def first_present(*keys: str) -> int:
            for key in keys:
                if key in usage_dict:
                    return usage_dict[key] or 0
            return 0

        self.input_tokens += first_present('prompt_tokens', 'input_tokens')
        self.cached_input_tokens += first_present('prompt_tokens_cached', 'input_tokens_cached')
        self.output_tokens += first_present('completion_tokens', 'output_tokens')

    def _component_costs(self, model_name: str) -> Dict[str, float]:
        """Return the USD cost of each token class, falling back to gpt-5 pricing."""
        if model_name not in OPENAI_PRICING:
            logging.warning(f"No pricing info for model {model_name}, defaulting to gpt-5")
            model_name = 'gpt-5'
        pricing = OPENAI_PRICING[model_name]
        # Pricing is quoted per 1M tokens.
        return {
            'input_cost': (self.input_tokens / 1_000_000) * pricing['input'],
            'cached_input_cost': (self.cached_input_tokens / 1_000_000) * pricing['cached_input'],
            'output_cost': (self.output_tokens / 1_000_000) * pricing['output'],
        }

    def calculate_cost(self, model_name: str) -> float:
        """Return the total USD cost of all recorded tokens for ``model_name``.

        Unknown models are priced as gpt-5 (a warning is logged).
        """
        costs = self._component_costs(model_name)
        return costs['input_cost'] + costs['cached_input_cost'] + costs['output_cost']

    def get_summary(self, model_name: str) -> Dict[str, Any]:
        """Return token totals and a rounded cost breakdown.

        Uses the same pricing fallback as ``calculate_cost`` so that an
        unknown model name no longer raises ``KeyError`` here.
        """
        costs = self._component_costs(model_name)
        total_cost = costs['input_cost'] + costs['cached_input_cost'] + costs['output_cost']
        return {
            'input_tokens': self.input_tokens,
            'cached_input_tokens': self.cached_input_tokens,
            'output_tokens': self.output_tokens,
            'total_tokens': self.input_tokens + self.cached_input_tokens + self.output_tokens,
            'total_cost_usd': round(total_cost, 4),
            'cost_breakdown': {name: round(value, 4) for name, value in costs.items()},
        }
@dataclass
class ProcessingResult:
    """Output of one analysis stage, passed along and mutated by later stages."""

    raw_data: List[Dict[str, Any]]  # one dict per extracted asset
    metadata: Dict[str, Any]  # stage-level facts such as section count / doc type
    confidence_score: float  # set by the stage that produced the result
    processing_notes: List[str]  # human-readable trail of what each stage did
    token_usage: TokenUsage  # token totals attached to this result
class DocumentAnalyzer:
def __init__(self, model_name='gpt-5'):
    """Store the model name, build the API client and start a fresh token tally."""
    self.model_name = model_name
    # True only for the exact name 'gpt-5'.
    self.is_openai = (model_name == 'gpt-5')
    client = self._setup_model()
    self.model = client
    self.token_usage = TokenUsage()
def _setup_model(self):
    """Build and return the OpenAI client.

    Terminates the process with exit status 1 when the API key is absent
    (or still the placeholder text) or when constructing the client fails.
    """
    key_is_placeholder = OPENAI_API_KEY == "your-openai-api-key-here"
    if not OPENAI_API_KEY or key_is_placeholder:
        logging.error("OPENAI_API_KEY not set.")
        sys.exit(1)
    try:
        logging.info(f"Using OpenAI GPT-5 model with medium reasoning effort")
        # max_retries is kept low so a slow reasoning call is not retried for long.
        client = OpenAI(api_key=OPENAI_API_KEY, max_retries=2)
    except Exception as e:
        logging.error(f"Error configuring OpenAI model: {e}")
        sys.exit(1)
    return client
def classify_document(self, filepath: str) -> DocumentType:
    """Return the DocumentType implied by the file's extension (case-insensitive).

    Only the extension is inspected; unrecognised extensions map to UNKNOWN.
    """
    type_by_extension = {
        '.ppt': DocumentType.POWERPOINT,
        '.pptx': DocumentType.POWERPOINT,
        '.doc': DocumentType.WORD,
        '.docx': DocumentType.WORD,
        '.pdf': DocumentType.PDF,
        '.xls': DocumentType.EXCEL,
        '.xlsx': DocumentType.EXCEL,
    }
    _, suffix = os.path.splitext(filepath)
    return type_by_extension.get(suffix.lower(), DocumentType.UNKNOWN)
def _encode_file_for_openai(self, filepath: str) -> str:
"""Encode file content for OpenAI API."""
try:
with open(filepath, "rb") as file:
return base64.b64encode(file.read()).decode('utf-8')
except Exception as e:
logging.error(f"Error encoding file for OpenAI: {e}")
return None
def _extract_document_content(self, filepath: str) -> str:
    """Parse the file with the LlamaParse cloud service and return its markdown.

    The markdown documents returned by the parser are joined with blank
    lines. Any failure (including a missing ``llama_cloud_services``
    package) is logged and re-raised as a plain ``Exception``.
    """
    try:
        # Imported lazily, inside the try, so an import failure is reported like any other.
        from llama_cloud_services import LlamaParse
        logging.info(f"Using LlamaParser to extract content from: {os.path.basename(filepath)}")
        parser_options = {
            "api_key": LLAMACLOUD_API_KEY,
            "premium_mode": True,
            "result_type": "markdown",
            "verbose": True,
            "language": "en",
        }
        parse_result = LlamaParse(**parser_options).parse(filepath)
        page_texts = [doc.text for doc in parse_result.get_markdown_documents()]
        # Combine all markdown documents into a single string
        combined_content = "\n\n".join(page_texts)
        logging.info(f"LlamaParser extraction completed. Content length: {len(combined_content)} characters")
        return combined_content
    except Exception as e:
        logging.error(f"Error extracting document content with LlamaParser: {e}")
        raise Exception(f"LlamaParser extraction failed: {e}")
def extract_document_structure(self, uploaded_file) -> List[DocumentSection]:
    """Ask the model for the document's section structure.

    Args:
        uploaded_file: The document content (interpolated into the prompt
            as text).

    Returns:
        A list of DocumentSection objects, or [] when the request or the
        parsing fails (the failure is logged as a warning).

    The request uses JSON-object mode, which cannot produce a top-level
    array, so the prompt asks for ``{"sections": [...]}`` and the response
    is unwrapped accordingly; other shapes fall back to ``_extract_json``.
    """
    structure_prompt = """
Analyze this document and identify its structure. Return a JSON array of sections with:
- title: Section heading/title
- content: Brief summary of section content
- page_number: Page where section appears
- section_type: Type of section (header, briefing, assets, requirements, timeline, etc.)
- importance_score: 0-1 score indicating importance for asset extraction
Focus on identifying:
- Executive summaries
- Asset requirement sections
- Technical specifications
- Timeline/dates sections
- Creative direction sections
- Approval workflows
Return only a valid JSON object of the form {"sections": [...]} that wraps this array.
"""
    try:
        # For GPT-5 using Chat Completions with reasoning_effort
        combined_prompt = f"{structure_prompt}\n\nDocument Content:\n{uploaded_file}"
        response = self.model.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "user", "content": combined_prompt}
            ],
            reasoning_effort="medium",
            response_format={"type": "json_object"}
        )
        # Track token usage for GPT-5 Chat Completions
        usage = getattr(response, 'usage', None)
        if usage is not None:
            # Cache hits are reported in usage.prompt_tokens_details.cached_tokens
            # and are already counted inside prompt_tokens, so they are split
            # out here to be billed once, at the cached rate.
            details = getattr(usage, 'prompt_tokens_details', None)
            cached_tokens = getattr(details, 'cached_tokens', 0) or 0
            usage_dict = {
                'prompt_tokens': max(usage.prompt_tokens - cached_tokens, 0),
                'completion_tokens': usage.completion_tokens,
                'prompt_tokens_cached': cached_tokens
            }
            self.token_usage.add_usage(usage_dict)
            logging.info(f"Structure Analysis - Tokens: {usage.prompt_tokens} input, {usage.completion_tokens} output")
        response_content = response.choices[0].message.content
        logging.info(f"GPT-5 Structure Analysis Response: {response_content[:500]}...")

        # Unwrap the JSON object; fall back to the lenient array finder otherwise.
        try:
            parsed = json.loads(response_content)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, list):
            sections_data = parsed
        elif isinstance(parsed, dict):
            sections_data = parsed.get('sections')
            if not isinstance(sections_data, list):
                # Tolerate a different wrapper key, or a single bare section object.
                sections_data = next((v for v in parsed.values() if isinstance(v, list)), None)
            if sections_data is None:
                sections_data = [parsed] if 'title' in parsed else []
        else:
            sections_data = self._extract_json(response_content)

        sections = []
        for section_data in sections_data:
            if not isinstance(section_data, dict):
                continue  # one malformed entry should not discard the rest
            # Later stages sort on this value, so it must be numeric.
            try:
                importance = float(section_data.get('importance_score', 0.5))
            except (TypeError, ValueError):
                importance = 0.5
            sections.append(DocumentSection(
                title=section_data.get('title', ''),
                content=section_data.get('content', ''),
                page_number=section_data.get('page_number', 0),
                section_type=section_data.get('section_type', 'unknown'),
                importance_score=importance
            ))
        return sections
    except Exception as e:
        logging.warning(f"Could not extract document structure: {e}")
        return []
def process_document_multi_pass(self, filepath: str) -> ProcessingResult:
    """Run the full five-stage pipeline on one file and return the final result.

    Stages: content extraction, structure analysis, multi-perspective
    extraction, validation, then the three post-processing passes. If
    content extraction fails an empty result (confidence 0.0) is returned
    immediately. The returned result carries this analyzer's token tally.
    """
    logging.info(f"Starting multi-pass analysis of '{os.path.basename(filepath)}'")

    # Stage 1: Extract document content using LlamaParser
    try:
        document_content = self._extract_document_content(filepath)
        logging.info(f"Document content extracted using LlamaParser")
    except Exception as e:
        logging.error(f"Content extraction failed: {e}")
        return ProcessingResult(
            raw_data=[],
            metadata={},
            confidence_score=0.0,
            processing_notes=[f"Content extraction failed: {e}"],
            token_usage=TokenUsage(),
        )

    # Stage 2: Document structure analysis
    sections = self.extract_document_structure(document_content)
    logging.info(f"Identified {len(sections)} document sections")

    # Stage 3: Multi-perspective analysis
    logging.info("=== STAGE 3: Starting Multi-perspective Analysis ===")
    doc_type = self.classify_document(filepath)
    first_pass = self._perform_multi_perspective_analysis(document_content, sections, doc_type)
    logging.info(f"Multi-perspective analysis completed. Found {len(first_pass.raw_data)} initial assets.")

    # Stage 4: Cross-validation and enhancement
    logging.info("=== STAGE 4: Starting Cross-validation and Enhancement ===")
    final_results = self._enhance_and_validate_results(document_content, first_pass)
    logging.info(f"Validation completed. Total assets: {len(final_results.raw_data)}")

    # Stage 5: Post-process to ensure deliverable separation and format extraction
    logging.info("=== STAGE 5: Starting Asset Splitting and Finalization ===")
    post_processing_passes = (
        self._split_multi_format_assets,
        self._extract_formats_from_tech_specs,
        self._consolidate_source_file_deliverables,
    )
    for run_pass in post_processing_passes:
        final_results = run_pass(final_results)
    logging.info(f"Asset processing completed. Final count: {len(final_results.raw_data)} deliverables")

    # Update token usage in final results
    final_results.token_usage = self.token_usage
    return final_results
def _perform_multi_perspective_analysis(self, uploaded_file, sections: List[DocumentSection], doc_type: DocumentType) -> ProcessingResult:
    """Run the main extraction prompt and return the assets it yields.

    Args:
        uploaded_file: The document content (interpolated into the prompt).
        sections: Sections from the structure pass, summarised as context.
        doc_type: Document type; its ``value`` is named in the prompt.

    Returns:
        A ProcessingResult with the extracted assets (confidence 0.8), or an
        empty result with confidence 0.0 when the request or parsing fails.
        Token counts are added to ``self.token_usage``; the result's own
        ``token_usage`` is a fresh placeholder.
    """
    # Create context summary from sections
    context_summary = self._create_context_summary(sections)
    # Multi-perspective prompt with structured output
    multi_perspective_prompt = f"""
You are a specialized team analyzing this {doc_type.value} document for comprehensive asset extraction.
DOCUMENT CONTEXT:
{context_summary}
TEAM ANALYSIS APPROACH:
**1. TECHNICAL ANALYST** - Focus on precise specification extraction:
- SPECIFICATION MAPPING: Look for columns labeled "SPEC", "Specifications", "Dimensions", "Size", or similar headers in tables
- Extract the EXACT pixel dimensions, file formats, and technical requirements from these specification columns
- The 'format' field must contain precise dimensions (e.g., "1920x1080", "300x250") - NEVER use placeholders like "TBC" or "desktop here"
- If a brief contains a structured table with asset names and corresponding specs, create a direct one-to-one mapping
- ASSET TYPE CLASSIFICATION: Identify the actual file format (JPG, PNG, MP4, GIF, etc.) from the specifications, not the creative name
- The 'asset_type' field should contain technical file formats (JPG, PNG, MP4), while 'media' should be the category (Image, Video)
- Search for explicit file format mentions in technical requirements and specification sections
- Look for patterns like: "delivered as JPG", "PNG format", "MP4 video", "GIF animation"
**2. CREATIVE STRATEGIST** - Focus on creative requirements:
- Analyze creative direction, visual style, and brand guidelines
- Identify mood, tone, imagery requirements, and design principles
- Extract color schemes, typography, layout specifications
- Note any creative constraints or brand compliance requirements
- Distinguish between creative asset names (e.g., "Hero Banner") and technical specifications
**3. PROJECT COORDINATOR** - Focus on deliverable itemization:
- MULTI-VERSION IDENTIFICATION: When assets are required in multiple versions (languages, markets, variations), create separate line items
- If a brief states "8 markets" or "5 languages", generate individual rows for each variation
- Each variation gets its own row with quantity "1" and specific details in appropriate columns (language, country)
- NEVER consolidate multi-version deliverables into a single row with aggregate quantities
- Look for phrases like "per market", "each language", "all variations", "localized versions"
- Extract timelines, dependencies, and milestone requirements
- Assess priority levels and resource allocation needs
**4. QUALITY ASSURANCE** - Focus on accuracy and completeness:
- Verify that specification columns from source documents are accurately reflected in format fields
- Ensure asset_type contains file formats (JPG, PNG, MP4) not creative names
- Confirm that multi-version deliverables are properly itemized as separate rows
- Cross-reference technical specifications with creative requirements
- Validate that no deliverables are missed or incorrectly consolidated
CRITICAL SPECIFICATION EXTRACTION RULES:
- EXACT MAPPING: The 'format' field must contain the precise dimensions from the brief's specification columns
- NO PLACEHOLDERS: Never use "TBC", "desktop here", or similar placeholder text in the format field
- DIRECT TRANSCRIPTION: Copy pixel dimensions exactly as they appear in SPEC columns (e.g., "1920x1080", "750x1334")
- FILE FORMAT PRECISION: The 'asset_type' field must contain actual file formats (JPG, PNG, MP4, GIF) found in technical specs
- TABLE RECOGNITION: Pay special attention to structured tables that list assets alongside their specifications
MULTI-VERSION DELIVERABLE RULES:
- ITEMIZE DON'T SUMMARIZE: Create separate rows for each version, variation, language, or market requirement
- ONE ROW PER DELIVERABLE: Each individual file to be created gets its own row with quantity "1"
- VARIATION DETAILS: Populate specific variation information (language codes, market names) in appropriate columns
- MULTIPLICATION LOGIC: If brief says "5 banners x 8 markets", create 40 separate rows, not 1 row with quantity "40"
- LOCALIZATION TRACKING: Use ISO language codes when possible (EN, DE, FR, ES, etc.)
ASSET TYPE CLASSIFICATION RULES:
- TECHNICAL NOT CREATIVE: 'asset_type' should be JPG/PNG/MP4/GIF, not "Hero Banner" or "Display Ad"
- FORMAT IDENTIFICATION: Look for explicit file format mentions in specifications or delivery requirements
- MEDIA CATEGORIZATION: Use 'media' field for broad categories (Image, Video, Interactive) and 'asset_type' for specific formats
- SPECIFICATION PARSING: Extract file formats from phrases like "delivered as PNG", "JPG format required", "MP4 video file"
SOURCE FILE HANDLING:
- If source files (PSD, AI, InDesign, etc.) are mentioned alongside final files, treat as ONE deliverable
- Example: "Banner with source PSD file" = 1 deliverable (not 2)
- Only separate if different sizes, content, languages, or distinct requirements are specified
CRITICAL ACCURACY REQUIREMENTS:
1. Every specification dimension must be captured exactly as written in the source document
2. Multi-version assets must be itemized into individual deliverable rows
3. Asset types must reflect actual file formats, not creative descriptions
4. No placeholders or approximations are acceptable in technical fields
Please return your response as a structured JSON object containing an array of assets with precise specification mapping.
"""
    try:
        # For GPT-5 using Chat Completions with reasoning_effort
        combined_prompt = f"{multi_perspective_prompt}\n\nDocument Content:\n{uploaded_file}"
        response = self.model.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "user", "content": combined_prompt}
            ],
            reasoning_effort="medium",
            response_format={
                "type": "json_schema",
                "json_schema": OPENAI_ASSET_SCHEMA
            }
        )
        # Track token usage for GPT-5 Chat Completions
        usage = getattr(response, 'usage', None)
        if usage is not None:
            # Cache hits are reported in usage.prompt_tokens_details.cached_tokens
            # and are already counted inside prompt_tokens, so they are split
            # out here to be billed once, at the cached rate.
            details = getattr(usage, 'prompt_tokens_details', None)
            cached_tokens = getattr(details, 'cached_tokens', 0) or 0
            usage_dict = {
                'prompt_tokens': max(usage.prompt_tokens - cached_tokens, 0),
                'completion_tokens': usage.completion_tokens,
                'prompt_tokens_cached': cached_tokens
            }
            self.token_usage.add_usage(usage_dict)
            logging.info(f"Multi-perspective Analysis - Tokens: {usage.prompt_tokens} input, {usage.completion_tokens} output")
        response_content = response.choices[0].message.content
        logging.info(f"GPT-5 Multi-perspective Analysis Response Length: {len(response_content)} characters")
        logging.info(f"GPT-5 Multi-perspective Analysis Response Preview: {response_content[:1000]}...")
        extracted_data = self._extract_structured_json(response_content)
        return ProcessingResult(
            raw_data=extracted_data,
            metadata={'sections': len(sections), 'doc_type': doc_type.value},
            confidence_score=0.8,
            processing_notes=['Multi-perspective analysis completed'],
            token_usage=TokenUsage()  # Will be updated later
        )
    except Exception as e:
        logging.error(f"Multi-perspective analysis failed: {e}")
        return ProcessingResult([], {}, 0.0, [f"Analysis failed: {e}"], TokenUsage())
def _enhance_and_validate_results(self, uploaded_file, initial_results: ProcessingResult) -> ProcessingResult:
    """Ask the model to look for assets the first pass missed and merge them in.

    Args:
        uploaded_file: The document content (interpolated into the prompt).
        initial_results: Result of the first extraction pass; mutated in
            place and returned.

    Returns:
        ``initial_results``, extended with any genuinely new assets. When
        nothing new is found the confidence score is raised to 0.95. On any
        failure the result is returned unchanged apart from a note.

    The prompt now lists what was already extracted (title, format,
    language, country). Previously the model was told only how many assets
    had been found, so it could not tell what was missing. Returned rows
    that repeat an existing one on those four fields are dropped.
    """
    if not initial_results.raw_data:
        return initial_results

    identity_fields = ('title', 'format', 'language', 'country')

    def identity(asset: Dict[str, Any]) -> Tuple[str, ...]:
        # Case- and whitespace-insensitive key used to recognise repeats.
        return tuple(str(asset.get(name) or '').strip().lower() for name in identity_fields)

    extracted_overview = json.dumps(
        [
            {name: asset.get(name, '') for name in identity_fields}
            for asset in initial_results.raw_data
            if isinstance(asset, dict)
        ],
        ensure_ascii=False,
        default=str,
    )
    # Validation and enhancement prompt
    validation_prompt = f"""
You are performing quality assurance on this asset extraction.
EXTRACTED DATA SUMMARY:
- Found {len(initial_results.raw_data)} assets
- Document type: {initial_results.metadata.get('doc_type', 'unknown')}
ALREADY EXTRACTED ASSETS (JSON; do not return these again):
{extracted_overview}
VALIDATION TASKS:
1. **Completeness Check**: Scan the entire document again. Are there any assets, deliverables, or requirements that were missed?
2. **Accuracy Verification**: Check if all technical specifications, dimensions, and quantities are correctly extracted.
3. **Context Enhancement**: Ensure reference_material fields provide clear, actionable information.
4. **Gap Analysis**: Identify any missing information that should be present.
If you find additional assets or significant corrections needed, provide them in the structured format.
If the existing extraction is comprehensive and accurate, return an empty assets array.
Focus especially on:
- Alternative asset formats mentioned
- Localization requirements
- Seasonal or campaign-specific variations
- Technical specifications in fine print
- Assets mentioned in context but not explicitly listed
Return your response as a structured JSON object with any additional assets found.
"""
    try:
        # For GPT-5 using Chat Completions with reasoning_effort
        combined_prompt = f"{validation_prompt}\n\nDocument Content:\n{uploaded_file}"
        response = self.model.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "user", "content": combined_prompt}
            ],
            reasoning_effort="medium",
            response_format={
                "type": "json_schema",
                "json_schema": OPENAI_ASSET_SCHEMA
            }
        )
        # Track token usage for GPT-5 Chat Completions
        usage = getattr(response, 'usage', None)
        if usage is not None:
            # Cache hits are reported in usage.prompt_tokens_details.cached_tokens
            # and are already counted inside prompt_tokens, so they are split
            # out here to be billed once, at the cached rate.
            details = getattr(usage, 'prompt_tokens_details', None)
            cached_tokens = getattr(details, 'cached_tokens', 0) or 0
            usage_dict = {
                'prompt_tokens': max(usage.prompt_tokens - cached_tokens, 0),
                'completion_tokens': usage.completion_tokens,
                'prompt_tokens_cached': cached_tokens
            }
            self.token_usage.add_usage(usage_dict)
            logging.info(f"Validation Analysis - Tokens: {usage.prompt_tokens} input, {usage.completion_tokens} output")
        response_content = response.choices[0].message.content
        logging.info(f"GPT-5 Validation Response Length: {len(response_content)} characters")
        logging.info(f"GPT-5 Validation Response Preview: {response_content[:500]}...")
        additional_data = self._extract_structured_json(response_content)

        # Keep only rows that are not already present.
        known = {identity(asset) for asset in initial_results.raw_data if isinstance(asset, dict)}
        new_assets = []
        for candidate in additional_data or []:
            if not isinstance(candidate, dict):
                continue
            key = identity(candidate)
            if key in known:
                continue
            known.add(key)
            new_assets.append(candidate)

        if new_assets:
            logging.info(f"Validation found {len(new_assets)} additional assets")
            initial_results.raw_data.extend(new_assets)
            initial_results.processing_notes.append(f"Added {len(new_assets)} assets from validation")
        else:
            logging.info("Validation confirmed extraction completeness")
            initial_results.confidence_score = 0.95
            initial_results.processing_notes.append("Validation confirmed completeness")
        return initial_results
    except Exception as e:
        logging.warning(f"Validation step failed: {e}")
        initial_results.processing_notes.append(f"Validation failed: {e}")
        return initial_results
def _create_context_summary(self, sections: List[DocumentSection]) -> str:
"""Create a context summary from document sections."""
if not sections:
return "No structured sections identified."
summary_parts = []
for section in sorted(sections, key=lambda x: x.importance_score, reverse=True):
summary_parts.append(f"- {section.title} (Page {section.page_number}, {section.section_type}): {section.content}")
return "Document Structure:\n" + "\n".join(summary_parts[:10]) # Top 10 most important sections
def _split_multi_format_assets(self, results: ProcessingResult) -> ProcessingResult:
"""Split assets with multiple formats into separate deliverable rows."""
if not results.raw_data:
return results
expanded_assets = []
for asset in results.raw_data:
# Check if technical_requirements or format contains multiple formats
tech_req = asset.get('technical_requirements', '')
format_field = asset.get('format', '')
# Look for patterns indicating multiple formats
multi_format_patterns = [
r'Mobile:?\s*([^\n\r;]+)[\n\r;]?\s*Desktop:?\s*([^\n\r;]+)',
r'Desktop:?\s*([^\n\r;]+)[\n\r;]?\s*Mobile:?\s*([^\n\r;]+)',
r'([0-9]+x[0-9]+)[\s\n\r]*([0-9]+x[0-9]+)',
]
formats_found = []
# Check technical requirements for multiple formats
for pattern in multi_format_patterns:
import re
match = re.search(pattern, tech_req, re.IGNORECASE)
if match:
groups = match.groups()
if len(groups) >= 2:
# Extract format names and dimensions
if 'mobile' in pattern.lower() and 'desktop' in pattern.lower():
formats_found = [
('Mobile', groups[0].strip()),
('Desktop', groups[1].strip())
]
else:
# Generic format splitting
for i, group in enumerate(groups):
if group.strip():
formats_found.append((f'Format {i+1}', group.strip()))
break
# If multiple formats found, create separate assets
if len(formats_found) > 1:
base_title = asset.get('title', 'Asset')
for format_name, format_spec in formats_found:
new_asset = asset.copy()
new_asset['title'] = f"{base_title} - {format_name}"
new_asset['format'] = format_spec
new_asset['technical_requirements'] = format_spec
new_asset['quantity'] = '1' # Each format is one deliverable
# Update reference material to be specific to this format
ref_material = asset.get('reference_material', '')
if ref_material:
new_asset['reference_material'] = f"{ref_material}\n\nSpecific Format: {format_name} - {format_spec}"
expanded_assets.append(new_asset)
logging.info(f"Split '{base_title}' into {len(formats_found)} format-specific deliverables")
else:
# No multiple formats detected, keep as-is
expanded_assets.append(asset)
# Update results
original_count = len(results.raw_data)
results.raw_data = expanded_assets
new_count = len(expanded_assets)
if new_count > original_count:
split_count = new_count - original_count
results.processing_notes.append(f"Split {split_count} multi-format assets into individual deliverables")
logging.info(f"Asset splitting: {original_count}{new_count} deliverables (+{split_count})")
return results
def _extract_formats_from_tech_specs(self, results: ProcessingResult) -> ProcessingResult:
"""Extract dimensions from technical_requirements and move to format field."""
if not results.raw_data:
return results
dimension_patterns = [
r'(\d{3,4})\s*[x×]\s*(\d{3,4})', # 1920x1080, 1920 x 1080
r'(\d{3,4})\s*[by]\s*(\d{3,4})', # 1920 by 1080
r'(\d{3,4})w\s*[x×]\s*(\d{3,4})h', # 1920w x 1080h
r'(\d{3,4})\s*px\s*[x×]\s*(\d{3,4})\s*px', # 1920px x 1080px
r'(\d{2,4})\s*[x×]\s*(\d{2,4})\s*px', # 300x250 px, 728x90px
r'(\d{2,4})\s*[x×]\s*(\d{2,4})\s*pixels', # 300x250 pixels
r'width:?\s*(\d{3,4})[,\s]*height:?\s*(\d{3,4})', # width: 1920, height: 1080
r'(\d{3,4})\s*wide\s*[x×]\s*(\d{3,4})\s*tall', # 1920 wide x 1080 tall
r'dimensions?:?\s*(\d{3,4})\s*[x×]\s*(\d{3,4})', # dimensions: 1920x1080
]
format_updated_count = 0
for asset in results.raw_data:
# Skip if format already has dimensions
current_format = asset.get('format', '').strip()
if current_format and any(char.isdigit() and 'x' in current_format for char in current_format):
continue
# Search for dimensions in technical_requirements
tech_req = asset.get('technical_requirements', '')
creative_dir = asset.get('creative_direction', '')
reference_mat = asset.get('reference_material', '')
# Combine all text fields to search
search_text = f"{tech_req} {creative_dir} {reference_mat}"
extracted_format = None
for pattern in dimension_patterns:
import re
matches = re.findall(pattern, search_text, re.IGNORECASE)
if matches:
# Take the first match
width, height = matches[0]
extracted_format = f"{width}x{height}"
break
# Update format if dimension found
if extracted_format:
asset['format'] = extracted_format
format_updated_count += 1
logging.info(f"Extracted format '{extracted_format}' for asset: {asset.get('title', 'Unknown')}")
if format_updated_count > 0:
results.processing_notes.append(f"Extracted dimensions for {format_updated_count} assets from technical specifications")
logging.info(f"Format extraction: Updated {format_updated_count} assets with extracted dimensions")
return results
def _consolidate_source_file_deliverables(self, results: ProcessingResult) -> ProcessingResult:
"""Consolidate source file + final file combinations into single deliverables."""
if not results.raw_data:
return results
source_file_keywords = [
'source', 'psd', 'ai file', 'indesign', 'sketch', 'figma',
'working file', 'editable', 'layered', 'original file'
]
consolidated_assets = []
processed_titles = set()
consolidation_count = 0
for asset in results.raw_data:
title = asset.get('title', '').lower()
tech_req = asset.get('technical_requirements', '').lower()
ref_material = asset.get('reference_material', '').lower()
# Check if this asset mentions source files
has_source_mention = any(keyword in f"{title} {tech_req} {ref_material}"
for keyword in source_file_keywords)
# Create a normalized title for grouping
normalized_title = asset.get('title', '').replace(' - Source', '').replace(' Source', '').replace(' + Source', '')
if has_source_mention and normalized_title not in processed_titles:
# This asset includes source files - update its description
asset['title'] = normalized_title # Remove "Source" from title if present
# Update reference material to clarify it includes source files
current_ref = asset.get('reference_material', '')
if 'source' not in current_ref.lower():
asset['reference_material'] = f"{current_ref}\nIncludes source files (PSD/AI/etc.) with final deliverable"
# Keep quantity as 1 since source + final = 1 deliverable
asset['quantity'] = '1'
consolidated_assets.append(asset)
processed_titles.add(normalized_title)
consolidation_count += 1
logging.info(f"Consolidated source + final deliverable: {asset.get('title', 'Unknown')}")
elif normalized_title not in processed_titles:
# Regular asset without source file mention
consolidated_assets.append(asset)
processed_titles.add(normalized_title)
if consolidation_count > 0:
results.processing_notes.append(f"Consolidated {consolidation_count} source file + final file combinations")
logging.info(f"Source file consolidation: Processed {consolidation_count} combined deliverables")
results.raw_data = consolidated_assets
return results
def _extract_structured_json(self, raw_text: str) -> List[Dict[str, Any]]:
"""Extract structured JSON from AI response with schema validation."""
try:
# Log the raw response for debugging
logging.info(f"Raw response for JSON parsing: {raw_text[:200]}...")
# Parse the structured response
structured_data = json.loads(raw_text)
# Extract assets array from structured response
if 'assets' in structured_data:
assets = structured_data['assets']
logging.info(f"Successfully extracted {len(assets)} assets using structured output")
return assets
else:
logging.warning("No 'assets' key found in structured response")
logging.info(f"Available keys in response: {list(structured_data.keys())}")
return []
except json.JSONDecodeError as e:
logging.warning(f"Structured JSON parsing failed: {e}")
logging.info(f"Raw text causing JSON error: {raw_text[:500]}...")
logging.info("Falling back to legacy parsing")
return self._extract_json(raw_text)
except Exception as e:
logging.error(f"Structured JSON extraction failed: {e}")
logging.info(f"Raw text: {raw_text[:500]}...")
return []
def _extract_json(self, raw_text: str) -> List[Dict[str, Any]]:
"""Extract JSON from AI response using robust parsing."""
try:
# Try direct JSON parsing first
if raw_text.strip().startswith('['):
return json5.loads(raw_text.strip())
# Look for JSON array in response
start_index = raw_text.find('[')
end_index = raw_text.rfind(']')
if start_index != -1 and end_index != -1:
json_str = raw_text[start_index:end_index + 1]
return json5.loads(json_str)
# Look for individual JSON objects
json_objects = []
for line in raw_text.split('\n'):
line = line.strip()
if line.startswith('{') and line.endswith('}'):
try:
json_objects.append(json5.loads(line))
except:
continue
if json_objects:
return json_objects
raise ValueError("No valid JSON found in response")
except Exception as e:
logging.error(f"JSON extraction failed: {e}")
logging.debug(f"Raw text: {raw_text[:500]}...")
return []
def main():
    """Command-line entry point: extract assets from a brief into a CSV.

    Reads the input path from ``sys.argv[1]``, runs the multi-pass
    extraction, writes ``<dd_mm_yy>-<name>-ENHANCED-EXTRACTION.csv`` (relative
    path) and prints ``__COST_SUMMARY__``, ``__TOKEN_USAGE__`` and
    ``__FILENAME__`` marker lines on stdout for the PHP integration.

    Exits with status 1 when no input path is given. Returns without
    printing any marker when nothing was extracted or the CSV could not be
    written.
    """
    # Enhanced logging setup
    log_file = 'processing.log'
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            # Explicit UTF-8 so non-ASCII text in log messages cannot fail
            # on platforms whose default encoding is narrower.
            logging.FileHandler(log_file, mode='w', encoding='utf-8'),
            logging.StreamHandler(sys.stdout),
        ],
    )

    if len(sys.argv) < 2:
        logging.error("Usage: python process_brief_enhanced.py <path_to_file> [model_name]")
        sys.exit(1)

    filepath = sys.argv[1]
    model_name = 'gpt-5'  # Always use GPT-5 with high reasoning effort
    if len(sys.argv) > 2 and sys.argv[2] != model_name:
        # The usage string advertises [model_name]; say so when it is ignored.
        logging.warning(f"Ignoring requested model '{sys.argv[2]}'; using {model_name}")

    # Initialize enhanced analyzer with specified model
    analyzer = DocumentAnalyzer(model_name)

    # Process document with enhanced multi-pass approach
    logging.info("=== ENHANCED BRIEF PROCESSING STARTED ===")
    results = analyzer.process_document_multi_pass(filepath)
    if not results.raw_data:
        logging.error("No data extracted from document")
        return

    # Generate output
    current_date = datetime.datetime.now().strftime("%d_%m_%y")
    base_name = os.path.basename(filepath)
    sanitized_name = os.path.splitext(base_name)[0].replace(' ', '_').replace('.', '_')
    output_filename = f"{current_date}-{sanitized_name}-ENHANCED-EXTRACTION.csv"

    # Only the file write is guarded here, so the error message stays accurate.
    try:
        with open(output_filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=CSV_HEADERS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(results.raw_data)
    except Exception as e:
        logging.error(f"Error writing CSV: {e}")
        return

    # Reporting is best-effort: a failure here must not hide the fact that
    # the CSV was written, so the filename marker is printed regardless.
    try:
        # Log processing summary
        logging.info("=== PROCESSING SUMMARY ===")
        logging.info(f"Document Type: {results.metadata.get('doc_type', 'unknown')}")
        logging.info(f"Assets Extracted: {len(results.raw_data)}")
        logging.info(f"Confidence Score: {results.confidence_score:.2f}")
        logging.info(f"Processing Notes: {', '.join(results.processing_notes)}")
        logging.info(f"Output File: {output_filename}")

        # Log cost information
        cost_summary = results.token_usage.get_summary(model_name)
        cost_breakdown = cost_summary['cost_breakdown']
        logging.info("=== COST ANALYSIS ===")
        logging.info(f"Model Used: {model_name}")
        logging.info(f"Input Tokens: {cost_summary['input_tokens']:,}")
        logging.info(f"Cached Input Tokens: {cost_summary['cached_input_tokens']:,}")
        logging.info(f"Output Tokens: {cost_summary['output_tokens']:,}")
        logging.info(f"Total Tokens: {cost_summary['total_tokens']:,}")
        logging.info(f"Total Cost: ${cost_summary['total_cost_usd']:.4f}")
        logging.info(f"Cost Breakdown: Input ${cost_breakdown['input_cost']:.4f}, "
                     f"Cached ${cost_breakdown['cached_input_cost']:.4f}, "
                     f"Output ${cost_breakdown['output_cost']:.4f}")

        # Print cost info for PHP integration
        print(f"__COST_SUMMARY__:{cost_summary['total_cost_usd']:.4f}")
        print(f"__TOKEN_USAGE__:{cost_summary['input_tokens']}:{cost_summary['output_tokens']}:{cost_summary['total_tokens']}")
    except Exception as e:
        logging.error(f"Error reporting processing summary: {e}")

    # Print filename for PHP integration
    print(f"__FILENAME__:{output_filename}")
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()