#!/usr/bin/env python3
"""
Ferrero Orchestrator
--------------------
Master runner script that manages the execution of periodic tasks.

- Runs via cron (e.g., every minute)
- Checks if tasks are due to run based on interval
- Ensures only ONE instance of a task runs at a time (using file locks)
- Logs all activities
"""

import os
import sys
import time
import logging
import subprocess
import fcntl
import argparse
from datetime import datetime, timedelta
from pathlib import Path

# Setup logging.
# NOTE: the 'logs' directory is resolved against the current working
# directory, not the project root, so the invoking cron entry decides where
# the log file ends up.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/orchestrator.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('Orchestrator')

# ==========================================
# CONFIGURATION
# ==========================================

# List of tasks to manage
#   name:             Human-readable name (also used to derive the lock file)
#   script:           Path relative to project root
#   interval_minutes: Task is due when the wall-clock minute is a multiple
#                     of this value (0 disables the interval check)
#   run_at_hour:      Optional; task is due only at HH:00 of this hour
#   args:             Extra command-line arguments passed to the script
TASKS = [
    {
        'name': 'A1->A2 Box Uploader',
        'script': 'scripts/a1_to_a2_box_uploader.py',
        'interval_minutes': 3,  # Changed from 5 to 3 minutes
        'args': ['--auth-pfx-v2']  # Production uses mTLS V2
    },
    {
        'name': 'A2->A3 Upload Polling',
        'script': 'scripts/a2_to_a3_upload_polling.py',
        'interval_minutes': 3,  # Changed from 5 to 3 minutes
        'args': ['--auth-pfx-v2']  # Production uses mTLS V2
    },
    {
        'name': 'A4 Box Uploader',
        'script': 'scripts/a4_box_uploader.py',
        'interval_minutes': 10,
        'args': ['--auth-pfx-v2']  # Production uses mTLS V2
    },
    {
        'name': 'A4 Webhook Monitor',
        'script': 'scripts/a4_webhook_monitor.py',
        'interval_minutes': 3,  # Changed from 5 to 3 minutes
        'args': ['--auth-pfx-v2']  # Production uses mTLS V2
    },
    {
        'name': 'A5->A6 Download',
        'script': 'scripts/a5_to_a6_download.py',
        'interval_minutes': 3,  # Changed from 5 to 3 minutes
        'args': ['--auth-pfx-v2']  # Production uses mTLS V2
    },
    {
        'name': 'B1->B2 Download',
        'script': 'scripts/b1_to_b2_download.py',
        'interval_minutes': 10,
        'args': ['--auth-pfx-v2']  # Production uses mTLS V2
    },
    {
        'name': 'B4 Box Uploader',
        'script': 'scripts/b4_box_uploader.py',
        'interval_minutes': 10,
        'args': ['--auth-pfx-v2']  # Production uses mTLS V2
    },
    {
        'name': 'Daily Report',
        'script': 'scripts/daily_report.py',
        'interval_minutes': 0,  # Special case: use run_at_hour
        'run_at_hour': 19,  # Run at 7 PM
        'args': []
    }
]

# ==========================================
# OFF-HOURS CONFIGURATION
# ==========================================

# Off-hours definition
OFF_HOURS_CONFIG = {
    'enabled': True,  # Set to False to disable off-hours slowdown

    # Off-hours cadence in minutes. Despite the name, this is not added to
    # the base interval arithmetically: during off-hours a task only runs
    # on minutes that are a multiple of BOTH its own interval and this value.
    'extra_minutes': 30,

    # Late night: 10 PM (22:00) to 5 AM (05:00) every day
    'late_night_start': 22,  # Hour (0-23)
    'late_night_end': 5,  # Hour (0-23)

    # Weekend: All day Saturday and Sunday
    'weekend_days': [5, 6],  # 0=Monday, 5=Saturday, 6=Sunday

    # Tasks exempt from off-hours slowdown (always run at normal cadence)
    'exempt_tasks': [
        'Daily Report'  # Task name to exclude (runs at 7 PM regardless)
    ]
}

LOCK_DIR = 'locks'
STATE_FILE = 'orchestrator_state.json'  # Not referenced anywhere in this file


# ==========================================
# OFF-HOURS DETECTION
# ==========================================

def is_off_hours(now=None):
    """
    Determine if the given time falls in the off-hours period.

    Off-hours means either a configured weekend day (any hour) or an hour
    inside the configured late-night window. The window may wrap around
    midnight (start > end, e.g. 22 -> 5); the end hour itself is excluded.

    Args:
        now: datetime object (defaults to current local time)

    Returns:
        bool: True if in off-hours, False otherwise (always False when the
        off-hours feature is disabled in OFF_HOURS_CONFIG)
    """
    if not OFF_HOURS_CONFIG['enabled']:
        return False

    if now is None:
        now = datetime.now()

    hour = now.hour
    weekday = now.weekday()  # 0=Monday, 6=Sunday

    # Weekend days are off-hours for the whole day
    if weekday in OFF_HOURS_CONFIG['weekend_days']:
        logger.debug("Off-hours: Weekend (day {})".format(weekday))
        return True

    start = OFF_HOURS_CONFIG['late_night_start']
    end = OFF_HOURS_CONFIG['late_night_end']
    if start > end:
        # Window wraps around midnight (e.g., 22:00 to 5:00)
        late_night = hour >= start or hour < end
    else:
        # Window lies within a single day (e.g., 1:00 to 5:00)
        late_night = start <= hour < end

    if late_night:
        logger.debug("Off-hours: Late night (hour {})".format(hour))
        return True

    logger.debug("Business hours (hour {}, weekday {})".format(hour, weekday))
    return False


# ==========================================
# CORE CLASSES
# ==========================================

class FileLock:
    """
    Context manager for non-blocking exclusive file locking.

    The value bound by ``with FileLock(path) as acquired`` is a bool: True
    when this process obtained the lock, False when it is held elsewhere.
    The ``with`` body runs in both cases, so callers must check the flag.
    """

    def __init__(self, lock_file):
        self.lock_file = lock_file
        self.fp = None
        self._locked = False  # True only while this instance holds the lock

    def __enter__(self):
        self.fp = open(self.lock_file, 'w')
        try:
            fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # Lock is held by someone else (or locking failed): release the
            # handle right away so nothing is left for __exit__ to undo.
            self.fp.close()
            self.fp = None
            return False
        self._locked = True
        return True

    def __exit__(self, exc_type, exc_value, traceback):
        if self.fp is None:
            return
        try:
            # Only unlock what we actually locked
            if self._locked:
                fcntl.lockf(self.fp, fcntl.LOCK_UN)
        except OSError as exc:
            # Best effort: closing the file below drops the lock anyway
            logger.warning(f"Could not unlock '{self.lock_file}': {exc}")
        finally:
            self._locked = False
            self.fp.close()
            self.fp = None


class TaskRunner:
    """Decides which configured tasks are due and runs them under a lock."""

    def __init__(self):
        # Project root is the parent of the directory containing this file
        self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.lock_dir = os.path.join(self.project_root, LOCK_DIR)
        os.makedirs(self.lock_dir, exist_ok=True)

    def run_task(self, task):
        """
        Run a single task as a subprocess if its lock can be acquired.

        The call blocks until the script exits. Failures (non-zero exit code
        or an exception while launching) are logged, never raised.

        Args:
            task: Task dict with 'name', 'script' and optional 'args'
        """
        name = task['name']
        script_path = os.path.join(self.project_root, task['script'])
        args = task.get('args', [])

        # Create a safe filename for the lock
        safe_name = name.lower().replace(' ', '_').replace('->', '_to_')
        lock_file = os.path.join(self.lock_dir, f"{safe_name}.lock")

        # Try to acquire lock
        with FileLock(lock_file) as acquired:
            if not acquired:
                logger.info(f"Skipping '{name}' - already running (Locked)")
                return

            logger.info(f"Starting '{name}'...")
            start_time = time.time()

            try:
                # Prefer venv/bin/python under the project root if it exists,
                # otherwise fall back to the interpreter running this script
                venv_python = os.path.join(self.project_root, 'venv', 'bin', 'python')
                python_exe = venv_python if os.path.exists(venv_python) else sys.executable

                cmd = [python_exe, script_path] + args

                # stdout/stderr are left at their defaults so the child
                # inherits ours and its output streams through in real time
                result = subprocess.run(cmd, cwd=self.project_root)

                duration = time.time() - start_time

                if result.returncode == 0:
                    logger.info(f"Finished '{name}' in {duration:.2f}s - SUCCESS")
                else:
                    logger.error(
                        f"Finished '{name}' in {duration:.2f}s - FAILED (Code {result.returncode})"
                    )

            except Exception as e:
                # Top-level boundary: one broken task must not stop the rest.
                # logger.exception keeps the traceback for diagnosis.
                logger.exception(f"Exception running '{name}': {str(e)}")

    def _due_reason(self, task, now, in_off_hours):
        """
        Decide whether a task is due at the given tick.

        Args:
            task: Task dict from TASKS
            now: datetime of the current tick
            in_off_hours: Whether the tick falls in off-hours

        Returns:
            str | None: Log message explaining why the task is due, or None
            when it should not run on this tick.
        """
        task_name = task['name']
        minute = now.minute

        # Fixed-hour tasks (e.g., Daily Report at 7 PM) ignore intervals and
        # fire only on the tick at exactly HH:00. If no tick happens in that
        # minute, the run is skipped until the next day.
        if 'run_at_hour' in task:
            target_hour = task['run_at_hour']
            if now.hour == target_hour and minute == 0:
                return f"Scheduled task '{task_name}' due at {target_hour}:00"
            return None

        # Interval tasks: due when the minute is a multiple of the interval
        base_interval = task.get('interval_minutes', 5)
        if base_interval <= 0 or minute % base_interval != 0:
            return None

        slowed = in_off_hours and task_name not in OFF_HOURS_CONFIG['exempt_tasks']
        if not slowed:
            # Normal business hours OR exempt task
            return f"Task '{task_name}' due ({base_interval}min interval)"

        # Off-hours: additionally require the configured cadence boundary
        # (minute 0 or 30 with the default of 30). A non-positive cadence
        # imposes no extra restriction.
        cadence = OFF_HOURS_CONFIG['extra_minutes']
        if cadence > 0 and minute % cadence != 0:
            return None
        return f"Task '{task_name}' due (off-hours: {base_interval}min + {cadence}min cadence)"

    def run_all(self):
        """
        Run all tasks that are due on the current tick.

        The tick time is sampled once up front; every task is evaluated
        against that same timestamp even though tasks run one after another.
        """
        now = datetime.now()
        stamp = now.strftime('%Y-%m-%d %H:%M:%S')

        # Determine if we're in off-hours
        in_off_hours = is_off_hours(now)

        if in_off_hours:
            logger.info("=" * 80)
            logger.info(f"Orchestrator tick: {stamp} [OFF-HOURS MODE]")
            logger.info(
                "Adding {} minutes to all task intervals".format(OFF_HOURS_CONFIG['extra_minutes'])
            )
            logger.info("=" * 80)
        else:
            logger.info(f"Orchestrator tick: {stamp} [NORMAL MODE]")

        for task in TASKS:
            reason = self._due_reason(task, now, in_off_hours)
            if reason is not None:
                logger.info(reason)
                self.run_task(task)


def main():
    """
    Command-line entry point.

    Modes:
        (default)  Single tick: run whatever is due right now (cron mode)
        --force    Run every task immediately, ignoring the schedule
        --daemon   Loop forever, ticking roughly once per minute

    Only one orchestrator process may run at a time; a second instance logs
    a warning and exits with status 0.
    """
    parser = argparse.ArgumentParser(description='Ferrero Orchestrator')
    parser.add_argument('--force', action='store_true',
                        help='Force run all tasks ignoring schedule')
    parser.add_argument('--daemon', action='store_true',
                        help='Run in daemon mode (loop forever)')
    args = parser.parse_args()

    runner = TaskRunner()

    # Ensure only one instance of Orchestrator runs
    orchestrator_lock = os.path.join(runner.lock_dir, 'orchestrator_main.lock')

    with FileLock(orchestrator_lock) as acquired:
        if not acquired:
            logger.warning("Orchestrator is already running! Exiting.")
            sys.exit(0)

        if args.force:
            logger.info("Force run requested - running all tasks immediately")
            for task in TASKS:
                runner.run_task(task)

        elif args.daemon:
            logger.info("Starting Orchestrator in DAEMON mode (looping every 60s)")
            try:
                while True:
                    runner.run_all()

                    # Sleep until the start of the next minute to align with
                    # the clock. Tasks run synchronously, so a tick that
                    # takes longer than a minute skips the minutes it spans.
                    now = datetime.now()
                    sleep_seconds = 60 - now.second
                    time.sleep(sleep_seconds)
            except KeyboardInterrupt:
                logger.info("Daemon stopped by user")

        else:
            # Single run (cron mode)
            runner.run_all()


if __name__ == '__main__':
    main()