"""
State management with JSON persistence for upload tracking
"""
import json
import os
import threading
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Union


class StateManager:
    """Manage persistent state for upload tracking.

    Every mutating method takes ``self.lock`` and immediately persists the
    whole state document through :meth:`save`.
    """

    # Status values
    STATUS_PENDING = 'pending'
    STATUS_UPLOADING = 'uploading'
    STATUS_PREFLIGHT_CREATED = 'preflight_created'
    STATUS_PROCESSING = 'processing'
    STATUS_COMPLETED = 'completed'
    STATUS_FAILED = 'failed'

    def __init__(self, state_file_path: Union[str, Path]):
        """
        Initialize state manager

        Args:
            state_file_path: Path to state JSON file (str or Path)
        """
        self.state_file = Path(state_file_path)
        # Re-entrant so save() can acquire the lock itself while still being
        # callable from methods (add_upload, update_status) that already hold it.
        self.lock = threading.RLock()
        self.state = self._load_or_create_state()

    @staticmethod
    def _new_state() -> dict:
        """Return a fresh, empty state document."""
        return {
            'version': '1.0',
            'last_updated': datetime.now().isoformat(),
            'uploads': {}
        }

    def _load_or_create_state(self) -> dict:
        """Load existing state or create a new one.

        A file that is not valid UTF-8 JSON, or whose top level is not an
        object containing an ``uploads`` object, is treated as corrupted: it
        is moved to ``<stem>.json.backup`` (replacing any earlier backup) and
        a fresh state is returned. Other I/O errors (e.g. missing read
        permission) propagate instead of moving the file away.

        Returns:
            dict: The loaded or newly created state document.
        """
        if not self.state_file.exists():
            return self._new_state()
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
        except FileNotFoundError:
            # Removed between the exists() check and open().
            return self._new_state()
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            state = None

        if isinstance(state, dict) and isinstance(state.get('uploads'), dict):
            return state

        # Corrupted: keep a copy for inspection. Path.replace (unlike rename)
        # overwrites an existing backup on every platform, including Windows.
        backup_file = self.state_file.with_suffix('.json.backup')
        self.state_file.replace(backup_file)
        return self._new_state()

    def add_upload(self, filename: str, file_path: str, metadata: dict) -> None:
        """
        Register new upload with pending status

        An existing record with the same filename is overwritten.

        Args:
            filename: Filename (with extension)
            file_path: Absolute path to file
            metadata: Parsed metadata dict; 'country_name' is stored as
                'market_name'
        """
        record = {
            'file_path': file_path,
            'status': self.STATUS_PENDING,
            'request_id': '',
            'presigned_url': '',
            'clean_url': '',
            'uploaded_at': '',
            'preflight_created_at': '',
            'completed_at': '',
            'metadata': {
                'brand_name': metadata.get('brand_name', ''),
                'market_name': metadata.get('country_name', ''),
                'channel': metadata.get('channel', ''),
                'name': filename,
            },
            'results': {},
            'errors': []
        }
        with self.lock:
            self.state['uploads'][filename] = record
            self.save()

    def update_status(self, filename: str, status: str, **kwargs) -> None:
        """
        Update upload status and additional fields

        Args:
            filename: Filename to update
            status: New status value
            **kwargs: Additional fields to update. 'metadata' is merged into
                the existing metadata, 'results' replaces the results,
                'errors' is appended to the error list, and any other key
                is set on the record directly.

        Raises:
            KeyError: If no upload is registered under ``filename``.
        """
        with self.lock:
            uploads = self.state['uploads']
            if filename not in uploads:
                raise KeyError(f"Upload not found: {filename}")
            record = uploads[filename]
            record['status'] = status

            for key, value in kwargs.items():
                if key == 'metadata':
                    record['metadata'].update(value)
                elif key == 'results':
                    record['results'] = value
                elif key == 'errors':
                    record['errors'].append(value)
                else:
                    record[key] = value

            self.save()

    def get_upload(self, filename: str) -> Optional[dict]:
        """
        Retrieve upload record

        Args:
            filename: Filename to retrieve

        Returns:
            dict: The live upload record (not a copy) or None if not found
        """
        with self.lock:
            return self.state['uploads'].get(filename)

    def get_uploads_by_status(self, status: str) -> List[Dict]:
        """
        Get all uploads with specific status

        Args:
            status: Status to filter by

        Returns:
            list: Shallow copies of matching upload records, each with an
            added 'filename' key
        """
        # Hold the lock so a concurrent add_upload cannot resize the dict
        # mid-iteration.
        with self.lock:
            return [
                {**upload, 'filename': filename}
                for filename, upload in self.state['uploads'].items()
                if upload['status'] == status
            ]

    def mark_failed(self, filename: str, error: str) -> None:
        """
        Mark upload as failed with error message

        Args:
            filename: Filename to mark as failed
            error: Error message, appended to the record's error list
        """
        self.update_status(filename, self.STATUS_FAILED, errors=error)

    def get_all_uploads(self) -> Dict[str, dict]:
        """
        Get all uploads

        Returns:
            dict: The live uploads mapping keyed by filename (not a copy);
            hold ``self.lock`` while iterating it if other threads may write.
        """
        return self.state['uploads']

    def upload_exists(self, filename: str) -> bool:
        """
        Check if upload record exists

        Args:
            filename: Filename to check

        Returns:
            bool: True if exists
        """
        return filename in self.state['uploads']

    def save(self) -> None:
        """
        Persist state to JSON file (atomic write)

        Writes ``<stem>.json.tmp``, flushes and fsyncs it, then replaces the
        state file with it, so readers see either the previous or the new
        complete file rather than a partially written one.

        Raises:
            RuntimeError: If serialization or any file operation fails; the
                underlying exception is chained as ``__cause__``.
        """
        with self.lock:
            self.state['last_updated'] = datetime.now().isoformat()
            temp_file = self.state_file.with_suffix('.json.tmp')
            try:
                with open(temp_file, 'w', encoding='utf-8') as f:
                    json.dump(self.state, f, indent=2, ensure_ascii=False)
                    # Make sure the bytes are on disk before the rename
                    # publishes them.
                    f.flush()
                    os.fsync(f.fileno())
                temp_file.replace(self.state_file)
            except Exception as e:
                try:
                    if temp_file.exists():
                        temp_file.unlink()
                except OSError:
                    # Best-effort cleanup; never mask the original failure.
                    pass
                raise RuntimeError(f"Failed to save state: {e}") from e

    def get_summary(self) -> dict:
        """
        Return summary statistics

        Returns:
            dict: 'total' plus a count per known status; records with an
            unknown status are counted only in 'total'
        """
        summary = dict.fromkeys(
            (
                'total',
                self.STATUS_PENDING,
                self.STATUS_UPLOADING,
                self.STATUS_PREFLIGHT_CREATED,
                self.STATUS_PROCESSING,
                self.STATUS_COMPLETED,
                self.STATUS_FAILED,
            ),
            0,
        )
        with self.lock:
            for upload in self.state['uploads'].values():
                status = upload['status']
                summary['total'] += 1
                if status in summary:
                    summary[status] += 1
        return summary