#!/usr/bin/env python3 """ A2→A3 Upload Handler - Box Webhook Receiver Processes file uploads from Box, uploads to DAM with MVP metadata Updates status to A3 only when ALL assets for campaign uploaded Compatible with Python 3.6+ """ import sys import os import time import logging import hmac import hashlib from flask import Flask, request, jsonify import threading from queue import Queue # Add shared library to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from shared.config_loader import load_config, load_field_mappings from shared.dam_client import DAMClient from shared.box_client import BoxClient from shared.database import Database from shared.notifier import Notifier from shared.filename_parser import FilenameParser from shared.metadata_extractor_mvp import MetadataExtractorMVP # Load configuration config = load_config('config/config.yaml') field_mappings = load_field_mappings(config) # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('logs/a2_to_a3.log'), logging.StreamHandler() ] ) logger = logging.getLogger('A2toA3') # Flask app for webhook app = Flask(__name__) # Task queue for async processing task_queue = Queue() def validate_box_signature(request_body, signature, keys): """Validate Box webhook signature""" for key in keys: computed = hmac.new( key.encode('utf-8'), request_body, hashlib.sha256 ).hexdigest() if hmac.compare_digest(computed, signature): return True return False @app.route('/webhooks/box', methods=['POST']) def box_webhook(): """Box webhook receiver endpoint""" try: # Validate signature signature = request.headers.get('Box-Signature-Primary', '') if config['webhook_receiver']['validate_signatures']: keys = config['box']['webhook_signature_keys'] if not validate_box_signature(request.data, signature, keys): logger.warning("Invalid webhook signature") return jsonify({'error': 'Invalid signature'}), 401 # Parse payload data = request.json # Check for file upload event if data.get('trigger') == 'FILE.UPLOADED': # Queue for async processing task_queue.put(data) logger.info("Queued: {}".format(data['source']['name'])) return jsonify({'status': 'accepted'}), 200 except Exception as e: logger.error("Webhook error: {}".format(str(e))) return jsonify({'error': str(e)}), 500 def process_upload_queue(dam, box, db, notifier, parser, mvp_extractor): """Background worker to process upload queue""" while True: try: # Wait for task (1 second timeout to allow checking) webhook_data = task_queue.get(timeout=1) file_id = webhook_data['source']['id'] filename = webhook_data['source']['name'] logger.info("=" * 60) logger.info("Processing: {}".format(filename)) logger.info("=" * 60) try: # 1. Parse V2 filename parsed = parser.parse_filename(filename) if not parsed['is_valid']: raise ValueError("Invalid V2 filename: {} - {}".format( filename, ', '.join(parsed['validation_errors']) )) tracking_id = parsed['tracking_id'] if not tracking_id: raise ValueError("No tracking ID in filename") # 2. Load master metadata from database master_asset = db.get_master_asset(tracking_id) if not master_asset: raise ValueError("No master asset for tracking ID: {}".format(tracking_id)) # 3. Download from Box temp_file = os.path.join('temp/downloads', filename) box.download_file(file_id, temp_file) # 4. Get clean filename clean_filename = parser.strip_upload_components(filename) # 5. Build MVP asset representation asset_rep = mvp_extractor.build_mvp_asset_representation( master_metadata=master_asset['full_metadata'], clean_filename=clean_filename, parsed_filename=parsed ) # 6. Rename to clean filename clean_temp_file = os.path.join('temp/downloads', clean_filename) os.rename(temp_file, clean_temp_file) # 7. Upload to DAM upload_result = dam.upload_asset( file_path=clean_temp_file, folder_id=master_asset['upload_directory'], asset_representation=asset_rep ) if not upload_result['success']: raise Exception("Upload failed: {}".format(upload_result.get('error'))) # 8. Store derivative record db.store_derivative_asset( tracking_id=tracking_id, master_asset_id=None, # Could lookup from tracking_id dam_asset_id=upload_result['asset_id'], filename=clean_filename ) # 9. Check if ALL files for campaign uploaded # TODO: Need campaign_id - could store in master_assets table # For now, log success logger.info("✓ Upload successful: {} → Asset ID: {}".format( filename, upload_result['asset_id'] )) # Clean up os.remove(clean_temp_file) except Exception as e: logger.error("✗ Upload failed: {} - {}".format(filename, str(e))) # Send error notification notifier.send_email( template_name='upload_failed', recipients=config['notifications']['recipients']['errors'], data={ 'filename': filename, 'tracking_id': tracking_id if 'tracking_id' in locals() else 'Unknown', 'error': str(e) } ) finally: task_queue.task_done() except: # Queue timeout or interrupt continue def main(): """Main entry point""" logger.info("=" * 60) logger.info("Ferrero A2→A3 Upload Handler Starting") logger.info("=" * 60) # Initialize clients dam = DAMClient(config) box = BoxClient(config) db = Database(config) notifier = Notifier(config) parser = FilenameParser() mvp_extractor = MetadataExtractorMVP(field_mappings) # Test connections logger.info("Testing connections...") if not dam.test_connection(): logger.error("DAM connection failed") sys.exit(1) if not box.test_connection(): logger.error("Box connection failed") sys.exit(1) if not db.test_connection(): logger.error("Database connection failed") sys.exit(1) logger.info("All connections OK") logger.info("") # Start background worker thread worker = threading.Thread( target=process_upload_queue, args=(dam, box, db, notifier, parser, mvp_extractor), daemon=True ) worker.start() logger.info("Background worker started") # Start Flask webhook server host = config['webhook_receiver']['host'] port = config['webhook_receiver']['port'] logger.info("Starting webhook server on {}:{}".format(host, port)) logger.info("") app.run(host=host, port=port, debug=False) if __name__ == '__main__': main()