ac-tool/backend/server/runners/progress.py
Vadym Samoilenko f85d6a6b51 fix: repair brief upload and real-time job progress
Three bugs fixed:

1. api/jobs.ts: remove manual Content-Type header on FormData upload.
   Setting it without the multipart boundary caused Quart to reject the
   request body — the root cause of brief upload failures.

2. progress.py: include full job.to_dict() in job.progress / job.completed
   / job.failed WebSocket messages. Frontend checks msg.job to call
   updateJob() — without it, job cards never updated in real-time.

3. AppShell: move useWebSocket() here from BriefUploadPage so the WS
   connection persists across all pages, not just the upload page.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 15:05:23 +00:00

304 lines
No EOL
9.7 KiB
Python
Executable file

"""
Progress reporting for job processing with WebSocket integration
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional

from ..jobs.models import Job, JobPhase, ProviderUpdate
from ..ws.manager import WebSocketManager
logger = logging.getLogger(__name__)
class ProgressReporter:
    """
    Reports progress updates for one job and broadcasts them over WebSocket.

    Each ``emit*`` coroutine updates ``self.job`` (progress, provider state,
    log entries) and then awaits
    ``ws_manager.broadcast_job_update(job.id, payload)``. Any exception raised
    while doing so is logged with its traceback and swallowed, so a failed
    broadcast never breaks the processing pipeline.
    """

    def __init__(self, job: Job, ws_manager: WebSocketManager):
        self.job = job
        self.ws_manager = ws_manager
        # NOTE(review): logging.getLogger() caches every logger name for the
        # lifetime of the process, so one logger per job id accumulates in a
        # long-running server — consider a LoggerAdapter if that matters.
        self.logger = logging.getLogger(f"{__name__}.{job.id}")

    @staticmethod
    def _phase_value(phase: Any) -> Any:
        """Return ``phase.value`` for enum-like phases, else ``phase`` unchanged."""
        return phase.value if hasattr(phase, 'value') else phase

    @staticmethod
    def _utc_timestamp() -> str:
        """
        Return the current UTC time as an ISO-8601 string without an offset.

        Produces the same text as the deprecated ``datetime.utcnow().isoformat()``;
        the offset-less shape is kept for compatibility with existing clients.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    async def emit(
        self,
        phase: JobPhase,
        progress_pct: int,
        message: str = "",
        step_label: str = ""
    ):
        """
        Update the job's progress and broadcast a ``job.progress`` message.

        Args:
            phase: Current processing phase (its ``.value`` is sent when present)
            progress_pct: Progress percentage (0-100)
            message: Optional progress message; when non-empty it is also
                appended to the job log at INFO level
            step_label: Optional custom step label, passed to
                ``job.update_progress``
        """
        try:
            self.job.update_progress(phase, progress_pct, step_label)
            if message:
                self.job.add_log('INFO', message)
                self.logger.info(message)
            phase_value = self._phase_value(phase)
            # The full job dict is included so the frontend can call updateJob().
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.progress',
                'jobId': self.job.id,
                'phase': phase_value,
                'progressPct': progress_pct,
                'message': message,
                # Read back from the job: update_progress decides the final label.
                'stepLabel': self.job.step_label,
                'providerUpdates': {k: v.to_dict() for k, v in self.job.provider_updates.items()},
                'job': self.job.to_dict(),
            })
            self.logger.debug("Progress update: %s %s%% - %s", phase_value, progress_pct, message)
        except Exception as e:
            # Don't re-raise: a broken broadcast must not abort processing.
            self.logger.exception("Failed to emit progress update: %s", e)

    async def emit_provider_update(
        self,
        model_key: str,
        update_data: Dict[str, Any]
    ):
        """
        Record a provider-specific update on the job and broadcast it.

        Args:
            model_key: Model identifier (e.g., 'openai-gpt51')
            update_data: Provider update information using camelCase keys
                (provider, model, status, startedAt, completedAt, latencyMs,
                tokensIn, tokensOut, tokensCached, costUsd, error); missing
                string fields default to '' and the rest to None
        """
        try:
            provider_update = ProviderUpdate(
                provider=update_data.get('provider', ''),
                model=update_data.get('model', ''),
                status=update_data.get('status', ''),
                started_at=update_data.get('startedAt'),
                completed_at=update_data.get('completedAt'),
                latency_ms=update_data.get('latencyMs'),
                tokens_in=update_data.get('tokensIn'),
                tokens_out=update_data.get('tokensOut'),
                tokens_cached=update_data.get('tokensCached'),
                cost_usd=update_data.get('costUsd'),
                error=update_data.get('error')
            )
            self.job.update_provider(model_key, provider_update)

            # A provider that reports an error is logged at ERROR, otherwise INFO.
            status_msg = f"Provider {model_key}: {provider_update.status}"
            if provider_update.error:
                status_msg += f" - {provider_update.error}"
                self.job.add_log('ERROR', status_msg)
                self.logger.error(status_msg)
            else:
                self.job.add_log('INFO', status_msg)
                self.logger.info(status_msg)

            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.provider_update',
                'jobId': self.job.id,
                'modelKey': model_key,
                'update': provider_update.to_dict()
            })
            self.logger.debug("Provider update: %s - %s", model_key, provider_update.status)
        except Exception as e:
            self.logger.exception("Failed to emit provider update for %s: %s", model_key, e)

    async def emit_log(self, level: str, message: str):
        """
        Append a log entry to the job and stream it as a ``job.log`` message.

        Args:
            level: Log level name (DEBUG, INFO, WARNING, ERROR, ...). Matched
                case-insensitively against the standard logging level names;
                unknown names are written to the system logger at INFO. The
                broadcast entry carries ``level`` exactly as given.
            message: Log message
        """
        try:
            self.job.add_log(level, message)
            # getLevelName() maps a registered level name to its number and
            # returns a "Level X" string for unknown names.
            numeric_level = logging.getLevelName(level.upper())
            if not isinstance(numeric_level, int):
                numeric_level = logging.INFO
            self.logger.log(numeric_level, message)
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.log',
                'jobId': self.job.id,
                'logEntry': {
                    'timestamp': self._utc_timestamp(),
                    'level': level,
                    'message': message
                }
            })
        except Exception as e:
            self.logger.exception("Failed to emit log message: %s", e)

    async def calculate_analysis_progress(
        self,
        base_progress: int,
        completed_providers: int,
        total_providers: int,
        analysis_weight: int = 50
    ) -> int:
        """
        Calculate progress percentage for the LLM analysis phase.

        Progress moves linearly from ``base_progress`` to
        ``base_progress + analysis_weight`` as providers complete, and is
        capped at 100.

        Args:
            base_progress: Starting progress percentage (usually 25)
            completed_providers: Number of completed providers; clamped to
                ``[0, total_providers]`` so the phase never overshoots
            total_providers: Total number of providers; ``<= 0`` means there
                is nothing to analyse and ``base_progress`` is returned
            analysis_weight: Weight of analysis phase in total progress
        Returns:
            Updated progress percentage
        """
        if total_providers <= 0:
            return base_progress
        done = max(0, min(completed_providers, total_providers))
        analysis_progress = (done / total_providers) * analysis_weight
        return min(100, base_progress + int(analysis_progress))

    async def emit_completion(
        self,
        result_csv_url: str,
        summary_data: Dict[str, Any]
    ):
        """
        Log successful completion and broadcast a ``job.completed`` message.

        NOTE(review): unlike emit_failure, this does not change the job's
        status; presumably the caller marks the job completed first — confirm.

        Args:
            result_csv_url: URL to download CSV result
            summary_data: Job summary information
        """
        try:
            self.job.add_log('INFO', 'Processing completed successfully')
            # The full job dict is included so the frontend can call updateJob().
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.completed',
                'jobId': self.job.id,
                'resultCsvUrl': result_csv_url,
                'summary': summary_data,
                'job': self.job.to_dict(),
            })
            self.logger.info("Job %s completed successfully", self.job.id)
        except Exception as e:
            self.logger.exception("Failed to emit completion event: %s", e)

    async def emit_failure(self, error: str):
        """
        Mark the job failed, log it, and broadcast a ``job.failed`` message.

        Args:
            error: Error message (passed to ``job.mark_failed`` and broadcast)
        """
        try:
            self.job.mark_failed(error)
            self.job.add_log('ERROR', f'Processing failed: {error}')
            # The full job dict is included so the frontend can call updateJob().
            await self.ws_manager.broadcast_job_update(self.job.id, {
                'type': 'job.failed',
                'jobId': self.job.id,
                'error': error,
                'job': self.job.to_dict(),
            })
            self.logger.error("Job %s failed: %s", self.job.id, error)
        except Exception as e:
            self.logger.exception("Failed to emit failure event: %s", e)
class JobLogHandler(logging.Handler):
    """
    Logging handler that streams a job's log records to WebSocket clients.

    Each record is formatted and sent as a ``job.log`` message through
    ``ws_manager.broadcast_job_update``. Delivery is best-effort: the
    broadcast is scheduled as a task on the asyncio event loop running in the
    current thread, and records emitted when no loop is running there are not
    broadcast.
    """

    def __init__(self, job_id: str, ws_manager: WebSocketManager):
        super().__init__()
        self.job_id = job_id
        self.ws_manager = ws_manager
        # Strong references to in-flight broadcast tasks. The event loop keeps
        # only weak references to tasks, so an otherwise unreferenced task can
        # be garbage-collected before it finishes.
        self._pending_tasks = set()
        # Set up formatter for log messages
        self.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))

    def emit(self, record):
        """
        Format a log record and schedule its broadcast via WebSocket.

        Args:
            record: LogRecord to process
        """
        try:
            message = self.format(record)
            log_entry = {
                # Record creation time in UTC as an offset-less ISO-8601 string
                # (same shape as the former datetime.utcnow().isoformat()).
                'timestamp': datetime.fromtimestamp(record.created, timezone.utc).replace(tzinfo=None).isoformat(),
                'level': record.levelname,
                'message': message,
                'logger': record.name
            }
            try:
                # get_running_loop() never creates a loop (unlike the deprecated
                # get_event_loop() fallback), so nothing is scheduled on a loop
                # that will never run.
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No event loop running in this thread: skip the WebSocket update.
                # NOTE(review): records logged from worker threads are dropped
                # here; forwarding them would need run_coroutine_threadsafe.
                return
            task = loop.create_task(self.ws_manager.broadcast_job_update(self.job_id, {
                'type': 'job.log',
                'jobId': self.job_id,
                'logEntry': log_entry
            }))
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)
        except RecursionError:
            raise
        except Exception:
            # Standard Handler error path: reports to stderr (when
            # logging.raiseExceptions is true) without breaking the caller.
            self.handleError(record)
def create_job_logger(job_id: str, ws_manager: WebSocketManager) -> logging.Logger:
    """
    Configure the ``job.<job_id>`` logger so its records stream over WebSocket.

    Whatever handlers the logger already has are detached first, so repeated
    calls for the same job leave exactly one JobLogHandler attached. Both that
    handler and the logger itself are set to INFO.

    Args:
        job_id: Job identifier; used in the logger name and given to the handler
        ws_manager: WebSocket manager the handler broadcasts through
    Returns:
        The configured logger
    """
    job_logger = logging.getLogger(f"job.{job_id}")
    # Detach handlers from any earlier call so messages are not sent twice.
    del job_logger.handlers[:]

    ws_handler = JobLogHandler(job_id, ws_manager)
    ws_handler.setLevel(logging.INFO)

    job_logger.addHandler(ws_handler)
    job_logger.setLevel(logging.INFO)
    return job_logger