A1→A2 now handles re-processing when campaign is reset to A1 after adding new master assets. Existing assets reuse tracking IDs and skip Box upload, new assets are processed normally. Also includes PPR domain registration for multiple master asset IDs in a2_to_a3 and dam_client. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
616 lines
24 KiB
Python
616 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
A1→A2 Box Uploader
|
|
Polls DAM for campaigns with status A1, downloads master assets, uploads to Box
|
|
Updates status to A2 only when ALL assets successfully processed
|
|
Generates CSV of all live campaigns and uploads to Box instead of webhook
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import csv
|
|
from datetime import datetime, timezone
|
|
|
|
# Add shared library to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from shared.config_loader import load_config, load_field_mappings
|
|
from shared.dam_client import DAMClient
|
|
from shared.box_client import BoxClient
|
|
from shared.database import Database
|
|
from shared.notifier import Notifier
|
|
|
|
# Setup logging with rotation
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
# Log directories must exist before the rotating handler opens its file
for _log_dir in ('logs', 'logs/backup'):
    os.makedirs(_log_dir, exist_ok=True)

# One format shared by the file and console outputs
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Rotating file output: 10MB per file, 28 rotated backups kept
log_handler = RotatingFileHandler(
    filename='logs/a1_to_a2_box.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=28,
)

# Console output (stderr by default)
console_handler = logging.StreamHandler()

for _handler in (log_handler, console_handler):
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(logging.Formatter(_LOG_FORMAT))

# Root logger sends INFO and above to both handlers
logging.basicConfig(level=logging.INFO, handlers=[log_handler, console_handler])

logger = logging.getLogger('A1toA2Box')
|
|
|
|
def _nested_value(container, *keys):
    """Follow *keys* through nested dicts; return None as soon as a level is not a dict."""
    current = container
    for key in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


def extract_creativex_from_dam_metadata(asset_metadata):
    """
    Extract CreativeX score and URL from DAM asset metadata if present.

    The fields are read from
    asset_metadata['metadata']['metadata_element_list']:
      - 'FERRERO.TAB.FIELD.CREATIVEX' (tabular): values[0].value.field_value.value
      - 'FERRERO.FIELD.CREATIVEX LINK': value.value.value

    Args:
        asset_metadata: asset dict as delivered by the DAM.

    Returns:
        dict with keys 'score' (str or None), 'url' (or None) and 'id'
        (always None - no metadata element populates it), or None when
        neither a score nor a URL was found or the metadata is unreadable.
    """
    try:
        # 'or' fallbacks also cover keys that are present but set to None
        metadata = asset_metadata.get('metadata') or {}
        metadata_elements = metadata.get('metadata_element_list') or []

        creativex_data = {
            'score': None,
            'url': None,
            'id': None,
        }

        for element in metadata_elements:
            # A malformed entry must not abort extraction of the other fields
            if not isinstance(element, dict):
                continue

            element_id = element.get('id')

            # Extract CreativeX Score (tabular field)
            if element_id == 'FERRERO.TAB.FIELD.CREATIVEX':
                values = element.get('values')
                if isinstance(values, (list, tuple)) and values:
                    score = _nested_value(values[0], 'value', 'field_value', 'value')
                    # Explicit check: a numeric score of 0 is a real score,
                    # only None / empty string mean "no score"
                    if score is not None and score != '':
                        creativex_data['score'] = str(score)
                        logger.info("Found CreativeX Score in master metadata: {}".format(score))

            # Extract CreativeX URL
            elif element_id == 'FERRERO.FIELD.CREATIVEX LINK':
                url = _nested_value(element, 'value', 'value', 'value')
                if url:
                    creativex_data['url'] = url
                    logger.info("Found CreativeX URL in master metadata: {}".format(url))

        if creativex_data['score'] is None and creativex_data['url'] is None:
            return None
        return creativex_data

    except Exception as e:
        # Best-effort: CreativeX data is optional, never fail the asset over it
        logger.warning("Failed to extract CreativeX from metadata: {}".format(str(e)))
        return None
|
|
|
|
def generate_and_upload_csv(db, box, config):
    """
    Generate CSV of all live campaigns and upload to Box.

    The CSV has the columns 'code' ("<campaign_number>-<campaign_name>") and
    'description' (campaign name). It is written to a timestamped file under
    'temp/', uploaded to the Box folder configured as
    config['box']['live_campaigns_folder_id'], and the local file is always
    removed afterwards - also when the upload fails.

    Args:
        db: database client providing get_all_live_campaigns().
        box: Box client providing upload_file(file_path, folder_id, target_filename).
        config: loaded configuration dict.

    Returns:
        True when the CSV was uploaded, False otherwise (no live campaigns,
        folder not configured, or any error - errors are logged, not raised).
    """
    csv_path = None
    try:
        logger.info("Generating live campaigns CSV...")

        # 1. Get all live campaigns from DB
        campaigns = db.get_all_live_campaigns()

        if not campaigns:
            logger.warning("No live campaigns found to report")
            return False

        logger.info("Found {} live campaigns".format(len(campaigns)))

        # 2. Validate the upload target before anything is written to disk,
        #    so a missing setting cannot leave an orphaned temp file behind
        folder_id = config['box'].get('live_campaigns_folder_id')
        if not folder_id:
            logger.error("Box live_campaigns_folder_id not configured")
            return False

        # 3. Generate CSV file
        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d_%H%M%S_UTC')
        csv_filename = 'live_campaigns_{}.csv'.format(timestamp)

        os.makedirs('temp', exist_ok=True)
        csv_path = os.path.join('temp', csv_filename)

        # Explicit UTF-8 so non-ASCII campaign names do not depend on the
        # locale of the environment (e.g. cron) the script runs in
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=['code', 'description'])
            writer.writeheader()
            for camp in campaigns:
                writer.writerow({
                    'code': "{}-{}".format(camp['campaign_number'], camp['campaign_name']),
                    'description': camp['campaign_name'],
                })

        logger.info("Generated CSV: {}".format(csv_path))

        # 4. Upload to Box
        upload_result = box.upload_file(
            file_path=csv_path,
            folder_id=folder_id,
            target_filename=csv_filename,
        )

        logger.info("Uploaded CSV to Box: {} (File ID: {})".format(
            csv_filename, upload_result['file_id']
        ))
        return True

    except Exception as e:
        logger.error("Failed to generate/upload CSV: {}".format(str(e)))
        return False

    finally:
        # Clean up on every path; a cleanup problem must not turn a
        # successful upload into a reported failure
        if csv_path and os.path.exists(csv_path):
            try:
                os.remove(csv_path)
            except OSError as cleanup_error:
                logger.warning("Failed to remove temp CSV: {}".format(str(cleanup_error)))
|
|
|
|
def _remove_temp_file(path, description):
    """Best-effort delete of a temp file; returns True only when a file was removed."""
    if not path or not os.path.exists(path):
        return False
    try:
        os.remove(path)
        return True
    except OSError as cleanup_error:
        logger.warning("Failed to remove temp {}: {}".format(description, str(cleanup_error)))
        return False


def _store_master_record(db, tracking_id, asset, box_result, final_folder_id, global_ref):
    """Insert or refresh the database row of one master asset; returns the db result dict."""
    return db.store_master_asset(
        tracking_id=tracking_id,
        opentext_id=asset['asset_id'],
        asset_data=asset,
        box_file_id=box_result['file_id'],
        box_url=box_result['url'],
        upload_folder_id=final_folder_id,
        global_master_campaign_id=global_ref['global_master_campaign_id'],
        global_master_folder_id=global_ref['global_master_folder_id'],
        local_campaign_id=global_ref['local_campaign_id'],
    )


def _process_master_asset(asset, dam, box, db, campaign_id, campaign_number,
                          campaign_name, final_folder_id):
    """
    Process one master asset and return its result record.

    Assets that already have a tracking ID and Box file (campaign reset to A1
    after new masters were added) only get their database row refreshed; all
    others are downloaded from the DAM, uploaded to Box and stored.

    Raises:
        Exception: on any failure; the caller records the asset as failed.
    """
    asset_id = asset['asset_id']
    asset_name = asset.get('name', 'unknown')
    folder_path = asset.get('folder_path', '')

    if folder_path:
        logger.info("Processing: {} (from subfolder: {})".format(asset_name, folder_path))
    else:
        logger.info("Processing: {}".format(asset_name))

    # 1. Extract Global Campaign Reference (needed for tracking ID lookup)
    global_ref = db.extract_global_campaign_reference(asset, campaign_number)

    # 2. Find existing tracking ID or generate new one
    tracking_result = db.find_or_create_tracking_id(
        opentext_id=asset_id,
        local_campaign_id=global_ref['local_campaign_id'],
    )
    tracking_id = tracking_result['tracking_id']

    if tracking_result['is_existing']:
        # Asset already processed in a previous A1→A2 cycle
        existing_master = db.get_master_asset(tracking_id)
        if existing_master and existing_master.get('box_file_id'):
            logger.info("Re-processing: reusing tracking ID {} for existing asset {} (skipping download/upload)".format(
                tracking_id, asset_name))
            box_result = {
                'file_id': existing_master['box_file_id'],
                'url': existing_master['box_url'],
            }

            # Update database metadata (asset_data may have changed in DAM)
            db_result = _store_master_record(
                db, tracking_id, asset, box_result, final_folder_id, global_ref)
            if not db_result['success']:
                raise Exception("Database update failed for existing asset")

            logger.info("✓ Existing asset confirmed: {} → {} (skipped)".format(asset_name, tracking_id))
            return {
                'asset_id': asset_id,
                'asset_name': asset_name,
                'tracking_id': tracking_id,
                'box_file_id': box_result['file_id'],
                'box_url': box_result['url'],
                'is_existing': True,
            }

        # Tracking ID exists but no usable Box info - process normally
        logger.info("Existing tracking ID {} found but no Box info - downloading/uploading".format(tracking_id))

    file_path = None
    try:
        # 3. Download from DAM (new assets or existing without Box info)
        file_path = dam.download_asset(
            asset_id,
            output_dir='temp/downloads/{}'.format(campaign_id),
        )

        # 4. Upload to Box (preserve folder structure from DAM)
        box_result = box.upload_with_tracking_id(
            file_path=file_path,
            campaign_id=campaign_number,
            campaign_name=campaign_name,
            tracking_id=tracking_id,
            subfolder_path=folder_path,
        )

        # 5. Store in database
        db_result = _store_master_record(
            db, tracking_id, asset, box_result, final_folder_id, global_ref)
        if not db_result['success']:
            raise Exception("Database storage failed")

        # 6. Extract and store CreativeX score
        creativex_data = extract_creativex_from_dam_metadata(asset)
        if creativex_data:
            cx_result = db.store_creativex_score(
                filename=asset_name,
                creativex_id=creativex_data.get('id', ''),
                creativex_url=creativex_data.get('url', ''),
                quality_score=creativex_data.get('score', ''),
                box_file_id=box_result['file_id'],
                full_extraction_data={'master_metadata': True, 'data': creativex_data},
                tracking_id=tracking_id,
                status='master-cx-score',
            )
            if cx_result['success']:
                logger.info("Stored master CreativeX score: {} (Tracking: {})".format(
                    creativex_data.get('score'), tracking_id
                ))

        logger.info("✓ New asset: {} → {}".format(asset_name, tracking_id))
        return {
            'asset_id': asset_id,
            'asset_name': asset_name,
            'tracking_id': tracking_id,
            'box_file_id': box_result['file_id'],
            'box_url': box_result['url'],
            'is_existing': False,
        }
    finally:
        # Remove the download on success AND failure; a cleanup problem is
        # only logged so it can never flip a stored asset into "failed"
        _remove_temp_file(file_path, 'download')


def _write_asset_report_csv(processed_assets, campaign_number):
    """Write the per-campaign asset report to 'temp/'; returns its path, or None on failure."""
    # The default campaign number 'N/A' contains a path separator - keep it
    # out of the file name, otherwise the report can never be created
    safe_number = str(campaign_number).replace('/', '_').replace('\\', '_')
    csv_path = os.path.join("temp", "A1_Campaign_{}_Assets.csv".format(safe_number))
    try:
        os.makedirs("temp", exist_ok=True)
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = ['Filename', 'Tracking ID', 'Campaign Number', 'Status']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for record in processed_assets:
                writer.writerow({
                    'Filename': record['asset_name'],
                    'Tracking ID': record['tracking_id'],
                    'Campaign Number': campaign_number,
                    'Status': 'Existing' if record.get('is_existing') else 'New',
                })
        logger.info("Generated CSV report: {}".format(csv_path))
        return csv_path
    except Exception as csv_error:
        logger.error("Failed to generate CSV report: {}".format(str(csv_error)))
        _remove_temp_file(csv_path, 'CSV')  # drop a partially written report
        return None


def process_campaign(campaign, dam, box, db, notifier, config):
    """
    Process single campaign - download all master assets and move it to A2.

    The DAM status is changed A1 → A2 only when every master asset was
    processed. Handles re-processing: if the campaign was reset to A1 after
    new masters were added, existing assets keep their tracking IDs and skip
    download/upload, new assets are processed normally.

    Args:
        campaign: campaign dict with 'asset_id', 'campaign_name' and
            optionally 'campaign_id' (the campaign number).
        dam, box, db, notifier: DAM, Box, database and notification clients.
        config: loaded configuration dict (notification recipients, Box settings).

    Returns:
        dict {'success': bool, 'processed': int, 'failed': int}. Errors are
        reported by email and logged; they are not raised to the caller
        (except missing 'asset_id'/'campaign_name' keys in *campaign*).
    """
    campaign_id = campaign['asset_id']
    campaign_name = campaign['campaign_name']
    campaign_number = campaign.get('campaign_id', 'N/A')

    logger.info("=" * 60)
    logger.info("Processing campaign: {} ({})".format(campaign_name, campaign_number))
    logger.info("=" * 60)

    total_assets = 0
    try:
        # Get master assets
        master_assets = dam.get_master_assets(campaign_id)
        total_assets = len(master_assets)

        logger.info("Found {} master assets".format(total_assets))

        if total_assets == 0:
            logger.warning("No master assets found in Master Assets folder")
            # Send email notification about empty campaign (keep error notifications)
            notifier.send_email(
                template_name='a1_to_a2_no_assets',
                recipients=config['notifications']['recipients']['errors'],
                data={
                    'campaign_name': campaign_name,
                    'campaign_id': campaign_id,
                    'campaign_number': campaign_number,
                },
            )
            return {'success': False, 'processed': 0, 'failed': 0}

        # Get Final Assets folder for upload directory
        final_folder_id = dam.find_final_assets_folder(campaign_id)
        if not final_folder_id:
            logger.error("Final Assets folder not found")
            return {'success': False, 'processed': 0, 'failed': total_assets}

        # Process each asset; one failing asset never stops the others
        processed_assets = []
        failed_assets = []
        for asset in master_assets:
            asset_id = asset['asset_id']
            asset_name = asset.get('name', 'unknown')
            try:
                processed_assets.append(_process_master_asset(
                    asset, dam, box, db, campaign_id, campaign_number,
                    campaign_name, final_folder_id))
            except Exception as e:
                logger.error("✗ Failed: {} - {}".format(asset_name, str(e)))
                failed_assets.append({
                    'asset_id': asset_id,
                    'asset_name': asset_name,
                    'error': str(e),
                })

        # CHECK: All assets processed successfully?
        all_done = len(processed_assets) == total_assets

        # Count new vs existing assets
        new_assets = [a for a in processed_assets if not a.get('is_existing')]
        existing_assets = [a for a in processed_assets if a.get('is_existing')]

        logger.info("")
        logger.info("Campaign {} Results:".format(campaign_id))
        logger.info(" Total: {}".format(total_assets))
        logger.info(" Successful: {}".format(len(processed_assets)))
        if existing_assets:
            logger.info(" - New assets: {}".format(len(new_assets)))
            logger.info(" - Existing assets (skipped download/upload): {}".format(len(existing_assets)))
        logger.info(" Failed: {}".format(len(failed_assets)))
        logger.info(" All Done: {}".format("YES" if all_done else "NO"))
        logger.info("")

        if not all_done:
            # NOT all done - some failed
            logger.warning("Campaign incomplete - NOT updating status (remains A1)")

            # Send partial completion email
            notifier.send_email(
                template_name='a1_to_a2_partial',
                recipients=config['notifications']['recipients']['errors'],
                data={
                    'campaign_name': campaign_name,
                    'campaign_id': campaign_id,
                    'campaign_number': campaign_number,
                    'total_assets': total_assets,
                    'successful': len(processed_assets),
                    'failed': len(failed_assets),
                    'processed_assets': processed_assets,
                    'failed_assets': failed_assets,
                },
            )
            return {'success': False, 'processed': len(processed_assets), 'failed': len(failed_assets)}

        # ALL assets processed - update status
        logger.info("All assets processed - Updating status A1 → A2")
        status_result = dam.update_campaign_status(campaign_id, 'A2')

        if not status_result['success']:
            logger.error("✗ Status update failed: {}".format(status_result.get('error')))
            return {'success': False, 'processed': len(processed_assets), 'failed': 0}

        logger.info("✓ Status updated successfully")

        # Record campaign status in database
        logger.info("Recording campaign status in database...")
        db.record_campaign_status(
            campaign_id=campaign_id,
            campaign_number=campaign_number,
            campaign_name=campaign_name,
            live_campaign='YES',  # A1→A2 campaigns are going live
            status='A2',
            webhook_sent=False,  # No webhook sent in this version
        )

        # REPLACEMENT: Generate and upload CSV instead of webhook
        logger.info("Generating and uploading live campaigns CSV...")
        if generate_and_upload_csv(db, box, config):
            logger.info("✓ CSV report uploaded successfully")
        else:
            logger.error("✗ CSV report generation/upload failed")

        # Generate CSV Report (the email is still sent when this fails)
        report_path = _write_asset_report_csv(processed_assets, campaign_number)
        attachments = [report_path] if report_path else None

        try:
            # Send success email with asset details AND CSV attachment
            notifier.send_email(
                template_name='a1_to_a2_complete',
                recipients=config['notifications']['recipients']['success'],
                data={
                    'campaign_name': campaign_name,
                    'campaign_id': campaign_id,
                    'campaign_number': campaign_number,
                    'asset_count': len(processed_assets),
                    'new_asset_count': len(new_assets),
                    'existing_asset_count': len(existing_assets),
                    'processed_assets': processed_assets,
                },
                attachments=attachments,
            )
        finally:
            # Clean up CSV even when sending the email raised
            if _remove_temp_file(report_path, 'CSV'):
                logger.info("Cleaned up CSV report")

        return {'success': True, 'processed': len(processed_assets), 'failed': 0}

    except Exception as e:
        logger.error("Campaign processing failed: {}".format(str(e)))

        # Send error notification for this specific campaign failure
        try:
            notifier.send_email(
                template_name='upload_failed',
                recipients=config['notifications']['recipients']['errors'],
                data={
                    'filename': "Campaign: {}".format(campaign_name),
                    'tracking_id': campaign_number,
                    'error': str(e),
                },
            )
        except Exception as email_error:
            logger.error("Failed to send error email: {}".format(str(email_error)))

        return {'success': False, 'processed': 0, 'failed': total_assets}
|
|
|
|
def main():
    """
    Run one A1→A2 pass (cron mode) and exit.

    Parses the authentication flags, loads 'config/config.yaml', checks the
    DAM, Box and database connections, then processes up to two campaigns
    with status A1.

    Exit codes (via sys.exit):
        0 - no A1 campaigns were found, or at least one campaign succeeded
        1 - a connection test failed, no campaign succeeded, or an
            unexpected error occurred (a critical email is attempted)
    """
    parser = argparse.ArgumentParser(description='Ferrero A1→A2 Box Uploader')
    parser.add_argument('--auth-pfx', action='store_true',
                        help='Use mTLS certificate authentication (Legacy APIM)')
    parser.add_argument('--auth-pfx-v2', action='store_true',
                        help='Use mTLS V2 (Hybrid) authentication')
    args = parser.parse_args()

    logger.info("=" * 60)
    logger.info("Ferrero A1→A2 Box Uploader Starting")

    # Determine auth mode (V2 wins when both flags are given)
    if args.auth_pfx_v2:
        auth_mode = 'mtls_v2'
        logger.info("Authentication: mTLS V2 (Hybrid)")
    elif args.auth_pfx:
        auth_mode = 'mtls'
        logger.info("Authentication: mTLS Certificate (Legacy)")
    else:
        auth_mode = 'oauth'
        logger.info("Authentication: OAuth2 (default)")

    logger.info("=" * 60)

    # Load configuration
    config = load_config('config/config.yaml')

    # Initialize clients
    dam = DAMClient(config, auth_mode=auth_mode)
    box = BoxClient(config)
    db = Database(config)
    notifier = Notifier(config)

    # Test connections - checked in this order, first failure exits
    logger.info("Testing connections...")
    for label, client in (("DAM", dam), ("Box", box), ("Database", db)):
        if not client.test_connection():
            logger.error("{} connection failed - exiting".format(label))
            sys.exit(1)

    logger.info("All connections OK")
    logger.info("")

    # Process UP TO 2 campaigns per run (Cron mode)
    max_campaigns_per_run = 2
    exit_code = 1
    try:
        logger.info("Searching for A1 campaigns...")
        campaigns = dam.search_campaigns(status='A1')

        if not campaigns:
            logger.info("No A1 campaigns found - exiting")
            exit_code = 0
        else:
            campaigns_to_process = campaigns[:max_campaigns_per_run]
            logger.info("Found {} A1 campaigns - processing {} campaign(s)".format(
                len(campaigns), len(campaigns_to_process)
            ))
            logger.info("")

            # Track results
            successful_campaigns = 0
            failed_campaigns = 0

            for idx, campaign in enumerate(campaigns_to_process, 1):
                logger.info("=" * 60)
                logger.info("Processing campaign {}/{}".format(idx, len(campaigns_to_process)))
                logger.info("=" * 60)

                result = process_campaign(campaign, dam, box, db, notifier, config)

                if result['success']:
                    successful_campaigns += 1
                    logger.info("✓ Campaign {} completed successfully".format(idx))
                else:
                    failed_campaigns += 1
                    logger.warning("✗ Campaign {} incomplete or failed".format(idx))

                logger.info("")

            # Summary
            logger.info("")
            logger.info("=" * 60)
            logger.info("A1→A2 Processing Summary")
            logger.info("=" * 60)
            logger.info(" Total campaigns processed: {}".format(len(campaigns_to_process)))
            logger.info(" Successful: {}".format(successful_campaigns))
            logger.info(" Failed: {}".format(failed_campaigns))
            logger.info("=" * 60)

            # Exit with success if at least one campaign succeeded
            exit_code = 0 if successful_campaigns > 0 else 1

    except Exception as e:
        # exc_info keeps the traceback in the log for diagnosis
        logger.critical("Script error: {}".format(str(e)), exc_info=True)
        exit_code = 1
        try:
            notifier.send_email(
                template_name='upload_failed',
                recipients=config['notifications']['recipients']['critical'],
                data={
                    'filename': 'A1→A2 Box Uploader',
                    'tracking_id': 'N/A',
                    'error': str(e),
                },
            )
        except Exception as email_error:
            # A broken notifier must not mask the original error
            logger.error("Failed to send error email: {}".format(str(email_error)))

    finally:
        # Single close point: runs on every path out of the try block
        db.close()

    sys.exit(exit_code)
|
|
|
|
# Script entry point: run the uploader only when executed directly, not on import
if __name__ == "__main__":
    main()
|