ac-tool/backend/core/consolidation_processor.py
Vadym Samoilenko 72c50b2c92 Initial commit — AC Tool unified application
Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI)
into a single Docker app with React/TypeScript frontend.

Features:
- Brief upload → AI extraction → review → Activation Calendar import
- Handsontable v17 spreadsheet with dependent dropdowns (148 categories)
- AI natural language commands via Gemini (YOLO mode, voice input)
- Azure AD MSAL SPA PKCE authentication, user roles (user/admin)
- CSV Activation Calendar export
- Real-time WebSocket job progress
- Admin: user management, dropdown Excel upload
- Multi-stage Dockerfile, docker-compose, nginx proxy instructions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 13:24:46 +00:00

353 lines
No EOL
17 KiB
Python
Executable file

"""
Consolidation processor for merging multiple LLM analysis results
"""
import json
import logging
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
import os
from .llm_service import ProviderManager, LLMResponse
from .config import config
@dataclass
class ConsolidationResult:
"""Result of consolidation process"""
consolidated_deliverables: List[Any] # BaseDeliverable
expanded_assets: List[Any] # MarketingAsset
consolidation_metadata: Dict[str, Any]
warnings: List[str]
class ConsolidationProcessor:
"""Processes multiple LLM analysis results into a single consolidated output"""
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.provider_manager = ProviderManager()
async def consolidate_results(
self,
analysis_responses: List[LLMResponse],
consolidation_model: str,
document_content: str = ""
) -> ConsolidationResult:
"""
Consolidate multiple analysis results using the specified consolidation model
Args:
analysis_responses: List of LLM responses from primary analysis
consolidation_model: Model key for consolidation (e.g., 'anthropic-opus45')
document_content: Optional original document content for context
Returns:
ConsolidationResult with final consolidated deliverables
"""
self.logger.info(f"Starting consolidation with {len(analysis_responses)} model results using {consolidation_model}")
# Log individual model deliverable counts
successful_models = []
deliverable_counts = []
for i, response in enumerate(analysis_responses):
if response.success:
count = self._count_deliverables_in_response(response.content)
deliverable_counts.append(count)
successful_models.append(f"{response.provider} {response.model_used}")
self.logger.info(f"Model {i+1} ({response.provider} {response.model_used}): {count} base deliverables")
if deliverable_counts:
avg_deliverables = sum(deliverable_counts) / len(deliverable_counts)
self.logger.info(f"Average deliverables across {len(deliverable_counts)} models: {avg_deliverables:.1f}")
else:
self.logger.warning("No successful model responses to analyze")
# Extract and format results from all models
formatted_results = self._format_model_results(analysis_responses)
# Prepare consolidation prompt
consolidation_prompt = await self._prepare_consolidation_prompt(formatted_results)
# Load system message for consolidation
system_message = self._load_consolidation_system_prompt()
# Execute consolidation using specified model
try:
provider = self.provider_manager.get_provider(consolidation_model)
messages = provider.prepare_messages(system_message, consolidation_prompt)
# Use the universal base deliverable schema for structured output
from .process_brief_enhanced import UNIVERSAL_BASE_DELIVERABLE_SCHEMA
consolidation_response = await provider.generate_response(
messages=messages,
schema=UNIVERSAL_BASE_DELIVERABLE_SCHEMA
)
if not consolidation_response.success:
raise Exception(f"Consolidation failed: {consolidation_response.error}")
# Parse the consolidated results - import here to avoid circular import
from .process_brief_enhanced import BaseDeliverable, expand_deliverables
try:
consolidated_data = json.loads(consolidation_response.content)
if 'assets' not in consolidated_data:
# PROBLEM DETECTED - Log everything verbosely
self.logger.error(f"[CONSOLIDATION] ========== MISSING 'assets' KEY - VERBOSE DEBUG ==========")
self.logger.error(f"[CONSOLIDATION] Model: {consolidation_model}")
self.logger.error(f"[CONSOLIDATION] Response success: {consolidation_response.success}")
self.logger.error(f"[CONSOLIDATION] Response content length: {len(consolidation_response.content)} chars")
self.logger.error(f"[CONSOLIDATION] Response content type: {type(consolidation_response.content)}")
self.logger.error(f"[CONSOLIDATION] Full raw content: {consolidation_response.content}")
self.logger.error(f"[CONSOLIDATION] Parsed data type: {type(consolidated_data)}")
self.logger.error(f"[CONSOLIDATION] Parsed data keys: {list(consolidated_data.keys()) if isinstance(consolidated_data, dict) else 'N/A'}")
self.logger.error(f"[CONSOLIDATION] Full parsed data: {consolidated_data}")
# Save debug file
self._save_consolidation_debug(consolidation_response, consolidated_data, analysis_responses)
raise KeyError("Response missing 'assets' key")
# SUCCESS - Just log summary
self.logger.info(f"Consolidation completed: {len(consolidated_data['assets'])} base deliverables")
base_deliverables = [BaseDeliverable(**item) for item in consolidated_data['assets']]
except json.JSONDecodeError as e:
self.logger.error(f"[CONSOLIDATION] ========== JSON PARSE ERROR ==========")
self.logger.error(f"[CONSOLIDATION] Parse error: {e}")
self.logger.error(f"[CONSOLIDATION] Full response content: {consolidation_response.content}")
raise
except KeyError as e:
# Already logged in detail above
raise
except Exception as e:
self.logger.error(f"[CONSOLIDATION] Error processing consolidation response: {e}")
self.logger.error(f"[CONSOLIDATION] Full response content: {consolidation_response.content}")
raise
# Expand consolidated base deliverables into individual assets
expanded_assets, expansion_warnings = expand_deliverables(base_deliverables)
self.logger.info(f"Expansion completed: {len(expanded_assets)} individual assets")
# Create consolidation metadata
metadata = self._create_consolidation_metadata(
analysis_responses,
consolidation_response,
base_deliverables,
expanded_assets
)
return ConsolidationResult(
consolidated_deliverables=base_deliverables,
expanded_assets=expanded_assets,
consolidation_metadata=metadata,
warnings=expansion_warnings
)
except Exception as e:
self.logger.error(f"Consolidation failed: {e}")
raise
def _count_deliverables_in_response(self, content: str) -> int:
"""Count the number of deliverables in a model's JSON response"""
try:
data = json.loads(content)
if isinstance(data, dict) and 'assets' in data:
return len(data['assets'])
return 0
except (json.JSONDecodeError, KeyError, TypeError):
return 0
def _format_model_results(self, responses: List[LLMResponse]) -> str:
"""Format analysis results from multiple models for consolidation prompt"""
formatted_results = []
for i, response in enumerate(responses):
if response.success:
model_info = f"**MODEL {i+1}: {response.provider.upper()} {response.model_used}**"
# Try to extract JSON content
try:
# Parse the JSON to validate it
result_data = json.loads(response.content)
formatted_content = json.dumps(result_data, indent=2)
except json.JSONDecodeError:
# Fallback to raw content if not valid JSON
formatted_content = response.content
formatted_results.append(f"{model_info}\n```json\n{formatted_content}\n```")
else:
self.logger.warning(f"Skipping failed response from {response.provider} {response.model_used}: {response.error}")
return "\n\n".join(formatted_results)
async def _prepare_consolidation_prompt(self, formatted_results: str) -> str:
"""Prepare the consolidation prompt with model results"""
import asyncio
def _read_template():
"""Blocking template read operation for thread pool"""
# Load consolidation prompt template - go up one level from core/ to find prompts/
prompt_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'prompts', 'consolidation_analysis.txt')
with open(prompt_path, 'r', encoding='utf-8') as f:
return f.read()
try:
loop = asyncio.get_running_loop()
template = await loop.run_in_executor(None, _read_template)
return template.format(models_results=formatted_results)
except FileNotFoundError:
self.logger.error("Consolidation prompt template not found")
raise
except Exception as e:
self.logger.error(f"Error preparing consolidation prompt: {e}")
raise
def _load_consolidation_system_prompt(self) -> str:
"""Load system prompt for consolidation"""
return """You are an expert data consolidation specialist. Your task is to intelligently merge multiple LLM analysis results into the most complete and accurate dataset possible. Follow the consolidation strategy provided in the user prompt, with emphasis on completeness and thoroughness. Return only valid JSON in the specified format."""
def _save_consolidation_debug(self, consolidation_response, consolidated_data, analysis_responses):
"""Save debug information about failed consolidation"""
try:
import tempfile
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
debug_file = os.path.join(tempfile.gettempdir(), f"consolidation_debug_{timestamp}.json")
debug_info = {
"timestamp": timestamp,
"consolidation_model": consolidation_response.model_used,
"consolidation_provider": consolidation_response.provider,
"raw_content": consolidation_response.content,
"parsed_data": consolidated_data,
"response_success": consolidation_response.success,
"response_error": consolidation_response.error,
"token_usage": {
"input": consolidation_response.token_usage.input_tokens,
"output": consolidation_response.token_usage.output_tokens,
"total": consolidation_response.token_usage.get_total()
},
"primary_analysis_results": [
{
"provider": r.provider,
"model": r.model_used,
"success": r.success,
"deliverable_count": self._count_deliverables_in_response(r.content) if r.success else 0,
"content_preview": r.content[:500] if r.success else r.error
}
for r in analysis_responses
]
}
with open(debug_file, 'w') as f:
json.dump(debug_info, f, indent=2)
self.logger.error(f"[CONSOLIDATION] Debug info saved to: {debug_file}")
except Exception as e:
self.logger.error(f"[CONSOLIDATION] Failed to save debug info: {e}")
def _create_consolidation_metadata(
self,
analysis_responses: List[LLMResponse],
consolidation_response: LLMResponse,
base_deliverables: List[Any],
expanded_assets: List[Any]
) -> Dict[str, Any]:
"""Create metadata about the consolidation process"""
# Analyze model contributions
model_stats = {}
total_primary_tokens = 0
total_primary_cost = 0.0
for response in analysis_responses:
if response.success:
model_key = f"{response.provider}_{response.model_used}"
model_stats[model_key] = {
'tokens_used': response.token_usage.get_total(),
'processing_time': response.processing_time,
'success': True
}
total_primary_tokens += response.token_usage.get_total()
# Estimate cost for this response
try:
# Find the correct model key for this response
provider_model_key = None
for key in config.MODEL_MAPPINGS.keys():
provider_name, model_name = config.get_model_info(key)
if provider_name == response.provider and model_name == response.model_used:
provider_model_key = key
break
if provider_model_key:
provider = self.provider_manager.get_provider(provider_model_key)
cost = provider.estimate_cost(
response.token_usage.input_tokens,
response.token_usage.output_tokens,
response.token_usage.cached_input_tokens
)
total_primary_cost += cost
model_stats[model_key]['estimated_cost'] = cost
else:
model_stats[model_key]['estimated_cost'] = 0.0
except:
model_stats[model_key]['estimated_cost'] = 0.0
else:
model_key = f"{response.provider}_{response.model_used}"
model_stats[model_key] = {
'tokens_used': 0,
'processing_time': response.processing_time,
'success': False,
'error': response.error,
'estimated_cost': 0.0
}
# Consolidation model stats
consolidation_cost = 0.0
try:
# Find the correct model key for consolidation response
consolidation_model_key = None
for key in config.MODEL_MAPPINGS.keys():
provider_name, model_name = config.get_model_info(key)
if provider_name == consolidation_response.provider and model_name == consolidation_response.model_used:
consolidation_model_key = key
break
if consolidation_model_key:
provider = self.provider_manager.get_provider(consolidation_model_key)
consolidation_cost = provider.estimate_cost(
consolidation_response.token_usage.input_tokens,
consolidation_response.token_usage.output_tokens,
consolidation_response.token_usage.cached_input_tokens
)
except:
pass
return {
'consolidation_model': consolidation_response.model_used,
'consolidation_provider': consolidation_response.provider,
'primary_models_used': len([r for r in analysis_responses if r.success]),
'total_models_attempted': len(analysis_responses),
'base_deliverables_count': len(base_deliverables),
'final_assets_count': len(expanded_assets),
'model_statistics': model_stats,
'token_usage': {
'primary_analysis_total': total_primary_tokens,
'consolidation_tokens': consolidation_response.token_usage.get_total(),
'grand_total': total_primary_tokens + consolidation_response.token_usage.get_total()
},
'cost_breakdown': {
'primary_analysis_cost': round(total_primary_cost, 4),
'consolidation_cost': round(consolidation_cost, 4),
'total_cost': round(total_primary_cost + consolidation_cost, 4)
},
'processing_times': {
'consolidation_time': consolidation_response.processing_time,
'primary_models_avg_time': sum(r.processing_time for r in analysis_responses if r.success) / max(1, len([r for r in analysis_responses if r.success]))
}
}