#!/usr/bin/env python3
"""
CreativeX Service Runner
------------------------
Dedicated runner for the CreativeX scoring script.

- Runs independently of the main orchestrator
- Uses its own file lock to prevent concurrent runs
- Logs to logs/creativex_service.log
"""

import os
import sys
import time
import logging
import subprocess
import fcntl
import argparse
from datetime import datetime

# Setup logging
# NOTE: 'logs' is resolved against the current working directory, not the
# project root, so the log location depends on where the service is started.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/creativex_service.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger('CreativeXService')

# Configuration
SCRIPT_REL_PATH = 'scripts/creativex_scoring_storing.py'  # relative to project root
LOCK_FILE = 'locks/creativex_service.lock'  # relative to project root
INTERVAL_MINUTES = 15  # pause between execution cycles
ERROR_RETRY_SECONDS = 60  # back-off after an unexpected error in the loop


class FileLock:
    """Context manager taking a non-blocking exclusive ``fcntl`` lock on a file.

    ``with FileLock(path) as acquired:`` binds ``acquired`` to ``True`` when
    the lock was obtained and ``False`` when it could not be taken. The body
    of the ``with`` statement runs in both cases, so callers must check the
    flag themselves.
    """

    def __init__(self, lock_file):
        """Remember the lock file path; nothing is opened until ``__enter__``."""
        self.lock_file = lock_file
        self.fp = None

    def __enter__(self):
        """Open the lock file and try to lock it without blocking.

        Returns:
            True if the exclusive lock was acquired, False if ``lockf``
            failed with an ``OSError`` (typically: another process holds it).

        Raises:
            OSError: if the lock file itself cannot be opened.
        """
        # Mode 'w' creates the file when it is missing (and truncates it).
        self.fp = open(self.lock_file, 'w')
        try:
            fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # No lock is held, so there is no reason to keep the handle open.
            self._close()
            return False
        return True

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the lock (if held) and close the lock file.

        Cleanup is best-effort: failures are logged rather than raised so
        they never mask an exception coming out of the ``with`` body.
        """
        if self.fp is None:
            return
        try:
            fcntl.lockf(self.fp, fcntl.LOCK_UN)
        except OSError as exc:
            logger.warning("Failed to unlock %s: %s", self.lock_file, exc)
        finally:
            # Always close, even if the explicit unlock failed.
            self._close()

    def _close(self):
        """Close the lock file handle, logging (not raising) any OS error."""
        fp, self.fp = self.fp, None
        if fp is None:
            return
        try:
            fp.close()
        except OSError as exc:
            logger.warning("Failed to close %s: %s", self.lock_file, exc)


def _resolve_python(project_root):
    """Return the project's venv interpreter if present, else the current one."""
    venv_python = os.path.join(project_root, 'venv', 'bin', 'python')
    return venv_python if os.path.exists(venv_python) else sys.executable


def _run_cycle(project_root, script_path):
    """Run the scoring script once as a subprocess and log the outcome."""
    logger.info("Starting execution cycle...")
    started = time.monotonic()

    # stdout/stderr are inherited so the script's output streams to the console.
    result = subprocess.run(
        [_resolve_python(project_root), script_path],
        cwd=project_root,
        stdout=None,
        stderr=None,
        text=True,
    )

    duration = time.monotonic() - started
    if result.returncode == 0:
        logger.info(f"Cycle completed successfully in {duration:.2f}s")
    else:
        logger.error(
            f"Cycle failed with code {result.returncode} in {duration:.2f}s"
        )


def run_service(project_root):
    """Run the service loop until interrupted.

    Each iteration tries to take the service lock, runs the scoring script
    when the lock was obtained, then sleeps ``INTERVAL_MINUTES``. Unexpected
    errors are logged and retried after ``ERROR_RETRY_SECONDS``.

    Args:
        project_root: Directory that ``SCRIPT_REL_PATH`` and ``LOCK_FILE``
            are resolved against; also the working directory of the script.
    """
    script_path = os.path.join(project_root, SCRIPT_REL_PATH)
    lock_path = os.path.join(project_root, LOCK_FILE)

    # Ensure lock directory exists
    os.makedirs(os.path.dirname(lock_path), exist_ok=True)

    logger.info(f"Starting CreativeX Service (Interval: {INTERVAL_MINUTES}m)")

    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C is
    # handled cleanly even while sleeping in the error back-off below.
    try:
        while True:
            try:
                with FileLock(lock_path) as acquired:
                    if not acquired:
                        logger.warning(
                            "Service already running (Locked). Skipping this cycle."
                        )
                    else:
                        _run_cycle(project_root, script_path)

                # Sleep outside the ``with`` block so the lock is not held
                # while idle, and so a skipped cycle also waits.
                logger.info(f"Sleeping for {INTERVAL_MINUTES} minutes...")
                time.sleep(INTERVAL_MINUTES * 60)
            except Exception as e:
                # Top-level boundary: log with traceback and keep running.
                logger.exception("Unexpected error: %s", e)
                time.sleep(ERROR_RETRY_SECONDS)
    except KeyboardInterrupt:
        logger.info("Service stopped by user")


def main():
    """Entry point: use the parent of this file's directory as project root."""
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    run_service(project_root)


if __name__ == '__main__':
    main()