ferrero-opentext/Python-Version/scripts/a1_to_a2_box_uploader.py
nickviljoen 90f326aecb Enhancement: Treat empty A1 folders as expected workflow
Campaign managers often create the campaign in DAM before assets are
uploaded, so an empty Master Assets folder is the normal pre-asset state
rather than a failure. Stop marking these as permanently failed and stop
emailing on every poll.

- increment_a1_retry() gains mark_failed_at_max param; empty-folder path
  passes False so the campaign keeps polling indefinitely until assets
  appear (or the DAM status changes).
- Empty-folder branch now skips silently on every poll and sends a single
  warning email at poll 20 (~1 hour at the 3-min cadence) so genuinely
  stuck campaigns still surface.
- New a1_to_a2_no_assets_warning email template — one-time soft warning,
  no permanent-failure language.
- Existing reset_a1_retry() on successful A1→A2 still clears the counter
  when assets eventually appear.
- Other folder-error paths (folder not found, etc.) keep the original
  3-retry-then-fail behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 15:20:41 +02:00

729 lines
30 KiB
Python

#!/usr/bin/env python3
"""
A1→A2 Box Uploader
Polls DAM for campaigns with status A1, downloads master assets, uploads to Box
Updates status to A2 only when ALL assets successfully processed
Generates CSV of all live campaigns and uploads to Box instead of webhook
"""
import sys
import os
import time
import logging
import argparse
import csv
from datetime import datetime, timezone
# Add shared library to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from shared.config_loader import load_config, load_field_mappings
from shared.dam_client import DAMClient
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
# --- Logging ---------------------------------------------------------------
# Rotating file log (logs/a1_to_a2_box.log) plus a console mirror, both at INFO.
from logging.handlers import RotatingFileHandler

# A RotatingFileHandler opens its file when constructed, so the directory has to
# exist beforehand. NOTE(review): nothing in this script writes to logs/backup -
# presumably it is used by external log housekeeping; confirm before removing.
os.makedirs('logs', exist_ok=True)
os.makedirs('logs/backup', exist_ok=True)

console_handler = logging.StreamHandler()
log_handler = RotatingFileHandler(
    'logs/a1_to_a2_box.log',
    maxBytes=10 * 1024 * 1024,  # roll over at 10 MB
    backupCount=28,             # retain up to 28 rotated files
)

# Both sinks share the same threshold and the same line layout.
log_handler.setLevel(logging.INFO)
console_handler.setLevel(logging.INFO)
log_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
console_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

logging.basicConfig(level=logging.INFO, handlers=[log_handler, console_handler])
logger = logging.getLogger('A1toA2Box')

# An empty A1 Master Assets folder is a normal client workflow (the campaign is
# created in DAM before its assets are uploaded). Such campaigns are skipped
# quietly on every poll; exactly one warning email goes out when the poll counter
# reaches this value. At the ~3-minute poll cadence, 20 polls is roughly 1 hour.
EMPTY_FOLDER_WARNING_THRESHOLD = 20
def extract_creativex_from_dam_metadata(asset_metadata):
    """
    Extract the CreativeX score and URL from a DAM asset's metadata, if present.

    Walks asset_metadata['metadata']['metadata_element_list'] and looks for:
      * 'FERRERO.TAB.FIELD.CREATIVEX'  - tabular field; the score is read from
        values[0]['value']['field_value']['value'].
      * 'FERRERO.FIELD.CREATIVEX LINK' - the URL is read from
        value['value']['value'].

    Malformed or missing pieces (None containers, non-dict elements) are
    skipped instead of aborting the whole extraction.

    Args:
        asset_metadata: asset dict as returned by the DAM client (may be None).

    Returns:
        dict with 'score' (str or None), 'url' (str or None) and 'id' (always
        None - nothing here populates it), or None when neither a score nor a
        URL was found, or when parsing raised unexpectedly.
    """
    try:
        # `or {}` / `or []` also covers keys that are present but set to None.
        metadata = (asset_metadata or {}).get('metadata') or {}
        metadata_elements = metadata.get('metadata_element_list') or []

        creativex_data = {
            'score': None,
            'url': None,
            'id': None
        }

        for element in metadata_elements:
            if not isinstance(element, dict):
                continue
            element_id = element.get('id')

            # Extract CreativeX Score (tabular field)
            if element_id == 'FERRERO.TAB.FIELD.CREATIVEX':
                values = element.get('values') or []
                if values and isinstance(values[0], dict):
                    value_obj = values[0].get('value', {})
                    if isinstance(value_obj, dict):
                        field_value = value_obj.get('field_value', {})
                        if isinstance(field_value, dict):
                            score = field_value.get('value')
                            # 0 is a legitimate score; only skip missing/blank values.
                            if score is not None and score != '':
                                creativex_data['score'] = str(score)
                                logger.info("Found CreativeX Score in master metadata: {}".format(score))

            # Extract CreativeX URL
            elif element_id == 'FERRERO.FIELD.CREATIVEX LINK':
                value_obj = element.get('value', {})
                if isinstance(value_obj, dict):
                    nested_value = value_obj.get('value', {})
                    if isinstance(nested_value, dict):
                        url = nested_value.get('value')
                        if url:
                            creativex_data['url'] = url
                            logger.info("Found CreativeX URL in master metadata: {}".format(url))

        if creativex_data['score'] is not None or creativex_data['url']:
            return creativex_data
        return None

    except Exception as e:
        logger.warning("Failed to extract CreativeX from metadata: {}".format(str(e)))
        return None
def generate_and_upload_csv(db, box, config):
    """
    Generate a CSV of all live campaigns and upload it to Box.

    The CSV has columns 'code' ("<campaign_number>-<campaign_name>") and
    'description' (campaign name). It is written as UTF-8 to
    temp/live_campaigns_<UTC timestamp>.csv and uploaded to the Box folder
    config['box']['live_campaigns_folder_id']. The temp file is always removed
    afterwards, whether or not the upload succeeded.

    Args:
        db: Database client; db.get_all_live_campaigns() must yield dicts with
            'campaign_number' and 'campaign_name'.
        box: Box client exposing upload_file(file_path, folder_id, target_filename).
        config: loaded configuration dict.

    Returns:
        True when the CSV was uploaded, False otherwise (errors are logged,
        never raised).
    """
    csv_path = None
    try:
        logger.info("Generating live campaigns CSV...")

        # 1. Get all live campaigns from DB
        campaigns = db.get_all_live_campaigns()
        if not campaigns:
            logger.warning("No live campaigns found to report")
            return False
        logger.info("Found {} live campaigns".format(len(campaigns)))

        # Resolve the upload target before writing anything, so a missing
        # setting cannot leave an orphaned CSV behind in temp/.
        folder_id = config['box'].get('live_campaigns_folder_id')
        if not folder_id:
            logger.error("Box live_campaigns_folder_id not configured")
            return False

        # 2. Generate CSV file (explicit UTF-8: campaign names may be non-ASCII)
        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d_%H%M%S_UTC')
        csv_filename = 'live_campaigns_{}.csv'.format(timestamp)
        os.makedirs('temp', exist_ok=True)
        csv_path = os.path.join('temp', csv_filename)
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=['code', 'description'])
            writer.writeheader()
            writer.writerows(
                {
                    'code': "{}-{}".format(camp['campaign_number'], camp['campaign_name']),
                    'description': camp['campaign_name']
                }
                for camp in campaigns
            )
        logger.info("Generated CSV: {}".format(csv_path))

        # 3. Upload to Box
        upload_result = box.upload_file(
            file_path=csv_path,
            folder_id=folder_id,
            target_filename=csv_filename
        )
        logger.info("Uploaded CSV to Box: {} (File ID: {})".format(
            csv_filename, upload_result['file_id']
        ))
        return True

    except Exception as e:
        logger.error("Failed to generate/upload CSV: {}".format(str(e)))
        return False

    finally:
        # Clean up on every path - success, failed upload, or failed write.
        if csv_path and os.path.exists(csv_path):
            try:
                os.remove(csv_path)
            except OSError as cleanup_error:
                logger.warning("Failed to remove temp CSV {}: {}".format(csv_path, str(cleanup_error)))
def process_campaign(campaign, dam, box, db, notifier, config):
    """
    Process a single A1 campaign: copy every master asset to Box and, only when
    ALL of them succeed, move the campaign status A1 → A2 in DAM.

    Flow:
      1. Campaigns the retry table marks as permanently failed are skipped.
      2. An empty Master Assets folder is an expected pre-asset state: the poll
         counter is bumped with mark_failed_at_max=False (never auto-failing)
         and one warning email is sent when the counter equals
         EMPTY_FOLDER_WARNING_THRESHOLD.
      3. For each master asset: if a tracking ID with Box info already exists,
         the Box copy is reused and only the DB record is refreshed; otherwise
         the asset is downloaded from DAM, uploaded to Box, stored in the DB and
         any CreativeX score from its metadata is stored. The downloaded temp
         file is removed whether or not these steps succeed.
      4. All assets succeeded → status A2, retry counter reset, campaign status
         recorded, live-campaigns CSV refreshed, and a completion email with a
         per-asset CSV attached. Otherwise → partial-completion email and the
         status stays A1.
      5. Exceptions whose message mentions a folder/assets problem go through
         db.increment_a1_retry (the emails report max_retries=3); any other
         exception gets a generic failure email.

    Args:
        campaign: DAM campaign dict; reads 'asset_id', 'campaign_name' and
            optionally 'campaign_id' (the campaign number shown to users).
        dam, box, db, notifier: client objects created in main().
        config: loaded configuration; reads
            config['notifications']['recipients']['success' / 'errors'].

    Returns:
        dict with 'success' (bool), 'processed' (int) and 'failed' (int).
        Permanently-failed campaigns additionally carry 'skipped': True.
    """
    campaign_id = campaign['asset_id']
    campaign_name = campaign['campaign_name']
    campaign_number = campaign.get('campaign_id', 'N/A')

    logger.info("=" * 60)
    logger.info("Processing campaign: {} ({})".format(campaign_name, campaign_number))
    logger.info("=" * 60)

    # CHECK RETRY STATUS FIRST - permanently failed campaigns need a manual reset.
    retry_status = db.get_a1_retry_status(campaign_id)
    if retry_status and retry_status['permanently_failed']:
        logger.warning("Campaign {} is marked as permanently failed - skipping".format(campaign_number))
        logger.info("Failure reason: {}".format(retry_status.get('failure_reason', 'Unknown')))
        logger.info("To retry this campaign, manually reset it using database.reset_a1_retry()")
        return {'success': False, 'processed': 0, 'failed': 0, 'skipped': True}

    # Initialised before the try so the outer exception handler can report it.
    total_assets = 0

    try:
        # Get master assets
        master_assets = dam.get_master_assets(campaign_id)
        total_assets = len(master_assets)
        logger.info("Found {} master assets".format(total_assets))

        if total_assets == 0:
            # Empty folders are expected when a campaign manager creates the campaign
            # before uploading assets. Track the count for visibility but never auto-fail
            # - keep retrying every poll until assets appear (or status changes in DAM).
            retry_result = db.increment_a1_retry(
                campaign_id=campaign_id,
                campaign_number=campaign_number,
                campaign_name=campaign_name,
                reason="No master assets found in Master Assets folder",
                mark_failed_at_max=False
            )
            if not retry_result['success']:
                logger.error("Failed to update retry counter")

            retry_count = retry_result.get('retry_count', 0)
            logger.info("No master assets yet (poll {}) - skipping until assets appear".format(retry_count))

            # Exactly-equal check => a single warning email once the campaign has been
            # empty for ~1 hour, instead of one email per poll.
            if retry_count == EMPTY_FOLDER_WARNING_THRESHOLD:
                logger.warning("Campaign has been empty for {} polls - sending one-time warning".format(retry_count))
                try:
                    notifier.send_email(
                        template_name='a1_to_a2_no_assets_warning',
                        recipients=config['notifications']['recipients']['errors'],
                        data={
                            'campaign_name': campaign_name,
                            'campaign_id': campaign_id,
                            'campaign_number': campaign_number,
                            'poll_count': retry_count
                        }
                    )
                except Exception as email_error:
                    # A mail failure must not turn an expected empty-folder state
                    # into the generic campaign-failure path below.
                    logger.error("Failed to send no-assets warning email: {}".format(str(email_error)))

            return {'success': False, 'processed': 0, 'failed': 0}

        # Track results
        processed_assets = []
        failed_assets = []

        # Get Final Assets folder for upload directory
        final_folder_id = dam.find_final_assets_folder(campaign_id)
        if not final_folder_id:
            logger.error("Final Assets folder not found")
            return {'success': False, 'processed': 0, 'failed': total_assets}

        # Process each asset
        for asset in master_assets:
            asset_id = asset['asset_id']
            asset_name = asset.get('name', 'unknown')
            folder_path = asset.get('folder_path', '')
            # Path of the temp download (if any); removed in `finally` on every path.
            file_path = None

            try:
                if folder_path:
                    logger.info("Processing: {} (from subfolder: {})".format(asset_name, folder_path))
                else:
                    logger.info("Processing: {}".format(asset_name))

                # 1. Extract Global Campaign Reference (needed for tracking ID lookup)
                global_ref = db.extract_global_campaign_reference(asset, campaign_number)

                # 1b. Look up matching B1→B2 global master by opentext_id
                global_master_tid = db.find_global_master_by_opentext_id(asset_id)
                if global_master_tid:
                    logger.info("Linked to global master: {} → {}".format(asset_name, global_master_tid))

                # 2. Find existing tracking ID or generate new one.
                # Handles re-processing: if campaign was reset to A1 after adding new masters,
                # existing assets keep their tracking IDs, new assets get new IDs.
                tracking_result = db.find_or_create_tracking_id(
                    opentext_id=asset_id,
                    local_campaign_id=global_ref['local_campaign_id']
                )
                tracking_id = tracking_result['tracking_id']

                if tracking_result['is_existing']:
                    # Asset already processed in a previous A1→A2 cycle
                    existing_master = db.get_master_asset(tracking_id)
                    if existing_master and existing_master.get('box_file_id'):
                        logger.info("Re-processing: reusing tracking ID {} for existing asset {} (skipping download/upload)".format(
                            tracking_id, asset_name))
                        box_result = {
                            'file_id': existing_master['box_file_id'],
                            'url': existing_master['box_url']
                        }

                        # Update database metadata (asset_data may have changed in DAM)
                        db_result = db.store_master_asset(
                            tracking_id=tracking_id,
                            opentext_id=asset_id,
                            asset_data=asset,
                            box_file_id=box_result['file_id'],
                            box_url=box_result['url'],
                            upload_folder_id=final_folder_id,
                            global_master_campaign_id=global_ref['global_master_campaign_id'],
                            global_master_folder_id=global_ref['global_master_folder_id'],
                            local_campaign_id=global_ref['local_campaign_id'],
                            global_master_tracking_id=global_master_tid
                        )
                        if not db_result['success']:
                            raise Exception("Database update failed for existing asset")

                        processed_assets.append({
                            'asset_id': asset_id,
                            'asset_name': asset_name,
                            'tracking_id': tracking_id,
                            'box_file_id': box_result['file_id'],
                            'box_url': box_result['url'],
                            'is_existing': True
                        })
                        logger.info("✓ Existing asset confirmed: {} → {} (skipped)".format(asset_name, tracking_id))
                        continue  # Skip to next asset (nothing was downloaded)

                    # Tracking ID exists but no usable Box info - process normally
                    logger.info("Existing tracking ID {} found but no Box info - downloading/uploading".format(tracking_id))

                # 3. Download from DAM (new assets or existing without Box info)
                file_path = dam.download_asset(
                    asset_id,
                    output_dir='temp/downloads/{}'.format(campaign_id)
                )

                # 4. Upload to Box (preserve folder structure from DAM)
                box_result = box.upload_with_tracking_id(
                    file_path=file_path,
                    campaign_id=campaign_number,
                    campaign_name=campaign_name,
                    tracking_id=tracking_id,
                    subfolder_path=folder_path
                )

                # 5. Store in database
                db_result = db.store_master_asset(
                    tracking_id=tracking_id,
                    opentext_id=asset_id,
                    asset_data=asset,
                    box_file_id=box_result['file_id'],
                    box_url=box_result['url'],
                    upload_folder_id=final_folder_id,
                    global_master_campaign_id=global_ref['global_master_campaign_id'],
                    global_master_folder_id=global_ref['global_master_folder_id'],
                    local_campaign_id=global_ref['local_campaign_id'],
                    global_master_tracking_id=global_master_tid
                )
                if not db_result['success']:
                    raise Exception("Database storage failed")

                # 6. Extract and store CreativeX score
                creativex_data = extract_creativex_from_dam_metadata(asset)
                if creativex_data:
                    cx_result = db.store_creativex_score(
                        filename=asset_name,
                        creativex_id=creativex_data.get('id', ''),
                        creativex_url=creativex_data.get('url', ''),
                        quality_score=creativex_data.get('score', ''),
                        box_file_id=box_result['file_id'],
                        full_extraction_data={'master_metadata': True, 'data': creativex_data},
                        tracking_id=tracking_id,
                        status='master-cx-score'
                    )
                    if cx_result['success']:
                        logger.info("Stored master CreativeX score: {} (Tracking: {})".format(
                            creativex_data.get('score'), tracking_id
                        ))

                processed_assets.append({
                    'asset_id': asset_id,
                    'asset_name': asset_name,
                    'tracking_id': tracking_id,
                    'box_file_id': box_result['file_id'],
                    'box_url': box_result['url'],
                    'is_existing': False
                })
                logger.info("✓ New asset: {} → {}".format(asset_name, tracking_id))

            except Exception as e:
                logger.error("✗ Failed: {} - {}".format(asset_name, str(e)))
                failed_assets.append({
                    'asset_id': asset_id,
                    'asset_name': asset_name,
                    'error': str(e)
                })

            finally:
                # Remove the temp download on success AND failure. A cleanup error is
                # only logged, so an uploaded asset is never also counted as failed.
                if file_path and os.path.exists(file_path):
                    try:
                        os.remove(file_path)
                    except OSError as cleanup_error:
                        logger.warning("Failed to remove temp file {}: {}".format(file_path, str(cleanup_error)))

        # CHECK: All assets processed successfully?
        all_done = len(processed_assets) == total_assets

        # Count new vs existing assets
        new_assets = [a for a in processed_assets if not a.get('is_existing')]
        existing_assets = [a for a in processed_assets if a.get('is_existing')]

        logger.info("")
        logger.info("Campaign {} Results:".format(campaign_id))
        logger.info(" Total: {}".format(total_assets))
        logger.info(" Successful: {}".format(len(processed_assets)))
        if existing_assets:
            logger.info(" - New assets: {}".format(len(new_assets)))
            logger.info(" - Existing assets (skipped download/upload): {}".format(len(existing_assets)))
        logger.info(" Failed: {}".format(len(failed_assets)))
        logger.info(" All Done: {}".format("YES" if all_done else "NO"))
        logger.info("")

        if not all_done:
            # NOT all done - some failed
            logger.warning("Campaign incomplete - NOT updating status (remains A1)")
            try:
                notifier.send_email(
                    template_name='a1_to_a2_partial',
                    recipients=config['notifications']['recipients']['errors'],
                    data={
                        'campaign_name': campaign_name,
                        'campaign_id': campaign_id,
                        'campaign_number': campaign_number,
                        'total_assets': total_assets,
                        'successful': len(processed_assets),
                        'failed': len(failed_assets),
                        'processed_assets': processed_assets,
                        'failed_assets': failed_assets
                    }
                )
            except Exception as email_error:
                logger.error("Failed to send partial completion email: {}".format(str(email_error)))
            return {'success': False, 'processed': len(processed_assets), 'failed': len(failed_assets)}

        # ALL assets processed - update status
        logger.info("All assets processed - Updating status A1 → A2")
        status_result = dam.update_campaign_status(campaign_id, 'A2')
        if not status_result['success']:
            logger.error("✗ Status update failed: {}".format(status_result.get('error')))
            return {'success': False, 'processed': len(processed_assets), 'failed': 0}

        logger.info("✓ Status updated successfully")

        # RESET retry counter on success
        db.reset_a1_retry(campaign_id)

        # Record campaign status in database
        logger.info("Recording campaign status in database...")
        db.record_campaign_status(
            campaign_id=campaign_id,
            campaign_number=campaign_number,
            campaign_name=campaign_name,
            live_campaign='YES',  # A1→A2 campaigns are going live
            status='A2',
            webhook_sent=False  # No webhook sent in this version
        )

        # REPLACEMENT: Generate and upload CSV instead of webhook
        logger.info("Generating and uploading live campaigns CSV...")
        csv_success = generate_and_upload_csv(db, box, config)
        if csv_success:
            logger.info("✓ CSV report uploaded successfully")
        else:
            logger.error("✗ CSV report generation/upload failed")

        # Per-campaign asset CSV, attached to the completion email.
        csv_path = None
        attachments = None
        try:
            csv_filename = "A1_Campaign_{}_Assets.csv".format(campaign_number)
            csv_path = os.path.join("temp", csv_filename)
            os.makedirs("temp", exist_ok=True)
            with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(
                    csvfile,
                    fieldnames=['Filename', 'Tracking ID', 'Campaign Number', 'Status']
                )
                writer.writeheader()
                for processed in processed_assets:
                    writer.writerow({
                        'Filename': processed['asset_name'],
                        'Tracking ID': processed['tracking_id'],
                        'Campaign Number': campaign_number,
                        'Status': 'Existing' if processed.get('is_existing') else 'New'
                    })
            logger.info("Generated CSV report: {}".format(csv_path))
            attachments = [csv_path]
        except Exception as csv_error:
            logger.error("Failed to generate CSV report: {}".format(str(csv_error)))

        # Send success email with asset details AND CSV attachment. The status is
        # already A2 in DAM at this point, so an email failure is logged rather than
        # reported as a campaign failure, and the CSV is cleaned up either way.
        try:
            notifier.send_email(
                template_name='a1_to_a2_complete',
                recipients=config['notifications']['recipients']['success'],
                data={
                    'campaign_name': campaign_name,
                    'campaign_id': campaign_id,
                    'campaign_number': campaign_number,
                    'asset_count': len(processed_assets),
                    'new_asset_count': len(new_assets),
                    'existing_asset_count': len(existing_assets),
                    'processed_assets': processed_assets,
                    'new_assets': new_assets,
                    'existing_assets': existing_assets
                },
                attachments=attachments
            )
        except Exception as email_error:
            logger.error("Failed to send completion email: {}".format(str(email_error)))
        finally:
            if csv_path and os.path.exists(csv_path):
                try:
                    os.remove(csv_path)
                    logger.info("Cleaned up CSV report")
                except Exception as e:
                    logger.warning("Failed to remove temp CSV: {}".format(str(e)))

        return {'success': True, 'processed': len(processed_assets), 'failed': 0}

    except Exception as e:
        logger.error("Campaign processing failed: {}".format(str(e)))

        # Check if this is a "folder not found" or "no assets" error - use retry logic
        error_str = str(e).lower()
        is_folder_issue = 'folder not found' in error_str or 'no assets' in error_str or 'assets folder' in error_str

        if is_folder_issue:
            logger.warning("Detected folder/assets issue - applying retry logic")

            # Increment retry counter
            retry_result = db.increment_a1_retry(
                campaign_id=campaign_id,
                campaign_number=campaign_number,
                campaign_name=campaign_name,
                reason=str(e)
            )
            if not retry_result['success']:
                logger.error("Failed to update retry counter")

            is_permanently_failed = retry_result.get('permanently_failed', False)
            retry_count = retry_result.get('retry_count', 0)

            # FINAL failure email once the DB reports the limit is reached,
            # otherwise the standard retry notification.
            if is_permanently_failed:
                template_name = 'a1_to_a2_permanently_failed'
            else:
                template_name = 'a1_to_a2_no_assets_retry'

            try:
                notifier.send_email(
                    template_name=template_name,
                    recipients=config['notifications']['recipients']['errors'],
                    data={
                        'campaign_name': campaign_name,
                        'campaign_id': campaign_id,
                        'campaign_number': campaign_number,
                        'retry_count': retry_count,
                        'max_retries': 3,
                        'is_permanently_failed': is_permanently_failed
                    }
                )
            except Exception as email_error:
                logger.error("Failed to send error email: {}".format(str(email_error)))
        else:
            # Other errors - send generic failure notification
            try:
                notifier.send_email(
                    template_name='upload_failed',
                    recipients=config['notifications']['recipients']['errors'],
                    data={
                        'filename': "Campaign: {}".format(campaign_name),
                        'tracking_id': campaign_number,
                        'error': str(e)
                    }
                )
            except Exception as email_error:
                logger.error("Failed to send error email: {}".format(str(email_error)))

        return {'success': False, 'processed': 0, 'failed': total_assets}
def main():
    """
    Entry point: run one cron-style pass over DAM campaigns in status A1.

    Parses the auth flags (--auth-pfx-v2 takes precedence over --auth-pfx;
    OAuth2 otherwise), loads config/config.yaml, builds the DAM/Box/DB/notifier
    clients, and exits with status 1 if any connection test fails.

    Campaigns that the retry table marks as permanently failed are excluded up
    front. Up to `max_campaigns_per_run` campaigns that actually do work are
    processed per run. A campaign whose pass processed and failed nothing does
    not use up a slot. The main case is an empty Master Assets folder, which is
    polled indefinitely. Without this rule, a few empty campaigns at the head of
    the search results would block every campaign behind them.

    Exit status (via sys.exit): 0 when there is nothing to do or at least one
    campaign completed; 1 on connection failure, unexpected error, or when no
    campaign completed. The DB connection is closed on every exit from the
    processing phase.
    """
    parser = argparse.ArgumentParser(description='Ferrero A1→A2 Box Uploader')
    parser.add_argument('--auth-pfx', action='store_true',
                        help='Use mTLS certificate authentication (Legacy APIM)')
    parser.add_argument('--auth-pfx-v2', action='store_true',
                        help='Use mTLS V2 (Hybrid) authentication')
    args = parser.parse_args()

    logger.info("=" * 60)
    logger.info("Ferrero A1→A2 Box Uploader Starting")

    # Determine auth mode
    auth_mode = 'oauth'
    if args.auth_pfx_v2:
        auth_mode = 'mtls_v2'
        logger.info("Authentication: mTLS V2 (Hybrid)")
    elif args.auth_pfx:
        auth_mode = 'mtls'
        logger.info("Authentication: mTLS Certificate (Legacy)")
    else:
        logger.info("Authentication: OAuth2 (default)")
    logger.info("=" * 60)

    # Load configuration
    config = load_config('config/config.yaml')

    # Initialize clients
    dam = DAMClient(config, auth_mode=auth_mode)
    box = BoxClient(config)
    db = Database(config)
    notifier = Notifier(config)

    # Test connections
    logger.info("Testing connections...")
    if not dam.test_connection():
        logger.error("DAM connection failed - exiting")
        sys.exit(1)
    if not box.test_connection():
        logger.error("Box connection failed - exiting")
        sys.exit(1)
    if not db.test_connection():
        logger.error("Database connection failed - exiting")
        sys.exit(1)
    logger.info("All connections OK")
    logger.info("")

    # Cron mode: at most this many campaigns that do real work per run.
    max_campaigns_per_run = 2

    try:
        logger.info("Searching for A1 campaigns...")
        campaigns = dam.search_campaigns(status='A1')
        if not campaigns:
            logger.info("No A1 campaigns found - exiting")
            sys.exit(0)

        # Exclude permanently-failed campaigns so they don't consume processing slots
        eligible_campaigns = []
        skipped_failed = []
        for campaign in campaigns:
            retry_status = db.get_a1_retry_status(campaign['asset_id'])
            if retry_status and retry_status['permanently_failed']:
                skipped_failed.append(campaign.get('campaign_id', 'N/A'))
            else:
                eligible_campaigns.append(campaign)

        if skipped_failed:
            logger.info("Excluding {} permanently-failed campaign(s): {}".format(
                len(skipped_failed), ", ".join(skipped_failed)
            ))

        if not eligible_campaigns:
            logger.info("No eligible A1 campaigns to process - exiting")
            sys.exit(0)

        logger.info("Found {} A1 campaigns ({} eligible) - processing up to {} campaign(s)".format(
            len(campaigns), len(eligible_campaigns), max_campaigns_per_run
        ))
        logger.info("")

        # Track results
        successful_campaigns = 0
        failed_campaigns = 0
        idle_campaigns = 0

        for idx, campaign in enumerate(eligible_campaigns, 1):
            if successful_campaigns + failed_campaigns >= max_campaigns_per_run:
                break

            logger.info("=" * 60)
            logger.info("Processing campaign {}/{} eligible".format(idx, len(eligible_campaigns)))
            logger.info("=" * 60)

            result = process_campaign(campaign, dam, box, db, notifier, config)

            if result['success']:
                successful_campaigns += 1
                logger.info("✓ Campaign {} completed successfully".format(idx))
            elif result.get('processed', 0) == 0 and result.get('failed', 0) == 0:
                # Nothing processed or failed (typically an empty Master Assets
                # folder, which is polled indefinitely). Don't let it use a slot,
                # or empty campaigns at the head of the list starve the rest.
                idle_campaigns += 1
                logger.info("- Campaign {} had no assets to process (empty folder or folder error)".format(idx))
            else:
                failed_campaigns += 1
                logger.warning("✗ Campaign {} incomplete or failed".format(idx))
            logger.info("")

        # Summary
        logger.info("")
        logger.info("=" * 60)
        logger.info("A1→A2 Processing Summary")
        logger.info("=" * 60)
        logger.info(" Total campaigns checked: {}".format(successful_campaigns + failed_campaigns + idle_campaigns))
        logger.info(" Successful: {}".format(successful_campaigns))
        logger.info(" Failed: {}".format(failed_campaigns))
        logger.info(" Nothing to process: {}".format(idle_campaigns))
        logger.info("=" * 60)

        # Exit with success if at least one campaign succeeded
        sys.exit(0 if successful_campaigns > 0 else 1)

    except Exception as e:
        logger.critical("Script error: {}".format(str(e)))
        try:
            notifier.send_email(
                template_name='upload_failed',
                recipients=config['notifications']['recipients']['critical'],
                data={
                    'filename': 'A1→A2 Box Uploader',
                    'tracking_id': 'N/A',
                    'error': str(e)
                }
            )
        except Exception as email_error:
            logger.error("Failed to send critical error email: {}".format(str(email_error)))
        sys.exit(1)

    finally:
        # sys.exit raises SystemExit, so this runs on every exit path above.
        db.close()


if __name__ == '__main__':
    main()