- Replace sys.exit(1) with ValueError in _validate_models() — prevents killing the worker process on invalid model keys - ModelConfiguration now reads defaults from core.config instead of hardcoding model keys - Fix .env: google-gemini20 → google-gemini31 (align with MODEL_MAPPINGS) - Improve "No data extracted" error message to explain the document must be a marketing brief Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1256 lines
No EOL
54 KiB
Python
Executable file
1256 lines
No EOL
54 KiB
Python
Executable file
import sys
|
|
import os
|
|
import datetime
|
|
import logging
|
|
import json
|
|
import csv
|
|
import re
|
|
import itertools
|
|
import asyncio
|
|
from typing import List, Dict, Any, Optional, Tuple, Union
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from pydantic import BaseModel
|
|
|
|
# File Processing Libraries
|
|
import pptx
|
|
import pandas as pd
|
|
import fitz # PyMuPDF
|
|
from PIL import Image
|
|
import docx
|
|
from openpyxl import load_workbook
|
|
|
|
# AI Libraries
|
|
import json5
|
|
import base64
|
|
|
|
# Configuration and LLM Services
|
|
from .config import config
|
|
from .llm_service import ProviderManager, LLMResponse
|
|
from .consolidation_processor import ConsolidationProcessor
|
|
|
|
# OpenAI pricing table: USD per one million tokens, keyed by model name.
OPENAI_PRICING = {
    'gpt-5.1': {
        'input': 1.25,          # regular (uncached) input tokens
        'cached_input': 0.625,  # cached input tokens
        'output': 10.00,        # output tokens
    },
}
|
|
|
|
# Column order of the CSV output; the names match the MarketingAsset fields.
CSV_HEADERS = [
    'title',
    'status',
    'category',
    'media',
    'asset_type',
    'brand_identifier',
    'technical_specifications',
    'review_date',
    'live_date',
    'end_date',
    'reference_material',
    'language_country_market',
    'quantity',
    'page_number',
    'priority_level',
    'creative_direction',
]
|
|
|
|
# Base deliverable as returned by the LLM: plain strings for metadata and
# lists for the two "multiplier" fields (technical_specifications and
# language_country_market), which expand_deliverables() fans out into one
# MarketingAsset per combination.
# NOTE: documented with comments rather than a class docstring on purpose;
# pydantic copies a model's docstring into its generated JSON schema.
class BaseDeliverable(BaseModel):
    # --- identification ---
    title: str
    status: Optional[str] = ""
    category: Optional[str] = ""
    media: Optional[str] = ""
    asset_type: Optional[str] = ""
    brand_identifier: Optional[str] = ""

    # --- multiplier: one expanded asset per listed specification ---
    technical_specifications: Optional[List[str]] = []

    # --- schedule ---
    review_date: Optional[str] = ""
    live_date: Optional[str] = ""
    end_date: Optional[str] = ""

    reference_material: Optional[str] = ""

    # --- multiplier: one expanded asset per listed market ---
    language_country_market: Optional[List[str]] = []

    # --- bookkeeping ---
    quantity: Optional[str] = "1"
    page_number: Optional[str] = ""
    priority_level: Optional[str] = ""
    creative_direction: Optional[str] = ""
|
|
|
|
# A single, fully expanded marketing asset (one row of the final CSV output).
# Same fields as BaseDeliverable, except that the two multiplier fields hold
# one string each instead of a list.
# NOTE: documented with comments rather than a class docstring on purpose;
# pydantic copies a model's docstring into its generated JSON schema.
class MarketingAsset(BaseModel):
    # --- identification ---
    title: str
    status: Optional[str] = ""
    category: Optional[str] = ""
    media: Optional[str] = ""
    asset_type: Optional[str] = ""
    brand_identifier: Optional[str] = ""

    # Single value (list on BaseDeliverable)
    technical_specifications: Optional[str] = ""

    # --- schedule ---
    review_date: Optional[str] = ""
    live_date: Optional[str] = ""
    end_date: Optional[str] = ""

    reference_material: Optional[str] = ""

    # Single value (list on BaseDeliverable)
    language_country_market: Optional[str] = ""

    # --- bookkeeping ---
    quantity: Optional[str] = "1"
    page_number: Optional[str] = ""
    priority_level: Optional[str] = ""
    creative_direction: Optional[str] = ""
|
|
|
|
# Envelope for the raw LLM extraction: a list of base deliverables whose
# multiplier fields are still lists (not yet expanded).
class BaseExtractionResult(BaseModel):
    assets: List[BaseDeliverable]
|
|
|
|
# Envelope for the final extraction: a list of individual, already expanded
# marketing assets.
class AssetExtractionResult(BaseModel):
    assets: List[MarketingAsset]
|
|
|
|
# Load universal schema from external file
|
|
def _load_universal_schema():
    """Read ``prompts/universal_schema.json`` and return the parsed JSON.

    The ``prompts`` directory is looked up next to the directory that
    contains this module (one level up from this file's directory).

    Raises:
        Exception: whatever the path lookup, file read or JSON parsing
            raised; the error is logged first and then re-raised unchanged.
    """
    try:
        module_dir = os.path.dirname(__file__)
        project_root = os.path.dirname(module_dir)
        schema_path = os.path.join(project_root, 'prompts', 'universal_schema.json')
        with open(schema_path, 'r', encoding='utf-8') as schema_file:
            parsed_schema = json.load(schema_file)
        return parsed_schema
    except Exception as exc:
        logging.error(f"Error loading universal schema: {exc}")
        raise
|
|
|
|
# Schema used for base-deliverable extraction with every provider. It is read
# from disk once, at import time, so a missing or malformed schema file makes
# importing this module fail.
UNIVERSAL_BASE_DELIVERABLE_SCHEMA = _load_universal_schema()
|
|
|
|
# Legacy OpenAI structured-output schema, kept for backward compatibility.
# Every asset property is a described string, so the property table is built
# from (name, description) pairs; insertion order is preserved.
OPENAI_ASSET_SCHEMA = {
    "name": "asset_extraction",
    "description": "Extract assets from document analysis",
    "schema": {
        "type": "object",
        "properties": {
            "assets": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        prop_name: {"type": "string", "description": prop_text}
                        for prop_name, prop_text in (
                            ("title", "Asset title or name"),
                            ("status", "Current status"),
                            ("category", "Asset category"),
                            ("media", "Media type"),
                            ("asset_type", "Specific asset type"),
                            ("brand_identifier", "Brand or client"),
                            (
                                "technical_specifications",
                                "Technical specifications including dimensions "
                                "(e.g., '1080x1920', '1920x1080'), descriptive formats "
                                "(e.g., 'Mobile Banner', 'Desktop Hero'), file formats, "
                                "technical requirements, or any other technical details",
                            ),
                            ("review_date", "Review deadline"),
                            ("live_date", "Go-live date"),
                            ("end_date", "End/expiry date"),
                            ("reference_material", "Detailed requirements"),
                            ("language", "Target language"),
                            ("country", "Target country/region"),
                            ("quantity", "Number of assets"),
                            ("page_number", "Source page"),
                            ("priority_level", "Business priority"),
                            ("creative_direction", "Design requirements"),
                        )
                    },
                    "required": ["title", "technical_specifications"],
                    "additionalProperties": False,
                },
            },
        },
        "required": ["assets"],
        "additionalProperties": False,
    },
}
|
|
|
|
# Legacy Gemini response schema, kept for backward compatibility.
# Every asset property is a described string, so the property table is built
# from (name, description) pairs; insertion order is preserved.
GEMINI_ASSET_SCHEMA = {
    "type": "object",
    "properties": {
        "assets": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    prop_name: {"type": "string", "description": prop_text}
                    for prop_name, prop_text in (
                        ("title", "Asset title or name"),
                        ("status", "Current status"),
                        ("category", "Asset category"),
                        ("media", "Media type"),
                        ("asset_type", "Specific asset type"),
                        ("brand_identifier", "Brand or client"),
                        (
                            "technical_specifications",
                            "Technical specifications including format/dimensions, "
                            "file formats, technical requirements, "
                            "or any other technical details",
                        ),
                        ("review_date", "Review deadline"),
                        ("live_date", "Go-live date"),
                        ("end_date", "End/expiry date"),
                        ("reference_material", "Detailed requirements"),
                        ("language", "Target language"),
                        ("country", "Target country/region"),
                        ("quantity", "Number of assets"),
                        ("page_number", "Source page"),
                        ("priority_level", "Business priority"),
                        ("creative_direction", "Design requirements"),
                    )
                },
                "required": ["title", "technical_specifications"],
            },
        },
    },
    "required": ["assets"],
}
|
|
|
|
class DocumentType(Enum):
    """Document families the analyzer distinguishes."""

    POWERPOINT = "powerpoint"
    WORD = "word"
    PDF = "pdf"
    EXCEL = "excel"
    # Returned by classify_document() for any other file extension.
    UNKNOWN = "unknown"
|
|
|
|
|
|
@dataclass
class TokenUsage:
    """Running totals of LLM token consumption, with cost helpers.

    Costs are derived from the module-level ``OPENAI_PRICING`` table, whose
    prices are expressed in USD per one million tokens.
    """

    input_tokens: int = 0
    cached_input_tokens: int = 0
    output_tokens: int = 0

    # Pricing entry used when the requested model is not in OPENAI_PRICING.
    # Deliberately un-annotated so the dataclass does not treat it as a field.
    _FALLBACK_PRICING_MODEL = 'gpt-5.1'

    def add_usage(self, usage_dict: Dict[str, int]):
        """Accumulate one usage record into the running totals.

        Accepts both the Chat Completions field names (``prompt_tokens``,
        ``prompt_tokens_cached``, ``completion_tokens``) and the Responses
        API names (``input_tokens``, ``input_tokens_cached``,
        ``output_tokens``); the former win when both are present. Missing
        fields count as zero.
        """
        self.input_tokens += usage_dict.get('prompt_tokens', usage_dict.get('input_tokens', 0))
        self.cached_input_tokens += usage_dict.get('prompt_tokens_cached', usage_dict.get('input_tokens_cached', 0))
        self.output_tokens += usage_dict.get('completion_tokens', usage_dict.get('output_tokens', 0))

    def _pricing_for(self, model_name: str) -> Dict[str, float]:
        """Return the pricing entry for *model_name*, or the fallback entry.

        An unknown model is reported with a warning and priced with
        ``_FALLBACK_PRICING_MODEL``, which must be a key of OPENAI_PRICING.
        """
        if model_name in OPENAI_PRICING:
            return OPENAI_PRICING[model_name]
        fallback = self._FALLBACK_PRICING_MODEL
        logging.warning(f"No pricing info for model {model_name}, defaulting to {fallback}")
        return OPENAI_PRICING[fallback]

    def _cost_components(self, model_name: str) -> Dict[str, float]:
        """Return the unrounded USD cost of each token class."""
        pricing = self._pricing_for(model_name)
        return {
            'input_cost': (self.input_tokens / 1_000_000) * pricing['input'],
            'cached_input_cost': (self.cached_input_tokens / 1_000_000) * pricing['cached_input'],
            'output_cost': (self.output_tokens / 1_000_000) * pricing['output'],
        }

    def calculate_cost(self, model_name: str) -> float:
        """Return the total cost in USD for the accumulated tokens.

        Models without a pricing entry are priced with the fallback model
        instead of raising ``KeyError``.
        """
        return sum(self._cost_components(model_name).values())

    def get_summary(self, model_name: str) -> Dict[str, Any]:
        """Return token counts plus total and per-class costs (4 decimals).

        Uses the same pricing fallback as :meth:`calculate_cost`, so an
        unknown *model_name* yields a summary rather than a ``KeyError``.
        """
        # Resolve pricing once so an unknown model is only warned about once.
        components = self._cost_components(model_name)
        return {
            'input_tokens': self.input_tokens,
            'cached_input_tokens': self.cached_input_tokens,
            'output_tokens': self.output_tokens,
            'total_tokens': self.input_tokens + self.cached_input_tokens + self.output_tokens,
            'total_cost_usd': round(sum(components.values()), 4),
            'cost_breakdown': {label: round(cost, 4) for label, cost in components.items()},
        }
|
|
|
|
@dataclass
class ProcessingResult:
    """Outcome of one document-processing run."""

    # Extracted assets, one dict per asset
    raw_data: List[Dict[str, Any]]
    # Free-form information about the run
    metadata: Dict[str, Any]
    confidence_score: float
    # Human-readable notes and warnings collected while processing
    processing_notes: List[str]
    token_usage: TokenUsage
|
|
|
|
def create_unique_title(base_title: str, multiplier_values: Dict[str, str], max_suffix_length: int = 40) -> str:
    """Build a CSV-safe title by appending abbreviated multiplier values.

    The suffix is made of the market (``language_country_market``, at most
    15 characters) followed by the specification
    (``technical_specifications``, at most 20 characters), joined with
    ``_``. Missing or empty multiplier values are skipped.

    Args:
        base_title: Title of the base deliverable.
        multiplier_values: Multiplier field name -> value chosen for this asset.
        max_suffix_length: Upper bound on the length of the appended suffix
            (not counting the ``_`` that joins it to the title).

    Returns:
        ``base_title`` when there is nothing to append, otherwise
        ``"<base_title>_<suffix>"``; commas and line/tab characters are
        replaced by ``_`` and quote characters are removed.
    """
    market_limit = 15
    spec_limit = 20

    # One-pass equivalent of the chained character replacements.
    csv_unsafe = str.maketrans({
        ',': '_',
        '"': None,
        "'": None,
        '\n': '_',
        '\r': '_',
        '\t': '_',
    })

    def csv_safe_string(text: str) -> str:
        """Replace or drop characters that would break a CSV cell."""
        return text.translate(csv_unsafe)

    def abbreviate_value(value: str, max_length: int) -> str:
        """Shorten *value* to at most *max_length* characters."""
        if len(value) <= max_length:
            return value

        # Dimension-like values ("1080x1920"): keep the leading part.
        if 'x' in value and value.replace('x', '').replace('-', '').isdigit():
            return value[:max_length]

        # Hyphenated codes ("en-US"): prefer the last segment when it fits.
        if '-' in value:
            country = value.split('-')[-1]
            if len(country) <= max_length:
                return country

        return value[:max_length]

    # Market first, then specification.
    suffix_parts = []
    for field_name, limit in (
        ('language_country_market', market_limit),
        ('technical_specifications', spec_limit),
    ):
        value = multiplier_values.get(field_name)
        if value:
            suffix_parts.append(abbreviate_value(value, limit))

    suffix = '_'.join(suffix_parts)
    if len(suffix) > max_suffix_length:
        if len(suffix_parts) == 2 and max_suffix_length >= 3:
            # Share the available room between both parts in the same
            # 15:20 ratio as their individual limits, keeping the separator.
            room = max_suffix_length - 1
            market_room = min(
                len(suffix_parts[0]),
                max(1, room * market_limit // (market_limit + spec_limit)),
            )
            spec_room = room - market_room
            suffix = f"{suffix_parts[0][:market_room]}_{suffix_parts[1][:spec_room]}"
        else:
            suffix = suffix[:max(max_suffix_length, 0)]

    unique_title = f"{base_title}_{suffix}" if suffix else base_title
    return csv_safe_string(unique_title)
|
|
|
|
def expand_deliverables(base_deliverables: List[BaseDeliverable]) -> Tuple[List[MarketingAsset], List[str]]:
    """Expand base deliverables into individual ``MarketingAsset`` objects.

    ``technical_specifications`` and ``language_country_market`` are treated
    as multipliers: one asset is produced for every combination of their
    non-blank values, each with a title made unique by
    ``create_unique_title``. Deliverables with no usable multiplier value
    yield exactly one asset. Every produced asset has ``quantity == "1"``.

    Args:
        base_deliverables: Deliverables as extracted, multipliers still lists.

    Returns:
        ``(expanded_assets, warnings)``. Warnings report a mismatch between
        the declared quantity and the number of combinations, and any asset
        that could not be constructed (such assets are skipped).
    """
    multiplier_field_names = {'technical_specifications', 'language_country_market'}
    expanded_assets = []
    warnings = []

    def _usable_values(raw) -> List[str]:
        """Return the non-blank strings contained in *raw* (list or string)."""
        if isinstance(raw, list):
            return [item for item in raw if isinstance(item, str) and item.strip()]
        if isinstance(raw, str) and raw.strip():
            return [raw]
        return []

    for base in base_deliverables:
        base_dict = base.model_dump()

        # Split the fields into multipliers (lists to combine) and plain
        # single-valued metadata.
        multiplier_fields = {}
        single_fields = {}
        for field, value in base_dict.items():
            usable = _usable_values(value)
            if field in multiplier_field_names:
                if usable:
                    multiplier_fields[field] = usable
                else:
                    single_fields[field] = None
            else:
                # A list in a string field collapses to its first usable item.
                single_fields[field] = usable[0] if usable else None

        if not multiplier_fields:
            # Same best-effort policy as the multiplier path below: a
            # deliverable that cannot be turned into an asset is reported
            # as a warning instead of aborting the whole expansion.
            try:
                expanded_assets.append(MarketingAsset(**{**single_fields, "quantity": "1"}))
            except Exception as exc:
                warnings.append(f"Error creating asset for '{base.title}': {exc}")
            continue

        # A declared quantity other than "1" is checked against the
        # number of combinations; unparsable quantities are ignored.
        expected_quantity = None
        quantity_str = str(base_dict.get('quantity') or '')
        if quantity_str and quantity_str != "1":
            try:
                expected_quantity = int(quantity_str)
            except (ValueError, TypeError):
                pass

        field_names = list(multiplier_fields)
        combinations = list(itertools.product(*(multiplier_fields[name] for name in field_names)))
        actual_count = len(combinations)

        if expected_quantity and actual_count != expected_quantity:
            warnings.append(
                f"Quantity mismatch for '{base.title}': expected {expected_quantity}, "
                f"but expansion created {actual_count} deliverables"
            )

        for combo in combinations:
            multiplier_combination = dict(zip(field_names, combo))
            asset_data = {
                **single_fields,
                **multiplier_combination,
                "title": create_unique_title(base.title, multiplier_combination),
                "quantity": "1",
            }
            try:
                expanded_assets.append(MarketingAsset(**asset_data))
            except Exception as exc:
                warnings.append(f"Error creating asset for '{base.title}': {exc}")

        # Only fields with more than one value actually multiply the output.
        expanding_fields = {field: values for field, values in multiplier_fields.items() if len(values) > 1}
        if expanding_fields:
            logging.debug(f"EXPANDED '{base.title}': {actual_count} deliverables from {len(expanding_fields)} multiplier fields")
        else:
            logging.debug(f"EXPANDED '{base.title}': {actual_count} deliverable (no multipliers)")

    return expanded_assets, warnings
|
|
|
|
class DocumentAnalyzer:
|
|
def __init__(self, primary_models: List[str] = None, consolidation_model: str = None):
    """Set up the analyzer and validate the selected models.

    Args:
        primary_models: Model keys to run in parallel; when omitted or
            empty, ``config.get_default_primary_models()`` is used.
        consolidation_model: Model key used for consolidation; when omitted
            or empty, ``config.DEFAULT_CONSOLIDATION_MODEL`` is used.

    Raises:
        ValueError: propagated from ``_validate_models`` for unknown keys.
    """
    if not primary_models:
        primary_models = config.get_default_primary_models()
    if not consolidation_model:
        consolidation_model = config.DEFAULT_CONSOLIDATION_MODEL

    self.primary_models = primary_models
    self.consolidation_model = consolidation_model
    self.provider_manager = ProviderManager()
    self.consolidation_processor = ConsolidationProcessor()
    self.token_usage = TokenUsage()

    # Reject unknown model keys before any work is attempted.
    self._validate_models()
|
|
|
|
def _validate_models(self):
    """Check the configured model keys against ``config.MODEL_MAPPINGS``.

    Missing API keys are only reported with a warning.

    Raises:
        ValueError: if a primary model or the consolidation model is not a
            key of ``config.MODEL_MAPPINGS``.
    """
    valid_models = list(config.MODEL_MAPPINGS.keys())

    for candidate in self.primary_models:
        if candidate in valid_models:
            continue
        raise ValueError(f"Invalid primary model: {candidate}. Available: {valid_models}")

    if self.consolidation_model not in valid_models:
        raise ValueError(f"Invalid consolidation model: {self.consolidation_model}. Available: {valid_models}")

    # Providers whose key failed validation are listed but do not abort.
    missing_keys = []
    for provider, is_valid in config.validate_api_keys().items():
        if not is_valid:
            missing_keys.append(provider)
    if missing_keys:
        logging.warning(f"Missing API keys for: {missing_keys}")

    logging.info(f"Using primary models: {self.primary_models}")
    logging.info(f"Using consolidation model: {self.consolidation_model}")
|
|
|
|
async def _load_prompt(self, prompt_name: str) -> str:
    """Return the stripped text of ``prompts/<prompt_name>.txt``.

    The file is read in the event loop's default executor so the blocking
    read does not run on the event loop. The ``prompts`` directory is
    looked up one level above this file's directory.

    Raises:
        FileNotFoundError: if the prompt file does not exist (logged first).
        Exception: any other read error, logged and re-raised unchanged.
    """

    def _read_prompt():
        """Blocking file read, executed in the thread pool."""
        project_root = os.path.dirname(os.path.dirname(__file__))
        prompt_path = os.path.join(project_root, 'prompts', f'{prompt_name}.txt')
        with open(prompt_path, 'r', encoding='utf-8') as prompt_file:
            return prompt_file.read().strip()

    try:
        return await asyncio.get_running_loop().run_in_executor(None, _read_prompt)
    except FileNotFoundError:
        logging.error(f"Prompt file not found: {prompt_name}")
        raise
    except Exception as exc:
        logging.error(f"Error loading prompt {prompt_name}: {exc}")
        raise
|
|
|
|
async def _save_base_deliverables_json(self, base_deliverables: List[BaseDeliverable], doc_type: str):
    """Dump the un-expanded base deliverables to a timestamped JSON file.

    The file is written to a ``base_deliverable_JSON`` directory next to
    this module, in the default executor. This is best-effort: any failure
    is logged as a warning and swallowed, and nothing is returned.
    """

    def _write_json():
        """Blocking JSON write, executed in the thread pool."""
        target_dir = os.path.join(os.path.dirname(__file__), 'base_deliverable_JSON')
        os.makedirs(target_dir, exist_ok=True)

        # Second-resolution timestamp, used in both file name and payload.
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        filepath = os.path.join(target_dir, f"base_deliverables_{doc_type}_{timestamp}.json")

        payload = {
            "timestamp": timestamp,
            "document_type": doc_type,
            "base_deliverables_count": len(base_deliverables),
            "base_deliverables": [item.model_dump() for item in base_deliverables],
        }

        with open(filepath, 'w', encoding='utf-8') as out_file:
            json.dump(payload, out_file, indent=2, ensure_ascii=False)

        logging.info(f"Saved base deliverables JSON: {filepath}")
        return filepath

    try:
        await asyncio.get_running_loop().run_in_executor(None, _write_json)
    except Exception as exc:
        logging.warning(f"Failed to save base deliverables JSON: {exc}")
|
|
|
|
def classify_document(self, filepath: str) -> DocumentType:
    """Map a file path to a ``DocumentType`` using only its extension.

    The comparison is case-insensitive; extensions that are not listed
    yield ``DocumentType.UNKNOWN``. The file content is not inspected.
    """
    by_extension = {
        '.ppt': DocumentType.POWERPOINT,
        '.pptx': DocumentType.POWERPOINT,
        '.doc': DocumentType.WORD,
        '.docx': DocumentType.WORD,
        '.pdf': DocumentType.PDF,
        '.xls': DocumentType.EXCEL,
        '.xlsx': DocumentType.EXCEL,
    }
    _, extension = os.path.splitext(filepath)
    return by_extension.get(extension.lower(), DocumentType.UNKNOWN)
|
|
|
|
def _encode_file_for_openai(self, filepath: str) -> str:
    """Return the file's bytes as a base64 text string.

    Any error (for example a missing file) is logged and ``None`` is
    returned instead of raising.
    """
    try:
        with open(filepath, "rb") as handle:
            raw_bytes = handle.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode('utf-8')
    except Exception as exc:
        logging.error(f"Error encoding file for OpenAI: {exc}")
        return None
|
|
|
|
def _extract_document_content_local(self, filepath: str) -> str:
    """Extract plain text locally with PyMuPDF / python-pptx / python-docx / openpyxl.

    The extractor is chosen from the lower-cased file extension:

    * ``.pdf``: text of every page that has any, each under ``--- Page N ---``.
    * ``.pptx`` / ``.ppt``: text of every shape exposing ``text``, per slide.
    * ``.docx`` / ``.doc``: non-empty paragraphs, then table rows joined by `` | ``.
    * ``.xlsx`` / ``.xls``: non-empty cells per row, per sheet.

    Returns:
        The extracted text, or a short "no content" message when the
        document yields nothing.

    Raises:
        ValueError: for any other extension.
        Exception: whatever the underlying library raises while opening
            or reading the file.
    """
    ext = os.path.splitext(filepath)[1].lower()
    logging.info(f"Local extraction for {os.path.basename(filepath)} (ext={ext})")

    if ext == '.pdf':
        pages = []
        # Context manager closes the document even if a page fails.
        with fitz.open(filepath) as doc:
            for i, page in enumerate(doc, 1):
                text = page.get_text("text")
                if text.strip():
                    pages.append(f"--- Page {i} ---\n{text}")
        return "\n\n".join(pages) or "No text content found in PDF."

    if ext in ('.pptx', '.ppt'):
        # NOTE(review): legacy .ppt is routed here too; confirm that
        # python-pptx can actually open it.
        prs = pptx.Presentation(filepath)
        slides = []
        for i, slide in enumerate(prs.slides, 1):
            texts = [
                shape.text.strip()
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            ]
            if texts:
                slides.append(f"--- Slide {i} ---\n" + "\n".join(texts))
        return "\n\n".join(slides) or "No text content found in presentation."

    if ext in ('.docx', '.doc'):
        # NOTE(review): legacy .doc is routed here too; confirm that
        # python-docx can actually open it.
        document = docx.Document(filepath)
        paragraphs = [p.text for p in document.paragraphs if p.text.strip()]
        for table in document.tables:
            for row in table.rows:
                row_text = " | ".join(cell.text.strip() for cell in row.cells if cell.text.strip())
                if row_text:
                    paragraphs.append(row_text)
        return "\n".join(paragraphs) or "No text content found in document."

    if ext in ('.xlsx', '.xls'):
        # NOTE(review): legacy .xls is routed here too; confirm that
        # openpyxl can actually open it.
        wb = load_workbook(filepath, read_only=True, data_only=True)
        try:
            sheets_text = []
            for sheet_name in wb.sheetnames:
                rows = []
                for row in wb[sheet_name].iter_rows(values_only=True):
                    cells = [str(c) for c in row if c is not None and str(c).strip()]
                    if cells:
                        rows.append(" | ".join(cells))
                if rows:
                    sheets_text.append(f"--- Sheet: {sheet_name} ---\n" + "\n".join(rows))
        finally:
            # The workbook was opened read-only; release it even on error.
            wb.close()
        return "\n\n".join(sheets_text) or "No content found in spreadsheet."

    # ValueError is still an Exception, so existing handlers keep working.
    raise ValueError(f"Unsupported file type for local extraction: {ext}")
|
|
|
|
async def _extract_document_content(self, filepath: str) -> str:
    """Extract document text with LlamaParse, or locally as a fallback.

    Local extraction is used when ``config.LLAMACLOUD_API_KEY`` is not set
    and whenever the LlamaParse path raises (including a failed import).
    The local extractor is blocking, so it is run in the event loop's
    default executor, like the other blocking I/O in this class.

    Returns:
        The page-wise markdown joined with blank lines (LlamaParse), or the
        text produced by ``_extract_document_content_local``.

    Raises:
        Exception: only if the local fallback itself fails.
    """
    loop = asyncio.get_running_loop()

    if not config.LLAMACLOUD_API_KEY:
        logging.warning("LLAMACLOUD_API_KEY not set — using local document extraction")
        return await loop.run_in_executor(None, self._extract_document_content_local, filepath)

    try:
        # Imported lazily so the dependency is only needed when a key is set.
        from llama_cloud_services import LlamaParse

        logging.info(f"Using LlamaParser to extract content from: {os.path.basename(filepath)}")

        parser = LlamaParse(
            api_key=config.LLAMACLOUD_API_KEY,
            parse_mode="parse_page_with_agent",
            model="openai-gpt-5",
            high_res_ocr=True,
            adaptive_long_table=True,
            outlined_table_extraction=True,
            output_tables_as_HTML=True,
            page_separator="\n\n---\n\n",
        )

        result = await parser.aparse(filepath)
        markdown_documents = result.get_markdown_documents(split_by_page=True)
        combined_content = "\n\n".join(doc.text for doc in markdown_documents)

        logging.info(f"LlamaParser extraction completed. Content length: {len(combined_content)} characters")
        return combined_content

    except Exception as e:
        logging.error(f"LlamaParser failed: {e} — falling back to local extraction")
        return await loop.run_in_executor(None, self._extract_document_content_local, filepath)
|
|
|
|
|
|
|
|
async def process_document_multi_model(self, filepath: str, progress=None) -> ProcessingResult:
    """Run the three-stage pipeline: extract, analyse in parallel, consolidate.

    Args:
        filepath: Document to process.
        progress: Optional reporter; when truthy its ``emit`` and
            ``emit_failure`` coroutines are awaited along the way.

    Returns:
        A ``ProcessingResult``. When a stage fails, the result is empty
        (no data, confidence 0.0, fresh ``TokenUsage``) and its only note
        is the failure message.
    """
    logging.info(f"Starting parallel multi-model analysis of '{os.path.basename(filepath)}'")

    async def _abort(reason: str) -> ProcessingResult:
        """Log a stage failure, report it, and build the empty result."""
        logging.error(reason)
        if progress:
            await progress.emit_failure(reason)
        return ProcessingResult([], {}, 0.0, [reason], TokenUsage())

    # JobPhase is only needed (and only bound) when progress is reported.
    if progress:
        try:
            from server.jobs.models import JobPhase
        except ImportError:
            # CLI usage without the server package: minimal stand-in.
            class JobPhase:
                EXTRACT_CONTENT = 'EXTRACT_CONTENT'
                LLM_ANALYSIS = 'LLM_ANALYSIS'
                CONSOLIDATION = 'CONSOLIDATION'

    # ---- Stage 1: content extraction (progress 0% -> 25%) ----
    if progress:
        await progress.emit(JobPhase.EXTRACT_CONTENT, 10, f'Starting analysis of {os.path.basename(filepath)}')

    try:
        document_content = await self._extract_document_content(filepath)
        logging.info("Document content extracted using LlamaParser")
        if progress:
            await progress.emit(JobPhase.EXTRACT_CONTENT, 25, 'Document content extracted successfully')
    except Exception as exc:
        return await _abort(f"Content extraction failed: {exc}")

    # ---- Stage 2: parallel multi-model analysis (progress 25% -> 75%) ----
    if progress:
        await progress.emit(JobPhase.LLM_ANALYSIS, 30, 'Starting parallel multi-model analysis')

    logging.info("=== STAGE 2: Starting Parallel Multi-Model Analysis ===")
    doc_type = self.classify_document(filepath)

    try:
        analysis_responses, analysis_metadata = await self._perform_parallel_analysis(
            document_content, doc_type, progress
        )
        logging.info(f"Parallel analysis completed - {len(analysis_responses)} successful models")
        if progress:
            await progress.emit(JobPhase.LLM_ANALYSIS, 75, f'Parallel analysis completed - {len(analysis_responses)} successful models')
    except Exception as exc:
        return await _abort(f"Parallel analysis failed: {exc}")

    # ---- Stage 3: consolidation (progress 75% -> 90%) ----
    if progress:
        await progress.emit(JobPhase.CONSOLIDATION, 75, 'Starting result consolidation')

    logging.info("=== STAGE 3: Starting Result Consolidation ===")
    try:
        consolidation_result = await self.consolidation_processor.consolidate_results(
            analysis_responses, self.consolidation_model, document_content
        )
        logging.info(f"Consolidation completed: {len(consolidation_result.expanded_assets)} final deliverables")
        if progress:
            await progress.emit(JobPhase.CONSOLIDATION, 90, f'Consolidation completed: {len(consolidation_result.expanded_assets)} final assets')
    except Exception as exc:
        return await _abort(f"Consolidation failed: {exc}")

    # ---- Assemble the result ----
    extracted_data = [asset.model_dump() for asset in consolidation_result.expanded_assets]
    total_token_usage = self.provider_manager.get_aggregated_token_usage(analysis_responses)

    successful_count = analysis_metadata.get('successful_models', len(analysis_responses))
    total_count = analysis_metadata.get('total_models_attempted', len(self.primary_models))
    processing_notes = [
        f"Parallel analysis: {successful_count}/{total_count} models",
        *consolidation_result.warnings,
    ]

    return ProcessingResult(
        raw_data=extracted_data,
        metadata={
            'doc_type': doc_type.value,
            'primary_models_used': self.primary_models,
            'consolidation_model': self.consolidation_model,
            'analysis_metadata': analysis_metadata,
            'consolidation_metadata': consolidation_result.consolidation_metadata,
        },
        confidence_score=0.9,  # fixed value assigned to every successful run
        processing_notes=processing_notes,
        token_usage=total_token_usage,
    )
|
|
|
|
async def _perform_parallel_analysis(self, document_content: str, doc_type: DocumentType, progress=None) -> Tuple[List[LLMResponse], Dict[str, Any]]:
    """Send the document to every primary model and collect the responses.

    The user prompt is the ``multi_perspective_analysis`` template
    (formatted with the document type) followed by the document content;
    the system message comes from ``system_multi_perspective``. Structured
    output follows ``UNIVERSAL_BASE_DELIVERABLE_SCHEMA``.

    Returns:
        ``(successful_responses, metadata)`` exactly as returned by
        ``ProviderManager.execute_parallel_analysis``.
    """
    prompt_template = await self._load_prompt('multi_perspective_analysis')
    user_prompt = prompt_template.format(doc_type=doc_type.value)
    system_message = await self._load_prompt('system_multi_perspective')

    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": f"{user_prompt}\n\nDocument Content:\n{document_content}"},
    ]

    # Per-model progress events are only wired up when a reporter exists.
    progress_callback = self._create_provider_progress_callback(progress) if progress else None

    successful_responses, metadata = await self.provider_manager.execute_parallel_analysis(
        model_keys=self.primary_models,
        messages=messages,
        schema=UNIVERSAL_BASE_DELIVERABLE_SCHEMA,
        minimum_success_threshold=config.MINIMUM_SUCCESS_THRESHOLD,
        on_model_event=progress_callback,
    )
    return successful_responses, metadata
|
|
|
|
def _create_provider_progress_callback(self, progress):
    """Build the coroutine that forwards per-model events to *progress*.

    The returned ``on_model_event(model_key, stage, data)`` handles two
    stages:

    * ``'start'``: emits a provider update with status ``'started'``.
    * ``'end'``: emits an ``'error'`` update when ``data`` has an ``error``
      key, or a ``'success'`` update (latency, tokens, cost) when it has a
      truthy ``response``; then emits the overall ``LLM_ANALYSIS``
      progress as ``25 + completed / total * 50``.

    ``data`` may be ``None``; it is then treated as an empty payload.
    Reporting is best-effort: any exception is logged and swallowed.
    """
    async def on_model_event(model_key: str, stage: str, data):
        try:
            # A missing payload must not abort the progress update.
            payload = data or {}

            if stage == 'start':
                await progress.emit_provider_update(model_key, {
                    'provider': self._get_provider_name(model_key),
                    'model': self._get_model_display_name(model_key),
                    'status': 'started',
                    'startedAt': payload.get('timestamp'),
                })

            elif stage == 'end':
                if 'error' in payload:
                    await progress.emit_provider_update(model_key, {
                        'provider': self._get_provider_name(model_key),
                        'model': self._get_model_display_name(model_key),
                        'status': 'error',
                        'error': str(payload['error']),
                        'completedAt': payload.get('timestamp'),
                    })
                else:
                    response = payload.get('response')
                    cost = payload.get('cost', 0)

                    if response:
                        await progress.emit_provider_update(model_key, {
                            'provider': self._get_provider_name(model_key),
                            'model': self._get_model_display_name(model_key),
                            'status': 'success',
                            'completedAt': payload.get('timestamp'),
                            'latencyMs': response.processing_time * 1000 if response.processing_time else None,
                            'tokensIn': response.token_usage.input_tokens,
                            'tokensOut': response.token_usage.output_tokens,
                            'tokensCached': response.token_usage.cached_input_tokens,
                            'costUsd': cost,
                        })

                # Overall LLM_ANALYSIS progress: 25% + completed/total * 50%.
                # NOTE(review): entries of provider_updates are read via
                # ``.status``; confirm they are objects rather than the
                # plain dicts passed to emit_provider_update above.
                completed_count = sum(
                    1
                    for update in progress.job.provider_updates.values()
                    if update.status in ('success', 'error')
                )
                total_count = len(self.primary_models)
                analysis_progress = 25 + (completed_count / total_count) * 50

                await progress.emit('LLM_ANALYSIS', int(analysis_progress),
                                    f'Analysis progress: {completed_count}/{total_count} models complete')

        except Exception as e:
            logging.error(f"Error in provider progress callback: {e}")

    return on_model_event
|
|
|
|
def _get_provider_name(self, model_key: str) -> str:
|
|
"""Get provider name from model key"""
|
|
try:
|
|
provider_name, _ = config.get_model_info(model_key)
|
|
return provider_name
|
|
except:
|
|
return model_key.split('-')[0] if '-' in model_key else 'unknown'
|
|
|
|
def _get_model_display_name(self, model_key: str) -> str:
|
|
"""Get display name for model"""
|
|
display_names = {
|
|
'openai-gpt51': 'GPT-5.1',
|
|
'anthropic-opus45': 'Claude Opus 4.5',
|
|
'anthropic-sonnet45': 'Claude Sonnet 4.5',
|
|
'google-gemini31': 'Gemini 3.1 Pro'
|
|
}
|
|
return display_names.get(model_key, model_key)
|
|
|
|
async def _enhance_and_validate_results(self, uploaded_file, initial_results: ProcessingResult) -> ProcessingResult:
    """Run a validation pass over the extraction and merge any extra assets.

    The validation prompt is sent together with ``uploaded_file`` through
    ``self.model.responses.parse``. Assets returned by that call are appended
    to ``initial_results.raw_data``. When none are returned, the confidence
    score is set to 0.95.

    Args:
        uploaded_file: Document content; interpolated into the prompt as text.
        initial_results: Result of the first extraction pass; mutated in place.

    Returns:
        The same ``initial_results`` object. Failures inside the API call or
        its post-processing are logged and recorded in ``processing_notes``
        instead of being raised.
    """
    if not initial_results.raw_data:
        return initial_results

    # Load validation prompt from external file
    validation_prompt_template = await self._load_prompt('validation_analysis')
    validation_prompt = validation_prompt_template.format(
        asset_count=len(initial_results.raw_data),
        doc_type=initial_results.metadata.get('doc_type', 'unknown')
    )

    try:
        # For GPT-5 using Responses API with reasoning_effort
        combined_prompt = f"{validation_prompt}\n\nDocument Content:\n{uploaded_file}"
        logging.info("=== CALLING OPENAI RESPONSES API: /v1/responses (Validation parse) ===")
        logging.info(f"Model: {self.model_name}, Reasoning Effort: {self.reasoning_effort}")
        # Load system message from external file
        system_validation_message = await self._load_prompt('system_validation')

        # NOTE(review): this call is not awaited, so it presumably blocks the
        # event loop while the request is in flight - confirm whether
        # self.model is a synchronous client.
        response = self.model.responses.parse(
            model=self.model_name,
            input=[
                {"role": "system", "content": system_validation_message},
                {"role": "user", "content": combined_prompt}
            ],
            reasoning={"effort": self.reasoning_effort},
            text_format=AssetExtractionResult
        )
        logging.info("=== RESPONSES API CALL COMPLETED SUCCESSFULLY (Validation) ===")

        # Track token usage for GPT-5 Responses API
        usage = getattr(response, 'usage', None)
        if usage is not None:
            # The Responses API reports cached tokens under
            # usage.input_tokens_details.cached_tokens; the flat attribute is
            # kept only as a fallback.
            details = getattr(usage, 'input_tokens_details', None)
            cached_tokens = getattr(details, 'cached_tokens', None)
            if cached_tokens is None:
                cached_tokens = getattr(usage, 'input_tokens_cached', 0) or 0
            usage_dict = {
                'input_tokens': usage.input_tokens,
                'output_tokens': usage.output_tokens,
                'input_tokens_cached': cached_tokens
            }
            self.token_usage.add_usage(usage_dict)
            logging.info(f"Validation Analysis - Tokens: {usage_dict['input_tokens']} input, {usage_dict['output_tokens']} output")

        # Extract parsed data from Responses API format
        parsed_result = response.output_parsed
        if parsed_result is None:
            # No structured output came back; report it explicitly instead of
            # failing on `.assets` below.
            logging.warning("VALIDATION DEBUG - JSON extraction returned None - possible parsing issue")
            initial_results.processing_notes.append("Validation failed: no parsed output returned")
            return initial_results

        logging.info(f"GPT-5 Validation Analysis - Parsed {len(parsed_result.assets)} additional assets")

        additional_data = [asset.model_dump() for asset in parsed_result.assets]
        logging.info(f"VALIDATION DEBUG - Extracted data type: {type(additional_data)}, length: {len(additional_data)}")

        if additional_data:
            logging.info(f"Validation found {len(additional_data)} additional assets")
            logging.info(f"VALIDATION DEBUG - Adding assets: {[asset.get('title', 'No title') for asset in additional_data]}")
            initial_results.raw_data.extend(additional_data)
            initial_results.processing_notes.append(f"Added {len(additional_data)} assets from validation")
            logging.info(f"VALIDATION DEBUG - Total assets after validation: {len(initial_results.raw_data)}")
        else:
            logging.info("Validation confirmed extraction completeness")
            initial_results.confidence_score = 0.95
            initial_results.processing_notes.append("Validation confirmed completeness")
            logging.info("VALIDATION DEBUG - Validation returned empty array (expected when extraction is complete)")

        return initial_results

    except Exception as e:
        # Validation is an optional enhancement; keep the first-pass results.
        logging.warning(f"Validation step failed: {e}")
        initial_results.processing_notes.append(f"Validation failed: {e}")
        return initial_results
|
|
|
|
|
|
def _extract_structured_json(self, raw_text: str) -> List[Dict[str, Any]]:
|
|
"""Extract structured JSON from AI response with schema validation."""
|
|
try:
|
|
# Log the raw response for debugging
|
|
logging.info(f"Raw response for JSON parsing: {raw_text[:200]}...")
|
|
|
|
# Parse the structured response
|
|
structured_data = json.loads(raw_text)
|
|
|
|
# Extract assets array from structured response
|
|
if 'assets' in structured_data:
|
|
assets = structured_data['assets']
|
|
logging.info(f"Successfully extracted {len(assets)} assets using structured output")
|
|
return assets
|
|
else:
|
|
logging.warning("No 'assets' key found in structured response")
|
|
logging.info(f"Available keys in response: {list(structured_data.keys())}")
|
|
return []
|
|
|
|
except json.JSONDecodeError as e:
|
|
logging.warning(f"Structured JSON parsing failed: {e}")
|
|
logging.info(f"Raw text causing JSON error: {raw_text[:500]}...")
|
|
logging.info("Falling back to legacy parsing")
|
|
return self._extract_json(raw_text)
|
|
except Exception as e:
|
|
logging.error(f"Structured JSON extraction failed: {e}")
|
|
logging.info(f"Raw text: {raw_text[:500]}...")
|
|
return []
|
|
|
|
def _extract_json(self, raw_text: str) -> List[Dict[str, Any]]:
    """Leniently extract JSON data from a model response using ``json5``.

    Strategies are tried in order, and a failed strategy falls through to
    the next one:
      1. the whole (stripped) text, when it starts with ``[``;
      2. the span from the first ``[`` to the last ``]``;
      3. every single line that looks like a complete ``{...}`` object.

    Args:
        raw_text: Response text from the model.

    Returns:
        The parsed data, or ``[]`` when no strategy yields anything.
    """
    if not isinstance(raw_text, str):
        logging.error(f"JSON extraction failed: expected str, got {type(raw_text).__name__}")
        return []

    candidates = []
    stripped = raw_text.strip()
    if stripped.startswith('['):
        candidates.append(stripped)

    # Look for JSON array in response
    start_index = raw_text.find('[')
    end_index = raw_text.rfind(']')
    if start_index != -1 and end_index > start_index:
        embedded = raw_text[start_index:end_index + 1]
        if embedded not in candidates:
            candidates.append(embedded)

    last_error = None
    for candidate in candidates:
        try:
            return json5.loads(candidate)
        except Exception as e:
            # Remember the failure and try the next strategy.
            last_error = e

    # Look for individual JSON objects
    json_objects = []
    for line in raw_text.split('\n'):
        line = line.strip()
        if line.startswith('{') and line.endswith('}'):
            try:
                json_objects.append(json5.loads(line))
            except Exception:
                # Skip lines that only look like objects.
                continue

    if json_objects:
        return json_objects

    logging.error(f"JSON extraction failed: {last_error or 'No valid JSON found in response'}")
    logging.debug(f"Raw text: {raw_text[:500]}...")
    return []
|
|
|
|
def discover_supported_files(folder_path: str) -> List[str]:
    """Return the sorted paths of supported documents directly inside a folder.

    Hidden entries (names starting with ``.``) and subdirectories are skipped.
    Any error while scanning is logged and whatever was collected so far is
    returned.
    """
    allowed_suffixes = {'.pdf', '.pptx', '.docx', '.xlsx', '.ppt', '.doc', '.xls'}
    supported_files = []

    try:
        for entry_name in os.listdir(folder_path):
            is_hidden = entry_name.startswith('.')
            if is_hidden:
                continue

            candidate = os.path.join(folder_path, entry_name)
            # Subdirectories are not descended into.
            if not os.path.isfile(candidate):
                continue

            suffix = os.path.splitext(entry_name)[1].lower()
            if suffix in allowed_suffixes:
                supported_files.append(candidate)

        # Alphabetical order keeps the processing sequence stable.
        supported_files.sort()
        logging.info(f"Discovered {len(supported_files)} supported documents in {folder_path}")
    except Exception as e:
        logging.error(f"Error discovering files in {folder_path}: {e}")

    return supported_files
|
|
|
|
def parse_arguments():
    """Parse the command line arguments of the brief processing CLI.

    Returns:
        The ``argparse.Namespace`` with ``filepath``, ``primary_models``
        (comma-separated string), ``consolidation_model`` and
        ``estimate_cost``.
    """
    import argparse

    # The shell continuation backslashes in the example are escaped: a bare
    # backslash-newline inside a non-raw string joins the lines and drops the
    # backslash from the help output.
    parser = argparse.ArgumentParser(
        description="Enhanced Brief Processing System with Multi-Model Support",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Process single document
python process_brief_enhanced.py document.pdf

# Process entire folder
python process_brief_enhanced.py /path/to/briefs/

# Custom models for batch processing
python process_brief_enhanced.py /path/to/briefs/ \\
--primary-models openai-gpt51,anthropic-sonnet45,google-gemini31 \\
--consolidation-model anthropic-opus45

# Cost estimation for folder
python process_brief_enhanced.py /path/to/briefs/ --estimate-cost

Available models: openai-gpt51, anthropic-opus45, anthropic-sonnet45, google-gemini31
"""
    )

    parser.add_argument('filepath', help='Path to document file or folder to process')
    parser.add_argument(
        '--primary-models',
        type=str,
        default=config.DEFAULT_PRIMARY_MODELS,
        help=f'Comma-separated list of models for primary analysis (default: {config.DEFAULT_PRIMARY_MODELS})'
    )
    parser.add_argument(
        '--consolidation-model',
        type=str,
        default=config.DEFAULT_CONSOLIDATION_MODEL,
        help=f'Model for final consolidation (default: {config.DEFAULT_CONSOLIDATION_MODEL})'
    )
    parser.add_argument(
        '--estimate-cost',
        action='store_true',
        help='Estimate processing cost before execution'
    )

    return parser.parse_args()
|
|
|
|
async def main():
    """CLI entry point: configure logging, parse arguments and run processing.

    A folder path triggers batch processing and any other path is processed
    as a single document. When cost estimation is requested (flag or config)
    and the estimate exceeds ``config.MAX_PROCESSING_COST_USD``, the user is
    asked to confirm before processing starts.
    """
    # Enhanced logging setup
    log_file = 'processing.log'
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, mode='w'),
            logging.StreamHandler(sys.stdout)
        ]
    )

    # Parse command line arguments
    args = parse_arguments()

    filepath = args.filepath
    # Tolerate spaces and stray commas in the list ("a, b," -> ['a', 'b']).
    primary_models = [key.strip() for key in args.primary_models.split(',') if key.strip()]
    consolidation_model = args.consolidation_model

    # Initialize multi-model analyzer
    analyzer = DocumentAnalyzer(primary_models, consolidation_model)

    # Cost estimation if requested
    if args.estimate_cost or config.ENABLE_COST_ESTIMATION:
        try:
            # Rough estimation based on document size. For a folder, use the
            # documents inside it: the size of the directory entry itself
            # says nothing about its contents.
            if os.path.isdir(filepath):
                document_sizes = [os.path.getsize(path) for path in discover_supported_files(filepath)]
            else:
                document_sizes = [os.path.getsize(filepath)]
            # Rough heuristic: ~4 bytes per token, capped at 50k per document
            estimated_tokens = sum(min(size // 4, 50000) for size in document_sizes)

            cost_breakdown = analyzer.provider_manager.estimate_total_cost(
                primary_models + [consolidation_model],
                estimated_tokens,
                estimated_tokens // 2
            )

            logging.info("=== COST ESTIMATION ===")
            for model, cost in cost_breakdown.items():
                if model != 'total':
                    logging.info(f"{model}: ${cost:.4f}")
            logging.info(f"Total Estimated Cost: ${cost_breakdown['total']:.4f}")

            if cost_breakdown['total'] > config.MAX_PROCESSING_COST_USD:
                response = input(f"Estimated cost (${cost_breakdown['total']:.4f}) exceeds limit (${config.MAX_PROCESSING_COST_USD}). Continue? (y/N): ")
                if response.strip().lower() != 'y':
                    logging.info("Processing cancelled by user")
                    return
        except Exception as e:
            # Estimation is advisory only; processing continues without it.
            logging.warning(f"Cost estimation failed: {e}")

    # Determine if input is file or folder
    if os.path.isdir(filepath):
        # Batch processing mode
        logging.info("=== ENHANCED MULTI-MODEL BATCH PROCESSING STARTED ===")
        await process_batch_documents(filepath, analyzer, args)
    else:
        # Single file processing mode
        logging.info("=== ENHANCED MULTI-MODEL BRIEF PROCESSING STARTED ===")
        await process_single_document(filepath, analyzer)
|
|
|
|
async def process_batch_documents(folder_path: str, analyzer, args):
    """Process every supported document in a folder, one after another.

    Each document goes through ``analyzer.process_document_multi_model`` and,
    when data was extracted, is written out with ``generate_output_file``.
    A failure in one document is logged and does not stop the batch. A
    summary is logged at the end and a ``__BATCH_SUMMARY__`` line is printed
    to stdout.

    Args:
        folder_path: Folder to scan (top level only).
        analyzer: Object providing ``process_document_multi_model``.
        args: Parsed CLI arguments; not read by this function.
    """
    # Discover all supported files
    document_files = discover_supported_files(folder_path)

    if not document_files:
        logging.error(f"No supported documents found in {folder_path}")
        return

    logging.info(f"Starting batch processing of {len(document_files)} documents")

    # Track batch statistics
    successful_documents = []
    failed_documents = []
    total_assets = 0
    total_cost = 0.0
    separator = '=' * 60

    # Process each document sequentially
    for i, document_path in enumerate(document_files, 1):
        document_name = os.path.basename(document_path)

        # Progress reporting ("\n" is a real newline; the previous "\\n"
        # logged a literal backslash-n)
        logging.info(f"\n{separator}")
        logging.info(f"PROCESSING DOCUMENT {i}/{len(document_files)}: {document_name}")
        logging.info(separator)

        try:
            # Process single document using existing logic
            results = await analyzer.process_document_multi_model(document_path)

            if results.raw_data:
                # Generate output file
                output_path = generate_output_file(document_path, results)

                # Extract cost information if available. This runs before the
                # success is recorded, so an error here cannot leave the
                # document counted as both successful and failed.
                consolidation_metadata = results.metadata.get('consolidation_metadata', {})
                doc_cost = consolidation_metadata.get('cost_breakdown', {}).get('total_cost', 0) or 0

                # Track success statistics
                asset_count = len(results.raw_data)
                successful_documents.append((document_name, asset_count, output_path))
                total_assets += asset_count
                total_cost += doc_cost

                logging.info(f"SUCCESS: {document_name} - {asset_count} assets extracted")

            else:
                logging.error(f"FAILED: {document_name} - No data extracted")
                failed_documents.append((document_name, "No data extracted"))

        except Exception as e:
            logging.error(f"FAILED: {document_name} - {str(e)}")
            failed_documents.append((document_name, str(e)))

    # Final batch summary
    logging.info(f"\n{separator}")
    logging.info("BATCH PROCESSING COMPLETE")
    logging.info(separator)
    logging.info(f"Documents processed: {len(document_files)}")
    logging.info(f"Successful: {len(successful_documents)}")
    logging.info(f"Failed: {len(failed_documents)}")
    logging.info(f"Total assets extracted: {total_assets}")
    logging.info(f"Total estimated cost: ${total_cost:.4f}")

    # Report successful documents
    if successful_documents:
        logging.info("\nSUCCESSFUL DOCUMENTS:")
        for doc_name, asset_count, output_path in successful_documents:
            logging.info(f" ✅ {doc_name}: {asset_count} assets → {output_path}")

    # Report failed documents
    if failed_documents:
        logging.info("\nFAILED DOCUMENTS:")
        for doc_name, error in failed_documents:
            logging.info(f" ❌ {doc_name}: {error}")

    # Print summary for PHP integration
    print(f"__BATCH_SUMMARY__:{len(successful_documents)}:{len(failed_documents)}:{total_assets}:{total_cost:.4f}")
|
|
|
|
def generate_output_file(filepath: str, results) -> str:
    """Write ``results.raw_data`` to a timestamped CSV and return its path.

    The file is created in the ``output`` directory (created on demand) and
    named ``<sanitized document name>-<YYYYmmddHHMMSS>.csv``. Columns follow
    ``CSV_HEADERS``; keys outside that list are ignored.
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")

    # Spaces and dots in the document stem are replaced with underscores.
    stem, _extension = os.path.splitext(os.path.basename(filepath))
    safe_stem = stem.replace(' ', '_').replace('.', '_')

    target_dir = 'output'
    os.makedirs(target_dir, exist_ok=True)
    output_path = os.path.join(target_dir, f"{safe_stem}-{timestamp}.csv")

    with open(output_path, 'w', newline='', encoding='utf-8') as handle:
        csv_writer = csv.DictWriter(handle, fieldnames=CSV_HEADERS, extrasaction='ignore')
        csv_writer.writeheader()
        csv_writer.writerows(results.raw_data)

    return output_path
|
|
|
|
async def process_single_document(filepath: str, analyzer):
    """Process one document, write its CSV and report the outcome.

    Logs an error and returns early when nothing was extracted. Otherwise it
    logs the processing and cost summaries and, unless the analyzer carries
    an ``_is_gui_mode`` attribute, prints the machine-readable
    ``__COST_SUMMARY__`` / ``__TOKEN_USAGE__`` / ``__FILENAME__`` lines.
    """
    log = logging.info
    results = await analyzer.process_document_multi_model(filepath)

    if not results.raw_data:
        logging.error("No data extracted from document")
        return

    # Generate output file
    output_path = generate_output_file(filepath, results)
    metadata = results.metadata

    # Processing summary
    log("=== PROCESSING SUMMARY ===")
    log(f"Document Type: {metadata.get('doc_type', 'unknown')}")
    log(f"Assets Extracted: {len(results.raw_data)}")
    log(f"Confidence Score: {results.confidence_score:.2f}")
    log(f"Processing Notes: {', '.join(results.processing_notes)}")
    log(f"Output File: {output_path}")

    # Cost figures come from the consolidation metadata
    consolidation_metadata = metadata.get('consolidation_metadata', {})
    cost_breakdown = consolidation_metadata.get('cost_breakdown', {})
    token_usage = consolidation_metadata.get('token_usage', {})

    log("=== COST ANALYSIS ===")
    log(f"Primary Models Used: {', '.join(metadata.get('primary_models_used', []))}")
    log(f"Consolidation Model: {metadata.get('consolidation_model', 'Unknown')}")
    log(f"Primary Analysis Cost: ${cost_breakdown.get('primary_analysis_cost', 0):.4f}")
    log(f"Consolidation Cost: ${cost_breakdown.get('consolidation_cost', 0):.4f}")
    log(f"Total Cost: ${cost_breakdown.get('total_cost', 0):.4f}")
    log(f"Total Tokens: {token_usage.get('grand_total', results.token_usage.get_total()):,}")

    total_cost = cost_breakdown.get('total_cost', 0)
    total_tokens = token_usage.get('grand_total', results.token_usage.get_total())

    # Marker lines are printed only when the analyzer has no _is_gui_mode attribute
    if hasattr(analyzer, '_is_gui_mode'):
        return

    primary_tokens = token_usage.get('primary_analysis_total', 0)
    consolidation_tokens = token_usage.get('consolidation_tokens', 0)
    print(f"__COST_SUMMARY__:{total_cost:.4f}")
    print(f"__TOKEN_USAGE__:{primary_tokens}:{consolidation_tokens}:{total_tokens}")
    print(f"__FILENAME__:{output_path}")
|
|
|
|
# Script entry point: run the async CLI driver only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())