#!/usr/bin/env python3
"""
B1→B2 Master Asset Downloader

Polls DAM for campaigns with status B1, downloads master assets, uploads to Box.
Updates status to B2 only when ALL assets were successfully processed.

Supports OAuth2 (default) and mTLS (--auth-pfx / --auth-pfx-v2) authentication.
Compatible with Python 3.6+
"""

import argparse
import csv
import logging
import os
import sys
import time
from logging.handlers import RotatingFileHandler

# Add shared library to path (must happen before the `shared` imports below)
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from shared.config_loader import load_config, load_field_mappings
from shared.dam_client import DAMClient
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
from shared.common import sanitize_box_item_name

# Create logs directories if they don't exist.
# NOTE(review): 'logs/backup' is not referenced anywhere else in this file;
# presumably it is used by external tooling - confirm before removing.
os.makedirs('logs', exist_ok=True)
os.makedirs('logs/backup', exist_ok=True)

LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Size-based rotation (not time-based): the active file rolls over once it
# reaches 10 MB and up to 28 rotated files are kept next to it.
log_handler = RotatingFileHandler(
    'logs/b1_to_b2.log',
    maxBytes=10 * 1024 * 1024,  # 10MB per file
    backupCount=28,             # Keep 28 rotated files
)
log_handler.setLevel(logging.INFO)
log_handler.setFormatter(logging.Formatter(LOG_FORMAT))

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter(LOG_FORMAT))

logging.basicConfig(
    level=logging.INFO,
    handlers=[log_handler, console_handler]
)
logger = logging.getLogger('B1toB2')


def _remove_temp_file(path, description):
    """Best-effort removal of a temporary file; returns True if it was removed."""
    if not path or not os.path.exists(path):
        return False
    try:
        os.remove(path)
        return True
    except OSError as e:
        # Cleanup problems must never change the outcome of the actual work.
        logger.warning("Failed to remove {}: {}".format(description, str(e)))
        return False


def _skip_reason(asset, asset_name):
    """Return a warning message if the item must be skipped, otherwise None."""
    # SAFEGUARD: Check if it's a folder (should be handled by dam_client, but double check)
    asset_type = asset.get('asset_type', {})
    if isinstance(asset_type, dict):
        type_name = asset_type.get('name') or ''
    else:
        type_name = str(asset_type)
    if 'folder' in type_name.lower():
        return "Skipping item identified as folder: {} (Type: {})".format(asset_name, type_name)

    # SAFEGUARD 2: No extension or just a dot (likely a container/folder)
    _, ext = os.path.splitext(asset_name)
    if len(ext) < 2:
        return "Skipping item with no extension (likely folder/container): {}".format(asset_name)

    return None


def _transfer_asset(asset, campaign_id, campaign_name, campaign_number,
                    dam, box, db, final_folder_id):
    """
    Download one asset from DAM, upload it to Box and record it in the database.

    Returns: dict describing the processed asset
    Raises: any exception from the clients, or Exception if the DB store reports failure
    """
    asset_id = asset['asset_id']
    asset_name = asset.get('name', 'unknown')

    # 1. Download from DAM
    file_path = dam.download_asset(
        asset_id,
        output_dir='temp/downloads/{}'.format(campaign_id)
    )
    try:
        # 2. Generate tracking ID (is_master=True selects the master-file ID scheme)
        tracking_id = db.generate_unique_tracking_id(is_master=True)

        # 3. Upload to Box (B1 uses MASTERS_CampaignNumber format, preserve folder structure)
        # If campaign_number exists, use it; otherwise use MASTERS prefix only
        if campaign_number and campaign_number != 'N/A':
            box_campaign_id = "MASTERS_{}".format(campaign_number)
        else:
            box_campaign_id = "MASTERS"

        box_result = box.upload_with_tracking_id(
            file_path=file_path,
            campaign_id=box_campaign_id,
            campaign_name=sanitize_box_item_name(campaign_name).replace(' ', '_'),
            tracking_id=tracking_id,
            subfolder_path=asset.get('folder_path', '')  # Preserve DAM folder structure
        )

        # 4. Store in database with FULL metadata
        db_result = db.store_master_asset(
            tracking_id=tracking_id,
            opentext_id=asset_id,
            asset_data=asset,
            box_file_id=box_result['file_id'],
            box_url=box_result['url'],
            upload_folder_id=final_folder_id
        )
        if not db_result['success']:
            raise Exception("Database storage failed")

        return {
            'asset_id': asset_id,
            'asset_name': asset_name,
            'tracking_id': tracking_id,
            'box_file_id': box_result['file_id'],
            'box_url': box_result['url']
        }
    finally:
        # Always drop the temp download, on failure as well as on success,
        # so failed uploads do not accumulate files under temp/downloads.
        _remove_temp_file(file_path, "temp download")


def _write_csv_report(campaign_number, processed_assets):
    """Write the per-campaign CSV report; returns its path, or None on failure."""
    # The default campaign number 'N/A' contains a path separator, so make the
    # value safe for use as a file name component.
    safe_number = str(campaign_number).replace('/', '_').replace(os.sep, '_')
    csv_path = os.path.join("temp", "B1_Campaign_{}_Assets.csv".format(safe_number))
    try:
        os.makedirs("temp", exist_ok=True)
        # Explicit UTF-8: asset names may be non-ASCII and the locale default
        # encoding under cron can be ASCII.
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(
                csvfile,
                fieldnames=['Filename', 'Tracking ID', 'Campaign Number']
            )
            writer.writeheader()
            for item in processed_assets:
                writer.writerow({
                    'Filename': item['asset_name'],
                    'Tracking ID': item['tracking_id'],
                    'Campaign Number': campaign_number
                })
    except Exception as csv_error:
        logger.error("Failed to generate CSV report: {}".format(str(csv_error)))
        _remove_temp_file(csv_path, "temp CSV")  # do not leave a partial report behind
        return None

    logger.info("Generated CSV report: {}".format(csv_path))
    return csv_path


def _send_completion_report(notifier, config, campaign_id, campaign_name,
                            campaign_number, processed_assets):
    """Send the success email (with CSV attachment when available) and clean up."""
    csv_path = _write_csv_report(campaign_number, processed_assets)
    try:
        notifier.send_email(
            template_name='b1_to_b2_complete',
            recipients=config['notifications']['recipients']['success'],
            data={
                'campaign_name': campaign_name,
                'campaign_id': campaign_id,
                'campaign_number': campaign_number,
                'asset_count': len(processed_assets),
                'processed_assets': processed_assets
            },
            attachments=[csv_path] if csv_path else None
        )
    except Exception as mail_error:
        # The status is already B2 at this point; a notification problem must
        # not make the campaign look like it failed.
        logger.error("Failed to send completion email: {}".format(str(mail_error)))
    finally:
        if _remove_temp_file(csv_path, "temp CSV"):
            logger.info("Cleaned up CSV report")


def process_campaign(campaign, dam, box, db, notifier, config):
    """
    Process single campaign - download all master assets

    The campaign status is moved to B2 only when no asset failed and at least
    one asset was processed. Items skipped by the folder/no-extension
    safeguards are neither successes nor failures.

    Returns: dict with keys 'success' (bool), 'processed' (int), 'failed' (int)
    """
    campaign_id = campaign['asset_id']
    campaign_name = campaign['campaign_name']
    campaign_number = campaign.get('campaign_id', 'N/A')

    logger.info("=" * 60)
    logger.info("Processing campaign: {} ({})".format(campaign_name, campaign_number))
    logger.info("=" * 60)

    try:
        # Get master assets (B1 campaigns use Final Assets folder, is_global=True)
        master_assets = dam.get_master_assets(campaign_id, is_global=True)
        total_assets = len(master_assets)
        logger.info("Found {} master assets".format(total_assets))

        if total_assets == 0:
            logger.warning("No master assets found in Final Assets folder")
            # Send email notification about empty campaign
            notifier.send_email(
                template_name='b1_to_b2_no_assets',
                recipients=config['notifications']['recipients']['errors'],
                data={
                    'campaign_name': campaign_name,
                    'campaign_id': campaign_id,
                    'campaign_number': campaign_number
                }
            )
            logger.info("✓ Email sent: No assets found notification")
            return {'success': False, 'processed': 0, 'failed': 0}

        # Get Final Assets folder for upload directory
        final_folder_id = dam.find_final_assets_folder(campaign_id)
        if not final_folder_id:
            logger.error("Final Assets folder not found")
            return {'success': False, 'processed': 0, 'failed': total_assets}

        # Track results
        processed_assets = []
        failed_assets = []
        skipped_count = 0

        # Process each asset
        for asset in master_assets:
            asset_id = asset['asset_id']
            asset_name = asset.get('name', 'unknown')
            folder_path = asset.get('folder_path', '')  # Subfolder path from recursive search

            try:
                if folder_path:
                    logger.info("Processing: {} (from subfolder: {})".format(asset_name, folder_path))
                else:
                    logger.info("Processing: {}".format(asset_name))

                reason = _skip_reason(asset, asset_name)
                if reason:
                    logger.warning(reason)
                    skipped_count += 1
                    continue

                record = _transfer_asset(
                    asset, campaign_id, campaign_name, campaign_number,
                    dam, box, db, final_folder_id
                )
                processed_assets.append(record)
                logger.info("✓ Success: {} → {}".format(asset_name, record['tracking_id']))

            except Exception as e:
                logger.error("✗ Failed: {} - {}".format(asset_name, str(e)))
                failed_assets.append({
                    'asset_id': asset_id,
                    'asset_name': asset_name,
                    'error': str(e)
                })

        # CHECK: All assets processed successfully?
        # Every asset is exactly one of processed / failed / skipped, so "no
        # failures" means every processable asset succeeded. Skipped items
        # must not keep the campaign stuck in B1 forever.
        all_done = bool(processed_assets) and not failed_assets

        logger.info("")
        logger.info("Campaign {} Results:".format(campaign_id))
        logger.info(" Total: {}".format(total_assets))
        logger.info(" Successful: {}".format(len(processed_assets)))
        logger.info(" Failed: {}".format(len(failed_assets)))
        logger.info(" Skipped: {}".format(skipped_count))
        logger.info(" All Done: {}".format("YES" if all_done else "NO"))
        logger.info("")

        if not all_done:
            # NOT all done - some failed (or nothing was processable)
            logger.warning("Campaign incomplete - NOT updating status (remains B1)")

            # Send partial completion email with asset details
            notifier.send_email(
                template_name='b1_to_b2_partial',
                recipients=config['notifications']['recipients']['errors'],
                data={
                    'campaign_name': campaign_name,
                    'campaign_id': campaign_id,
                    'campaign_number': campaign_number,
                    'total_assets': total_assets,
                    'successful': len(processed_assets),
                    'failed': len(failed_assets),
                    'processed_assets': processed_assets,
                    'failed_assets': failed_assets
                }
            )
            return {
                'success': False,
                'processed': len(processed_assets),
                'failed': len(failed_assets)
            }

        # ALL assets processed - update status
        logger.info("All assets processed - Updating status B1 → B2")
        status_result = dam.update_campaign_status(campaign_id, "B2")

        if not status_result['success']:
            logger.error("✗ Status update failed: {}".format(status_result.get('error')))
            # Don't send success notification if status update failed
            return {'success': False, 'processed': len(processed_assets), 'failed': 0}

        logger.info("✓ Status updated successfully")

        # NOTE: B1→B2 workflow does NOT send webhook (only email notification)
        # Webhook is only used for A1→A2 workflow
        _send_completion_report(
            notifier, config, campaign_id, campaign_name,
            campaign_number, processed_assets
        )
        return {'success': True, 'processed': len(processed_assets), 'failed': 0}

    except Exception as e:
        logger.error("Campaign processing failed: {}".format(str(e)), exc_info=True)
        # Return 0 for failed count since we don't know how many assets there were
        return {'success': False, 'processed': 0, 'failed': 0}


def main():
    """
    Single-run entry point: process the first B1 campaign found, then exit.

    Exit code is 0 when there was nothing to do or the campaign completed,
    1 on connection failure, incomplete campaign or unexpected error.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='Ferrero B1→B2 Global Master Asset Downloader')
    parser.add_argument('--auth-pfx', action='store_true',
                        help='Use mTLS certificate authentication (Legacy APIM)')
    parser.add_argument('--auth-pfx-v2', action='store_true',
                        help='Use mTLS V2 (Hybrid) authentication')
    args = parser.parse_args()

    logger.info("=" * 60)
    logger.info("Ferrero B1→B2 Master Asset Downloader Starting")

    # Determine auth mode (V2 flag wins when both flags are given)
    if args.auth_pfx_v2:
        auth_mode = 'mtls_v2'
        logger.info("Authentication: mTLS V2 (Hybrid)")
    elif args.auth_pfx:
        auth_mode = 'mtls'
        logger.info("Authentication: mTLS Certificate (Legacy)")
    else:
        auth_mode = 'oauth'
        logger.info("Authentication: OAuth2 (default)")
    logger.info("=" * 60)

    # Load configuration
    config = load_config('config/config.yaml')

    # Initialize clients (pass auth mode to DAM)
    dam = DAMClient(config, auth_mode=auth_mode)
    # Use B1→B2 specific Box folder (349261192115)
    box = BoxClient(config, root_folder_id=config['box'].get('root_folder_b1_b2'))
    db = Database(config)
    notifier = Notifier(config)

    # Test connections
    logger.info("Testing connections...")
    if not dam.test_connection():
        logger.error("DAM connection failed - exiting")
        sys.exit(1)
    if not box.test_connection():
        logger.error("Box connection failed - exiting")
        sys.exit(1)
    if not db.test_connection():
        logger.error("Database connection failed - exiting")
        sys.exit(1)
    logger.info("All connections OK")
    logger.info("")

    # SINGLE RUN MODE - Process ONE campaign and exit
    # (original note: cron runs this script every 5 minutes, one campaign at a time)
    exit_code = 1
    try:
        logger.info("Searching for B1 Global campaigns...")

        # Search for campaigns with status B1 (Global comm campaigns)
        campaigns = dam.search_campaigns(status="B1", campaign_type="Global comm")

        if not campaigns:
            logger.info("No B1 campaigns found - exiting")
            exit_code = 0
        else:
            # Process ONLY THE FIRST campaign
            campaign = campaigns[0]
            logger.info("Found {} B1 campaigns - processing first one only".format(len(campaigns)))
            logger.info("")

            result = process_campaign(campaign, dam, box, db, notifier, config)

            if result['success']:
                logger.info("")
                logger.info("=" * 60)
                logger.info("✓ Campaign completed successfully")
                logger.info(" Processed: {} assets".format(result['processed']))
                logger.info(" Status updated: B1 → B2")
                logger.info("=" * 60)
                exit_code = 0
            else:
                logger.warning("")
                logger.warning("=" * 60)
                logger.warning("✗ Campaign incomplete or failed")
                logger.warning(" Processed: {} assets".format(result['processed']))
                logger.warning(" Failed: {} assets".format(result['failed']))
                logger.warning(" Status NOT updated (remains B1)")
                logger.warning("=" * 60)
                exit_code = 1

    except Exception as e:
        logger.critical("Script error: {}".format(str(e)))
        exit_code = 1

        # Send critical error notification; a failure here must not hide the
        # original error or prevent the database from being closed.
        try:
            notifier.send_email(
                template_name='upload_failed',
                recipients=config['notifications']['recipients']['critical'],
                data={
                    'filename': 'B1→B2 Script',
                    'tracking_id': 'N/A',
                    'error': str(e)
                }
            )
        except Exception as notify_error:
            logger.error("Failed to send critical error notification: {}".format(str(notify_error)))
    finally:
        db.close()

    sys.exit(exit_code)


if __name__ == '__main__':
    main()