"""
Progress reporting for job processing with WebSocket integration
"""

import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional, Set

from ..jobs.models import Job, JobPhase, ProviderUpdate
from ..ws.manager import WebSocketManager

logger = logging.getLogger(__name__)

# Explicit name -> level table so that an arbitrary ``level`` string can never
# resolve to an unrelated Logger attribute (e.g. "name", "log", "handlers").
_LOG_LEVELS: Dict[str, int] = {
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
    'WARNING': logging.WARNING,
    'WARN': logging.WARNING,
    'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL,
    'FATAL': logging.CRITICAL,
}


def _utc_timestamp() -> str:
    """
    Return the current UTC time as a naive ISO-8601 string

    Produces the same format as ``datetime.utcnow().isoformat()`` (no UTC
    offset suffix) without calling the deprecated ``utcnow``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _phase_label(phase: Any) -> Any:
    """
    Return ``phase.value`` when the object has one, otherwise ``phase`` itself
    """
    return getattr(phase, 'value', phase)


class ProgressReporter:
    """
    Reports progress updates for job processing with WebSocket broadcasting

    Every ``emit*`` method is best-effort: failures are logged (with
    traceback) on the reporter's logger and never re-raised, so a reporting
    problem cannot break the processing pipeline.
    """

    def __init__(self, job: Job, ws_manager: WebSocketManager):
        self.job = job
        self.ws_manager = ws_manager
        self.logger = logging.getLogger(f"{__name__}.{job.id}")

    async def emit(
        self,
        phase: JobPhase,
        progress_pct: int,
        message: str = "",
        step_label: str = ""
    ):
        """
        Emit progress update for job

        Args:
            phase: Current processing phase
            progress_pct: Progress percentage (0-100)
            message: Optional progress message
            step_label: Optional custom step label
        """
        try:
            # Update job progress
            self.job.update_progress(phase, progress_pct, step_label)

            # Add log entry
            if message:
                self.job.add_log('INFO', message)
                self.logger.info(message)

            phase_label = _phase_label(phase)

            # Broadcast progress update — include full job so frontend can updateJob()
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.progress',
                'jobId': self.job.id,
                'phase': phase_label,
                'progressPct': progress_pct,
                'message': message,
                'stepLabel': self.job.step_label,
                'providerUpdates': {
                    k: v.to_dict() for k, v in self.job.provider_updates.items()
                },
                'job': self.job.to_dict(),
            })

            self.logger.debug(
                f"Progress update: {phase_label} {progress_pct}% - {message}"
            )

        except Exception as e:
            # Don't re-raise to avoid breaking the processing pipeline
            self.logger.error(f"Failed to emit progress update: {e}", exc_info=True)

    async def emit_provider_update(
        self,
        model_key: str,
        update_data: Dict[str, Any]
    ):
        """
        Emit provider-specific update

        Args:
            model_key: Model identifier (e.g., 'openai-gpt51')
            update_data: Provider update information; read with camelCase
                keys ('startedAt', 'latencyMs', ...). Missing keys fall back
                to '' for provider/model/status and None for the rest.
        """
        try:
            # Create provider update object
            provider_update = ProviderUpdate(
                provider=update_data.get('provider', ''),
                model=update_data.get('model', ''),
                status=update_data.get('status', ''),
                started_at=update_data.get('startedAt'),
                completed_at=update_data.get('completedAt'),
                latency_ms=update_data.get('latencyMs'),
                tokens_in=update_data.get('tokensIn'),
                tokens_out=update_data.get('tokensOut'),
                tokens_cached=update_data.get('tokensCached'),
                cost_usd=update_data.get('costUsd'),
                error=update_data.get('error')
            )

            # Update job
            self.job.update_provider(model_key, provider_update)

            # Log provider update; an update carrying an error is logged as ERROR
            status_msg = f"Provider {model_key}: {provider_update.status}"
            if provider_update.error:
                status_msg += f" - {provider_update.error}"
                self.job.add_log('ERROR', status_msg)
                self.logger.error(status_msg)
            else:
                self.job.add_log('INFO', status_msg)
                self.logger.info(status_msg)

            # Broadcast provider update
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.provider_update',
                'jobId': self.job.id,
                'modelKey': model_key,
                'update': provider_update.to_dict()
            })

            self.logger.debug(f"Provider update: {model_key} - {provider_update.status}")

        except Exception as e:
            self.logger.error(
                f"Failed to emit provider update for {model_key}: {e}",
                exc_info=True,
            )

    async def emit_log(self, level: str, message: str):
        """
        Emit log message with WebSocket streaming

        Args:
            level: Log level (DEBUG, INFO, WARNING, ERROR); matched
                case-insensitively, unknown names are logged at INFO
            message: Log message
        """
        try:
            # Add to job logs
            self.job.add_log(level, message)

            # Log to system logger at the matching numeric level
            level_no = _LOG_LEVELS.get(str(level).upper(), logging.INFO)
            self.logger.log(level_no, message)

            # Broadcast log entry
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.log',
                'jobId': self.job.id,
                'logEntry': {
                    'timestamp': _utc_timestamp(),
                    'level': level,
                    'message': message
                }
            })

        except Exception as e:
            self.logger.error(f"Failed to emit log message: {e}", exc_info=True)

    async def calculate_analysis_progress(
        self,
        base_progress: int,
        completed_providers: int,
        total_providers: int,
        analysis_weight: int = 50
    ) -> int:
        """
        Calculate progress percentage for LLM analysis phase

        Args:
            base_progress: Starting progress percentage (usually 25)
            completed_providers: Number of completed providers
            total_providers: Total number of providers
            analysis_weight: Weight of analysis phase in total progress

        Returns:
            Updated progress percentage, clamped to 0-100. When there are no
            providers (``total_providers`` <= 0) ``base_progress`` is returned
            unchanged.
        """
        # Nothing to divide by: the analysis phase contributes no progress.
        if total_providers <= 0:
            return base_progress

        analysis_progress = (completed_providers / total_providers) * analysis_weight
        return max(0, min(100, base_progress + int(analysis_progress)))

    async def emit_completion(
        self,
        result_csv_url: str,
        summary_data: Dict[str, Any]
    ):
        """
        Emit job completion event

        Only adds a log entry to the job and broadcasts the event; it does
        not otherwise modify the job.

        Args:
            result_csv_url: URL to download CSV result
            summary_data: Job summary information
        """
        try:
            self.job.add_log('INFO', 'Processing completed successfully')

            # Broadcast completion — include full job so frontend can updateJob()
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.completed',
                'jobId': self.job.id,
                'resultCsvUrl': result_csv_url,
                'summary': summary_data,
                'job': self.job.to_dict(),
            })

            self.logger.info(f"Job {self.job.id} completed successfully")

        except Exception as e:
            self.logger.error(f"Failed to emit completion event: {e}", exc_info=True)

    async def emit_failure(self, error: str):
        """
        Emit job failure event

        Marks the job as failed, records an ERROR log entry on it, then
        broadcasts the failure.

        Args:
            error: Error message
        """
        try:
            self.job.mark_failed(error)
            self.job.add_log('ERROR', f'Processing failed: {error}')

            # Broadcast failure — include full job so frontend can updateJob()
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.failed',
                'jobId': self.job.id,
                'error': error,
                'job': self.job.to_dict(),
            })

            self.logger.error(f"Job {self.job.id} failed: {error}")

        except Exception as e:
            self.logger.error(f"Failed to emit failure event: {e}", exc_info=True)


class JobLogHandler(logging.Handler):
    """
    Custom logging handler that routes job-specific logs to WebSocket clients

    Records are only forwarded when ``emit`` is called from a thread that has
    a running asyncio event loop; otherwise the record is skipped.
    """

    def __init__(self, job_id: str, ws_manager: WebSocketManager):
        super().__init__()
        self.job_id = job_id
        self.ws_manager = ws_manager

        # The event loop keeps only weak references to tasks, so in-flight
        # broadcasts are held here until they finish.
        self._pending_tasks: Set["asyncio.Task"] = set()

        # Set up formatter for log messages
        self.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))

    def emit(self, record):
        """
        Process a log record and send it via WebSocket

        Args:
            record: LogRecord to process
        """
        try:
            # Format the message
            message = self.format(record)

            # Create log entry
            log_entry = {
                'timestamp': _utc_timestamp(),
                'level': record.levelname,
                'message': message,
                'logger': record.name
            }

            # Send via WebSocket (non-blocking). Only a *running* loop can
            # execute the task; look it up before creating the coroutine so
            # nothing is left un-awaited when there is none.
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No running event loop in this thread, skip WebSocket update
                return

            task = loop.create_task(self.ws_manager.broadcast_job_update(self.job_id, {
                'type': 'job.log',
                'jobId': self.job_id,
                'logEntry': log_entry
            }))
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)

        except Exception:
            # Don't let logging errors break the application; report them the
            # way the logging package expects handlers to.
            self.handleError(record)


def create_job_logger(job_id: str, ws_manager: WebSocketManager) -> logging.Logger:
    """
    Create a job-specific logger with WebSocket streaming

    Args:
        job_id: Job identifier
        ws_manager: WebSocket manager instance

    Returns:
        Logger instance (named ``job.<job_id>``, level INFO) with a single
        job-specific handler
    """
    job_logger = logging.getLogger(f"job.{job_id}")

    # Remove existing handlers to avoid duplicates; close the ones this module
    # created so they are released rather than merely detached.
    for stale_handler in list(job_logger.handlers):
        job_logger.removeHandler(stale_handler)
        if isinstance(stale_handler, JobLogHandler):
            stale_handler.close()

    # Add job-specific handler
    handler = JobLogHandler(job_id, ws_manager)
    handler.setLevel(logging.INFO)
    job_logger.addHandler(handler)

    # Set logger level
    job_logger.setLevel(logging.INFO)

    return job_logger