- Replace sys.exit(1) with ValueError in _validate_models(), which prevents killing the worker process on invalid model keys
- ModelConfiguration now reads defaults from core.config instead of hardcoding model keys
- Fix .env: google-gemini20 → google-gemini31 (align with MODEL_MAPPINGS)
- Improve the "No data extracted" error message to explain that the document must be a marketing brief

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
282 lines
No EOL
9.3 KiB
Python
Executable file
282 lines
No EOL
9.3 KiB
Python
Executable file
"""
|
|
Data models for job management and processing
|
|
"""
|
|
|
|
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional, Any
|
|
|
|
def _default_primary_models() -> List[str]:
    """Return the default primary model keys.

    Reads ``config.get_default_primary_models()`` from ``core.config``. If the
    import or the call fails for any reason, a built-in pair of model keys is
    returned instead.

    Returns:
        A new list on every call, so callers may mutate the result freely.
    """
    try:
        # Imported lazily and inside the try so that a failure while loading
        # core.config also ends up in the fallback below.
        from core.config import config

        # Copy the result: this function serves as a dataclass
        # default_factory, and handing out the config's own list would let
        # one instance's mutation leak into the config and other instances.
        return list(config.get_default_primary_models())
    except Exception:
        # Deliberate best-effort fallback: a broken or missing config must
        # not prevent a model configuration from being constructed.
        return ['anthropic-sonnet45', 'google-gemini31']
|
|
|
|
def _default_consolidation_model() -> str:
    """Return the default consolidation model key.

    Reads ``config.DEFAULT_CONSOLIDATION_MODEL`` from ``core.config``; when
    the import or the attribute lookup raises, a built-in key is returned.
    """
    fallback_key = 'anthropic-sonnet45'
    try:
        # Lazy import: any error raised while loading core.config is treated
        # the same as a missing setting.
        from core.config import config as app_config

        configured_key = app_config.DEFAULT_CONSOLIDATION_MODEL
    except Exception:
        return fallback_key
    return configured_key
|
|
|
|
class JobPhase(Enum):
    """Phases a job can be in while it is processed.

    Every member's value is the same string as its name.
    """

    # Waiting / in-progress phases.
    QUEUED = "QUEUED"
    EXTRACT_CONTENT = "EXTRACT_CONTENT"
    LLM_ANALYSIS = "LLM_ANALYSIS"
    CONSOLIDATION = "CONSOLIDATION"
    CSV_GENERATION = "CSV_GENERATION"

    # Terminal phases.
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
|
|
|
|
@dataclass
class ProviderUpdate:
    """Status record for one LLM provider call made during processing.

    ``provider``, ``model`` and ``status`` are required; every other field is
    optional and defaults to ``None``.
    """

    provider: str  # 'openai', 'anthropic', 'google'
    model: str  # e.g., "gpt-5.1", "claude-sonnet-4-5", "gemini-3.1-pro"
    status: str  # 'started', 'success', 'error'
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    latency_ms: Optional[float] = None
    tokens_in: Optional[int] = None
    tokens_out: Optional[int] = None
    tokens_cached: Optional[int] = None
    cost_usd: Optional[float] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a camelCase-keyed dict for JSON serialization."""
        # (output key, attribute name) pairs, in the order the keys are emitted.
        key_to_attr = (
            ('provider', 'provider'),
            ('model', 'model'),
            ('status', 'status'),
            ('startedAt', 'started_at'),
            ('completedAt', 'completed_at'),
            ('latencyMs', 'latency_ms'),
            ('tokensIn', 'tokens_in'),
            ('tokensOut', 'tokens_out'),
            ('tokensCached', 'tokens_cached'),
            ('costUsd', 'cost_usd'),
            ('error', 'error'),
        )
        return {out_key: getattr(self, attr) for out_key, attr in key_to_attr}
|
|
|
|
@dataclass
class LogEntry:
    """A single log line recorded while a job is processed."""

    timestamp: str
    level: str  # 'DEBUG', 'INFO', 'WARNING', 'ERROR'
    message: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the entry as a plain dict keyed by field name."""
        return dict(
            timestamp=self.timestamp,
            level=self.level,
            message=self.message,
        )
|
|
|
|
@dataclass
class JobSummary:
    """Summary figures attached to a job once it has completed.

    All fields are required except ``processing_time_seconds``, which
    defaults to ``None``.
    """

    doc_type: str
    assets_extracted: int
    confidence_score: float
    notes: List[str]
    cost_usd_total: float
    tokens_total: int
    primary_models: List[str]
    consolidation_model: str
    processing_time_seconds: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the summary as a camelCase-keyed dict.

        List fields are placed in the result as-is (not copied).
        """
        payload: Dict[str, Any] = {}
        payload['docType'] = self.doc_type
        payload['assetsExtracted'] = self.assets_extracted
        payload['confidenceScore'] = self.confidence_score
        payload['notes'] = self.notes
        payload['costUsdTotal'] = self.cost_usd_total
        payload['tokensTotal'] = self.tokens_total
        payload['primaryModels'] = self.primary_models
        payload['consolidationModel'] = self.consolidation_model
        payload['processingTimeSeconds'] = self.processing_time_seconds
        return payload
|
|
|
|
@dataclass
class ModelInfo:
    """Descriptive record for one available LLM model.

    ``can_be_primary`` and ``can_be_consolidation`` both default to ``True``.
    """

    key: str
    name: str
    provider: str
    description: str
    cost_per_1m_input: float
    cost_per_1m_output: float
    can_be_primary: bool = True
    can_be_consolidation: bool = True

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a camelCase-keyed dict."""
        # (output key, attribute name) pairs, in emission order.
        layout = (
            ('key', 'key'),
            ('name', 'name'),
            ('provider', 'provider'),
            ('description', 'description'),
            ('costPer1mInput', 'cost_per_1m_input'),
            ('costPer1mOutput', 'cost_per_1m_output'),
            ('canBePrimary', 'can_be_primary'),
            ('canBeConsolidation', 'can_be_consolidation'),
        )
        return {out_key: getattr(self, attr) for out_key, attr in layout}
|
|
|
|
@dataclass
class ModelConfiguration:
    """Model selection configuration for a job.

    Field defaults are produced by ``_default_primary_models`` and
    ``_default_consolidation_model``; ``minimum_success_threshold`` defaults
    to 1.
    """

    primary_models: List[str] = field(default_factory=_default_primary_models)
    consolidation_model: str = field(default_factory=_default_consolidation_model)
    minimum_success_threshold: int = 1

    def to_dict(self) -> Dict[str, Any]:
        """Return the configuration as a camelCase-keyed dict."""
        return {
            'primaryModels': self.primary_models,
            'consolidationModel': self.consolidation_model,
            'minimumSuccessThreshold': self.minimum_success_threshold,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ModelConfiguration':
        """Build a configuration from a dict shaped like ``to_dict()`` output.

        A key that is absent, or present with a ``None`` value, takes the
        same default a plain ``ModelConfiguration()`` would use. Explicitly
        supplied values, including an empty ``primaryModels`` list, are kept
        as given.

        Args:
            data: Mapping that may contain ``primaryModels``,
                ``consolidationModel`` and ``minimumSuccessThreshold``.

        Returns:
            A new ``ModelConfiguration``.
        """
        primary_models = data.get('primaryModels')
        if primary_models is None:
            # Same source as the field default, so both construction paths
            # agree instead of yielding an empty model list here.
            primary_models = _default_primary_models()

        consolidation_model = data.get('consolidationModel')
        if consolidation_model is None:
            # Replaces a hard-coded key that differed from the field default.
            consolidation_model = _default_consolidation_model()

        minimum_success_threshold = data.get('minimumSuccessThreshold')
        if minimum_success_threshold is None:
            minimum_success_threshold = 1

        return cls(
            primary_models=primary_models,
            consolidation_model=consolidation_model,
            minimum_success_threshold=minimum_success_threshold,
        )
|
|
|
|
@dataclass
class Job:
    """Main job model representing a document processing job.

    ``created_at`` and ``updated_at`` are naive ``datetime`` values holding
    UTC time. Every mutating method below refreshes ``updated_at``.
    """

    id: str
    file_name: str
    file_size: int
    created_at: datetime
    updated_at: datetime
    user_id: str
    phase: JobPhase
    progress_pct: int  # 0-100
    step_label: str
    provider_updates: Dict[str, ProviderUpdate] = field(default_factory=dict)
    error: Optional[str] = None
    result_csv_url: Optional[str] = None
    summary: Optional[JobSummary] = None
    logs: List[LogEntry] = field(default_factory=list)
    upload_path: Optional[str] = None
    output_path: Optional[str] = None
    model_config: ModelConfiguration = field(default_factory=ModelConfiguration)

    # Default step label for each phase. Left un-annotated on purpose so that
    # @dataclass treats it as a plain class attribute, not as a field.
    _PHASE_LABELS = {
        JobPhase.QUEUED: 'Queued for processing',
        JobPhase.EXTRACT_CONTENT: 'Extracting document content',
        JobPhase.LLM_ANALYSIS: 'Parallel LLM analysis',
        JobPhase.CONSOLIDATION: 'Consolidating results',
        JobPhase.CSV_GENERATION: 'Generating CSV output',
        JobPhase.COMPLETED: 'Processing completed',
        JobPhase.FAILED: 'Processing failed',
    }

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``."""
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the value naive, exactly as utcnow() produced it, so
        # isoformat() output and comparisons with stored timestamps do not
        # change.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @classmethod
    def create(
        cls,
        file_name: str,
        file_size: int,
        user_id: str,
        upload_path: str,
        model_config: Optional[ModelConfiguration] = None,
    ) -> 'Job':
        """Create a new queued job with a random UUID4 id.

        Args:
            file_name: Stored on the job as ``file_name``.
            file_size: Stored on the job as ``file_size``.
            user_id: Stored on the job as ``user_id``.
            upload_path: Stored on the job as ``upload_path``.
            model_config: Model selection to use; a default
                ``ModelConfiguration()`` is created when omitted.

        Returns:
            A job in phase ``QUEUED`` at 0% progress whose ``created_at`` and
            ``updated_at`` are the same instant.
        """
        now = cls._utc_now()
        return cls(
            id=str(uuid.uuid4()),
            file_name=file_name,
            file_size=file_size,
            created_at=now,
            updated_at=now,
            user_id=user_id,
            phase=JobPhase.QUEUED,
            progress_pct=0,
            step_label=cls._PHASE_LABELS[JobPhase.QUEUED],
            upload_path=upload_path,
            model_config=model_config or ModelConfiguration(),
        )

    def update_progress(
        self,
        phase: JobPhase,
        progress_pct: int,
        step_label: str = "",
    ):
        """Set the job's phase, progress percentage and step label.

        Args:
            phase: New phase.
            progress_pct: New progress; clamped into the range 0-100.
            step_label: Label to show. When empty, the default label for
                ``phase`` is used, or ``'Processing'`` if the phase has none.
        """
        self.phase = phase
        self.progress_pct = min(100, max(0, progress_pct))  # Clamp to [0, 100]
        self.updated_at = self._utc_now()
        self.step_label = step_label or self._PHASE_LABELS.get(phase, 'Processing')

    def add_log(self, level: str, message: str):
        """Append a log entry stamped with the current UTC time."""
        # One clock read serves both the entry and updated_at, so the two
        # timestamps always agree.
        now = self._utc_now()
        self.logs.append(
            LogEntry(timestamp=now.isoformat(), level=level, message=message)
        )
        self.updated_at = now

    def update_provider(self, model_key: str, update: ProviderUpdate):
        """Store ``update`` under ``model_key``, replacing any earlier one."""
        self.provider_updates[model_key] = update
        self.updated_at = self._utc_now()

    def mark_completed(
        self,
        result_csv_url: str,
        summary: JobSummary,
        output_path: str,
    ):
        """Mark the job as completed (100%) and attach its results."""
        self.phase = JobPhase.COMPLETED
        self.progress_pct = 100
        self.step_label = self._PHASE_LABELS[JobPhase.COMPLETED]
        self.result_csv_url = result_csv_url
        self.summary = summary
        self.output_path = output_path
        self.updated_at = self._utc_now()

    def mark_failed(self, error: str):
        """Mark the job as failed and record the error message.

        ``progress_pct`` is left at its current value.
        """
        self.phase = JobPhase.FAILED
        self.error = error
        self.step_label = self._PHASE_LABELS[JobPhase.FAILED]
        self.updated_at = self._utc_now()

    def to_dict(self) -> Dict[str, Any]:
        """Convert the job to a camelCase-keyed dict for JSON serialization.

        ``upload_path`` and ``output_path`` are not part of the result.
        """
        # Handle phase - might be string or enum
        phase_value = self.phase.value if isinstance(self.phase, JobPhase) else self.phase

        return {
            'id': self.id,
            'fileName': self.file_name,
            'fileSize': self.file_size,
            'createdAt': self.created_at.isoformat(),
            'updatedAt': self.updated_at.isoformat(),
            'userId': self.user_id,
            'phase': phase_value,
            'progressPct': self.progress_pct,
            'stepLabel': self.step_label,
            'providerUpdates': {k: v.to_dict() for k, v in self.provider_updates.items()},
            'error': self.error,
            'resultCsvUrl': self.result_csv_url,
            'summary': self.summary.to_dict() if self.summary else None,
            'logs': [log.to_dict() for log in self.logs],
            'modelConfig': self.model_config.to_dict(),
        }