#!/usr/bin/env python3
"""
JSON Workflow Processor
Converted from TWIST workflow definition

Monitors dynamic client hot folders for JSON files and routes them based on
client code and job category:
- Structure: /data/PRODUCTION/JSON/[CLIENT_NAME]/file.json
- Celtra projects (JobCategory "Celtra", eligible client folders only) → Celtra folder
- Reckitt Benckiser → Monday RB folder
- Rank → Monday Rank folder
- Every file is archived to JSON_STORE/[CLIENT_NAME]/; clients without special
  routing are archived only.

Features:
- Dynamic hot folder monitoring (hundreds of client folders)
- JSON validation and parsing
- Client-based routing for special clients
- File archiving for all clients
- Error notifications
- Activity logging
"""

import json
import logging
import os
import smtplib
import shutil
import time
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Any, Dict, List, Optional

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class Config:
    """Configuration settings"""

    # Paths
    HOT_FOLDER = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/JSON")
    JSON_STORE = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/JSON_STORE/")
    SYNC_BASE = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/SYNC/MAKE")

    # Clients eligible for Celtra projects (based on JobCategory).
    # Matched against the upper-cased client *folder* name (not ClientCode).
    # Only these clients can have JobCategory="Celtra" - add others as needed
    CELTRA_ELIGIBLE_CLIENTS = {
        "CIBC", "OLIVER", "ADIDAS", "PAYPAL", "RECKITTBENCKISER", "BAYER", "3M", "RANK"
    }

    # Client destinations.
    # NOTE(review): determine_routing matches these keys against the upper-cased
    # ClientCode, so a ClientCode of "CELTRA" would also route to the Celtra
    # folder regardless of JobCategory — confirm that is intended.
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
    }

    # Email settings (from original TWIST config)
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 25
    SMTP_USER = "twist@mail.dev.oliver.solutions"
    # NOTE(review): a credential is committed to source here. It was copied from
    # the TWIST config and may be a TWIST-encrypted value rather than the
    # plaintext password — confirm, rotate, and supply it via the environment
    # variable instead of relying on this default.
    SMTP_PASSWORD = os.environ.get(
        "JSON_PARSER_SMTP_PASSWORD",
        "A604D75B0CAADB20137B85299A433588839AC735F59FB3010E71F132F887CABBEB2A1ADE627ADCD608445CB058AB40B10361D0E881D7CDA4",
    )
    # Seconds; error emails are sent from the watcher thread, so an unresponsive
    # SMTP server must not be able to block it forever.
    SMTP_TIMEOUT = 30
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"

    # Monitoring settings
    POLL_INTERVAL = 5  # seconds
    WAIT_DELAY = 5  # seconds to wait for file stability


class JSONWorkflowProcessor:
    """Main workflow processor class: parse, archive, route and clean up JSON files."""

    # Fields extracted from JobSpecification.JobDetails.
    _EXTRACTED_FIELDS = ("JobCategory", "StudioCode", "Title", "ClientCode")

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()

    def setup_logging(self):
        """Setup logging configuration (file 'json_workflow.log' plus console)."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler('json_workflow.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create necessary directories if they don't exist"""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.SYNC_BASE,
        ] + list(Config.DESTINATIONS.values())

        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            self.logger.debug(f"Ensured directory exists: {directory}")

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """
        Parse JSON file and extract key fields.

        Returns a dict with JobCategory, StudioCode, Title and ClientCode. Every
        value is a string; missing or null fields become "". Returns None if the
        file cannot be read, is not valid JSON, or JobSpecification/JobDetails
        is present but not a JSON object.
        """
        try:
            # utf-8-sig also accepts files saved with a UTF-8 byte-order mark,
            # which json.load rejects under plain utf-8.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None

        # Extract fields from JobSpecification.JobDetails. Missing sections
        # count as empty; a section of the wrong type is rejected.
        job_spec = data.get("JobSpecification", {}) if isinstance(data, dict) else None
        job_details = job_spec.get("JobDetails", {}) if isinstance(job_spec, dict) else None
        if not isinstance(job_details, dict):
            self.logger.error(
                f"JSON parsing error in {file_path.name}: "
                "expected an object at JobSpecification.JobDetails"
            )
            return None

        # Coerce to str: a null or numeric value would otherwise break the
        # .upper() calls in determine_routing and crash the watcher thread.
        extracted = {}
        for key in self._EXTRACTED_FIELDS:
            value = job_details.get(key)
            extracted[key] = "" if value is None else str(value)

        self.logger.info(f"Parsed JSON {file_path.name}: {extracted}")
        return extracted

    def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
        """
        Determine destination paths based on client code and job category.

        Args:
            json_data: fields returned by parse_json_file.
            client_folder: name of the hot-folder sub-directory the file came from.

        Returns:
            List of destination paths (empty when no special routing applies).
            It can contain two entries, e.g. for RANK/RECKITTBENCKISER with a
            Celtra JobCategory.
        """
        client_code = str(json_data.get("ClientCode") or "").upper()
        job_category = str(json_data.get("JobCategory") or "").upper()

        destinations = []

        # Client-specific routing is keyed on ClientCode from the JSON.
        if client_code in Config.DESTINATIONS:
            destinations.append(Config.DESTINATIONS[client_code])

        # Celtra routing is keyed on JobCategory, gated by the client *folder*.
        if job_category == "CELTRA":
            if client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
                celtra_dest = Config.DESTINATIONS["CELTRA"]
                if celtra_dest not in destinations:
                    destinations.append(celtra_dest)
                self.logger.info(f"Celtra project detected for eligible client: {client_folder}")
            else:
                self.logger.warning(f"Celtra JobCategory found but {client_folder} not in eligible clients list")

        if not destinations:
            self.logger.info(f"No routing destination for ClientCode: {client_code}, JobCategory: {job_category}, Client: {client_folder}")

        return destinations

    def archive_file(self, file_path: Path, folder_name: str) -> bool:
        """Copy file to JSON_STORE/folder_name/; return True on success."""
        try:
            archive_dir = Config.JSON_STORE / folder_name
            archive_dir.mkdir(parents=True, exist_ok=True)

            destination = archive_dir / file_path.name
            shutil.copy2(file_path, destination)

            self.logger.info(f"Archived {file_path.name} to {destination}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to archive {file_path.name}: {e}")
            return False

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Copy file into a routing destination; return True on success."""
        try:
            destination.mkdir(parents=True, exist_ok=True)
            dest_file = destination / file_path.name
            shutil.copy2(file_path, dest_file)

            self.logger.info(f"Routed {file_path.name} to {destination}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False

    def send_error_email(self, file_path: Path, error_msg: str):
        """
        Send an error notification email, attaching the offending file if readable.

        Failures are logged, not raised, so a mail problem never stops the watcher.
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = Config.ERROR_EMAIL
            msg['Subject'] = "JSON will not Parse"

            body = (
                "Workflow: JSON_Parser\n"
                "Process: JSON_PARSE\n"
                f"File: {file_path.name} will not parse\n"
                f"Error: {error_msg}\n"
            )
            msg.attach(MIMEText(body, 'plain'))

            # Attach the problematic file. This is best effort: the notification
            # is still worth sending if the file is unreadable or already gone.
            try:
                payload = file_path.read_bytes()
            except OSError as e:
                self.logger.warning(f"Could not attach {file_path.name} to error email: {e}")
            else:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(payload)
                encoders.encode_base64(part)
                # Passing filename as a parameter lets the email package quote
                # it. Names with spaces or ';' would otherwise corrupt the header.
                part.add_header('Content-Disposition', 'attachment', filename=file_path.name)
                msg.attach(part)

            # The context manager sends QUIT and closes the socket even if
            # starttls/login/sendmail raises.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT, timeout=Config.SMTP_TIMEOUT) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.ERROR_EMAIL, msg.as_string())

            self.logger.info(f"Error email sent for {file_path.name}")
        except Exception as e:
            self.logger.error(f"Failed to send error email: {e}")

    def process_file(self, file_path: Path):
        """
        Process a single JSON file: parse, archive, route, then remove the original.

        The original is removed only when archiving succeeded and, for special
        clients, every routing copy succeeded. Unparseable files trigger an
        error email and are left in place.
        """
        self.logger.info(f"Processing file: {file_path.name}")

        # Wait for file stability
        time.sleep(Config.WAIT_DELAY)

        # The path may already have been handled by an earlier event for the
        # same file (e.g. create followed by move), or deleted while we waited.
        # Without this check a vanished file would be reported as a parse failure.
        if not file_path.is_file():
            self.logger.warning(f"File no longer present, skipping: {file_path.name}")
            return

        # Parse JSON
        json_data = self.parse_json_file(file_path)
        if json_data is None:
            self.send_error_email(file_path, "JSON parsing failed")
            return

        # Determine folder name for storing (from hot folder structure)
        folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"

        # Store file to JSON_STORE (always done for all clients)
        stored = self.archive_file(file_path, folder_name)

        # Determine routing for special clients
        destinations = self.determine_routing(json_data, folder_name)
        total_destinations = len(destinations)

        if destinations:
            # Special clients: route to all applicable destinations.
            # route_file logs each individual success/failure itself.
            routed_count = 0
            for destination in destinations:
                if self.route_file(file_path, destination):
                    routed_count += 1

            all_routed = routed_count == total_destinations
            if all_routed:
                self.logger.info(f"Successfully routed {file_path.name} to {total_destinations} destination(s)")
            else:
                self.logger.error(f"Only {routed_count}/{total_destinations} destinations succeeded for {file_path.name}")
        else:
            all_routed = True  # No routing needed
            self.logger.info(f"Regular client: {file_path.name} stored to JSON_STORE only")

        # Remove original file if processing was successful
        # For special clients: both stored AND all routing must succeed
        # For regular clients: only stored must succeed
        processing_successful = stored and all_routed

        if processing_successful:
            try:
                file_path.unlink()
                self.logger.info(f"Removed original JSON file: {file_path.name}")
            except Exception as e:
                self.logger.error(f"Failed to remove original file {file_path.name}: {e}")
        else:
            self.logger.error(f"Processing failed for {file_path.name} - file NOT removed")


class JSONFileHandler(FileSystemEventHandler):
    """
    File system event handler for monitoring JSON files.

    Each matching event is processed synchronously (including the WAIT_DELAY
    sleep) in the thread that dispatches the event.
    """

    def __init__(self, processor: JSONWorkflowProcessor):
        super().__init__()
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def on_created(self, event):
        """Handle file creation events"""
        if event.is_directory:
            return

        file_path = Path(event.src_path)

        # Only process JSON files
        if file_path.suffix.lower() == '.json':
            self.logger.info(f"New JSON file detected: {file_path.name}")
            self.processor.process_file(file_path)

    def on_moved(self, event):
        """Handle file move events (files moved into hot folder)"""
        if event.is_directory:
            return

        dest_path = Path(event.dest_path)

        # Only process JSON files
        if dest_path.suffix.lower() == '.json':
            self.logger.info(f"JSON file moved to hot folder: {dest_path.name}")
            self.processor.process_file(dest_path)


def main():
    """Main entry point: watch HOT_FOLDER recursively until interrupted."""
    processor = JSONWorkflowProcessor()

    # Setup file monitoring.
    # NOTE: only create/move events after start-up are handled; files already
    # sitting in the hot folder when the script starts are not processed.
    event_handler = JSONFileHandler(processor)
    observer = Observer()
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)

    # Start monitoring
    observer.start()
    processor.logger.info(f"Started monitoring {Config.HOT_FOLDER}")

    try:
        while True:
            time.sleep(Config.POLL_INTERVAL)
    except KeyboardInterrupt:
        processor.logger.info("Stopping JSON Workflow Processor...")
    finally:
        # Always shut the observer thread down, whatever ended the loop.
        observer.stop()
        observer.join()

    processor.logger.info("JSON Workflow Processor stopped")


if __name__ == "__main__":
    main()