# Ferrero Content Scaling - Python Automation Plan

**Target Environment:** Shared Web Hosting
**Python Version:** 3.6 (compatibility required)
**Deployment:** Virtual Environment (venv)
**Trigger Methods:** Cron + Webhooks

---

## 🏗️ Updated Architecture

### Directory Structure

```
ferrero-automation/
├── venv/                            # Python 3.6 virtual environment
│   └── (isolated Python packages)
│
├── scripts/
│   ├── a1_to_a2_download.py         # A1→A2 Polling Script
│   ├── a2_to_a3_upload.py           # A2→A3 Webhook Handler
│   └── shared/
│       ├── __init__.py
│       ├── config_loader.py         # YAML + environment variable loader
│       ├── dam_client.py            # DAM API wrapper
│       ├── box_client.py            # Box API wrapper
│       ├── database.py              # PostgreSQL operations
│       ├── notifier.py              # Email + Webhook notifications
│       ├── filename_parser.py       # V2 naming parser
│       ├── metadata_extractor.py    # MVP field extraction
│       ├── retry_handler.py         # Retry logic
│       └── video_analyzer.py        # Video metadata extraction
│
├── config/
│   ├── config.yaml                  # Main configuration
│   ├── environments/
│   │   ├── staging.yaml
│   │   └── production.yaml
│   ├── field_mappings.yaml          # MVP fields (easy to edit!)
│   ├── webhooks.yaml                # Webhook endpoints
│   └── email_templates.yaml
│
├── logs/
│   ├── a1_to_a2.log
│   ├── a2_to_a3.log
│   └── errors.log
│
├── temp/                            # Temporary file storage
│   └── downloads/
│
├── tests/
│   ├── test_filename_parser.py
│   ├── test_metadata_extractor.py
│   └── test_dam_client.py
│
├── requirements.txt                 # Python 3.6 compatible
├── setup.sh                         # Virtual environment setup
├── .env.example
└── README.md
```

---

## 📥 Script 1: A1→A2 Master Asset Downloader

### Updated Flow with All-Done Check

```python
#!/usr/bin/env python3
# scripts/a1_to_a2_download.py
#
# Cron starts this script every 5 minutes (see Cron Setup), so it makes ONE
# pass over the A1 campaigns and exits. A `while True` loop here would leave
# one more long-running process behind on every cron run.

import logging
import time

from shared.dam_client import DAMClient
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
from shared.config_loader import load_config

logger = logging.getLogger('A1toA2')


def process_campaign(campaign, config, dam, box, db, notifier):
    """Copy all master assets of one A1 campaign to Box; move it to A2 only if every asset succeeded."""
    campaign_id = campaign['asset_id']
    campaign_name = campaign['campaign_name']
    logger.info("Processing campaign: {} ({})".format(campaign_name, campaign_id))

    # 2. Get master assets from campaign
    master_assets = dam.get_master_assets(campaign_id)
    total_assets = len(master_assets)
    logger.info("Found {} master assets".format(total_assets))

    # 3c. The Final Assets folder (upload directory) is per campaign: look it up once
    final_folder_id = dam.find_final_assets_folder(campaign_id)

    # Track processing results
    processed_assets = []
    failed_assets = []
    box_folder_url = None

    # 3. Process each asset
    for asset in master_assets:
        # .get() so the except block can always report these values
        asset_id = asset.get('asset_id')
        asset_name = asset.get('name')
        try:
            logger.info("Processing asset: {} ({})".format(asset_name, asset_id))

            # 3a. Download from DAM
            file_path = dam.download_asset(
                asset_id=asset_id,
                output_dir='temp/downloads/{}'.format(campaign_id)
            )

            # 3b. Generate unique tracking ID
            tracking_id = db.generate_unique_tracking_id()

            # 3d. Upload to Box with tracking ID
            box_result = box.upload_with_tracking_id(
                file_path=file_path,
                campaign_id=campaign_id,
                campaign_name=campaign_name,
                tracking_id=tracking_id
            )
            box_folder_url = box_result.get('folder_url', box_folder_url)

            # 3e. Store in PostgreSQL with FULL metadata
            db.store_master_asset(
                tracking_id=tracking_id,
                opentext_id=asset_id,
                asset_data=asset,  # Complete metadata JSON
                box_file_id=box_result['file_id'],
                box_url=box_result['url'],
                upload_directory=final_folder_id
            )

            processed_assets.append({
                'asset_id': asset_id,
                'asset_name': asset_name,
                'tracking_id': tracking_id,
                'box_file_id': box_result['file_id']
            })
            logger.info("Successfully processed: {} → {}".format(asset_name, tracking_id))

        except Exception as e:
            # Record the failure and continue with the next asset
            logger.error("Failed to process asset {}: {}".format(asset_id, e))
            failed_assets.append({
                'asset_id': asset_id,
                'asset_name': asset_name,
                'error': str(e)
            })

    # 4. CHECK IF ALL ASSETS PROCESSED SUCCESSFULLY
    # (a campaign with zero master assets is not treated as done)
    all_done = total_assets > 0 and len(processed_assets) == total_assets

    if all_done:
        logger.info("All assets processed successfully ({}/{})".format(
            len(processed_assets), total_assets
        ))

        # 5. Update campaign status A1 → A2
        dam.update_campaign_status(campaign_id, 'A2')
        logger.info("Updated campaign status: A1 → A2")

        # 6. Send webhook notification (configurable)
        status_webhook = config['webhooks']['campaign_status_update']
        if status_webhook['enabled']:
            notifier.send_webhook(
                url=status_webhook['url'],
                payload={
                    'campaign_id': campaign_id,
                    'campaign_number': campaign.get('campaign_number'),
                    'campaign_name': campaign_name,
                    'old_status': 'A1',
                    'new_status': 'A2',
                    'asset_count': len(processed_assets),
                    'processed_assets': processed_assets,
                    'timestamp': time.time()
                }
            )

        # 7. Send success email
        notifier.send_email(
            template='a1_to_a2_complete',
            recipients=config['notifications']['recipients']['success'],
            data={
                'campaign_name': campaign_name,
                'campaign_id': campaign_id,
                'campaign_number': campaign.get('campaign_number', 'N/A'),
                'asset_count': len(processed_assets),
                'assets': processed_assets,
                'box_folder_url': box_folder_url
            }
        )
    else:
        # NOT all done (some failed): DO NOT update status, so the campaign
        # stays at A1. Note that the next run processes every asset again,
        # including the ones that already succeeded.
        logger.warning("Campaign incomplete: {}/{} assets processed".format(
            len(processed_assets), total_assets
        ))

        # Send partial success notification
        notifier.send_email(
            template='a1_to_a2_partial',
            recipients=config['notifications']['recipients']['errors'],
            data={
                'campaign_name': campaign_name,
                'campaign_id': campaign_id,
                'total_assets': total_assets,
                'successful': len(processed_assets),
                'failed': len(failed_assets),
                'processed_assets': processed_assets,
                'failed_assets': failed_assets
            }
        )


def main():
    config = load_config()
    # Log to stderr; the cron entry appends it to logs/cron.log
    logging.basicConfig(
        level=config['logging']['level'],
        format=config['logging']['format']
    )

    dam = DAMClient(config)
    box = BoxClient(config)
    db = Database(config)
    notifier = Notifier(config)

    try:
        # 1. Search for campaigns with status A1
        campaigns = dam.search_campaigns(status='A1')
        if not campaigns:
            logger.info("No A1 campaigns found")
            return

        # Limit the work done in a single cron run
        max_campaigns = config['polling']['max_campaigns_per_run']
        for campaign in campaigns[:max_campaigns]:
            try:
                process_campaign(campaign, config, dam, box, db, notifier)
            except Exception as e:
                logger.error("Campaign {} processing failed: {}".format(
                    campaign.get('asset_id'), e
                ))
                # Notifier only exposes send_email/send_webhook, so errors go through send_email
                notifier.send_email(
                    template='campaign_processing_failed',
                    recipients=config['notifications']['recipients']['errors'],
                    data={
                        'campaign_id': campaign.get('asset_id'),
                        'campaign_name': campaign.get('campaign_name'),
                        'error': str(e)
                    }
                )

    except Exception as e:
        logger.critical("Script error: {}".format(e))
        # 'critical_error' must be defined in email_templates.yaml (template name assumed)
        notifier.send_email(
            template='critical_error',
            recipients=config['notifications']['recipients']['critical'],
            data={'error': str(e)}
        )


if __name__ == '__main__':
    main()
```

---

## 📤 Script 2: A2→A3 Upload Handler (Webhook)

### Webhook Receiver

```python
#!/usr/bin/env python3
# scripts/a2_to_a3_upload.py

import base64
import hashlib
import hmac
import logging
import os
import threading
import time
from datetime import datetime, timezone
from queue import Queue, Empty

from dateutil import parser as date_parser  # datetime.fromisoformat() is 3.7+
from flask import Flask, request, jsonify

from shared.config_loader import load_config
from shared.box_client import BoxClient
from shared.dam_client import DAMClient
from shared.database import Database
from shared.notifier import Notifier
from shared.filename_parser import FilenameParser
from shared.metadata_extractor import MetadataExtractorMVP
from shared.video_analyzer import VideoAnalyzer

app = Flask(__name__)
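
# --- Optional liveness endpoint (sketch; not part of the original plan) ---
# monitor_webhook.sh only checks that the PID still exists, so it cannot tell
# when the worker thread has died or the server has stopped answering. A small
# route like this one lets a monitor poll the server over HTTP instead. The
# /health path and the response fields are assumptions.
# task_queue and worker_thread are defined further down in this file. Python
# looks those globals up when a request arrives, not when the route is defined.
@app.route('/health', methods=['GET'])
def health():
    worker = globals().get('worker_thread')
    worker_alive = worker is not None and worker.is_alive()
    body = {
        'status': 'ok' if worker_alive else 'degraded',
        'worker_alive': worker_alive,
        'queued_tasks': task_queue.qsize(),
    }
    # 503 lets a monitor treat a dead worker as "restart needed"
    return jsonify(body), (200 if worker_alive else 503)
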
config = load_config()
logging.basicConfig(
    level=config['logging']['level'],
    format=config['logging']['format']
)
logger = logging.getLogger('A2toA3')

# Task queue for async processing: the HTTP handler only enqueues, so Box gets
# a fast 2xx response while the slow work runs in the worker thread
task_queue = Queue()

# Reject deliveries older than 10 minutes (replay protection)
MAX_DELIVERY_AGE_SECONDS = 600


def validate_box_signature(body, headers, primary_key, secondary_key):
    """Validate a Box webhook delivery.

    Box signs (raw request body + BOX-DELIVERY-TIMESTAMP header value) with
    HMAC-SHA256 and sends the base64-encoded digest in BOX-SIGNATURE-PRIMARY
    and BOX-SIGNATURE-SECONDARY (one per signature key, so keys can rotate).
    """
    timestamp = headers.get('Box-Delivery-Timestamp')
    if not timestamp:
        return False

    delivered_at = date_parser.parse(timestamp)
    age = (datetime.now(timezone.utc) - delivered_at).total_seconds()
    if age > MAX_DELIVERY_AGE_SECONDS:
        return False

    message = body + timestamp.encode('utf-8')
    for key, header in ((primary_key, 'Box-Signature-Primary'),
                        (secondary_key, 'Box-Signature-Secondary')):
        signature = headers.get(header)
        if not key or not signature:
            continue
        computed = base64.b64encode(
            hmac.new(key.encode('utf-8'), message, hashlib.sha256).digest()
        ).decode('ascii')
        if hmac.compare_digest(computed, signature):
            return True
    return False


@app.route('/webhooks/box', methods=['POST'])
def box_webhook():
    try:
        # 1. Validate signature
        if config['webhook']['validate_signatures']:
            keys = config['box']['webhook_signature_keys']  # [primary, secondary]
            if not validate_box_signature(request.get_data(), request.headers,
                                          keys[0], keys[1]):
                logger.warning("Invalid webhook signature")
                return jsonify({'error': 'Invalid signature'}), 401

        # 2. Parse webhook payload (None-safe if the body is not JSON)
        data = request.get_json(silent=True) or {}

        # 3. Check if it's a file upload event
        if data.get('trigger') == 'FILE.UPLOADED':
            # Queue for async processing
            task_queue.put(data)
            logger.info("Queued file upload: {}".format(data['source']['name']))

        return jsonify({'status': 'accepted'}), 200

    except Exception as e:
        logger.error("Webhook error: {}".format(e))
        return jsonify({'error': 'Internal error'}), 500


def process_upload_queue():
    """Background worker: processes queued Box FILE.UPLOADED events one at a time."""
    dam = DAMClient(config)
    box = BoxClient(config)
    db = Database(config)
    notifier = Notifier(config)
    parser = FilenameParser()
    mvp_extractor = MetadataExtractorMVP(config)
    video_analyzer = VideoAnalyzer()

    while True:
        try:
            # Wait for task (the timeout keeps the loop from blocking forever)
            webhook_data = task_queue.get(timeout=1)
        except Empty:
            continue

        source = webhook_data.get('source') or {}
        filename = source.get('name', 'Unknown')
        tracking_id = None
        temp_file = None
        clean_file_path = None

        try:
            file_id = source['id']
            logger.info("Processing: {}".format(filename))

            # 1. Parse V2 filename
            parsed = parser.parse_filename(filename)
            if not parsed['is_valid']:
                raise ValueError("Invalid V2 filename: {} - Errors: {}".format(
                    filename, ', '.join(parsed['validation_errors'])
                ))

            tracking_id = parsed['tracking_id']
            if not tracking_id:
                raise ValueError("No tracking ID in filename: {}".format(filename))

            # 2. Load master metadata from database
            master_asset = db.get_master_asset(tracking_id)
            if not master_asset:
                raise ValueError("No master asset found for tracking ID: {}".format(tracking_id))

            # 3. Download file from Box
            temp_file = box.download_file(file_id, filename)

            # 4. Extract video metadata (if video file)
            video_metadata = video_analyzer.extract_metadata(temp_file)

            # 5. Build MVP asset representation
            clean_filename = parser.strip_upload_components(filename)
            asset_rep = mvp_extractor.build_mvp_representation(
                master_metadata=master_asset['full_metadata'],
                clean_filename=clean_filename,
                parsed_filename=parsed,
                video_metadata=video_metadata
            )

            # 6. Rename to clean filename
            clean_file_path = os.path.join(os.path.dirname(temp_file), clean_filename)
            os.rename(temp_file, clean_file_path)

            # 7. Upload to DAM with retry
            upload_result = retry_upload(
                dam_client=dam,
                file_path=clean_file_path,
                folder_id=master_asset['upload_directory'],
                asset_representation=asset_rep,
                video_metadata=video_metadata,
                max_retries=3
            )

            # 8. Store derivative asset record
            db.store_derivative_asset(
                tracking_id=tracking_id,
                master_asset_id=master_asset['id'],
                dam_asset_id=upload_result['asset_id'],
                filename=clean_filename
            )

            # 9. Check if ALL files for this campaign are uploaded
            campaign_id = master_asset['campaign_id']
            if db.check_campaign_upload_complete(campaign_id):
                logger.info("All assets uploaded for campaign {}".format(campaign_id))
                asset_count = db.get_campaign_asset_count(campaign_id)

                # 10. Update campaign status A2 → A3
                dam.update_campaign_status(campaign_id, 'A3')
                logger.info("Updated campaign status: A2 → A3")

                # 11. Send webhook notification
                status_webhook = config['webhooks']['campaign_status_update']
                if status_webhook['enabled']:
                    notifier.send_webhook(
                        url=status_webhook['url'],
                        payload={
                            'campaign_id': campaign_id,
                            'campaign_number': master_asset.get('campaign_number'),
                            'campaign_name': master_asset.get('campaign_name'),
                            'old_status': 'A2',
                            'new_status': 'A3',
                            'asset_count': asset_count,
                            'timestamp': time.time()
                        }
                    )

                # 12. Send completion email
                notifier.send_email(
                    template='a2_to_a3_complete',
                    recipients=config['notifications']['recipients']['success'],
                    data={
                        'campaign_id': campaign_id,
                        'campaign_name': master_asset.get('campaign_name'),
                        'asset_count': asset_count
                    }
                )
            else:
                logger.info("Campaign {} - More assets pending".format(campaign_id))

            logger.info("Successfully uploaded: {} → Asset ID: {}".format(
                filename, upload_result['asset_id']
            ))

        except Exception as e:
            logger.error("Failed to process {}: {}".format(filename, e))
            # Send individual file error notification
            notifier.send_email(
                template='upload_failed',
                recipients=config['notifications']['recipients']['errors'],
                data={
                    'filename': filename,
                    'tracking_id': tracking_id or 'Unknown',
                    'error': str(e)
                }
            )

        finally:
            # 13. Clean up temp files on success AND on failure
            for path in (temp_file, clean_file_path):
                if path and os.path.exists(path):
                    try:
                        os.remove(path)
                    except OSError as cleanup_error:
                        logger.warning("Could not remove {}: {}".format(path, cleanup_error))
            task_queue.task_done()


def retry_upload(dam_client, file_path, folder_id, asset_representation,
                 video_metadata, max_retries=3):
    """Upload to the DAM, retrying with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return dam_client.upload_asset(
                file_path=file_path,
                folder_id=folder_id,
                asset_representation=asset_representation,
                video_metadata=video_metadata
            )
        except Exception as e:
            if attempt == max_retries - 1:
                raise  # Out of attempts: let the caller report the failure
            delay = 5 * (2 ** attempt)  # Exponential backoff: 5s, 10s, 20s, ...
            logger.warning("Upload attempt {} failed, retrying in {}s: {}".format(
                attempt + 1, delay, e
            ))
            time.sleep(delay)


if __name__ == '__main__':
    # Start background worker. The queue lives in memory, so events still
    # queued when the process stops are lost.
    worker_thread = threading.Thread(target=process_upload_queue, daemon=True)
    worker_thread.start()

    # Start Flask webhook server. app.run() is Flask's built-in development
    # server and speaks plain HTTP. Box only delivers to HTTPS URLs, so put an
    # HTTPS front end (e.g. the host's reverse proxy) in front of it.
    app.run(
        host=config['webhook']['host'],
        port=config['webhook']['port'],
        debug=False
    )
```

---

## 🗄️ Database Enhancements

### New Helper Methods

```python
# shared/database.py

class Database:
    # ... existing methods ...

    def check_campaign_upload_complete(self, campaign_id):
        """
        Check if all master assets for a campaign have been uploaded.

        Returns True if:
        - The campaign has at least one active master asset, and
        - Every active master asset has a derivative with upload_status
          'completed' (failed or pending uploads do not count)
        """
        query = """
            SELECT
                COUNT(DISTINCT ma.tracking_id) AS total_masters,
                COUNT(DISTINCT da.tracking_id) AS uploaded_derivatives
            FROM master_assets ma
            LEFT JOIN derivative_assets da
                ON ma.tracking_id = da.tracking_id
               AND da.upload_status = 'completed'
            WHERE ma.campaign_id = %s
              AND ma.status = 'active'
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query, (campaign_id,))
            total_masters, uploaded_derivatives = cursor.fetchone()

        # All done if every master has a completed derivative
        return total_masters > 0 and total_masters == uploaded_derivatives

    def get_campaign_asset_count(self, campaign_id):
        """Get total asset count for campaign"""
        query = """
            SELECT COUNT(*) FROM master_assets
            WHERE campaign_id = %s AND status = 'active'
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query, (campaign_id,))
            return cursor.fetchone()[0]

    def get_pending_uploads(self, campaign_id):
        """Get tracking IDs that have no completed upload yet"""
        query = """
            SELECT ma.tracking_id, ma.original_filename
            FROM master_assets ma
            WHERE ma.campaign_id = %s
              AND ma.status = 'active'
              AND NOT EXISTS (
                  SELECT 1 FROM derivative_assets da
                  WHERE da.tracking_id = ma.tracking_id
                    AND da.upload_status = 'completed'
              )
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query, (campaign_id,))
            return cursor.fetchall()
```

### New Table: derivative_assets

```sql
-- REFERENCES master_assets(tracking_id) requires a UNIQUE constraint on that column
CREATE TABLE IF NOT EXISTS derivative_assets (
    id SERIAL PRIMARY KEY,
    tracking_id VARCHAR(6) NOT NULL REFERENCES master_assets(tracking_id),
    master_asset_id INTEGER
        REFERENCES master_assets(id),
    dam_asset_id VARCHAR(255) NOT NULL,
    derivative_filename VARCHAR(500) NOT NULL,
    uploaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    upload_status VARCHAR(50) DEFAULT 'completed',
    retry_count INTEGER DEFAULT 0,
    last_error TEXT,
    UNIQUE (tracking_id, derivative_filename)
);

-- IF NOT EXISTS (PostgreSQL 9.5+) keeps the script re-runnable, like CREATE TABLE above
CREATE INDEX IF NOT EXISTS idx_derivative_tracking ON derivative_assets(tracking_id);
CREATE INDEX IF NOT EXISTS idx_derivative_dam_asset ON derivative_assets(dam_asset_id);
```

---

## ⚙️ Configuration Files

### Main Config (`config/config.yaml`)

```yaml
# Environment (set via ENV variable: staging or production)
# ${VAR} / ${VAR:-default} placeholders and the `include:` key are not YAML
# features; config_loader.py must expand them after yaml.safe_load().
environment: ${ENV:-staging}

# Include environment-specific overrides
include: config/environments/${ENV:-staging}.yaml

# DAM Configuration
dam:
  base_url: ${DAM_BASE_URL}
  auth_url: ${DAM_AUTH_URL}
  client_id: ${DAM_CLIENT_ID}
  client_secret: ${DAM_CLIENT_SECRET}
  timeout_seconds: 120

# Box Configuration
box:
  enterprise_id: 43984435
  client_id: ${BOX_CLIENT_ID}
  client_secret: ${BOX_CLIENT_SECRET}
  jwt_key_id: ${BOX_JWT_KEY_ID}
  rsa_private_key_path: ${BOX_PRIVATE_KEY_PATH}
  passphrase: ${BOX_PASSPHRASE}
  root_folder_id: 348304357505
  webhook_signature_keys:  # [primary, secondary]
    - ${BOX_PRIMARY_KEY}
    - ${BOX_SECONDARY_KEY}

# Database Configuration
database:
  host: ${DB_HOST:-localhost}
  port: ${DB_PORT:-5433}
  database: ferrero_tracking
  user: ${DB_USER}
  password: ${DB_PASSWORD}

# Polling Configuration (A1→A2)
polling:
  enabled: true
  interval_seconds: 300  # 5 minutes
  max_campaigns_per_run: 10

# Webhook Configuration (A2→A3)
webhook:
  enabled: true
  host: 0.0.0.0
  port: 5000
  validate_signatures: true

# Webhooks to Call (Outgoing)
webhooks:
  campaign_status_update:
    enabled: true
    url: ${CAMPAIGN_STATUS_WEBHOOK_URL}
    timeout_seconds: 10
    retry_on_failure: true
    auth:
      type: bearer  # bearer, basic, none
      token: ${WEBHOOK_AUTH_TOKEN}

# Retry Configuration
retry:
  max_attempts: 3
  backoff: exponential
  initial_delay_seconds: 5
  max_delay_seconds: 60
  retryable_http_codes: [429, 500, 502, 503, 504]

# Notification Configuration
notifications:
  enabled: true
  mailgun:
    api_key: ${MAILGUN_API_KEY}
    domain: ${MAILGUN_DOMAIN}
  recipients:
    success:
      - team@ferrero.com
      - agency@example.com
    errors:
      - admin@ferrero.com
      - it-support@ferrero.com
    critical:
      - oncall@ferrero.com
  templates_path: config/email_templates.yaml

# Logging Configuration
logging:
  level: INFO
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  handlers:
    file:
      filename: logs/app.log
      max_bytes: 10485760  # 10MB
      backup_count: 5
    console:
      enabled: true

# Field Configuration (EASY TO EDIT!)
fields:
  mappings_file: config/field_mappings.yaml

# Temp File Configuration
temp:
  directory: temp/downloads
  cleanup_after_hours: 24
  max_size_mb: 1000
```

### Webhook Configuration (`config/webhooks.yaml`)

```yaml
# Outgoing Webhooks (we call these)
# Notifier reads config['webhooks'], so config_loader must merge this file
# into that key (otherwise only the webhooks: section of config.yaml is used).
campaign_status_update:
  description: "Called when campaign status changes (A1→A2, A2→A3)"
  url: https://your-system.com/api/campaign-status
  method: POST
  headers:
    Content-Type: application/json
    Authorization: "Bearer ${WEBHOOK_AUTH_TOKEN}"
  timeout_seconds: 10
  retry_on_failure: true
  max_retries: 3
  payload_template:
    campaign_id: "{campaign_id}"
    campaign_number: "{campaign_number}"
    campaign_name: "{campaign_name}"
    old_status: "{old_status}"
    new_status: "{new_status}"
    asset_count: "{asset_count}"
    timestamp: "{timestamp}"
    processed_assets: "{processed_assets}"  # JSON array

asset_upload_complete:
  description: "Called when individual asset is uploaded"
  url: https://your-system.com/api/asset-uploaded
  enabled: false  # Optional webhook
  method: POST
  payload_template:
    tracking_id: "{tracking_id}"
    dam_asset_id: "{dam_asset_id}"
    filename: "{filename}"
    timestamp: "{timestamp}"
```

---

## 🐍 Python 3.6 Compatibility

### Requirements (`requirements.txt`)

```txt
# Python 3.6 Compatible Versions

# Core
requests==2.27.1  # Last version supporting Python 3.6
python-dotenv==0.19.2
pyyaml==5.4.1

# Database
psycopg2-binary==2.8.6

# Box SDK
boxsdk==2.14.1  # Python 3.6 compatible

# Web Framework (Flask, not FastAPI for 3.6)
Flask==2.0.3
Werkzeug==2.0.3

# Email (Mailgun via requests)
# Use requests library directly

# Video Analysis
# Use subprocess with ffprobe (no wrapper needed)

# Utilities
python-dateutil==2.8.2
Jinja2==3.0.3

# Retry
tenacity==8.0.1  # Last version for Python 3.6

# Testing
pytest==6.2.5
pytest-cov==3.0.0
pytest-mock==3.6.1
```

### Python 3.6 Code Considerations

```python
# ❌ DON'T USE (Python 3.7+)
@dataclass
class Config:
    pass

# ✅ USE (Python 3.6 compatible)
class Config:
    def __init__(self, **kwargs):
        self.data = kwargs

# ❌ DON'T USE (Python 3.8+)
if (result := function()):
    process(result)

# ✅ USE (Python 3.6 compatible)
result = function()
if result:
    process(result)

# ❌ DON'T USE (Python 3.7+): capture_output= and text= (matters for the ffprobe calls)
proc = subprocess.run(cmd, capture_output=True, text=True)

# ✅ USE (Python 3.6 compatible)
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                      universal_newlines=True)

# ❌ DON'T USE (Python 3.7+): datetime.fromisoformat()
# ✅ USE: dateutil.parser.parse() (python-dateutil is in requirements.txt)

# ✅ CAN USE (Python 3.6+)
f"String formatting {variable}"  # f-strings work in 3.6

# ✅ CAN USE (Python 3.6+)
def function(param: str) -> dict:  # Type hints work
    pass
```

---

## 📦 Virtual Environment Setup

### `setup.sh`

```bash
#!/bin/bash
# Setup script for shared hosting
set -e  # Stop at the first failing step

echo "=== Ferrero Automation Setup ==="

# 1. Create virtual environment (Python 3.6)
echo "Creating virtual environment..."
python3.6 -m venv venv

# 2. Activate venv
source venv/bin/activate

# 3. Upgrade pip
echo "Upgrading pip..."
pip install --upgrade pip==21.3.1  # Last pip release supporting Python 3.6

# 4. Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt

# 5. Create directory structure
echo "Creating directories..."
mkdir -p logs temp/downloads config/environments

# 6. Copy .env template
if [ ! -f .env ]; then
    echo "Creating .env from template..."
    cp .env.example .env
    chmod 600 .env  # Owner read/write only
    echo "⚠️ Please edit .env with your credentials"
fi

# 7. Create log files
touch logs/a1_to_a2.log
touch logs/a2_to_a3.log
touch logs/errors.log

# 8. Set permissions
chmod +x scripts/a1_to_a2_download.py
chmod +x scripts/a2_to_a3_upload.py

# 9. Test imports (inside `if`, so set -e does not abort before the message)
echo "Testing Python imports..."
if python -c "from scripts.shared import dam_client, box_client, database, notifier"; then
    echo "✅ Setup complete!"
    echo ""
    echo "Next steps:"
    echo "1. Edit .env with your credentials"
    echo "2. Edit config/config.yaml for your environment"
    echo "3. Run: source venv/bin/activate"
    echo "4. Test: run the connection checks from Deployment Step 2"
else
    echo "❌ Setup failed - check errors above"
fi
```

---

## 🕐 Cron Setup (Shared Hosting)

### Crontab Entry

```bash
# Edit crontab
crontab -e

# Add entry for A1→A2 polling (every 5 minutes)
*/5 * * * * cd /home/user/ferrero-automation && venv/bin/python scripts/a1_to_a2_download.py >> logs/cron.log 2>&1

# If one run can take longer than 5 minutes, wrap it in flock(1) (util-linux;
# check that the host provides it) so a new run is skipped while one is active:
# */5 * * * * cd /home/user/ferrero-automation && flock -n logs/a1_to_a2.lock venv/bin/python scripts/a1_to_a2_download.py >> logs/cron.log 2>&1

# Optional: Cleanup temp files daily
0 2 * * * cd /home/user/ferrero-automation && venv/bin/python scripts/cleanup_temp.py >> logs/cleanup.log 2>&1
```

### Webhook Server (Background Process)

```bash
# Start webhook server (keep alive with nohup; append so earlier logs survive)
cd /home/user/ferrero-automation
source venv/bin/activate
nohup python scripts/a2_to_a3_upload.py >> logs/webhook.log 2>&1 &

# Save PID for stopping later
echo $! > webhook.pid

# To stop:
kill $(cat webhook.pid)
```

### Process Monitor Script

```bash
#!/bin/bash
# monitor_webhook.sh - Ensure webhook is running

SCRIPT_DIR="/home/user/ferrero-automation"
PID_FILE="$SCRIPT_DIR/webhook.pid"
LOG_FILE="$SCRIPT_DIR/logs/monitor.log"

# Check if process is running
if [ -f "$PID_FILE" ]; then
    PID=$(cat "$PID_FILE")
    if ps -p "$PID" > /dev/null; then
        echo "$(date): Webhook running (PID: $PID)" >> "$LOG_FILE"
        exit 0
    fi
fi

# Process not running - restart
echo "$(date): Webhook not running, restarting..." >> "$LOG_FILE"
cd "$SCRIPT_DIR" || exit 1
source venv/bin/activate
# Append (>>) so the log of the crashed run is kept for diagnosis
nohup python scripts/a2_to_a3_upload.py >> logs/webhook.log 2>&1 &
echo $! > "$PID_FILE"
echo "$(date): Webhook restarted (PID: $(cat "$PID_FILE"))" >> "$LOG_FILE"

# Add to crontab to run every 5 minutes:
# */5 * * * * /home/user/ferrero-automation/monitor_webhook.sh
```

---

## 🔧 Configuration for Easy Field Changes

### Field Mappings (`config/field_mappings.yaml`)

```yaml
# MVP Field IDs - EASY TO ADD/REMOVE NEW FIELDS!
mvp_fields:
  # Asset Info Category
  - FERRERO.FIELD.MKTG.ASSET TYPE
  - FERRERO.FIELD.FISCAL YEAR
  - MAIN_LANGUAGES
  - ARTESIA.FIELD.ASSET DESCRIPTION
  - FERRERO.FIELD.STATE
  - ARTESIA.FIELD.ASSET NAME
  - FERRERO.FIELD.SUB BRAND
  # Add new field? Just add the ID here!
  # - NEW.FIELD.ID

  # Marketing Category
  - FERRERO.MARKETING.FIELD.AGENCY NAME
  - FERRERO.MARKETING.FIELD.SPOT_VERSION
  - FERRERO.MARKETING.FIELD.DIRECTOR_NAME

  # Market Category
  - FERRERO.MARKET.FIELD.IPRIGHT
  - FERRERO.MARKET.FIELD.BUYOUT
  # ... etc (all 28 MVP fields)

# Fields to UPDATE from V2 filename
filename_field_updates:
  - field_id: ARTESIA.FIELD.ASSET NAME
    source: clean_filename
    required: true

  - field_id: ARTESIA.FIELD.ASSET DESCRIPTION
    source: subject_title
    required: false
    fallback: ""

  - field_id: MAIN_LANGUAGES
    source: language_code
    transform: uppercase
    required: true
    validate_domain: true
    default: EN

  - field_id: FERRERO.FIELD.MKTG.ASSET TYPE
    source: asset_type
    transform: lowercase
    validate_domain: true
    required: true

# Fields to FORCE to specific values
forced_field_values:
  FERRERO.FIELD.STATE: Local
  # Add more forced values here

# Default values for missing fields
default_field_values:
  FERRERO.FIELD.ASSETCOMPLIANCE: "-"
  MARKETING_TAG: "Tag"
  FERRERO.FIELD.FISCAL YEAR: "2025/2026"

# Domain validation (check against DAM lookup domains)
validate_domain_values:
  enabled: true
  cache_file: config/domain_values_cache.json
  cache_ttl_hours: 24
  strict_mode: false  # If true, fail on invalid values. If false, warn and use anyway.
```

---

## 🔔 Notifier with Webhooks

### Updated Notifier (`shared/notifier.py`)

```python
import logging

import requests
import yaml
from jinja2 import Template


class Notifier:
    def __init__(self, config):
        self.config = config
        self.mailgun_api_key = config['notifications']['mailgun']['api_key']
        self.mailgun_domain = config['notifications']['mailgun']['domain']
        self.recipients = config['notifications']['recipients']
        self.enabled = config['notifications']['enabled']
        self.webhook_config = config.get('webhooks', {})
        self.logger = logging.getLogger('Notifier')

        # Load email templates
        with open(config['notifications']['templates_path']) as f:
            self.templates = yaml.safe_load(f)['templates']

    def send_email(self, template, recipients, data):
        """Send email via Mailgun"""
        if not self.enabled:
            return

        try:
            template_config = self.templates[template]
            subject = template_config['subject'].format(**data)

            # Render HTML body with Jinja2
            html_body = Template(template_config['html']).render(**data)

            # Send via Mailgun API
            response = requests.post(
                "https://api.mailgun.net/v3/{}/messages".format(self.mailgun_domain),
                auth=("api", self.mailgun_api_key),
                data={
                    # Sender mailbox name is an assumption; use an address on your Mailgun domain
                    "from": "Ferrero Automation <automation@{}>".format(self.mailgun_domain),
                    "to": recipients,
                    "subject": subject,
                    "html": html_body
                },
                timeout=10
            )

            if response.status_code == 200:
                self.logger.info("Email sent: {} to {}".format(template, recipients))
            else:
                self.logger.error("Email failed: {}".format(response.text))

        except Exception as e:
            self.logger.error("Email error: {}".format(e))

    def send_webhook(self, url, payload, retry=True):
        """Send webhook notification (outgoing)"""
        if not url:
            return

        webhook_config = {}
        try:
            # Look up the named webhook config whose URL matches (if any)
            for cfg in self.webhook_config.values():
                if cfg.get('url') == url:
                    webhook_config = cfg
                    break

            # Prepare headers (copied so the shared config dict is not mutated)
            headers = dict(webhook_config.get('headers') or {
                'Content-Type': 'application/json'
            })

            # Add auth if configured
            auth_config = webhook_config.get('auth', {})
            if auth_config.get('type') == 'bearer' and auth_config.get('token'):
                headers['Authorization'] = 'Bearer {}'.format(auth_config['token'])

            # Send webhook
            response = requests.post(
                url,
                json=payload,
                headers=headers,
                timeout=webhook_config.get('timeout_seconds', 10)
            )

            if response.status_code in (200, 201, 202):
                self.logger.info("Webhook sent successfully: {}".format(url))
            else:
                self.logger.warning("Webhook failed: {} - {}".format(
                    response.status_code, response.text
                ))

        except Exception as e:
            self.logger.error("Webhook error: {}".format(e))
            if retry and webhook_config.get('retry_on_failure'):
                # Not implemented yet: failed deliveries are only logged.
                # A retry queue (or tenacity, already in requirements.txt) could go here.
                pass
```

---

## 🎯 Updated Flow Diagrams

### A1→A2 with All-Done Check

```
┌─────────────────────────────────────┐
│    Cron triggers every 5 minutes    │
└──────────────────┬──────────────────┘
                   ↓
            ┌──────────────┐
            │  Search DAM  │
            │ Status = A1  │
            └──────┬───────┘
                   ↓
           Found campaigns?
                   ├── No → Exit until next cron run
                   ↓ Yes
            ┌──────────────────┐
            │ For each         │
            │ campaign:        │
            └──────┬───────────┘
                   ↓
            ┌──────────────────┐
            │ Get Master       │
            │ Assets (count: N)│
            └──────┬───────────┘
                   ↓
            ┌──────────────────┐
            │ For each asset:  │
            │ 1. Download      │
            │ 2. Gen Track ID  │
            │ 3. Upload to Box │
            │ 4. Store in DB   │
            └──────┬───────────┘
                   ↓
   ┌─────────────────────────┐
   │ CHECK: All N assets     │
   │ processed successfully? │
   └───┬─────────────────┬───┘
       │                 │
   YES │                 │ NO
       ↓                 ↓
┌──────────────┐  ┌──────────────────┐
│ Update Status│  │ DO NOT update    │
│ A1 → A2      │  │ Keep status A1   │
└──────┬───────┘  └──────┬───────────┘
       ↓                 ↓
┌──────────────┐  ┌──────────────────┐
│ Send Webhook │  │ Send partial     │
│ (campaign #) │  │ error email      │
└──────┬───────┘  └──────────────────┘
       ↓
┌──────────────┐
│ Send Success │
│ Email        │
└──────────────┘
```

### A2→A3 with All-Done Check

```
┌─────────────────────────────────────┐
│         Box: File uploaded          │
└──────────────────┬──────────────────┘
                   ↓
            ┌──────────────┐
            │ Box Webhook  │
            │ POST received│
            └──────┬───────┘
                   ↓
            ┌──────────────┐
            │ Validate     │
            │ Signature    │
            └──────┬───────┘
                   ↓
            ┌──────────────┐
            │ Queue Task   │
            │ Return 200   │
            └──────┬───────┘
                   ↓
            ┌──────────────────┐
            │ Async Processing:│
            │ 1. Parse V2 name │
            │ 2. Load master   │
            │ 3. Download Box  │
            │ 4. Extract video │
            │ 5. Build MVP     │
            │ 6. Upload DAM    │
            │ 7. Store in DB   │
            └──────┬───────────┘
                   ↓
   ┌─────────────────────────┐
   │ CHECK: All assets for   │
   │ this campaign uploaded? │
   └───┬─────────────────┬───┘
       │                 │
   YES │                 │ NO
       ↓                 ↓
┌──────────────┐  ┌──────────────────┐
│ Update Status│  │ Wait for more    │
│ A2 → A3      │  │ Keep status A2   │
└──────┬───────┘  └──────────────────┘
       ↓
┌──────────────┐
│ Send Webhook │
│ (campaign #) │
└──────┬───────┘
       ↓
┌──────────────┐
│ Send Success │
│ Email        │
└──────────────┘
```

---

## 📝 Easy Configuration Examples

### Example 1: Add New MVP Field

```yaml
# config/field_mappings.yaml
mvp_fields:
  - FERRERO.FIELD.MKTG.ASSET TYPE
  - FERRERO.FIELD.FISCAL YEAR
  # ... existing fields ...
  - NEW.CUSTOM.FIELD.ID  # ← Just add here! No code changes needed
```

### Example 2: Change Webhook URL

```yaml
# config/webhooks.yaml
campaign_status_update:
  url: https://new-url.com/api/status  # ← Just change URL
  headers:
    Authorization: "Bearer ${NEW_TOKEN}"  # ← Update token in .env
```

### Example 3: Switch Staging → Production

```bash
# Just change environment variable
export ENV=production

# Or in .env file:
ENV=production

# All URLs, settings automatically switch!
```

### Example 4: Change Polling Interval

```yaml
# config/config.yaml
polling:
  interval_seconds: 180  # ← Change from 300 to 180 (3 minutes)

# Or environment-specific:
# config/environments/production.yaml
polling:
  interval_seconds: 120  # 2 minutes in production

# When cron starts the poller, also change the crontab schedule
# (e.g. */3 for every 3 minutes): cron decides when each run begins.
```

### Example 5: Add Email Recipient

```yaml
# config/config.yaml
notifications:
  recipients:
    success:
      - existing@ferrero.com
      - newperson@ferrero.com  # ← Just add to list!
    errors:
      - admin@ferrero.com
```

---

## 🚀 Deployment Steps (Shared Hosting)

### Step 1: Initial Setup

```bash
# SSH into shared hosting
ssh user@yourserver.com

# Upload files
cd ~
git clone your-repo/ferrero-automation.git
cd ferrero-automation

# Run setup
chmod +x setup.sh
./setup.sh

# Edit configuration
nano .env                # Add all credentials
nano config/config.yaml  # Verify settings
```

### Step 2: Test Components

```bash
# Activate venv
source venv/bin/activate

# Test DAM connection
python -c "from scripts.shared.dam_client import DAMClient; from scripts.shared.config_loader import load_config; c=load_config(); d=DAMClient(c); print('DAM OK' if d.test_connection() else 'DAM FAIL')"

# Test Box connection
python -c "from scripts.shared.box_client import BoxClient; from scripts.shared.config_loader import load_config; c=load_config(); b=BoxClient(c); print('Box OK' if b.test_connection() else 'Box FAIL')"

# Test database
python -c "from scripts.shared.database import Database; from scripts.shared.config_loader import load_config; c=load_config(); db=Database(c); print('DB OK' if db.test_connection() else 'DB FAIL')"
```

### Step 3: Setup Cron (A1→A2 Polling)

```bash
# Edit crontab
crontab -e

# Add:
*/5 * * * * cd ~/ferrero-automation && venv/bin/python scripts/a1_to_a2_download.py >> logs/cron.log 2>&1
```

### Step 4: Start Webhook Server (A2→A3)

```bash
cd ~/ferrero-automation
source venv/bin/activate

# Start in background
nohup python scripts/a2_to_a3_upload.py >> logs/webhook.log 2>&1 &
echo $! > webhook.pid

# Setup monitor cron
chmod +x monitor_webhook.sh
crontab -e
# Add:
*/5 * * * * ~/ferrero-automation/monitor_webhook.sh
```

### Step 5: Configure Box Webhook

```bash
# Use Box CLI or web interface to register webhook
# Webhook URL: https://yourserver.com/webhooks/box
#   Box delivers only to HTTPS URLs, and app.run() serves plain HTTP on
#   port 5000, so route this URL through an HTTPS proxy to that port.
# Events: FILE.UPLOADED
# Folder: Your campaign processing folder
```

---

## 🎯 Implementation Checklist

### Core Components
- [ ] Config loader (YAML + environment variables)
- [ ] DAM client (OAuth2, search, download, upload, status update)
- [ ] Box client (JWT, upload with tracking ID, download, list files)
- [ ] Database client (CRUD, tracking IDs, campaign checks)
- [ ] Filename parser (V2 naming convention, validation)
- [ ] Metadata extractor MVP (28 fields, updates from filename)
- [ ] Video analyzer (ffprobe wrapper)
- [ ] Retry handler (exponential backoff)
- [ ] Notifier (Mailgun email + webhook sender)

### Scripts
- [ ] A1→A2 poller (with all-done check)
- [ ] A2→A3 webhook handler (with all-done check)
- [ ] Cleanup script (temp files)
- [ ] Health check script

### Configuration
- [ ] config.yaml (main config)
- [ ] field_mappings.yaml (MVP fields - easy to edit!)
- [ ] webhooks.yaml (webhook endpoints)
- [ ] email_templates.yaml
- [ ] staging.yaml / production.yaml
- [ ] .env.example

### Deployment

- [ ] requirements.txt (Python 3.6 compatible)
- [ ] setup.sh (venv creation)
- [ ] monitor_webhook.sh (process monitor)
- [ ] Cron configuration
- [ ] README with setup instructions

### Testing

- [ ] Unit tests (pytest)
- [ ] Integration tests
- [ ] End-to-end workflow test

---

## 🔐 Security Considerations

### Shared Hosting Security

```yaml
# Ensure credentials are secure
.env:
  - File permissions: 600 (owner read/write only)
  - Not in git repository
  - Use environment variables

# Webhook signature validation
webhook:
  validate_signatures: true  # Always true in production

# Rate limiting (Flask has no built-in limiter; needs an extension such as Flask-Limiter)
rate_limit:
  requests_per_minute: 60

# Logging
logging:
  sanitize_credentials: true  # Don't log secrets
```

---

## 📊 Monitoring Dashboard (Optional)

A simple status page:

```python
# scripts/status_dashboard.py
from flask import Flask, render_template

from shared.config_loader import load_config
from shared.database import Database

app = Flask(__name__)


@app.route('/status')
def status():
    db = Database(load_config())
    stats = {
        'total_masters': db.count_master_assets(),
        'total_derivatives': db.count_derivative_assets(),
        'pending_a1_campaigns': db.count_campaigns_by_status('A1'),
        'pending_a2_campaigns': db.count_campaigns_by_status('A2'),
        'last_24h_uploads': db.count_uploads_last_24h(),
        'failed_uploads': db.count_failed_uploads(),
    }
    # Flask looks for the template in templates/ next to this module
    # (i.e. scripts/templates/status.html)
    return render_template('status.html', **stats)
```

---

## 🎯 Summary

**Architecture:** 2 Python scripts + shared libraries

**Python Version:** 3.6 compatible

**Hosting:** Shared hosting with venv

**Triggers:** Cron (A1→A2) + Webhook (A2→A3)

**Key Features:**

- ✅ All-done check before status updates
- ✅ Campaign webhook notifications
- ✅ Email notifications
- ✅ Retry logic with exponential backoff
- ✅ Configuration files for easy changes (URLs, fields, recipients)
- ✅ Python 3.6 compatible
- ✅ Virtual environment isolated
- ✅ Comprehensive error handling

**Estimated Effort:** 4-6 weeks for full implementation

---

**This plan covers the full A1→A3 workflow and is ready to move into implementation.** Should I save this to a document, or would you like me to start creating the actual Python scripts? 🚀
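Configuration Examples 2-4 above assume the config loader merges `config/environments/<ENV>.yaml` over `config/config.yaml` and replaces `${VAR}` placeholders with values from the environment; YAML parsers do neither on their own. The sketch below shows that merge step for the checklist's "Config loader" item, assuming the YAML files have already been parsed into dicts (for example with PyYAML's `yaml.safe_load`) and the `.env` values are already in `os.environ`. The function names and the `staging` default are hypothetical, not part of the plan.

```python
import copy
import os


def deep_merge(base, override):
    """Return a new dict in which values from `override` replace those in `base`.

    Nested dicts are merged key by key; any other value (lists included) is
    replaced wholesale, so an environment file can swap a whole recipient list.
    """
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def expand_env(value):
    """Recursively replace $VAR / ${VAR} in strings with environment values.

    os.path.expandvars leaves unknown variables untouched.
    """
    if isinstance(value, dict):
        return {k: expand_env(v) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env(v) for v in value]
    if isinstance(value, str):
        return os.path.expandvars(value)
    return value


def build_config(base, environments, env_name=None):
    """Merge the base config with the overlay selected by ENV.

    Hypothetical: `environments` maps names like 'production' to the parsed
    contents of config/environments/<name>.yaml; ENV defaults to 'staging'.
    """
    env_name = env_name or os.environ.get('ENV', 'staging')
    overlay = environments.get(env_name, {})
    return expand_env(deep_merge(base, overlay))
```

With `base = {'polling': {'interval_seconds': 180}}` and `environments = {'production': {'polling': {'interval_seconds': 120}}}`, `build_config(base, environments, 'production')` yields an interval of 120 while the base dict stays untouched. Merging before expanding means a token set only in an environment file is still substituted.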
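For the checklist's "Retry handler (exponential backoff)" item, a minimal sketch of the idea follows. `retry_with_backoff` and its parameters are hypothetical; the doubling delay, the 60-second cap and the up-to-10% random jitter (which the plan does not specify) are illustrative assumptions. `sleep` is injectable so tests can record the waits instead of actually sleeping.

```python
import random
import time


def retry_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=60.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call func() until it succeeds or max_attempts is reached.

    The wait before attempt n+1 is min(max_delay, base_delay * 2**(n-1)),
    plus up to 10% random jitter so parallel workers do not retry in lockstep.
    The last failure is re-raised unchanged.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise
            delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
            sleep(delay + random.uniform(0, delay * 0.1))
```

In the DAM and Box clients, `retry_on` would be narrowed to transient errors (timeouts, connection resets, HTTP 429/5xx) so that, for example, an invalid filename fails immediately instead of being retried.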