ferrero-opentext/Python-Version/scripts/a1_to_a2_download.py
DJP 8e7ae7e2d2 Add optional mTLS certificate authentication with --auth-pfx flag
Implements dual authentication system: OAuth2 (default) + mTLS (opt-in).
Zero-risk implementation - existing OAuth2 workflows unchanged.

NEW FEATURE: mTLS Certificate Authentication
- PFX/P12 certificate support for enhanced security
- Activated ONLY with --auth-pfx command-line flag
- OAuth2 remains default (no flag = OAuth2 as before)
- Perfect for testing new auth without breaking production

USAGE:
  Default (OAuth2):
    python scripts/a1_to_a2_download.py

  With mTLS:
    python scripts/a1_to_a2_download.py --auth-pfx

IMPLEMENTATION:

1. Certificate Storage (SECURE):
   - NEW: config/certificates/ folder (gitignored)
   - Moved PFX file to secure location
   - File permissions: 600 (owner read/write only)
   - Password stored in .env (already gitignored)

2. Configuration:
   - .env: Added DAM_MTLS_CERT_PATH and DAM_MTLS_CERT_PASSWORD
   - config.yaml: Added mtls_cert_path and mtls_cert_password
   - .gitignore: Added config/certificates/, *.pfx, *.p12

3. DAM Client Dual Auth:
   - NEW: pfx_to_pem() - Converts PFX to temporary PEM for requests
   - UPDATED: __init__() - Accepts use_mtls flag
   - NEW: _make_api_request() - Unified request wrapper
   - Auto-selects auth method based on flag
   - Updated ALL 8 API calls to use wrapper

4. Scripts Updated (argparse):
   - test_connection.py - Added --auth-pfx flag
   - a1_to_a2_download.py - Added --auth-pfx flag
   - a5_to_a6_download.py - Added --auth-pfx flag
   - b1_to_b2_download.py - Added --auth-pfx flag

5. Test Script:
   - NEW: test_mtls_cert.py - Standalone cert loading test
   - Tests PFX→PEM conversion without API calls
   - Verifies certificate format and cleanup

TESTING RESULTS:
✓ Certificate loads successfully (10930 bytes)
✓ PFX→PEM conversion works (13520 bytes)
✓ Temp file cleanup working
✓ OAuth2 connection test: PASS
✓ mTLS connection test: PASS
✓ Both auth methods working independently

SECURITY:
✓ Certificate file gitignored
✓ Password in .env (gitignored)
✓ File permissions: 600
✓ Temp PEM files auto-deleted
✓ No secrets in code or config

MIGRATION PATH:
- Dev: Use dam-mtls-dev.pfx (current)
- Prod: Replace cert file, update password, same code

BACKWARD COMPATIBILITY:
✓ OAuth2 still default (100% backward compatible)
✓ Existing cron jobs unchanged
✓ No breaking changes
✓ Easy rollback (just don't use --auth-pfx)

Changes:
- .gitignore (+3 lines)
- Python-Version/.env (+3 lines)
- Python-Version/config/config.yaml (+3 lines)
- Python-Version/scripts/shared/dam_client.py (+100 lines dual auth)
- Python-Version/scripts/a1_to_a2_download.py (+14 lines argparse)
- Python-Version/scripts/a5_to_a6_download.py (+14 lines argparse)
- Python-Version/scripts/b1_to_b2_download.py (+14 lines argparse)
- Python-Version/scripts/test_connection.py (+15 lines argparse)
- NEW: Python-Version/scripts/test_mtls_cert.py (92 lines)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-04 18:01:23 -05:00

354 lines
13 KiB
Python
Executable file

#!/usr/bin/env python3
"""
A1→A2 Master Asset Downloader
Polls DAM for campaigns with status A1, downloads master assets, uploads to Box
Updates status to A2 only when ALL assets successfully processed
Supports OAuth2 (default) and mTLS (--auth-pfx) authentication
Compatible with Python 3.6+
"""
import sys
import os
import time
import logging
import argparse
# Add shared library to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from shared.config_loader import load_config, load_field_mappings
from shared.dam_client import DAMClient
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
# Setup logging with rotation
from logging.handlers import RotatingFileHandler
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
os.makedirs('logs/backup', exist_ok=True)
# Configure logging with rotation
# Keep 1 week of active logs (7 days * 10MB = 70MB)
# Backup rotates keep 4 weeks (28 backups * 10MB = 280MB total)
log_handler = RotatingFileHandler(
'logs/a1_to_a2.log',
maxBytes=10*1024*1024, # 10MB per file
backupCount=28 # Keep 28 rotated files (approximately 1 month)
)
log_handler.setLevel(logging.INFO)
log_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logging.basicConfig(
level=logging.INFO,
handlers=[log_handler, console_handler]
)
logger = logging.getLogger('A1toA2')
def process_campaign(campaign, dam, box, db, notifier, config):
"""
Process single campaign - download all master assets
Returns:
dict with success, processed_count, failed_count
"""
campaign_id = campaign['asset_id']
campaign_name = campaign['campaign_name']
campaign_number = campaign.get('campaign_id', 'N/A')
logger.info("=" * 60)
logger.info("Processing campaign: {} ({})".format(campaign_name, campaign_number))
logger.info("=" * 60)
try:
# Get master assets
master_assets = dam.get_master_assets(campaign_id)
total_assets = len(master_assets)
logger.info("Found {} master assets".format(total_assets))
if total_assets == 0:
logger.warning("No master assets found in Master Assets folder")
# Send email notification about empty campaign
notifier.send_email(
template_name='a1_to_a2_no_assets',
recipients=config['notifications']['recipients']['errors'],
data={
'campaign_name': campaign_name,
'campaign_id': campaign_id,
'campaign_number': campaign_number
}
)
logger.info("✓ Email sent: No assets found notification")
return {'success': False, 'processed': 0, 'failed': 0}
# Track results
processed_assets = []
failed_assets = []
# Get Final Assets folder for upload directory
final_folder_id = dam.find_final_assets_folder(campaign_id)
if not final_folder_id:
logger.error("Final Assets folder not found")
return {'success': False, 'processed': 0, 'failed': total_assets}
# Process each asset
for asset in master_assets:
asset_id = asset['asset_id']
asset_name = asset.get('name', 'unknown')
folder_path = asset.get('folder_path', '') # Get subfolder path from recursive search
try:
if folder_path:
logger.info("Processing: {} (from subfolder: {})".format(asset_name, folder_path))
else:
logger.info("Processing: {}".format(asset_name))
# 1. Download from DAM
file_path = dam.download_asset(
asset_id,
output_dir='temp/downloads/{}'.format(campaign_id)
)
# 2. Generate tracking ID
tracking_id = db.generate_unique_tracking_id()
# 3. Upload to Box (preserve folder structure from DAM)
box_result = box.upload_with_tracking_id(
file_path=file_path,
campaign_id=campaign_number, # Use C000000078, not hex asset_id
campaign_name=campaign_name,
tracking_id=tracking_id,
subfolder_path=folder_path # Preserve DAM folder structure
)
# 4. Extract Global Campaign Reference and Local Campaign ID from asset metadata
global_ref = db.extract_global_campaign_reference(asset)
# 5. Store in database with FULL metadata and campaign references
db_result = db.store_master_asset(
tracking_id=tracking_id,
opentext_id=asset_id,
asset_data=asset,
box_file_id=box_result['file_id'],
box_url=box_result['url'],
upload_folder_id=final_folder_id,
global_master_campaign_id=global_ref['global_master_campaign_id'],
global_master_folder_id=global_ref['global_master_folder_id'],
local_campaign_id=global_ref['local_campaign_id']
)
if db_result['success']:
processed_assets.append({
'asset_id': asset_id,
'asset_name': asset_name,
'tracking_id': tracking_id,
'box_file_id': box_result['file_id'],
'box_url': box_result['url']
})
logger.info("✓ Success: {}{}".format(asset_name, tracking_id))
else:
raise Exception("Database storage failed")
# Clean up temp file
os.remove(file_path)
except Exception as e:
logger.error("✗ Failed: {} - {}".format(asset_name, str(e)))
failed_assets.append({
'asset_id': asset_id,
'asset_name': asset_name,
'error': str(e)
})
# CHECK: All assets processed successfully?
all_done = len(processed_assets) == total_assets
logger.info("")
logger.info("Campaign {} Results:".format(campaign_id))
logger.info(" Total: {}".format(total_assets))
logger.info(" Successful: {}".format(len(processed_assets)))
logger.info(" Failed: {}".format(len(failed_assets)))
logger.info(" All Done: {}".format("YES" if all_done else "NO"))
logger.info("")
if all_done:
# ALL assets processed - update status
logger.info("All assets processed - Updating status A1 → A2")
status_result = dam.update_campaign_status(campaign_id, 'A2')
if status_result['success']:
logger.info("✓ Status updated successfully")
# Send webhook notification
if config['webhooks']['campaign_status_update']['enabled']:
logger.info("Sending campaign status webhook...")
notifier.send_webhook(
url=config['webhooks']['campaign_status_update']['url'],
payload={
'campaign_id': campaign_id,
'campaign_number': campaign_number,
'campaign_name': campaign_name,
'old_status': 'A1',
'new_status': 'A2',
'asset_count': len(processed_assets),
'processed_assets': processed_assets,
'timestamp': int(time.time())
}
)
# Send success email with asset details
notifier.send_email(
template_name='a1_to_a2_complete',
recipients=config['notifications']['recipients']['success'],
data={
'campaign_name': campaign_name,
'campaign_id': campaign_id,
'campaign_number': campaign_number,
'asset_count': len(processed_assets),
'processed_assets': processed_assets
}
)
return {'success': True, 'processed': len(processed_assets), 'failed': 0}
else:
logger.error("✗ Status update failed: {}".format(status_result.get('error')))
# Don't send success notification if status update failed
return {'success': False, 'processed': len(processed_assets), 'failed': 0}
else:
# NOT all done - some failed
logger.warning("Campaign incomplete - NOT updating status (remains A1)")
# Send partial completion email with asset details
notifier.send_email(
template_name='a1_to_a2_partial',
recipients=config['notifications']['recipients']['errors'],
data={
'campaign_name': campaign_name,
'campaign_id': campaign_id,
'campaign_number': campaign_number,
'total_assets': total_assets,
'successful': len(processed_assets),
'failed': len(failed_assets),
'processed_assets': processed_assets,
'failed_assets': failed_assets
}
)
return {'success': False, 'processed': len(processed_assets), 'failed': len(failed_assets)}
except Exception as e:
logger.error("Campaign processing failed: {}".format(str(e)))
return {'success': False, 'processed': 0, 'failed': total_assets}
def main():
"""Main polling loop"""
# Parse command-line arguments
parser = argparse.ArgumentParser(description='Ferrero A1→A2 Master Asset Downloader')
parser.add_argument('--auth-pfx', action='store_true',
help='Use mTLS certificate authentication instead of OAuth2')
args = parser.parse_args()
logger.info("=" * 60)
logger.info("Ferrero A1→A2 Master Asset Downloader Starting")
if args.auth_pfx:
logger.info("Authentication: mTLS Certificate (--auth-pfx)")
else:
logger.info("Authentication: OAuth2 (default)")
logger.info("=" * 60)
# Load configuration
config = load_config('config/config.yaml')
# Initialize clients (pass mTLS flag to DAM client)
dam = DAMClient(config, use_mtls=args.auth_pfx)
box = BoxClient(config)
db = Database(config)
notifier = Notifier(config)
# Test connections
logger.info("Testing connections...")
if not dam.test_connection():
logger.error("DAM connection failed - exiting")
sys.exit(1)
if not box.test_connection():
logger.error("Box connection failed - exiting")
sys.exit(1)
if not db.test_connection():
logger.error("Database connection failed - exiting")
sys.exit(1)
logger.info("All connections OK")
logger.info("")
# SINGLE RUN MODE - Process ONE campaign and exit
# Cron will run this script every 5 minutes, processing one campaign at a time
try:
logger.info("Searching for A1 campaigns...")
# Search for campaigns with status A1
campaigns = dam.search_campaigns(status='A1')
if not campaigns:
logger.info("No A1 campaigns found - exiting")
db.close()
sys.exit(0)
# Process ONLY THE FIRST campaign
campaign = campaigns[0]
logger.info("Found {} A1 campaigns - processing first one only".format(len(campaigns)))
logger.info("")
result = process_campaign(campaign, dam, box, db, notifier, config)
if result['success']:
logger.info("")
logger.info("=" * 60)
logger.info("✓ Campaign completed successfully")
logger.info(" Processed: {} assets".format(result['processed']))
logger.info(" Status updated: A1 → A2")
logger.info("=" * 60)
db.close()
sys.exit(0)
else:
logger.warning("")
logger.warning("=" * 60)
logger.warning("✗ Campaign incomplete or failed")
logger.warning(" Processed: {} assets".format(result['processed']))
logger.warning(" Failed: {} assets".format(result['failed']))
logger.warning(" Status NOT updated (remains A1)")
logger.warning("=" * 60)
db.close()
sys.exit(1)
except Exception as e:
logger.critical("Script error: {}".format(str(e)))
# Send critical error notification
notifier.send_email(
template_name='upload_failed',
recipients=config['notifications']['recipients']['critical'],
data={
'filename': 'A1→A2 Script',
'tracking_id': 'N/A',
'error': str(e)
}
)
db.close()
sys.exit(1)
if __name__ == '__main__':
main()