#!/usr/bin/env python3
"""
JSON Workflow Processor - STARTUP SCAN VERSION

Processes existing files on startup + monitors for new files.

Features:
- Scans all client folders on startup for existing JSON files
- Processes backlog before starting real-time monitoring
- Concurrent processing with detailed reporting
- Handles crash recovery gracefully

Environment:
- JSON_WORKFLOW_SMTP_PASSWORD: optional override for the SMTP password.
"""

import json
import logging
import os
import shutil
import smtplib
import threading
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict, Iterable, List, Optional, Set

import schedule
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class Config:
    """Configuration settings"""

    # Paths
    HOT_FOLDER = Path("/data/PRODUCTION/JSON")
    JSON_STORE = Path("/data/PRODUCTION/JSON_STORE/")
    SYNC_BASE = Path("/data/PRODUCTION/SYNC/MAKE")
    REPORTS_DIR = Path("/PRODUCTION/JSON_PARSER_LOGS")

    # Clients eligible for Celtra projects (based on JobCategory)
    CELTRA_ELIGIBLE_CLIENTS = {
        "CIBC", "OLIVER", "ADIDAS", "PAYPAL",
        "RECKITTBENCKISER", "BAYER", "3M", "RANK"
    }

    # Client destinations
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank"
    }

    # Email settings
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 587
    SMTP_USER = "twist@mail.dev.oliver.solutions"
    # SECURITY: a credential committed to source control must be considered
    # leaked. Set JSON_WORKFLOW_SMTP_PASSWORD in the environment, rotate the
    # key, and then delete the literal fallback below (kept only so existing
    # deployments keep working unchanged).
    SMTP_PASSWORD = os.environ.get(
        "JSON_WORKFLOW_SMTP_PASSWORD",
        "102115e9f3b9d7332d0cd1d4329bc0d4-77751bfc-ca066b71",
    )
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"
    REPORT_EMAILS = ["daveporter@oliver.agency"]
    SMTP_TIMEOUT = 30  # seconds; prevents a dead SMTP host hanging the scheduler

    # Batch processing settings
    BATCH_SIZE = 10
    MAX_WORKERS = 10
    BATCH_TIMEOUT = 30

    # Monitoring settings
    POLL_INTERVAL = 2
    WAIT_DELAY = 3

    # Reporting settings
    DAILY_REPORT_TIME = "00:00"
    KEEP_REPORTS_DAYS = 30


class DailyStats:
    """Thread-safe daily statistics collector.

    Every public method acquires ``self.lock``; methods suffixed ``_locked``
    expect the caller to already hold it (the lock is not re-entrant).
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.reset_stats()
        self.start_time = datetime.now()

    def reset_stats(self):
        """Reset daily statistics"""
        with self.lock:
            self._reset_locked()

    def _reset_locked(self):
        """Reinitialise all counters. Caller must hold ``self.lock``."""
        self.date = date.today()
        self.total_processed = 0
        self.total_errors = 0
        self.total_batches = 0
        self.startup_files_processed = 0

        # Client breakdown
        self.client_stats = defaultdict(lambda: {
            'processed': 0,
            'routed': 0,
            'celtra': 0,
            'errors': 0,
            'job_categories': Counter(),
            'studio_codes': Counter()
        })

        # Destination breakdown
        self.destination_stats = defaultdict(int)

        # Performance metrics
        self.processing_times = []
        self.batch_sizes = []

        # Error tracking
        self.error_details = []

        # Hourly breakdown
        self.hourly_stats = defaultdict(int)

    def record_file_processed(self, client: str, json_data: Dict[str, Any],
                              destinations: List[Path], processing_time: float,
                              success: bool = True, error: Optional[str] = None,
                              is_startup: bool = False):
        """Record the outcome of processing one file.

        Args:
            client: Client folder name the file came from.
            json_data: Extracted fields (``JobCategory``, ``StudioCode``, ...).
            destinations: Routing destinations chosen for the file.
            processing_time: Wall-clock seconds spent on the file.
            success: Whether the file was stored and routed completely.
            error: Error description, recorded only when ``success`` is False.
            is_startup: True when the file came from the startup backlog.
        """
        with self.lock:
            self.hourly_stats[datetime.now().hour] += 1

            if is_startup:
                self.startup_files_processed += 1

            if not success:
                self.total_errors += 1
                self.client_stats[client]['errors'] += 1
                if error:
                    self.error_details.append({
                        'time': datetime.now().strftime('%H:%M:%S'),
                        'client': client,
                        'error': error
                    })
                return

            per_client = self.client_stats[client]
            self.total_processed += 1
            per_client['processed'] += 1

            # Track job categories and studio codes
            per_client['job_categories'][json_data.get('JobCategory', 'Unknown')] += 1
            per_client['studio_codes'][json_data.get('StudioCode', 'Unknown')] += 1

            # Track routing
            if destinations:
                per_client['routed'] += 1
                for dest in destinations:
                    self.destination_stats[dest.name] += 1
                    if 'Celtra' in dest.name:
                        per_client['celtra'] += 1

            # Track performance
            self.processing_times.append(processing_time)

    def record_batch(self, batch_size: int):
        """Record a completed batch"""
        with self.lock:
            self.total_batches += 1
            self.batch_sizes.append(batch_size)

    def get_stats_summary(self, reset: bool = False) -> Dict[str, Any]:
        """Return a snapshot of the current statistics.

        The snapshot owns all of its containers, so it can be read after the
        lock is released while worker threads keep recording.

        Args:
            reset: When True, the counters are reset in the same critical
                section, so no file recorded concurrently is lost between
                the snapshot and the reset.
        """
        with self.lock:
            summary = self._build_summary_locked()
            if reset:
                self._reset_locked()
            return summary

    def _build_summary_locked(self) -> Dict[str, Any]:
        """Build the summary dict. Caller must hold ``self.lock``."""
        times = self.processing_times
        sizes = self.batch_sizes
        avg_processing_time = sum(times) / len(times) if times else 0
        avg_batch_size = sum(sizes) / len(sizes) if sizes else 0
        attempted = self.total_processed + self.total_errors

        # Copy the nested dicts/Counters: a shallow dict() would leave the
        # per-client data shared with (and mutated by) the worker threads.
        client_stats = {
            client: {
                **data,
                'job_categories': Counter(data['job_categories']),
                'studio_codes': Counter(data['studio_codes']),
            }
            for client, data in self.client_stats.items()
        }

        return {
            'date': self.date.strftime('%Y-%m-%d'),
            'uptime': str(datetime.now() - self.start_time).split('.')[0],
            'total_processed': self.total_processed,
            'startup_files_processed': self.startup_files_processed,
            'total_errors': self.total_errors,
            'total_batches': self.total_batches,
            'success_rate': (self.total_processed / attempted * 100) if attempted > 0 else 0,
            'avg_processing_time': round(avg_processing_time, 2),
            'avg_batch_size': round(avg_batch_size, 1),
            'client_stats': client_stats,
            'destination_stats': dict(self.destination_stats),
            'hourly_stats': dict(self.hourly_stats),
            'error_details': [dict(entry) for entry in self.error_details]
        }


class StartupScanProcessor:
    """JSON processor with startup scan capability"""

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()

        self.file_queue = Queue()
        self.daily_stats = DailyStats()
        self.startup_complete = False

        # Paths currently queued or being processed; stops the same file
        # being queued twice (e.g. created + moved events, or the rescan).
        self._pending_lock = threading.Lock()
        self._pending = set()  # type: Set[Path]

        self.setup_daily_reporting()

    def setup_logging(self):
        """Setup logging configuration"""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] %(message)s'

        log_dir = Config.REPORTS_DIR
        log_dir.mkdir(parents=True, exist_ok=True)

        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                # Explicit UTF-8: log messages contain emoji, which the
                # locale default encoding may not be able to represent.
                logging.FileHandler(log_dir / 'json_workflow_startup_scan.log',
                                    encoding='utf-8'),
                logging.StreamHandler()
            ]
        )

        self.logger = logging.getLogger(__name__)
        self.logger.info("Startup Scan JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create necessary directories"""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.SYNC_BASE,
            Config.REPORTS_DIR
        ] + list(Config.DESTINATIONS.values())

        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)

    def setup_daily_reporting(self):
        """Setup scheduled daily reporting"""
        schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)

        def run_scheduler():
            while True:
                schedule.run_pending()
                time.sleep(60)

        scheduler_thread = threading.Thread(target=run_scheduler, name="Scheduler", daemon=True)
        scheduler_thread.start()

    @staticmethod
    def _is_json_file(path: Path) -> bool:
        """Return True for regular files with a (case-insensitive) .json suffix."""
        return path.suffix.lower() == '.json' and path.is_file()

    def scan_existing_files(self) -> List[Path]:
        """Scan the hot folder for existing JSON files.

        Looks in every client sub-folder and in the hot folder itself (such
        files are handled under the "DEFAULT" folder name). Suffix matching
        is case-insensitive, matching the real-time event handler.

        Returns:
            The files found; a partial list if the scan fails part-way.
        """
        existing_files = []  # type: List[Path]

        try:
            self.logger.info("🔍 Scanning for existing JSON files...")

            if not Config.HOT_FOLDER.exists():
                self.logger.warning(f"Hot folder does not exist: {Config.HOT_FOLDER}")
                return existing_files

            root_files = []
            for entry in sorted(Config.HOT_FOLDER.iterdir()):
                if entry.is_dir():
                    json_files = sorted(p for p in entry.iterdir() if self._is_json_file(p))
                    if json_files:
                        self.logger.info(f"📁 Found {len(json_files)} JSON files in {entry.name}/")
                        existing_files.extend(json_files)
                    else:
                        self.logger.debug(f"📁 No JSON files in {entry.name}/")
                elif self._is_json_file(entry):
                    root_files.append(entry)

            if root_files:
                self.logger.info(f"📁 Found {len(root_files)} JSON files in hot folder root")
                existing_files.extend(root_files)

            self.logger.info(f"🔍 Startup scan complete: Found {len(existing_files)} existing JSON files")
            return existing_files

        except Exception as e:
            self.logger.error(f"Error during startup scan: {e}")
            return existing_files

    def process_startup_files(self, files: List[Path]):
        """Process existing files found during startup scan"""
        if not files:
            self.logger.info("✅ No existing files to process")
            return

        self.logger.info(f"🚀 Processing {len(files)} existing files from startup scan...")

        # Process in batches
        batch_size = Config.BATCH_SIZE
        for batch_number, offset in enumerate(range(0, len(files), batch_size), start=1):
            batch = files[offset:offset + batch_size]
            self.logger.info(f"Processing startup batch {batch_number}: {len(batch)} files")
            self.process_batch(batch, is_startup=True)

        self.logger.info(f"✅ Startup file processing complete: {len(files)} files processed")

    def queue_missed_files(self, already_seen: Iterable[Path] = ()) -> int:
        """Queue files that arrived while the startup backlog was processed.

        Events are ignored until ``startup_complete`` is set, so anything
        dropped into the hot folder during startup would otherwise sit there
        until the next restart. Call this once the observer is running.

        Args:
            already_seen: Files from the initial scan; they are skipped so a
                file that failed during startup is not retried immediately.

        Returns:
            Number of files queued.
        """
        seen = set(already_seen)
        missed = [path for path in self.scan_existing_files() if path not in seen]
        for path in missed:
            self.queue_file(path)
        if missed:
            self.logger.info(f"Queued {len(missed)} files that arrived during startup")
        return len(missed)

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Parse a JSON file and extract the key job fields.

        Returns:
            A dict with ``JobCategory``, ``StudioCode``, ``Title`` and
            ``ClientCode`` (always strings; missing or null values become
            ""), or None if the file cannot be read or is not shaped as
            expected.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            job_details = data.get("JobSpecification", {}).get("JobDetails", {})

            def field(key: str) -> str:
                # JSON null / numeric values would otherwise break .upper()
                # later on, after the file has already been stored.
                value = job_details.get(key, "")
                return "" if value is None else str(value)

            return {
                "JobCategory": field("JobCategory"),
                "StudioCode": field("StudioCode"),
                "Title": field("Title"),
                "ClientCode": field("ClientCode")
            }

        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None

    def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
        """Determine destination paths based on client code and job category.

        Args:
            json_data: Extracted fields from :meth:`parse_json_file`.
            client_folder: Name of the hot-folder sub-directory the file was in.

        Returns:
            Destination directories, without duplicates; may be empty.
        """
        client_code = str(json_data.get("ClientCode") or "").upper()
        job_category = str(json_data.get("JobCategory") or "").upper()
        destinations = []

        # Check for specific client routing by ClientCode
        # NOTE(review): "CELTRA" is itself a DESTINATIONS key, so a file whose
        # ClientCode is literally "CELTRA" is routed there without the
        # eligibility check below - confirm whether that is intended.
        if client_code in Config.DESTINATIONS:
            destinations.append(Config.DESTINATIONS[client_code])

        # Check for Celtra by JobCategory (only for eligible clients)
        if job_category == "CELTRA" and client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
            celtra_dest = Config.DESTINATIONS["CELTRA"]
            if celtra_dest not in destinations:
                destinations.append(celtra_dest)

        return destinations

    @staticmethod
    def _copy_with_open_permissions(file_path: Path, target_dir: Path) -> None:
        """Copy ``file_path`` into ``target_dir``; chmod both dir and copy to 777."""
        target_dir.mkdir(parents=True, exist_ok=True)
        target_dir.chmod(0o777)

        target_file = target_dir / file_path.name
        shutil.copy2(file_path, target_file)
        target_file.chmod(0o777)

    def store_file(self, file_path: Path, folder_name: str) -> bool:
        """Store file to JSON_STORE with 777 permissions.

        Returns:
            True on success, False (after logging) on any failure.
        """
        try:
            self._copy_with_open_permissions(file_path, Config.JSON_STORE / folder_name)
            return True
        except Exception as e:
            self.logger.error(f"Failed to store {file_path.name}: {e}")
            return False

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Route file to specific destination with 777 permissions.

        Returns:
            True on success, False (after logging) on any failure.
        """
        try:
            self._copy_with_open_permissions(file_path, destination)
            return True
        except Exception as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False

    def process_single_file(self, file_path: Path, is_startup: bool = False) -> Dict[str, Any]:
        """Parse, store, route and (on success) delete one JSON file.

        Exactly one outcome is recorded in the daily statistics per call.

        Args:
            file_path: File inside the hot folder.
            is_startup: True for backlog files; skips the settle delay that
                real-time files get so writers can finish.

        Returns:
            ``{'success': bool}`` plus an ``'error'`` message on failure.
        """
        start_time = time.time()
        thread_name = threading.current_thread().name
        startup_tag = "[STARTUP]" if is_startup else ""
        folder_name = "DEFAULT" if file_path.parent == Config.HOT_FOLDER else file_path.parent.name

        try:
            self.logger.info(f"[{thread_name}] {startup_tag} Processing: {file_path.name}")

            if not is_startup:
                time.sleep(Config.WAIT_DELAY)

            # Parse JSON
            json_data = self.parse_json_file(file_path)
            if json_data is None:
                error_msg = "JSON parsing failed"
                self.daily_stats.record_file_processed(
                    folder_name, {}, [], time.time() - start_time, False, error_msg, is_startup
                )
                return {'success': False, 'error': error_msg}

            # Store file
            stored = self.store_file(file_path, folder_name)

            # Route file (every destination is attempted, even after a failure)
            destinations = self.determine_routing(json_data, folder_name)
            routed_count = sum(1 for dest in destinations if self.route_file(file_path, dest))

            success = stored and routed_count == len(destinations)

            # Remove the source BEFORE recording: if unlink raises, the except
            # branch records a single error instead of success + error.
            if success:
                file_path.unlink()

            processing_time = time.time() - start_time

            # Record statistics
            self.daily_stats.record_file_processed(
                folder_name, json_data, destinations, processing_time,
                success, None if success else "Processing failed", is_startup
            )

            if success:
                route_info = f" + {len(destinations)} routes" if destinations else ""
                self.logger.info(
                    f"[{thread_name}] {startup_tag} ✅ {file_path.name}{route_info} ({processing_time:.2f}s)"
                )

            return {'success': success}

        except Exception as e:
            self.daily_stats.record_file_processed(
                folder_name, {}, [], time.time() - start_time, False, str(e), is_startup
            )
            self.logger.error(f"[{thread_name}] {startup_tag} Error processing {file_path.name}: {e}")
            return {'success': False, 'error': str(e)}

    def process_batch(self, file_paths: List[Path], is_startup: bool = False):
        """Process a batch of files concurrently"""
        if not file_paths:
            return

        batch_start = time.time()
        startup_tag = "[STARTUP]" if is_startup else ""

        self.logger.info(f"{startup_tag} Processing batch of {len(file_paths)} files")

        with ThreadPoolExecutor(max_workers=min(Config.MAX_WORKERS, len(file_paths))) as executor:
            future_to_file = {
                executor.submit(self.process_single_file, file_path, is_startup): file_path
                for file_path in file_paths
            }

            for future in as_completed(future_to_file):
                try:
                    future.result()
                except Exception as e:
                    file_path = future_to_file[future]
                    self.logger.error(f"{startup_tag} Batch processing error for {file_path.name}: {e}")

        batch_time = time.time() - batch_start
        self.daily_stats.record_batch(len(file_paths))

        self.logger.info(f"{startup_tag} Batch completed: {len(file_paths)} files in {batch_time:.2f}s")

    def queue_file(self, file_path: Path):
        """Add file to processing queue (only for real-time files).

        Ignored before startup completes, and when the same path is already
        queued or in flight.
        """
        if not self.startup_complete:
            return

        with self._pending_lock:
            if file_path in self._pending:
                return
            self._pending.add(file_path)

        self.file_queue.put(file_path)

    def batch_processor_worker(self):
        """Background worker for batch processing (real-time files only).

        Collects up to BATCH_SIZE files, or whatever has arrived once the
        queue goes quiet / BATCH_TIMEOUT elapses, then processes them as one
        batch. Runs forever; intended for a daemon thread.
        """
        while True:
            files_to_process = []
            batch_start_time = time.time()

            while (len(files_to_process) < Config.BATCH_SIZE and
                   (time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
                try:
                    file_path = self.file_queue.get(timeout=1.0)
                except Empty:
                    if files_to_process:
                        break
                    continue
                files_to_process.append(file_path)
                self.file_queue.task_done()

            if not files_to_process:
                continue

            try:
                if self.startup_complete:
                    self.process_batch(files_to_process, is_startup=False)
            except Exception:
                # Keep the worker alive: if this thread died, real-time
                # processing would stop silently until the next restart.
                self.logger.exception("Unexpected error in batch worker")
            finally:
                with self._pending_lock:
                    self._pending.difference_update(files_to_process)

    def generate_daily_report(self):
        """Generate, save and email the daily report, then prune old reports.

        Statistics are snapshotted and reset atomically, so files processed
        while the report is being written or emailed count toward the next
        day instead of being dropped. A failure to write the report file
        does not prevent the email from being sent.
        """
        try:
            stats = self.daily_stats.get_stats_summary(reset=True)

            # Generate report content
            report_content = self.format_daily_report(stats)

            # Save to file
            report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
            try:
                with open(report_file, 'w', encoding='utf-8') as f:
                    f.write(report_content)
                self.logger.info(f"Daily report generated: {report_file}")
            except OSError as e:
                self.logger.error(f"Failed to write daily report {report_file}: {e}")

            # Email report
            self.email_daily_report(report_content, stats['date'])

            # Clean old reports
            self.cleanup_old_reports()

        except Exception as e:
            self.logger.error(f"Failed to generate daily report: {e}")

    def format_daily_report(self, stats: Dict[str, Any]) -> str:
        """Format daily statistics into readable report.

        Args:
            stats: A summary as returned by ``DailyStats.get_stats_summary``.
        """
        lines = [
            "",
            "JSON WORKFLOW DAILY REPORT - STARTUP SCAN VERSION",
            f"Date: {stats['date']}",
            f"Uptime: {stats['uptime']}",
            "",
            "=== SUMMARY ===",
            f"Total Files Processed: {stats['total_processed']}",
        ]
        if stats['startup_files_processed'] > 0:
            lines.append(f"Startup Files Processed: {stats['startup_files_processed']}")
        lines += [
            f"Total Errors: {stats['total_errors']}",
            f"Success Rate: {stats['success_rate']:.1f}%",
            f"Total Batches: {stats['total_batches']}",
            f"Average Processing Time: {stats['avg_processing_time']}s",
            f"Average Batch Size: {stats['avg_batch_size']} files",
            "",
            "=== CLIENT BREAKDOWN ===",
        ]

        for client, client_data in sorted(stats['client_stats'].items()):
            lines += [
                "",
                f"{client}:",
                f"  Processed: {client_data['processed']}",
                f"  Routed: {client_data['routed']}",
                f"  Celtra Projects: {client_data['celtra']}",
                f"  Errors: {client_data['errors']}",
            ]
            if client_data['job_categories']:
                lines.append(f"  Job Categories: {dict(client_data['job_categories'])}")

        lines += ["", "=== DESTINATION BREAKDOWN ==="]
        lines += [f"{dest}: {count} files"
                  for dest, count in sorted(stats['destination_stats'].items())]

        lines += ["", "=== HOURLY BREAKDOWN ==="]
        for hour in range(24):
            count = stats['hourly_stats'].get(hour, 0)
            if count > 0:
                lines.append(f"{hour:02d}:00 - {count} files")

        if stats['error_details']:
            lines += ["", "=== ERROR DETAILS ==="]
            # Only the most recent errors, to keep the email readable.
            lines += [f"{error['time']} - {error['client']}: {error['error']}"
                      for error in stats['error_details'][-10:]]

        return "\n".join(lines) + "\n"

    def email_daily_report(self, report_content: str, report_date: str):
        """Email the daily report; failures are logged, never raised."""
        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = ', '.join(Config.REPORT_EMAILS)
            msg['Subject'] = f"JSON Workflow Daily Report - {report_date}"

            msg.attach(MIMEText(report_content, 'plain'))

            # The context manager closes the connection even when starttls,
            # login or sendmail raises; the timeout bounds a dead server.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT,
                              timeout=Config.SMTP_TIMEOUT) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())

            self.logger.info("Daily report emailed successfully")

        except Exception as e:
            self.logger.error(f"Failed to email daily report: {e}")

    def cleanup_old_reports(self):
        """Remove report files older than ``Config.KEEP_REPORTS_DAYS`` days."""
        try:
            cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)

            for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
                file_date_str = report_file.stem.split('_')[-1]
                try:
                    file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
                except ValueError:
                    # Not one of our dated reports; leave it alone.
                    self.logger.debug(f"Skipping report with unparseable date: {report_file}")
                    continue

                if file_date >= cutoff_date:
                    continue

                try:
                    report_file.unlink()
                    self.logger.debug(f"Deleted old report: {report_file}")
                except OSError as e:
                    self.logger.warning(f"Could not delete old report {report_file}: {e}")

        except Exception as e:
            self.logger.error(f"Failed to cleanup old reports: {e}")

    def get_current_stats(self) -> str:
        """Get current statistics as formatted string"""
        stats = self.daily_stats.get_stats_summary()
        startup_info = (f" (startup: {stats['startup_files_processed']})"
                        if stats['startup_files_processed'] > 0 else "")
        return (f"Today: {stats['total_processed']} processed{startup_info}, "
                f"{stats['total_errors']} errors, {stats['success_rate']:.1f}% success")


class StartupScanFileHandler(FileSystemEventHandler):
    """File system event handler that only processes after startup scan"""

    def __init__(self, processor: StartupScanProcessor):
        super().__init__()
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def _queue_if_json(self, raw_path: str, description: str):
        """Queue ``raw_path`` when it has a (case-insensitive) .json suffix."""
        file_path = Path(raw_path)
        if file_path.suffix.lower() == '.json':
            self.logger.debug(f"{description}: {file_path.name}")
            self.processor.queue_file(file_path)

    def on_created(self, event):
        """Queue newly created JSON files once startup has completed."""
        if event.is_directory or not self.processor.startup_complete:
            return
        self._queue_if_json(event.src_path, "New file detected")

    def on_moved(self, event):
        """Queue JSON files moved/renamed into place once startup has completed."""
        if event.is_directory or not self.processor.startup_complete:
            return
        self._queue_if_json(event.dest_path, "File moved to hot folder")


def main():
    """Main entry point with startup scan"""
    processor = StartupScanProcessor()

    # Step 1: Scan and process existing files
    existing_files = processor.scan_existing_files()
    processor.process_startup_files(existing_files)

    # Step 2: Mark startup complete and begin real-time monitoring
    processor.startup_complete = True
    processor.logger.info("🎯 Startup scan complete - switching to real-time monitoring")

    # Start batch processing worker for real-time files
    batch_worker = threading.Thread(
        target=processor.batch_processor_worker,
        name="BatchWorker",
        daemon=True
    )
    batch_worker.start()

    # Setup file monitoring for new files
    event_handler = StartupScanFileHandler(processor)
    observer = Observer()
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)

    # Start real-time monitoring
    observer.start()
    processor.logger.info(f"👀 Real-time monitoring started on {Config.HOT_FOLDER}")

    try:
        # Pick up anything dropped in while the backlog was being processed
        # (no observer was running then, so no event was ever delivered).
        processor.queue_missed_files(existing_files)

        while True:
            time.sleep(60)
            current_stats = processor.get_current_stats()
            processor.logger.info(f"Stats: {current_stats}")

    except KeyboardInterrupt:
        processor.logger.info("Stopping Startup Scan JSON Workflow Processor...")
    finally:
        # Always stop the observer thread, whatever ended the main loop.
        observer.stop()
        observer.join()
        processor.logger.info("Startup Scan JSON Workflow Processor stopped")


if __name__ == "__main__":
    main()