#!/usr/bin/env python3
"""
A2→A3 Upload Handler - Box Folder Polling Version

Polls Box folder for new files with V2 naming, uploads to DAM.
Campaign status A2→A3 is updated only when the --A3update flag is passed
(NOTE(review): earlier description said the update happens "only when ALL
assets for campaign uploaded"; this script does not check that itself).
Supports --keep-files and --dryrun for testing.

Compatible with Python 3.6+
"""

import argparse
import json
import logging
import os
import sys
import time
from logging.handlers import RotatingFileHandler

# Add shared library to path (must run before the shared.* imports below)
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from shared.config_loader import load_config, load_field_mappings
from shared.dam_client import DAMClient
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
from shared.filename_parser import FilenameParser
from shared.metadata_extractor_mvp import MetadataExtractorMVP

# Load configuration
config = load_config('config/config.yaml')
field_mappings = load_field_mappings(config)

# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
os.makedirs('logs/backup', exist_ok=True)

# Configure logging with rotation
# Keep 1 week of active logs (7 days * 10MB = 70MB)
# Backup rotates keep 4 weeks (28 backups * 10MB = 280MB total)
log_handler = RotatingFileHandler(
    'logs/a2_to_a3.log',
    maxBytes=10 * 1024 * 1024,  # 10MB per file
    backupCount=28  # Keep 28 rotated files (approximately 1 month)
)
log_handler.setLevel(logging.INFO)
log_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

logging.basicConfig(
    level=logging.INFO,
    handlers=[log_handler, console_handler]
)
logger = logging.getLogger('A2toA3')

# Local staging directory for files downloaded from Box (relative to CWD,
# like the logs/ directory above).
_TEMP_DOWNLOAD_DIR = 'temp/downloads'

# New CreativeX API format (no 'data' wrapper): map API channel/publisher
# back to DAM platform names.
_CREATIVEX_CHANNEL_TO_DAM = {
    'google_ads': 'Google',
    'dv360': 'DV360',
    'tiktok_paid': 'TikTok',
    'snapchat_paid': 'Snap',
    'pinterest': 'Pinterest',
    'twitter_paid': 'Twitter',
    'amazon_paid': 'Amazon',
}
_CREATIVEX_FB_PUBLISHER_TO_DAM = {
    'facebook': 'FB - Feed',
    'audience_network': 'Audience Network - An Classic',
    'messenger': 'Messenger - Inbox',
}
_CREATIVEX_IG_PUBLISHER_TO_DAM = {
    'instagram': 'IG - Feed',
}


def _remove_local_file(path):
    """Best-effort delete of a local temp file; logs instead of raising."""
    if not path or not os.path.exists(path):
        return
    try:
        os.remove(path)
    except OSError as e:
        logger.warning("Could not remove temp file {}: {}".format(path, str(e)))


def _resolve_creativex_platforms(creativex_data):
    """
    Derive the DAM platform list from a CreativeX database record.

    Lookup order: 'ferrero_mapped_platforms' list, then legacy single
    'ferrero_mapped_platform', both under full_extraction_data['data'];
    finally the new API format's top-level 'channel'/'publisher' fields.
    """
    full_data = creativex_data.get('full_extraction_data', {})
    # `or {}` guards against an explicit None 'data' value
    data_obj = (full_data.get('data') or {}) if isinstance(full_data, dict) else {}
    platforms = data_obj.get('ferrero_mapped_platforms', [])

    # If legacy single platform exists, add it to list
    if not platforms and data_obj.get('ferrero_mapped_platform'):
        platforms = [data_obj.get('ferrero_mapped_platform')]

    # Fallback: Handle new CreativeX API format (no 'data' wrapper)
    if not platforms and isinstance(full_data, dict) and 'channel' in full_data:
        api_channel = full_data.get('channel', '')
        api_publisher = full_data.get('publisher', '')

        if api_channel in _CREATIVEX_CHANNEL_TO_DAM:
            platforms = [_CREATIVEX_CHANNEL_TO_DAM[api_channel]]
        elif api_channel == 'facebook_paid' and api_publisher in _CREATIVEX_FB_PUBLISHER_TO_DAM:
            platforms = [_CREATIVEX_FB_PUBLISHER_TO_DAM[api_publisher]]
        elif api_channel == 'instagram_paid' and api_publisher in _CREATIVEX_IG_PUBLISHER_TO_DAM:
            platforms = [_CREATIVEX_IG_PUBLISHER_TO_DAM[api_publisher]]
        elif api_channel == 'facebook_paid':
            platforms = ['FB - Feed']
        elif api_channel == 'instagram_paid':
            platforms = ['IG - Feed']

        if platforms:
            logger.info("CreativeX: Mapped API channel '{}'/publisher '{}' to DAM platform '{}'".format(
                api_channel, api_publisher, platforms[0]))

    return platforms


def _move_to_errors_folder(file_info, box):
    """
    Best-effort move of a Box file into an '_Errors' subfolder of its parent,
    so the next polling run does not pick it up again. Failures are logged.
    """
    try:
        parent_folder_id = (file_info.get('parent') or {}).get('id')
        if not parent_folder_id:
            # Recursive listings may omit the parent; fetch it from Box.
            parent_folder_id = box.client.file(file_info['id']).get().parent.id

        # Create/Get _Errors folder
        errors_folder = box._get_or_create_subfolder_path(box.client.folder(parent_folder_id), '_Errors')

        box.move_file(file_info['id'], errors_folder.id)
        logger.info("Moved invalid file to _Errors folder")
    except Exception as move_error:
        logger.error("Failed to move invalid file: {}".format(str(move_error)))


def _log_dryrun_report(dam, asset_rep, box_metadata, creativex_found, master_opentext_ids):
    """
    Log the full asset representation for --dryrun. Also registers the master
    IDs in the DAM lookup domain (only adds lookup values, creates no assets).
    """
    logger.info("")
    logger.info("=" * 80)
    logger.info("DRYRUN MODE - Asset Representation (will NOT upload to DAM)")
    logger.info("=" * 80)
    logger.info("")
    logger.info("FULL ASSET REPRESENTATION (JSON):")
    logger.info("")
    logger.info(json.dumps(asset_rep, indent=2, ensure_ascii=False))
    logger.info("")
    logger.info("=" * 80)
    logger.info("Field Count: {} fields".format(len(asset_rep)))
    logger.info("=" * 80)
    logger.info("")
    logger.info("CreativeX Status:")
    logger.info(" Found in database: {}".format(creativex_found))
    logger.info(" Score: {}".format(box_metadata.get('score')))
    logger.info(" URL: {}".format(box_metadata.get('url')))
    logger.info("")

    if master_opentext_ids:
        logger.info("Domain Registration Test:")
        registration_result = dam.register_master_asset_ids_for_ppr(master_opentext_ids)
        if registration_result.get('skipped'):
            logger.info(" Skipped (not PPR environment)")
        else:
            logger.info(" Registered: {}".format(registration_result.get('registered_ids', [])))
            if registration_result.get('failed_ids'):
                logger.info(" Failed: {}".format(registration_result.get('failed_ids', [])))
        logger.info("")

    logger.info("DRYRUN: No upload performed, file kept in Box")
    logger.info("=" * 80)


def process_box_file(file_info, dam, box, db, parser, mvp_extractor, config, notifier,
                     keep_files=False, dryrun=False):
    """
    Process a single file from Box folder.

    Parses the V2 filename, loads the master asset(s) from the database,
    attaches CreativeX data and any pre-upload override, downloads the file,
    and uploads it to DAM under the master's upload directory (recreating the
    Box subfolder path there). Files carrying a Master Tracking ID (prefix
    'M') are rejected, reported by email and moved to an '_Errors' subfolder.

    Args:
        file_info: Box file dict; reads 'id', 'name', and optionally
            'subfolder_path' and 'parent'.
        dam, box, db, parser, mvp_extractor, notifier: service clients.
        config: loaded configuration (reads notifications.recipients.errors).
        keep_files: If True, don't delete file from Box after upload (for testing)
        dryrun: If True, build metadata but don't upload to DAM (shows full JSON)

    Returns:
        dict with 'success', 'filename', 'tracking_id'; on success also
        'asset_id', 'clean_filename' and CreativeX fields, on failure 'error'.
        Exceptions raised while processing are caught, logged and reported
        through this dict. The local temp copy is always removed.
    """
    file_id = file_info['id']
    filename = file_info['name']
    tracking_id = None
    local_path = None  # current on-disk location of the download, for cleanup

    logger.info("Processing: {}".format(filename))

    try:
        # 1. Parse V2 filename
        parsed = parser.parse_filename(filename)
        if not parsed['is_valid']:
            raise ValueError("Invalid V2 filename: {} - {}".format(
                filename, ', '.join(parsed['validation_errors'])
            ))

        tracking_id = parsed['tracking_id']
        tracking_mode = parsed.get('tracking_mode', 'full')
        subfolder_path = file_info.get('subfolder_path')

        if not tracking_id:
            raise ValueError("No tracking ID in filename")

        logger.info("Tracking ID: {} (mode: {})".format(tracking_id, tracking_mode))
        if subfolder_path:
            logger.info("From Box subfolder: {} -> will create in DAM".format(subfolder_path))

        # 2. Load master metadata from database (support multiple tracking IDs in PPR)
        tracking_ids = parsed.get('tracking_ids', [tracking_id])
        has_multiple_masters = parsed.get('has_multiple_masters', False)

        master_asset = None
        master_opentext_ids = []

        if has_multiple_masters:
            logger.info("Multiple master assets detected: {}".format(', '.join(tracking_ids)))
            found_masters = []  # (tracking_id, master) pairs actually present in the DB
            for tid in tracking_ids:
                master = db.get_master_asset(tid)
                if not master:
                    logger.warning("Master asset not found for tracking ID: {} - skipping".format(tid))
                    continue
                found_masters.append((tid, master))
                master_opentext_ids.append(master['opentext_id'])

            if not found_masters:
                raise ValueError("No master assets found for tracking IDs: {}".format(', '.join(tracking_ids)))

            # Use first *found* master for metadata inheritance
            primary_tid, master_asset = found_masters[0]
            logger.info("Using primary master {} for metadata, linking {} total masters".format(
                primary_tid, len(found_masters)))
        else:
            # Single master (backward compatible); missing master checked below
            master_asset = db.get_master_asset(tracking_id)
            if master_asset:
                master_opentext_ids = [master_asset['opentext_id']]

        # CHECK: Master Tracking IDs (uppercase 'M') are not allowed in this folder
        if tracking_id.startswith('M'):
            logger.warning("Detected Master Tracking ID in Version/Derivative upload folder: {}".format(tracking_id))
            # Notification is best-effort so the file is still quarantined
            # (otherwise it would be re-processed on every poll).
            try:
                notifier.send_email(
                    template_name='upload_failed',  # Reusing error template
                    recipients=config['notifications']['recipients']['errors'],
                    data={
                        'filename': filename,
                        'tracking_id': tracking_id,
                        'error': "INVALID TRACKING ID: Master Tracking IDs (starting with 'M') are not allowed in this folder. Please use a Version Tracking ID."
                    }
                )
            except Exception as notify_error:
                logger.error("Failed to send invalid tracking ID notification: {}".format(str(notify_error)))

            _move_to_errors_folder(file_info, box)

            return {
                'success': False,
                'error': "Master Tracking ID detected - moved to _Errors",
                'filename': filename,
                'tracking_id': tracking_id
            }

        if not master_asset:
            raise ValueError("No master asset for tracking ID: {}".format(tracking_id))

        # 3. Get CreativeX score from database (lookup by filename, fallback to tracking ID)
        creativex_data = db.get_creativex_score_by_filename(filename, tracking_id=tracking_id)

        if creativex_data:
            platforms = _resolve_creativex_platforms(creativex_data)
            box_metadata = {
                'score': creativex_data['quality_score'],
                'url': creativex_data['creativex_url'],
                'platforms': platforms
            }
            logger.info("CreativeX score found in database: Score={}, URL={}, Platforms={}".format(
                creativex_data['quality_score'], creativex_data['creativex_url'], platforms
            ))
            creativex_found = True
        else:
            # Use default values when no CreativeX score found - no URL sent
            box_metadata = {
                'score': '0',
                'url': ''
            }
            logger.warning("No CreativeX score found for: {} - Using default values (Score: 0, No URL)".format(
                filename
            ))
            creativex_found = False

        # 4. Download from Box
        os.makedirs(_TEMP_DOWNLOAD_DIR, exist_ok=True)
        temp_file = os.path.join(_TEMP_DOWNLOAD_DIR, filename)
        local_path = temp_file  # set before download so partial files are cleaned
        box.download_file(file_id, temp_file)

        # 5. Get clean filename
        clean_filename = parser.strip_upload_components(filename)

        # 6. Look up pre-upload metadata override saved by the naming tool's editor.
        # The naming tool stores filename without extension, so strip it here.
        filename_no_ext = os.path.splitext(filename)[0]
        override = db.get_override_metadata(filename_no_ext)
        override_fields = None
        if override:
            override_fields = override.get('override_fields')
            logger.info("Found pre-upload override (id={}) for {}: {} field(s)".format(
                override.get('id'), filename_no_ext,
                len(override_fields) if override_fields else 0
            ))

        # 7. Build MVP asset representation with CreativeX data from database
        asset_rep = mvp_extractor.build_mvp_asset_representation(
            master_metadata=master_asset['full_metadata'],
            clean_filename=clean_filename,
            parsed_filename=parsed,
            box_metadata=box_metadata,  # CreativeX data from database
            tracking_mode=tracking_mode,  # tracking mode for folder-only handling
            master_opentext_id=master_asset['opentext_id'],  # Primary master DAM ID
            master_opentext_ids=master_opentext_ids,  # All master IDs (multiple or single)
            override_fields=override_fields  # Pre-upload edits from naming tool
        )

        # DRYRUN MODE: Display full asset representation and exit
        if dryrun:
            _log_dryrun_report(dam, asset_rep, box_metadata, creativex_found, master_opentext_ids)
            return {
                'success': True,
                'asset_id': 'DRYRUN_NO_UPLOAD',
                'tracking_id': tracking_id,
                'filename': filename,
                'clean_filename': clean_filename,
                'creativex_found': creativex_found,
                'creativex_score': box_metadata.get('score', '0'),
                'creativex_url': box_metadata.get('url', ''),
                'dryrun': True
            }

        # 8. Rename to clean filename. When the clean name equals the original
        # the downloaded file is already in place; deleting the "existing"
        # destination first would delete the download itself.
        clean_temp_file = os.path.join(_TEMP_DOWNLOAD_DIR, clean_filename)
        if clean_temp_file != temp_file:
            os.replace(temp_file, clean_temp_file)  # overwrites a stale leftover
        local_path = clean_temp_file

        # 9. Upload to DAM (with subfolder structure if present)
        upload_folder_id = master_asset['upload_directory']  # Base "01. Final Assets" folder

        # If file was in a Box subfolder, create/use matching DAM subfolder
        if subfolder_path:
            logger.info("Creating DAM subfolder path: {}".format(subfolder_path))
            upload_folder_id = dam.get_or_create_subfolder_path(
                base_folder_id=upload_folder_id,
                subfolder_path=subfolder_path
            )
            logger.info("Will upload to: 01. Final Assets/{}".format(subfolder_path))

        # OpenText API requires domain values to exist before they can be used in asset creation
        if master_opentext_ids:
            dam.register_master_asset_ids_for_ppr(master_opentext_ids)

        upload_result = dam.upload_asset(
            file_path=clean_temp_file,
            folder_id=upload_folder_id,
            asset_representation=asset_rep
        )

        if not upload_result['success']:
            raise Exception("Upload failed: {}".format(upload_result.get('error')))

        # 10. Store derivative record
        db.store_derivative_asset(
            tracking_id=tracking_id,
            master_asset_id=None,
            dam_asset_id=upload_result['asset_id'],
            filename=clean_filename
        )

        # Mark pre-upload override as applied (only after confirmed DAM upload success).
        if override:
            db.mark_override_applied(filename_no_ext)

        # 11. Delete file from Box after successful upload (unless --keep-files flag set)
        if keep_files:
            logger.info("--keep-files flag set - File kept in Box: {}".format(filename))
        else:
            try:
                box.client.file(file_id).delete()
                logger.info("Deleted file from Box: {}".format(filename))
            except Exception as e:
                logger.warning("Could not delete file from Box: {}".format(str(e)))

        logger.info("✓ Success: {} → Asset ID: {}".format(filename, upload_result['asset_id']))

        return {
            'success': True,
            'asset_id': upload_result['asset_id'],
            'tracking_id': tracking_id,
            'filename': filename,
            'clean_filename': clean_filename,
            'creativex_found': creativex_found,
            'creativex_score': box_metadata.get('score', '0'),
            'creativex_url': box_metadata.get('url', ''),
            'subfolder_path': subfolder_path
        }

    except Exception as e:
        logger.error("✗ Failed: {} - {}".format(filename, str(e)))
        return {
            'success': False,
            'error': str(e),
            'filename': filename,
            'tracking_id': tracking_id,
            'subfolder_path': file_info.get('subfolder_path')
        }
    finally:
        # Local temp copy is removed on every path (success, failure, dryrun);
        # a cleanup problem must not turn a finished upload into a failure.
        _remove_local_file(local_path)


def _parse_args():
    """Build the CLI parser and return the parsed arguments."""
    parser_args = argparse.ArgumentParser(description='Ferrero A2→A3 Upload Handler')
    parser_args.add_argument('--auth-pfx', action='store_true',
                             help='Use mTLS certificate authentication (Legacy APIM)')
    parser_args.add_argument('--auth-pfx-v2', action='store_true',
                             help='Use mTLS V2 (Hybrid) authentication')
    parser_args.add_argument('--A3update', action='store_true',
                             help='Force update campaign status A2→A3 after upload (for testing)')
    parser_args.add_argument('--keep-files', action='store_true',
                             help='Keep files in Box after upload (don\'t delete, for testing)')
    parser_args.add_argument('--dryrun', action='store_true',
                             help='Build metadata but don\'t upload to DAM (shows full JSON for debugging)')
    return parser_args.parse_args()


def _resolve_auth_mode(args):
    """Return the DAM auth mode for the CLI flags (--auth-pfx-v2 wins), logging the choice."""
    if args.auth_pfx_v2:
        logger.info("Authentication: mTLS V2 (Hybrid)")
        return 'mtls_v2'
    if args.auth_pfx:
        logger.info("Authentication: mTLS Certificate (Legacy)")
        return 'mtls'
    logger.info("Authentication: OAuth2 (default)")
    return 'oauth'


def _find_campaign_id(master_asset):
    """Return the container_id of the 'L7+ - CAMPAIGN' inherited collection, or None."""
    full_metadata = master_asset.get('full_metadata') or {}
    for collection in full_metadata.get('inherited_metadata_collections') or []:
        if collection.get('container_type_name') == 'L7+ - CAMPAIGN':
            return collection.get('container_id')
    return None


def _update_campaign_status(dam, master_asset):
    """Update the master asset's campaign status A2→A3 in DAM (for --A3update)."""
    logger.info("")
    logger.info("--A3update flag set - Attempting to update campaign status")

    campaign_id = _find_campaign_id(master_asset)
    if not campaign_id:
        logger.warning("⚠ Campaign ID not found in master asset metadata - cannot update status")
        return

    logger.info("Found campaign ID: {}".format(campaign_id))
    logger.info("Updating campaign status A2 → A3...")
    status_result = dam.update_campaign_status(campaign_id, 'A3')
    if status_result['success']:
        logger.info("✓ Campaign status updated successfully: A2 → A3")
    else:
        logger.error("✗ Campaign status update failed: {}".format(status_result.get('error')))


def _run_batch(args, dam, box, db, parser, mvp_extractor, notifier):
    """
    Scan the A2→A3 Box folder, process every valid V2 file, send the summary
    email and optionally update campaign status.

    Returns:
        Process exit code: 0 if nothing to do or at least one file succeeded,
        1 if every processed file failed.
    """
    box_folder_id = config['box'].get('root_folder_a2_a3', config['box'].get('root_folder_id'))
    logger.info("Polling Box folder: {}".format(box_folder_id))

    # List files recursively in Box folder (skips 1st level job folders, preserves 2nd+ levels)
    files = box.list_folder_files_recursive(box_folder_id)
    logger.info("Recursive scan complete")

    if not files:
        logger.info("No files found in Box folder - exiting")
        return 0

    logger.info("Found {} files in Box folder".format(len(files)))

    # Show subfolder distribution
    subfolders = {f.get('subfolder_path') for f in files if f.get('subfolder_path')}
    if subfolders:
        logger.info("Files in {} subfolder(s): {}".format(
            len(subfolders), ', '.join(sorted(subfolders))
        ))
    files_at_root = sum(1 for f in files if not f.get('subfolder_path'))
    if files_at_root:
        logger.info("Files at job level (will go to DAM root): {}".format(files_at_root))

    # Filter for V2 filenames only
    valid_files = []
    for file_info in files:
        parsed = parser.parse_filename(file_info['name'])
        if parsed['is_valid'] and parsed.get('tracking_id'):
            valid_files.append(file_info)
        else:
            logger.info("Skipping invalid V2 file: {} - Errors: {}".format(
                file_info['name'], parsed.get('validation_errors', [])
            ))

    logger.info("Found {} valid V2 files to process".format(len(valid_files)))

    if not valid_files:
        logger.info("No valid V2 files to process - exiting")
        return 0

    logger.info("Processing all {} file(s)".format(len(valid_files)))
    logger.info("")

    successful_files = []
    failed_files = []

    for idx, file_info in enumerate(valid_files, 1):
        logger.info("=" * 60)
        logger.info("Processing file {}/{}".format(idx, len(valid_files)))
        logger.info("=" * 60)

        result = process_box_file(file_info, dam, box, db, parser, mvp_extractor, config, notifier,
                                  keep_files=args.keep_files, dryrun=args.dryrun)

        if result['success']:
            successful_files.append(result)
            logger.info("✓ File {} processed successfully".format(idx))
        else:
            failed_files.append(result)
            logger.error("✗ File {} failed".format(idx))

        logger.info("")

    # Summary
    logger.info("")
    logger.info("=" * 60)
    logger.info("A2→A3 Processing Summary")
    logger.info("=" * 60)
    logger.info(" Total files processed: {}".format(len(valid_files)))
    logger.info(" Successful: {}".format(len(successful_files)))
    logger.info(" Failed: {}".format(len(failed_files)))
    logger.info("=" * 60)

    # Send summary email notification
    if successful_files:
        notifier.send_email(
            template_name='a2_to_a3_batch_complete',
            recipients=config['notifications']['recipients']['success'],
            data={
                'total_files': len(valid_files),
                'successful_count': len(successful_files),
                'failed_count': len(failed_files),
                'successful_files': successful_files,
                'failed_files': failed_files,
                'box_folder': box_folder_id
            }
        )

    # Update campaign status A2→A3 if --A3update flag is set. The campaign is
    # taken from the first successful file's master asset. A status change is
    # a real DAM write, so it is never performed in --dryrun.
    if args.A3update:
        if args.dryrun:
            logger.info("DRYRUN: Skipping campaign status update requested by --A3update")
        elif successful_files:
            master_asset = db.get_master_asset(successful_files[0]['tracking_id'])
            if master_asset:
                _update_campaign_status(dam, master_asset)

    # Exit with success if at least one file succeeded
    return 0 if successful_files else 1


def main():
    """Main entry point - single run mode. Exits the process with 0/1."""
    args = _parse_args()

    logger.info("=" * 60)
    logger.info("Ferrero A2→A3 Upload Handler Starting (Polling Mode)")

    auth_mode = _resolve_auth_mode(args)

    if args.A3update:
        logger.info("Mode: Auto-update campaign status A2→A3 (--A3update)")
    if args.keep_files:
        logger.info("Mode: Keep files in Box after upload (--keep-files)")
    if args.dryrun:
        logger.info("Mode: DRYRUN - Build metadata but DON'T upload (--dryrun)")
    logger.info("=" * 60)

    # Initialize clients
    dam = DAMClient(config, auth_mode=auth_mode)
    # Use A2→A3 Box folder for polling
    box = BoxClient(config, root_folder_id=config['box'].get('root_folder_a2_a3'))
    db = Database(config)
    notifier = Notifier(config)
    parser = FilenameParser(dam_base_url=dam.base_url)  # DAM URL used for environment detection
    mvp_extractor = MetadataExtractorMVP(field_mappings)

    # Test connections
    logger.info("Testing connections...")
    if not dam.test_connection():
        logger.error("DAM connection failed")
        sys.exit(1)
    if not box.test_connection():
        logger.error("Box connection failed")
        sys.exit(1)
    if not db.test_connection():
        logger.error("Database connection failed")
        sys.exit(1)
    logger.info("All connections OK")
    logger.info("")

    exit_code = 1
    try:
        exit_code = _run_batch(args, dam, box, db, parser, mvp_extractor, notifier)
    except Exception as e:
        logger.critical("Script error: {}".format(str(e)))
        exit_code = 1
    finally:
        # Close exactly once on every path.
        try:
            db.close()
        except Exception as close_error:
            logger.error("Failed to close database: {}".format(str(close_error)))

    sys.exit(exit_code)


if __name__ == '__main__':
    main()