ac-tool/backend/server/jobs/models.py
Vadym Samoilenko d71a044a3c Fix model config alignment and improve error messaging
- Replace sys.exit(1) with ValueError in _validate_models() — prevents killing the worker process on invalid model keys
- ModelConfiguration now reads defaults from core.config instead of hardcoding model keys
- Fix .env: google-gemini20 → google-gemini31 (align with MODEL_MAPPINGS)
- Improve "No data extracted" error message to explain the document must be a marketing brief

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 15:33:54 +00:00

282 lines
No EOL
9.3 KiB
Python
Executable file

"""
Data models for job management and processing
"""
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
def _default_primary_models() -> List[str]:
try:
from core.config import config
return config.get_default_primary_models()
except Exception:
return ['anthropic-sonnet45', 'google-gemini31']
def _default_consolidation_model() -> str:
try:
from core.config import config
return config.DEFAULT_CONSOLIDATION_MODEL
except Exception:
return 'anthropic-sonnet45'
class JobPhase(Enum):
    """Stages a job can be in while it is processed.

    Every member's value is the same string as its name. Declaration order
    is preserved when iterating over the enum.
    """

    # Waiting to be processed.
    QUEUED = "QUEUED"

    # Document content is being extracted.
    EXTRACT_CONTENT = "EXTRACT_CONTENT"

    # LLM analysis is running.
    LLM_ANALYSIS = "LLM_ANALYSIS"

    # Results are being consolidated.
    CONSOLIDATION = "CONSOLIDATION"

    # CSV output is being generated.
    CSV_GENERATION = "CSV_GENERATION"

    # Terminal state: processing finished.
    COMPLETED = "COMPLETED"

    # Terminal state: processing failed.
    FAILED = "FAILED"
@dataclass
class ProviderUpdate:
    """Status record for one LLM provider call made while processing a job.

    Only ``provider``, ``model`` and ``status`` are required; every metric
    and timing field defaults to ``None``.
    """

    provider: str  # 'openai', 'anthropic', 'google'
    model: str  # e.g., "gpt-5.1", "claude-sonnet-4-5", "gemini-3.1-pro"
    status: str  # 'started', 'success', 'error'
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    latency_ms: Optional[float] = None
    tokens_in: Optional[int] = None
    tokens_out: Optional[int] = None
    tokens_cached: Optional[int] = None
    cost_usd: Optional[float] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the update as a camelCase-keyed dict for JSON serialization.

        Returns:
            A new dict with one entry per field; unset fields map to ``None``.
        """
        return dict(
            provider=self.provider,
            model=self.model,
            status=self.status,
            startedAt=self.started_at,
            completedAt=self.completed_at,
            latencyMs=self.latency_ms,
            tokensIn=self.tokens_in,
            tokensOut=self.tokens_out,
            tokensCached=self.tokens_cached,
            costUsd=self.cost_usd,
            error=self.error,
        )
@dataclass
class LogEntry:
    """A single timestamped log line attached to a job."""

    timestamp: str
    level: str  # 'DEBUG', 'INFO', 'WARNING', 'ERROR'
    message: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the entry as a plain dict with the same three field names."""
        return dict(
            timestamp=self.timestamp,
            level=self.level,
            message=self.message,
        )
@dataclass
class JobSummary:
    """Summary figures recorded for a completed job.

    All fields are required except ``processing_time_seconds``, which
    defaults to ``None``.
    """

    doc_type: str
    assets_extracted: int
    confidence_score: float
    notes: List[str]
    cost_usd_total: float
    tokens_total: int
    primary_models: List[str]
    consolidation_model: str
    processing_time_seconds: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the summary as a camelCase-keyed dict.

        The ``notes`` and ``primaryModels`` entries reference the instance's
        own list objects; they are not copied.
        """
        return dict(
            docType=self.doc_type,
            assetsExtracted=self.assets_extracted,
            confidenceScore=self.confidence_score,
            notes=self.notes,
            costUsdTotal=self.cost_usd_total,
            tokensTotal=self.tokens_total,
            primaryModels=self.primary_models,
            consolidationModel=self.consolidation_model,
            processingTimeSeconds=self.processing_time_seconds,
        )
@dataclass
class ModelInfo:
    """Descriptive record for one available LLM model.

    ``can_be_primary`` and ``can_be_consolidation`` both default to ``True``.
    """

    key: str
    name: str
    provider: str
    description: str
    cost_per_1m_input: float
    cost_per_1m_output: float
    can_be_primary: bool = True
    can_be_consolidation: bool = True

    def to_dict(self) -> Dict[str, Any]:
        """Return the model description as a camelCase-keyed dict."""
        payload: Dict[str, Any] = dict(
            key=self.key,
            name=self.name,
            provider=self.provider,
            description=self.description,
            costPer1mInput=self.cost_per_1m_input,
            costPer1mOutput=self.cost_per_1m_output,
            canBePrimary=self.can_be_primary,
            canBeConsolidation=self.can_be_consolidation,
        )
        return payload
@dataclass
class ModelConfiguration:
    """Model selection configuration for a job.

    When constructed without arguments, ``primary_models`` and
    ``consolidation_model`` are filled in by the module's default helpers
    (``_default_primary_models`` / ``_default_consolidation_model``) and
    ``minimum_success_threshold`` is ``1``.
    """

    primary_models: List[str] = field(default_factory=_default_primary_models)
    consolidation_model: str = field(default_factory=_default_consolidation_model)
    minimum_success_threshold: int = 1

    def to_dict(self) -> Dict[str, Any]:
        """Return the configuration as a camelCase-keyed dict."""
        return {
            'primaryModels': self.primary_models,
            'consolidationModel': self.consolidation_model,
            'minimumSuccessThreshold': self.minimum_success_threshold,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ModelConfiguration':
        """Build a configuration from a camelCase-keyed dict.

        Keys that are absent (or explicitly ``None``) fall back to the same
        defaults a bare ``ModelConfiguration()`` would use, so a partial
        payload and the no-argument constructor agree. Values that are
        present are passed through unchanged, including an explicit empty
        ``primaryModels`` list.

        Args:
            data: Mapping that may contain ``primaryModels``,
                ``consolidationModel`` and ``minimumSuccessThreshold``.
                ``None`` is treated as an empty mapping.

        Returns:
            A new ``ModelConfiguration``.
        """
        payload = data or {}

        primary = payload.get('primaryModels')
        if primary is None:
            primary = _default_primary_models()

        consolidation = payload.get('consolidationModel')
        if consolidation is None:
            consolidation = _default_consolidation_model()

        threshold = payload.get('minimumSuccessThreshold')
        if threshold is None:
            threshold = 1

        return cls(
            primary_models=primary,
            consolidation_model=consolidation,
            minimum_success_threshold=threshold,
        )
@dataclass
class Job:
    """Main job model representing a document processing job.

    Holds the job's identity, progress state, per-provider updates, log
    entries and results. The mutating methods all refresh ``updated_at``.
    Timestamps produced by this class are naive ``datetime`` objects in UTC.
    """

    id: str
    file_name: str
    file_size: int
    created_at: datetime
    updated_at: datetime
    user_id: str
    phase: JobPhase
    progress_pct: int  # 0-100
    step_label: str
    provider_updates: Dict[str, ProviderUpdate] = field(default_factory=dict)
    error: Optional[str] = None
    result_csv_url: Optional[str] = None
    summary: Optional[JobSummary] = None
    logs: List[LogEntry] = field(default_factory=list)
    upload_path: Optional[str] = None
    output_path: Optional[str] = None
    model_config: ModelConfiguration = field(default_factory=ModelConfiguration)

    # Default step label for each phase. Deliberately unannotated so the
    # dataclass machinery does not treat it as an instance field.
    _PHASE_LABELS = {
        JobPhase.QUEUED: 'Queued for processing',
        JobPhase.EXTRACT_CONTENT: 'Extracting document content',
        JobPhase.LLM_ANALYSIS: 'Parallel LLM analysis',
        JobPhase.CONSOLIDATION: 'Consolidating results',
        JobPhase.CSV_GENERATION: 'Generating CSV output',
        JobPhase.COMPLETED: 'Processing completed',
        JobPhase.FAILED: 'Processing failed',
    }

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Produces the same value as ``datetime.utcnow()`` without calling
        that method, which is deprecated since Python 3.12. The tzinfo is
        stripped so serialized timestamps keep their existing format.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @classmethod
    def create(
        cls,
        file_name: str,
        file_size: int,
        user_id: str,
        upload_path: str,
        model_config: Optional[ModelConfiguration] = None
    ) -> 'Job':
        """Create a new queued job with a fresh UUID and default values.

        Args:
            file_name: Name of the uploaded file.
            file_size: Size of the uploaded file.
            user_id: Identifier of the user who owns the job.
            upload_path: Location of the uploaded file.
            model_config: Model selection for the job; a default
                ``ModelConfiguration`` is used when omitted.

        Returns:
            A job in the ``QUEUED`` phase at 0% progress, with
            ``created_at`` and ``updated_at`` set to the same instant.
        """
        now = cls._utcnow()
        return cls(
            id=str(uuid.uuid4()),
            file_name=file_name,
            file_size=file_size,
            created_at=now,
            updated_at=now,
            user_id=user_id,
            phase=JobPhase.QUEUED,
            progress_pct=0,
            step_label=cls._PHASE_LABELS[JobPhase.QUEUED],
            upload_path=upload_path,
            model_config=model_config or ModelConfiguration()
        )

    def update_progress(
        self,
        phase: JobPhase,
        progress_pct: int,
        step_label: str = ""
    ):
        """Update the job's phase, progress percentage and step label.

        Args:
            phase: The phase the job is now in.
            progress_pct: New progress value; clamped to the range 0-100.
            step_label: Label to display. When empty, the default label for
                ``phase`` is used, or ``'Processing'`` if the phase has no
                default label.
        """
        self.phase = phase
        self.progress_pct = min(100, max(0, progress_pct))  # Clamp to [0, 100]
        self.updated_at = self._utcnow()
        self.step_label = step_label or self._PHASE_LABELS.get(phase, 'Processing')

    def add_log(self, level: str, message: str):
        """Append a log entry to this job.

        The entry's timestamp and the job's ``updated_at`` come from a
        single clock reading, so they refer to the same instant.

        Args:
            level: Log level string, e.g. ``'INFO'``.
            message: Log message text.
        """
        now = self._utcnow()
        self.logs.append(
            LogEntry(timestamp=now.isoformat(), level=level, message=message)
        )
        self.updated_at = now

    def update_provider(self, model_key: str, update: ProviderUpdate):
        """Store (or replace) the status update for a specific provider.

        Args:
            model_key: Key under which the update is stored.
            update: The provider status to record.
        """
        self.provider_updates[model_key] = update
        self.updated_at = self._utcnow()

    def mark_completed(
        self,
        result_csv_url: str,
        summary: JobSummary,
        output_path: str
    ):
        """Mark the job as completed and attach its results.

        Sets the phase to ``COMPLETED`` and progress to 100.

        Args:
            result_csv_url: URL of the generated CSV.
            summary: Summary information for the finished job.
            output_path: Location of the generated output.
        """
        self.phase = JobPhase.COMPLETED
        self.progress_pct = 100
        self.step_label = self._PHASE_LABELS[JobPhase.COMPLETED]
        self.result_csv_url = result_csv_url
        self.summary = summary
        self.output_path = output_path
        self.updated_at = self._utcnow()

    def mark_failed(self, error: str):
        """Mark the job as failed with an error message.

        ``progress_pct`` is left at its current value.

        Args:
            error: Description of the failure.
        """
        self.phase = JobPhase.FAILED
        self.error = error
        self.step_label = self._PHASE_LABELS[JobPhase.FAILED]
        self.updated_at = self._utcnow()

    def to_dict(self) -> Dict[str, Any]:
        """Convert the job to a camelCase-keyed dict for JSON serialization.

        Nested provider updates, summary, logs and model configuration are
        serialized through their own ``to_dict`` methods. ``upload_path``
        and ``output_path`` are not included in the result.
        """
        # Handle phase - might be string or enum
        phase_value = self.phase.value if isinstance(self.phase, JobPhase) else self.phase
        return {
            'id': self.id,
            'fileName': self.file_name,
            'fileSize': self.file_size,
            'createdAt': self.created_at.isoformat(),
            'updatedAt': self.updated_at.isoformat(),
            'userId': self.user_id,
            'phase': phase_value,
            'progressPct': self.progress_pct,
            'stepLabel': self.step_label,
            'providerUpdates': {k: v.to_dict() for k, v in self.provider_updates.items()},
            'error': self.error,
            'resultCsvUrl': self.result_csv_url,
            'summary': self.summary.to_dict() if self.summary else None,
            'logs': [log.to_dict() for log in self.logs],
            'modelConfig': self.model_config.to_dict()
        }