
Ferrero Content Scaling - Python Automation Plan

- Target Environment: Shared Web Hosting
- Python Version: 3.6 (compatibility required)
- Deployment: Virtual Environment (venv)
- Trigger Methods: Cron + Webhooks


🏗️ Updated Architecture

Directory Structure

ferrero-automation/
├── venv/                              # Python 3.6 virtual environment
│   └── (isolated Python packages)
│
├── scripts/
│   ├── a1_to_a2_download.py          # A1→A2 Polling Script
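│   ├── a2_to_a3_upload.py            # A2→A3 Webhook Handler
│   ├── cleanup_temp.py               # Daily temp cleanup (cron)
│   └── shared/
│       ├── __init__.py
│       ├── config_loader.py           # Config loading (load_config)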
│       ├── dam_client.py              # DAM API wrapper
│       ├── box_client.py              # Box API wrapper
│       ├── database.py                # PostgreSQL operations
│       ├── notifier.py                # Email + Webhook notifications
│       ├── filename_parser.py         # V2 naming parser
│       ├── metadata_extractor.py      # MVP field extraction
│       ├── retry_handler.py           # Retry logic
│       └── video_analyzer.py          # Video metadata extraction
│
├── config/
│   ├── config.yaml                    # Main configuration
│   ├── environments/
│   │   ├── staging.yaml
│   │   └── production.yaml
│   ├── field_mappings.yaml           # MVP fields (easy to edit!)
│   ├── webhooks.yaml                 # Webhook endpoints
│   └── email_templates.yaml
│
├── logs/
│   ├── a1_to_a2.log
│   ├── a2_to_a3.log
│   └── errors.log
│
├── temp/                              # Temporary file storage
│   └── downloads/
│
├── tests/
│   ├── test_filename_parser.py
│   ├── test_metadata_extractor.py
│   └── test_dam_client.py
│
├── requirements.txt                   # Python 3.6 compatible
├── setup.sh                          # Virtual environment setup
├── monitor_webhook.sh                # Restarts webhook server (cron)
├── .env.example
└── README.md

📥 Script 1: A1→A2 Master Asset Downloader

Updated Flow with All-Done Check

#!/usr/bin/env python3
# scripts/a1_to_a2_download.py

import time
import logging
from shared.dam_client import DAMClient
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
from shared.config_loader import load_config

def main():
    config = load_config()
    dam = DAMClient(config)
    box = BoxClient(config)
    db = Database(config)
    notifier = Notifier(config)

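    # Without basicConfig, INFO messages are dropped (root level is WARNING)
    logging.basicConfig(
        level=config['logging']['level'],
        format=config['logging']['format']
    )
    logger = logging.getLogger('A1toA2')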

    while True:
        try:
            # 1. Search for campaigns with status A1
            campaigns = dam.search_campaigns(status='A1')

            if not campaigns:
                logger.info("No A1 campaigns found")
                time.sleep(config['polling']['interval_seconds'])
                continue

            for campaign in campaigns:
                campaign_id = campaign['asset_id']
                campaign_name = campaign['campaign_name']

                try:
                    logger.info("Processing campaign: {} ({})".format(campaign_name, campaign_id))

                    # 2. Get master assets from campaign
                    master_assets = dam.get_master_assets(campaign_id)
                    total_assets = len(master_assets)

                    logger.info("Found {} master assets".format(total_assets))

                    # Track processing results
                    processed_assets = []
                    failed_assets = []

                    # 3. Process each asset
                    for asset in master_assets:
                        try:
                            asset_id = asset['asset_id']
                            asset_name = asset['name']

                            logger.info("Processing asset: {} ({})".format(asset_name, asset_id))

                            # 3a. Download from DAM
                            file_path = dam.download_asset(
                                asset_id=asset_id,
                                output_dir='temp/downloads/{}'.format(campaign_id)
                            )

                            # 3b. Generate unique tracking ID
                            tracking_id = db.generate_unique_tracking_id()

                            # 3c. Find Final Assets folder for upload directory
                            final_folder_id = dam.find_final_assets_folder(campaign_id)

                            # 3d. Upload to Box with tracking ID
                            box_result = box.upload_with_tracking_id(
                                file_path=file_path,
                                campaign_id=campaign_id,
                                campaign_name=campaign_name,
                                tracking_id=tracking_id
                            )

                            # 3e. Store in PostgreSQL with FULL metadata
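                            db.store_master_asset(
                                tracking_id=tracking_id,
                                opentext_id=asset_id,
                                asset_data=asset,  # Complete metadata JSON
                                box_file_id=box_result['file_id'],
                                box_url=box_result['url'],
                                upload_directory=final_folder_id,
                                # Campaign fields: read back by the A2→A3 handler and
                                # by check_campaign_upload_complete()
                                campaign_id=campaign_id,
                                campaign_name=campaign_name,
                                campaign_number=campaign.get('campaign_number')
                            )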

                            processed_assets.append({
                                'asset_id': asset_id,
                                'asset_name': asset_name,
                                'tracking_id': tracking_id,
                                'box_file_id': box_result['file_id']
                            })

                            logger.info("Successfully processed: {} → {}".format(asset_name, tracking_id))

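                        except Exception as e:
                            # Use .get(): asset_id/asset_name may be unset (or left over
                            # from the previous asset) if the failure happened early
                            logger.error("Failed to process asset {}: {}".format(asset.get('asset_id'), str(e)))
                            failed_assets.append({
                                'asset_id': asset.get('asset_id'),
                                'asset_name': asset.get('name'),
                                'error': str(e)
                            })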
                            # Continue with next asset
                            continue

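                    # 4. CHECK IF ALL ASSETS PROCESSED SUCCESSFULLY
                    # (an empty campaign is never "done": it would otherwise be
                    # promoted to A2 with nothing uploaded)
                    all_done = total_assets > 0 and len(processed_assets) == total_assets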

                    if all_done:
                        logger.info("All assets processed successfully ({}/{})".format(len(processed_assets), total_assets))

                        # 5. Update campaign status A1 → A2
                        dam.update_campaign_status(campaign_id, 'A2')
                        logger.info("Updated campaign status: A1 → A2")

                        # 6. Send webhook notification (configurable)
                        if config['webhooks']['campaign_status_update']['enabled']:
                            notifier.send_webhook(
                                url=config['webhooks']['campaign_status_update']['url'],
                                payload={
                                    'campaign_id': campaign_id,
                                    'campaign_number': campaign.get('campaign_number'),
                                    'campaign_name': campaign_name,
                                    'old_status': 'A1',
                                    'new_status': 'A2',
                                    'asset_count': len(processed_assets),
                                    'processed_assets': processed_assets,
                                    'timestamp': time.time()
                                }
                            )

                        # 7. Send success email
                        notifier.send_email(
                            template='a1_to_a2_complete',
                            recipients=config['notifications']['recipients']['success'],
                            data={
                                'campaign_name': campaign_name,
                                'campaign_id': campaign_id,
                                'campaign_number': campaign.get('campaign_number', 'N/A'),
                                'asset_count': len(processed_assets),
                                'assets': processed_assets,
                                'box_folder_url': box_result.get('folder_url')
                            }
                        )

                    else:
                        # NOT all done - some failed
                        logger.warning("Campaign incomplete: {}/{} assets processed".format(
                            len(processed_assets), total_assets
                        ))

                        # DO NOT update status
                        # Send partial success notification
                        notifier.send_email(
                            template='a1_to_a2_partial',
                            recipients=config['notifications']['recipients']['errors'],
                            data={
                                'campaign_name': campaign_name,
                                'campaign_id': campaign_id,
                                'total_assets': total_assets,
                                'successful': len(processed_assets),
                                'failed': len(failed_assets),
                                'processed_assets': processed_assets,
                                'failed_assets': failed_assets
                            }
                        )

                except Exception as e:
                    logger.error("Campaign {} processing failed: {}".format(campaign_id, str(e)))
                    notifier.send_error(
                        template='campaign_processing_failed',
                        campaign=campaign,
                        error=str(e)
                    )
                    continue

        except Exception as e:
            logger.critical("Script error: {}".format(str(e)))
            notifier.send_critical_error(error=str(e))

        # Wait before next poll
        time.sleep(config['polling']['interval_seconds'])

if __name__ == '__main__':
    main()
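Step 3b relies on `db.generate_unique_tracking_id()`, whose implementation is not part of this plan. Since the `derivative_assets` table stores tracking IDs as `VARCHAR(6)`, one plausible approach is sketched below: draw six random characters with `secrets` (available since Python 3.6) and retry on collision. The alphabet and the `exists` callback are assumptions; inside `Database` the callback would be a lookup such as `SELECT 1 FROM master_assets WHERE tracking_id = %s`, and a UNIQUE constraint should still back it up.

```python
import secrets
import string

# Assumed alphabet: uppercase letters and digits minus look-alikes (0/O, 1/I),
# so IDs survive being typed into V2 filenames by hand (32 symbols).
ALPHABET = ''.join(c for c in string.ascii_uppercase + string.digits if c not in '0O1I')


def generate_unique_tracking_id(exists, length=6, max_attempts=20):
    """Return a random ID of `length` chars for which exists(id) is False.

    `exists` is a hypothetical callback, e.g. a DB lookup on master_assets.
    """
    for _ in range(max_attempts):
        candidate = ''.join(secrets.choice(ALPHABET) for _ in range(length))
        if not exists(candidate):
            return candidate
    raise RuntimeError("Could not find a free tracking ID after {} attempts".format(max_attempts))


if __name__ == '__main__':
    issued = set()
    for _ in range(5):
        new_id = generate_unique_tracking_id(lambda c: c in issued)
        issued.add(new_id)
        print(new_id)
```

With 32 symbols and 6 positions there are about 1.07 billion IDs, so collisions stay rare and the retry loop almost always succeeds on the first draw.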

📤 Script 2: A2→A3 Upload Handler (Webhook)

Webhook Receiver

#!/usr/bin/env python3
# scripts/a2_to_a3_upload.py

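import base64
import hashlib
import hmac
import logging
import os
import threading
import time
from queue import Queue

from flask import Flask, request, jsonify

from shared.config_loader import load_config
from shared.box_client import BoxClient
from shared.dam_client import DAMClient
from shared.database import Database
from shared.notifier import Notifier
from shared.filename_parser import FilenameParser
from shared.metadata_extractor import MetadataExtractorMVP
from shared.video_analyzer import VideoAnalyzer

app = Flask(__name__)
config = load_config()

logging.basicConfig(
    level=config['logging']['level'],
    format=config['logging']['format']
)
logger = logging.getLogger('A2toA3')

# Task queue for async processing
task_queue = Queue()

def validate_box_signature(request_body, headers, keys):
    """Validate a Box webhook signature.

    Box signs the raw body followed by the BOX-DELIVERY-TIMESTAMP header with
    HMAC-SHA256 and sends the base64 digest in BOX-SIGNATURE-PRIMARY (primary
    key) and BOX-SIGNATURE-SECONDARY (secondary key). Accepting either match
    allows keys to be rotated without downtime.
    """
    timestamp = headers.get('BOX-DELIVERY-TIMESTAMP', '').encode('utf-8')
    signature_headers = ('BOX-SIGNATURE-PRIMARY', 'BOX-SIGNATURE-SECONDARY')

    for header_name, key in zip(signature_headers, keys):
        signature = headers.get(header_name, '')
        if not key or not signature:
            continue
        digest = hmac.new(
            key.encode('utf-8'),
            request_body + timestamp,
            hashlib.sha256
        ).digest()
        computed = base64.b64encode(digest).decode('ascii')

        if hmac.compare_digest(computed, signature):
            return True
    return False

@app.route('/webhooks/box', methods=['POST'])
def box_webhook():
    try:
        # 1. Validate signature (consider also rejecting stale
        #    BOX-DELIVERY-TIMESTAMP values to block replayed deliveries)
        if config['webhook']['validate_signatures']:
            if not validate_box_signature(
                request.get_data(),
                request.headers,
                config['box']['webhook_signature_keys']
            ):
                logger.warning("Invalid webhook signature")
                return jsonify({'error': 'Invalid signature'}), 401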

        # 2. Parse webhook payload
        data = request.get_json(silent=True) or {}

        # 3. Check if it's a file upload event
        if data.get('trigger') == 'FILE.UPLOADED':
            # Queue for async processing
            task_queue.put(data)
            logger.info("Queued file upload: {}".format(data['source']['name']))

        return jsonify({'status': 'accepted'}), 200

    except Exception as e:
        logger.error("Webhook error: {}".format(str(e)))
        return jsonify({'error': str(e)}), 500

def process_upload_queue():
    """Background worker to process upload queue"""
    dam = DAMClient(config)
    box = BoxClient(config)
    db = Database(config)
    notifier = Notifier(config)
    parser = FilenameParser()
    mvp_extractor = MetadataExtractorMVP(config)
    video_analyzer = VideoAnalyzer()

    while True:
        try:
            # Wait for task
            webhook_data = task_queue.get(timeout=1)

            file_id = webhook_data['source']['id']
            filename = webhook_data['source']['name']

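            logger.info("Processing: {}".format(filename))

            # Reset per task: locals persist across loop iterations, so without
            # this a failure here would report the previous file's tracking ID
            tracking_id = 'Unknown'

            try: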
                # 1. Parse V2 filename
                parsed = parser.parse_filename(filename)

                if not parsed['is_valid']:
                    raise ValueError("Invalid V2 filename: {} - Errors: {}".format(
                        filename, ', '.join(parsed['validation_errors'])
                    ))

                tracking_id = parsed['tracking_id']

                if not tracking_id:
                    raise ValueError("No tracking ID in filename: {}".format(filename))

                # 2. Load master metadata from database
                master_asset = db.get_master_asset(tracking_id)

                if not master_asset:
                    raise ValueError("No master asset found for tracking ID: {}".format(tracking_id))

                # 3. Download file from Box
                temp_file = box.download_file(file_id, filename)

                # 4. Extract video metadata (if video file)
                video_metadata = video_analyzer.extract_metadata(temp_file)

                # 5. Build MVP asset representation
                clean_filename = parser.strip_upload_components(filename)

                asset_rep = mvp_extractor.build_mvp_representation(
                    master_metadata=master_asset['full_metadata'],
                    clean_filename=clean_filename,
                    parsed_filename=parsed,
                    video_metadata=video_metadata
                )

                # 6. Rename to clean filename
                clean_file_path = os.path.join(
                    os.path.dirname(temp_file),
                    clean_filename
                )
                os.rename(temp_file, clean_file_path)

                # 7. Upload to DAM with retry
                upload_result = retry_upload(
                    dam_client=dam,
                    file_path=clean_file_path,
                    folder_id=master_asset['upload_directory'],
                    asset_representation=asset_rep,
                    video_metadata=video_metadata,
                    max_retries=3
                )

                # 8. Store derivative asset record
                db.store_derivative_asset(
                    tracking_id=tracking_id,
                    master_asset_id=master_asset['id'],
                    dam_asset_id=upload_result['asset_id'],
                    filename=clean_filename
                )

                # 9. Check if ALL files for this campaign are uploaded
                campaign_id = master_asset['campaign_id']
                all_campaign_assets_done = db.check_campaign_upload_complete(campaign_id)

                if all_campaign_assets_done:
                    logger.info("All assets uploaded for campaign {}".format(campaign_id))

                    # 10. Update campaign status A2 → A3
                    dam.update_campaign_status(campaign_id, 'A3')
                    logger.info("Updated campaign status: A2 → A3")

                    # 11. Send webhook notification
                    if config['webhooks']['campaign_status_update']['enabled']:
                        notifier.send_webhook(
                            url=config['webhooks']['campaign_status_update']['url'],
                            payload={
                                'campaign_id': campaign_id,
                                'campaign_number': master_asset.get('campaign_number'),
                                'campaign_name': master_asset.get('campaign_name'),
                                'old_status': 'A2',
                                'new_status': 'A3',
                                'asset_count': db.get_campaign_asset_count(campaign_id),
                                'timestamp': time.time()
                            }
                        )

                    # 12. Send completion email
                    notifier.send_email(
                        template='a2_to_a3_complete',
                        recipients=config['notifications']['recipients']['success'],
                        data={
                            'campaign_id': campaign_id,
                            'campaign_name': master_asset.get('campaign_name'),
                            'asset_count': db.get_campaign_asset_count(campaign_id)
                        }
                    )
                else:
                    logger.info("Campaign {} - More assets pending".format(campaign_id))

                # 13. Clean up temp file
                os.remove(clean_file_path)

                logger.info("Successfully uploaded: {} → Asset ID: {}".format(
                    filename, upload_result['asset_id']
                ))

            except Exception as e:
                logger.error("Failed to process {}: {}".format(filename, str(e)))

                # Send individual file error notification
                notifier.send_email(
                    template='upload_failed',
                    recipients=config['notifications']['recipients']['errors'],
                    data={
                        'filename': filename,
                        'tracking_id': tracking_id if 'tracking_id' in locals() else 'Unknown',
                        'error': str(e)
                    }
                )

            finally:
                task_queue.task_done()

        except Exception as e:
            # Queue timeout or other error
            continue

def retry_upload(dam_client, file_path, folder_id, asset_representation, video_metadata, max_retries=3):
    """Upload with retry logic"""
    for attempt in range(max_retries):
        try:
            result = dam_client.upload_asset(
                file_path=file_path,
                folder_id=folder_id,
                asset_representation=asset_representation,
                video_metadata=video_metadata
            )
            return result

        except Exception as e:
            if attempt == max_retries - 1:
                raise

            delay = 5 * (2 ** attempt)  # Exponential backoff
            logger.warning("Upload attempt {} failed, retrying in {}s: {}".format(
                attempt + 1, delay, str(e)
            ))
            time.sleep(delay)

if __name__ == '__main__':
    # Start background worker
    worker_thread = threading.Thread(target=process_upload_queue, daemon=True)
    worker_thread.start()

    # Start Flask webhook server
    app.run(
        host=config['webhook']['host'],
        port=config['webhook']['port'],
        debug=False
    )
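`retry_upload` hard-codes a 5 s base delay, never caps it, and ignores the `retry` section of `config.yaml` (`max_attempts`, `initial_delay_seconds`, `max_delay_seconds`). Below is a sketch of how `shared/retry_handler.py` could honour those settings with capped exponential backoff. The function names are hypothetical, and `sleep` is injectable so the logic can be tested without waiting.

```python
import logging
import time

logger = logging.getLogger('RetryHandler')


def backoff_delay(attempt, initial_delay=5, max_delay=60):
    """Delay before retry number `attempt` (0-based): initial * 2**attempt, capped."""
    return min(initial_delay * (2 ** attempt), max_delay)


def call_with_retry(func, retry_config, sleep=time.sleep, retryable=(Exception,)):
    """Call func() until it succeeds or retry_config['max_attempts'] is used up.

    retry_config mirrors the `retry:` section of config.yaml.
    Exceptions not listed in `retryable` propagate immediately.
    """
    max_attempts = max(1, retry_config.get('max_attempts', 3))
    for attempt in range(max_attempts):
        try:
            return func()
        except retryable as e:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = backoff_delay(
                attempt,
                retry_config.get('initial_delay_seconds', 5),
                retry_config.get('max_delay_seconds', 60),
            )
            logger.warning("Attempt {} failed, retrying in {}s: {}".format(attempt + 1, delay, e))
            sleep(delay)
```

In the worker this would read `call_with_retry(lambda: dam.upload_asset(...), config['retry'])`. Narrowing `retryable` to network errors (and HTTP codes listed in `retryable_http_codes`) would avoid retrying validation failures that can never succeed.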

🗄️ Database Enhancements

New Helper Methods

# shared/database.py

class Database:
    # ... existing methods ...

    def check_campaign_upload_complete(self, campaign_id):
        """
        Check if all master assets for a campaign have been uploaded

        Returns True if:
        - All tracking IDs for this campaign have derivative assets
        - No failed uploads pending retry
        """
        # The status filter sits in the ON clause (not WHERE) so the LEFT JOIN
        # still keeps masters with no completed derivative; failed uploads
        # awaiting retry therefore keep the campaign open
        query = """
        SELECT
            COUNT(DISTINCT ma.tracking_id) AS total_masters,
            COUNT(DISTINCT da.tracking_id) AS uploaded_derivatives
        FROM master_assets ma
        LEFT JOIN derivative_assets da
            ON ma.tracking_id = da.tracking_id
            AND da.upload_status = 'completed'
        WHERE ma.campaign_id = %s
        AND ma.status = 'active'
        """

        with self.conn.cursor() as cursor:
            cursor.execute(query, (campaign_id,))
            total_masters, uploaded_derivatives = cursor.fetchone()

        # All done if every active master has at least one completed derivative
        return total_masters > 0 and total_masters == uploaded_derivatives

    def get_campaign_asset_count(self, campaign_id):
        """Get total asset count for campaign"""
        query = "SELECT COUNT(*) FROM master_assets WHERE campaign_id = %s AND status = 'active'"
        cursor = self.conn.cursor()
        cursor.execute(query, (campaign_id,))
        return cursor.fetchone()[0]

    def get_pending_uploads(self, campaign_id):
        """Get tracking IDs that don't have a completed derivative upload yet"""
        query = """
        SELECT tracking_id, original_filename
        FROM master_assets
        WHERE campaign_id = %s
        AND status = 'active'
        AND tracking_id NOT IN (
            SELECT tracking_id FROM derivative_assets
            WHERE upload_status = 'completed'
        )
        """
        cursor = self.conn.cursor()
        cursor.execute(query, (campaign_id,))
        return cursor.fetchall()
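To see why `check_campaign_upload_complete` counts `DISTINCT` tracking IDs over a `LEFT JOIN`, the sketch below runs the core counting logic against an in-memory SQLite database (standard library only, `?` placeholders instead of psycopg2's `%s`). The cut-down tables are assumptions containing only the columns the join touches. A master with two derivatives still counts once, and a master with no derivative leaves `uploaded_derivatives` short.

```python
import sqlite3

SCHEMA = """
CREATE TABLE master_assets (
    tracking_id TEXT PRIMARY KEY,
    campaign_id TEXT NOT NULL,
    status TEXT NOT NULL
);
CREATE TABLE derivative_assets (
    id INTEGER PRIMARY KEY,
    tracking_id TEXT NOT NULL REFERENCES master_assets(tracking_id),
    derivative_filename TEXT NOT NULL
);
"""

COMPLETE_QUERY = """
SELECT
    COUNT(DISTINCT ma.tracking_id) AS total_masters,
    COUNT(DISTINCT da.tracking_id) AS uploaded_derivatives
FROM master_assets ma
LEFT JOIN derivative_assets da ON ma.tracking_id = da.tracking_id
WHERE ma.campaign_id = ?
AND ma.status = 'active'
"""


def campaign_upload_complete(conn, campaign_id):
    # COUNT(DISTINCT ...) ignores the NULLs the LEFT JOIN produces for
    # masters without derivatives
    total_masters, uploaded = conn.execute(COMPLETE_QUERY, (campaign_id,)).fetchone()
    return total_masters > 0 and total_masters == uploaded


conn = sqlite3.connect(':memory:')
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO master_assets VALUES (?, ?, 'active')",
                 [('AB12CD', 'C1'), ('EF34GH', 'C1')])
# Two derivatives for the same master count once
conn.executemany("INSERT INTO derivative_assets (tracking_id, derivative_filename) VALUES (?, ?)",
                 [('AB12CD', 'spot_EN.mp4'), ('AB12CD', 'spot_FR.mp4')])
print(campaign_upload_complete(conn, 'C1'))  # False: EF34GH has no derivative yet
conn.execute("INSERT INTO derivative_assets (tracking_id, derivative_filename) "
             "VALUES ('EF34GH', 'banner_EN.jpg')")
print(campaign_upload_complete(conn, 'C1'))  # True
print(campaign_upload_complete(conn, 'C2'))  # False: no masters at all
```

Note the consequence for the A2→A3 handler: once every master has one derivative, the check stays true, so later derivatives of the same campaign will also see "complete".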

New Table: derivative_assets

CREATE TABLE IF NOT EXISTS derivative_assets (
    id SERIAL PRIMARY KEY,
    tracking_id VARCHAR(6) NOT NULL REFERENCES master_assets(tracking_id),
    master_asset_id INTEGER REFERENCES master_assets(id),
    dam_asset_id VARCHAR(255) NOT NULL,
    derivative_filename VARCHAR(500) NOT NULL,
    uploaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    upload_status VARCHAR(50) DEFAULT 'completed',
    retry_count INTEGER DEFAULT 0,
    last_error TEXT,
    UNIQUE(tracking_id, derivative_filename)
);

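-- IF NOT EXISTS (PostgreSQL 9.5+) keeps the script re-runnable, like the table above
CREATE INDEX IF NOT EXISTS idx_derivative_tracking ON derivative_assets(tracking_id);
CREATE INDEX IF NOT EXISTS idx_derivative_dam_asset ON derivative_assets(dam_asset_id);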

⚙️ Configuration Files

Main Config (config/config.yaml)

# Environment (set via ENV variable: staging or production)
environment: ${ENV:-staging}

# Include environment-specific overrides
include: config/environments/${ENV:-staging}.yaml

# DAM Configuration
dam:
  base_url: ${DAM_BASE_URL}
  auth_url: ${DAM_AUTH_URL}
  client_id: ${DAM_CLIENT_ID}
  client_secret: ${DAM_CLIENT_SECRET}
  timeout_seconds: 120

# Box Configuration
box:
  enterprise_id: 43984435
  client_id: ${BOX_CLIENT_ID}
  client_secret: ${BOX_CLIENT_SECRET}
  jwt_key_id: ${BOX_JWT_KEY_ID}
  rsa_private_key_path: ${BOX_PRIVATE_KEY_PATH}
  passphrase: ${BOX_PASSPHRASE}
  root_folder_id: 348304357505
  webhook_signature_keys:
    - ${BOX_PRIMARY_KEY}
    - ${BOX_SECONDARY_KEY}

# Database Configuration
database:
  host: ${DB_HOST:-localhost}
  port: ${DB_PORT:-5433}
  database: ferrero_tracking
  user: ${DB_USER}
  password: ${DB_PASSWORD}

# Polling Configuration (A1→A2)
polling:
  enabled: true
  interval_seconds: 300  # 5 minutes
  max_campaigns_per_run: 10

# Webhook Configuration (A2→A3)
webhook:
  enabled: true
  host: 0.0.0.0
  port: 5000
  validate_signatures: true

# Webhooks to Call (Outgoing)
webhooks:
  campaign_status_update:
    enabled: true
    url: ${CAMPAIGN_STATUS_WEBHOOK_URL}
    timeout_seconds: 10
    retry_on_failure: true
    auth:
      type: bearer  # bearer, basic, none
      token: ${WEBHOOK_AUTH_TOKEN}

# Retry Configuration
retry:
  max_attempts: 3
  backoff: exponential
  initial_delay_seconds: 5
  max_delay_seconds: 60
  retryable_http_codes: [429, 500, 502, 503, 504]

# Notification Configuration
notifications:
  enabled: true
  mailgun:
    api_key: ${MAILGUN_API_KEY}
    domain: ${MAILGUN_DOMAIN}
  recipients:
    success:
      - team@ferrero.com
      - agency@example.com
    errors:
      - admin@ferrero.com
      - it-support@ferrero.com
    critical:
      - oncall@ferrero.com
  templates_path: config/email_templates.yaml

# Logging Configuration
logging:
  level: INFO
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  handlers:
    file:
      filename: logs/app.log
      max_bytes: 10485760  # 10MB
      backup_count: 5
    console:
      enabled: true

# Field Configuration (EASY TO EDIT!)
fields:
  mappings_file: config/field_mappings.yaml

# Temp File Configuration
temp:
  directory: temp/downloads
  cleanup_after_hours: 24
  max_size_mb: 1000
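YAML itself does not expand `${VAR}` or `${VAR:-default}`, so `shared/config_loader.py` (imported by both scripts but not shown in this plan) has to do it. The sketch below is one way, assuming the shell-style syntax used above: it walks the parsed YAML, substitutes environment variables in every string, then deep-merges the `include:` file on top. Type conversion is left out (`${DB_PORT:-5433}` stays the string `"5433"`), unset variables without a default become empty strings, and the helper names are hypothetical.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}
ENV_PATTERN = re.compile(r'\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}')


def expand_env(value, env=None):
    """Recursively replace ${VAR} / ${VAR:-default} in strings inside dicts and lists."""
    env = os.environ if env is None else env
    if isinstance(value, dict):
        return {k: expand_env(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env(v, env) for v in value]
    if isinstance(value, str):
        def replace(match):
            name, default = match.group(1), match.group(2)
            resolved = env.get(name)
            if resolved:  # like the shell's :-, an empty value also falls back
                return resolved
            return default if default is not None else ''
        return ENV_PATTERN.sub(replace, value)
    return value


def deep_merge(base, override):
    """Return a copy of base with override's keys merged in (nested dicts merged)."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def load_config(path='config/config.yaml'):
    # PyYAML is pinned in requirements.txt; imported here so the helpers
    # above can be used without it
    import yaml

    with open(path) as f:
        config = expand_env(yaml.safe_load(f))
    include = config.pop('include', None)
    if include:
        with open(include) as f:
            config = deep_merge(config, expand_env(yaml.safe_load(f) or {}))
    return config
```

Relative paths such as `config/environments/staging.yaml` resolve against the working directory, which is why the cron entries `cd` into the project root first.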

Webhook Configuration (config/webhooks.yaml)

# Outgoing Webhooks (we call these)

campaign_status_update:
  description: "Called when campaign status changes (A1→A2, A2→A3)"
  url: https://your-system.com/api/campaign-status
  method: POST
  headers:
    Content-Type: application/json
    Authorization: "Bearer ${WEBHOOK_AUTH_TOKEN}"
  timeout_seconds: 10
  retry_on_failure: true
  max_retries: 3

  payload_template:
    campaign_id: "{campaign_id}"
    campaign_number: "{campaign_number}"
    campaign_name: "{campaign_name}"
    old_status: "{old_status}"
    new_status: "{new_status}"
    asset_count: "{asset_count}"
    timestamp: "{timestamp}"
    processed_assets: "{processed_assets}"  # JSON array

asset_upload_complete:
  description: "Called when individual asset is uploaded"
  url: https://your-system.com/api/asset-uploaded
  enabled: false  # Optional webhook
  method: POST

  payload_template:
    tracking_id: "{tracking_id}"
    dam_asset_id: "{dam_asset_id}"
    filename: "{filename}"
    timestamp: "{timestamp}"
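The `payload_template` entries are `{placeholder}` strings, but the plan does not show how they are filled. A naive `str.format` on every value would turn `asset_count` into the string `"3"` and `processed_assets` into a Python repr instead of a JSON array. The sketch below (a hypothetical `render_payload` helper) substitutes the raw value when a template string is exactly one placeholder and falls back to `str.format` for mixed text; the `summary` key in the example is illustrative only.

```python
import json
import re

# A template value that is nothing but one placeholder, e.g. "{asset_count}"
WHOLE_PLACEHOLDER = re.compile(r'^\{(\w+)\}$')


def render_payload(template, values):
    """Fill a payload_template structure, keeping native types for bare placeholders."""
    if isinstance(template, dict):
        return {k: render_payload(v, values) for k, v in template.items()}
    if isinstance(template, list):
        return [render_payload(v, values) for v in template]
    if isinstance(template, str):
        match = WHOLE_PLACEHOLDER.match(template)
        if match:
            # Missing values become None (JSON null) instead of raising KeyError
            return values.get(match.group(1))
        # Mixed text: a missing key here still raises KeyError
        return template.format(**values)
    return template


template = {
    'campaign_id': '{campaign_id}',
    'asset_count': '{asset_count}',
    'processed_assets': '{processed_assets}',
    'summary': 'Campaign {campaign_id} moved to {new_status}',
}
values = {'campaign_id': 'C1', 'asset_count': 2, 'new_status': 'A2',
          'processed_assets': [{'tracking_id': 'AB12CD'}, {'tracking_id': 'EF34GH'}]}
print(json.dumps(render_payload(template, values), sort_keys=True))
```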

🐍 Python 3.6 Compatibility

Requirements (requirements.txt)

# Python 3.6 Compatible Versions

# Core
requests==2.27.1  # Last version supporting Python 3.6
python-dotenv==0.19.2
pyyaml==5.4.1

# Database
psycopg2-binary==2.8.6

# Box SDK
boxsdk==2.14.1  # Python 3.6 compatible

# Web Framework (Flask, not FastAPI for 3.6)
Flask==2.0.3
Werkzeug==2.0.3

# Email (Mailgun via requests)
# Use requests library directly

# Video Analysis
# Use subprocess with ffprobe (no wrapper needed)

# Utilities
python-dateutil==2.8.2
Jinja2==3.0.3

# Retry
tenacity==8.0.1  # Last version for Python 3.6

# Testing
pytest==6.2.5
pytest-cov==3.0.0
pytest-mock==3.6.1

Python 3.6 Code Considerations

# ❌ DON'T USE (Python 3.7+)
@dataclass
class Config:
    pass

# ✅ USE (Python 3.6 compatible)
class Config:
    def __init__(self, **kwargs):
        self.data = kwargs

# ❌ DON'T USE (Python 3.8+)
if (result := function()):
    process(result)

# ✅ USE (Python 3.6 compatible)
result = function()
if result:
    process(result)

# ✅ CAN USE (Python 3.6+)
f"String formatting {variable}"  # f-strings work in 3.6

# ✅ CAN USE (Python 3.6+)
def function(param: str) -> dict:  # Type hints work
    pass
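Video metadata is extracted by calling `ffprobe` through `subprocess` (see Requirements). Two common `subprocess.run` arguments, `capture_output=True` and `text=True`, were only added in Python 3.7, so on 3.6 pass `stdout=PIPE`, `stderr=PIPE` and `universal_newlines=True` instead. A minimal sketch of `shared/video_analyzer.py` along those lines follows; the selected output fields are an assumption about what the MVP needs, and it assumes `ffprobe` is on the PATH of the hosting account.

```python
import json
import subprocess


def parse_ffprobe_json(raw):
    """Pick a few fields out of `ffprobe -print_format json` output."""
    data = json.loads(raw)
    video = next((s for s in data.get('streams', []) if s.get('codec_type') == 'video'), None)
    fmt = data.get('format', {})
    if video is None:
        return None  # no video stream (e.g. an audio-only file)
    return {
        'duration_seconds': float(fmt['duration']) if 'duration' in fmt else None,
        'width': video.get('width'),
        'height': video.get('height'),
        'codec': video.get('codec_name'),
    }


class VideoAnalyzer:
    def extract_metadata(self, file_path):
        # Python 3.6: no capture_output/text arguments; use PIPE + universal_newlines
        result = subprocess.run(
            ['ffprobe', '-v', 'error', '-print_format', 'json',
             '-show_format', '-show_streams', file_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            timeout=60,
        )
        if result.returncode != 0:
            return None  # ffprobe could not read the file (e.g. a PDF)
        return parse_ffprobe_json(result.stdout)
```

Keeping the JSON parsing in its own function lets it be unit-tested without `ffprobe` installed. Note that ffprobe reports still images as single-frame video streams, so image files will also return a result.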

📦 Virtual Environment Setup

setup.sh

#!/bin/bash
# Setup script for shared hosting

echo "=== Ferrero Automation Setup ==="

# 1. Create virtual environment (Python 3.6)
echo "Creating virtual environment..."
python3.6 -m venv venv

# 2. Activate venv
source venv/bin/activate

# 3. Upgrade pip
echo "Upgrading pip..."
pip install --upgrade "pip<22"  # pip 21.3.1 is the last release supporting Python 3.6

# 4. Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt

# 5. Create directory structure
echo "Creating directories..."
mkdir -p logs temp/downloads config/environments

# 6. Copy .env template
if [ ! -f .env ]; then
    echo "Creating .env from template..."
    cp .env.example .env
    echo "⚠️  Please edit .env with your credentials"
fi

# 7. Create log files
touch logs/a1_to_a2.log
touch logs/a2_to_a3.log
touch logs/errors.log

# 8. Set permissions
chmod +x scripts/a1_to_a2_download.py
chmod +x scripts/a2_to_a3_upload.py

# 9. Test imports (run from scripts/ so "shared" resolves the same way it does for the scripts)
echo "Testing Python imports..."
if (cd scripts && python -c "from shared import config_loader, dam_client, box_client, database, notifier"); then
    echo "✅ Setup complete!"
    echo ""
    echo "Next steps:"
    echo "1. Edit .env with your credentials"
    echo "2. Edit config/config.yaml for your environment"
    echo "3. Run: source venv/bin/activate"
    echo "4. Test against staging: ENV=staging python scripts/a1_to_a2_download.py (runs in the foreground; Ctrl+C to stop)"
else
    echo "❌ Setup failed - check errors above"
fi

🕐 Cron Setup (Shared Hosting)

Crontab Entry

# Edit crontab
crontab -e

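# Add entry for A1→A2 polling. The script loops on its own every polling.interval_seconds,
# so cron only (re)starts it; flock -n (util-linux) skips the run while an instance holds the lock
*/5 * * * * cd /home/user/ferrero-automation && flock -n a1_to_a2.lock venv/bin/python scripts/a1_to_a2_download.py >> logs/cron.log 2>&1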

# Optional: Cleanup temp files daily
0 2 * * * cd /home/user/ferrero-automation && venv/bin/python scripts/cleanup_temp.py >> logs/cleanup.log 2>&1
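The daily cron entry calls `scripts/cleanup_temp.py`, which the plan does not define. A minimal sketch, assuming it should delete files under `temp.directory` older than `temp.cleanup_after_hours` (both from `config.yaml`) and then prune emptied per-campaign folders. The function name is hypothetical; files still being downloaded have a fresh modification time and are left alone.

```python
import os
import time


def cleanup_temp(directory='temp/downloads', max_age_hours=24, now=None):
    """Delete files older than max_age_hours under directory; return removed paths."""
    cutoff = (now if now is not None else time.time()) - max_age_hours * 3600
    removed = []
    # Walk bottom-up so campaign folders are visited after their files
    for root, dirs, files in os.walk(directory, topdown=False):
        for name in files:
            path = os.path.join(root, name)
            if os.path.getmtime(path) < cutoff:
                os.remove(path)
                removed.append(path)
        # Remove emptied sub-folders, but never the top-level temp directory
        if root != directory and not os.listdir(root):
            os.rmdir(root)
    return removed
```

In `scripts/cleanup_temp.py` the arguments would come from `load_config()['temp']` (`directory`, `cleanup_after_hours`), with the returned paths logged to `logs/cleanup.log`.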

Webhook Server (Background Process)

# Start webhook server (keep alive with nohup)
cd /home/user/ferrero-automation
source venv/bin/activate
nohup python scripts/a2_to_a3_upload.py > logs/webhook.log 2>&1 &

# Save PID for stopping later
echo $! > webhook.pid

# To stop:
kill $(cat webhook.pid)

Process Monitor Script

#!/bin/bash
# monitor_webhook.sh - Ensure webhook is running

SCRIPT_DIR="/home/user/ferrero-automation"
PID_FILE="$SCRIPT_DIR/webhook.pid"
LOG_FILE="$SCRIPT_DIR/logs/monitor.log"

# Check if process is running
if [ -f "$PID_FILE" ]; then
    PID=$(cat "$PID_FILE")
    if ps -p $PID > /dev/null; then
        echo "$(date): Webhook running (PID: $PID)" >> "$LOG_FILE"
        exit 0
    fi
fi

# Process not running - restart
echo "$(date): Webhook not running, restarting..." >> "$LOG_FILE"
cd "$SCRIPT_DIR"
source venv/bin/activate
nohup python scripts/a2_to_a3_upload.py > logs/webhook.log 2>&1 &
echo $! > "$PID_FILE"
echo "$(date): Webhook restarted (PID: $(cat $PID_FILE))" >> "$LOG_FILE"

# Add to crontab to run every 5 minutes
# */5 * * * * /home/user/ferrero-automation/monitor_webhook.sh

🔧 Configuration for Easy Field Changes

Field Mappings (config/field_mappings.yaml)

# MVP Field IDs - EASY TO ADD/REMOVE NEW FIELDS!
mvp_fields:
  # Asset Info Category
  - FERRERO.FIELD.MKTG.ASSET TYPE
  - FERRERO.FIELD.FISCAL YEAR
  - MAIN_LANGUAGES
  - ARTESIA.FIELD.ASSET DESCRIPTION
  - FERRERO.FIELD.STATE
  - ARTESIA.FIELD.ASSET NAME
  - FERRERO.FIELD.SUB BRAND

  # Add new field? Just add the ID here!
  # - NEW.FIELD.ID

  # Marketing Category
  - FERRERO.MARKETING.FIELD.AGENCY NAME
  - FERRERO.MARKETING.FIELD.SPOT_VERSION
  - FERRERO.MARKETING.FIELD.DIRECTOR_NAME

  # Market Category
  - FERRERO.MARKET.FIELD.IPRIGHT
  - FERRERO.MARKET.FIELD.BUYOUT

  # ... etc (all 28 MVP fields)

# Fields to UPDATE from V2 filename
filename_field_updates:
  - field_id: ARTESIA.FIELD.ASSET NAME
    source: clean_filename
    required: true

  - field_id: ARTESIA.FIELD.ASSET DESCRIPTION
    source: subject_title
    required: false
    fallback: ""

  - field_id: MAIN_LANGUAGES
    source: language_code
    transform: uppercase
    required: true
    validate_domain: true
    default: EN

  - field_id: FERRERO.FIELD.MKTG.ASSET TYPE
    source: asset_type
    transform: lowercase
    validate_domain: true
    required: true

# Fields to FORCE to specific values
forced_field_values:
  FERRERO.FIELD.STATE: Local
  # Add more forced values here

# Default values for missing fields
default_field_values:
  FERRERO.FIELD.ASSETCOMPLIANCE: "-"
  MARKETING_TAG: "Tag"
  FERRERO.FIELD.FISCAL YEAR: "2025/2026"

# Domain validation (check against DAM lookup domains)
validate_domain_values:
  enabled: true
  cache_file: config/domain_values_cache.json
  cache_ttl_hours: 24
  strict_mode: false  # If true, fail on invalid values. If false, warn and use anyway.
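Each `filename_field_updates` rule names a `source` key from the parsed filename plus optional `transform`, `fallback` and `default`, and the file ends with `forced_field_values` and `default_field_values`. One reading of how `MetadataExtractorMVP` could combine them is sketched below. The precedence (filename value, then `fallback`/`default`, then `default_field_values` for still-missing fields, with `forced_field_values` always winning) is an assumption, the function name is hypothetical, and domain validation against DAM lookup domains is omitted.

```python
TRANSFORMS = {
    'uppercase': lambda v: v.upper(),
    'lowercase': lambda v: v.lower(),
}


def apply_field_mappings(mappings, sources, existing=None):
    """Return {field_id: value} built from field_mappings.yaml data.

    mappings: the parsed YAML dict; sources: values from the parsed V2 filename
    (e.g. {'clean_filename': ..., 'language_code': ...}); existing: master metadata.
    """
    fields = dict(existing or {})

    for rule in mappings.get('filename_field_updates', []):
        value = sources.get(rule['source'])
        if value and rule.get('transform') in TRANSFORMS:
            value = TRANSFORMS[rule['transform']](value)
        if not value:
            # The YAML uses both 'fallback' and 'default'; treat them alike
            value = rule.get('fallback', rule.get('default'))
        if value is None and rule.get('required'):
            raise ValueError("Missing required field {}".format(rule['field_id']))
        if value is not None:
            fields[rule['field_id']] = value

    # Defaults only fill gaps; forced values override everything
    for field_id, value in mappings.get('default_field_values', {}).items():
        fields.setdefault(field_id, value)
    fields.update(mappings.get('forced_field_values', {}))
    return fields
```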

🔔 Notifier with Webhooks

Updated Notifier (shared/notifier.py)

import logging

import requests
import yaml
from jinja2 import Template

class Notifier:
    def __init__(self, config):
        self.config = config
        self.mailgun_api_key = config['notifications']['mailgun']['api_key']
        self.mailgun_domain = config['notifications']['mailgun']['domain']
        self.recipients = config['notifications']['recipients']
        self.enabled = config['notifications']['enabled']
        self.webhook_config = config.get('webhooks', {})
        self.logger = logging.getLogger('Notifier')

        # Load email templates
        with open(config['notifications']['templates_path']) as f:
            self.templates = yaml.safe_load(f)['templates']

    def send_email(self, template, recipients, data):
        """Send email via Mailgun"""
        if not self.enabled:
            return

        try:
            template_config = self.templates[template]
            subject = template_config['subject'].format(**data)

            # Render HTML body with Jinja2
            html_template = Template(template_config['html'])
            html_body = html_template.render(**data)

            # Send via Mailgun API
            response = requests.post(
                "https://api.mailgun.net/v3/{}/messages".format(self.mailgun_domain),
                auth=("api", self.mailgun_api_key),
                data={
                    "from": "Ferrero Automation <noreply@{}>".format(self.mailgun_domain),
                    "to": recipients,
                    "subject": subject,
                    "html": html_body
                },
                timeout=10
            )

            if response.status_code == 200:
                self.logger.info("Email sent: {} to {}".format(template, recipients))
            else:
                self.logger.error("Email failed: {}".format(response.text))

        except Exception as e:
            self.logger.error("Email error: {}".format(str(e)))

    def send_webhook(self, url, payload, retry=True):
        """Send webhook notification (outgoing)"""
        if not url:
            return

        try:
            # Get webhook config if exists
            webhook_name = None
            webhook_config = {}

            for name, config in self.webhook_config.items():
                if config.get('url') == url:
                    webhook_name = name
                    webhook_config = config
                    break

            # Prepare headers
            headers = webhook_config.get('headers', {
                'Content-Type': 'application/json'
            })

            # Add auth if configured
            auth_config = webhook_config.get('auth', {})
            if auth_config.get('type') == 'bearer' and auth_config.get('token'):
                headers['Authorization'] = 'Bearer {}'.format(auth_config['token'])

            # Send webhook
            response = requests.post(
                url,
                json=payload,
                headers=headers,
                timeout=webhook_config.get('timeout_seconds', 10)
            )

            if response.status_code in [200, 201, 202]:
                self.logger.info("Webhook sent successfully: {}".format(url))
            else:
                self.logger.warning("Webhook failed: {} - {}".format(
                    response.status_code, response.text
                ))

        except Exception as e:
            self.logger.error("Webhook error: {}".format(str(e)))

            if retry and webhook_config.get('retry_on_failure'):
                # Could add to retry queue
                pass
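The `retry_on_failure` branch in `send_webhook` is still a stub. A plain exponential-backoff helper is one way to fill it, and it is a plausible shape for `shared/retry_handler.py`. This is a sketch under assumptions. The function name and the defaults are illustrative, not taken from the plan: 4 attempts, a 1 s base delay that doubles up to 30 s, and optional jitter.

```python
import random
import time


def retry_with_backoff(func, attempts=4, base_delay=1.0, max_delay=30.0,
                       jitter=0.5, retry_on=(Exception,), sleep=time.sleep):
    """Call func() until it succeeds, waiting 1 s, 2 s, 4 s, ... between tries.

    Re-raises the last exception once `attempts` calls have failed.
    `sleep` is injectable so tests can record delays instead of waiting.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                raise
            delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
            if jitter:
                # Random extra wait so many clients do not retry in lockstep
                delay += random.uniform(0, delay * jitter)
            sleep(delay)
```

For example, `retry_with_backoff(lambda: requests.post(url, json=payload, timeout=10).raise_for_status(), retry_on=(requests.RequestException,))` retries both transport errors and non-2xx responses. Long backoffs also lengthen a cron run, which matters for the 5-minute schedule.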

🎯 Updated Flow Diagrams

A1→A2 with All-Done Check

┌─────────────────────────────────────┐
│  Cron triggers every 5 minutes      │
└──────────────┬──────────────────────┘
               ↓
        ┌──────────────┐
        │ Search DAM   │
        │ Status = A1  │
        └──────┬───────┘
               ↓
         Found campaigns?
               ├── No → Exit (next cron run)
               ↓ Yes
        ┌──────────────────┐
        │ For each         │
        │ campaign:        │
        └──────┬───────────┘
               ↓
        ┌──────────────────┐
        │ Get Master       │
        │ Assets (count: N)│
        └──────┬───────────┘
               ↓
        ┌──────────────────┐
        │ For each asset:  │
        │ 1. Download      │
        │ 2. Gen Track ID  │
        │ 3. Upload to Box │
        │ 4. Store in DB   │
        └──────┬───────────┘
               ↓
        ┌─────────────────────────┐
        │ CHECK: All N assets     │
        │ processed successfully? │
        └──────┬──────────┬───────┘
               │          │
          YES  │          │ NO
               ↓          ↓
        ┌──────────────┐ ┌──────────────────┐
        │ Update Status│ │ DO NOT update    │
        │ A1 → A2      │ │ Keep status A1   │
        └──────┬───────┘ └──────┬───────────┘
               ↓                 ↓
        ┌──────────────┐ ┌──────────────────┐
        │ Send Webhook │ │ Send partial     │
        │ (campaign #) │ │ error email      │
        └──────┬───────┘ └──────────────────┘
               ↓
        ┌──────────────┐
        │ Send Success │
        │ Email        │
        └──────────────┘
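The all-done rule in the A1→A2 diagram can be sketched as a small function. This is a minimal sketch, not the planned implementation. `process_campaign` and the injected callables `process_asset`, `update_status` and `notify` are hypothetical stand-ins for the DAM, Box and database clients, and the event names are placeholders.

```python
def process_campaign(campaign_id, assets, process_asset, update_status, notify):
    """Advance a campaign from A1 to A2 only if every master asset succeeds.

    process_asset(asset): download, generate tracking ID, upload to Box, store in DB;
                          expected to raise on any failure (hypothetical callable)
    update_status(campaign_id, new_status): DAM status update (hypothetical)
    notify(event, campaign_id, total, failures): webhook + email (hypothetical)
    """
    failures = []
    for asset in assets:
        try:
            process_asset(asset)
        except Exception as exc:  # keep going so one bad file reports all failures
            failures.append({'asset': asset, 'error': str(exc)})

    # An empty asset list is treated as "not done" (assumption), so it is never advanced
    if assets and not failures:
        update_status(campaign_id, 'A2')
        notify('campaign_success', campaign_id, len(assets), failures)
        return True

    notify('campaign_partial_error', campaign_id, len(assets), failures)
    return False
```

A campaign with any failure stays at A1, so the next cron run processes it again. `process_asset` should therefore skip assets already recorded in the database, which keeps reruns idempotent and avoids duplicate Box uploads.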

A2→A3 with All-Done Check

┌─────────────────────────────────────┐
│  Box: File uploaded                 │
└──────────────┬──────────────────────┘
               ↓
        ┌──────────────┐
        │ Box Webhook  │
        │ POST received│
        └──────┬───────┘
               ↓
        ┌──────────────┐
        │ Validate     │
        │ Signature    │
        └──────┬───────┘
               ↓
        ┌──────────────┐
        │ Queue Task   │
        │ Return 200   │
        └──────┬───────┘
               ↓
        ┌──────────────────┐
        │ Async Processing:│
        │ 1. Parse V2 name │
        │ 2. Load master   │
        │ 3. Download Box  │
        │ 4. Extract video │
        │ 5. Build MVP     │
        │ 6. Upload DAM    │
        │ 7. Store in DB   │
        └──────┬───────────┘
               ↓
        ┌─────────────────────────────┐
        │ CHECK: All assets for       │
        │ this campaign uploaded?     │
        └──────┬──────────────┬───────┘
               │              │
          YES  │              │ NO
               ↓              ↓
        ┌──────────────┐ ┌──────────────────┐
        │ Update Status│ │ Wait for more    │
        │ A2 → A3      │ │ Keep status A2   │
        └──────┬───────┘ └──────────────────┘
               ↓
        ┌──────────────┐
        │ Send Webhook │
        │ (campaign #) │
        └──────┬───────┘
               ↓
        ┌──────────────┐
        │ Send Success │
        │ Email        │
        └──────────────┘
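The "all assets uploaded" check depends on what "all" means, and the plan leaves that open. Assume a campaign is complete when every master asset (identified by its tracking ID) has at least one derivative marked as uploaded to the DAM. Under that assumption, the check is a single `NOT EXISTS` query. The table and column names below are hypothetical. `sqlite3` stands in for PostgreSQL so the sketch runs anywhere; with psycopg2, the `?` placeholders become `%s`.

```python
import sqlite3

# Hypothetical schema; the real tables live in PostgreSQL (shared/database.py)
SCHEMA = """
CREATE TABLE master_assets (
    tracking_id TEXT PRIMARY KEY,
    campaign_id TEXT NOT NULL
);
CREATE TABLE derivative_assets (
    id INTEGER PRIMARY KEY,
    tracking_id TEXT NOT NULL REFERENCES master_assets (tracking_id),
    dam_status TEXT NOT NULL
);
"""

TOTAL_SQL = "SELECT COUNT(*) FROM master_assets WHERE campaign_id = ?"

# Masters of this campaign that still have no derivative uploaded to the DAM
PENDING_SQL = """
SELECT COUNT(*) FROM master_assets m
WHERE m.campaign_id = ?
  AND NOT EXISTS (
      SELECT 1 FROM derivative_assets d
      WHERE d.tracking_id = m.tracking_id AND d.dam_status = 'uploaded'
  )
"""


def campaign_fully_uploaded(conn, campaign_id):
    """True when the campaign has masters and none of them is still pending."""
    total = conn.execute(TOTAL_SQL, (campaign_id,)).fetchone()[0]
    pending = conn.execute(PENDING_SQL, (campaign_id,)).fetchone()[0]
    return total > 0 and pending == 0
```

Box may deliver several uploads for the same campaign close together, so two workers can both see the campaign as complete. Making the A2 → A3 update conditional on the current status, and notifying only when that update actually changed something, avoids duplicate webhooks and emails.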

📝 Easy Configuration Examples

Example 1: Add New MVP Field

# config/field_mappings.yaml

mvp_fields:
  - FERRERO.FIELD.MKTG.ASSET TYPE
  - FERRERO.FIELD.FISCAL YEAR
  # ... existing fields ...
  - NEW.CUSTOM.FIELD.ID  # ← Just add here! No code changes needed

Example 2: Change Webhook URL

# config/webhooks.yaml

campaign_status_update:
  url: https://new-url.com/api/status  # ← Just change URL
  headers:
    Authorization: "Bearer ${NEW_TOKEN}"  # ← Update token in .env
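The `${NEW_TOKEN}` placeholder implies that the config loader expands environment variables after reading the YAML, because PyYAML itself leaves the text untouched. Below is a minimal sketch. It assumes `.env` has already been loaded into `os.environ`, for instance with python-dotenv's `load_dotenv()`. `expand_env` is a hypothetical helper name.

```python
import os
import re

# Matches ${NAME}; NAME follows shell variable naming rules
ENV_PLACEHOLDER = re.compile(r'\$\{([A-Za-z_][A-Za-z0-9_]*)\}')


def expand_env(value, environ=None):
    """Recursively replace ${NAME} placeholders in strings, lists and dicts."""
    environ = os.environ if environ is None else environ
    if isinstance(value, dict):
        return {key: expand_env(item, environ) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, environ) for item in value]
    if isinstance(value, str):
        def substitute(match):
            name = match.group(1)
            if name not in environ:
                raise KeyError('Environment variable {} is not set'.format(name))
            return environ[name]
        return ENV_PLACEHOLDER.sub(substitute, value)
    return value  # numbers, booleans and None pass through unchanged
```

Failing loudly on an unset variable is a deliberate choice here. A webhook silently sent with the literal header `Bearer ${NEW_TOKEN}` is harder to diagnose than an error at startup.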

Example 3: Switch Staging → Production

# Just change environment variable
export ENV=production

# Or in .env file:
ENV=production

# config/environments/production.yaml is then applied on top of config.yaml,
# so URLs and settings switch together

Example 4: Change Polling Interval

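# config/config.yaml
# (If the poller runs from cron as in Step 3, the crontab schedule sets the
#  run frequency; change it too, e.g. */3 for every 3 minutes.)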

polling:
  interval_seconds: 180  # ← Change from 300 to 180 (3 minutes)

# Or environment-specific:
# config/environments/production.yaml
polling:
  interval_seconds: 120  # 2 minutes in production
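Examples 3 and 4 rely on an environment file overriding only the keys it mentions, and a recursive merge is one way to get that behaviour. This sketch assumes both files have already been parsed into dicts (e.g. with `yaml.safe_load`). It also assumes `ENV` defaults to `staging` when unset. `deep_merge` and `resolve_config` are hypothetical names.

```python
import os


def deep_merge(base, override):
    """Return a new dict: override's keys win, nested dicts are merged key by key."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value  # scalars and lists are replaced, not merged
    return merged


def resolve_config(base, environment_overrides, env=None):
    """Pick the overrides for ENV (default 'staging', an assumption) and apply them."""
    env = env or os.environ.get('ENV', 'staging')
    return deep_merge(base, environment_overrides.get(env, {}))
```

Here, a typo in `ENV` silently falls back to the base config. Raising on an unknown environment name is a reasonable stricter alternative.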

Example 5: Add Email Recipient

# config/config.yaml

notifications:
  recipients:
    success:
      - existing@ferrero.com
      - newperson@ferrero.com  # ← Just add to list!
    errors:
      - admin@ferrero.com

🚀 Deployment Steps (Shared Hosting)

Step 1: Initial Setup

# SSH into shared hosting
ssh user@yourserver.com

# Get the code (clone the repository, or upload the files via SFTP)
cd ~
git clone your-repo/ferrero-automation.git
cd ferrero-automation

# Run setup
chmod +x setup.sh
./setup.sh

# Create .env from the template and restrict it to the owner
cp .env.example .env
chmod 600 .env

# Edit configuration
nano .env  # Add all credentials
nano config/config.yaml  # Verify settings

Step 2: Test Components

# Activate venv
source venv/bin/activate

# Test DAM connection
python -c "from scripts.shared.dam_client import DAMClient; from scripts.shared.config_loader import load_config; c=load_config(); d=DAMClient(c); print('DAM OK' if d.test_connection() else 'DAM FAIL')"

# Test Box connection
python -c "from scripts.shared.box_client import BoxClient; from scripts.shared.config_loader import load_config; c=load_config(); b=BoxClient(c); print('Box OK' if b.test_connection() else 'Box FAIL')"

# Test database
python -c "from scripts.shared.database import Database; from scripts.shared.config_loader import load_config; c=load_config(); db=Database(c); print('DB OK' if db.test_connection() else 'DB FAIL')"

Step 3: Setup Cron (A1→A2 Polling)

# Edit crontab
crontab -e

# Add:
*/5 * * * * cd ~/ferrero-automation && venv/bin/python scripts/a1_to_a2_download.py >> logs/cron.log 2>&1
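With `*/5`, a run that takes longer than five minutes (for example while downloading large master videos) overlaps with the next one, and both could process the same campaign. A minimal guard is a non-blocking `fcntl.flock` at the start of `a1_to_a2_download.py`. The sketch assumes a lock file under the project's `temp/` directory; that path and the function names are hypothetical.

```python
import fcntl
import os


def acquire_run_lock(path):
    """Return an open, exclusively locked file handle, or None if another run holds it."""
    handle = open(path, 'a')  # 'a' so a losing run never truncates the holder's PID
    try:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:  # BlockingIOError (a subclass of OSError) when already locked
        handle.close()
        return None
    handle.truncate(0)
    handle.write(str(os.getpid()))  # informational only; the lock itself is what matters
    handle.flush()
    return handle


def release_run_lock(handle):
    fcntl.flock(handle, fcntl.LOCK_UN)
    handle.close()
```

At the top of the script, call `lock = acquire_run_lock('temp/a1_to_a2.lock')` and then `if lock is None: sys.exit(0)`, so an overlapping run exits quietly. The kernel releases the lock when the process exits, even after a crash. If the host provides the util-linux `flock` command, wrapping the crontab command in `flock -n <lockfile>` achieves the same without code changes.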

Step 4: Start Webhook Server (A2→A3)

cd ~/ferrero-automation
source venv/bin/activate

# Start in background
nohup python scripts/a2_to_a3_upload.py > logs/webhook.log 2>&1 &
echo $! > webhook.pid

# Setup monitor cron
crontab -e
# Add:
*/5 * * * * ~/ferrero-automation/monitor_webhook.sh
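`monitor_webhook.sh` is referenced but not shown. Its core check is whether the PID in `webhook.pid` is still alive. Below is the same check in Python, as a sketch; the function name is hypothetical, and restarting the server is left to the caller.

```python
import os


def webhook_server_running(pid_file):
    """Return True if the PID recorded in pid_file is a live process we own."""
    try:
        with open(pid_file) as handle:
            pid = int(handle.read().strip())
    except (OSError, ValueError):  # missing/unreadable file or garbage content
        return False
    if pid <= 0:
        return False  # 0 or negative would address a process group, not one process
    try:
        os.kill(pid, 0)  # signal 0 sends nothing; it only checks existence and permission
    except ProcessLookupError:
        return False  # no such process: the server died
    except PermissionError:
        return False  # PID reused by another user's process, so not our server
    return True
```

A stale PID file can point at a PID that the system has since reused. Treating `PermissionError` as "not running" covers the case where that PID now belongs to another account, which is common on shared hosting. A reused PID owned by the same account would still be reported as alive.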

Step 5: Configure Box Webhook

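# Use Box CLI or the Box Developer Console to register the webhook
# Webhook URL: https://yourserver.com/webhooks/box
#   (Box only delivers to HTTPS on port 443; route this path to the
#    webhook server's local port 5000 through the host's web server/proxy)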
# Events: FILE.UPLOADED
# Folder: Your campaign processing folder

🎯 Implementation Checklist

Core Components

  • Config loader (YAML + environment variables)
  • DAM client (OAuth2, search, download, upload, status update)
  • Box client (JWT, upload with tracking ID, download, list files)
  • Database client (CRUD, tracking IDs, campaign checks)
  • Filename parser (V2 naming convention, validation)
  • Metadata extractor MVP (28 fields, updates from filename)
  • Video analyzer (ffprobe wrapper)
  • Retry handler (exponential backoff)
  • Notifier (Mailgun email + webhook sender)

Scripts

  • A1→A2 poller (with all-done check)
  • A2→A3 webhook handler (with all-done check)
  • Cleanup script (temp files)
  • Health check script

Configuration

  • config.yaml (main config)
  • field_mappings.yaml (MVP fields - easy to edit!)
  • webhooks.yaml (webhook endpoints)
  • email_templates.yaml
  • staging.yaml / production.yaml
  • .env.example

Deployment

  • requirements.txt (Python 3.6 compatible)
  • setup.sh (venv creation)
  • monitor_webhook.sh (process monitor)
  • Cron configuration
  • README with setup instructions

Testing

  • Unit tests (pytest)
  • Integration tests
  • End-to-end workflow test

🔐 Security Considerations

Shared Hosting Security

# Ensure credentials are secure
.env:
  - File permissions: 600 (owner read/write only)
  - Not in git repository
  - Use environment variables

# Webhook signature validation
webhook:
  validate_signatures: true  # Always true in production

# Rate limiting (Flask has no built-in limiter; e.g. the Flask-Limiter extension)
rate_limit:
  requests_per_minute: 60

# Logging
logging:
  sanitize_credentials: true  # Don't log secrets
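`sanitize_credentials: true` needs something that actually scrubs log output. One option is a `logging.Filter` that masks known secret values. This sketch assumes the secrets are available as strings, for example read from `os.environ` after loading `.env`. `RedactSecretsFilter` is a hypothetical name.

```python
import logging


class RedactSecretsFilter(logging.Filter):
    """Replace known secret values in log messages before they are written."""

    def __init__(self, secrets, mask='***'):
        super().__init__()
        # Skip empty values (replacing '' would insert the mask everywhere);
        # longest first so a secret containing another is masked whole
        self.secrets = sorted((s for s in secrets if s), key=len, reverse=True)
        self.mask = mask

    def filter(self, record):
        message = record.getMessage()  # merge msg % args before redacting
        for secret in self.secrets:
            message = message.replace(secret, self.mask)
        record.msg, record.args = message, ()
        return True  # never drop the record, only rewrite it
```

Attach the filter to each handler rather than to a logger, because logger-level filters do not see records propagated from child loggers. Tracebacks attached with `exc_info` are formatted later and are not covered by this filter.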

📊 Monitoring Dashboard (Optional)

Simple status page:

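# scripts/status_dashboard.py
# Run from the project root: venv/bin/python scripts/status_dashboard.py
# Flask looks for the template in scripts/templates/status.html

from flask import Flask, render_template
from shared.config_loader import load_config
from shared.database import Database

app = Flask(__name__)

@app.route('/status')
def status():
    db = Database(load_config())

    stats = {
        'total_masters': db.count_master_assets(),
        'total_derivatives': db.count_derivative_assets(),
        'pending_a1_campaigns': db.count_campaigns_by_status('A1'),
        'pending_a2_campaigns': db.count_campaigns_by_status('A2'),
        'last_24h_uploads': db.count_uploads_last_24h(),
        'failed_uploads': db.count_failed_uploads()
    }

    return render_template('status.html', **stats)

if __name__ == '__main__':
    # Listen on localhost only; 5001 is an example port that avoids the webhook server's 5000
    app.run(host='127.0.0.1', port=5001)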

🎯 Summary

Architecture: 2 Python scripts + shared libraries
Python Version: 3.6 compatible
Hosting: Shared hosting with venv
Triggers: Cron (A1→A2) + Webhook (A2→A3)
Key Features:

  • All-done check before status updates
  • Campaign webhook notifications
  • Email notifications
  • Retry logic with exponential backoff
  • Configuration files for easy changes (URLs, fields, recipients)
  • Python 3.6 compatible
  • Virtual environment isolated
  • Comprehensive error handling

Estimated Effort: 4-6 weeks full implementation

