Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI) into a single Docker app with React/TypeScript frontend. Features: - Brief upload → AI extraction → review → Activation Calendar import - Handsontable v17 spreadsheet with dependent dropdowns (148 categories) - AI natural language commands via Gemini (YOLO mode, voice input) - Azure AD MSAL SPA PKCE authentication, user roles (user/admin) - CSV Activation Calendar export - Real-time WebSocket job progress - Admin: user management, dropdown Excel upload - Multi-stage Dockerfile, docker-compose, nginx proxy instructions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
368 lines
No EOL
15 KiB
Python
Executable file
368 lines
No EOL
15 KiB
Python
Executable file
"""
Enhanced DocumentAnalyzer with progress reporting for GUI integration

Extends the existing analyzer with progress hooks and WebSocket updates
"""

import sys
import os

# Put the directory two levels above this file on the import path. This must
# run before the `core` import below, which presumably relies on it
# (NOTE(review): confirm the package layout).
sys.path.append(os.path.join(os.path.dirname(__file__), '../../'))

from typing import Optional, List, Dict, Any
import logging

from core.process_brief_enhanced import DocumentAnalyzer, ProcessingResult
from ..jobs.models import JobPhase, ModelConfiguration, JobSummary
from .progress import ProgressReporter

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)


class EnhancedDocumentAnalyzer(DocumentAnalyzer):
    """
    Enhanced DocumentAnalyzer with progress reporting capabilities

    Extends the base analyzer with WebSocket progress updates. Every progress
    call is guarded by ``if self.progress``, so the analyzer also works when
    no reporter is supplied.
    """

    # Human-readable labels for known model keys; unknown keys are shown as-is.
    _MODEL_DISPLAY_NAMES = {
        'openai-gpt51': 'GPT-5.1',
        'anthropic-opus45': 'Claude Opus 4.5',
        'anthropic-sonnet45': 'Claude Sonnet 4.5',
        'google-gemini31': 'Gemini 3.1 Pro'
    }

    def __init__(
        self,
        model_config: ModelConfiguration,
        progress_reporter: Optional[ProgressReporter] = None
    ):
        """
        Initialize the base analyzer from a model configuration

        Args:
            model_config: Supplies ``primary_models`` and
                ``consolidation_model`` for the base class; kept on the
                instance because ``minimum_success_threshold`` is read later.
            progress_reporter: Optional sink for progress events; ``None``
                disables all progress reporting.
        """
        # Initialize base analyzer with model configuration
        super().__init__(
            model_config.primary_models,
            model_config.consolidation_model
        )

        self.progress = progress_reporter
        self.model_config = model_config

    async def _failure_result(self, error_msg: str) -> ProcessingResult:
        """
        Log a failure, report it to the progress reporter, and build an empty result

        Intended to be called from inside an ``except`` block so the active
        traceback is attached to the log record.

        Args:
            error_msg: Message to log, report and store in the result.

        Returns:
            ProcessingResult with no data, zero confidence and ``error_msg``
            as its only note.
        """
        logger.error(error_msg, exc_info=True)
        if self.progress:
            await self.progress.emit_failure(error_msg)
        # self.token_usage is not set in this class; presumably it is
        # maintained by the base analyzer (NOTE(review): confirm).
        return ProcessingResult([], {}, 0.0, [error_msg], self.token_usage)

    async def process_document_with_progress(self, filepath: str) -> ProcessingResult:
        """
        Process document with progress reporting integration

        Runs four stages (content extraction, parallel multi-model analysis,
        consolidation, result preparation) and emits progress at 10, 25, 30,
        75, 80, 90, 95 and 100 percent. A failure in any stage is reported via
        ``emit_failure`` and returned as an empty result instead of being
        raised.

        Args:
            filepath: Path to document file

        Returns:
            ProcessingResult with extracted data, or an empty result carrying
            the error message when a stage fails
        """
        try:
            if self.progress:
                await self.progress.emit(
                    JobPhase.EXTRACT_CONTENT,
                    10,
                    f"Starting analysis of {os.path.basename(filepath)}"
                )

            # Stage 1: Extract document content
            if self.progress:
                await self.progress.emit_log('INFO', "=== STAGE 1: Document Content Extraction ===")

            try:
                document_content = self._extract_document_content(filepath)
                if self.progress:
                    await self.progress.emit(
                        JobPhase.EXTRACT_CONTENT,
                        25,
                        "Document content extracted successfully"
                    )
                    await self.progress.emit_log('INFO', f"Extracted {len(document_content)} characters of content")
            except Exception as e:
                return await self._failure_result(f"Content extraction failed: {e}")

            # Stage 2: Parallel multi-model analysis
            if self.progress:
                await self.progress.emit(
                    JobPhase.LLM_ANALYSIS,
                    30,
                    "Starting parallel multi-model analysis"
                )
                await self.progress.emit_log('INFO', "=== STAGE 2: Parallel Multi-Model Analysis ===")
                await self.progress.emit_log('INFO', f"Using models: {', '.join(self.primary_models)}")

            doc_type = self.classify_document(filepath)

            try:
                analysis_responses, analysis_metadata = await self._perform_parallel_analysis_with_progress(
                    document_content, doc_type
                )

                if self.progress:
                    await self.progress.emit(
                        JobPhase.LLM_ANALYSIS,
                        75,
                        f"Parallel analysis completed - {len(analysis_responses)} successful models"
                    )
                    await self.progress.emit_log(
                        'INFO',
                        f"Analysis complete: {len(analysis_responses)}/{len(self.primary_models)} models succeeded"
                    )
            except Exception as e:
                return await self._failure_result(f"Parallel analysis failed: {e}")

            # Stage 3: Consolidation
            if self.progress:
                await self.progress.emit(
                    JobPhase.CONSOLIDATION,
                    80,
                    "Starting result consolidation"
                )
                await self.progress.emit_log('INFO', "=== STAGE 3: Result Consolidation ===")
                await self.progress.emit_log('INFO', f"Using consolidation model: {self.consolidation_model}")

            try:
                consolidation_result = await self.consolidation_processor.consolidate_results(
                    analysis_responses, self.consolidation_model, document_content
                )

                if self.progress:
                    await self.progress.emit(
                        JobPhase.CONSOLIDATION,
                        90,
                        f"Consolidation completed: {len(consolidation_result.expanded_assets)} final assets"
                    )
                    await self.progress.emit_log(
                        'INFO',
                        f"Consolidation complete: {len(consolidation_result.expanded_assets)} final deliverables"
                    )
            except Exception as e:
                return await self._failure_result(f"Consolidation failed: {e}")

            # Stage 4: Prepare results
            if self.progress:
                await self.progress.emit(
                    JobPhase.CSV_GENERATION,
                    95,
                    "Preparing results for output"
                )

            # Convert expanded assets to dict format for compatibility
            extracted_data = [asset.model_dump() for asset in consolidation_result.expanded_assets]

            # Aggregate token usage from all models
            total_token_usage = self.provider_manager.get_aggregated_token_usage(analysis_responses)

            # Combine processing notes; fall back to locally known counts when
            # the analysis metadata does not carry them.
            successful_count = analysis_metadata.get('successful_models', len(analysis_responses))
            total_count = analysis_metadata.get('total_models_attempted', len(self.primary_models))
            processing_notes = [f"Parallel analysis: {successful_count}/{total_count} models"]
            processing_notes.extend(consolidation_result.warnings)

            # Merge metadata
            combined_metadata = {
                'doc_type': doc_type.value,
                'primary_models_used': self.primary_models,
                'consolidation_model': self.consolidation_model,
                'analysis_metadata': analysis_metadata,
                'consolidation_metadata': consolidation_result.consolidation_metadata
            }

            result = ProcessingResult(
                raw_data=extracted_data,
                metadata=combined_metadata,
                confidence_score=0.9,  # Higher confidence due to multi-model consensus
                processing_notes=processing_notes,
                token_usage=total_token_usage
            )

            if self.progress:
                await self.progress.emit(
                    JobPhase.CSV_GENERATION,
                    100,
                    "Analysis completed successfully"
                )
                await self.progress.emit_log('INFO', "=== PROCESSING COMPLETED SUCCESSFULLY ===")

            return result

        except Exception as e:
            # Catch-all for anything outside the per-stage handlers
            # (e.g. document classification or result assembly).
            return await self._failure_result(f"Unexpected error during processing: {e}")

    async def _perform_parallel_analysis_with_progress(
        self,
        document_content: str,
        doc_type
    ) -> tuple:
        """
        Perform parallel analysis with progress reporting

        Builds one system/user message pair from the externally stored prompts
        and hands it to the provider manager for all primary models.

        Args:
            document_content: Extracted document text
            doc_type: Document type classification; its ``.value`` is
                substituted into the prompt template

        Returns:
            Tuple of (successful_responses, metadata)
        """
        # Load prompt from external file
        prompt_template = self._load_prompt('multi_perspective_analysis')
        multi_perspective_prompt = prompt_template.format(doc_type=doc_type.value)

        # Load system message from external file
        system_message = self._load_prompt('system_multi_perspective')

        # Prepare combined prompt
        combined_prompt = f"{multi_perspective_prompt}\n\nDocument Content:\n{document_content}"

        # Prepare messages for all providers
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": combined_prompt}
        ]

        # Get schema for structured output (imported here, as in the original)
        from core.process_brief_enhanced import UNIVERSAL_BASE_DELIVERABLE_SCHEMA

        # Create progress callback for provider updates only when someone listens
        progress_callback = (
            self._create_provider_progress_callback() if self.progress else None
        )

        # Execute parallel analysis with progress reporting
        successful_responses, metadata = await self.provider_manager.execute_parallel_analysis(
            model_keys=self.primary_models,
            messages=messages,
            schema=UNIVERSAL_BASE_DELIVERABLE_SCHEMA,
            minimum_success_threshold=self.model_config.minimum_success_threshold,
            on_model_event=progress_callback
        )

        return successful_responses, metadata

    def _create_provider_progress_callback(self):
        """
        Create callback function for provider progress updates

        The callback never raises: any error while reporting is logged with
        its traceback and swallowed.

        Returns:
            Async callback function taking (model_key, stage, data)
        """
        async def on_model_event(model_key: str, stage: str, data: Any):
            if not self.progress:
                return

            # `data` may be absent. Normalising it once keeps the
            # `'error' in ...` check below from failing on None, which
            # previously dropped the whole progress update.
            payload = data or {}

            try:
                if stage == 'start':
                    await self.progress.emit_provider_update(model_key, {
                        'provider': self._get_provider_name(model_key),
                        'model': self._get_model_display_name(model_key),
                        'status': 'started',
                        'startedAt': payload.get('timestamp')
                    })

                    await self.progress.emit_log('INFO', f"Starting analysis with {model_key}")

                elif stage == 'end':
                    if 'error' in payload:
                        await self.progress.emit_provider_update(model_key, {
                            'provider': self._get_provider_name(model_key),
                            'model': self._get_model_display_name(model_key),
                            'status': 'error',
                            'error': str(payload['error']),
                            'completedAt': payload.get('timestamp')
                        })

                        await self.progress.emit_log('ERROR', f"Analysis failed for {model_key}: {payload['error']}")

                    else:
                        response = payload.get('response')
                        cost = payload.get('cost', 0)

                        if response:
                            await self.progress.emit_provider_update(model_key, {
                                'provider': self._get_provider_name(model_key),
                                'model': self._get_model_display_name(model_key),
                                'status': 'success',
                                'completedAt': payload.get('timestamp'),
                                # processing_time is multiplied by 1000 for the
                                # "Ms" field; a missing/zero time is reported as None.
                                'latencyMs': response.processing_time * 1000 if response.processing_time else None,
                                'tokensIn': response.token_usage.input_tokens,
                                'tokensOut': response.token_usage.output_tokens,
                                'tokensCached': response.token_usage.cached_input_tokens,
                                'costUsd': cost
                            })

                            await self.progress.emit_log('INFO', f"Analysis completed for {model_key} "
                                f"({response.token_usage.input_tokens + response.token_usage.output_tokens} tokens, ${cost:.4f})")

                    # Update overall progress
                    # NOTE(review): source indentation was lost; this is placed
                    # at the 'end' level because the count below includes both
                    # succeeded and errored providers - confirm.
                    completed_count = sum(
                        1
                        for update in self.progress.job.provider_updates.values()
                        if update.status in ('success', 'error')
                    )
                    total_count = len(self.primary_models)

                    # Calculate progress: 25% (extraction done) + (completed/total * 50%) for analysis
                    analysis_progress = await self.progress.calculate_analysis_progress(
                        base_progress=25,
                        completed_providers=completed_count,
                        total_providers=total_count,
                        analysis_weight=50
                    )

                    await self.progress.emit(
                        JobPhase.LLM_ANALYSIS,
                        analysis_progress,
                        f"Analysis progress: {completed_count}/{total_count} models complete"
                    )

            except Exception as e:
                # Reporting is best-effort: keep the traceback for diagnosis
                # but do not propagate the error to the caller.
                logger.exception(f"Error in provider progress callback: {e}")

        return on_model_event

    def _get_provider_name(self, model_key: str) -> str:
        """
        Get provider name from model key

        Falls back to the text before the first '-' in the key (or 'unknown'
        when the key has no '-') if the config lookup fails.
        """
        from core.config import config
        try:
            provider_name, _ = config.get_model_info(model_key)
            return provider_name
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are no longer swallowed by the fallback.
            return model_key.split('-')[0] if '-' in model_key else 'unknown'

    def _get_model_display_name(self, model_key: str) -> str:
        """Get display name for model; unknown keys are returned unchanged"""
        return self._MODEL_DISPLAY_NAMES.get(model_key, model_key)

    def create_job_summary(self, result: ProcessingResult) -> JobSummary:
        """
        Create job summary from processing result

        Cost and token totals are read from the 'consolidation_metadata' entry
        of the result metadata and default to 0 when absent.

        Args:
            result: Processing result

        Returns:
            JobSummary object
        """
        # Extract cost information; `or {}` also covers entries that are
        # present but None, so the chained lookups below cannot fail on None.
        consolidation_metadata = result.metadata.get('consolidation_metadata') or {}
        cost_breakdown = consolidation_metadata.get('cost_breakdown') or {}
        token_usage = consolidation_metadata.get('token_usage') or {}

        return JobSummary(
            doc_type=result.metadata.get('doc_type', 'unknown'),
            assets_extracted=len(result.raw_data),
            confidence_score=result.confidence_score,
            notes=result.processing_notes,
            cost_usd_total=cost_breakdown.get('total_cost', 0),
            tokens_total=token_usage.get('grand_total', 0),
            primary_models=result.metadata.get('primary_models_used', []),
            consolidation_model=result.metadata.get('consolidation_model', ''),
            processing_time_seconds=None  # Will be set by job runner
        )