The CreativeX API expects specific field names and structure: - brand_name (not brand) - market_name (not market) - creatives array containing asset details Changes: - Updated preflight metadata structure in upload.py - Updated create_preflight validation in api_client.py - Updated state_manager to use correct field names - Payload now includes creatives array with name, asset_url, channel This fixes the 'brand_name, market_name, creatives are missing' error.
220 lines
6.5 KiB
Python
220 lines
6.5 KiB
Python
"""
|
|
State management with JSON persistence for upload tracking
|
|
"""
|
|
|
|
import json
|
|
import threading
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict
|
|
|
|
|
|
class StateManager:
|
|
"""Manage persistent state for upload tracking"""
|
|
|
|
# Status values
|
|
STATUS_PENDING = 'pending'
|
|
STATUS_UPLOADING = 'uploading'
|
|
STATUS_PREFLIGHT_CREATED = 'preflight_created'
|
|
STATUS_PROCESSING = 'processing'
|
|
STATUS_COMPLETED = 'completed'
|
|
STATUS_FAILED = 'failed'
|
|
|
|
def __init__(self, state_file_path: Path):
|
|
"""
|
|
Initialize state manager
|
|
|
|
Args:
|
|
state_file_path: Path to state JSON file
|
|
"""
|
|
self.state_file = Path(state_file_path)
|
|
self.lock = threading.Lock()
|
|
self.state = self._load_or_create_state()
|
|
|
|
def _load_or_create_state(self) -> dict:
|
|
"""Load existing state or create new one"""
|
|
if self.state_file.exists():
|
|
try:
|
|
with open(self.state_file, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, Exception):
|
|
# Create backup of corrupted file
|
|
backup_file = self.state_file.with_suffix('.json.backup')
|
|
if self.state_file.exists():
|
|
self.state_file.rename(backup_file)
|
|
|
|
# Create new state
|
|
return {
|
|
'version': '1.0',
|
|
'last_updated': datetime.now().isoformat(),
|
|
'uploads': {}
|
|
}
|
|
|
|
def add_upload(self, filename: str, file_path: str, metadata: dict) -> None:
|
|
"""
|
|
Register new upload with pending status
|
|
|
|
Args:
|
|
filename: Filename (with extension)
|
|
file_path: Absolute path to file
|
|
metadata: Parsed metadata dict
|
|
"""
|
|
with self.lock:
|
|
self.state['uploads'][filename] = {
|
|
'file_path': file_path,
|
|
'status': self.STATUS_PENDING,
|
|
'request_id': '',
|
|
'presigned_url': '',
|
|
'clean_url': '',
|
|
'uploaded_at': '',
|
|
'preflight_created_at': '',
|
|
'completed_at': '',
|
|
'metadata': {
|
|
'brand_name': metadata.get('brand_name', ''),
|
|
'market_name': metadata.get('country_name', ''),
|
|
'channel': metadata.get('channel', ''),
|
|
'name': filename,
|
|
},
|
|
'results': {},
|
|
'errors': []
|
|
}
|
|
self.save()
|
|
|
|
def update_status(self, filename: str, status: str, **kwargs) -> None:
|
|
"""
|
|
Update upload status and additional fields
|
|
|
|
Args:
|
|
filename: Filename to update
|
|
status: New status value
|
|
**kwargs: Additional fields to update
|
|
"""
|
|
with self.lock:
|
|
if filename not in self.state['uploads']:
|
|
raise KeyError(f"Upload not found: {filename}")
|
|
|
|
self.state['uploads'][filename]['status'] = status
|
|
|
|
# Update additional fields
|
|
for key, value in kwargs.items():
|
|
if key == 'metadata':
|
|
# Merge metadata
|
|
self.state['uploads'][filename]['metadata'].update(value)
|
|
elif key == 'results':
|
|
self.state['uploads'][filename]['results'] = value
|
|
elif key == 'errors':
|
|
self.state['uploads'][filename]['errors'].append(value)
|
|
else:
|
|
self.state['uploads'][filename][key] = value
|
|
|
|
self.save()
|
|
|
|
def get_upload(self, filename: str) -> Optional[dict]:
|
|
"""
|
|
Retrieve upload record
|
|
|
|
Args:
|
|
filename: Filename to retrieve
|
|
|
|
Returns:
|
|
dict: Upload record or None if not found
|
|
"""
|
|
return self.state['uploads'].get(filename)
|
|
|
|
def get_uploads_by_status(self, status: str) -> List[Dict]:
|
|
"""
|
|
Get all uploads with specific status
|
|
|
|
Args:
|
|
status: Status to filter by
|
|
|
|
Returns:
|
|
list: List of upload records with filename included
|
|
"""
|
|
result = []
|
|
for filename, upload in self.state['uploads'].items():
|
|
if upload['status'] == status:
|
|
upload_copy = upload.copy()
|
|
upload_copy['filename'] = filename
|
|
result.append(upload_copy)
|
|
return result
|
|
|
|
def mark_failed(self, filename: str, error: str) -> None:
|
|
"""
|
|
Mark upload as failed with error message
|
|
|
|
Args:
|
|
filename: Filename to mark as failed
|
|
error: Error message
|
|
"""
|
|
self.update_status(filename, self.STATUS_FAILED, errors=error)
|
|
|
|
def get_all_uploads(self) -> Dict[str, dict]:
|
|
"""
|
|
Get all uploads
|
|
|
|
Returns:
|
|
dict: All uploads keyed by filename
|
|
"""
|
|
return self.state['uploads']
|
|
|
|
def upload_exists(self, filename: str) -> bool:
|
|
"""
|
|
Check if upload record exists
|
|
|
|
Args:
|
|
filename: Filename to check
|
|
|
|
Returns:
|
|
bool: True if exists
|
|
"""
|
|
return filename in self.state['uploads']
|
|
|
|
def save(self) -> None:
|
|
"""
|
|
Persist state to JSON file (atomic write)
|
|
|
|
Uses atomic write by writing to temp file then renaming
|
|
"""
|
|
self.state['last_updated'] = datetime.now().isoformat()
|
|
|
|
# Atomic write: write to temp file then rename
|
|
temp_file = self.state_file.with_suffix('.json.tmp')
|
|
|
|
try:
|
|
with open(temp_file, 'w', encoding='utf-8') as f:
|
|
json.dump(self.state, f, indent=2, ensure_ascii=False)
|
|
|
|
# Atomic rename
|
|
temp_file.replace(self.state_file)
|
|
|
|
except Exception as e:
|
|
# Clean up temp file if it exists
|
|
if temp_file.exists():
|
|
temp_file.unlink()
|
|
raise RuntimeError(f"Failed to save state: {e}")
|
|
|
|
def get_summary(self) -> dict:
|
|
"""
|
|
Return summary statistics
|
|
|
|
Returns:
|
|
dict: Summary with counts by status
|
|
"""
|
|
summary = {
|
|
'total': 0,
|
|
'pending': 0,
|
|
'uploading': 0,
|
|
'preflight_created': 0,
|
|
'processing': 0,
|
|
'completed': 0,
|
|
'failed': 0
|
|
}
|
|
|
|
for upload in self.state['uploads'].values():
|
|
status = upload['status']
|
|
summary['total'] += 1
|
|
if status in summary:
|
|
summary[status] += 1
|
|
|
|
return summary
|