creative-x-ferrero/utils/state_manager.py
DJP 7b506ba449 Fix: Update preflight payload to match API requirements
The CreativeX API expects specific field names and structure:
- brand_name (not brand)
- market_name (not market)
- creatives array containing asset details

Changes:
- Updated preflight metadata structure in upload.py
- Updated create_preflight validation in api_client.py
- Updated state_manager to use correct field names
- Payload now includes creatives array with name, asset_url, channel

This fixes the 'brand_name, market_name, creatives are missing' error.
2026-01-09 14:45:26 -05:00

220 lines
6.5 KiB
Python

"""
State management with JSON persistence for upload tracking
"""
import json
import threading
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict
class StateManager:
    """Manage persistent state for upload tracking.

    The state is a dict with ``version``, ``last_updated`` and ``uploads``
    keys, where ``uploads`` maps a filename to its upload record. It is
    loaded from ``state_file`` on construction and written back as JSON by
    :meth:`save`, which :meth:`add_upload` and :meth:`update_status` call
    after every mutation.
    """

    # Status values
    STATUS_PENDING = 'pending'
    STATUS_UPLOADING = 'uploading'
    STATUS_PREFLIGHT_CREATED = 'preflight_created'
    STATUS_PROCESSING = 'processing'
    STATUS_COMPLETED = 'completed'
    STATUS_FAILED = 'failed'

    def __init__(self, state_file_path: Path):
        """
        Initialize state manager

        Args:
            state_file_path: Path to state JSON file
        """
        self.state_file = Path(state_file_path)
        # Re-entrant lock: save() acquires it itself, and is also called from
        # add_upload()/update_status() while they already hold it.
        self.lock = threading.RLock()
        self.state = self._load_or_create_state()

    @staticmethod
    def _new_state() -> dict:
        """Return a fresh, empty state structure."""
        return {
            'version': '1.0',
            'last_updated': datetime.now().isoformat(),
            'uploads': {}
        }

    @staticmethod
    def _is_valid_state(candidate) -> bool:
        """Return True if *candidate* is a dict holding an ``uploads`` dict."""
        return (
            isinstance(candidate, dict)
            and isinstance(candidate.get('uploads'), dict)
        )

    def _load_or_create_state(self) -> dict:
        """
        Load existing state or create new one

        A state file that cannot be read, cannot be parsed, or does not have
        the expected structure is moved aside to ``<name>.json.backup`` and a
        new empty state is returned instead.

        Returns:
            dict: The loaded state, or a new empty state
        """
        if not self.state_file.exists():
            return self._new_state()

        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except Exception:
            # Deliberately broad: any failure to read the file is handled by
            # starting from a clean state rather than aborting start-up.
            loaded = None

        if self._is_valid_state(loaded):
            return loaded

        # Create backup of corrupted file. replace() (unlike rename()) also
        # overwrites a backup left behind by an earlier run on every platform.
        backup_file = self.state_file.with_suffix('.json.backup')
        if self.state_file.exists():
            self.state_file.replace(backup_file)
        return self._new_state()

    def add_upload(self, filename: str, file_path: str, metadata: dict) -> None:
        """
        Register new upload with pending status

        An existing record stored under the same filename is overwritten.

        Args:
            filename: Filename (with extension)
            file_path: Absolute path to file
            metadata: Parsed metadata dict
        """
        with self.lock:
            self.state['uploads'][filename] = {
                'file_path': file_path,
                'status': self.STATUS_PENDING,
                'request_id': '',
                'presigned_url': '',
                'clean_url': '',
                'uploaded_at': '',
                'preflight_created_at': '',
                'completed_at': '',
                'metadata': {
                    'brand_name': metadata.get('brand_name', ''),
                    # NOTE(review): market_name is read from the parsed
                    # 'country_name' key - confirm this matches the parser.
                    'market_name': metadata.get('country_name', ''),
                    'channel': metadata.get('channel', ''),
                    'name': filename,
                },
                'results': {},
                'errors': []
            }
            self.save()

    def update_status(self, filename: str, status: str, **kwargs) -> None:
        """
        Update upload status and additional fields

        Args:
            filename: Filename to update
            status: New status value
            **kwargs: Additional fields to update. ``metadata`` is merged
                into the stored metadata, ``results`` replaces the stored
                results, ``errors`` is appended to the error list, and any
                other key is set directly on the record.

        Raises:
            KeyError: If no upload is registered under ``filename``
        """
        with self.lock:
            if filename not in self.state['uploads']:
                raise KeyError(f"Upload not found: {filename}")
            record = self.state['uploads'][filename]
            record['status'] = status
            # Update additional fields
            for key, value in kwargs.items():
                if key == 'metadata':
                    # Merge metadata
                    record['metadata'].update(value)
                elif key == 'errors':
                    record['errors'].append(value)
                else:
                    # Covers 'results' (replaced wholesale) and plain fields
                    record[key] = value
            self.save()

    def get_upload(self, filename: str) -> Optional[dict]:
        """
        Retrieve upload record

        Args:
            filename: Filename to retrieve

        Returns:
            dict: Upload record (the stored object, not a copy) or None if
                not found
        """
        return self.state['uploads'].get(filename)

    def get_uploads_by_status(self, status: str) -> List[Dict]:
        """
        Get all uploads with specific status

        Args:
            status: Status to filter by

        Returns:
            list: Shallow copies of the matching upload records, each with
                its filename added under the ``filename`` key
        """
        # Hold the lock so a concurrent add_upload() cannot resize the dict
        # while it is being iterated.
        with self.lock:
            return [
                {**upload, 'filename': filename}
                for filename, upload in self.state['uploads'].items()
                if upload['status'] == status
            ]

    def mark_failed(self, filename: str, error: str) -> None:
        """
        Mark upload as failed with error message

        Args:
            filename: Filename to mark as failed
            error: Error message

        Raises:
            KeyError: If no upload is registered under ``filename``
        """
        self.update_status(filename, self.STATUS_FAILED, errors=error)

    def get_all_uploads(self) -> Dict[str, dict]:
        """
        Get all uploads

        Returns:
            dict: All uploads keyed by filename (the stored dict, not a copy)
        """
        return self.state['uploads']

    def upload_exists(self, filename: str) -> bool:
        """
        Check if upload record exists

        Args:
            filename: Filename to check

        Returns:
            bool: True if exists
        """
        return filename in self.state['uploads']

    def save(self) -> None:
        """
        Persist state to JSON file

        Writes to a sibling ``<name>.json.tmp`` file and then replaces the
        state file with it, so a failed write leaves the previous state file
        in place.

        Raises:
            RuntimeError: If writing or replacing the file fails; the
                underlying exception is attached as ``__cause__``
        """
        with self.lock:
            self.state['last_updated'] = datetime.now().isoformat()
            # Write to temp file then rename over the real file
            temp_file = self.state_file.with_suffix('.json.tmp')
            try:
                with open(temp_file, 'w', encoding='utf-8') as f:
                    json.dump(self.state, f, indent=2, ensure_ascii=False)
                temp_file.replace(self.state_file)
            except Exception as e:
                # Clean up temp file if it exists; a cleanup failure must not
                # hide the error that actually caused the save to fail.
                try:
                    if temp_file.exists():
                        temp_file.unlink()
                except OSError:
                    pass
                raise RuntimeError(f"Failed to save state: {e}") from e

    def get_summary(self) -> dict:
        """
        Return summary statistics

        Returns:
            dict: ``total`` plus one counter per known status value. Uploads
                with any other status are counted in ``total`` only.
        """
        summary = {'total': 0}
        for known_status in (
            self.STATUS_PENDING,
            self.STATUS_UPLOADING,
            self.STATUS_PREFLIGHT_CREATED,
            self.STATUS_PROCESSING,
            self.STATUS_COMPLETED,
            self.STATUS_FAILED,
        ):
            summary[known_status] = 0

        with self.lock:
            for upload in self.state['uploads'].values():
                status = upload['status']
                summary['total'] += 1
                # 'total' is excluded so a record whose status is literally
                # 'total' cannot be double-counted.
                if status != 'total' and status in summary:
                    summary[status] += 1
        return summary