ferrero-opentext/Python-Version/scripts/a2_to_a3_upload_polling.py
nickviljoen 26363f772d Enhancement: Campaign re-opening support and PPR master asset ID registration
A1→A2 now handles re-processing when campaign is reset to A1 after adding new
master assets. Existing assets reuse tracking IDs and skip Box upload, new assets
are processed normally. Also includes PPR domain registration for multiple master
asset IDs in a2_to_a3 and dam_client.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-05 21:06:14 +02:00

564 lines
23 KiB
Python
Executable file

#!/usr/bin/env python3
"""
A2→A3 Upload Handler - Box Folder Polling Version
Polls Box folder for new files with V2 naming, uploads to DAM
Updates status to A3 only when ALL assets for campaign uploaded
Supports --A3update flag to force status update for testing
Compatible with Python 3.6+
"""
import sys
import os
import time
import logging
import argparse
# Add shared library to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from shared.config_loader import load_config, load_field_mappings
from shared.dam_client import DAMClient
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
from shared.filename_parser import FilenameParser
from shared.metadata_extractor_mvp import MetadataExtractorMVP
# Configuration is read once at import time. The path is resolved relative to
# the current working directory, not to this script's location.
config = load_config('config/config.yaml')
field_mappings = load_field_mappings(config)

# --- Logging -----------------------------------------------------------------
from logging.handlers import RotatingFileHandler

# Log directories, relative to the working directory. Note: 'logs/backup' is
# created here but is not used by either handler configured below.
os.makedirs('logs', exist_ok=True)
os.makedirs('logs/backup', exist_ok=True)

_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Rotation is size-based, not time-based: one active 10 MB file plus up to 28
# rotated files, so at most ~290 MB on disk.
log_handler = RotatingFileHandler(
    'logs/a2_to_a3.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=28,
)
console_handler = logging.StreamHandler()

# Both sinks log INFO and above with the same line format; each handler gets
# its own Formatter instance.
for _handler in (log_handler, console_handler):
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(logging.Formatter(_LOG_FORMAT))

logging.basicConfig(level=logging.INFO, handlers=[log_handler, console_handler])
logger = logging.getLogger('A2toA3')
_TEMP_DOWNLOAD_DIR = 'temp/downloads'
_DEFAULT_CREATIVEX_SCORE = '0'
_DEFAULT_CREATIVEX_URL = 'https://app.creativex.com/preflight/pretests'


def _load_master_assets(db, tracking_id, tracking_ids, has_multiple_masters):
    """Look up the master asset(s) a derivative file links to.

    Args:
        db: Database client exposing ``get_master_asset(tracking_id)``.
        tracking_id: Primary tracking ID parsed from the filename.
        tracking_ids: All tracking IDs parsed from the filename (PPR only).
        has_multiple_masters: True when the filename references several masters.

    Returns:
        ``(primary_master, master_opentext_ids)``. In multi-master (PPR) mode
        IDs that do not resolve are skipped with a warning and the first one
        that does resolve becomes the primary master (used for metadata
        inheritance). In single-master mode ``primary_master`` is None and the
        ID list is empty when the lookup finds nothing.

    Raises:
        ValueError: multi-master mode and none of the tracking IDs resolve.
    """
    if has_multiple_masters:
        logger.info("PPR - Multiple master assets detected: {}".format(', '.join(tracking_ids)))
        found = []
        for tid in tracking_ids:
            master = db.get_master_asset(tid)
            if not master:
                logger.warning("Master asset not found for tracking ID: {} - skipping".format(tid))
                continue
            found.append((tid, master))
        if not found:
            raise ValueError("No master assets found for tracking IDs: {}".format(', '.join(tracking_ids)))
        # Report the ID that actually resolved, not tracking_ids[0], which may
        # have been skipped above.
        primary_tid, primary_master = found[0]
        logger.info("Using primary master {} for metadata, linking {} total masters".format(
            primary_tid, len(found)))
        return primary_master, [master['opentext_id'] for _, master in found]

    # Single master (backward compatible)
    master_asset = db.get_master_asset(tracking_id)
    if not master_asset:
        return None, []
    return master_asset, [master_asset['opentext_id']]


def _reject_master_tracking_id(file_info, filename, tracking_id, box, config, notifier):
    """Notify about a file named with a Master tracking ID and move it to ``_Errors``.

    Master IDs (starting with 'M') are not allowed in the version/derivative
    upload folder. The file is moved to an ``_Errors`` subfolder of its parent
    so that the next poll does not pick it up again. Both the notification and
    the move are best-effort: a failure is logged and does not stop the other.

    Returns:
        Failure result dict in the same shape ``process_box_file`` returns.
    """
    logger.warning("Detected Master Tracking ID in Version/Derivative upload folder: {}".format(tracking_id))

    # Notify first, but never let a notification problem prevent the move.
    try:
        if notifier is None:
            notifier = Notifier(config)
        notifier.send_email(
            template_name='upload_failed',  # Reusing error template
            recipients=config['notifications']['recipients']['errors'],
            data={
                'filename': filename,
                'tracking_id': tracking_id,
                'error': "INVALID TRACKING ID: Master Tracking IDs (starting with 'M') are not allowed in this folder. Please use a Version Tracking ID."
            }
        )
    except Exception as notify_error:
        logger.error("Failed to send invalid tracking ID notification: {}".format(str(notify_error)))

    # Move file to _Errors subfolder to prevent a re-processing loop.
    try:
        # 'parent' may be absent or None in entries from the recursive listing.
        parent_folder_id = (file_info.get('parent') or {}).get('id')
        if not parent_folder_id:
            parent_folder_id = box.client.file(file_info['id']).get().parent.id
        errors_folder = box._get_or_create_subfolder_path(box.client.folder(parent_folder_id), '_Errors')
        box.move_file(file_info['id'], errors_folder.id)
        logger.info("Moved invalid file to _Errors folder")
    except Exception as move_error:
        logger.error("Failed to move invalid file: {}".format(str(move_error)))

    return {
        'success': False,
        'error': "Master Tracking ID detected - moved to _Errors",
        'filename': filename,
        'tracking_id': tracking_id,
        'subfolder_path': file_info.get('subfolder_path')
    }


def _build_creativex_metadata(creativex_data):
    """Convert a CreativeX database row into the ``box_metadata`` dict.

    Args:
        creativex_data: Row from ``db.get_creativex_score_by_filename`` or a
            falsy value when no score exists.

    Returns:
        ``(box_metadata, creativex_found)``. If a row exists, ``box_metadata``
        has 'score', 'url' and 'platforms'. If not, it has the default score
        '0' and the CreativeX pretest placeholder URL, with no 'platforms' key.
    """
    if not creativex_data:
        return {'score': _DEFAULT_CREATIVEX_SCORE, 'url': _DEFAULT_CREATIVEX_URL}, False

    full_data = creativex_data.get('full_extraction_data', {})
    data_obj = full_data.get('data', {}) if isinstance(full_data, dict) else {}
    if not isinstance(data_obj, dict):  # tolerate 'data': None
        data_obj = {}
    platforms = data_obj.get('ferrero_mapped_platforms') or []
    # If only the legacy single-platform field exists, promote it to a list.
    if not platforms and data_obj.get('ferrero_mapped_platform'):
        platforms = [data_obj.get('ferrero_mapped_platform')]
    box_metadata = {
        'score': creativex_data['quality_score'],
        'url': creativex_data['creativex_url'],
        'platforms': platforms
    }
    return box_metadata, True


def _log_dryrun_report(dam, asset_rep, box_metadata, creativex_found, master_opentext_ids):
    """Log the full asset representation for ``--dryrun`` without uploading.

    If master IDs are present, the PPR domain registration is still called.
    According to the original code's own note, it only adds values to the
    lookup table and does not create assets.
    """
    import json
    logger.info("")
    logger.info("=" * 80)
    logger.info("DRYRUN MODE - Asset Representation (will NOT upload to DAM)")
    logger.info("=" * 80)
    logger.info("")
    logger.info("FULL ASSET REPRESENTATION (JSON):")
    logger.info("")
    logger.info(json.dumps(asset_rep, indent=2, ensure_ascii=False))
    logger.info("")
    logger.info("=" * 80)
    logger.info("Field Count: {} fields".format(len(asset_rep)))
    logger.info("=" * 80)
    logger.info("")
    logger.info("CreativeX Status:")
    logger.info(" Found in database: {}".format(creativex_found))
    logger.info(" Score: {}".format(box_metadata.get('score')))
    logger.info(" URL: {}".format(box_metadata.get('url')))
    logger.info("")
    if master_opentext_ids:
        logger.info("PPR Domain Registration Test:")
        registration_result = dam.register_master_asset_ids_for_ppr(master_opentext_ids)
        if registration_result.get('skipped'):
            logger.info(" Skipped (not PPR environment)")
        else:
            logger.info(" Registered: {}".format(registration_result.get('registered_ids', [])))
            if registration_result.get('failed_ids'):
                logger.info(" Failed: {}".format(registration_result.get('failed_ids', [])))
        logger.info("")
    logger.info("DRYRUN: No upload performed, file kept in Box")
    logger.info("=" * 80)


def _remove_temp_files(paths):
    """Best-effort removal of local temp files. Missing paths are ignored."""
    for path in set(paths):
        if path and os.path.exists(path):
            try:
                os.remove(path)
            except OSError as cleanup_error:
                logger.warning("Could not remove temp file {}: {}".format(path, str(cleanup_error)))


def process_box_file(file_info, dam, box, db, parser, mvp_extractor, config,
                     keep_files=False, dryrun=False, notifier=None):
    """Process a single file from the A2→A3 Box folder.

    Steps:
      1. Parse the V2 filename.
      2. Reject Master tracking IDs (moved to ``_Errors``).
      3. Load the master asset(s) from the DB.
      4. Look up the CreativeX score.
      5. Download from Box and build the MVP asset representation.
      6. Upload to DAM under the mirrored subfolder.
      7. Record the derivative.
      8. Delete the Box file.

    Local temp files are always removed on exit, including in dryrun mode and
    on failure.

    Args:
        file_info: Box listing entry with at least 'id' and 'name'. It may also
            carry 'subfolder_path' and 'parent'.
        dam, box, db, parser, mvp_extractor: Service clients used for the run.
        config: Loaded configuration dict. Notification recipients are read
            from ``config['notifications']['recipients']``.
        keep_files: If True, don't delete the file from Box after upload (for
            testing).
        dryrun: If True, build metadata but don't upload to DAM (logs the full
            JSON).
        notifier: Optional Notifier used for error emails. If omitted, one is
            created from ``config`` only when an email actually has to be sent.

    Returns:
        dict with 'success', 'filename', 'tracking_id' and either upload
        details ('asset_id', 'clean_filename', 'creativex_*', 'subfolder_path')
        or 'error'. Exceptions are caught and reported in this dict rather
        than raised.
    """
    file_id = file_info['id']
    filename = file_info['name']
    subfolder_path = file_info.get('subfolder_path')
    tracking_id = None  # set once parsed; reported in the error result
    temp_paths = []  # local files to clean up however we exit
    logger.info("Processing: {}".format(filename))
    try:
        # 1. Parse V2 filename
        parsed = parser.parse_filename(filename)
        if not parsed['is_valid']:
            raise ValueError("Invalid V2 filename: {} - {}".format(
                filename, ', '.join(parsed.get('validation_errors') or [])
            ))
        tracking_id = parsed['tracking_id']
        tracking_mode = parsed.get('tracking_mode', 'full')
        if not tracking_id:
            raise ValueError("No tracking ID in filename")
        logger.info("Tracking ID: {} (mode: {})".format(tracking_id, tracking_mode))
        if subfolder_path:
            logger.info("From Box subfolder: {} -> will create in DAM".format(subfolder_path))

        # 2. Master Tracking IDs (starting with M) are not allowed here. This
        #    check needs no DB data, so run it before any lookups.
        if tracking_id.upper().startswith('M'):
            return _reject_master_tracking_id(file_info, filename, tracking_id, box, config, notifier)

        # 3. Load master metadata (PPR: possibly several masters, PROD: one)
        master_asset, master_opentext_ids = _load_master_assets(
            db,
            tracking_id,
            parsed.get('tracking_ids') or [tracking_id],
            parsed.get('has_multiple_masters', False)
        )
        if not master_asset:
            raise ValueError("No master asset for tracking ID: {}".format(tracking_id))

        # 4. CreativeX score, looked up by the ORIGINAL Box filename (the PDF's
        #    filename field carries the full name incl. job + tracking ID).
        creativex_data = db.get_creativex_score_by_filename(filename)
        box_metadata, creativex_found = _build_creativex_metadata(creativex_data)
        if creativex_found:
            logger.info("CreativeX score found in database: Score={}, URL={}, Platforms={}".format(
                box_metadata['score'], box_metadata['url'], box_metadata['platforms']
            ))
        else:
            logger.warning("No CreativeX score found for: {} - Using default values (Score: 0, Placeholder URL)".format(
                filename
            ))

        # 5. Download from Box. Register the path first so that a partial
        #    download is also cleaned up.
        os.makedirs(_TEMP_DOWNLOAD_DIR, exist_ok=True)
        temp_file = os.path.join(_TEMP_DOWNLOAD_DIR, filename)
        temp_paths.append(temp_file)
        box.download_file(file_id, temp_file)

        # 6. Build MVP asset representation with CreativeX data from database
        clean_filename = parser.strip_upload_components(filename)
        asset_rep = mvp_extractor.build_mvp_asset_representation(
            master_metadata=master_asset['full_metadata'],
            clean_filename=clean_filename,
            parsed_filename=parsed,
            box_metadata=box_metadata,
            tracking_mode=tracking_mode,  # folder-only handling
            master_opentext_id=master_asset['opentext_id'],  # Primary master DAM ID
            master_opentext_ids=master_opentext_ids  # All master IDs (PPR: multiple, PROD: single)
        )

        if dryrun:
            _log_dryrun_report(dam, asset_rep, box_metadata, creativex_found, master_opentext_ids)
            return {
                'success': True,
                'asset_id': 'DRYRUN_NO_UPLOAD',
                'tracking_id': tracking_id,
                'filename': filename,
                'clean_filename': clean_filename,
                'creativex_found': creativex_found,
                'creativex_score': box_metadata.get('score', _DEFAULT_CREATIVEX_SCORE),
                'creativex_url': box_metadata.get('url', _DEFAULT_CREATIVEX_URL),
                'dryrun': True
            }

        # 7. Rename to the clean filename. Skip when the names are identical:
        #    "delete existing target" would otherwise delete the download itself.
        #    os.replace overwrites any stale target in one step.
        clean_temp_file = os.path.join(_TEMP_DOWNLOAD_DIR, clean_filename)
        temp_paths.append(clean_temp_file)
        if clean_temp_file != temp_file:
            os.replace(temp_file, clean_temp_file)

        # 8. Upload to DAM. The base is the master's "01. Final Assets" folder,
        #    and any Box subfolder path is mirrored beneath it.
        upload_folder_id = master_asset['upload_directory']
        if subfolder_path:
            logger.info("Creating DAM subfolder path: {}".format(subfolder_path))
            upload_folder_id = dam.get_or_create_subfolder_path(
                base_folder_id=upload_folder_id,
                subfolder_path=subfolder_path
            )
            logger.info("Will upload to: 01. Final Assets/{}".format(subfolder_path))

        # PPR ONLY: OpenText requires the master-ID domain values to exist
        # before an asset can reference them.
        if master_opentext_ids:
            dam.register_master_asset_ids_for_ppr(master_opentext_ids)

        upload_result = dam.upload_asset(
            file_path=clean_temp_file,
            folder_id=upload_folder_id,
            asset_representation=asset_rep
        )
        if not upload_result.get('success'):
            raise Exception("Upload failed: {}".format(upload_result.get('error')))

        # 9. Store derivative record
        db.store_derivative_asset(
            tracking_id=tracking_id,
            master_asset_id=None,
            dam_asset_id=upload_result['asset_id'],
            filename=clean_filename
        )

        # 10. Delete the file from Box after a successful upload (best-effort)
        if keep_files:
            logger.info("--keep-files flag set - File kept in Box: {}".format(filename))
        else:
            try:
                box.client.file(file_id).delete()
                logger.info("Deleted file from Box: {}".format(filename))
            except Exception as e:
                logger.warning("Could not delete file from Box: {}".format(str(e)))

        logger.info("✓ Success: {} → Asset ID: {}".format(filename, upload_result['asset_id']))
        return {
            'success': True,
            'asset_id': upload_result['asset_id'],
            'tracking_id': tracking_id,
            'filename': filename,
            'clean_filename': clean_filename,
            'creativex_found': creativex_found,
            'creativex_score': box_metadata.get('score', _DEFAULT_CREATIVEX_SCORE),
            'creativex_url': box_metadata.get('url', _DEFAULT_CREATIVEX_URL),
            'subfolder_path': subfolder_path
        }
    except Exception as e:
        logger.error("✗ Failed: {} - {}".format(filename, str(e)))
        return {
            'success': False,
            'error': str(e),
            'filename': filename,
            'tracking_id': tracking_id,
            'subfolder_path': subfolder_path
        }
    finally:
        # Always clean up local downloads (dryrun, success and failure alike).
        _remove_temp_files(temp_paths)
def _parse_cli_args():
    """Parse the command-line flags for a single A2→A3 run."""
    parser_args = argparse.ArgumentParser(description='Ferrero A2→A3 Upload Handler')
    parser_args.add_argument('--auth-pfx', action='store_true',
                             help='Use mTLS certificate authentication (Legacy APIM)')
    parser_args.add_argument('--auth-pfx-v2', action='store_true',
                             help='Use mTLS V2 (Hybrid) authentication')
    parser_args.add_argument('--A3update', action='store_true',
                             help='Force update campaign status A2→A3 after upload (for testing; ignored with --dryrun)')
    parser_args.add_argument('--keep-files', action='store_true',
                             help='Keep files in Box after upload (don\'t delete, for testing)')
    parser_args.add_argument('--dryrun', action='store_true',
                             help='Build metadata but don\'t upload to DAM (shows full JSON for debugging)')
    return parser_args.parse_args()


def _find_campaign_id(master_asset):
    """Return the 'L7+ - CAMPAIGN' container ID from a master's inherited collections, or None."""
    full_metadata = master_asset.get('full_metadata') or {}
    for collection in full_metadata.get('inherited_metadata_collections') or []:
        if collection.get('container_type_name') == 'L7+ - CAMPAIGN':
            return collection.get('container_id')
    return None


def _update_campaign_status_to_a3(dam, master_asset):
    """Set the campaign that owns ``master_asset`` to status A3 in DAM. Results are logged."""
    campaign_id = _find_campaign_id(master_asset)
    if not campaign_id:
        logger.warning("⚠ Campaign ID not found in master asset metadata - cannot update status")
        return
    logger.info("Found campaign ID: {}".format(campaign_id))
    logger.info("Updating campaign status A2 → A3...")
    status_result = dam.update_campaign_status(campaign_id, 'A3')
    if status_result.get('success'):
        logger.info("✓ Campaign status updated successfully: A2 → A3")
    else:
        logger.error("✗ Campaign status update failed: {}".format(status_result.get('error')))


def _run_polling_pass(args, dam, box, db, notifier, parser, mvp_extractor):
    """Scan the A2→A3 Box folder once, process every valid V2 file and report.

    Returns:
        Exit code: 0 when there was nothing to process or at least one file
        succeeded, 1 when every valid file failed.
    """
    box_folder_id = config['box'].get('root_folder_a2_a3', config['box'].get('root_folder_id'))
    logger.info("Polling Box folder: {}".format(box_folder_id))

    # Recursive listing: skips 1st-level job folders, preserves 2nd+ levels.
    files = box.list_folder_files_recursive(box_folder_id)
    logger.info("Recursive scan complete")
    if not files:
        logger.info("No files found in Box folder - exiting")
        return 0
    logger.info("Found {} files in Box folder".format(len(files)))

    # Show subfolder distribution
    subfolders = sorted({f.get('subfolder_path') for f in files if f.get('subfolder_path')})
    if subfolders:
        logger.info("Files in {} subfolder(s): {}".format(len(subfolders), ', '.join(subfolders)))
    files_at_root = sum(1 for f in files if not f.get('subfolder_path'))
    if files_at_root:
        logger.info("Files at job level (will go to DAM root): {}".format(files_at_root))

    # Keep only valid V2 filenames that carry a tracking ID.
    valid_files = []
    for file_info in files:
        parsed = parser.parse_filename(file_info['name'])
        if parsed['is_valid'] and parsed.get('tracking_id'):
            valid_files.append(file_info)
        else:
            logger.info("Skipping invalid V2 file: {} - Errors: {}".format(
                file_info['name'], parsed.get('validation_errors', [])
            ))
    logger.info("Found {} valid V2 files to process".format(len(valid_files)))
    if not valid_files:
        logger.info("No valid V2 files to process - exiting")
        return 0

    total = len(valid_files)
    logger.info("Processing all {} file(s)".format(total))
    logger.info("")
    successful_files = []
    failed_files = []
    for idx, file_info in enumerate(valid_files, 1):
        logger.info("=" * 60)
        logger.info("Processing file {}/{}".format(idx, total))
        logger.info("=" * 60)
        result = process_box_file(file_info, dam, box, db, parser, mvp_extractor, config,
                                  keep_files=args.keep_files, dryrun=args.dryrun)
        if result['success']:
            successful_files.append(result)
            logger.info("✓ File {} processed successfully".format(idx))
        else:
            failed_files.append(result)
            logger.error("✗ File {} failed".format(idx))
        logger.info("")

    # Summary
    logger.info("")
    logger.info("=" * 60)
    logger.info("A2→A3 Processing Summary")
    logger.info("=" * 60)
    logger.info(" Total files processed: {}".format(total))
    logger.info(" Successful: {}".format(len(successful_files)))
    logger.info(" Failed: {}".format(len(failed_files)))
    logger.info("=" * 60)

    if successful_files:
        # The summary email is best-effort: a mail failure must not hide the
        # completed uploads or skip the status update.
        try:
            notifier.send_email(
                template_name='a2_to_a3_batch_complete',
                recipients=config['notifications']['recipients']['success'],
                data={
                    'total_files': total,
                    'successful_count': len(successful_files),
                    'failed_count': len(failed_files),
                    'successful_files': successful_files,
                    'failed_files': failed_files,
                    'box_folder': box_folder_id
                }
            )
        except Exception as notify_error:
            logger.error("Failed to send A2→A3 batch summary email: {}".format(str(notify_error)))

        # Optional forced A2→A3 update, using the campaign of the first
        # successful file's master asset.
        if args.A3update:
            master_asset = db.get_master_asset(successful_files[0]['tracking_id'])
            if not master_asset:
                logger.warning("--A3update flag set but master asset not found - cannot update status")
            elif args.dryrun:
                # A status change is a DAM write; dryrun promises no DAM changes.
                logger.info("--A3update ignored in --dryrun mode - campaign status not changed")
            else:
                logger.info("")
                logger.info("--A3update flag set - Attempting to update campaign status")
                _update_campaign_status_to_a3(dam, master_asset)

    # Success if at least one file succeeded
    return 0 if successful_files else 1


def main():
    """Single-run entry point: parse flags, connect, run one polling pass and exit.

    Exits with status 1 on a connection failure or an unexpected error, and
    otherwise with the code returned by the polling pass. The database
    connection is closed on every path after the connection checks.
    """
    args = _parse_cli_args()
    logger.info("=" * 60)
    logger.info("Ferrero A2→A3 Upload Handler Starting (Polling Mode)")

    # Determine auth mode (V2 takes precedence over legacy mTLS)
    if args.auth_pfx_v2:
        auth_mode = 'mtls_v2'
        logger.info("Authentication: mTLS V2 (Hybrid)")
    elif args.auth_pfx:
        auth_mode = 'mtls'
        logger.info("Authentication: mTLS Certificate (Legacy)")
    else:
        auth_mode = 'oauth'
        logger.info("Authentication: OAuth2 (default)")

    if args.A3update:
        logger.info("Mode: Auto-update campaign status A2→A3 (--A3update)")
    if args.keep_files:
        logger.info("Mode: Keep files in Box after upload (--keep-files)")
    if args.dryrun:
        logger.info("Mode: DRYRUN - Build metadata but DON'T upload (--dryrun)")
    logger.info("=" * 60)

    # Initialize clients (Box client is rooted at the A2→A3 polling folder)
    dam = DAMClient(config, auth_mode=auth_mode)
    box = BoxClient(config, root_folder_id=config['box'].get('root_folder_a2_a3'))
    db = Database(config)
    notifier = Notifier(config)
    parser = FilenameParser(dam_base_url=dam.base_url)  # DAM URL drives environment detection
    mvp_extractor = MetadataExtractorMVP(field_mappings)

    logger.info("Testing connections...")
    for label, client in (("DAM", dam), ("Box", box), ("Database", db)):
        if not client.test_connection():
            logger.error("{} connection failed".format(label))
            sys.exit(1)
    logger.info("All connections OK")
    logger.info("")

    exit_code = 1
    try:
        exit_code = _run_polling_pass(args, dam, box, db, notifier, parser, mvp_extractor)
    except Exception as e:
        logger.critical("Script error: {}".format(str(e)), exc_info=True)
        exit_code = 1
    finally:
        db.close()
    sys.exit(exit_code)
# Script entry point: run a single polling pass when executed directly.
if __name__ == "__main__":
    main()