import sys
import os
import datetime
import logging
import json
import csv
import re
import itertools
import asyncio
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
from enum import Enum
from pydantic import BaseModel

# File Processing Libraries
import pptx
import pandas as pd
import fitz  # PyMuPDF
from PIL import Image
import docx
from openpyxl import load_workbook

# AI Libraries
import json5
import base64

# Configuration and LLM Services
from .config import config
from .llm_service import ProviderManager, LLMResponse
from .consolidation_processor import ConsolidationProcessor

# OpenAI GPT-5.1 Pricing (per 1M tokens)
OPENAI_PRICING = {
    'gpt-5.1': {
        'input': 1.25,
        'cached_input': 0.625,
        'output': 10.00
    }
}

# Pricing entry used when a model has no entry in OPENAI_PRICING. It must be a
# key of OPENAI_PRICING: the previous hard-coded fallback ('gpt-5') was not, so
# every unpriced model raised KeyError instead of falling back.
DEFAULT_PRICING_MODEL = 'gpt-5.1'

# Column names mirroring the MarketingAsset fields, in declaration order
# (presumably the header order of the CSV export — confirm with the writer).
CSV_HEADERS = [
    'title', 'status', 'category', 'media', 'asset_type', 'brand_identifier',
    'technical_specifications', 'review_date', 'live_date', 'end_date',
    'reference_material', 'language_country_market', 'quantity', 'page_number',
    'priority_level', 'creative_direction'
]


# Base deliverable with mixed field types (strings for metadata, arrays for multipliers)
class BaseDeliverable(BaseModel):
    title: str
    status: Optional[str] = ""
    category: Optional[str] = ""
    media: Optional[str] = ""
    asset_type: Optional[str] = ""
    brand_identifier: Optional[str] = ""
    technical_specifications: Optional[List[str]] = []  # multiplier field
    review_date: Optional[str] = ""
    live_date: Optional[str] = ""
    end_date: Optional[str] = ""
    reference_material: Optional[str] = ""
    language_country_market: Optional[List[str]] = []  # multiplier field
    quantity: Optional[str] = "1"
    page_number: Optional[str] = ""
    priority_level: Optional[str] = ""
    creative_direction: Optional[str] = ""


# Individual marketing asset (for final CSV output)
class MarketingAsset(BaseModel):
    title: str
    status: Optional[str] = ""
    category: Optional[str] = ""
    media: Optional[str] = ""
    asset_type: Optional[str] = ""
    brand_identifier: Optional[str] = ""
    technical_specifications: Optional[str] = ""
    review_date: Optional[str] = ""
    live_date: Optional[str] = ""
    end_date: Optional[str] = ""
    reference_material: Optional[str] = ""
    language_country_market: Optional[str] = ""
    quantity: Optional[str] = "1"
    page_number: Optional[str] = ""
    priority_level: Optional[str] = ""
    creative_direction: Optional[str] = ""


# Base extraction result (from LLM)
class BaseExtractionResult(BaseModel):
    assets: List[BaseDeliverable]


# Final extraction result (expanded individual assets)
class AssetExtractionResult(BaseModel):
    assets: List[MarketingAsset]


# Load universal schema from external file
def _load_universal_schema():
    """Load the universal extraction schema from ``prompts/universal_schema.json``.

    Raises:
        Exception: any read/parse error is logged and re-raised, so importing
            this module fails loudly when the schema file is missing or invalid.
    """
    try:
        # Go up one level from core/ to find prompts/
        schema_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'prompts', 'universal_schema.json')
        with open(schema_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logging.error(f"Error loading universal schema: {e}")
        raise


# Universal schema for base deliverable extraction (works with all providers)
UNIVERSAL_BASE_DELIVERABLE_SCHEMA = _load_universal_schema()

# Legacy schema maintained for backward compatibility
OPENAI_ASSET_SCHEMA = {
    "name": "asset_extraction",
    "description": "Extract assets from document analysis",
    "schema": {
        "type": "object",
        "properties": {
            "assets": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "title": {"type": "string", "description": "Asset title or name"},
                        "status": {"type": "string", "description": "Current status"},
                        "category": {"type": "string", "description": "Asset category"},
                        "media": {"type": "string", "description": "Media type"},
                        "asset_type": {"type": "string", "description": "Specific asset type"},
                        "brand_identifier": {"type": "string", "description": "Brand or client"},
                        "technical_specifications": {"type": "string", "description": "Technical specifications including dimensions (e.g., '1080x1920', '1920x1080'), descriptive formats (e.g., 'Mobile Banner', 'Desktop Hero'), file formats, technical requirements, or any other technical details"},
                        "review_date": {"type": "string", "description": "Review deadline"},
                        "live_date": {"type": "string", "description": "Go-live date"},
                        "end_date": {"type": "string", "description": "End/expiry date"},
                        "reference_material": {"type": "string", "description": "Detailed requirements"},
                        "language": {"type": "string", "description": "Target language"},
                        "country": {"type": "string", "description": "Target country/region"},
                        "quantity": {"type": "string", "description": "Number of assets"},
                        "page_number": {"type": "string", "description": "Source page"},
                        "priority_level": {"type": "string", "description": "Business priority"},
                        "creative_direction": {"type": "string", "description": "Design requirements"}
                    },
                    "required": ["title", "technical_specifications"],
                    "additionalProperties": False
                }
            }
        },
        "required": ["assets"],
        "additionalProperties": False
    }
}

# Legacy Gemini Schema (keep for backward compatibility)
GEMINI_ASSET_SCHEMA = {
    "type": "object",
    "properties": {
        "assets": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "title": {"type": "string", "description": "Asset title or name"},
                    "status": {"type": "string", "description": "Current status"},
                    "category": {"type": "string", "description": "Asset category"},
                    "media": {"type": "string", "description": "Media type"},
                    "asset_type": {"type": "string", "description": "Specific asset type"},
                    "brand_identifier": {"type": "string", "description": "Brand or client"},
                    "technical_specifications": {"type": "string", "description": "Technical specifications including format/dimensions, file formats, technical requirements, or any other technical details"},
                    "review_date": {"type": "string", "description": "Review deadline"},
                    "live_date": {"type": "string", "description": "Go-live date"},
                    "end_date": {"type": "string", "description": "End/expiry date"},
                    "reference_material": {"type": "string", "description": "Detailed requirements"},
                    "language": {"type": "string", "description": "Target language"},
                    "country": {"type": "string", "description": "Target country/region"},
                    "quantity": {"type": "string", "description": "Number of assets"},
                    "page_number": {"type": "string", "description": "Source page"},
                    "priority_level": {"type": "string", "description": "Business priority"},
                    "creative_direction": {"type": "string", "description": "Design requirements"}
                },
                "required": ["title", "technical_specifications"]
            }
        }
    },
    "required": ["assets"]
}


class DocumentType(Enum):
    POWERPOINT = "powerpoint"
    WORD = "word"
    PDF = "pdf"
    EXCEL = "excel"
    UNKNOWN = "unknown"


@dataclass
class TokenUsage:
    """Accumulated token counts with cost helpers based on OPENAI_PRICING."""
    input_tokens: int = 0
    cached_input_tokens: int = 0
    output_tokens: int = 0

    def add_usage(self, usage_dict: Dict[str, int]):
        """Add token usage from a provider usage dict.

        Both Chat Completions names (``prompt_tokens``, ``prompt_tokens_cached``,
        ``completion_tokens``) and Responses API names (``input_tokens``,
        ``input_tokens_cached``, ``output_tokens``) are accepted; the Chat
        Completions name wins when both are present. Missing or ``None``
        counts are treated as 0.
        """
        def _count(primary: str, fallback: str) -> int:
            # `or 0` so an explicit None from a provider does not raise TypeError on +=
            return usage_dict.get(primary, usage_dict.get(fallback, 0)) or 0

        self.input_tokens += _count('prompt_tokens', 'input_tokens')
        self.cached_input_tokens += _count('prompt_tokens_cached', 'input_tokens_cached')
        self.output_tokens += _count('completion_tokens', 'output_tokens')

    @staticmethod
    def _resolve_pricing_model(model_name: str) -> str:
        """Return ``model_name`` if it is priced, else DEFAULT_PRICING_MODEL (logging a warning)."""
        if model_name in OPENAI_PRICING:
            return model_name
        logging.warning(f"No pricing info for model {model_name}, defaulting to {DEFAULT_PRICING_MODEL}")
        return DEFAULT_PRICING_MODEL

    def _component_costs(self, model_name: str) -> Dict[str, float]:
        """Return unrounded USD cost per component (pricing is per 1M tokens)."""
        pricing = OPENAI_PRICING[self._resolve_pricing_model(model_name)]
        # NOTE(review): input and cached-input tokens are billed separately here.
        # If the provider's input count already includes cached tokens, cached
        # tokens are charged twice — confirm how ProviderManager fills these.
        return {
            'input_cost': (self.input_tokens / 1_000_000) * pricing['input'],
            'cached_input_cost': (self.cached_input_tokens / 1_000_000) * pricing['cached_input'],
            'output_cost': (self.output_tokens / 1_000_000) * pricing['output'],
        }

    def calculate_cost(self, model_name: str) -> float:
        """Return total USD cost; unpriced models fall back to DEFAULT_PRICING_MODEL."""
        costs = self._component_costs(model_name)
        return costs['input_cost'] + costs['cached_input_cost'] + costs['output_cost']

    def get_summary(self, model_name: str) -> Dict[str, Any]:
        """Return token counts, total cost and per-component cost (all rounded to 4 dp)."""
        costs = self._component_costs(model_name)
        total_cost = costs['input_cost'] + costs['cached_input_cost'] + costs['output_cost']
        return {
            'input_tokens': self.input_tokens,
            'cached_input_tokens': self.cached_input_tokens,
            'output_tokens': self.output_tokens,
            'total_tokens': self.input_tokens + self.cached_input_tokens + self.output_tokens,
            'total_cost_usd': round(total_cost, 4),
            'cost_breakdown': {name: round(value, 4) for name, value in costs.items()},
        }


@dataclass
class ProcessingResult:
    raw_data: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    confidence_score: float
    processing_notes: List[str]
    token_usage: TokenUsage


def create_unique_title(base_title: str, multiplier_values: Dict[str, str], max_suffix_length: int = 40) -> str:
    """Create a unique title by appending abbreviated multiplier values.

    The suffix is the market value (abbreviated to 15 chars) followed by the
    technical specification (abbreviated to 20 chars), joined with ``_`` and
    capped at ``max_suffix_length`` characters. The resulting title is made
    CSV-safe: commas, newlines, carriage returns and tabs become ``_``; single
    and double quotes are removed.

    Args:
        base_title: Title of the base deliverable.
        multiplier_values: Multiplier field name -> value for this combination.
            Only ``language_country_market`` and ``technical_specifications``
            contribute to the suffix.
        max_suffix_length: Maximum length of the appended suffix.

    Returns:
        ``base_title`` when there is no suffix, else ``f"{base_title}_{suffix}"``,
        in both cases CSV-safe.
    """
    def csv_safe_string(text: str) -> str:
        """Replace special characters for CSV compatibility"""
        replacements = {
            ',': '_',
            '"': '',
            "'": '',
            '\n': '_',
            '\r': '_',
            '\t': '_'
        }
        result = text
        for old, new in replacements.items():
            result = result.replace(old, new)
        return result

    def abbreviate_value(value: str, max_length: int) -> str:
        """Intelligently abbreviate a value while preserving meaning"""
        if len(value) <= max_length:
            return value
        # For technical specs, try to preserve dimensions
        if 'x' in value and value.replace('x', '').replace('-', '').isdigit():
            # It's a dimension - keep as much as possible
            return value[:max_length]
        # For market codes, prioritize country part
        if '-' in value:
            parts = value.split('-')
            if len(parts) >= 2:
                # Try to keep country code at least
                country = parts[-1]
                if len(country) <= max_length:
                    return country
        # Fallback: simple truncation
        return value[:max_length]

    # Extract variable values in priority order: market first, then specs
    suffix_parts = []
    # Priority 1: Market (language_country_market)
    if 'language_country_market' in multiplier_values:
        market = multiplier_values['language_country_market']
        suffix_parts.append(abbreviate_value(market, 15))
    # Priority 2: Technical specifications
    if 'technical_specifications' in multiplier_values:
        spec = multiplier_values['technical_specifications']
        suffix_parts.append(abbreviate_value(spec, 20))

    suffix = '_'.join(suffix_parts)
    # Hard cap. The previous "proportional" branch re-sliced parts that were
    # already <= 15 / 20 chars, so it never shortened anything and a two-part
    # suffix could exceed a max_suffix_length below 36.
    if len(suffix) > max_suffix_length:
        suffix = suffix[:max_suffix_length]

    unique_title = f"{base_title}_{suffix}" if suffix else base_title
    # Ensure CSV safety
    return csv_safe_string(unique_title)


def expand_deliverables(base_deliverables: List[BaseDeliverable]) -> Tuple[List[MarketingAsset], List[str]]:
    """
    Expand base deliverables with multiplier arrays into individual MarketingAsset objects.

    ``technical_specifications`` and ``language_country_market`` are multiplier
    fields: one asset is produced per element of their Cartesian product, each
    with a unique title (see ``create_unique_title``) and quantity "1". Other
    fields are copied as strings; blank values become ``None``. Assets that
    fail validation are skipped and reported in ``warnings`` rather than
    aborting the whole expansion.

    Returns: (expanded_assets, warnings)
    """
    expanded_assets = []
    warnings = []
    # Define which fields are multipliers (arrays) vs metadata (strings)
    multiplier_field_names = {'technical_specifications', 'language_country_market'}

    for base in base_deliverables:
        # Convert base deliverable to dict for easier processing
        base_dict = base.model_dump()
        # Identify fields with arrays (multipliers)
        multiplier_fields = {}
        single_fields = {}

        for field, value in base_dict.items():
            if field in multiplier_field_names:
                # Multiplier fields should be arrays
                if isinstance(value, list) and len(value) > 0:
                    # Skip empty arrays and arrays with only empty strings
                    if any(v.strip() for v in value if v):
                        # Keep only non-empty values (same falsy guard as the check above)
                        multiplier_fields[field] = [v for v in value if v and v.strip()]
                    else:
                        single_fields[field] = None  # Empty array becomes None
                elif isinstance(value, str) and value.strip():
                    # Single string value becomes single-item array for multiplier field
                    multiplier_fields[field] = [value]
                else:
                    single_fields[field] = None
            else:
                # Non-multiplier fields should be strings
                if isinstance(value, str) and value.strip():
                    single_fields[field] = value
                elif isinstance(value, list) and len(value) > 0:
                    # If somehow we get an array for a string field, take the first value
                    single_fields[field] = next((v for v in value if v.strip()), None)
                else:
                    single_fields[field] = None

        # If no multiplier fields, create single asset
        if not multiplier_fields:
            asset_data = {**single_fields, "quantity": "1"}
            # Same handling as the multiplier path: a blank title (-> None)
            # previously raised here and aborted the whole expansion.
            try:
                expanded_assets.append(MarketingAsset(**asset_data))
            except Exception as e:
                warnings.append(f"Error creating asset for '{base.title}': {e}")
            continue

        # Calculate expected count from quantity field if present
        expected_quantity = None
        if 'quantity' in base_dict and base_dict['quantity']:
            try:
                # Quantity is now a string field
                quantity_str = str(base_dict['quantity'])
                if quantity_str and quantity_str != "1":
                    expected_quantity = int(quantity_str)
            except (ValueError, TypeError):
                pass

        # Generate all combinations using itertools.product
        field_names = list(multiplier_fields.keys())
        field_values = [multiplier_fields[field] for field in field_names]
        combinations = list(itertools.product(*field_values))
        actual_count = len(combinations)

        # Validate quantity if expected quantity was specified
        if expected_quantity and actual_count != expected_quantity:
            warnings.append(
                f"Quantity mismatch for '{base.title}': expected {expected_quantity}, "
                f"but expansion created {actual_count} deliverables"
            )

        # Create individual assets for each combination
        for combo in combinations:
            asset_data = single_fields.copy()
            # Assign multiplier values from this combination
            multiplier_combination = dict(zip(field_names, combo))
            asset_data.update(multiplier_combination)
            # Generate unique title with multiplier values
            asset_data["title"] = create_unique_title(base.title, multiplier_combination)
            # Ensure quantity is always "1" for individual assets
            asset_data["quantity"] = "1"
            try:
                expanded_assets.append(MarketingAsset(**asset_data))
            except Exception as e:
                warnings.append(f"Error creating asset for '{base.title}': {e}")

        # Log concise expansion summary (reduced verbosity)
        expanding_fields = {field: values for field, values in multiplier_fields.items() if len(values) > 1}
        if expanding_fields:
            logging.debug(f"EXPANDED '{base.title}': {actual_count} deliverables from {len(expanding_fields)} multiplier fields")
        else:
            logging.debug(f"EXPANDED '{base.title}': {actual_count} deliverable (no multipliers)")

    return expanded_assets, warnings


class DocumentAnalyzer:
    """Multi-model document analysis: content extraction, parallel LLM analysis, consolidation."""

    def __init__(self, primary_models: Optional[List[str]] = None, consolidation_model: Optional[str] = None):
        """Configure models (falling back to config defaults) and validate them.

        Raises:
            ValueError: if any model key is not in ``config.MODEL_MAPPINGS``.
        """
        self.primary_models = primary_models or config.get_default_primary_models()
        self.consolidation_model = consolidation_model or config.DEFAULT_CONSOLIDATION_MODEL
        self.provider_manager = ProviderManager()
        self.consolidation_processor = ConsolidationProcessor()
        self.token_usage = TokenUsage()
        # Validate models
        self._validate_models()

    def _validate_models(self):
        """Validate that specified models are available and configured.

        Unknown model keys raise ValueError; missing API keys only log a warning.
        """
        valid_models = list(config.MODEL_MAPPINGS.keys())
        # Validate primary models
        for model in self.primary_models:
            if model not in valid_models:
                raise ValueError(f"Invalid primary model: {model}. Available: {valid_models}")
        # Validate consolidation model
        if self.consolidation_model not in valid_models:
            raise ValueError(f"Invalid consolidation model: {self.consolidation_model}. Available: {valid_models}")
        # Validate API keys
        api_key_status = config.validate_api_keys()
        missing_keys = [provider for provider, valid in api_key_status.items() if not valid]
        if missing_keys:
            logging.warning(f"Missing API keys for: {missing_keys}")
        logging.info(f"Using primary models: {self.primary_models}")
        logging.info(f"Using consolidation model: {self.consolidation_model}")

    async def _load_prompt(self, prompt_name: str) -> str:
        """Load ``prompts/<prompt_name>.txt`` (whitespace-stripped) via the default executor.

        Raises:
            FileNotFoundError: if the prompt file does not exist (logged, re-raised).
            Exception: any other read error is logged and re-raised.
        """
        def _read_prompt():
            """Blocking prompt read operation for thread pool"""
            # Go up one level from core/ to find prompts/
            prompt_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'prompts', f'{prompt_name}.txt')
            with open(prompt_path, 'r', encoding='utf-8') as f:
                return f.read().strip()

        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, _read_prompt)
        except FileNotFoundError:
            logging.error(f"Prompt file not found: {prompt_name}")
            raise
        except Exception as e:
            logging.error(f"Error loading prompt {prompt_name}: {e}")
            raise

    async def _save_base_deliverables_json(self, base_deliverables: List[BaseDeliverable], doc_type: str):
        """Save intermediate base deliverables (with multiplier arrays) as JSON.

        Writes ``base_deliverable_JSON/base_deliverables_<doc_type>_<timestamp>.json``
        next to this module. Best-effort: failures are logged as warnings, not raised.
        """
        def _write_json():
            """Blocking JSON write operation for thread pool"""
            # Create base_deliverable_JSON directory if it doesn't exist
            json_dir = os.path.join(os.path.dirname(__file__), 'base_deliverable_JSON')
            os.makedirs(json_dir, exist_ok=True)
            # Generate timestamp for filename (second resolution)
            timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            filename = f"base_deliverables_{doc_type}_{timestamp}.json"
            filepath = os.path.join(json_dir, filename)
            # Convert base deliverables to serializable format
            json_data = {
                "timestamp": timestamp,
                "document_type": doc_type,
                "base_deliverables_count": len(base_deliverables),
                "base_deliverables": [base.model_dump() for base in base_deliverables]
            }
            # Write JSON file
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, indent=2, ensure_ascii=False)
            logging.info(f"Saved base deliverables JSON: {filepath}")
            return filepath

        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, _write_json)
        except Exception as e:
            logging.warning(f"Failed to save base deliverables JSON: {e}")

    def classify_document(self, filepath: str) -> DocumentType:
        """Classify document type from its file extension (case-insensitive)."""
        extension = os.path.splitext(filepath)[1].lower()
        if extension in ['.ppt', '.pptx']:
            return DocumentType.POWERPOINT
        elif extension in ['.doc', '.docx']:
            return DocumentType.WORD
        elif extension == '.pdf':
            return DocumentType.PDF
        elif extension in ['.xls', '.xlsx']:
            return DocumentType.EXCEL
        else:
            return DocumentType.UNKNOWN

    def _encode_file_for_openai(self, filepath: str) -> Optional[str]:
        """Return the file's bytes as a base64 string, or None if it cannot be read."""
        try:
            with open(filepath, "rb") as file:
                return base64.b64encode(file.read()).decode('utf-8')
        except Exception as e:
            logging.error(f"Error encoding file for OpenAI: {e}")
            return None

    def _extract_document_content_local(self, filepath: str) -> str:
        """Local fallback extraction using PyMuPDF / python-pptx / python-docx / openpyxl.

        Returns plain text with per-page / per-slide / per-sheet headers, or a
        "No ... content found" placeholder when nothing is extracted.

        Raises:
            ValueError: for unsupported extensions (a subclass of the bare
                Exception raised previously, so existing handlers still match).
        """
        # NOTE(review): legacy .ppt/.doc/.xls are routed to python-pptx /
        # python-docx / openpyxl, which presumably reject those binary formats.
        ext = os.path.splitext(filepath)[1].lower()
        logging.info(f"Local extraction for {os.path.basename(filepath)} (ext={ext})")

        if ext == '.pdf':
            doc = fitz.open(filepath)
            try:
                pages = []
                for i, page in enumerate(doc, 1):
                    text = page.get_text("text")
                    if text.strip():
                        pages.append(f"--- Page {i} ---\n{text}")
            finally:
                # Close even when a page fails to render.
                doc.close()
            return "\n\n".join(pages) or "No text content found in PDF."

        elif ext in ('.pptx', '.ppt'):
            prs = pptx.Presentation(filepath)
            slides = []
            for i, slide in enumerate(prs.slides, 1):
                texts = [shape.text.strip() for shape in slide.shapes
                         if hasattr(shape, "text") and shape.text.strip()]
                if texts:
                    slides.append(f"--- Slide {i} ---\n" + "\n".join(texts))
            return "\n\n".join(slides) or "No text content found in presentation."

        elif ext in ('.docx', '.doc'):
            document = docx.Document(filepath)
            paragraphs = [p.text for p in document.paragraphs if p.text.strip()]
            for table in document.tables:
                for row in table.rows:
                    row_text = " | ".join(cell.text.strip() for cell in row.cells if cell.text.strip())
                    if row_text:
                        paragraphs.append(row_text)
            return "\n".join(paragraphs) or "No text content found in document."

        elif ext in ('.xlsx', '.xls'):
            wb = load_workbook(filepath, read_only=True, data_only=True)
            try:
                sheets_text = []
                for sheet_name in wb.sheetnames:
                    ws = wb[sheet_name]
                    rows = []
                    for row in ws.iter_rows(values_only=True):
                        cells = [str(c) for c in row if c is not None and str(c).strip()]
                        if cells:
                            rows.append(" | ".join(cells))
                    if rows:
                        sheets_text.append(f"--- Sheet: {sheet_name} ---\n" + "\n".join(rows))
            finally:
                # read_only workbooks hold the file open until closed.
                wb.close()
            return "\n\n".join(sheets_text) or "No content found in spreadsheet."

        else:
            raise ValueError(f"Unsupported file type for local extraction: {ext}")

    async def _extract_local_async(self, filepath: str) -> str:
        """Run the blocking local extraction in the default executor (keeps the event loop free)."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._extract_document_content_local, filepath)

    async def _extract_document_content(self, filepath: str) -> str:
        """Extract content from document — LlamaParser if key is configured, else local fallback.

        Any LlamaParser failure also falls back to local extraction; errors from
        the local extractor propagate to the caller.
        """
        if not config.LLAMACLOUD_API_KEY:
            logging.warning("LLAMACLOUD_API_KEY not set — using local document extraction")
            return await self._extract_local_async(filepath)
        try:
            from llama_cloud_services import LlamaParse
            logging.info(f"Using LlamaParser to extract content from: {os.path.basename(filepath)}")
            parser = LlamaParse(
                api_key=config.LLAMACLOUD_API_KEY,
                parse_mode="parse_page_with_agent",
                model="openai-gpt-5",
                high_res_ocr=True,
                adaptive_long_table=True,
                outlined_table_extraction=True,
                output_tables_as_HTML=True,
                page_separator="\n\n---\n\n",
            )
            result = await parser.aparse(filepath)
            markdown_documents = result.get_markdown_documents(split_by_page=True)
            combined_content = "\n\n".join([doc.text for doc in markdown_documents])
            logging.info(f"LlamaParser extraction completed. Content length: {len(combined_content)} characters")
            return combined_content
        except Exception as e:
            logging.error(f"LlamaParser failed: {e} — falling back to local extraction")
            return await self._extract_local_async(filepath)

    async def process_document_multi_model(self, filepath: str, progress=None) -> ProcessingResult:
        """Process document using parallel multi-model analysis with consolidation.

        Stages: content extraction, parallel LLM analysis, consolidation. A
        failure in any stage is reported via ``progress.emit_failure`` (when
        given) and returns an empty ProcessingResult with confidence 0.0
        instead of raising.
        """
        logging.info(f"Starting parallel multi-model analysis of '{os.path.basename(filepath)}'")

        # Import JobPhase if progress reporting is enabled
        if progress:
            try:
                from server.jobs.models import JobPhase
            except ImportError:
                # Fallback for CLI usage - create mock enum
                class JobPhase:
                    EXTRACT_CONTENT = 'EXTRACT_CONTENT'
                    LLM_ANALYSIS = 'LLM_ANALYSIS'
                    CONSOLIDATION = 'CONSOLIDATION'

        # Progress: EXTRACT_CONTENT 0% → 25%
        if progress:
            await progress.emit(JobPhase.EXTRACT_CONTENT, 10, f'Starting analysis of {os.path.basename(filepath)}')

        # Stage 1: Extract document content (LlamaParser or local fallback)
        try:
            document_content = await self._extract_document_content(filepath)
            # Previously claimed "using LlamaParser" even when the local fallback ran.
            logging.info(f"Document content extracted ({len(document_content)} characters)")
            if progress:
                await progress.emit(JobPhase.EXTRACT_CONTENT, 25, 'Document content extracted successfully')
        except Exception as e:
            logging.error(f"Content extraction failed: {e}")
            if progress:
                await progress.emit_failure(f"Content extraction failed: {e}")
            return ProcessingResult([], {}, 0.0, [f"Content extraction failed: {e}"], TokenUsage())

        # Stage 2: Parallel multi-model analysis
        # Progress: LLM_ANALYSIS 25% → 75% (50% weight)
        if progress:
            await progress.emit(JobPhase.LLM_ANALYSIS, 30, 'Starting parallel multi-model analysis')
        logging.info("=== STAGE 2: Starting Parallel Multi-Model Analysis ===")
        doc_type = self.classify_document(filepath)
        try:
            analysis_responses, analysis_metadata = await self._perform_parallel_analysis(
                document_content, doc_type, progress
            )
            logging.info(f"Parallel analysis completed - {len(analysis_responses)} successful models")
            if progress:
                await progress.emit(JobPhase.LLM_ANALYSIS, 75, f'Parallel analysis completed - {len(analysis_responses)} successful models')
        except Exception as e:
            logging.error(f"Parallel analysis failed: {e}")
            if progress:
                await progress.emit_failure(f"Parallel analysis failed: {e}")
            return ProcessingResult([], {}, 0.0, [f"Parallel analysis failed: {e}"], TokenUsage())

        # Stage 3: Consolidation
        # Progress: CONSOLIDATION 75% → 90% (15% weight)
        if progress:
            await progress.emit(JobPhase.CONSOLIDATION, 75, 'Starting result consolidation')
        logging.info("=== STAGE 3: Starting Result Consolidation ===")
        try:
            consolidation_result = await self.consolidation_processor.consolidate_results(
                analysis_responses, self.consolidation_model, document_content
            )
            logging.info(f"Consolidation completed: {len(consolidation_result.expanded_assets)} final deliverables")
            if progress:
                await progress.emit(JobPhase.CONSOLIDATION, 90, f'Consolidation completed: {len(consolidation_result.expanded_assets)} final assets')
        except Exception as e:
            logging.error(f"Consolidation failed: {e}")
            if progress:
                await progress.emit_failure(f"Consolidation failed: {e}")
            return ProcessingResult([], {}, 0.0, [f"Consolidation failed: {e}"], TokenUsage())

        # Convert expanded assets to dict format for compatibility
        extracted_data = [asset.model_dump() for asset in consolidation_result.expanded_assets]

        # Aggregate token usage from all models
        total_token_usage = self.provider_manager.get_aggregated_token_usage(analysis_responses)

        # Combine processing notes
        successful_count = analysis_metadata.get('successful_models', len(analysis_responses))
        total_count = analysis_metadata.get('total_models_attempted', len(self.primary_models))
        processing_notes = [f"Parallel analysis: {successful_count}/{total_count} models"]
        processing_notes.extend(consolidation_result.warnings)

        # Merge metadata
        combined_metadata = {
            'doc_type': doc_type.value,
            'primary_models_used': self.primary_models,
            'consolidation_model': self.consolidation_model,
            'analysis_metadata': analysis_metadata,
            'consolidation_metadata': consolidation_result.consolidation_metadata
        }

        return ProcessingResult(
            raw_data=extracted_data,
            metadata=combined_metadata,
            confidence_score=0.9,  # Higher confidence due to multi-model consensus
            processing_notes=processing_notes,
            token_usage=total_token_usage
        )

    async def _perform_parallel_analysis(self, document_content: str, doc_type: DocumentType, progress=None) -> Tuple[List[LLMResponse], Dict[str, Any]]:
        """Run the multi-perspective prompt against all primary models in parallel.

        Returns:
            (successful_responses, metadata) as produced by
            ``ProviderManager.execute_parallel_analysis``.
        """
        # Load prompt from external file
        multi_perspective_prompt_template = await self._load_prompt('multi_perspective_analysis')
        multi_perspective_prompt = multi_perspective_prompt_template.format(doc_type=doc_type.value)
        # Load system message from external file
        system_message = await self._load_prompt('system_multi_perspective')
        # Prepare combined prompt
        combined_prompt = f"{multi_perspective_prompt}\n\nDocument Content:\n{document_content}"
        # Prepare messages for all providers
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": combined_prompt}
        ]
        # Get schema for structured output
        schema = UNIVERSAL_BASE_DELIVERABLE_SCHEMA
        # Create progress callback for provider updates
        progress_callback = None
        if progress:
            progress_callback = self._create_provider_progress_callback(progress)
        # Execute parallel analysis
        successful_responses, metadata = await self.provider_manager.execute_parallel_analysis(
            model_keys=self.primary_models,
            messages=messages,
            schema=schema,
            minimum_success_threshold=config.MINIMUM_SUCCESS_THRESHOLD,
            on_model_event=progress_callback
        )
        return successful_responses, metadata

    def _create_provider_progress_callback(self, progress):
        """Create the async per-model event callback that forwards updates to ``progress``.

        The callback handles ``stage == 'start'`` and ``stage == 'end'``. On
        every 'end' event (success or error) it also recomputes overall
        LLM_ANALYSIS progress as 25% + completed/total * 50%. Errors inside
        the callback are logged, never raised.
        """
        async def on_model_event(model_key: str, stage: str, data):
            try:
                # `data` may be None; the old `'error' in data` raised TypeError then.
                payload = data or {}
                if stage == 'start':
                    await progress.emit_provider_update(model_key, {
                        'provider': self._get_provider_name(model_key),
                        'model': self._get_model_display_name(model_key),
                        'status': 'started',
                        'startedAt': payload.get('timestamp')
                    })
                elif stage == 'end':
                    if 'error' in payload:
                        await progress.emit_provider_update(model_key, {
                            'provider': self._get_provider_name(model_key),
                            'model': self._get_model_display_name(model_key),
                            'status': 'error',
                            'error': str(payload['error']),
                            'completedAt': payload.get('timestamp')
                        })
                    else:
                        response = payload.get('response')
                        cost = payload.get('cost', 0)
                        if response:
                            await progress.emit_provider_update(model_key, {
                                'provider': self._get_provider_name(model_key),
                                'model': self._get_model_display_name(model_key),
                                'status': 'success',
                                'completedAt': payload.get('timestamp'),
                                'latencyMs': response.processing_time * 1000 if response.processing_time else None,
                                'tokensIn': response.token_usage.input_tokens,
                                'tokensOut': response.token_usage.output_tokens,
                                'tokensCached': response.token_usage.cached_input_tokens,
                                'costUsd': cost
                            })

                    # Update overall LLM_ANALYSIS progress (25% + completed/total * 50%).
                    # Runs for errors too, since the count includes 'error' statuses.
                    completed_count = len([
                        p for p in progress.job.provider_updates.values()
                        if p.status in ['success', 'error']
                    ])
                    total_count = len(self.primary_models)
                    analysis_progress = 25 + (completed_count / max(total_count, 1)) * 50
                    # NOTE(review): phase passed as a plain string here, while
                    # process_document_multi_model passes JobPhase members.
                    await progress.emit('LLM_ANALYSIS', int(analysis_progress), f'Analysis progress: {completed_count}/{total_count} models complete')
            except Exception as e:
                logging.error(f"Error in provider progress callback: {e}")

        return on_model_event

    def _get_provider_name(self, model_key: str) -> str:
        """Get provider name from config, else the model key's prefix before the first '-'."""
        try:
            provider_name, _ = config.get_model_info(model_key)
            return provider_name
        except Exception:
            return model_key.split('-')[0] if '-' in model_key else 'unknown'

    def _get_model_display_name(self, model_key: str) -> str:
        """Get a human-readable display name for a model key (the key itself if unmapped)."""
        display_names = {
            'openai-gpt51': 'GPT-5.1',
            'anthropic-opus45': 'Claude Opus 4.5',
            'anthropic-sonnet45': 'Claude Sonnet 4.5',
            'google-gemini31': 'Gemini 3.1 Pro'
        }
        return display_names.get(model_key, model_key)

    async def _enhance_and_validate_results(self, uploaded_file, initial_results: ProcessingResult) -> ProcessingResult:
        """Enhance results with cross-validation and gap analysis.

        Appends any additional assets returned by the validation call to
        ``initial_results.raw_data``; on failure only a processing note is added.
        """
        # NOTE(review): this legacy path uses self.model, self.model_name and
        # self.reasoning_effort, none of which __init__ sets, so it currently
        # always ends in the "Validation failed" branch (AttributeError). It also
        # makes a synchronous API call from async code. Confirm whether it is
        # still used before relying on it.
        if not initial_results.raw_data:
            return initial_results

        # Load validation prompt from external file
        validation_prompt_template = await self._load_prompt('validation_analysis')
        validation_prompt = validation_prompt_template.format(
            asset_count=len(initial_results.raw_data),
            doc_type=initial_results.metadata.get('doc_type', 'unknown')
        )

        try:
            # For GPT-5 using Responses API with reasoning_effort
            combined_prompt = f"{validation_prompt}\n\nDocument Content:\n{uploaded_file}"
            logging.info(f"=== CALLING OPENAI RESPONSES API: /v1/responses (Validation parse) ===")
            logging.info(f"Model: {self.model_name}, Reasoning Effort: {self.reasoning_effort}")
            # Load system message from external file
            system_validation_message = await self._load_prompt('system_validation')
            response = self.model.responses.parse(
                model=self.model_name,
                input=[
                    {"role": "system", "content": system_validation_message},
                    {"role": "user", "content": combined_prompt}
                ],
                reasoning={"effort": self.reasoning_effort},
                text_format=AssetExtractionResult
            )
            logging.info(f"=== RESPONSES API CALL COMPLETED SUCCESSFULLY (Validation) ===")

            # Track token usage for GPT-5 Responses API
            if hasattr(response, 'usage'):
                usage_dict = {
                    'input_tokens': response.usage.input_tokens,
                    'output_tokens': response.usage.output_tokens,
                    'input_tokens_cached': getattr(response.usage, 'input_tokens_cached', 0)
                }
                self.token_usage.add_usage(usage_dict)
                logging.info(f"Validation Analysis - Tokens: {usage_dict['input_tokens']} input, {usage_dict['output_tokens']} output")

            # Extract parsed data from Responses API format
            parsed_result = response.output_parsed
            logging.info(f"GPT-5 Validation Analysis - Parsed {len(parsed_result.assets)} additional assets")
            additional_data = [asset.model_dump() for asset in parsed_result.assets]
            logging.info(f"VALIDATION DEBUG - Extracted data type: {type(additional_data)}, length: {len(additional_data)}")

            if additional_data and len(additional_data) > 0:
                logging.info(f"Validation found {len(additional_data)} additional assets")
                logging.info(f"VALIDATION DEBUG - Adding assets: {[asset.get('title', 'No title') for asset in additional_data]}")
                initial_results.raw_data.extend(additional_data)
                initial_results.processing_notes.append(f"Added {len(additional_data)} assets from validation")
                logging.info(f"VALIDATION DEBUG - Total assets after validation: {len(initial_results.raw_data)}")
            else:
                logging.info("Validation confirmed extraction completeness")
                initial_results.confidence_score = 0.95
                initial_results.processing_notes.append("Validation confirmed completeness")
                # additional_data is always a list here, so the warning branch is unreachable.
                if additional_data is not None:
                    logging.info(f"VALIDATION DEBUG - Validation returned empty array (expected when extraction is complete)")
                else:
                    logging.warning(f"VALIDATION DEBUG - JSON extraction returned None - possible parsing issue")
            return initial_results
        except Exception as e:
            logging.warning(f"Validation step failed: {e}")
            initial_results.processing_notes.append(f"Validation failed: {e}")
            return initial_results

    def _extract_structured_json(self, raw_text: str) -> List[Dict[str, Any]]:
        """Extract the asset list from a structured (JSON) model response.

        Accepts ``{"assets": [...]}`` or a bare JSON array. Non-JSON text falls
        back to ``_extract_json``; anything else yields ``[]``.
        """
        if not raw_text:
            # Empty text previously reached [] through the fallback; None crashed
            # inside the error handler while slicing raw_text for the log message.
            return []
        try:
            # Log the raw response for debugging
            logging.info(f"Raw response for JSON parsing: {raw_text[:200]}...")
            # Parse the structured response
            structured_data = json.loads(raw_text)
            # A bare array is already the asset list (it used to hit .keys() on a list).
            if isinstance(structured_data, list):
                logging.info(f"Successfully extracted {len(structured_data)} assets using structured output")
                return structured_data
            # Extract assets array from structured response
            if isinstance(structured_data, dict) and 'assets' in structured_data:
                assets = structured_data['assets']
                logging.info(f"Successfully extracted {len(assets)} assets using structured output")
                return assets
            logging.warning("No 'assets' key found in structured response")
            if isinstance(structured_data, dict):
                logging.info(f"Available keys in response: {list(structured_data.keys())}")
            return []
        except json.JSONDecodeError as e:
            logging.warning(f"Structured JSON parsing failed: {e}")
            logging.info(f"Raw text causing JSON error: {raw_text[:500]}...")
            logging.info("Falling back to legacy parsing")
            return self._extract_json(raw_text)
        except Exception as e:
            logging.error(f"Structured JSON extraction failed: {e}")
            logging.info(f"Raw text: {raw_text[:500]}...")
            return []

    def _extract_json(self, raw_text: str) -> List[Dict[str, Any]]:
        """Extract JSON from free-form AI output using lenient (json5) parsing.

        Tries, in order: the whole text as an array, the span from the first
        '[' to the last ']', then one JSON object per line. Returns ``[]``
        (and logs) if nothing parses.
        """
        try:
            # Try direct JSON parsing first
            if raw_text.strip().startswith('['):
                return json5.loads(raw_text.strip())
            # Look for JSON array in response
            start_index = raw_text.find('[')
            end_index = raw_text.rfind(']')
            if start_index != -1 and end_index != -1:
                json_str = raw_text[start_index:end_index + 1]
                return json5.loads(json_str)
            # Look for individual JSON objects
            json_objects = []
            for line in raw_text.split('\n'):
                line = line.strip()
                if line.startswith('{') and line.endswith('}'):
                    try:
                        json_objects.append(json5.loads(line))
                    except Exception:
                        # Skip lines that only look like objects.
                        continue
            if json_objects:
                return json_objects
            raise ValueError("No valid JSON found in response")
        except Exception as e:
            logging.error(f"JSON extraction failed: {e}")
            logging.debug(f"Raw text: {raw_text[:500]}...")
            return []


def discover_supported_files(folder_path: str) -> List[str]:
    """Discover all supported document files in a folder (top-level only)"""
    supported_extensions = {'.pdf', '.pptx', '.docx', '.xlsx', '.ppt', '.doc', '.xls'}
    supported_files = []
    try:
        for filename in os.listdir(folder_path):
            # Skip hidden files
            if filename.startswith('.'):
                continue
            file_path = os.path.join(folder_path, filename)
            # Only process files (not subdirectories)
            if os.path.isfile(file_path):
                _, ext = os.path.splitext(filename)
                if ext.lower() in supported_extensions:
                    supported_files.append(file_path)
        # Sort alphabetically for consistent processing order
        supported_files.sort()
        logging.info(f"Discovered {len(supported_files)} supported documents in {folder_path}")
    except Exception as e:
        logging.error(f"Error discovering files in {folder_path}: {e}")
    return
supported_files def parse_arguments(): """Parse command line arguments""" import argparse parser = argparse.ArgumentParser( description="Enhanced Brief Processing System with Multi-Model Support", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Process single document python process_brief_enhanced.py document.pdf # Process entire folder python process_brief_enhanced.py /path/to/briefs/ # Custom models for batch processing python process_brief_enhanced.py /path/to/briefs/ \ --primary-models openai-gpt51,anthropic-sonnet45,google-gemini31 \ --consolidation-model anthropic-opus45 # Cost estimation for folder python process_brief_enhanced.py /path/to/briefs/ --estimate-cost Available models: openai-gpt51, anthropic-opus45, anthropic-sonnet45, google-gemini31 """ ) parser.add_argument('filepath', help='Path to document file or folder to process') parser.add_argument( '--primary-models', type=str, default=config.DEFAULT_PRIMARY_MODELS, help=f'Comma-separated list of models for primary analysis (default: {config.DEFAULT_PRIMARY_MODELS})' ) parser.add_argument( '--consolidation-model', type=str, default=config.DEFAULT_CONSOLIDATION_MODEL, help=f'Model for final consolidation (default: {config.DEFAULT_CONSOLIDATION_MODEL})' ) parser.add_argument( '--estimate-cost', action='store_true', help='Estimate processing cost before execution' ) return parser.parse_args() async def main(): # Enhanced logging setup log_file = 'processing.log' logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file, mode='w'), logging.StreamHandler(sys.stdout) ] ) # Parse command line arguments args = parse_arguments() filepath = args.filepath primary_models = args.primary_models.split(',') consolidation_model = args.consolidation_model # Initialize multi-model analyzer analyzer = DocumentAnalyzer(primary_models, consolidation_model) # Cost estimation if requested if args.estimate_cost or 
config.ENABLE_COST_ESTIMATION: try: # Rough estimation based on document size file_size = os.path.getsize(filepath) estimated_tokens = min(file_size // 4, 50000) # Rough heuristic cost_breakdown = analyzer.provider_manager.estimate_total_cost( primary_models + [consolidation_model], estimated_tokens, estimated_tokens // 2 ) logging.info("=== COST ESTIMATION ===") for model, cost in cost_breakdown.items(): if model != 'total': logging.info(f"{model}: ${cost:.4f}") logging.info(f"Total Estimated Cost: ${cost_breakdown['total']:.4f}") if cost_breakdown['total'] > config.MAX_PROCESSING_COST_USD: response = input(f"Estimated cost (${cost_breakdown['total']:.4f}) exceeds limit (${config.MAX_PROCESSING_COST_USD}). Continue? (y/N): ") if response.lower() != 'y': logging.info("Processing cancelled by user") return except Exception as e: logging.warning(f"Cost estimation failed: {e}") # Determine if input is file or folder if os.path.isdir(filepath): # Batch processing mode logging.info("=== ENHANCED MULTI-MODEL BATCH PROCESSING STARTED ===") await process_batch_documents(filepath, analyzer, args) else: # Single file processing mode logging.info("=== ENHANCED MULTI-MODEL BRIEF PROCESSING STARTED ===") await process_single_document(filepath, analyzer) async def process_batch_documents(folder_path: str, analyzer, args): """Process all supported documents in a folder""" # Discover all supported files document_files = discover_supported_files(folder_path) if not document_files: logging.error(f"No supported documents found in {folder_path}") return logging.info(f"Starting batch processing of {len(document_files)} documents") # Track batch statistics successful_documents = [] failed_documents = [] total_assets = 0 total_cost = 0.0 # Process each document sequentially for i, document_path in enumerate(document_files, 1): document_name = os.path.basename(document_path) # Progress reporting logging.info(f"\\n{'='*60}") logging.info(f"PROCESSING DOCUMENT {i}/{len(document_files)}: 
{document_name}") logging.info(f"{'='*60}") try: # Process single document using existing logic results = await analyzer.process_document_multi_model(document_path) if results.raw_data: # Generate output file output_path = generate_output_file(document_path, results) # Track success statistics successful_documents.append((document_name, len(results.raw_data), output_path)) total_assets += len(results.raw_data) # Extract cost information if available consolidation_metadata = results.metadata.get('consolidation_metadata', {}) doc_cost = consolidation_metadata.get('cost_breakdown', {}).get('total_cost', 0) total_cost += doc_cost logging.info(f"SUCCESS: {document_name} - {len(results.raw_data)} assets extracted") else: logging.error(f"FAILED: {document_name} - No data extracted") failed_documents.append((document_name, "No data extracted")) except Exception as e: logging.error(f"FAILED: {document_name} - {str(e)}") failed_documents.append((document_name, str(e))) # Final batch summary logging.info(f"\\n{'='*60}") logging.info("BATCH PROCESSING COMPLETE") logging.info(f"{'='*60}") logging.info(f"Documents processed: {len(document_files)}") logging.info(f"Successful: {len(successful_documents)}") logging.info(f"Failed: {len(failed_documents)}") logging.info(f"Total assets extracted: {total_assets}") logging.info(f"Total estimated cost: ${total_cost:.4f}") # Report successful documents if successful_documents: logging.info(f"\\nSUCCESSFUL DOCUMENTS:") for doc_name, asset_count, output_path in successful_documents: logging.info(f" ✅ {doc_name}: {asset_count} assets → {output_path}") # Report failed documents if failed_documents: logging.info(f"\\nFAILED DOCUMENTS:") for doc_name, error in failed_documents: logging.info(f" ❌ {doc_name}: {error}") # Print summary for PHP integration print(f"__BATCH_SUMMARY__:{len(successful_documents)}:{len(failed_documents)}:{total_assets}:{total_cost:.4f}") def generate_output_file(filepath: str, results) -> str: """Generate CSV output 
file for processed document""" # Generate output path iso_datetime = datetime.datetime.now().strftime("%Y%m%d%H%M%S") base_name = os.path.basename(filepath) sanitized_name = os.path.splitext(base_name)[0].replace(' ', '_').replace('.', '_') # Create output directory if it doesn't exist output_dir = 'output' os.makedirs(output_dir, exist_ok=True) output_filename = f"{sanitized_name}-{iso_datetime}.csv" output_path = os.path.join(output_dir, output_filename) # Write CSV file with open(output_path, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=CSV_HEADERS, extrasaction='ignore') writer.writeheader() writer.writerows(results.raw_data) return output_path async def process_single_document(filepath: str, analyzer): """Process a single document (existing logic)""" results = await analyzer.process_document_multi_model(filepath) if not results.raw_data: logging.error("No data extracted from document") return # Generate output file output_path = generate_output_file(filepath, results) # Log processing summary logging.info("=== PROCESSING SUMMARY ===") logging.info(f"Document Type: {results.metadata.get('doc_type', 'unknown')}") logging.info(f"Assets Extracted: {len(results.raw_data)}") logging.info(f"Confidence Score: {results.confidence_score:.2f}") logging.info(f"Processing Notes: {', '.join(results.processing_notes)}") logging.info(f"Output File: {output_path}") # Log cost information from consolidation metadata consolidation_metadata = results.metadata.get('consolidation_metadata', {}) cost_breakdown = consolidation_metadata.get('cost_breakdown', {}) token_usage = consolidation_metadata.get('token_usage', {}) logging.info("=== COST ANALYSIS ===") logging.info(f"Primary Models Used: {', '.join(results.metadata.get('primary_models_used', []))}") logging.info(f"Consolidation Model: {results.metadata.get('consolidation_model', 'Unknown')}") logging.info(f"Primary Analysis Cost: ${cost_breakdown.get('primary_analysis_cost', 
0):.4f}") logging.info(f"Consolidation Cost: ${cost_breakdown.get('consolidation_cost', 0):.4f}") logging.info(f"Total Cost: ${cost_breakdown.get('total_cost', 0):.4f}") logging.info(f"Total Tokens: {token_usage.get('grand_total', results.token_usage.get_total()):,}") # Cost info now included in ProcessingResult for GUI integration # Legacy print statements removed as per GUI integration plan total_cost = cost_breakdown.get('total_cost', 0) total_tokens = token_usage.get('grand_total', results.token_usage.get_total()) # Only print for CLI usage (when no progress reporter) if not hasattr(analyzer, '_is_gui_mode'): print(f"__COST_SUMMARY__:{total_cost:.4f}") print(f"__TOKEN_USAGE__:{token_usage.get('primary_analysis_total', 0)}:{token_usage.get('consolidation_tokens', 0)}:{total_tokens}") print(f"__FILENAME__:{output_path}") if __name__ == "__main__": asyncio.run(main())