creative-x-ferrero/scripts/upload.py
DJP ccb5203948 Fix: Pass filename parameter to presigned_url endpoint
The CreativeX API requires a 'filename' parameter when requesting presigned URLs.

Changes:
- Updated get_presigned_url() to accept filename parameter
- Pass filename to API as query parameter
- Updated upload script to pass filename when requesting URL
- Updated test_connection to use test filename

This fixes the 'filename is missing' error.
2026-01-09 14:42:23 -05:00

323 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Upload files to CreativeX API for preflight testing
Usage:
python upload.py /path/to/file.mp4
python upload.py --dir /path/to/videos/
python upload.py --dry-run /path/to/file.mp4
python upload.py --skip-existing --dir /path/to/videos/
"""
import argparse
import sys
from pathlib import Path
from datetime import datetime
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import load_config
from core.filename_parser import FerreroFilenameParser
from core.data_loader import DataLoader
from core.validators import UploadValidator
from core.api_client import CreativeXAPIClient, APIError
from utils.state_manager import StateManager
from utils.logger import setup_logger
from utils.file_handler import FileHandler
class UploadOrchestrator:
    """
    Main orchestrator for upload process

    Takes each file through state lookup, file validation, filename
    parsing, metadata validation, presigned URL retrieval, upload and
    preflight creation, recording progress in the state manager.
    """

    # Per-file outcomes. The values double as keys of the batch summary dict.
    _OUTCOME_SUCCEEDED = 'succeeded'
    _OUTCOME_FAILED = 'failed'
    _OUTCOME_SKIPPED = 'skipped'

    def __init__(self, config, dry_run=False):
        """
        Initialize orchestrator with configuration

        Args:
            config: Configuration object. Attributes read here: logs_dir,
                log_level, data_json_path, max_file_size_mb, state_file and,
                in live mode only, api_base_url, access_token,
                api_max_retries, api_timeout.
            dry_run: When True, no API client is created and no log
                directory is handed to the logger.
        """
        self.config = config
        self.dry_run = dry_run

        # Setup logger
        self.logger = setup_logger(
            __name__,
            log_dir=config.logs_dir if not dry_run else None,
            log_level=config.log_level
        )
        self.logger.info("=" * 60)
        self.logger.info("CreativeX Upload Orchestrator")
        self.logger.info(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
        self.logger.info(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 60)

        # Initialize components
        self.data_loader = DataLoader(str(config.data_json_path))
        self.parser = FerreroFilenameParser(self.data_loader)
        self.validator = UploadValidator(self.data_loader, config.max_file_size_mb)
        self.state_manager = StateManager(config.state_file)
        if not dry_run:
            self.api_client = CreativeXAPIClient(
                config.api_base_url,
                config.access_token,
                config.api_max_retries,
                config.api_timeout
            )
        else:
            self.api_client = None
        self.logger.info("Initialization complete")

    def upload_file(self, file_path: str) -> bool:
        """
        Upload single file through complete workflow

        Args:
            file_path: Path to file
        Returns:
            bool: True if the file went through the workflow (or passed
            validation in dry-run mode) or was already processed,
            False otherwise
        """
        return self._process_file(file_path) != self._OUTCOME_FAILED

    def _process_file(self, file_path: str) -> str:
        """
        Run the workflow for one file and report what happened

        Args:
            file_path: Path to file
        Returns:
            str: One of the _OUTCOME_* constants. _OUTCOME_SKIPPED means the
            state manager already lists the file as completed or processing.
        """
        filename = FileHandler.get_filename(file_path)
        abs_path = FileHandler.get_absolute_path(file_path)
        self.logger.info(f"\nProcessing: {filename}")
        try:
            # Step 1: Check if already uploaded
            if self.state_manager.upload_exists(filename):
                status = self.state_manager.get_upload(filename)['status']
                if status in (StateManager.STATUS_COMPLETED, StateManager.STATUS_PROCESSING):
                    self.logger.info(f"✓ Already processed (status: {status})")
                    return self._OUTCOME_SKIPPED
                if status == StateManager.STATUS_FAILED:
                    self.logger.info("⟳ Previously failed, retrying...")
                else:
                    # NOTE(review): this branch also covers uploads whose
                    # preflight was already created, so a re-run uploads them
                    # again - confirm that is intended.
                    self.logger.info(f"⟳ Found existing upload (status: {status}), continuing...")

            # Step 2: Validate file
            self.logger.info("Validating file...")
            is_valid, errors = self.validator.validate_file(abs_path)
            if not is_valid:
                self._fail_validation(filename, errors)
                return self._OUTCOME_FAILED
            self.logger.info(" ✓ File valid")

            # Step 3: Parse filename
            self.logger.info("Parsing filename...")
            try:
                parsed_data = self.parser.parse(filename)
            except ValueError as e:
                self.logger.error(f" ✗ Parse error: {e}")
                # A brand-new file cannot be added to state without parsed
                # metadata, but an upload registered by an earlier run can
                # still have the failure recorded.
                self._record_failure(filename, str(e))
                return self._OUTCOME_FAILED
            self.logger.info(f" ✓ Brand: {parsed_data['brand_name']}")
            self.logger.info(f" ✓ Market: {parsed_data['country_name']}")
            self.logger.info(f" ✓ Channel: {parsed_data['channel']}")

            # Step 4: Validate metadata
            self.logger.info("Validating metadata...")
            is_valid, errors = self.validator.validate_parsed_metadata(parsed_data)
            if not is_valid:
                self._fail_validation(filename, errors)
                return self._OUTCOME_FAILED
            self.logger.info(" ✓ Metadata valid")

            # Register upload if not exists
            if not self.state_manager.upload_exists(filename):
                self.state_manager.add_upload(filename, abs_path, parsed_data)

            if self.dry_run:
                self.logger.info("✓ DRY RUN: Would upload file")
                return self._OUTCOME_SUCCEEDED

            # Step 5: Get presigned URL
            urls = self._request_presigned_url(filename)
            if urls is None:
                return self._OUTCOME_FAILED
            presigned_url, clean_url = urls

            # Step 6: Upload file to S3
            if not self._upload_to_presigned_url(filename, abs_path, presigned_url):
                return self._OUTCOME_FAILED

            # Step 7: Create preflight
            if not self._create_preflight(filename, parsed_data, clean_url):
                return self._OUTCOME_FAILED
            return self._OUTCOME_SUCCEEDED
        except Exception as e:
            # Last-resort boundary so one bad file cannot abort a batch.
            self.logger.error(f"✗ Unexpected error: {e}", exc_info=True)
            self._record_failure(filename, str(e))
            return self._OUTCOME_FAILED

    def _record_failure(self, filename: str, reason: str) -> None:
        """Mark the upload as failed, but only if it is registered in state."""
        if self.state_manager.upload_exists(filename):
            self.state_manager.mark_failed(filename, reason)

    def _fail_validation(self, filename: str, errors) -> None:
        """Log every validation error and mark the upload as failed."""
        for error in errors:
            self.logger.error(f"{error}")
        # NOTE(review): unlike _record_failure this does not check that the
        # upload is registered first (behaviour kept as it was) - confirm
        # StateManager.mark_failed accepts filenames it has not seen.
        self.state_manager.mark_failed(filename, "; ".join(errors))

    def _request_presigned_url(self, filename: str):
        """Step 5: return (presigned_url, clean_url), or None after an APIError."""
        self.logger.info("Getting presigned URL...")
        try:
            presigned_url = self.api_client.get_presigned_url(filename)
            clean_url = self.api_client.strip_query_params(presigned_url)
            self.logger.info(" ✓ Got presigned URL")
            self.state_manager.update_status(
                filename,
                StateManager.STATUS_UPLOADING,
                presigned_url=presigned_url,
                clean_url=clean_url
            )
        except APIError as e:
            self.logger.error(f" ✗ Failed to get presigned URL: {e}")
            self.state_manager.mark_failed(filename, str(e))
            return None
        return presigned_url, clean_url

    def _upload_to_presigned_url(self, filename: str, abs_path: str, presigned_url: str) -> bool:
        """Step 6: send the file to the presigned URL; False after an APIError."""
        self.logger.info("Uploading file to S3...")
        try:
            content_type = self.validator.get_content_type(abs_path)
            self.api_client.upload_file_to_presigned_url(
                presigned_url,
                abs_path,
                content_type
            )
            self.logger.info(" ✓ File uploaded")
            self.state_manager.update_status(
                filename,
                StateManager.STATUS_UPLOADING,
                uploaded_at=datetime.now().isoformat()
            )
        except APIError as e:
            self.logger.error(f" ✗ Upload failed: {e}")
            self.state_manager.mark_failed(filename, str(e))
            return False
        return True

    def _create_preflight(self, filename: str, parsed_data: dict, clean_url: str) -> bool:
        """Step 7: create the preflight; False after an APIError or ValueError."""
        self.logger.info("Creating preflight...")
        try:
            preflight_metadata = {
                'source_url': clean_url,
                'name': filename,
                'brand': parsed_data['brand_name'],
                'market': parsed_data['country_name'],
                'channel': parsed_data['channel'],
                'asset_id': filename,
            }
            response = self.api_client.create_preflight(preflight_metadata)
            # NOTE(review): request_id is None when the response carries
            # neither key; it is still stored as-is - confirm that is acceptable.
            request_id = response.get('request_id') or response.get('id')
            self.logger.info(f" ✓ Preflight created: {request_id}")
            self.state_manager.update_status(
                filename,
                StateManager.STATUS_PREFLIGHT_CREATED,
                request_id=request_id,
                preflight_created_at=datetime.now().isoformat()
            )
        except (APIError, ValueError) as e:
            self.logger.error(f" ✗ Preflight creation failed: {e}")
            self.state_manager.mark_failed(filename, str(e))
            return False
        return True

    def upload_batch(self, file_paths: list, skip_existing: bool = True) -> dict:
        """
        Upload multiple files with progress tracking

        Args:
            file_paths: List of file paths
            skip_existing: Controls how files that state already lists as
                completed or processing are reported. Such files are never
                uploaded again; when True they are counted as skipped, when
                False they are counted as succeeded.
        Returns:
            dict: Summary with total, succeeded, failed, skipped counts
        """
        summary = {
            'total': len(file_paths),
            'succeeded': 0,
            'failed': 0,
            'skipped': 0
        }
        self.logger.info(f"\nBatch upload: {summary['total']} files")
        self.logger.info("-" * 60)
        for i, file_path in enumerate(file_paths, 1):
            self.logger.info(f"\n[{i}/{summary['total']}]")
            outcome = self._process_file(file_path)
            if outcome == self._OUTCOME_SKIPPED and not skip_existing:
                outcome = self._OUTCOME_SUCCEEDED
            summary[outcome] += 1

        # Print summary
        self.logger.info("\n" + "=" * 60)
        self.logger.info("BATCH UPLOAD SUMMARY")
        self.logger.info("=" * 60)
        self.logger.info(f"Total files: {summary['total']}")
        self.logger.info(f"✓ Succeeded: {summary['succeeded']}")
        self.logger.info(f"✗ Failed: {summary['failed']}")
        self.logger.info(f"⊙ Skipped: {summary['skipped']}")
        self.logger.info("=" * 60)
        return summary
def collect_files(explicit_files, directory=None, extensions=()):
    """
    Build the ordered, duplicate-free list of files to process

    Args:
        explicit_files: Paths named directly on the command line
        directory: Optional directory to scan (non-recursive)
        extensions: Suffixes to look for in the directory, used as the
            glob pattern '*<ext>' in both the given and upper-case form
    Returns:
        list: Paths as strings, explicit files first, then directory
        matches in sorted order
    """
    collected = [str(path) for path in explicit_files]
    if directory:
        dir_path = Path(directory)
        # A set, because on case-insensitive filesystems the two patterns
        # below match the same files.
        matches = set()
        for ext in extensions:
            matches.update(dir_path.glob(f'*{ext}'))
            matches.update(dir_path.glob(f'*{ext.upper()}'))
        # Sorted for a reproducible processing order; sub-directories whose
        # name happens to match an extension are not uploadable files.
        collected.extend(str(path) for path in sorted(matches) if path.is_file())
    # dict.fromkeys removes repeats while keeping first-seen order.
    return list(dict.fromkeys(collected))


def main():
    """
    CLI entry point

    Exits with status 0 when every processed file succeeded and 1 on a
    configuration error, a missing directory, an empty file list or any
    failed file.
    """
    parser = argparse.ArgumentParser(
        description='Upload files to CreativeX for preflight testing'
    )
    parser.add_argument('files', nargs='*', help='Files to upload')
    parser.add_argument('--dir', help='Directory to scan for files')
    parser.add_argument('--dry-run', action='store_true',
                        help='Validate only, do not upload')
    parser.add_argument('--skip-existing', action='store_true',
                        help='Skip files already uploaded')
    args = parser.parse_args()

    # Load configuration
    try:
        config = load_config()
    except Exception as e:
        print(f"Error loading configuration: {e}")
        sys.exit(1)

    # Initialize orchestrator
    orchestrator = UploadOrchestrator(config, dry_run=args.dry_run)

    # Collect files to process
    # is_dir() rather than exists(): a regular file passed as --dir would
    # otherwise fall through to the misleading "No files specified" message.
    if args.dir and not Path(args.dir).is_dir():
        print(f"Error: Directory not found: {args.dir}")
        sys.exit(1)
    files_to_process = collect_files(
        args.files,
        args.dir,
        FileHandler.SUPPORTED_VIDEO_FORMATS + FileHandler.SUPPORTED_IMAGE_FORMATS
    )
    if not files_to_process:
        print("No files specified. Use --help for usage information.")
        sys.exit(1)

    # Process files
    if len(files_to_process) == 1:
        success = orchestrator.upload_file(files_to_process[0])
        sys.exit(0 if success else 1)
    else:
        summary = orchestrator.upload_batch(
            files_to_process,
            skip_existing=args.skip_existing
        )
        sys.exit(0 if summary['failed'] == 0 else 1)


if __name__ == '__main__':
    main()