#!/usr/bin/env python3
"""
Ferrero Orchestrator
--------------------
Master runner script that manages the execution of periodic tasks.

- Invoked by cron (e.g., every minute)
- Decides on each invocation which tasks are due, based on their interval
- Guarantees that at most ONE instance of a task runs at a time (file locks)
- Logs all activity
"""

import os
import sys
import time
import logging
import subprocess
import fcntl
import argparse
from datetime import datetime, timedelta
from pathlib import Path

# Setup logging
# NOTE: 'logs' is resolved against the current working directory of the
# process, not against the location of this file.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

os.makedirs('logs', exist_ok=True)
_file_handler = logging.FileHandler('logs/orchestrator.log')
_console_handler = logging.StreamHandler()
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[_file_handler, _console_handler],
)
logger = logging.getLogger('Orchestrator')

# ==========================================
# CONFIGURATION
# ==========================================


def _task(name, script, interval_minutes, args=('--auth-pfx-v2',)):
    """Build one entry of TASKS.

    name:             Human-readable task name (also used to derive the lock file name)
    script:           Path relative to project root
    interval_minutes: How often to try running it
    args:             Extra command-line arguments; copied into a fresh list per task
    """
    return {
        'name': name,
        'script': script,
        'interval_minutes': interval_minutes,
        'args': list(args),
    }


# List of tasks to manage
TASKS = [
    _task('A1->A2 Box Uploader', 'scripts/a1_to_a2_box_uploader.py', 5),
    _task('A1->A2 Download', 'scripts/a1_to_a2_download.py', 5),
    _task('A2->A3 Upload Polling', 'scripts/a2_to_a3_upload_polling.py', 5),
    _task('A4 Box Uploader', 'scripts/a4_box_uploader.py', 10),
    _task('A4 Webhook Monitor', 'scripts/a4_webhook_monitor.py', 5),
    _task('A5->A6 Download', 'scripts/a5_to_a6_download.py', 5),
    _task('B1->B2 Download', 'scripts/b1_to_b2_download.py', 10),
]

LOCK_DIR = 'locks'
STATE_FILE = 'orchestrator_state.json'

# ==========================================
# CORE CLASSES
# ==========================================


class FileLock:
    """Context manager for a non-blocking, exclusive file lock.

    Usage::

        with FileLock(path) as acquired:
            if not acquired:
                ...  # the lock attempt failed (typically held elsewhere)

    The ``with`` body is entered whether or not the lock was obtained, so
    callers must check the value bound by ``as``.

    The lock is taken with ``fcntl.lockf``. The lock file is created if it
    does not exist and is left on disk afterwards.
    """

    def __init__(self, lock_file):
        """Remember the lock file path; nothing is opened until ``__enter__``."""
        self.lock_file = lock_file
        self.fp = None
        self._locked = False

    def __enter__(self):
        """Try to take the lock without blocking.

        Returns:
            True if the exclusive lock was acquired, False if the lock
            attempt raised ``OSError`` (e.g. the lock is already held).

        Raises:
            OSError: if the lock file itself cannot be opened.
        """
        self.fp = open(self.lock_file, 'w')
        try:
            fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # Lock not obtained: there is nothing to hold on to, so release
            # the descriptor right away instead of keeping it until __exit__.
            self._close()
            return False
        self._locked = True
        return True

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the lock (if held) and always close the lock file.

        Exceptions raised inside the ``with`` body are not suppressed.
        """
        if self.fp is None:
            return
        try:
            if self._locked:
                fcntl.lockf(self.fp, fcntl.LOCK_UN)
        except OSError:
            # Best effort: closing the descriptor below drops the lock anyway.
            pass
        finally:
            self._locked = False
            self._close()

    def _close(self):
        """Close the lock file handle, ignoring OS-level close errors."""
        handle, self.fp = self.fp, None
        if handle is not None:
            try:
                handle.close()
            except OSError:
                pass


class TaskRunner:
    """Runs configured tasks as subprocesses, guarded by one lock file per task."""

    def __init__(self):
        """Resolve the project root and make sure the lock directory exists.

        The project root is taken to be the parent of the directory that
        contains this file.
        """
        self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.lock_dir = os.path.join(self.project_root, LOCK_DIR)
        os.makedirs(self.lock_dir, exist_ok=True)

    def _lock_path(self, name):
        """Return the lock file path for the task called *name*.

        The file name is the lower-cased task name with spaces replaced by
        ``_`` and ``->`` replaced by ``_to_``. Path separators are replaced
        too, so the lock file always lives directly inside ``self.lock_dir``.
        """
        safe_name = name.lower().replace(' ', '_').replace('->', '_to_')
        safe_name = safe_name.replace(os.sep, '_')
        return os.path.join(self.lock_dir, f"{safe_name}.lock")

    @staticmethod
    def _is_due(now, interval):
        """Tell whether a task with the given interval is due at *now*.

        Runs are aligned to the local clock: a task is due when the number of
        minutes elapsed since midnight is a multiple of *interval*. For
        intervals that divide 60 this is the same as ``now.minute % interval
        == 0`` (e.g. :00, :05, :10); intervals longer than an hour are
        supported as well. A non-positive interval is never due.
        """
        if interval <= 0:
            return False
        minutes_since_midnight = now.hour * 60 + now.minute
        return minutes_since_midnight % interval == 0

    def run_task(self, task):
        """Run a single task if its lock can be acquired.

        Args:
            task: dict with ``name`` and ``script`` (path relative to the
                project root) and optionally ``args`` (list of extra
                command-line arguments).

        The script is executed with the current Python interpreter and the
        project root as working directory. The call blocks until the script
        exits. Failures are logged, never raised.
        """
        name = task['name']
        script_path = os.path.join(self.project_root, task['script'])
        args = task.get('args', [])

        with FileLock(self._lock_path(name)) as acquired:
            if not acquired:
                logger.info(f"Skipping '{name}' - already running (Locked)")
                return

            logger.info(f"Starting '{name}'...")
            start_time = time.time()
            cmd = [sys.executable, script_path] + args

            try:
                # NOTE: The individual scripts already handle their own
                # logging; we only trigger them and keep stderr so that a
                # failure can be reported below.
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    cwd=self.project_root,
                )
            except Exception as e:
                logger.error(f"Exception running '{name}': {str(e)}")
                return

            duration = time.time() - start_time
            if result.returncode == 0:
                logger.info(f"Finished '{name}' in {duration:.2f}s - SUCCESS")
                return

            logger.error(f"Finished '{name}' in {duration:.2f}s - FAILED (Code {result.returncode})")
            # Log stderr if failed
            if result.stderr:
                logger.error(f"Error output for '{name}':\n{result.stderr.strip()}")

    def run_all(self):
        """Run every task in TASKS that is due on the current tick.

        The orchestrator is expected to be started once per minute by cron;
        no "last run" state is kept. Whether a task is due is decided purely
        from the wall clock (see ``_is_due``). A task without
        ``interval_minutes`` defaults to 5 minutes.

        NOTE: tasks run one after another, and ``main`` refuses to start a
        second orchestrator while one is active, so a tick that arrives
        during a long run is skipped entirely.
        """
        now = datetime.now()
        logger.info(f"Orchestrator tick: {now.strftime('%Y-%m-%d %H:%M:%S')}")

        for task in TASKS:
            interval = task.get('interval_minutes', 5)
            if interval <= 0:
                # A bad entry must not abort the tick for the other tasks.
                logger.error(f"Skipping '{task['name']}' - invalid interval_minutes: {interval!r}")
                continue
            if self._is_due(now, interval):
                self.run_task(task)


def main():
    """Command-line entry point.

    Takes the orchestrator-wide lock, then either runs every task
    immediately (``--force``) or only the tasks that are due. Exits with
    status 0 without doing anything if another orchestrator holds the lock.
    """
    parser = argparse.ArgumentParser(description='Ferrero Orchestrator')
    parser.add_argument('--force', action='store_true', help='Force run all tasks ignoring schedule')
    args = parser.parse_args()

    runner = TaskRunner()

    # Ensure only one instance of Orchestrator runs
    orchestrator_lock = os.path.join(runner.lock_dir, 'orchestrator_main.lock')
    with FileLock(orchestrator_lock) as acquired:
        if not acquired:
            logger.warning("Orchestrator is already running! Exiting.")
            sys.exit(0)

        if args.force:
            logger.info("Force run requested - running all tasks immediately")
            for task in TASKS:
                runner.run_task(task)
        else:
            runner.run_all()


if __name__ == '__main__':
    main()