ac-tool/backend/server/runners/job_runner.py
Vadym Samoilenko d71a044a3c Fix model config alignment and improve error messaging
- Replace sys.exit(1) with ValueError in _validate_models() — prevents killing the worker process on invalid model keys
- ModelConfiguration now reads defaults from core.config instead of hardcoding model keys
- Fix .env: google-gemini20 → google-gemini31 (align with MODEL_MAPPINGS)
- Improve "No data extracted" error message to explain the document must be a marketing brief

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 15:33:54 +00:00

258 lines
No EOL
8.9 KiB
Python
Executable file

"""
Job runner that orchestrates document processing with progress reporting
"""
import asyncio
import logging
import os
import time
from datetime import datetime
from typing import Dict, Any
from ..jobs.models import Job, JobPhase, JobSummary
from ..jobs.storage import StorageManager
from ..ws.manager import WebSocketManager
from .progress import ProgressReporter, create_job_logger
from core.process_brief_enhanced import DocumentAnalyzer
logger = logging.getLogger(__name__)
def _write_results_csv(output_path, rows) -> None:
    """
    Write extracted rows to ``output_path`` as UTF-8 CSV (blocking I/O).

    Column headers are taken from the keys of the first row. Keys that only
    appear in later rows are dropped (``extrasaction='ignore'``), and rows
    missing a header key get an empty cell. When ``rows`` is empty the file is
    created but left empty.

    Args:
        output_path: Destination file path
        rows: Sequence of dict records to write
    """
    import csv

    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        if not rows:
            return
        headers = list(rows[0].keys())
        writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(rows)


async def run_job(job: Job, ws_manager: WebSocketManager) -> bool:
    """
    Execute a document processing job with progress reporting.

    Runs the multi-model ``DocumentAnalyzer`` over the job's uploaded file.
    It then writes the extracted rows as CSV to the path supplied by
    ``StorageManager``, marks the job completed, and emits progress/log
    events through a ``ProgressReporter``.

    Args:
        job: Job to process
        ws_manager: WebSocket manager for real-time updates

    Returns:
        True if job completed successfully, False otherwise. Ordinary
        exceptions are caught, logged, and reported as a job failure instead
        of being propagated; task cancellation is not swallowed.
    """
    # Monotonic clock: the reported duration must not jump if the wall
    # clock is adjusted while the job runs.
    start_time = time.monotonic()
    # NOTE(review): the returned logger is never used in this function; the
    # call is kept in case create_job_logger has registration side effects.
    job_logger = create_job_logger(job.id, ws_manager)  # noqa: F841
    try:
        progress = ProgressReporter(job, ws_manager)

        # Create analyzer with the job's model configuration
        analyzer = DocumentAnalyzer(
            primary_models=job.model_config.primary_models,
            consolidation_model=job.model_config.consolidation_model,
        )
        # Mark as GUI mode to suppress legacy print statements
        analyzer._is_gui_mode = True

        await progress.emit_log('INFO', f"Starting processing of {job.file_name}")
        await progress.emit_log('INFO', f"File size: {job.file_size:,} bytes")
        await progress.emit_log('INFO', f"Selected models: {', '.join(job.model_config.primary_models)}")
        await progress.emit_log('INFO', f"Consolidation model: {job.model_config.consolidation_model}")

        # Validate upload path exists before doing any expensive work
        if not job.upload_path or not os.path.exists(job.upload_path):
            error_msg = f"Upload file not found: {job.upload_path}"
            await progress.emit_failure(error_msg)
            return False

        result = await analyzer.process_document_multi_model(job.upload_path, progress)

        if not result.raw_data:
            notes = "; ".join(result.processing_notes) if result.processing_notes else ""
            error_msg = (
                "No marketing deliverables found in this document. "
                "Please ensure you upload a marketing brief (PDF, PPTX, DOCX, or XLSX) "
                "containing campaign assets or deliverables."
            )
            if notes:
                error_msg += f" (Details: {notes})"
            await progress.emit_failure(error_msg)
            return False

        # Generate output CSV
        await progress.emit(JobPhase.CSV_GENERATION, 95, "Generating CSV output")
        storage = StorageManager()
        output_path = storage.get_output_path(job.id, job.file_name)

        # CSV writing is blocking file I/O; run it in the default thread pool
        # so the event loop keeps serving other jobs and WebSocket traffic.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _write_results_csv, output_path, result.raw_data)

        processing_time = time.monotonic() - start_time
        summary = create_job_summary(result, processing_time)
        result_csv_url = f"/api/jobs/{job.id}/download"

        job.mark_completed(result_csv_url, summary, output_path)
        await progress.emit_completion(result_csv_url, summary.to_dict())

        await progress.emit_log('INFO', f"Processing completed in {processing_time:.1f} seconds")
        await progress.emit_log('INFO', f"Extracted {len(result.raw_data)} marketing assets")
        await progress.emit_log('INFO', f"Total cost: ${summary.cost_usd_total:.4f}")
        await progress.emit_log('INFO', f"Total tokens: {summary.tokens_total:,}")
        logger.info(f"Job {job.id} completed successfully: {len(result.raw_data)} assets, "
                    f"${summary.cost_usd_total:.4f}, {processing_time:.1f}s")
        return True
    except Exception as e:
        error_msg = f"Job processing failed: {e}"
        logger.error(f"Job {job.id} failed: {error_msg}", exc_info=True)
        try:
            progress = ProgressReporter(job, ws_manager)
            await progress.emit_failure(error_msg)
        except Exception:
            # Reporting itself failed; still record the failure on the job so
            # its state is not left dangling. Cancellation is deliberately
            # not caught here so shutdown is not swallowed.
            logger.warning(
                f"Could not report failure for job {job.id}; marking it failed directly",
                exc_info=True,
            )
            job.mark_failed(error_msg)
        return False
async def process_job_queue(job_manager, ws_manager: WebSocketManager):
    """
    Background worker loop that processes jobs from the queue.

    The loop takes a job ID from ``job_manager.queue`` and looks it up with
    ``job_manager.get_job``. It then runs the job with ``run_job`` while
    holding ``job_manager.processing_semaphore``. It runs until cancelled.
    Any other exception is logged and the loop moves on to the next job.

    Every item taken from the queue is matched by exactly one
    ``queue.task_done()`` call. This holds when the job is missing, when
    processing raises, and when the worker is cancelled mid-job, so
    ``queue.join()`` waiters are never left hanging.

    Args:
        job_manager: JobManager instance
        ws_manager: WebSocket manager for updates
    """
    logger.info("Starting job queue processor")
    while True:
        # Set once an item has been dequeued and still owes a task_done().
        item_taken = False
        try:
            # Get next job from queue (blocks until available)
            job_id = await job_manager.queue.get()
            item_taken = True

            job = await job_manager.get_job(job_id)
            if not job:
                logger.warning(f"Job {job_id} not found in registry")
                continue  # task_done() happens in finally

            logger.info(f"Processing job {job_id}: {job.file_name}")
            logger.info(f"Queue size before processing: {job_manager.queue.qsize()}")

            # Acquire semaphore for concurrency control
            async with job_manager.processing_semaphore:
                success = await run_job(job, ws_manager)

            if success:
                logger.info(f"Job {job_id} completed successfully")
            else:
                logger.error(f"Job {job_id} failed")
            logger.info(f"Queue size after processing: {job_manager.queue.qsize()}")
        except asyncio.CancelledError:
            logger.info("Job queue processor cancelled")
            break
        except Exception as e:
            # Keep the worker alive so other jobs still get processed.
            logger.error(f"Error in job queue processor: {e}", exc_info=True)
        finally:
            if item_taken:
                job_manager.queue.task_done()
async def start_background_workers(job_manager, ws_manager: WebSocketManager, num_workers: int = 1):
    """
    Spawn asyncio tasks that each run the job-queue worker loop.

    Each task is named ``job-worker-<index>``. A log line is written as each
    one is created.

    Args:
        job_manager: JobManager instance
        ws_manager: WebSocket manager
        num_workers: Number of worker tasks to start

    Returns:
        List of the created worker tasks, in creation order
    """
    started = []
    for worker_index in range(num_workers):
        task_name = f"job-worker-{worker_index}"
        worker_coro = process_job_queue(job_manager, ws_manager)
        started.append(asyncio.create_task(worker_coro, name=task_name))
        logger.info(f"Started job worker {worker_index}")
    return started
async def stop_background_workers(workers):
    """
    Cancel background worker tasks and wait for them to finish.

    Results are gathered with ``return_exceptions=True`` so one misbehaving
    worker cannot stop the others from being awaited. Any exception other
    than cancellation that a worker ended with is logged, not silently
    discarded.

    Args:
        workers: Iterable of worker tasks to stop
    """
    # Materialise once: a one-shot iterable would otherwise be exhausted by
    # the cancel loop and never awaited.
    workers = list(workers)
    logger.info("Stopping background workers...")
    for worker in workers:
        worker.cancel()

    try:
        results = await asyncio.gather(*workers, return_exceptions=True)
    except Exception as e:
        logger.warning(f"Error stopping workers: {e}")
    else:
        for index, outcome in enumerate(results):
            if isinstance(outcome, BaseException) and not isinstance(outcome, asyncio.CancelledError):
                logger.warning(f"Worker {index} exited with error: {outcome!r}")
    logger.info("Background workers stopped")
def create_job_summary(result, processing_time: float) -> JobSummary:
    """
    Create job summary from processing result.

    Cost and token totals are read from
    ``result.metadata['consolidation_metadata']``. Metadata sections that are
    missing or explicitly ``None`` are treated as empty. Cost and token
    totals fall back to 0, so callers can format them numerically without
    extra checks.

    Args:
        result: ProcessingResult from DocumentAnalyzer
        processing_time: Total processing time in seconds

    Returns:
        JobSummary object
    """
    # `or {}` (rather than a .get default) also covers keys present with a
    # None value, which would otherwise raise AttributeError on `.get`.
    metadata = result.metadata or {}
    consolidation_metadata = metadata.get('consolidation_metadata') or {}
    cost_breakdown = consolidation_metadata.get('cost_breakdown') or {}
    token_usage = consolidation_metadata.get('token_usage') or {}

    total_cost = cost_breakdown.get('total_cost')
    grand_total = token_usage.get('grand_total')

    return JobSummary(
        doc_type=metadata.get('doc_type', 'unknown'),
        assets_extracted=len(result.raw_data or []),
        confidence_score=result.confidence_score,
        notes=result.processing_notes,
        cost_usd_total=0 if total_cost is None else total_cost,
        tokens_total=0 if grand_total is None else grand_total,
        primary_models=metadata.get('primary_models_used', []),
        consolidation_model=metadata.get('consolidation_model', ''),
        processing_time_seconds=processing_time,
    )