#!/usr/bin/env python3
"""
JSON Workflow Processor

Converted from a TWIST workflow definition.

Monitors dynamic client hot folders for JSON files and routes them based on
client codes:

- Structure: /data/PRODUCTION/JSON/[CLIENT_NAME]/file.json
- Celtra projects -> Celtra folder
- Reckitt Benckiser -> Monday RB folder
- Rank -> Monday Rank folder
- All others -> archive only (to JSON_STORE/[CLIENT_NAME]/)

Features:
- Dynamic hot folder monitoring (hundreds of client folders)
- JSON validation and parsing
- Client-based routing for special clients
- File archiving for all clients
- Error notifications
- Activity logging
"""

import json
import logging
import os
import shutil
import smtplib
import time
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class Config:
    """Configuration settings."""

    # Paths
    HOT_FOLDER = Path("/data/PRODUCTION/JSON/")
    JSON_STORE = Path("/data/PRODUCTION/JSON_STORE/")
    SYNC_BASE = Path("/data/PRODUCTION/SYNC/MAKE/")

    # Client destinations, keyed by upper-cased ClientCode.
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
    }

    # Email settings (from original TWIST config).
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 25
    SMTP_TIMEOUT = 30  # seconds; keeps a hung SMTP server from blocking the watcher
    # Credentials come from the environment so secrets are not committed to
    # source control.
    # NOTE(review): the value previously hard-coded here looked like a
    # TWIST-encrypted blob rather than a plaintext password — confirm the real
    # credential and rotate the old one.
    SMTP_USER = os.environ.get("JSON_WORKFLOW_SMTP_USER", "twist@mail.dev.oliver.solutions")
    SMTP_PASSWORD = os.environ.get("JSON_WORKFLOW_SMTP_PASSWORD", "")
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"

    # Monitoring settings
    POLL_INTERVAL = 5  # seconds between liveness checks of the observer in main()
    WAIT_DELAY = 5  # seconds between file-size checks when waiting for stability
    STABILITY_MAX_CHECKS = 12  # give up waiting after this many size checks


class JSONWorkflowProcessor:
    """Parse, archive and route JSON job files dropped into the hot folder."""

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()

    def setup_logging(self):
        """Configure root logging to json_workflow.log and the console."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler('json_workflow.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create the hot folder, archive root and every routing destination."""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.SYNC_BASE,
        ] + list(Config.DESTINATIONS.values())

        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            self.logger.debug(f"Ensured directory exists: {directory}")

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return *value* as a string; ``None`` (JSON null) becomes ``""``."""
        return "" if value is None else str(value)

    def _read_json(self, file_path: Path) -> Tuple[Optional[Dict[str, Any]], str]:
        """Parse *file_path* and extract the routing fields.

        Returns:
            ``(fields, "")`` on success, where *fields* holds JobCategory,
            StudioCode, Title and ClientCode as strings (missing -> ``""``);
            ``(None, error_message)`` if the file cannot be read, is not valid
            JSON, or its JobSpecification/JobDetails are not JSON objects.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None, f"JSON parsing error: {e}"
        except Exception as e:  # OSError, UnicodeDecodeError, ...
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None, f"Error reading file: {e}"

        # A missing JobSpecification/JobDetails is tolerated (all fields empty),
        # but one that is present and not an object is treated as malformed.
        job_spec = data.get("JobSpecification", {}) if isinstance(data, dict) else None
        job_details = job_spec.get("JobDetails", {}) if isinstance(job_spec, dict) else None
        if not isinstance(job_details, dict):
            error = "unexpected JSON structure: JobSpecification.JobDetails is not an object"
            self.logger.error(f"Error reading {file_path.name}: {error}")
            return None, error

        extracted = {
            key: self._as_text(job_details.get(key, ""))
            for key in ("JobCategory", "StudioCode", "Title", "ClientCode")
        }
        self.logger.info(f"Parsed JSON {file_path.name}: {extracted}")
        return extracted, ""

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Parse JSON file and extract key fields.

        Returns a dict with JobCategory, StudioCode, Title and ClientCode, or
        None if the file is unreadable or invalid.
        """
        extracted, _error = self._read_json(file_path)
        return extracted

    def determine_routing(self, json_data: Dict[str, Any]) -> Optional[Path]:
        """Return the routing destination for *json_data*, or None for archive-only.

        ClientCode is matched (case-insensitively, ignoring surrounding
        whitespace) against Config.DESTINATIONS first; otherwise a JobCategory
        of "CELTRA" routes to the Celtra destination.
        """
        # Coerce to text: JSON null/numbers would otherwise break .upper().
        client_code = self._as_text(json_data.get("ClientCode", "")).strip().upper()
        job_category = self._as_text(json_data.get("JobCategory", "")).strip().upper()

        destination = Config.DESTINATIONS.get(client_code)
        if destination is not None:
            return destination

        if job_category == "CELTRA":
            return Config.DESTINATIONS["CELTRA"]

        self.logger.info(
            f"No routing destination for ClientCode: {client_code}, JobCategory: {job_category}"
        )
        return None

    @staticmethod
    def _client_folder_name(file_path: Path) -> str:
        """Return the client folder name used for archiving *file_path*.

        This is the first directory below Config.HOT_FOLDER (so files nested
        deeper inside a client folder still archive under that client), or
        "DEFAULT" for files dropped directly into the hot folder.
        """
        try:
            relative = file_path.relative_to(Config.HOT_FOLDER)
        except ValueError:
            # Not under the hot folder; fall back to the immediate parent.
            return file_path.parent.name or "DEFAULT"
        return relative.parts[0] if len(relative.parts) > 1 else "DEFAULT"

    def archive_file(self, file_path: Path, folder_name: str) -> bool:
        """Copy *file_path* into JSON_STORE/<folder_name>/; return True on success."""
        try:
            archive_dir = Config.JSON_STORE / folder_name
            archive_dir.mkdir(parents=True, exist_ok=True)
            destination = archive_dir / file_path.name
            shutil.copy2(file_path, destination)
            self.logger.info(f"Archived {file_path.name} to {destination}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to archive {file_path.name}: {e}")
            return False

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Copy *file_path* into *destination*; return True on success."""
        try:
            destination.mkdir(parents=True, exist_ok=True)
            dest_file = destination / file_path.name
            shutil.copy2(file_path, dest_file)
            self.logger.info(f"Routed {file_path.name} to {destination}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False

    def send_error_email(self, file_path: Path, error_msg: str):
        """Email Config.ERROR_EMAIL about *file_path*, attaching it if still readable.

        Failures are logged, never raised, so notification problems cannot
        interrupt file processing.
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = Config.ERROR_EMAIL
            msg['Subject'] = "JSON will not Parse"

            body = (
                "Workflow: JSON_Parser\n"
                "Process: JSON_PARSE\n"
                f"File: {file_path.name} will not parse\n"
                f"Error: {error_msg}\n"
            )
            msg.attach(MIMEText(body, 'plain'))

            # Attach the problematic file (best effort: it may have vanished or
            # be unreadable, which is often why we are emailing in the first place).
            try:
                payload = file_path.read_bytes()
            except OSError as e:
                self.logger.warning(f"Could not attach {file_path.name} to error email: {e}")
            else:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(payload)
                encoders.encode_base64(part)
                # filename= keyword lets the email package quote/encode the name.
                part.add_header('Content-Disposition', 'attachment', filename=file_path.name)
                msg.attach(part)

            # Context manager guarantees QUIT/close even if a step below raises.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT,
                              timeout=Config.SMTP_TIMEOUT) as server:
                server.starttls()
                if Config.SMTP_PASSWORD:
                    server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                else:
                    self.logger.warning("SMTP password not configured; sending without login")
                server.sendmail(Config.SENDER_EMAIL, [Config.ERROR_EMAIL], msg.as_string())

            self.logger.info(f"Error email sent for {file_path.name}")
        except Exception as e:
            self.logger.error(f"Failed to send error email: {e}")

    def _wait_for_stable_file(self, file_path: Path) -> bool:
        """Wait until *file_path*'s size is unchanged across one WAIT_DELAY interval.

        Checks at most Config.STABILITY_MAX_CHECKS times; after that the file
        is processed anyway. Returns False if the file disappears while
        waiting (e.g. a temporary .json renamed by the writer), else True.
        """
        previous_size = None
        for _ in range(Config.STABILITY_MAX_CHECKS):
            try:
                current_size = file_path.stat().st_size
            except FileNotFoundError:
                return False
            if current_size == previous_size:
                return True
            previous_size = current_size
            time.sleep(Config.WAIT_DELAY)
        return file_path.exists()

    def process_file(self, file_path: Path):
        """Parse, archive and (if applicable) route one JSON file.

        The original is removed from the hot folder only when archiving and
        any required routing both succeeded; otherwise it is left in place so
        nothing is lost.
        """
        self.logger.info(f"Processing file: {file_path.name}")

        if not self._wait_for_stable_file(file_path):
            self.logger.warning(f"File disappeared before processing: {file_path.name}")
            return

        json_data, error = self._read_json(file_path)
        if json_data is None:
            self.send_error_email(file_path, error or "JSON parsing failed")
            return

        # Archive (always done) under the client folder from the hot folder structure.
        archived = self.archive_file(file_path, self._client_folder_name(file_path))

        destination = self.determine_routing(json_data)
        routed = True  # archive-only files have nothing to route
        if destination:
            routed = self.route_file(file_path, destination)
            if routed:
                self.logger.info(f"Successfully processed and routed {file_path.name}")
            else:
                self.logger.error(f"Routing failed for {file_path.name}")
        else:
            self.logger.info(f"File {file_path.name} archived only (no routing destination)")

        if not (archived and routed):
            self.logger.warning(
                f"Keeping original file {file_path.name} in hot folder because "
                f"archiving or routing failed"
            )
            return

        try:
            file_path.unlink()
            self.logger.info(f"Removed original file: {file_path.name}")
        except Exception as e:
            self.logger.error(f"Failed to remove original file {file_path.name}: {e}")


class JSONFileHandler(FileSystemEventHandler):
    """Watchdog event handler that hands new/moved-in .json files to the processor."""

    def __init__(self, processor: JSONWorkflowProcessor):
        super().__init__()
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def _handle(self, file_path: Path, message: str):
        """Process *file_path* if it is a .json file, logging any unexpected error.

        Exceptions are caught here rather than propagating into watchdog's
        event-dispatch thread, so one bad file cannot stop monitoring.
        """
        if file_path.suffix.lower() != '.json':
            return
        self.logger.info(f"{message}: {file_path.name}")
        try:
            self.processor.process_file(file_path)
        except Exception:
            self.logger.exception(f"Unexpected error processing {file_path.name}")

    def on_created(self, event):
        """Handle file creation events."""
        if event.is_directory:
            return
        # fsdecode accepts both str and bytes event paths.
        self._handle(Path(os.fsdecode(event.src_path)), "New JSON file detected")

    def on_moved(self, event):
        """Handle file move events (files moved into the hot folder)."""
        if event.is_directory:
            return
        self._handle(Path(os.fsdecode(event.dest_path)), "JSON file moved to hot folder")


def main():
    """Start watching Config.HOT_FOLDER recursively until interrupted."""
    processor = JSONWorkflowProcessor()

    # Setup file monitoring
    event_handler = JSONFileHandler(processor)
    observer = Observer()
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)

    # Start monitoring
    observer.start()
    processor.logger.info(f"Started monitoring {Config.HOT_FOLDER}")

    try:
        # Wake periodically so Ctrl+C is honoured and a dead observer is noticed.
        while observer.is_alive():
            observer.join(Config.POLL_INTERVAL)
        processor.logger.error("File observer stopped unexpectedly")
    except KeyboardInterrupt:
        processor.logger.info("Stopping JSON Workflow Processor...")
    finally:
        observer.stop()
        observer.join()
    processor.logger.info("JSON Workflow Processor stopped")


if __name__ == "__main__":
    main()