#!/usr/bin/env python3
"""
Ferrero Orchestrator
--------------------
Master runner script that manages the execution of periodic tasks.

- Runs via cron (e.g., every minute)
- Checks if tasks are due to run based on interval
- Ensures only ONE instance of a task runs at a time (using file locks)
- Logs all activities
"""

import os
import sys
import time
import logging
import subprocess
import fcntl
import argparse
from datetime import datetime, timedelta
from pathlib import Path

# Project root = parent of the directory containing this script; task script
# paths below are relative to it. Logs (like locks) are anchored here rather
# than to the current working directory, because cron typically starts jobs in
# the user's home directory.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(PROJECT_ROOT, 'logs')

# Setup logging
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(LOG_DIR, 'orchestrator.log')),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger('Orchestrator')

# ==========================================
# CONFIGURATION
# ==========================================
# List of tasks to manage. Keys per task:
#   name              Display name; also determines the task's lock file name.
#   script            Path relative to project root.
#   interval_minutes  Run whenever the minute of the day is a multiple of this
#                     (default 5; 0 or less disables interval scheduling).
#   run_at_hour       If present, run once a day at this hour instead, at
#                     minute 'run_at_minute' (default 0).
#   args              Extra command-line arguments passed to the script.
#   timeout_minutes   Optional; kill the script if it runs longer than this.
TASKS = [
    {
        'name': 'A1->A2 Box Uploader',
        'script': 'scripts/a1_to_a2_box_uploader.py',
        'interval_minutes': 3,  # Changed from 5 to 3 minutes
        'args': []  # Temporarily using OAuth instead of --auth-pfx-v2
    },
    {
        'name': 'A2->A3 Upload Polling',
        'script': 'scripts/a2_to_a3_upload_polling.py',
        'interval_minutes': 3,  # Changed from 5 to 3 minutes
        'args': []  # Temporarily using OAuth instead of --auth-pfx-v2
    },
    {
        'name': 'A4 Box Uploader',
        'script': 'scripts/a4_box_uploader.py',
        'interval_minutes': 10,
        'args': []  # Temporarily using OAuth instead of --auth-pfx-v2
    },
    {
        'name': 'A4 Webhook Monitor',
        'script': 'scripts/a4_webhook_monitor.py',
        'interval_minutes': 3,  # Changed from 5 to 3 minutes
        'args': []  # Temporarily using OAuth instead of --auth-pfx-v2
    },
    {
        'name': 'A5->A6 Download',
        'script': 'scripts/a5_to_a6_download.py',
        'interval_minutes': 3,  # Changed from 5 to 3 minutes
        'args': []  # Temporarily using OAuth instead of --auth-pfx-v2
    },
    {
        'name': 'B1->B2 Download',
        'script': 'scripts/b1_to_b2_download.py',
        'interval_minutes': 10,
        'args': []  # Temporarily using OAuth instead of --auth-pfx-v2
    },
    {
        'name': 'B4 Box Uploader',
        'script': 'scripts/b4_box_uploader.py',
        'interval_minutes': 10,
        'args': []  # Temporarily using OAuth instead of --auth-pfx-v2
    },
    {
        'name': 'Daily Report',
        'script': 'scripts/daily_report.py',
        'interval_minutes': 0,  # Special case: use run_at_hour
        'run_at_hour': 19,  # Run at 7 PM
        'args': []
    },
]

LOCK_DIR = 'locks'
STATE_FILE = 'orchestrator_state.json'  # Not referenced elsewhere in this script.


# ==========================================
# CORE CLASSES
# ==========================================

class FileLock:
    """Context manager taking a non-blocking exclusive lock on a file.

    ``with FileLock(path) as acquired:`` binds ``acquired`` to True when the
    lock was obtained and to False when it could not be (typically because
    another process holds it). The lock file is created/truncated on entry,
    and on exit it is unlocked (if it was locked) and always closed.
    Exceptions raised inside the ``with`` body are never suppressed.
    """

    def __init__(self, lock_file):
        self.lock_file = lock_file
        self.fp = None
        self.acquired = False

    def __enter__(self):
        # Write mode is required for an exclusive lockf() lock.
        self.fp = open(self.lock_file, 'w')
        try:
            fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # With LOCK_NB a held lock raises OSError (EACCES/EAGAIN)
            # instead of blocking.
            self.acquired = False
        else:
            self.acquired = True
        return self.acquired

    def __exit__(self, exc_type, exc_value, traceback):
        if self.fp is None:
            return False
        try:
            if self.acquired:
                fcntl.lockf(self.fp, fcntl.LOCK_UN)
        except OSError as e:
            # Best effort: closing the descriptor below drops the lock anyway.
            logger.warning(f"Could not unlock {self.lock_file}: {e}")
        finally:
            self.fp.close()
            self.fp = None
            self.acquired = False
        return False


def is_task_due(task, now):
    """Return True if *task* should run during the minute containing *now*.

    Tasks with 'run_at_hour' run once a day at that hour, at minute
    'run_at_minute' (default 0); their 'interval_minutes' is ignored.
    Other tasks run whenever the minute of the day is a multiple of
    'interval_minutes' (default 5); a value of 0 or less never runs.
    """
    if 'run_at_hour' in task:
        return (now.hour == task['run_at_hour']
                and now.minute == task.get('run_at_minute', 0))

    interval = task.get('interval_minutes', 5)
    if interval <= 0:
        return False
    # Count from midnight rather than from the top of the hour so intervals
    # that don't divide 60 (e.g. 7 or 90) keep a steady spacing through the
    # day. For divisors of 60 this is identical to `now.minute % interval`.
    minute_of_day = now.hour * 60 + now.minute
    return minute_of_day % interval == 0


class TaskRunner:
    """Launches TASKS entries as child Python processes, one lock per task."""

    def __init__(self):
        self.project_root = PROJECT_ROOT
        self.lock_dir = os.path.join(self.project_root, LOCK_DIR)
        os.makedirs(self.lock_dir, exist_ok=True)

    def _python_executable(self):
        """Return the project's venv interpreter if it exists, else the current one."""
        venv_python = os.path.join(self.project_root, 'venv', 'bin', 'python')
        return venv_python if os.path.exists(venv_python) else sys.executable

    def run_task(self, task):
        """Run one task's script unless its lock is already held.

        The script runs with the project root as working directory and
        inherits this process's stdout/stderr, so its output streams to the
        orchestrator's console in real time. Non-zero exit codes, timeouts
        and launch errors are logged, never raised, so one bad task cannot
        stop the remaining ones.

        Args:
            task: A TASKS entry: 'name' and 'script' (relative to the project
                root), plus optional 'args' (extra CLI arguments) and
                'timeout_minutes' (no time limit when absent or 0).
        """
        name = task['name']
        script_path = os.path.join(self.project_root, task['script'])
        args = task.get('args', [])
        timeout_minutes = task.get('timeout_minutes')
        timeout = timeout_minutes * 60 if timeout_minutes else None

        # Create a safe filename for the lock
        safe_name = name.lower().replace(' ', '_').replace('->', '_to_')
        lock_file = os.path.join(self.lock_dir, f"{safe_name}.lock")

        with FileLock(lock_file) as acquired:
            if not acquired:
                logger.info(f"Skipping '{name}' - already running (Locked)")
                return

            logger.info(f"Starting '{name}'...")
            start_time = time.time()
            try:
                cmd = [self._python_executable(), script_path] + args
                # stdout/stderr=None: inherit the parent's streams instead of
                # capturing, so output is visible live.
                result = subprocess.run(
                    cmd,
                    cwd=self.project_root,
                    stdout=None,
                    stderr=None,
                    timeout=timeout,
                )
            except subprocess.TimeoutExpired:
                # subprocess.run has already killed and reaped the child.
                duration = time.time() - start_time
                logger.error(
                    f"Finished '{name}' in {duration:.2f}s - TIMED OUT "
                    f"(limit {timeout_minutes} min)"
                )
                return
            except Exception as e:
                logger.exception(f"Exception running '{name}': {e}")
                return

            duration = time.time() - start_time
            if result.returncode == 0:
                logger.info(f"Finished '{name}' in {duration:.2f}s - SUCCESS")
            else:
                logger.error(
                    f"Finished '{name}' in {duration:.2f}s - "
                    f"FAILED (Code {result.returncode})"
                )

    def run_all(self):
        """Run, one after another, every task that is due in the current minute."""
        # Captured once so every task in this tick is judged against the same
        # minute, even if earlier tasks take a while.
        now = datetime.now()
        logger.info(f"Orchestrator tick: {now.strftime('%Y-%m-%d %H:%M:%S')}")

        for task in TASKS:
            if is_task_due(task, now):
                self.run_task(task)


def main():
    """Parse CLI flags and run tasks once (cron), forever (--daemon) or all now (--force)."""
    parser = argparse.ArgumentParser(description='Ferrero Orchestrator')
    parser.add_argument('--force', action='store_true',
                        help='Force run all tasks ignoring schedule')
    parser.add_argument('--daemon', action='store_true',
                        help='Run in daemon mode (loop forever)')
    args = parser.parse_args()

    runner = TaskRunner()

    # Ensure only one instance of Orchestrator runs. Tasks run sequentially in
    # this process, so while a slow task keeps this lock held, later cron
    # invocations exit here and tasks due in those minutes are skipped
    # (not caught up) -- including a run_at_hour task.
    orchestrator_lock = os.path.join(runner.lock_dir, 'orchestrator_main.lock')
    with FileLock(orchestrator_lock) as acquired:
        if not acquired:
            logger.warning("Orchestrator is already running! Exiting.")
            sys.exit(0)

        if args.force:
            logger.info("Force run requested - running all tasks immediately")
            for task in TASKS:
                runner.run_task(task)
        elif args.daemon:
            logger.info("Starting Orchestrator in DAEMON mode (looping every 60s)")
            try:
                while True:
                    runner.run_all()
                    # Sleep until the start of the next minute to align with
                    # the clock; minutes spent inside long tasks are skipped.
                    now = datetime.now()
                    sleep_seconds = 60 - now.second
                    time.sleep(sleep_seconds)
            except KeyboardInterrupt:
                logger.info("Daemon stopped by user")
        else:
            # Single run (cron mode)
            runner.run_all()


if __name__ == '__main__':
    main()