#!/usr/bin/env python3
"""
Upload files to CreativeX API for preflight testing

Usage:
    python upload.py /path/to/file.mp4
    python upload.py --dir /path/to/videos/
    python upload.py --dry-run /path/to/file.mp4
    python upload.py --skip-existing --dir /path/to/videos/

Re-running the script on the same files resumes work: uploads recorded as
failed or still in progress in the state file are retried, while uploads
recorded as completed/processing are not uploaded again.
"""

import argparse
import sys
from pathlib import Path
from datetime import datetime

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from config import load_config
from core.filename_parser import FerreroFilenameParser
from core.data_loader import DataLoader
from core.validators import UploadValidator
from core.mapping_resolver import MappingResolver
from core.api_client import CreativeXAPIClient, APIError
from utils.state_manager import StateManager
from utils.logger import setup_logger
from utils.file_handler import FileHandler


class UploadOrchestrator:
    """
    Main orchestrator for upload process

    Wires together the filename parser, validators, mapping resolver,
    state manager and (outside dry-run mode) the API client, and drives a
    file through validation, upload and preflight creation.
    """

    # Optional preflight fields copied from the mapped metadata only when
    # they carry a truthy value.
    _OPTIONAL_PREFLIGHT_FIELDS = (
        'publisher',
        'placement',
        'ad_format',
        'language',
        'dimensions',
        'duration',
        'subject',
    )

    def __init__(self, config, dry_run=False):
        """
        Initialize orchestrator with configuration

        Args:
            config: Configuration object as returned by load_config()
            dry_run: When True, no API client is created, nothing is
                uploaded and no log directory is passed to the logger
        """
        self.config = config
        self.dry_run = dry_run

        # Setup logger
        self.logger = setup_logger(
            __name__,
            log_dir=config.logs_dir if not dry_run else None,
            log_level=config.log_level
        )

        self.logger.info("=" * 60)
        self.logger.info("CreativeX Upload Orchestrator")
        self.logger.info(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
        self.logger.info(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        self.logger.info("=" * 60)

        # Initialize components
        self.data_loader = DataLoader(str(config.data_json_path))
        mappings_path = config.project_root / 'mappings.json'
        self.mapping_resolver = MappingResolver(str(mappings_path))
        self.parser = FerreroFilenameParser(self.data_loader)
        self.validator = UploadValidator(self.data_loader, config.max_file_size_mb)
        self.state_manager = StateManager(config.state_file)

        if not dry_run:
            self.api_client = CreativeXAPIClient(
                config.api_base_url,
                config.access_token,
                config.api_max_retries,
                config.api_timeout
            )
        else:
            self.api_client = None

        self.logger.info("Initialization complete")

    def _is_already_processed(self, filename: str) -> bool:
        """Return True when state records the file as completed or processing."""
        if not self.state_manager.upload_exists(filename):
            return False
        status = self.state_manager.get_upload(filename)['status']
        return status in (StateManager.STATUS_COMPLETED, StateManager.STATUS_PROCESSING)

    @classmethod
    def _build_preflight_payload(cls, filename: str, clean_url: str, mapped_metadata: dict) -> dict:
        """Assemble the preflight request body from the mapped metadata."""
        payload = {
            'name': filename,
            'brand_name': mapped_metadata['brand_name'],
            'market_name': mapped_metadata['market_name'],
            'channel': mapped_metadata['channel'],
            'creatives': [
                {
                    'source_url': clean_url,
                }
            ]
        }

        # Add optional fields
        for field in cls._OPTIONAL_PREFLIGHT_FIELDS:
            if mapped_metadata.get(field):
                payload[field] = mapped_metadata[field]

        return payload

    def upload_file(self, file_path: str) -> bool:
        """
        Upload single file through complete workflow

        Steps: check saved state, validate the file, parse the filename,
        validate metadata and mappings, register the upload in state, then
        (unless in dry-run mode) request a presigned URL, upload the file
        and create the preflight.

        Args:
            file_path: Path to file

        Returns:
            bool: True if successful, False otherwise
        """
        filename = FileHandler.get_filename(file_path)
        abs_path = FileHandler.get_absolute_path(file_path)

        self.logger.info(f"\nProcessing: {filename}")

        try:
            # Step 1: Check if already uploaded
            if self.state_manager.upload_exists(filename):
                upload = self.state_manager.get_upload(filename)
                status = upload['status']

                if status in [StateManager.STATUS_COMPLETED, StateManager.STATUS_PROCESSING]:
                    self.logger.info(f"✓ Already processed (status: {status})")
                    return True
                elif status == StateManager.STATUS_FAILED:
                    self.logger.info("⟳ Previously failed, retrying...")
                else:
                    self.logger.info(f"⟳ Found existing upload (status: {status}), continuing...")

            # Step 2: Validate file
            self.logger.info("Validating file...")
            is_valid, errors = self.validator.validate_file(abs_path)
            if not is_valid:
                for error in errors:
                    self.logger.error(f" ✗ {error}")
                self.state_manager.mark_failed(filename, "; ".join(errors))
                return False
            self.logger.info(" ✓ File valid")

            # Step 3: Parse filename
            self.logger.info("Parsing filename...")
            try:
                parsed_data = self.parser.parse(filename)
                self.logger.info(f" ✓ Brand: {parsed_data['brand_name']}")
                self.logger.info(f" ✓ Market: {parsed_data['country_name']}")
                self.logger.info(f" ✓ Channel: {parsed_data['channel']}")
            except ValueError as e:
                self.logger.error(f" ✗ Parse error: {e}")
                # A new entry cannot be added to state without metadata,
                # but an entry that already exists must not be left in its
                # old status after this failure.
                if self.state_manager.upload_exists(filename):
                    self.state_manager.mark_failed(filename, str(e))
                return False

            # Step 4: Validate metadata
            self.logger.info("Validating metadata...")
            is_valid, errors = self.validator.validate_parsed_metadata(parsed_data)
            if not is_valid:
                for error in errors:
                    self.logger.error(f" ✗ {error}")
                self.state_manager.mark_failed(filename, "; ".join(errors))
                return False
            self.logger.info(" ✓ Metadata valid")

            # Step 4b: Validate Creative X mappings
            self.logger.info("Validating Creative X mappings...")
            is_valid, errors = self.mapping_resolver.validate_metadata_for_upload(parsed_data)
            if not is_valid:
                for error in errors:
                    self.logger.error(f" ✗ {error}")
                self.state_manager.mark_failed(filename, "; ".join(errors))
                return False
            self.logger.info(" ✓ Mappings valid")

            # Register upload if not exists
            if not self.state_manager.upload_exists(filename):
                # Prepare metadata for state storage
                state_metadata = {
                    'brand_name': parsed_data['brand_name'],
                    'country_name': parsed_data['country_name'],
                    'channel': parsed_data['channel'],
                }
                self.state_manager.add_upload(filename, abs_path, state_metadata)

            if self.dry_run:
                self.logger.info("✓ DRY RUN: Would upload file")
                return True

            # Step 5: Get presigned URL
            self.logger.info("Getting presigned URL...")
            try:
                presigned_url = self.api_client.get_presigned_url(filename)
                clean_url = self.api_client.strip_query_params(presigned_url)
                self.logger.info(" ✓ Got presigned URL")

                self.state_manager.update_status(
                    filename,
                    StateManager.STATUS_UPLOADING,
                    presigned_url=presigned_url,
                    clean_url=clean_url
                )
            except APIError as e:
                self.logger.error(f" ✗ Failed to get presigned URL: {e}")
                self.state_manager.mark_failed(filename, str(e))
                return False

            # Step 6: Upload file to S3
            self.logger.info("Uploading file to S3...")
            try:
                content_type = self.validator.get_content_type(abs_path)
                self.api_client.upload_file_to_presigned_url(
                    presigned_url,
                    abs_path,
                    content_type
                )
                self.logger.info(" ✓ File uploaded")

                self.state_manager.update_status(
                    filename,
                    StateManager.STATUS_UPLOADING,
                    uploaded_at=datetime.now().isoformat()
                )
            except APIError as e:
                self.logger.error(f" ✗ Upload failed: {e}")
                self.state_manager.mark_failed(filename, str(e))
                return False

            # Step 7: Create preflight
            self.logger.info("Creating preflight...")
            try:
                # Build preflight payload using mapping resolver
                mapped_metadata = self.mapping_resolver.build_creativex_payload(parsed_data)

                # Log what we're sending
                self.logger.info(f" → Brand: {mapped_metadata.get('brand_name')}")
                self.logger.info(f" → Channel: {mapped_metadata.get('channel')}")
                if mapped_metadata.get('publisher'):
                    self.logger.info(f" → Publisher: {mapped_metadata.get('publisher')}")
                if mapped_metadata.get('placement'):
                    self.logger.info(f" → Placement: {mapped_metadata.get('placement')}")
                if mapped_metadata.get('ad_format'):
                    self.logger.info(f" → Ad Format: {mapped_metadata.get('ad_format')}")

                # Build final preflight payload with Creative X structure
                preflight_metadata = self._build_preflight_payload(
                    filename, clean_url, mapped_metadata
                )

                response = self.api_client.create_preflight(preflight_metadata)
                request_id = response.get('request_id') or response.get('id')
                if not request_id:
                    # Without an id the preflight cannot be referenced later,
                    # so make the gap visible instead of storing None silently.
                    self.logger.warning(
                        " ! Preflight response contained no 'request_id' or 'id'"
                    )

                self.logger.info(f" ✓ Preflight created: {request_id}")

                self.state_manager.update_status(
                    filename,
                    StateManager.STATUS_PREFLIGHT_CREATED,
                    request_id=request_id,
                    preflight_created_at=datetime.now().isoformat()
                )

                return True

            except (APIError, ValueError) as e:
                self.logger.error(f" ✗ Preflight creation failed: {e}")
                self.state_manager.mark_failed(filename, str(e))
                return False

        except Exception as e:
            # Last-resort boundary so one bad file cannot abort a batch.
            self.logger.error(f"✗ Unexpected error: {e}", exc_info=True)
            if self.state_manager.upload_exists(filename):
                self.state_manager.mark_failed(filename, str(e))
            return False

    def upload_batch(self, file_paths: list, skip_existing: bool = True) -> dict:
        """
        Upload multiple files with progress tracking

        Args:
            file_paths: List of file paths
            skip_existing: Skip files whose saved status is completed or
                processing; such files are counted as skipped rather than
                succeeded and upload_file() is not called for them

        Returns:
            dict: Summary with total, succeeded, failed, skipped counts
        """
        summary = {
            'total': len(file_paths),
            'succeeded': 0,
            'failed': 0,
            'skipped': 0
        }

        self.logger.info(f"\nBatch upload: {summary['total']} files")
        self.logger.info("-" * 60)

        for i, file_path in enumerate(file_paths, 1):
            self.logger.info(f"\n[{i}/{summary['total']}]")

            if skip_existing:
                filename = FileHandler.get_filename(file_path)
                if self._is_already_processed(filename):
                    self.logger.info(f"⊙ Skipping already uploaded file: {filename}")
                    summary['skipped'] += 1
                    continue

            if self.upload_file(file_path):
                summary['succeeded'] += 1
            else:
                summary['failed'] += 1

        # Print summary
        self.logger.info("\n" + "=" * 60)
        self.logger.info("BATCH UPLOAD SUMMARY")
        self.logger.info("=" * 60)
        self.logger.info(f"Total files: {summary['total']}")
        self.logger.info(f"✓ Succeeded: {summary['succeeded']}")
        self.logger.info(f"✗ Failed: {summary['failed']}")
        self.logger.info(f"⊙ Skipped: {summary['skipped']}")
        self.logger.info("=" * 60)

        return summary


def main():
    """
    CLI entry point

    Exits with status 0 when every processed file succeeded (or was
    skipped) and 1 on configuration errors, missing input, or any failure.
    """
    parser = argparse.ArgumentParser(
        description='Upload files to CreativeX for preflight testing'
    )
    parser.add_argument('files', nargs='*', help='Files to upload')
    parser.add_argument('--dir', help='Directory to scan for files')
    parser.add_argument('--dry-run', action='store_true', help='Validate only, do not upload')
    parser.add_argument('--skip-existing', action='store_true', help='Skip files already uploaded')

    args = parser.parse_args()

    # Load configuration
    try:
        config = load_config()
    except Exception as e:
        print(f"Error loading configuration: {e}")
        sys.exit(1)

    # Initialize orchestrator
    orchestrator = UploadOrchestrator(config, dry_run=args.dry_run)

    # Collect files to process
    candidates = []

    if args.files:
        candidates.extend(args.files)

    if args.dir:
        dir_path = Path(args.dir)
        # is_dir() also rejects a path that exists but is a regular file.
        if not dir_path.is_dir():
            print(f"Error: Directory not found: {args.dir}")
            sys.exit(1)

        # Find all supported files
        for ext in FileHandler.SUPPORTED_VIDEO_FORMATS + FileHandler.SUPPORTED_IMAGE_FORMATS:
            candidates.extend(dir_path.glob(f'*{ext}'))
            candidates.extend(dir_path.glob(f'*{ext.upper()}'))

    # The lower/upper-case globs return the same file twice where matching
    # is case-insensitive, and a file may be named both explicitly and via
    # --dir; keep the first occurrence of each absolute path.
    files_to_process = []
    seen = set()
    for candidate in candidates:
        key = str(Path(candidate).absolute())
        if key in seen:
            continue
        seen.add(key)
        files_to_process.append(candidate)

    if not files_to_process:
        print("No files specified. Use --help for usage information.")
        sys.exit(1)

    # Process files
    if len(files_to_process) == 1:
        success = orchestrator.upload_file(str(files_to_process[0]))
        sys.exit(0 if success else 1)
    else:
        summary = orchestrator.upload_batch(
            [str(f) for f in files_to_process],
            skip_existing=args.skip_existing
        )
        sys.exit(0 if summary['failed'] == 0 else 1)


if __name__ == '__main__':
    main()