creative-x-ferrero/scripts/upload.py
DJP b20119b383 Add complete mapping system and automated Box.com monitoring service
Major Features:
- Complete Ferrero ↔ CreativeX mapping system with 93 brands
- Automated Box.com folder monitoring service
- Email notifications with score breakdowns
- Database integration for result storage

Mapping System (v2.0.0):
- mappings.json: 93 brand mappings, 44+ channel mappings
- core/mapping_resolver.py: Translates Ferrero codes to CreativeX format
- scripts/validate_mappings.py: Validation tool for brand/channel support
- scripts/generate_brand_mappings.py: Auto-mapping tool
- scripts/download_reports.py: Scorecard PDF download tool
- Updated scripts/upload.py: Integrated mapping validation
- Updated scripts/check_status.py: Added detailed score display with guidelines

Documentation:
- Updated README.md: Complete user guide with mapping system
- Updated STATUS.md: Production-ready status with test results
- MAPPINGS_GUIDE.md: Complete mapping documentation
- MAPPING_IMPLEMENTATION.md: Implementation summary
- BRAND_MAPPINGS_REVIEW.md: Brand mapping validation guide
- PRODUCTION_BRANDS_SUMMARY.md: Production brand catalog
- PRODUCTION_MAPPING_COMPLETE.md: Mapping completion summary

Automation Service (New):
- creativex-automation/: Complete automated Box monitoring service
- Monitors Box Ferrero-In folder (363284027140) for new files
- Automatically uploads to CreativeX
- Polls for completion (30 min intervals)
- Extracts scores and stores in PostgreSQL creativex_scores table
- Sends formatted emails to file uploader + daveporter@oliver.agency
- Moves processed files to Processed subfolder

Service Components:
- automation/box_monitor.py: Box folder monitoring with uploader detection
- automation/upload_processor.py: CreativeX upload integration
- automation/status_poller.py: CreativeX status polling
- automation/result_handler.py: Score extraction and email sending
- automation/orchestrator.py: Service coordination
- automation/processing_queue.py: JSON-based processing queue
- service.py: Main service entry point
- config.py: Service configuration loader
- requirements.txt: All dependencies
- deployment/systemd/: Systemd service unit file
- Updated shared/notifier.py: Added creativex_upload_complete and creativex_upload_failed templates

Testing:
- Supports --dry-run mode for configuration testing
- Supports --scan-once mode for Box folder testing
- Manual run mode for development/testing
- Comprehensive logging with rotation (10MB, 28 backups)

Database Integration:
- Uses existing creativex_scores table (no migrations needed)
- Compatible with existing Ferrero-Opentext workflows
- Stores full CreativeX API responses in JSONB

Email Templates:
- Matches Ferrero-Opentext styling (#9c27b0 purple for CreativeX)
- Includes score, tier, guidelines breakdown, scorecard URL
- Recipients: Box uploader + CC to daveporter@oliver.agency

Deployment:
- Runs locally for dev/testing
- Systemd service for production
- Auto-restart on failure
- Complete documentation in creativex-automation/README.md

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-01-29 09:51:16 -05:00

376 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Upload files to CreativeX API for preflight testing
Usage:
python upload.py /path/to/file.mp4
python upload.py --dir /path/to/videos/
python upload.py --dry-run /path/to/file.mp4
python upload.py --resume
"""
import argparse
import sys
from pathlib import Path
from datetime import datetime
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import load_config
from core.filename_parser import FerreroFilenameParser
from core.data_loader import DataLoader
from core.validators import UploadValidator
from core.mapping_resolver import MappingResolver
from core.api_client import CreativeXAPIClient, APIError
from utils.state_manager import StateManager
from utils.logger import setup_logger
from utils.file_handler import FileHandler
class UploadOrchestrator:
"""
Main orchestrator for upload process
"""
def __init__(self, config, dry_run=False):
"""Initialize orchestrator with configuration"""
self.config = config
self.dry_run = dry_run
# Setup logger
self.logger = setup_logger(
__name__,
log_dir=config.logs_dir if not dry_run else None,
log_level=config.log_level
)
self.logger.info("=" * 60)
self.logger.info(f"CreativeX Upload Orchestrator")
self.logger.info(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
self.logger.info(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
self.logger.info("=" * 60)
# Initialize components
self.data_loader = DataLoader(str(config.data_json_path))
mappings_path = config.project_root / 'mappings.json'
self.mapping_resolver = MappingResolver(str(mappings_path))
self.parser = FerreroFilenameParser(self.data_loader)
self.validator = UploadValidator(self.data_loader, config.max_file_size_mb)
self.state_manager = StateManager(config.state_file)
if not dry_run:
self.api_client = CreativeXAPIClient(
config.api_base_url,
config.access_token,
config.api_max_retries,
config.api_timeout
)
else:
self.api_client = None
self.logger.info("Initialization complete")
def upload_file(self, file_path: str) -> bool:
"""
Upload single file through complete workflow
Args:
file_path: Path to file
Returns:
bool: True if successful, False otherwise
"""
filename = FileHandler.get_filename(file_path)
abs_path = FileHandler.get_absolute_path(file_path)
self.logger.info(f"\nProcessing: {filename}")
try:
# Step 1: Check if already uploaded
if self.state_manager.upload_exists(filename):
upload = self.state_manager.get_upload(filename)
status = upload['status']
if status in [StateManager.STATUS_COMPLETED, StateManager.STATUS_PROCESSING]:
self.logger.info(f"✓ Already processed (status: {status})")
return True
elif status == StateManager.STATUS_FAILED:
self.logger.info(f"⟳ Previously failed, retrying...")
else:
self.logger.info(f"⟳ Found existing upload (status: {status}), continuing...")
# Step 2: Validate file
self.logger.info("Validating file...")
is_valid, errors = self.validator.validate_file(abs_path)
if not is_valid:
for error in errors:
self.logger.error(f"{error}")
self.state_manager.mark_failed(filename, "; ".join(errors))
return False
self.logger.info(" ✓ File valid")
# Step 3: Parse filename
self.logger.info("Parsing filename...")
try:
parsed_data = self.parser.parse(filename)
self.logger.info(f" ✓ Brand: {parsed_data['brand_name']}")
self.logger.info(f" ✓ Market: {parsed_data['country_name']}")
self.logger.info(f" ✓ Channel: {parsed_data['channel']}")
except ValueError as e:
self.logger.error(f" ✗ Parse error: {e}")
if not self.state_manager.upload_exists(filename):
# Can't add to state without metadata, just fail
pass
return False
# Step 4: Validate metadata
self.logger.info("Validating metadata...")
is_valid, errors = self.validator.validate_parsed_metadata(parsed_data)
if not is_valid:
for error in errors:
self.logger.error(f"{error}")
self.state_manager.mark_failed(filename, "; ".join(errors))
return False
self.logger.info(" ✓ Metadata valid")
# Step 4b: Validate Creative X mappings
self.logger.info("Validating Creative X mappings...")
is_valid, errors = self.mapping_resolver.validate_metadata_for_upload(parsed_data)
if not is_valid:
for error in errors:
self.logger.error(f"{error}")
self.state_manager.mark_failed(filename, "; ".join(errors))
return False
self.logger.info(" ✓ Mappings valid")
# Register upload if not exists
if not self.state_manager.upload_exists(filename):
# Prepare metadata for state storage
state_metadata = {
'brand_name': parsed_data['brand_name'],
'country_name': parsed_data['country_name'],
'channel': parsed_data['channel'],
}
self.state_manager.add_upload(filename, abs_path, state_metadata)
if self.dry_run:
self.logger.info("✓ DRY RUN: Would upload file")
return True
# Step 5: Get presigned URL
self.logger.info("Getting presigned URL...")
try:
presigned_url = self.api_client.get_presigned_url(filename)
clean_url = self.api_client.strip_query_params(presigned_url)
self.logger.info(f" ✓ Got presigned URL")
self.state_manager.update_status(
filename,
StateManager.STATUS_UPLOADING,
presigned_url=presigned_url,
clean_url=clean_url
)
except APIError as e:
self.logger.error(f" ✗ Failed to get presigned URL: {e}")
self.state_manager.mark_failed(filename, str(e))
return False
# Step 6: Upload file to S3
self.logger.info("Uploading file to S3...")
try:
content_type = self.validator.get_content_type(abs_path)
self.api_client.upload_file_to_presigned_url(
presigned_url,
abs_path,
content_type
)
self.logger.info(" ✓ File uploaded")
self.state_manager.update_status(
filename,
StateManager.STATUS_UPLOADING,
uploaded_at=datetime.now().isoformat()
)
except APIError as e:
self.logger.error(f" ✗ Upload failed: {e}")
self.state_manager.mark_failed(filename, str(e))
return False
# Step 7: Create preflight
self.logger.info("Creating preflight...")
try:
# Build preflight payload using mapping resolver
mapped_metadata = self.mapping_resolver.build_creativex_payload(parsed_data)
# Log what we're sending
self.logger.info(f" → Brand: {mapped_metadata.get('brand_name')}")
self.logger.info(f" → Channel: {mapped_metadata.get('channel')}")
if mapped_metadata.get('publisher'):
self.logger.info(f" → Publisher: {mapped_metadata.get('publisher')}")
if mapped_metadata.get('placement'):
self.logger.info(f" → Placement: {mapped_metadata.get('placement')}")
if mapped_metadata.get('ad_format'):
self.logger.info(f" → Ad Format: {mapped_metadata.get('ad_format')}")
# Build final preflight payload with Creative X structure
preflight_metadata = {
'name': filename,
'brand_name': mapped_metadata['brand_name'],
'market_name': mapped_metadata['market_name'],
'channel': mapped_metadata['channel'],
'creatives': [
{
'source_url': clean_url,
}
]
}
# Add optional fields
if mapped_metadata.get('publisher'):
preflight_metadata['publisher'] = mapped_metadata['publisher']
if mapped_metadata.get('placement'):
preflight_metadata['placement'] = mapped_metadata['placement']
if mapped_metadata.get('ad_format'):
preflight_metadata['ad_format'] = mapped_metadata['ad_format']
if mapped_metadata.get('language'):
preflight_metadata['language'] = mapped_metadata['language']
if mapped_metadata.get('dimensions'):
preflight_metadata['dimensions'] = mapped_metadata['dimensions']
if mapped_metadata.get('duration'):
preflight_metadata['duration'] = mapped_metadata['duration']
if mapped_metadata.get('subject'):
preflight_metadata['subject'] = mapped_metadata['subject']
response = self.api_client.create_preflight(preflight_metadata)
request_id = response.get('request_id') or response.get('id')
self.logger.info(f" ✓ Preflight created: {request_id}")
self.state_manager.update_status(
filename,
StateManager.STATUS_PREFLIGHT_CREATED,
request_id=request_id,
preflight_created_at=datetime.now().isoformat()
)
return True
except (APIError, ValueError) as e:
self.logger.error(f" ✗ Preflight creation failed: {e}")
self.state_manager.mark_failed(filename, str(e))
return False
except Exception as e:
self.logger.error(f"✗ Unexpected error: {e}", exc_info=True)
if self.state_manager.upload_exists(filename):
self.state_manager.mark_failed(filename, str(e))
return False
def upload_batch(self, file_paths: list, skip_existing: bool = True) -> dict:
"""
Upload multiple files with progress tracking
Args:
file_paths: List of file paths
skip_existing: Skip files already uploaded
Returns:
dict: Summary with succeeded, failed, skipped counts
"""
summary = {
'total': len(file_paths),
'succeeded': 0,
'failed': 0,
'skipped': 0
}
self.logger.info(f"\nBatch upload: {summary['total']} files")
self.logger.info("-" * 60)
for i, file_path in enumerate(file_paths, 1):
self.logger.info(f"\n[{i}/{summary['total']}]")
success = self.upload_file(file_path)
if success:
summary['succeeded'] += 1
else:
summary['failed'] += 1
# Print summary
self.logger.info("\n" + "=" * 60)
self.logger.info("BATCH UPLOAD SUMMARY")
self.logger.info("=" * 60)
self.logger.info(f"Total files: {summary['total']}")
self.logger.info(f"✓ Succeeded: {summary['succeeded']}")
self.logger.info(f"✗ Failed: {summary['failed']}")
self.logger.info(f"⊙ Skipped: {summary['skipped']}")
self.logger.info("=" * 60)
return summary
def main():
"""CLI entry point"""
parser = argparse.ArgumentParser(
description='Upload files to CreativeX for preflight testing'
)
parser.add_argument('files', nargs='*', help='Files to upload')
parser.add_argument('--dir', help='Directory to scan for files')
parser.add_argument('--dry-run', action='store_true',
help='Validate only, do not upload')
parser.add_argument('--skip-existing', action='store_true',
help='Skip files already uploaded')
args = parser.parse_args()
# Load configuration
try:
config = load_config()
except Exception as e:
print(f"Error loading configuration: {e}")
sys.exit(1)
# Initialize orchestrator
orchestrator = UploadOrchestrator(config, dry_run=args.dry_run)
# Collect files to process
files_to_process = []
if args.files:
files_to_process.extend(args.files)
if args.dir:
dir_path = Path(args.dir)
if not dir_path.exists():
print(f"Error: Directory not found: {args.dir}")
sys.exit(1)
# Find all supported files
for ext in FileHandler.SUPPORTED_VIDEO_FORMATS + FileHandler.SUPPORTED_IMAGE_FORMATS:
files_to_process.extend(dir_path.glob(f'*{ext}'))
files_to_process.extend(dir_path.glob(f'*{ext.upper()}'))
if not files_to_process:
print("No files specified. Use --help for usage information.")
sys.exit(1)
# Process files
if len(files_to_process) == 1:
success = orchestrator.upload_file(str(files_to_process[0]))
sys.exit(0 if success else 1)
else:
summary = orchestrator.upload_batch(
[str(f) for f in files_to_process],
skip_existing=args.skip_existing
)
sys.exit(0 if summary['failed'] == 0 else 1)
if __name__ == '__main__':
main()