- Replace sys.exit(1) with ValueError in _validate_models() — prevents killing the worker process on invalid model keys - ModelConfiguration now reads defaults from core.config instead of hardcoding model keys - Fix .env: google-gemini20 → google-gemini31 (align with MODEL_MAPPINGS) - Improve "No data extracted" error message to explain the document must be a marketing brief Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
258 lines
No EOL
8.9 KiB
Python
Executable file
258 lines
No EOL
8.9 KiB
Python
Executable file
"""
|
|
Job runner that orchestrates document processing with progress reporting
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, Any
|
|
|
|
from ..jobs.models import Job, JobPhase, JobSummary
|
|
from ..jobs.storage import StorageManager
|
|
from ..ws.manager import WebSocketManager
|
|
from .progress import ProgressReporter, create_job_logger
|
|
from core.process_brief_enhanced import DocumentAnalyzer
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
def _write_records_csv(output_path, records) -> None:
    """Write ``records`` (a list of dicts) to ``output_path`` as a UTF-8 CSV file.

    Blocking helper meant to run in a thread pool so file I/O does not stall
    the event loop. The header row is the union of every record's keys in
    first-seen order, so a field that appears only in later records is kept
    rather than silently dropped; records lacking a column get an empty cell.
    An empty ``records`` list produces an empty file.
    """
    import csv

    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        if not records:
            return
        # dict.fromkeys de-duplicates while preserving first-seen key order.
        fieldnames = list(dict.fromkeys(key for record in records for key in record))
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(records)


async def run_job(job: Job, ws_manager: WebSocketManager) -> bool:
    """
    Execute a document processing job with progress reporting.

    Args:
        job: Job to process
        ws_manager: WebSocket manager for real-time updates

    Returns:
        True if the job completed successfully, False otherwise. Ordinary
        failures are reported through ``ProgressReporter.emit_failure`` (or
        ``job.mark_failed`` if reporting itself fails) instead of being raised;
        cancellation is not swallowed.
    """
    start_time = time.time()
    # NOTE(review): the returned logger is not used below; the call is kept in
    # case create_job_logger has side effects (e.g. registering handlers) — confirm.
    job_logger = create_job_logger(job.id, ws_manager)

    try:
        # Create progress reporter
        progress = ProgressReporter(job, ws_manager)

        # Create analyzer with the job's model configuration
        analyzer = DocumentAnalyzer(
            primary_models=job.model_config.primary_models,
            consolidation_model=job.model_config.consolidation_model
        )
        # Mark as GUI mode to suppress legacy print statements
        analyzer._is_gui_mode = True

        await progress.emit_log('INFO', f"Starting processing of {job.file_name}")
        await progress.emit_log('INFO', f"File size: {job.file_size:,} bytes")
        await progress.emit_log('INFO', f"Selected models: {', '.join(job.model_config.primary_models)}")
        await progress.emit_log('INFO', f"Consolidation model: {job.model_config.consolidation_model}")

        # Validate upload path exists before spending any model calls
        if not job.upload_path or not os.path.exists(job.upload_path):
            error_msg = f"Upload file not found: {job.upload_path}"
            await progress.emit_failure(error_msg)
            return False

        # Process document
        result = await analyzer.process_document_multi_model(job.upload_path, progress)

        if not result.raw_data:
            notes = "; ".join(result.processing_notes) if result.processing_notes else ""
            error_msg = (
                "No marketing deliverables found in this document. "
                "Please ensure you upload a marketing brief (PDF, PPTX, DOCX, or XLSX) "
                "containing campaign assets or deliverables."
            )
            if notes:
                error_msg += f" (Details: {notes})"
            await progress.emit_failure(error_msg)
            return False

        # Generate output CSV
        await progress.emit(JobPhase.CSV_GENERATION, 95, "Generating CSV output")

        storage = StorageManager()
        output_path = storage.get_output_path(job.id, job.file_name)

        # Run the blocking CSV write in the default thread pool to avoid blocking the event loop
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _write_records_csv, output_path, result.raw_data)

        # Create job summary
        processing_time = time.time() - start_time
        summary = create_job_summary(result, processing_time)

        # Generate CSV download URL
        result_csv_url = f"/api/jobs/{job.id}/download"

        # Mark job as completed, then notify clients
        job.mark_completed(result_csv_url, summary, output_path)
        await progress.emit_completion(result_csv_url, summary.to_dict())

        await progress.emit_log('INFO', f"Processing completed in {processing_time:.1f} seconds")
        await progress.emit_log('INFO', f"Extracted {len(result.raw_data)} marketing assets")
        await progress.emit_log('INFO', f"Total cost: ${summary.cost_usd_total:.4f}")
        await progress.emit_log('INFO', f"Total tokens: {summary.tokens_total:,}")

        logger.info(f"Job {job.id} completed successfully: {len(result.raw_data)} assets, "
                    f"${summary.cost_usd_total:.4f}, {processing_time:.1f}s")

        return True

    except Exception as e:
        error_msg = f"Job processing failed: {str(e)}"
        logger.error(f"Job {job.id} failed: {error_msg}", exc_info=True)

        try:
            progress = ProgressReporter(job, ws_manager)
            await progress.emit_failure(error_msg)
        except Exception as report_error:
            # Fallback if the progress reporter itself fails. Only Exception is
            # caught so task cancellation (CancelledError) still propagates.
            logger.warning(f"Could not report failure for job {job.id}: {report_error}")
            job.mark_failed(error_msg)

        return False
|
|
|
|
async def _process_dequeued_job(job_manager, ws_manager: WebSocketManager, job_id) -> None:
    """Look up one dequeued job and run it under the manager's concurrency semaphore."""
    # Get job details
    job = await job_manager.get_job(job_id)
    if not job:
        logger.warning(f"Job {job_id} not found in registry")
        return

    logger.info(f"Processing job {job_id}: {job.file_name}")

    # Check queue size for debugging
    logger.info(f"Queue size before processing: {job_manager.queue.qsize()}")

    # Acquire semaphore for concurrency control
    async with job_manager.processing_semaphore:
        success = await run_job(job, ws_manager)

    if success:
        logger.info(f"Job {job_id} completed successfully")
    else:
        logger.error(f"Job {job_id} failed")

    logger.info(f"Queue size after processing: {job_manager.queue.qsize()}")


async def process_job_queue(job_manager, ws_manager: WebSocketManager):
    """
    Background worker that processes jobs from the queue until cancelled.

    Every item taken from ``job_manager.queue`` is acknowledged with exactly one
    ``task_done()`` call. This holds when the job is missing, when processing
    raises, and when the worker is cancelled mid-job, so ``queue.join()`` stays
    accurate. No ``task_done()`` is issued for a ``get()`` that did not return
    an item.

    Args:
        job_manager: JobManager instance (uses ``queue``, ``get_job`` and
            ``processing_semaphore``)
        ws_manager: WebSocket manager for updates
    """
    logger.info("Starting job queue processor")

    while True:
        try:
            # Get next job from queue (blocks until available)
            job_id = await job_manager.queue.get()
        except asyncio.CancelledError:
            logger.info("Job queue processor cancelled")
            break
        except Exception as e:
            # Nothing was dequeued, so there is nothing to acknowledge.
            logger.error(f"Error in job queue processor: {e}", exc_info=True)
            continue

        try:
            await _process_dequeued_job(job_manager, ws_manager, job_id)
        except asyncio.CancelledError:
            logger.info("Job queue processor cancelled")
            break
        except Exception as e:
            # Log and continue processing other jobs
            logger.error(f"Error in job queue processor: {e}", exc_info=True)
        finally:
            # Exactly one acknowledgement per dequeued item, on every path
            job_manager.queue.task_done()
|
|
|
|
async def start_background_workers(job_manager, ws_manager: WebSocketManager, num_workers: int = 1):
    """
    Spawn ``num_workers`` asyncio tasks, each running ``process_job_queue``.

    Args:
        job_manager: JobManager instance shared by every worker
        ws_manager: WebSocket manager handed to each worker
        num_workers: How many worker tasks to create

    Returns:
        List of the created tasks in creation order, named ``job-worker-<index>``
    """
    def _spawn(index):
        # Create one named worker task and log its start
        task = asyncio.create_task(
            process_job_queue(job_manager, ws_manager),
            name=f"job-worker-{index}",
        )
        logger.info(f"Started job worker {index}")
        return task

    return [_spawn(index) for index in range(num_workers)]
|
|
|
|
async def stop_background_workers(workers):
    """
    Cancel every worker task, then wait for all of them to wind down.

    Args:
        workers: List of worker tasks to stop
    """
    logger.info("Stopping background workers...")

    # Cancellation is only a request; the gather below waits for it to take effect.
    for task in workers:
        task.cancel()

    shutdown_error = None
    try:
        # return_exceptions=True turns each worker's exception into a result
        # instead of raising it here.
        await asyncio.gather(*workers, return_exceptions=True)
    except Exception as exc:
        shutdown_error = exc

    if shutdown_error is not None:
        logger.warning(f"Error stopping workers: {shutdown_error}")

    logger.info("Background workers stopped")
|
|
|
|
def _value_or_default(mapping, key, default):
    """Return ``mapping[key]``, or ``default`` when the key is absent or maps to None."""
    value = mapping.get(key)
    return default if value is None else value


def create_job_summary(result, processing_time: float) -> JobSummary:
    """
    Create a job summary from a processing result.

    Cost is read from ``result.metadata['consolidation_metadata']['cost_breakdown']['total_cost']``.
    Tokens are read from ``result.metadata['consolidation_metadata']['token_usage']['grand_total']``.
    A missing level, or a level that is present but None, falls back to the
    default instead of raising ``AttributeError``. A None total likewise
    becomes 0 rather than being passed through.

    Args:
        result: ProcessingResult from DocumentAnalyzer (reads ``metadata``,
            ``raw_data``, ``confidence_score`` and ``processing_notes``)
        processing_time: Total processing time in seconds

    Returns:
        JobSummary object
    """
    metadata = result.metadata or {}

    # Extract cost information; `or {}` guards against explicit None values.
    consolidation_metadata = metadata.get('consolidation_metadata') or {}
    cost_breakdown = consolidation_metadata.get('cost_breakdown') or {}
    token_usage = consolidation_metadata.get('token_usage') or {}

    return JobSummary(
        doc_type=metadata.get('doc_type', 'unknown'),
        assets_extracted=len(result.raw_data or []),
        confidence_score=result.confidence_score,
        notes=result.processing_notes,
        cost_usd_total=_value_or_default(cost_breakdown, 'total_cost', 0),
        tokens_total=_value_or_default(token_usage, 'grand_total', 0),
        primary_models=metadata.get('primary_models_used', []),
        consolidation_model=metadata.get('consolidation_model', ''),
        processing_time_seconds=processing_time
    )