Three bugs fixed: 1. api/jobs.ts: remove manual Content-Type header on FormData upload. Setting it without the multipart boundary caused Quart to reject the request body — the root cause of brief upload failures. 2. progress.py: include full job.to_dict() in job.progress / job.completed / job.failed WebSocket messages. Frontend checks msg.job to call updateJob() — without it, job cards never updated in real-time. 3. AppShell: move useWebSocket() here from BriefUploadPage so the WS connection persists across all pages, not just the upload page. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
304 lines
No EOL
9.7 KiB
Python
Executable file
304 lines
No EOL
9.7 KiB
Python
Executable file
"""
|
|
Progress reporting for job processing with WebSocket integration
|
|
"""
|
|
|
|
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional

from ..jobs.models import Job, JobPhase, ProviderUpdate
from ..ws.manager import WebSocketManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ProgressReporter:
    """
    Reports progress updates for job processing with WebSocket broadcasting.

    Each ``emit*`` coroutine mutates the wrapped job, mirrors the event to a
    per-job Python logger and broadcasts a message through the WebSocket
    manager. All of them are best-effort: any exception is logged (with its
    traceback) and swallowed so that a reporting failure never interrupts the
    processing pipeline that calls them.
    """

    def __init__(self, job: Job, ws_manager: WebSocketManager):
        """
        Bind the reporter to one job and one WebSocket manager.

        Args:
            job: Job whose state is updated and broadcast
            ws_manager: Manager used to broadcast job messages
        """
        self.job = job
        self.ws_manager = ws_manager
        self.logger = logging.getLogger(f"{__name__}.{job.id}")

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated ``datetime.utcnow()``
        (no UTC offset suffix), so consumers of the timestamp see no change.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    async def emit(
        self,
        phase: JobPhase,
        progress_pct: int,
        message: str = "",
        step_label: str = ""
    ):
        """
        Emit progress update for job

        Args:
            phase: Current processing phase
            progress_pct: Progress percentage (0-100)
            message: Optional progress message
            step_label: Optional custom step label
        """
        try:
            # Update job progress
            self.job.update_progress(phase, progress_pct, step_label)

            # Add log entry
            if message:
                self.job.add_log('INFO', message)
                self.logger.info(message)

            # Enum members are sent by value; plain values pass through as-is
            phase_value = phase.value if hasattr(phase, 'value') else phase

            # Broadcast progress update — include full job so frontend can updateJob()
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.progress',
                'jobId': self.job.id,
                'phase': phase_value,
                'progressPct': progress_pct,
                'message': message,
                'stepLabel': self.job.step_label,
                'providerUpdates': {k: v.to_dict() for k, v in self.job.provider_updates.items()},
                'job': self.job.to_dict(),
            })

            self.logger.debug(
                "Progress update: %s %s%% - %s", phase_value, progress_pct, message
            )

        except Exception as e:
            # Don't re-raise to avoid breaking the processing pipeline
            self.logger.exception("Failed to emit progress update: %s", e)

    async def emit_provider_update(
        self,
        model_key: str,
        update_data: Dict[str, Any]
    ):
        """
        Emit provider-specific update

        Args:
            model_key: Model identifier (e.g., 'openai-gpt51')
            update_data: Provider update information (camelCase keys such as
                'startedAt', 'latencyMs', 'tokensIn'; missing keys become
                '' for provider/model/status and None for the rest)
        """
        try:
            # Create provider update object
            provider_update = ProviderUpdate(
                provider=update_data.get('provider', ''),
                model=update_data.get('model', ''),
                status=update_data.get('status', ''),
                started_at=update_data.get('startedAt'),
                completed_at=update_data.get('completedAt'),
                latency_ms=update_data.get('latencyMs'),
                tokens_in=update_data.get('tokensIn'),
                tokens_out=update_data.get('tokensOut'),
                tokens_cached=update_data.get('tokensCached'),
                cost_usd=update_data.get('costUsd'),
                error=update_data.get('error')
            )

            # Update job
            self.job.update_provider(model_key, provider_update)

            # Log provider update; an update carrying an error is logged at ERROR
            status_msg = f"Provider {model_key}: {provider_update.status}"
            if provider_update.error:
                status_msg += f" - {provider_update.error}"
                self.job.add_log('ERROR', status_msg)
                self.logger.error(status_msg)
            else:
                self.job.add_log('INFO', status_msg)
                self.logger.info(status_msg)

            # Broadcast provider update
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.provider_update',
                'jobId': self.job.id,
                'modelKey': model_key,
                'update': provider_update.to_dict()
            })

            self.logger.debug(
                "Provider update: %s - %s", model_key, provider_update.status
            )

        except Exception as e:
            self.logger.exception(
                "Failed to emit provider update for %s: %s", model_key, e
            )

    async def emit_log(self, level: str, message: str):
        """
        Emit log message with WebSocket streaming

        Args:
            level: Log level (DEBUG, INFO, WARNING, ERROR)
            message: Log message
        """
        try:
            # Add to job logs
            self.job.add_log(level, message)

            # Log to system logger. Resolve the level through logging's level
            # table instead of getattr() on the logger: a string such as
            # "disabled" or "name" matches a non-callable logger attribute and
            # used to abort the whole emit. Unknown levels fall back to INFO.
            level_no = logging.getLevelName(str(level).upper())
            if not isinstance(level_no, int) or level_no <= logging.NOTSET:
                level_no = logging.INFO
            self.logger.log(level_no, message)

            # Broadcast log entry
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.log',
                'jobId': self.job.id,
                'logEntry': {
                    'timestamp': self._utc_timestamp(),
                    'level': level,
                    'message': message
                }
            })

        except Exception as e:
            self.logger.exception("Failed to emit log message: %s", e)

    async def calculate_analysis_progress(
        self,
        base_progress: int,
        completed_providers: int,
        total_providers: int,
        analysis_weight: int = 50
    ) -> int:
        """
        Calculate progress percentage for LLM analysis phase

        Args:
            base_progress: Starting progress percentage (usually 25)
            completed_providers: Number of completed providers
            total_providers: Total number of providers
            analysis_weight: Weight of analysis phase in total progress

        Returns:
            Updated progress percentage, clamped to 0-100. When there are no
            providers (``total_providers <= 0``) ``base_progress`` is returned
            unchanged.
        """
        if total_providers <= 0:
            return base_progress

        # Clamp the completed fraction so a miscounted caller (negative or
        # more completions than providers) cannot push progress out of range.
        completed_ratio = min(1.0, max(0.0, completed_providers / total_providers))
        analysis_progress = completed_ratio * analysis_weight
        return max(0, min(100, base_progress + int(analysis_progress)))

    async def emit_completion(
        self,
        result_csv_url: str,
        summary_data: Dict[str, Any]
    ):
        """
        Emit job completion event

        Args:
            result_csv_url: URL to download CSV result
            summary_data: Job summary information
        """
        try:
            self.job.add_log('INFO', 'Processing completed successfully')

            # Broadcast completion — include full job so frontend can updateJob()
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.completed',
                'jobId': self.job.id,
                'resultCsvUrl': result_csv_url,
                'summary': summary_data,
                'job': self.job.to_dict(),
            })

            self.logger.info(f"Job {self.job.id} completed successfully")

        except Exception as e:
            self.logger.exception("Failed to emit completion event: %s", e)

    async def emit_failure(self, error: str):
        """
        Emit job failure event

        Marks the job as failed before broadcasting, so the serialized job in
        the message already carries the failed state.

        Args:
            error: Error message
        """
        try:
            self.job.mark_failed(error)
            self.job.add_log('ERROR', f'Processing failed: {error}')

            # Broadcast failure — include full job so frontend can updateJob()
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.failed',
                'jobId': self.job.id,
                'error': error,
                'job': self.job.to_dict(),
            })

            self.logger.error(f"Job {self.job.id} failed: {error}")

        except Exception as e:
            self.logger.exception("Failed to emit failure event: %s", e)
|
|
|
|
|
|
class JobLogHandler(logging.Handler):
    """
    Custom logging handler that routes job-specific logs to WebSocket clients.

    Records are formatted and broadcast as ``job.log`` messages. Broadcasting
    is fire-and-forget: it is scheduled on the event loop running in the
    calling thread, and skipped entirely when no loop is running there.
    """

    def __init__(self, job_id: str, ws_manager: WebSocketManager):
        """
        Args:
            job_id: Job identifier attached to every broadcast message
            ws_manager: WebSocket manager used to broadcast log entries
        """
        super().__init__()
        self.job_id = job_id
        self.ws_manager = ws_manager

        # The event loop only keeps weak references to tasks, so in-flight
        # broadcasts are held here until they finish to stop them from being
        # garbage-collected mid-execution.
        self._pending_tasks = set()

        # Set up formatter for log messages
        self.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))

    def emit(self, record):
        """
        Process a log record and send it via WebSocket

        Args:
            record: LogRecord to process
        """
        try:
            # Format the message
            message = self.format(record)

            # Create log entry. The timestamp is naive UTC in ISO-8601 form,
            # the same format the deprecated datetime.utcnow() produced.
            log_entry = {
                'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
                'level': record.levelname,
                'message': message,
                'logger': record.name
            }

            # Send via WebSocket (non-blocking). get_running_loop() raises
            # RuntimeError when this thread has no running loop, whereas
            # get_event_loop() could hand back an idle loop on which the task
            # would never execute.
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No event loop available, skip WebSocket update
                return

            task = loop.create_task(self.ws_manager.broadcast_job_update(self.job_id, {
                'type': 'job.log',
                'jobId': self.job_id,
                'logEntry': log_entry
            }))
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)

        except Exception:
            # Don't let logging errors break the application; defer to the
            # standard logging error hook instead of printing to stdout.
            self.handleError(record)
|
|
|
|
def create_job_logger(job_id: str, ws_manager: WebSocketManager) -> logging.Logger:
    """
    Build the logger for one job and wire it to WebSocket streaming.

    The logger is named ``job.<job_id>``. Whatever handlers it already
    carries are detached first, so calling this repeatedly for the same job
    never stacks duplicate handlers.

    Args:
        job_id: Job identifier
        ws_manager: WebSocket manager instance

    Returns:
        Logger instance whose only handler is a fresh JobLogHandler, with
        both logger and handler set to INFO
    """
    job_logger = logging.getLogger(f"job.{job_id}")

    # Detach previously attached handlers to avoid duplicates
    for stale_handler in list(job_logger.handlers):
        job_logger.removeHandler(stale_handler)

    # Attach the job-specific WebSocket handler
    ws_handler = JobLogHandler(job_id, ws_manager)
    ws_handler.setLevel(logging.INFO)
    job_logger.addHandler(ws_handler)

    # Logger threshold matches the handler threshold
    job_logger.setLevel(logging.INFO)

    return job_logger