ac-tool/backend/core/process_brief_enhanced.py
Vadym Samoilenko d71a044a3c Fix model config alignment and improve error messaging
- Replace sys.exit(1) with ValueError in _validate_models() — prevents killing the worker process on invalid model keys
- ModelConfiguration now reads defaults from core.config instead of hardcoding model keys
- Fix .env: google-gemini20 → google-gemini31 (align with MODEL_MAPPINGS)
- Improve "No data extracted" error message to explain the document must be a marketing brief

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 15:33:54 +00:00

1256 lines
No EOL
54 KiB
Python
Executable file

import sys
import os
import datetime
import logging
import json
import csv
import re
import itertools
import asyncio
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
from enum import Enum
from pydantic import BaseModel
# File Processing Libraries
import pptx
import pandas as pd
import fitz # PyMuPDF
from PIL import Image
import docx
from openpyxl import load_workbook
# AI Libraries
import json5
import base64
# Configuration and LLM Services
from .config import config
from .llm_service import ProviderManager, LLMResponse
from .consolidation_processor import ConsolidationProcessor
# OpenAI GPT-5.1 Pricing (per 1M tokens)
# Keyed by model name; TokenUsage.calculate_cost() and TokenUsage.get_summary()
# read the 'input', 'cached_input' and 'output' rates of an entry and report
# the result as USD.
OPENAI_PRICING = {
    'gpt-5.1': {
        'input': 1.25,
        'cached_input': 0.625,
        'output': 10.00
    }
}
# Ordered column names; they match the MarketingAsset field names one-to-one.
# NOTE(review): presumably the header row of the exported CSV - the writer is
# not visible in this part of the file, confirm against it.
CSV_HEADERS = [
    'title', 'status', 'category', 'media', 'asset_type',
    'brand_identifier', 'technical_specifications', 'review_date', 'live_date',
    'end_date', 'reference_material', 'language_country_market',
    'quantity', 'page_number', 'priority_level',
    'creative_direction'
]
# Base deliverable with mixed field types (strings for metadata, arrays for multipliers)
class BaseDeliverable(BaseModel):
    """One deliverable before expansion into individual assets.

    `technical_specifications` and `language_country_market` are lists
    ("multipliers"): expand_deliverables() creates one MarketingAsset per
    combination of their values. Every other field is a plain string.
    """
    title: str  # the only field without a default
    status: Optional[str] = ""
    category: Optional[str] = ""
    media: Optional[str] = ""
    asset_type: Optional[str] = ""
    brand_identifier: Optional[str] = ""
    technical_specifications: Optional[List[str]] = []  # multiplier field
    review_date: Optional[str] = ""
    live_date: Optional[str] = ""
    end_date: Optional[str] = ""
    reference_material: Optional[str] = ""
    language_country_market: Optional[List[str]] = []  # multiplier field
    quantity: Optional[str] = "1"  # kept as text; expand_deliverables() parses it with int()
    page_number: Optional[str] = ""
    priority_level: Optional[str] = ""
    creative_direction: Optional[str] = ""
# Individual marketing asset (for final CSV output)
class MarketingAsset(BaseModel):
    """A single, fully expanded asset.

    Same field names as BaseDeliverable, but the two multiplier fields hold
    one string value each instead of a list.
    """
    title: str  # the only field without a default
    status: Optional[str] = ""
    category: Optional[str] = ""
    media: Optional[str] = ""
    asset_type: Optional[str] = ""
    brand_identifier: Optional[str] = ""
    technical_specifications: Optional[str] = ""  # one value per asset
    review_date: Optional[str] = ""
    live_date: Optional[str] = ""
    end_date: Optional[str] = ""
    reference_material: Optional[str] = ""
    language_country_market: Optional[str] = ""  # one value per asset
    quantity: Optional[str] = "1"  # expand_deliverables() always sets this to "1"
    page_number: Optional[str] = ""
    priority_level: Optional[str] = ""
    creative_direction: Optional[str] = ""
# Base extraction result (from LLM)
class BaseExtractionResult(BaseModel):
    """Wrapper for a list of unexpanded deliverables (lists in multiplier fields)."""
    assets: List[BaseDeliverable]
# Final extraction result (expanded individual assets)
class AssetExtractionResult(BaseModel):
    """Wrapper for a list of expanded assets; also used as the parse format
    of the validation call in DocumentAnalyzer._enhance_and_validate_results()."""
    assets: List[MarketingAsset]
# Load universal schema from external file
def _load_universal_schema():
    """Read and parse prompts/universal_schema.json.

    The prompts/ directory is looked up one level above the directory that
    contains this module.

    Returns:
        The decoded JSON document.

    Raises:
        Any exception raised while opening or parsing the file; it is logged
        and then re-raised unchanged.
    """
    try:
        module_dir = os.path.dirname(__file__)
        # Go up one level from core/ to find prompts/
        parent_dir = os.path.dirname(module_dir)
        schema_file = os.path.join(parent_dir, 'prompts', 'universal_schema.json')
        with open(schema_file, 'r', encoding='utf-8') as handle:
            schema = json.load(handle)
    except Exception as exc:
        logging.error(f"Error loading universal schema: {exc}")
        raise
    return schema
# Universal schema for base deliverable extraction (works with all providers)
# Loaded once at import time: _load_universal_schema() re-raises on failure, so
# a missing or malformed prompts/universal_schema.json makes importing this
# module fail.
UNIVERSAL_BASE_DELIVERABLE_SCHEMA = _load_universal_schema()
# Legacy schema maintained for backward compatibility
# Shape: a named wrapper ("name" / "description") around a JSON Schema that
# requires an "assets" array and forbids additional properties.
# NOTE(review): unlike the pydantic models in this file, items here carry
# separate "language" and "country" keys rather than "language_country_market",
# and "technical_specifications" is a single string. Nothing in the visible
# part of the file references this constant - confirm it is still needed.
OPENAI_ASSET_SCHEMA = {
    "name": "asset_extraction",
    "description": "Extract assets from document analysis",
    "schema": {
        "type": "object",
        "properties": {
            "assets": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "title": {"type": "string", "description": "Asset title or name"},
                        "status": {"type": "string", "description": "Current status"},
                        "category": {"type": "string", "description": "Asset category"},
                        "media": {"type": "string", "description": "Media type"},
                        "asset_type": {"type": "string", "description": "Specific asset type"},
                        "brand_identifier": {"type": "string", "description": "Brand or client"},
                        "technical_specifications": {"type": "string", "description": "Technical specifications including dimensions (e.g., '1080x1920', '1920x1080'), descriptive formats (e.g., 'Mobile Banner', 'Desktop Hero'), file formats, technical requirements, or any other technical details"},
                        "review_date": {"type": "string", "description": "Review deadline"},
                        "live_date": {"type": "string", "description": "Go-live date"},
                        "end_date": {"type": "string", "description": "End/expiry date"},
                        "reference_material": {"type": "string", "description": "Detailed requirements"},
                        "language": {"type": "string", "description": "Target language"},
                        "country": {"type": "string", "description": "Target country/region"},
                        "quantity": {"type": "string", "description": "Number of assets"},
                        "page_number": {"type": "string", "description": "Source page"},
                        "priority_level": {"type": "string", "description": "Business priority"},
                        "creative_direction": {"type": "string", "description": "Design requirements"}
                    },
                    "required": ["title", "technical_specifications"],
                    "additionalProperties": False
                }
            }
        },
        "required": ["assets"],
        "additionalProperties": False
    }
}
# Legacy Gemini Schema (keep for backward compatibility)
# Same item properties as OPENAI_ASSET_SCHEMA, but as a bare JSON Schema object
# (no "name" wrapper) and without "additionalProperties" restrictions.
# NOTE(review): nothing in the visible part of the file references this
# constant - confirm it is still needed.
GEMINI_ASSET_SCHEMA = {
    "type": "object",
    "properties": {
        "assets": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "title": {"type": "string", "description": "Asset title or name"},
                    "status": {"type": "string", "description": "Current status"},
                    "category": {"type": "string", "description": "Asset category"},
                    "media": {"type": "string", "description": "Media type"},
                    "asset_type": {"type": "string", "description": "Specific asset type"},
                    "brand_identifier": {"type": "string", "description": "Brand or client"},
                    "technical_specifications": {"type": "string", "description": "Technical specifications including format/dimensions, file formats, technical requirements, or any other technical details"},
                    "review_date": {"type": "string", "description": "Review deadline"},
                    "live_date": {"type": "string", "description": "Go-live date"},
                    "end_date": {"type": "string", "description": "End/expiry date"},
                    "reference_material": {"type": "string", "description": "Detailed requirements"},
                    "language": {"type": "string", "description": "Target language"},
                    "country": {"type": "string", "description": "Target country/region"},
                    "quantity": {"type": "string", "description": "Number of assets"},
                    "page_number": {"type": "string", "description": "Source page"},
                    "priority_level": {"type": "string", "description": "Business priority"},
                    "creative_direction": {"type": "string", "description": "Design requirements"}
                },
                "required": ["title", "technical_specifications"]
            }
        }
    },
    "required": ["assets"]
}
class DocumentType(Enum):
    """Document families distinguished by DocumentAnalyzer.classify_document().

    The member values are lowercase strings; process_document_multi_model()
    stores `.value` in the result metadata and formats it into the prompt.
    """
    POWERPOINT = "powerpoint"  # .ppt / .pptx
    WORD = "word"              # .doc / .docx
    PDF = "pdf"                # .pdf
    EXCEL = "excel"            # .xls / .xlsx
    UNKNOWN = "unknown"        # any other extension
@dataclass
class TokenUsage:
    """Running totals of LLM token consumption, with cost helpers.

    Costs are computed from the module-level OPENAI_PRICING table, whose rates
    are expressed per 1M tokens. Models without an entry of their own are
    priced with the `_FALLBACK_PRICING_MODEL` entry instead of failing.
    """
    input_tokens: int = 0
    cached_input_tokens: int = 0
    output_tokens: int = 0

    # Pricing entry used for models missing from OPENAI_PRICING. Left
    # unannotated on purpose so @dataclass does not turn it into a field.
    _FALLBACK_PRICING_MODEL = 'gpt-5.1'

    def add_usage(self, usage_dict: Dict[str, int]):
        """Add the counts from one API usage payload to the running totals.

        Both naming schemes are accepted: Chat Completions style
        (`prompt_tokens`, `prompt_tokens_cached`, `completion_tokens`) and
        Responses API style (`input_tokens`, `input_tokens_cached`,
        `output_tokens`). The Chat Completions key wins when both are present;
        missing keys count as 0.
        """
        self.input_tokens += usage_dict.get('prompt_tokens', usage_dict.get('input_tokens', 0))
        self.cached_input_tokens += usage_dict.get('prompt_tokens_cached', usage_dict.get('input_tokens_cached', 0))
        self.output_tokens += usage_dict.get('completion_tokens', usage_dict.get('output_tokens', 0))

    def _resolve_pricing(self, model_name: str) -> Dict[str, float]:
        """Return the pricing entry for `model_name`, or the fallback entry.

        The previous fallback key ('gpt-5') did not exist in OPENAI_PRICING,
        so unknown models raised KeyError instead of being priced.
        """
        pricing = OPENAI_PRICING.get(model_name)
        if pricing is None:
            logging.warning(
                f"No pricing info for model {model_name}, "
                f"defaulting to {self._FALLBACK_PRICING_MODEL}"
            )
            pricing = OPENAI_PRICING[self._FALLBACK_PRICING_MODEL]
        return pricing

    def _cost_components(self, model_name: str) -> Tuple[float, float, float]:
        """Return (input_cost, cached_input_cost, output_cost) for `model_name`."""
        pricing = self._resolve_pricing(model_name)
        # Pricing is per 1M tokens.
        input_cost = (self.input_tokens / 1_000_000) * pricing['input']
        cached_cost = (self.cached_input_tokens / 1_000_000) * pricing['cached_input']
        output_cost = (self.output_tokens / 1_000_000) * pricing['output']
        return input_cost, cached_cost, output_cost

    def calculate_cost(self, model_name: str) -> float:
        """Return the total cost of the accumulated tokens for `model_name`.

        Unknown model names are priced with the fallback entry (a warning is
        logged).
        """
        input_cost, cached_cost, output_cost = self._cost_components(model_name)
        return input_cost + cached_cost + output_cost

    def get_summary(self, model_name: str) -> Dict[str, Any]:
        """Return token totals plus a cost breakdown rounded to 4 decimals.

        Keys: 'input_tokens', 'cached_input_tokens', 'output_tokens',
        'total_tokens' (sum of the three counters), 'total_cost_usd' and
        'cost_breakdown' ('input_cost', 'cached_input_cost', 'output_cost').
        Unknown model names use the same fallback pricing as calculate_cost().
        """
        input_cost, cached_cost, output_cost = self._cost_components(model_name)
        return {
            'input_tokens': self.input_tokens,
            'cached_input_tokens': self.cached_input_tokens,
            'output_tokens': self.output_tokens,
            'total_tokens': self.input_tokens + self.cached_input_tokens + self.output_tokens,
            'total_cost_usd': round(input_cost + cached_cost + output_cost, 4),
            'cost_breakdown': {
                'input_cost': round(input_cost, 4),
                'cached_input_cost': round(cached_cost, 4),
                'output_cost': round(output_cost, 4)
            }
        }
@dataclass
class ProcessingResult:
    """Outcome of one document-processing run (success or failure)."""
    raw_data: List[Dict[str, Any]]  # extracted assets as plain dicts; empty on failure
    metadata: Dict[str, Any]  # e.g. 'doc_type' and the models used; empty dict on failure
    confidence_score: float  # 0.0 on failure, 0.9 after a successful multi-model run
    processing_notes: List[str]  # human-readable notes, warnings and failure messages
    token_usage: TokenUsage  # token totals for the run
def create_unique_title(base_title: str, multiplier_values: Dict[str, str], max_suffix_length: int = 40) -> str:
    """Create a unique, CSV-safe title by appending abbreviated multiplier values.

    The suffix is built from `language_country_market` (abbreviated to at most
    15 characters) followed by `technical_specifications` (at most 20),
    joined with '_'; other keys in `multiplier_values` are ignored.

    Args:
        base_title: Title of the base deliverable.
        multiplier_values: Multiplier field name -> value chosen for this asset.
        max_suffix_length: Upper bound for the suffix length (the part after
            "<base_title>_"). Negative values are treated as 0.

    Returns:
        "<base_title>_<suffix>", or `base_title` alone when neither multiplier
        key is present, with commas, quotes, newlines and tabs made CSV-safe.
    """
    # Single-pass character mapping: separators become '_', quotes are dropped.
    csv_unsafe = str.maketrans({
        ',': '_',
        '"': None,
        "'": None,
        '\n': '_',
        '\r': '_',
        '\t': '_',
    })

    def csv_safe_string(text: str) -> str:
        """Replace special characters for CSV compatibility"""
        return text.translate(csv_unsafe)

    def abbreviate_value(value: str, max_length: int) -> str:
        """Shorten `value` to at most `max_length` characters."""
        if len(value) <= max_length:
            return value
        # Dimensions such as "1080x1920" are kept from the left; this check must
        # come first so hyphenated dimension lists are not treated as markets.
        if 'x' in value and value.replace('x', '').replace('-', '').isdigit():
            return value[:max_length]
        # For hyphenated market codes keep the last segment (the country part).
        if '-' in value:
            country = value.split('-')[-1]
            if len(country) <= max_length:
                return country
        # Fallback: simple truncation
        return value[:max_length]

    # Priority order: market first, then technical specifications.
    suffix_parts = []
    if 'language_country_market' in multiplier_values:
        suffix_parts.append(abbreviate_value(multiplier_values['language_country_market'], 15))
    if 'technical_specifications' in multiplier_values:
        suffix_parts.append(abbreviate_value(multiplier_values['technical_specifications'], 20))

    if not suffix_parts:
        return csv_safe_string(base_title)

    suffix = '_'.join(suffix_parts)
    if len(suffix) > max_suffix_length:
        limit = max(max_suffix_length, 0)
        if len(suffix_parts) == 2:
            # Split the budget (minus one separator) in proportion to the part
            # lengths. Re-slicing to 15/20 here would be a no-op, because the
            # parts are already that short, and would not enforce the limit.
            market_part, spec_part = suffix_parts
            budget = max(limit - 1, 0)
            market_budget = budget * len(market_part) // (len(market_part) + len(spec_part))
            spec_budget = budget - market_budget
            trimmed = (market_part[:market_budget], spec_part[:spec_budget])
            suffix = '_'.join(part for part in trimmed if part)
        else:
            suffix = suffix[:limit]

    return csv_safe_string(f"{base_title}_{suffix}")
def expand_deliverables(base_deliverables: List[BaseDeliverable]) -> Tuple[List[MarketingAsset], List[str]]:
    """
    Expand base deliverables with multiplier arrays into individual MarketingAsset objects.

    For every deliverable, the non-empty values of the multiplier fields
    (`technical_specifications`, `language_country_market`) are combined with
    itertools.product, and one asset is created per combination. Each asset
    gets a title from create_unique_title() and quantity "1". A deliverable
    without multiplier values yields exactly one asset with its original title.

    A deliverable whose asset cannot be constructed is reported in the
    warnings and skipped; it never aborts the expansion of the others.

    Returns: (expanded_assets, warnings)
    """
    expanded_assets = []
    warnings = []
    # Which fields are multipliers (lists); everything else is string metadata.
    multiplier_field_names = {'technical_specifications', 'language_country_market'}

    for base in base_deliverables:
        base_dict = base.model_dump()
        multiplier_fields = {}
        single_fields = {}

        for field, value in base_dict.items():
            if field in multiplier_field_names:
                if isinstance(value, list) and len(value) > 0:
                    # Drop empty / whitespace-only entries (and None, defensively).
                    non_empty = [v for v in value if v and v.strip()]
                    if non_empty:
                        multiplier_fields[field] = non_empty
                    else:
                        single_fields[field] = None
                elif isinstance(value, str) and value.strip():
                    # A lone string behaves like a one-item multiplier list.
                    multiplier_fields[field] = [value]
                else:
                    single_fields[field] = None
            else:
                if isinstance(value, str) and value.strip():
                    single_fields[field] = value
                elif isinstance(value, list) and len(value) > 0:
                    # A list in a string field: keep the first non-empty entry.
                    single_fields[field] = next((v for v in value if v and v.strip()), None)
                else:
                    single_fields[field] = None

        # No multiplier values: emit a single asset as-is.
        if not multiplier_fields:
            asset_data = {**single_fields, "quantity": "1"}
            # Guarded like the combination path below: e.g. a blank title ends
            # up as None here and is rejected by MarketingAsset.
            try:
                expanded_assets.append(MarketingAsset(**asset_data))
            except Exception as e:
                warnings.append(f"Error creating asset for '{base.title}': {e}")
            continue

        # Expected count from the quantity field; "1", empty or non-numeric
        # values mean "no expectation".
        expected_quantity = None
        if 'quantity' in base_dict and base_dict['quantity']:
            try:
                quantity_str = str(base_dict['quantity'])
                if quantity_str and quantity_str != "1":
                    expected_quantity = int(quantity_str)
            except (ValueError, TypeError):
                pass

        # Cartesian product of all multiplier values, in field order.
        field_names = list(multiplier_fields.keys())
        field_values = [multiplier_fields[field] for field in field_names]
        combinations = list(itertools.product(*field_values))
        actual_count = len(combinations)

        if expected_quantity and actual_count != expected_quantity:
            warnings.append(
                f"Quantity mismatch for '{base.title}': expected {expected_quantity}, "
                f"but expansion created {actual_count} deliverables"
            )

        for combo in combinations:
            multiplier_combination = dict(zip(field_names, combo))
            asset_data = {**single_fields, **multiplier_combination}
            asset_data["title"] = create_unique_title(base.title, multiplier_combination)
            # Individual assets always represent exactly one item.
            asset_data["quantity"] = "1"
            try:
                expanded_assets.append(MarketingAsset(**asset_data))
            except Exception as e:
                warnings.append(f"Error creating asset for '{base.title}': {e}")

        # Concise expansion summary (debug level only).
        expanding_fields = {field: values for field, values in multiplier_fields.items() if len(values) > 1}
        if expanding_fields:
            logging.debug(f"EXPANDED '{base.title}': {actual_count} deliverables from {len(expanding_fields)} multiplier fields")
        else:
            logging.debug(f"EXPANDED '{base.title}': {actual_count} deliverable (no multipliers)")

    return expanded_assets, warnings
class DocumentAnalyzer:
def __init__(self, primary_models: List[str] = None, consolidation_model: str = None):
    """Set up the analyzer and validate the chosen models.

    Args:
        primary_models: Model keys used for the parallel analysis. A falsy
            value (None or an empty list) selects
            config.get_default_primary_models().
        consolidation_model: Model key used for consolidation. A falsy value
            selects config.DEFAULT_CONSOLIDATION_MODEL.

    Raises:
        ValueError: from _validate_models() when a model key is unknown.
    """
    if not primary_models:
        primary_models = config.get_default_primary_models()
    if not consolidation_model:
        consolidation_model = config.DEFAULT_CONSOLIDATION_MODEL

    self.primary_models = primary_models
    self.consolidation_model = consolidation_model
    self.provider_manager = ProviderManager()
    self.consolidation_processor = ConsolidationProcessor()
    self.token_usage = TokenUsage()

    # Fail fast on unknown model keys.
    self._validate_models()
def _validate_models(self):
    """Check the configured model keys against config.MODEL_MAPPINGS.

    Missing API keys (as reported by config.validate_api_keys()) only produce
    a warning; they do not stop construction.

    Raises:
        ValueError: if a primary model or the consolidation model is not a
            key of config.MODEL_MAPPINGS.
    """
    valid_models = list(config.MODEL_MAPPINGS.keys())

    # Primary models: the first unknown key aborts.
    for candidate in self.primary_models:
        if candidate in valid_models:
            continue
        raise ValueError(f"Invalid primary model: {candidate}. Available: {valid_models}")

    # Consolidation model
    if self.consolidation_model not in valid_models:
        raise ValueError(f"Invalid consolidation model: {self.consolidation_model}. Available: {valid_models}")

    # API keys: collect providers whose key did not validate.
    missing_keys = []
    for provider, is_valid in config.validate_api_keys().items():
        if not is_valid:
            missing_keys.append(provider)
    if missing_keys:
        logging.warning(f"Missing API keys for: {missing_keys}")

    logging.info(f"Using primary models: {self.primary_models}")
    logging.info(f"Using consolidation model: {self.consolidation_model}")
async def _load_prompt(self, prompt_name: str) -> str:
"""Load prompt from external file asynchronously."""
import asyncio
def _read_prompt():
"""Blocking prompt read operation for thread pool"""
# Go up one level from core/ to find prompts/
prompt_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'prompts', f'{prompt_name}.txt')
with open(prompt_path, 'r', encoding='utf-8') as f:
return f.read().strip()
try:
loop = asyncio.get_running_loop()
content = await loop.run_in_executor(None, _read_prompt)
return content
except FileNotFoundError:
logging.error(f"Prompt file not found: {prompt_name}")
raise
except Exception as e:
logging.error(f"Error loading prompt {prompt_name}: {e}")
raise
async def _save_base_deliverables_json(self, base_deliverables: List[BaseDeliverable], doc_type: str):
    """Dump the unexpanded deliverables to a timestamped JSON file.

    The file is written to a `base_deliverable_JSON` directory next to this
    module (created on demand), using the loop's default executor. This is
    best effort: any failure is logged as a warning and swallowed.
    """
    def _write_json():
        """Blocking JSON write operation for thread pool"""
        target_dir = os.path.join(os.path.dirname(__file__), 'base_deliverable_JSON')
        os.makedirs(target_dir, exist_ok=True)

        # The timestamp is used both in the file name and in the payload.
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        filepath = os.path.join(target_dir, f"base_deliverables_{doc_type}_{timestamp}.json")

        payload = {
            "timestamp": timestamp,
            "document_type": doc_type,
            "base_deliverables_count": len(base_deliverables),
            "base_deliverables": [item.model_dump() for item in base_deliverables]
        }
        with open(filepath, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=2, ensure_ascii=False)

        logging.info(f"Saved base deliverables JSON: {filepath}")
        return filepath

    try:
        await asyncio.get_running_loop().run_in_executor(None, _write_json)
    except Exception as exc:
        logging.warning(f"Failed to save base deliverables JSON: {exc}")
def classify_document(self, filepath: str) -> DocumentType:
    """Map a file path to a DocumentType using only its extension.

    The extension is compared case-insensitively; the file content is not
    inspected. Unrecognized extensions yield DocumentType.UNKNOWN.
    """
    extension = os.path.splitext(filepath)[1].lower()
    types_by_extension = {
        '.ppt': DocumentType.POWERPOINT,
        '.pptx': DocumentType.POWERPOINT,
        '.doc': DocumentType.WORD,
        '.docx': DocumentType.WORD,
        '.pdf': DocumentType.PDF,
        '.xls': DocumentType.EXCEL,
        '.xlsx': DocumentType.EXCEL,
    }
    return types_by_extension.get(extension, DocumentType.UNKNOWN)
def _encode_file_for_openai(self, filepath: str) -> str:
"""Encode file content for OpenAI API."""
try:
with open(filepath, "rb") as file:
return base64.b64encode(file.read()).decode('utf-8')
except Exception as e:
logging.error(f"Error encoding file for OpenAI: {e}")
return None
def _extract_document_content_local(self, filepath: str) -> str:
    """Local fallback extraction using PyMuPDF / python-pptx / python-docx / openpyxl.

    The extractor is chosen by file extension:
      * .pdf         - text of each page, prefixed with "--- Page N ---"
      * .pptx/.ppt   - text of every shape per slide, prefixed with "--- Slide N ---"
      * .docx/.doc   - non-empty paragraphs, then table rows joined with " | "
      * .xlsx/.xls   - non-empty cells per row joined with " | ", grouped per sheet

    NOTE(review): the legacy binary formats (.ppt, .doc, .xls) are routed to
    the same libraries as their OOXML counterparts; those libraries most
    likely reject them - confirm, and consider a clearer error.

    Returns:
        The extracted text, or a "No ... found" placeholder when the document
        contains no text.

    Raises:
        ValueError: for an unsupported extension.
        Exception: whatever the underlying library raises on unreadable files.
    """
    ext = os.path.splitext(filepath)[1].lower()
    logging.info(f"Local extraction for {os.path.basename(filepath)} (ext={ext})")

    if ext == '.pdf':
        doc = fitz.open(filepath)
        try:
            pages = []
            for i, page in enumerate(doc, 1):
                text = page.get_text("text")
                if text.strip():
                    pages.append(f"--- Page {i} ---\n{text}")
        finally:
            # Release the file handle even when a page fails to extract.
            doc.close()
        return "\n\n".join(pages) or "No text content found in PDF."

    elif ext in ('.pptx', '.ppt'):
        prs = pptx.Presentation(filepath)
        slides = []
        for i, slide in enumerate(prs.slides, 1):
            # Not every shape type exposes text, hence the hasattr() guard.
            texts = [
                shape.text.strip()
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            ]
            if texts:
                slides.append(f"--- Slide {i} ---\n" + "\n".join(texts))
        return "\n\n".join(slides) or "No text content found in presentation."

    elif ext in ('.docx', '.doc'):
        document = docx.Document(filepath)
        paragraphs = [p.text for p in document.paragraphs if p.text.strip()]
        # Table content is appended after all paragraphs.
        for table in document.tables:
            for row in table.rows:
                row_text = " | ".join(cell.text.strip() for cell in row.cells if cell.text.strip())
                if row_text:
                    paragraphs.append(row_text)
        return "\n".join(paragraphs) or "No text content found in document."

    elif ext in ('.xlsx', '.xls'):
        wb = load_workbook(filepath, read_only=True, data_only=True)
        try:
            sheets_text = []
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                rows = []
                for row in ws.iter_rows(values_only=True):
                    cells = [str(c) for c in row if c is not None and str(c).strip()]
                    if cells:
                        rows.append(" | ".join(cells))
                if rows:
                    sheets_text.append(f"--- Sheet: {sheet_name} ---\n" + "\n".join(rows))
        finally:
            # Close the workbook even when a sheet fails to read.
            wb.close()
        return "\n\n".join(sheets_text) or "No content found in spreadsheet."

    else:
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError(f"Unsupported file type for local extraction: {ext}")
async def _extract_document_content(self, filepath: str) -> str:
    """Extract document text, preferring LlamaParse over the local extractor.

    Without config.LLAMACLOUD_API_KEY the local extractor is used directly.
    Otherwise the document is parsed with LlamaParse and the per-page
    markdown is joined with blank lines. Any LlamaParse failure, including a
    failed import of the library, is logged and falls back to the local
    extractor.

    Raises:
        Whatever _extract_document_content_local() raises.
    """
    api_key = config.LLAMACLOUD_API_KEY
    if not api_key:
        logging.warning("LLAMACLOUD_API_KEY not set — using local document extraction")
        return self._extract_document_content_local(filepath)

    try:
        # Imported lazily so the dependency is only needed when a key is set.
        from llama_cloud_services import LlamaParse

        logging.info(f"Using LlamaParser to extract content from: {os.path.basename(filepath)}")
        parser_options = dict(
            api_key=api_key,
            parse_mode="parse_page_with_agent",
            model="openai-gpt-5",
            high_res_ocr=True,
            adaptive_long_table=True,
            outlined_table_extraction=True,
            output_tables_as_HTML=True,
            page_separator="\n\n---\n\n",
        )
        parser = LlamaParse(**parser_options)
        parsed = await parser.aparse(filepath)
        page_documents = parsed.get_markdown_documents(split_by_page=True)
        combined_content = "\n\n".join(page.text for page in page_documents)
        logging.info(f"LlamaParser extraction completed. Content length: {len(combined_content)} characters")
        return combined_content
    except Exception as exc:
        logging.error(f"LlamaParser failed: {exc} — falling back to local extraction")
        return self._extract_document_content_local(filepath)
async def process_document_multi_model(self, filepath: str, progress=None) -> ProcessingResult:
    """Run the full pipeline: extract content, analyze in parallel, consolidate.

    Args:
        filepath: Document to process.
        progress: Optional reporter; when given, its emit() / emit_failure()
            coroutines are awaited at each stage.

    Returns:
        A ProcessingResult. When a stage fails, the result has no data, a
        confidence of 0.0, the failure message as its only note and an empty
        TokenUsage; stage errors are not raised to the caller.
    """
    file_label = os.path.basename(filepath)
    logging.info(f"Starting parallel multi-model analysis of '{file_label}'")

    # JobPhase is only needed (and only bound) when progress reporting is on.
    if progress:
        try:
            from server.jobs.models import JobPhase
        except ImportError:
            # Fallback for CLI usage - create mock enum
            class JobPhase:
                EXTRACT_CONTENT = 'EXTRACT_CONTENT'
                LLM_ANALYSIS = 'LLM_ANALYSIS'
                CONSOLIDATION = 'CONSOLIDATION'

    async def _report(phase_name, percent, message):
        """Emit a progress event when a reporter is attached."""
        if progress:
            await progress.emit(getattr(JobPhase, phase_name), percent, message)

    async def _abort(failure_message):
        """Log and report a stage failure, then build the empty result."""
        logging.error(failure_message)
        if progress:
            await progress.emit_failure(failure_message)
        return ProcessingResult([], {}, 0.0, [failure_message], TokenUsage())

    # Progress: EXTRACT_CONTENT 0% → 25%
    await _report('EXTRACT_CONTENT', 10, f'Starting analysis of {file_label}')

    # Stage 1: Extract document content
    try:
        document_content = await self._extract_document_content(filepath)
        logging.info("Document content extracted using LlamaParser")
        await _report('EXTRACT_CONTENT', 25, 'Document content extracted successfully')
    except Exception as e:
        return await _abort(f"Content extraction failed: {e}")

    # Stage 2: Parallel multi-model analysis
    # Progress: LLM_ANALYSIS 25% → 75% (50% weight)
    await _report('LLM_ANALYSIS', 30, 'Starting parallel multi-model analysis')
    logging.info("=== STAGE 2: Starting Parallel Multi-Model Analysis ===")
    doc_type = self.classify_document(filepath)
    try:
        analysis_responses, analysis_metadata = await self._perform_parallel_analysis(
            document_content, doc_type, progress
        )
        logging.info(f"Parallel analysis completed - {len(analysis_responses)} successful models")
        await _report(
            'LLM_ANALYSIS', 75,
            f'Parallel analysis completed - {len(analysis_responses)} successful models'
        )
    except Exception as e:
        return await _abort(f"Parallel analysis failed: {e}")

    # Stage 3: Consolidation
    # Progress: CONSOLIDATION 75% → 90% (15% weight)
    await _report('CONSOLIDATION', 75, 'Starting result consolidation')
    logging.info("=== STAGE 3: Starting Result Consolidation ===")
    try:
        consolidation_result = await self.consolidation_processor.consolidate_results(
            analysis_responses, self.consolidation_model, document_content
        )
        logging.info(f"Consolidation completed: {len(consolidation_result.expanded_assets)} final deliverables")
        await _report(
            'CONSOLIDATION', 90,
            f'Consolidation completed: {len(consolidation_result.expanded_assets)} final assets'
        )
    except Exception as e:
        return await _abort(f"Consolidation failed: {e}")

    # Plain dicts keep the result compatible with downstream consumers.
    extracted_data = [asset.model_dump() for asset in consolidation_result.expanded_assets]

    # Token usage aggregated over the analysis responses.
    total_token_usage = self.provider_manager.get_aggregated_token_usage(analysis_responses)

    # Notes: analysis summary first, then consolidation warnings.
    successful_count = analysis_metadata.get('successful_models', len(analysis_responses))
    total_count = analysis_metadata.get('total_models_attempted', len(self.primary_models))
    processing_notes = [f"Parallel analysis: {successful_count}/{total_count} models"]
    processing_notes.extend(consolidation_result.warnings)

    return ProcessingResult(
        raw_data=extracted_data,
        metadata={
            'doc_type': doc_type.value,
            'primary_models_used': self.primary_models,
            'consolidation_model': self.consolidation_model,
            'analysis_metadata': analysis_metadata,
            'consolidation_metadata': consolidation_result.consolidation_metadata
        },
        confidence_score=0.9,  # Higher confidence due to multi-model consensus
        processing_notes=processing_notes,
        token_usage=total_token_usage
    )
async def _perform_parallel_analysis(self, document_content: str, doc_type: DocumentType, progress=None) -> Tuple[List[LLMResponse], Dict[str, Any]]:
    """Send the document to every primary model in parallel.

    Builds one system message and one user message (the analysis prompt
    followed by the document content) and passes them to the provider manager
    together with the universal schema.

    Returns:
        (successful_responses, metadata) as returned by
        ProviderManager.execute_parallel_analysis().
    """
    # Prompts live in external files; the analysis prompt takes the doc type.
    analysis_template = await self._load_prompt('multi_perspective_analysis')
    analysis_prompt = analysis_template.format(doc_type=doc_type.value)
    system_message = await self._load_prompt('system_multi_perspective')

    user_content = f"{analysis_prompt}\n\nDocument Content:\n{document_content}"
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_content}
    ]

    # Per-provider progress updates are only wired up when a reporter exists.
    on_model_event = self._create_provider_progress_callback(progress) if progress else None

    return await self.provider_manager.execute_parallel_analysis(
        model_keys=self.primary_models,
        messages=messages,
        schema=UNIVERSAL_BASE_DELIVERABLE_SCHEMA,
        minimum_success_threshold=config.MINIMUM_SUCCESS_THRESHOLD,
        on_model_event=on_model_event
    )
def _create_provider_progress_callback(self, progress):
"""Create callback function for provider progress updates"""
async def on_model_event(model_key: str, stage: str, data):
try:
if stage == 'start':
await progress.emit_provider_update(model_key, {
'provider': self._get_provider_name(model_key),
'model': self._get_model_display_name(model_key),
'status': 'started',
'startedAt': data.get('timestamp') if data else None
})
elif stage == 'end':
if 'error' in data:
await progress.emit_provider_update(model_key, {
'provider': self._get_provider_name(model_key),
'model': self._get_model_display_name(model_key),
'status': 'error',
'error': str(data['error']),
'completedAt': data.get('timestamp') if data else None
})
else:
response = data.get('response')
cost = data.get('cost', 0)
if response:
await progress.emit_provider_update(model_key, {
'provider': self._get_provider_name(model_key),
'model': self._get_model_display_name(model_key),
'status': 'success',
'completedAt': data.get('timestamp'),
'latencyMs': response.processing_time * 1000 if response.processing_time else None,
'tokensIn': response.token_usage.input_tokens,
'tokensOut': response.token_usage.output_tokens,
'tokensCached': response.token_usage.cached_input_tokens,
'costUsd': cost
})
# Update overall LLM_ANALYSIS progress (25% + completed/total * 50%)
completed_count = len([
p for p in progress.job.provider_updates.values()
if p.status in ['success', 'error']
])
total_count = len(self.primary_models)
analysis_progress = 25 + (completed_count / total_count) * 50
await progress.emit('LLM_ANALYSIS', int(analysis_progress),
f'Analysis progress: {completed_count}/{total_count} models complete')
except Exception as e:
logging.error(f"Error in provider progress callback: {e}")
return on_model_event
def _get_provider_name(self, model_key: str) -> str:
"""Get provider name from model key"""
try:
provider_name, _ = config.get_model_info(model_key)
return provider_name
except:
return model_key.split('-')[0] if '-' in model_key else 'unknown'
def _get_model_display_name(self, model_key: str) -> str:
"""Get display name for model"""
display_names = {
'openai-gpt51': 'GPT-5.1',
'anthropic-opus45': 'Claude Opus 4.5',
'anthropic-sonnet45': 'Claude Sonnet 4.5',
'google-gemini31': 'Gemini 3.1 Pro'
}
return display_names.get(model_key, model_key)
async def _enhance_and_validate_results(self, uploaded_file, initial_results: ProcessingResult) -> ProcessingResult:
    """Enhance results with cross-validation and gap analysis.

    Asks a model to look for assets missed by the initial extraction and
    appends any it returns to `initial_results.raw_data`. When none are
    returned, the confidence score is raised to 0.95. `initial_results` is
    modified in place and returned.

    Failures inside the validation call are logged, recorded in
    `processing_notes`, and the unmodified-so-far results are returned;
    loading the 'validation_analysis' prompt happens before the try block,
    so errors from it propagate.

    NOTE(review): this method reads self.model, self.model_name and
    self.reasoning_effort, none of which are assigned by the __init__ visible
    in this part of the file. If they are not set elsewhere, every call ends
    in the except branch with an AttributeError - confirm whether this method
    is still in use.
    """
    # Nothing to validate when the initial extraction produced no assets.
    if not initial_results.raw_data:
        return initial_results
    # Load validation prompt from external file
    validation_prompt_template = await self._load_prompt('validation_analysis')
    validation_prompt = validation_prompt_template.format(
        asset_count=len(initial_results.raw_data),
        doc_type=initial_results.metadata.get('doc_type', 'unknown')
    )
    try:
        # For GPT-5 using Responses API with reasoning_effort
        # `uploaded_file` is interpolated as text; presumably it already holds
        # the extracted document content - TODO confirm against the caller.
        combined_prompt = f"{validation_prompt}\n\nDocument Content:\n{uploaded_file}"
        logging.info(f"=== CALLING OPENAI RESPONSES API: /v1/responses (Validation parse) ===")
        logging.info(f"Model: {self.model_name}, Reasoning Effort: {self.reasoning_effort}")
        # Load system message from external file
        system_validation_message = await self._load_prompt('system_validation')
        # NOTE(review): called without `await`; if this is a synchronous
        # client call it blocks the event loop for its duration - confirm.
        response = self.model.responses.parse(
            model=self.model_name,
            input=[
                {"role": "system", "content": system_validation_message},
                {"role": "user", "content": combined_prompt}
            ],
            reasoning={"effort": self.reasoning_effort},
            text_format=AssetExtractionResult
        )
        logging.info(f"=== RESPONSES API CALL COMPLETED SUCCESSFULLY (Validation) ===")
        # Track token usage for GPT-5 Responses API
        if hasattr(response, 'usage'):
            usage_dict = {
                'input_tokens': response.usage.input_tokens,
                'output_tokens': response.usage.output_tokens,
                'input_tokens_cached': getattr(response.usage, 'input_tokens_cached', 0)
            }
            self.token_usage.add_usage(usage_dict)
            logging.info(f"Validation Analysis - Tokens: {usage_dict['input_tokens']} input, {usage_dict['output_tokens']} output")
        # Extract parsed data from Responses API format
        parsed_result = response.output_parsed
        logging.info(f"GPT-5 Validation Analysis - Parsed {len(parsed_result.assets)} additional assets")
        additional_data = [asset.model_dump() for asset in parsed_result.assets]
        logging.info(f"VALIDATION DEBUG - Extracted data type: {type(additional_data)}, length: {len(additional_data)}")
        if additional_data and len(additional_data) > 0:
            # The model found assets the first pass missed: append them.
            logging.info(f"Validation found {len(additional_data)} additional assets")
            logging.info(f"VALIDATION DEBUG - Adding assets: {[asset.get('title', 'No title') for asset in additional_data]}")
            initial_results.raw_data.extend(additional_data)
            initial_results.processing_notes.append(f"Added {len(additional_data)} assets from validation")
            logging.info(f"VALIDATION DEBUG - Total assets after validation: {len(initial_results.raw_data)}")
        else:
            logging.info("Validation confirmed extraction completeness")
            initial_results.confidence_score = 0.95
            initial_results.processing_notes.append("Validation confirmed completeness")
            # additional_data is built by a list comprehension above, so it is
            # always a list here and the else branch below cannot run.
            if additional_data is not None:
                logging.info(f"VALIDATION DEBUG - Validation returned empty array (expected when extraction is complete)")
            else:
                logging.warning(f"VALIDATION DEBUG - JSON extraction returned None - possible parsing issue")
        return initial_results
    except Exception as e:
        # Best effort: a failed validation never discards the initial results.
        logging.warning(f"Validation step failed: {e}")
        initial_results.processing_notes.append(f"Validation failed: {e}")
        return initial_results
def _extract_structured_json(self, raw_text: str) -> List[Dict[str, Any]]:
"""Extract structured JSON from AI response with schema validation."""
try:
# Log the raw response for debugging
logging.info(f"Raw response for JSON parsing: {raw_text[:200]}...")
# Parse the structured response
structured_data = json.loads(raw_text)
# Extract assets array from structured response
if 'assets' in structured_data:
assets = structured_data['assets']
logging.info(f"Successfully extracted {len(assets)} assets using structured output")
return assets
else:
logging.warning("No 'assets' key found in structured response")
logging.info(f"Available keys in response: {list(structured_data.keys())}")
return []
except json.JSONDecodeError as e:
logging.warning(f"Structured JSON parsing failed: {e}")
logging.info(f"Raw text causing JSON error: {raw_text[:500]}...")
logging.info("Falling back to legacy parsing")
return self._extract_json(raw_text)
except Exception as e:
logging.error(f"Structured JSON extraction failed: {e}")
logging.info(f"Raw text: {raw_text[:500]}...")
return []
def _extract_json(self, raw_text: str) -> List[Dict[str, Any]]:
"""Extract JSON from AI response using robust parsing."""
try:
# Try direct JSON parsing first
if raw_text.strip().startswith('['):
return json5.loads(raw_text.strip())
# Look for JSON array in response
start_index = raw_text.find('[')
end_index = raw_text.rfind(']')
if start_index != -1 and end_index != -1:
json_str = raw_text[start_index:end_index + 1]
return json5.loads(json_str)
# Look for individual JSON objects
json_objects = []
for line in raw_text.split('\n'):
line = line.strip()
if line.startswith('{') and line.endswith('}'):
try:
json_objects.append(json5.loads(line))
except:
continue
if json_objects:
return json_objects
raise ValueError("No valid JSON found in response")
except Exception as e:
logging.error(f"JSON extraction failed: {e}")
logging.debug(f"Raw text: {raw_text[:500]}...")
return []
def discover_supported_files(folder_path: str) -> List[str]:
    """Return the sorted paths of supported documents directly inside a folder.

    Hidden entries (names starting with ``.``) and subdirectories are
    ignored; the extension check is case-insensitive.  Any error while
    scanning is logged and whatever was collected so far is returned.
    """
    allowed_suffixes = {'.pdf', '.pptx', '.docx', '.xlsx', '.ppt', '.doc', '.xls'}
    matches = []
    try:
        for entry in os.listdir(folder_path):
            if entry.startswith('.'):
                # Hidden file: not a candidate.
                continue
            candidate = os.path.join(folder_path, entry)
            if not os.path.isfile(candidate):
                # Top-level files only; directories are not descended into.
                continue
            suffix = os.path.splitext(entry)[1]
            if suffix.lower() in allowed_suffixes:
                matches.append(candidate)
        # Alphabetical order keeps the processing sequence reproducible.
        matches.sort()
        logging.info(f"Discovered {len(matches)} supported documents in {folder_path}")
    except Exception as exc:
        logging.error(f"Error discovering files in {folder_path}: {exc}")
    return matches
def parse_arguments(argv: Optional[List[str]] = None):
    """Parse the command line arguments of the brief-processing CLI.

    Args:
        argv: Argument list to parse.  ``None`` (the default) makes argparse
            read the process arguments, which is the original behaviour.

    Returns:
        The ``argparse.Namespace`` with ``filepath``, ``primary_models``,
        ``consolidation_model`` and ``estimate_cost``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Enhanced Brief Processing System with Multi-Model Support",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The shell line continuations are written as "\\": a bare
        # backslash-newline inside a normal string literal is consumed by
        # Python and would collapse the example onto a single line.
        epilog="""
Examples:
# Process single document
python process_brief_enhanced.py document.pdf
# Process entire folder
python process_brief_enhanced.py /path/to/briefs/
# Custom models for batch processing
python process_brief_enhanced.py /path/to/briefs/ \\
--primary-models openai-gpt51,anthropic-sonnet45,google-gemini31 \\
--consolidation-model anthropic-opus45
# Cost estimation for folder
python process_brief_enhanced.py /path/to/briefs/ --estimate-cost
Available models: openai-gpt51, anthropic-opus45, anthropic-sonnet45, google-gemini31
"""
    )
    parser.add_argument('filepath', help='Path to document file or folder to process')
    parser.add_argument(
        '--primary-models',
        type=str,
        default=config.DEFAULT_PRIMARY_MODELS,
        help=f'Comma-separated list of models for primary analysis (default: {config.DEFAULT_PRIMARY_MODELS})'
    )
    parser.add_argument(
        '--consolidation-model',
        type=str,
        default=config.DEFAULT_CONSOLIDATION_MODEL,
        help=f'Model for final consolidation (default: {config.DEFAULT_CONSOLIDATION_MODEL})'
    )
    parser.add_argument(
        '--estimate-cost',
        action='store_true',
        help='Estimate processing cost before execution'
    )
    return parser.parse_args(argv)
def _parse_model_list(raw_models) -> List[str]:
    """Turn a comma-separated model string into a list of clean model keys."""
    # NOTE(review): the configured default is presumably a string; a list is
    # accepted as well in case the config provides one - confirm.
    if isinstance(raw_models, str):
        raw_models = raw_models.split(',')
    return [name.strip() for name in raw_models if name and name.strip()]


def _estimate_input_tokens(path: str) -> int:
    """Roughly estimate input tokens for one file or for all supported files in a folder."""
    if os.path.isdir(path):
        # The size of the directory entry itself says nothing about the
        # documents inside it, so size every document that would be processed.
        sizes = [os.path.getsize(document) for document in discover_supported_files(path)]
    else:
        sizes = [os.path.getsize(path)]
    # Rough heuristic: ~4 bytes per token, capped at 50k tokens per document.
    return sum(min(size // 4, 50000) for size in sizes)


async def main():
    """Command line entry point: parse arguments, optionally estimate cost, process.

    Logging goes to ``processing.log`` (overwritten on every run) and to
    stdout.  When the estimated cost exceeds ``config.MAX_PROCESSING_COST_USD``
    the user is asked for confirmation via a blocking ``input()`` prompt.
    A folder path triggers batch processing, anything else single-document
    processing.

    Raises:
        ValueError: If no primary model remains after parsing the model list.
    """
    # Enhanced logging setup
    log_file = 'processing.log'
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            # Explicit UTF-8: log lines contain non-ASCII characters.
            logging.FileHandler(log_file, mode='w', encoding='utf-8'),
            logging.StreamHandler(sys.stdout)
        ]
    )

    # Parse command line arguments
    args = parse_arguments()
    filepath = args.filepath
    primary_models = _parse_model_list(args.primary_models)
    if not primary_models:
        raise ValueError("No primary models specified")
    consolidation_model = args.consolidation_model

    # Initialize multi-model analyzer
    analyzer = DocumentAnalyzer(primary_models, consolidation_model)

    # Cost estimation if requested
    if args.estimate_cost or config.ENABLE_COST_ESTIMATION:
        try:
            estimated_tokens = _estimate_input_tokens(filepath)
            cost_breakdown = analyzer.provider_manager.estimate_total_cost(
                primary_models + [consolidation_model],
                estimated_tokens,
                estimated_tokens // 2
            )
            logging.info("=== COST ESTIMATION ===")
            for model, cost in cost_breakdown.items():
                if model != 'total':
                    logging.info(f"{model}: ${cost:.4f}")
            logging.info(f"Total Estimated Cost: ${cost_breakdown['total']:.4f}")
            if cost_breakdown['total'] > config.MAX_PROCESSING_COST_USD:
                response = input(f"Estimated cost (${cost_breakdown['total']:.4f}) exceeds limit (${config.MAX_PROCESSING_COST_USD}). Continue? (y/N): ")
                if response.lower() != 'y':
                    logging.info("Processing cancelled by user")
                    return
        except Exception as e:
            # Estimation is advisory only; a failure must not block processing.
            logging.warning(f"Cost estimation failed: {e}")

    # Determine if input is file or folder
    if os.path.isdir(filepath):
        # Batch processing mode
        logging.info("=== ENHANCED MULTI-MODEL BATCH PROCESSING STARTED ===")
        await process_batch_documents(filepath, analyzer, args)
    else:
        # Single file processing mode
        logging.info("=== ENHANCED MULTI-MODEL BRIEF PROCESSING STARTED ===")
        await process_single_document(filepath, analyzer)
async def process_batch_documents(folder_path: str, analyzer, args):
    """Process every supported document in a folder, one after another.

    Each document is analysed with ``analyzer.process_document_multi_model``
    and, when assets were extracted, written to its own CSV file.  A failure
    in one document is logged and does not stop the batch.  A summary is
    logged at the end and a machine-readable ``__BATCH_SUMMARY__`` line is
    printed to stdout.

    Args:
        folder_path: Folder whose top-level documents are processed.
        analyzer: Object providing the ``process_document_multi_model`` coroutine.
        args: Parsed command line arguments (currently unused here).
    """
    # Discover all supported files
    document_files = discover_supported_files(folder_path)
    if not document_files:
        logging.error(f"No supported documents found in {folder_path}")
        return
    logging.info(f"Starting batch processing of {len(document_files)} documents")

    # Track batch statistics
    successful_documents = []
    failed_documents = []
    total_assets = 0
    total_cost = 0.0
    separator = '=' * 60

    # Process each document sequentially
    for i, document_path in enumerate(document_files, 1):
        document_name = os.path.basename(document_path)

        # Progress reporting ("\n" yields a real blank line before the banner)
        logging.info(f"\n{separator}")
        logging.info(f"PROCESSING DOCUMENT {i}/{len(document_files)}: {document_name}")
        logging.info(f"{separator}")

        try:
            results = await analyzer.process_document_multi_model(document_path)
            if not results.raw_data:
                logging.error(f"FAILED: {document_name} - No data extracted")
                failed_documents.append((document_name, "No data extracted"))
                continue

            output_path = generate_output_file(document_path, results)
            asset_count = len(results.raw_data)

            # Extract cost information if available; a missing/None cost counts as 0
            consolidation_metadata = results.metadata.get('consolidation_metadata', {})
            doc_cost = consolidation_metadata.get('cost_breakdown', {}).get('total_cost', 0) or 0
            updated_cost = total_cost + doc_cost
        except Exception as e:
            logging.error(f"FAILED: {document_name} - {str(e)}")
            failed_documents.append((document_name, str(e)))
            continue

        # Statistics are recorded only after every fallible step succeeded, so
        # a document can never be counted as both successful and failed.
        successful_documents.append((document_name, asset_count, output_path))
        total_assets += asset_count
        total_cost = updated_cost
        logging.info(f"SUCCESS: {document_name} - {asset_count} assets extracted")

    # Final batch summary
    logging.info(f"\n{separator}")
    logging.info("BATCH PROCESSING COMPLETE")
    logging.info(f"{separator}")
    logging.info(f"Documents processed: {len(document_files)}")
    logging.info(f"Successful: {len(successful_documents)}")
    logging.info(f"Failed: {len(failed_documents)}")
    logging.info(f"Total assets extracted: {total_assets}")
    logging.info(f"Total estimated cost: ${total_cost:.4f}")

    # Report successful documents
    if successful_documents:
        logging.info("\nSUCCESSFUL DOCUMENTS:")
        for doc_name, asset_count, output_path in successful_documents:
            logging.info(f"{doc_name}: {asset_count} assets → {output_path}")

    # Report failed documents
    if failed_documents:
        logging.info("\nFAILED DOCUMENTS:")
        for doc_name, error in failed_documents:
            logging.info(f"{doc_name}: {error}")

    # Print summary for PHP integration
    print(f"__BATCH_SUMMARY__:{len(successful_documents)}:{len(failed_documents)}:{total_assets}:{total_cost:.4f}")
def generate_output_file(filepath: str, results) -> str:
    """Write the extracted rows of one document to a timestamped CSV file.

    The file is created in the ``output`` directory (relative to the current
    working directory, created on demand) and named after the source
    document, with spaces and dots replaced by underscores.

    Args:
        filepath: Path of the processed source document.
        results: Processing result whose ``raw_data`` rows are written;
            keys outside ``CSV_HEADERS`` are ignored.

    Returns:
        Path of the CSV file that was written.
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    stem, _extension = os.path.splitext(os.path.basename(filepath))
    # Spaces and remaining dots both become underscores.
    safe_stem = stem.translate(str.maketrans({' ': '_', '.': '_'}))

    target_dir = 'output'
    os.makedirs(target_dir, exist_ok=True)
    destination = os.path.join(target_dir, f"{safe_stem}-{timestamp}.csv")

    with open(destination, 'w', newline='', encoding='utf-8') as handle:
        csv_writer = csv.DictWriter(handle, fieldnames=CSV_HEADERS, extrasaction='ignore')
        csv_writer.writeheader()
        for row in results.raw_data:
            csv_writer.writerow(row)
    return destination
async def process_single_document(filepath: str, analyzer):
    """Analyse one document, write its CSV and report the outcome.

    Logs an error and returns early when nothing was extracted.  Otherwise
    the CSV is generated, a processing and cost summary is logged, and,
    unless the analyzer carries a ``_is_gui_mode`` attribute, the
    ``__COST_SUMMARY__``, ``__TOKEN_USAGE__`` and ``__FILENAME__`` marker
    lines are printed to stdout.
    """
    results = await analyzer.process_document_multi_model(filepath)
    if not results.raw_data:
        logging.error("No data extracted from document")
        return

    csv_path = generate_output_file(filepath, results)
    meta = results.metadata

    # Processing summary
    logging.info("=== PROCESSING SUMMARY ===")
    logging.info(f"Document Type: {meta.get('doc_type', 'unknown')}")
    logging.info(f"Assets Extracted: {len(results.raw_data)}")
    logging.info(f"Confidence Score: {results.confidence_score:.2f}")
    logging.info(f"Processing Notes: {', '.join(results.processing_notes)}")
    logging.info(f"Output File: {csv_path}")

    # Cost and token figures come from the consolidation metadata
    consolidation = meta.get('consolidation_metadata', {})
    costs = consolidation.get('cost_breakdown', {})
    tokens = consolidation.get('token_usage', {})

    logging.info("=== COST ANALYSIS ===")
    logging.info(f"Primary Models Used: {', '.join(meta.get('primary_models_used', []))}")
    logging.info(f"Consolidation Model: {meta.get('consolidation_model', 'Unknown')}")
    logging.info(f"Primary Analysis Cost: ${costs.get('primary_analysis_cost', 0):.4f}")
    logging.info(f"Consolidation Cost: ${costs.get('consolidation_cost', 0):.4f}")
    logging.info(f"Total Cost: ${costs.get('total_cost', 0):.4f}")
    logging.info(f"Total Tokens: {tokens.get('grand_total', results.token_usage.get_total()):,}")

    overall_cost = costs.get('total_cost', 0)
    overall_tokens = tokens.get('grand_total', results.token_usage.get_total())

    # The marker lines are only emitted when the analyzer has no
    # ``_is_gui_mode`` attribute (i.e. plain CLI usage).
    if hasattr(analyzer, '_is_gui_mode'):
        return
    print(f"__COST_SUMMARY__:{overall_cost:.4f}")
    print(f"__TOKEN_USAGE__:{tokens.get('primary_analysis_total', 0)}:{tokens.get('consolidation_tokens', 0)}:{overall_tokens}")
    print(f"__FILENAME__:{csv_path}")
# Script entry point: run the async CLI workflow only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())