ac-tool/backend/server/runners/enhanced_analyzer.py
Vadym Samoilenko 72c50b2c92 Initial commit — AC Tool unified application
Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI)
into a single Docker app with React/TypeScript frontend.

Features:
- Brief upload → AI extraction → review → Activation Calendar import
- Handsontable v17 spreadsheet with dependent dropdowns (148 categories)
- AI natural language commands via Gemini (YOLO mode, voice input)
- Azure AD MSAL SPA PKCE authentication, user roles (user/admin)
- CSV Activation Calendar export
- Real-time WebSocket job progress
- Admin: user management, dropdown Excel upload
- Multi-stage Dockerfile, docker-compose, nginx proxy instructions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 13:24:46 +00:00

368 lines
No EOL
15 KiB
Python
Executable file

"""
Enhanced DocumentAnalyzer with progress reporting for GUI integration
Extends the existing analyzer with progress hooks and WebSocket updates
"""
import sys
import os
# Append the directory two levels above this file to sys.path before the
# `core` import below -- presumably so the top-level `core` package resolves
# when this module is loaded from inside the server package; TODO confirm.
sys.path.append(os.path.join(os.path.dirname(__file__), '../../'))
from typing import Optional, List, Dict, Any
import logging
from core.process_brief_enhanced import DocumentAnalyzer, ProcessingResult
from ..jobs.models import JobPhase, ModelConfiguration, JobSummary
from .progress import ProgressReporter
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class EnhancedDocumentAnalyzer(DocumentAnalyzer):
    """
    DocumentAnalyzer that reports progress while a document is processed.

    Runs an extract -> parallel multi-model analysis -> consolidation
    pipeline and mirrors every step to an optional ProgressReporter. When no
    reporter is supplied the pipeline runs without emitting progress events.

    NOTE(review): helpers used here but not defined in this class
    (``_extract_document_content``, ``classify_document``, ``_load_prompt``,
    ``provider_manager``, ``consolidation_processor``, ``token_usage``,
    ``primary_models``, ``consolidation_model``) are presumably provided by
    the DocumentAnalyzer base class -- confirm against that class.
    """

    # Human-readable labels for known model keys; unknown keys are shown as-is.
    _MODEL_DISPLAY_NAMES = {
        'openai-gpt51': 'GPT-5.1',
        'anthropic-opus45': 'Claude Opus 4.5',
        'anthropic-sonnet45': 'Claude Sonnet 4.5',
        'google-gemini31': 'Gemini 3.1 Pro'
    }

    def __init__(
        self,
        model_config: ModelConfiguration,
        progress_reporter: Optional[ProgressReporter] = None
    ):
        """
        Initialize the base analyzer from a model configuration.

        Args:
            model_config: Supplies ``primary_models`` and
                ``consolidation_model`` for the base analyzer; also kept on
                the instance because ``minimum_success_threshold`` is read
                from it during parallel analysis
            progress_reporter: Optional sink for progress events and log
                lines; when omitted no progress is emitted
        """
        super().__init__(model_config.primary_models, model_config.consolidation_model)
        self.progress = progress_reporter
        self.model_config = model_config

    async def process_document_with_progress(self, filepath: str) -> ProcessingResult:
        """
        Process a document end to end, reporting progress at each stage.

        Stages and the progress percentages emitted:
            1. Content extraction       (10 -> 25)
            2. Parallel model analysis  (30 -> 75)
            3. Consolidation            (80 -> 90)
            4. Result preparation       (95 -> 100)

        Any ``Exception`` raised by a stage is logged, reported through the
        progress reporter (if one is attached) and converted into an empty
        ProcessingResult whose processing notes carry the error message.

        Args:
            filepath: Path to document file

        Returns:
            ProcessingResult with extracted data, or an empty result
            describing the failure
        """
        try:
            # Stage 1: Extract document content
            if self.progress:
                await self.progress.emit(
                    JobPhase.EXTRACT_CONTENT,
                    10,
                    f"Starting analysis of {os.path.basename(filepath)}"
                )
                await self.progress.emit_log('INFO', "=== STAGE 1: Document Content Extraction ===")
            try:
                document_content = self._extract_document_content(filepath)
                if self.progress:
                    await self.progress.emit(
                        JobPhase.EXTRACT_CONTENT,
                        25,
                        "Document content extracted successfully"
                    )
                    await self.progress.emit_log(
                        'INFO',
                        f"Extracted {len(document_content)} characters of content"
                    )
            except Exception as e:
                return await self._report_failure(f"Content extraction failed: {e}")

            # Stage 2: Parallel multi-model analysis
            if self.progress:
                await self.progress.emit(
                    JobPhase.LLM_ANALYSIS,
                    30,
                    "Starting parallel multi-model analysis"
                )
                await self.progress.emit_log('INFO', "=== STAGE 2: Parallel Multi-Model Analysis ===")
                await self.progress.emit_log('INFO', f"Using models: {', '.join(self.primary_models)}")
            doc_type = self.classify_document(filepath)
            try:
                analysis_responses, analysis_metadata = await self._perform_parallel_analysis_with_progress(
                    document_content, doc_type
                )
                if self.progress:
                    await self.progress.emit(
                        JobPhase.LLM_ANALYSIS,
                        75,
                        f"Parallel analysis completed - {len(analysis_responses)} successful models"
                    )
                    await self.progress.emit_log(
                        'INFO',
                        f"Analysis complete: {len(analysis_responses)}/{len(self.primary_models)} models succeeded"
                    )
            except Exception as e:
                return await self._report_failure(f"Parallel analysis failed: {e}")

            # Stage 3: Consolidation
            if self.progress:
                await self.progress.emit(
                    JobPhase.CONSOLIDATION,
                    80,
                    "Starting result consolidation"
                )
                await self.progress.emit_log('INFO', "=== STAGE 3: Result Consolidation ===")
                await self.progress.emit_log('INFO', f"Using consolidation model: {self.consolidation_model}")
            try:
                consolidation_result = await self.consolidation_processor.consolidate_results(
                    analysis_responses, self.consolidation_model, document_content
                )
                if self.progress:
                    await self.progress.emit(
                        JobPhase.CONSOLIDATION,
                        90,
                        f"Consolidation completed: {len(consolidation_result.expanded_assets)} final assets"
                    )
                    await self.progress.emit_log(
                        'INFO',
                        f"Consolidation complete: {len(consolidation_result.expanded_assets)} final deliverables"
                    )
            except Exception as e:
                return await self._report_failure(f"Consolidation failed: {e}")

            # Stage 4: Prepare results
            if self.progress:
                await self.progress.emit(
                    JobPhase.CSV_GENERATION,
                    95,
                    "Preparing results for output"
                )
            # Convert expanded assets to dict format for compatibility
            extracted_data = [asset.model_dump() for asset in consolidation_result.expanded_assets]
            # Aggregate token usage from all models
            total_token_usage = self.provider_manager.get_aggregated_token_usage(analysis_responses)
            # Combine processing notes; fall back to local counts when the
            # analysis metadata does not carry them
            successful_count = analysis_metadata.get('successful_models', len(analysis_responses))
            total_count = analysis_metadata.get('total_models_attempted', len(self.primary_models))
            processing_notes = [f"Parallel analysis: {successful_count}/{total_count} models"]
            processing_notes.extend(consolidation_result.warnings)
            # Merge metadata
            combined_metadata = {
                'doc_type': doc_type.value,
                'primary_models_used': self.primary_models,
                'consolidation_model': self.consolidation_model,
                'analysis_metadata': analysis_metadata,
                'consolidation_metadata': consolidation_result.consolidation_metadata
            }
            result = ProcessingResult(
                raw_data=extracted_data,
                metadata=combined_metadata,
                confidence_score=0.9,  # Higher confidence due to multi-model consensus
                processing_notes=processing_notes,
                token_usage=total_token_usage
            )
            if self.progress:
                await self.progress.emit(
                    JobPhase.CSV_GENERATION,
                    100,
                    "Analysis completed successfully"
                )
                await self.progress.emit_log('INFO', "=== PROCESSING COMPLETED SUCCESSFULLY ===")
            return result
        except Exception as e:
            # Safety net for anything raised outside the per-stage handlers
            # (e.g. document classification or result assembly).
            return await self._report_failure(f"Unexpected error during processing: {e}")

    async def _report_failure(self, error_msg: str) -> ProcessingResult:
        """
        Log a failure, forward it to the reporter and build the empty result.

        Must be called from inside an ``except`` block so the active
        traceback is attached to the log record.

        Args:
            error_msg: Message describing what failed

        Returns:
            ProcessingResult with no data, zero confidence and ``error_msg``
            as its only processing note
        """
        logger.error(error_msg, exc_info=True)
        if self.progress:
            await self.progress.emit_failure(error_msg)
        return ProcessingResult([], {}, 0.0, [error_msg], self.token_usage)

    async def _perform_parallel_analysis_with_progress(
        self,
        document_content: str,
        doc_type
    ) -> tuple:
        """
        Perform parallel analysis with progress reporting

        Builds a system + user message pair from the externally stored
        prompts and hands it to the provider manager together with a
        per-model progress callback (only when a reporter is attached).

        Args:
            document_content: Extracted document text
            doc_type: Document type classification; its ``value`` is
                substituted into the analysis prompt

        Returns:
            Tuple of (successful_responses, metadata)
        """
        # Imported here rather than at module level, as in the original code
        # (reason not visible in this file).
        from core.process_brief_enhanced import UNIVERSAL_BASE_DELIVERABLE_SCHEMA

        # Load prompt and system message from external files
        prompt_template = self._load_prompt('multi_perspective_analysis')
        analysis_prompt = prompt_template.format(doc_type=doc_type.value)
        system_message = self._load_prompt('system_multi_perspective')

        # Prepare messages for all providers
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": f"{analysis_prompt}\n\nDocument Content:\n{document_content}"}
        ]

        # Create progress callback for provider updates
        progress_callback = self._create_provider_progress_callback() if self.progress else None

        # Execute parallel analysis with progress reporting
        successful_responses, metadata = await self.provider_manager.execute_parallel_analysis(
            model_keys=self.primary_models,
            messages=messages,
            schema=UNIVERSAL_BASE_DELIVERABLE_SCHEMA,
            minimum_success_threshold=self.model_config.minimum_success_threshold,
            on_model_event=progress_callback
        )
        return successful_responses, metadata

    def _create_provider_progress_callback(self):
        """
        Create callback function for provider progress updates

        The returned coroutine function handles two stages: ``'start'`` and
        ``'end'``. Errors raised while reporting are logged and swallowed so
        that a reporting problem does not propagate to the caller of the
        callback.

        Returns:
            Async callback function taking (model_key, stage, data)
        """
        async def on_model_event(model_key: str, stage: str, data: Any):
            if not self.progress:
                return
            # A missing payload is treated as an empty one, so the 'end'
            # branch cannot fail on a membership test against None.
            payload = data or {}

            def provider_update(status: str) -> Dict[str, Any]:
                # Fields shared by every provider update.
                return {
                    'provider': self._get_provider_name(model_key),
                    'model': self._get_model_display_name(model_key),
                    'status': status
                }

            try:
                if stage == 'start':
                    update = provider_update('started')
                    update['startedAt'] = payload.get('timestamp')
                    await self.progress.emit_provider_update(model_key, update)
                    await self.progress.emit_log('INFO', f"Starting analysis with {model_key}")
                elif stage == 'end':
                    if 'error' in payload:
                        update = provider_update('error')
                        update['error'] = str(payload['error'])
                        update['completedAt'] = payload.get('timestamp')
                        await self.progress.emit_provider_update(model_key, update)
                        await self.progress.emit_log(
                            'ERROR',
                            f"Analysis failed for {model_key}: {payload['error']}"
                        )
                    else:
                        response = payload.get('response')
                        cost = payload.get('cost', 0)
                        if response:
                            update = provider_update('success')
                            update.update({
                                'completedAt': payload.get('timestamp'),
                                'latencyMs': response.processing_time * 1000 if response.processing_time else None,
                                'tokensIn': response.token_usage.input_tokens,
                                'tokensOut': response.token_usage.output_tokens,
                                'tokensCached': response.token_usage.cached_input_tokens,
                                'costUsd': cost
                            })
                            await self.progress.emit_provider_update(model_key, update)
                            await self.progress.emit_log('INFO', f"Analysis completed for {model_key} "
                                f"({response.token_usage.input_tokens + response.token_usage.output_tokens} tokens, ${cost:.4f})")
                    # Update overall progress: count providers that reached a
                    # terminal state, successful or not
                    completed_count = sum(
                        1 for p in self.progress.job.provider_updates.values()
                        if p.status in ('success', 'error')
                    )
                    total_count = len(self.primary_models)
                    # Calculate progress: 25% (extraction done) + (completed/total * 50%) for analysis
                    analysis_progress = await self.progress.calculate_analysis_progress(
                        base_progress=25,
                        completed_providers=completed_count,
                        total_providers=total_count,
                        analysis_weight=50
                    )
                    await self.progress.emit(
                        JobPhase.LLM_ANALYSIS,
                        analysis_progress,
                        f"Analysis progress: {completed_count}/{total_count} models complete"
                    )
            except Exception as e:
                # Log with traceback and carry on instead of re-raising.
                logger.exception("Error in provider progress callback: %s", e)

        return on_model_event

    def _get_provider_name(self, model_key: str) -> str:
        """
        Get provider name from model key

        Looks the key up via ``config.get_model_info``; if that lookup
        raises, falls back to the text before the first ``-`` in the key,
        or ``'unknown'`` when the key contains no ``-``.
        """
        from core.config import config
        try:
            provider_name, _ = config.get_model_info(model_key)
            return provider_name
        except Exception:
            # Narrower than a bare except: lets KeyboardInterrupt/SystemExit through.
            return model_key.split('-')[0] if '-' in model_key else 'unknown'

    def _get_model_display_name(self, model_key: str) -> str:
        """Get display name for model; unknown keys are returned unchanged"""
        return self._MODEL_DISPLAY_NAMES.get(model_key, model_key)

    def create_job_summary(self, result: ProcessingResult) -> JobSummary:
        """
        Create job summary from processing result

        Cost and token totals are read from the ``consolidation_metadata``
        entry of the result metadata and default to 0 when absent.

        Args:
            result: Processing result

        Returns:
            JobSummary object; ``processing_time_seconds`` is left as None
        """
        # Extract cost information; ``or {}`` also covers entries that are
        # present but explicitly None
        consolidation_metadata = result.metadata.get('consolidation_metadata') or {}
        cost_breakdown = consolidation_metadata.get('cost_breakdown') or {}
        token_usage = consolidation_metadata.get('token_usage') or {}
        return JobSummary(
            doc_type=result.metadata.get('doc_type', 'unknown'),
            assets_extracted=len(result.raw_data),
            confidence_score=result.confidence_score,
            notes=result.processing_notes,
            cost_usd_total=cost_breakdown.get('total_cost', 0),
            tokens_total=token_usage.get('grand_total', 0),
            primary_models=result.metadata.get('primary_models_used', []),
            consolidation_model=result.metadata.get('consolidation_model', ''),
            processing_time_seconds=None  # Will be set by job runner
        )