json-parser-twist-metaserver/json_workflow_processor-hybrid-protected.py
Dave Porter af8acbd986 Add all project files including previous versions and documentation
- Added INSTALL_GUIDE.md and README.md documentation
- Added OLD/ folder with previous script versions for reference
- Added data/ folder with sample JSON test files
- Added older json_workflow_processor-hybrid-protected.py version
- Excludes venv and .DS_Store (per .gitignore)

Complete project backup with full history and test data.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-02-06 11:07:22 -05:00

928 lines
No EOL
38 KiB
Python

#!/usr/bin/env python3
"""
JSON Workflow Processor - HYBRID PROTECTED VERSION
Real-time monitoring + protected periodic scanning + failed file handling
Features:
- File system events for immediate detection
- Protected periodic scanning (prevents overlapping scans)
- Failed file handling (moved to failed folder)
- File version tracking (handles file updates properly)
- Startup scan for crash recovery
- Concurrent processing with detailed reporting
- Scan duration monitoring and alerting
"""
import json
import logging
import smtplib
import shutil
import time
import threading
import schedule
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, date, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from pathlib import Path
from typing import Dict, Optional, Any, List, Set
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class Config:
"""Configuration settings"""
# Paths
HOT_FOLDER = Path("/data/PRODUCTION/JSON")
JSON_STORE = Path("/data/PRODUCTION/JSON_STORE/")
JSON_FAILED = Path("/data/PRODUCTION/JSON_FAILED/")
SYNC_BASE = Path("/data/PRODUCTION/SYNC/MAKE")
REPORTS_DIR = Path("/PRODUCTION/JSON_PARSER_LOGS")
# Clients eligible for Celtra projects (based on JobCategory)
CELTRA_ELIGIBLE_CLIENTS = {
"CIBC", "OLIVER", "ADIDAS", "PAYPAL",
"RECKITTBENCKISER", "BAYER", "3M", "RANK"
}
# Client destinations
DESTINATIONS = {
"CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
"RECKITTBENCKISER": SYNC_BASE / "Monday RB",
"RANK": SYNC_BASE / "Monday Rank"
}
# Email settings
SMTP_SERVER = "smtp.mailgun.org"
SMTP_PORT = 587
SMTP_USER = "twist@mail.dev.oliver.solutions"
SMTP_PASSWORD = "102115e9f3b9d7332d0cd1d4329bc0d4-77751bfc-ca066b71"
SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
ERROR_EMAIL = "daveporter@oliver.agency"
REPORT_EMAILS = ["daveporter@oliver.agency"]
# Batch processing settings
BATCH_SIZE = 10
MAX_WORKERS = 10
BATCH_TIMEOUT = 30
# Monitoring settings
POLL_INTERVAL = 2
WAIT_DELAY = 3
PERIODIC_SCAN_INTERVAL = 60 # Scan every 60 seconds for missed files
PERIODIC_SCAN_TIMEOUT = 120 # Maximum time allowed for a periodic scan
SLOW_SCAN_THRESHOLD = 30 # Log warning if scan takes longer than this
# Reporting settings
DAILY_REPORT_TIME = "00:00"
KEEP_REPORTS_DAYS = 30
class DailyStats:
"""Thread-safe daily statistics collector"""
def __init__(self):
self.lock = threading.Lock()
self.reset_stats()
self.start_time = datetime.now()
def reset_stats(self):
"""Reset daily statistics"""
with self.lock:
self.date = date.today()
self.total_processed = 0
self.total_errors = 0
self.total_batches = 0
self.startup_files_processed = 0
self.periodic_files_found = 0
self.failed_files_moved = 0
self.periodic_scans_completed = 0
self.periodic_scans_skipped = 0
self.slow_scans = 0
# Client breakdown
self.client_stats = defaultdict(lambda: {
'processed': 0,
'routed': 0,
'celtra': 0,
'errors': 0,
'job_categories': Counter(),
'studio_codes': Counter()
})
# Destination breakdown
self.destination_stats = defaultdict(int)
# Performance metrics
self.processing_times = []
self.batch_sizes = []
self.scan_times = []
# Error tracking
self.error_details = []
# Hourly breakdown
self.hourly_stats = defaultdict(int)
def record_file_processed(self, client: str, json_data: Dict[str, Any],
destinations: List[Path], processing_time: float,
success: bool = True, error: str = None,
is_startup: bool = False, is_periodic: bool = False):
"""Record a processed file"""
with self.lock:
hour = datetime.now().hour
self.hourly_stats[hour] += 1
if is_startup:
self.startup_files_processed += 1
elif is_periodic:
self.periodic_files_found += 1
if success:
self.total_processed += 1
self.client_stats[client]['processed'] += 1
# Track job categories and studio codes
job_category = json_data.get('JobCategory', 'Unknown')
studio_code = json_data.get('StudioCode', 'Unknown')
self.client_stats[client]['job_categories'][job_category] += 1
self.client_stats[client]['studio_codes'][studio_code] += 1
# Track routing
if destinations:
self.client_stats[client]['routed'] += 1
for dest in destinations:
self.destination_stats[dest.name] += 1
if 'Celtra' in dest.name:
self.client_stats[client]['celtra'] += 1
# Track performance
self.processing_times.append(processing_time)
else:
self.total_errors += 1
self.client_stats[client]['errors'] += 1
if error:
self.error_details.append({
'time': datetime.now().strftime('%H:%M:%S'),
'client': client,
'error': error
})
def record_failed_file_moved(self):
"""Record a failed file being moved"""
with self.lock:
self.failed_files_moved += 1
def record_periodic_scan(self, duration: float, files_found: int, skipped: bool = False):
"""Record periodic scan statistics"""
with self.lock:
if skipped:
self.periodic_scans_skipped += 1
else:
self.periodic_scans_completed += 1
self.scan_times.append(duration)
if duration > Config.SLOW_SCAN_THRESHOLD:
self.slow_scans += 1
def record_batch(self, batch_size: int):
"""Record a completed batch"""
with self.lock:
self.total_batches += 1
self.batch_sizes.append(batch_size)
def get_stats_summary(self) -> Dict[str, Any]:
"""Get current statistics summary"""
with self.lock:
avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0
avg_batch_size = sum(self.batch_sizes) / len(self.batch_sizes) if self.batch_sizes else 0
avg_scan_time = sum(self.scan_times) / len(self.scan_times) if self.scan_times else 0
return {
'date': self.date.strftime('%Y-%m-%d'),
'uptime': str(datetime.now() - self.start_time).split('.')[0],
'total_processed': self.total_processed,
'startup_files_processed': self.startup_files_processed,
'periodic_files_found': self.periodic_files_found,
'failed_files_moved': self.failed_files_moved,
'periodic_scans_completed': self.periodic_scans_completed,
'periodic_scans_skipped': self.periodic_scans_skipped,
'slow_scans': self.slow_scans,
'avg_scan_time': round(avg_scan_time, 2),
'total_errors': self.total_errors,
'total_batches': self.total_batches,
'success_rate': (self.total_processed / (self.total_processed + self.total_errors) * 100) if (self.total_processed + self.total_errors) > 0 else 0,
'avg_processing_time': round(avg_processing_time, 2),
'avg_batch_size': round(avg_batch_size, 1),
'client_stats': dict(self.client_stats),
'destination_stats': dict(self.destination_stats),
'hourly_stats': dict(self.hourly_stats),
'error_details': self.error_details.copy()
}
class ProtectedHybridProcessor:
"""JSON processor with protected hybrid monitoring"""
def __init__(self):
self.setup_logging()
self.ensure_directories()
self.file_queue = Queue()
self.daily_stats = DailyStats()
self.startup_complete = False
# Track recently processed files to avoid duplicates
# Format: "filepath_size_mtime" -> timestamp
self.recently_processed: Set[str] = set()
self.processed_lock = threading.Lock()
# Periodic scan protection
self.scan_in_progress = False
self.scan_lock = threading.Lock()
self.last_scan_start = None
self.setup_daily_reporting()
def setup_logging(self):
"""Setup logging configuration"""
log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] %(message)s'
log_dir = Path("/PRODUCTION/JSON_PARSER_LOGS")
log_dir.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[
logging.FileHandler(log_dir / 'json_workflow_hybrid_protected.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
self.logger.info("Protected Hybrid JSON Workflow Processor initialized")
def ensure_directories(self):
"""Create necessary directories"""
directories = [
Config.HOT_FOLDER,
Config.JSON_STORE,
Config.JSON_FAILED,
Config.SYNC_BASE,
Config.REPORTS_DIR
] + list(Config.DESTINATIONS.values())
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
directory.chmod(0o777)
def setup_daily_reporting(self):
"""Setup scheduled daily reporting"""
schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(60)
scheduler_thread = threading.Thread(target=run_scheduler, name="Scheduler", daemon=True)
scheduler_thread.start()
def add_to_recently_processed(self, file_path: Path):
"""Add file to recently processed list with file version tracking"""
try:
with self.processed_lock:
# Use file path + size + mtime as unique identifier for file versions
stat_info = file_path.stat()
file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"
self.recently_processed.add(file_id)
# Keep only last 2000 processed files in memory
if len(self.recently_processed) > 2000:
# Remove oldest 500 entries (simple FIFO approximation)
oldest_entries = list(self.recently_processed)[:500]
for entry in oldest_entries:
self.recently_processed.discard(entry)
except Exception as e:
self.logger.debug(f"Error adding to recently processed: {e}")
def was_recently_processed(self, file_path: Path) -> bool:
"""Check if file was recently processed (same version)"""
try:
with self.processed_lock:
stat_info = file_path.stat()
file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"
return file_id in self.recently_processed
except Exception as e:
self.logger.debug(f"Error checking recently processed: {e}")
return False
def scan_existing_files(self) -> List[Path]:
"""Scan all client folders for existing JSON files"""
existing_files = []
try:
self.logger.info("🔍 Scanning for existing JSON files...")
if not Config.HOT_FOLDER.exists():
self.logger.warning(f"Hot folder does not exist: {Config.HOT_FOLDER}")
return existing_files
# Scan all subdirectories for JSON files
for client_folder in Config.HOT_FOLDER.iterdir():
if client_folder.is_dir():
client_name = client_folder.name
json_files = list(client_folder.glob("*.json"))
if json_files:
self.logger.info(f"📁 Found {len(json_files)} JSON files in {client_name}/")
existing_files.extend(json_files)
else:
self.logger.debug(f"📁 No JSON files in {client_name}/")
self.logger.info(f"🔍 Startup scan complete: Found {len(existing_files)} existing JSON files")
return existing_files
except Exception as e:
self.logger.error(f"Error during startup scan: {e}")
return existing_files
def periodic_scan(self):
"""Protected periodic scan for missed files"""
while True:
try:
if self.startup_complete:
time.sleep(Config.PERIODIC_SCAN_INTERVAL)
# Check if scan is already in progress
with self.scan_lock:
if self.scan_in_progress:
self.logger.warning("⏳ Skipping periodic scan - previous scan still in progress")
self.daily_stats.record_periodic_scan(0, 0, skipped=True)
continue
# Check if previous scan timed out
if self.last_scan_start and (time.time() - self.last_scan_start) > Config.PERIODIC_SCAN_TIMEOUT:
self.logger.error(f"🚨 Previous scan timed out after {Config.PERIODIC_SCAN_TIMEOUT}s - forcing reset")
# Start new scan
self.scan_in_progress = True
self.last_scan_start = time.time()
# Perform the actual scan
self._perform_periodic_scan()
# Mark scan as complete
with self.scan_lock:
self.scan_in_progress = False
self.last_scan_start = None
else:
time.sleep(5) # Wait for startup to complete
except Exception as e:
self.logger.error(f"Error in periodic scan thread: {e}")
# Ensure scan lock is released on error
with self.scan_lock:
self.scan_in_progress = False
self.last_scan_start = None
time.sleep(Config.PERIODIC_SCAN_INTERVAL)
def _perform_periodic_scan(self):
"""Perform the actual periodic scan with monitoring"""
scan_start = time.time()
missed_files = []
try:
self.logger.debug("🔄 Periodic scan starting...")
# Scan for files that weren't caught by file system events
for client_folder in Config.HOT_FOLDER.iterdir():
if client_folder.is_dir():
for json_file in client_folder.glob("*.json"):
# Only add if not recently processed (handles file versions)
if not self.was_recently_processed(json_file):
missed_files.append(json_file)
scan_duration = time.time() - scan_start
# Log scan results
if missed_files:
self.logger.info(f"🔄 Periodic scan found {len(missed_files)} missed files ({scan_duration:.1f}s)")
for file_path in missed_files:
self.queue_file(file_path, is_periodic=True)
else:
self.logger.debug(f"🔄 Periodic scan: no missed files ({scan_duration:.1f}s)")
# Monitor scan performance
if scan_duration > Config.SLOW_SCAN_THRESHOLD:
self.logger.warning(f"🐌 Slow periodic scan: {scan_duration:.1f}s (threshold: {Config.SLOW_SCAN_THRESHOLD}s)")
# Record scan statistics
self.daily_stats.record_periodic_scan(scan_duration, len(missed_files), skipped=False)
except Exception as e:
scan_duration = time.time() - scan_start
self.logger.error(f"Error during periodic scan: {e} (duration: {scan_duration:.1f}s)")
self.daily_stats.record_periodic_scan(scan_duration, 0, skipped=False)
def move_failed_file(self, file_path: Path, error_msg: str):
"""Move failed file to failed folder"""
try:
# Determine client folder
client_folder = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "UNKNOWN"
# Create failed directory structure
failed_client_dir = Config.JSON_FAILED / client_folder
failed_client_dir.mkdir(parents=True, exist_ok=True)
failed_client_dir.chmod(0o777)
# Create timestamped filename to avoid conflicts
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
failed_filename = f"{timestamp}_{file_path.name}"
failed_path = failed_client_dir / failed_filename
# Move file to failed folder
shutil.move(str(file_path), str(failed_path))
failed_path.chmod(0o777)
# Create error log file
error_log_path = failed_client_dir / f"{timestamp}_{file_path.stem}_ERROR.txt"
with open(error_log_path, 'w') as f:
f.write(f"File: {file_path.name}\n")
f.write(f"Error Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Error: {error_msg}\n")
f.write(f"Client Folder: {client_folder}\n")
f.write(f"Original Path: {file_path}\n")
error_log_path.chmod(0o777)
self.logger.warning(f"❌ Moved failed file: {file_path.name}{failed_path}")
self.daily_stats.record_failed_file_moved()
except Exception as e:
self.logger.error(f"Failed to move failed file {file_path.name}: {e}")
def process_startup_files(self, files: List[Path]):
"""Process existing files found during startup scan"""
if not files:
self.logger.info("✅ No existing files to process")
return
self.logger.info(f"🚀 Processing {len(files)} existing files from startup scan...")
# Process in batches
batch_size = Config.BATCH_SIZE
for i in range(0, len(files), batch_size):
batch = files[i:i + batch_size]
self.logger.info(f"Processing startup batch {i//batch_size + 1}: {len(batch)} files")
self.process_batch(batch, is_startup=True)
self.logger.info(f"✅ Startup file processing complete: {len(files)} files processed")
def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
"""Parse JSON file and extract key fields"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
job_details = data.get("JobSpecification", {}).get("JobDetails", {})
extracted = {
"JobCategory": job_details.get("JobCategory", ""),
"StudioCode": job_details.get("StudioCode", ""),
"Title": job_details.get("Title", ""),
"ClientCode": job_details.get("ClientCode", "")
}
return extracted
except json.JSONDecodeError as e:
self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
return None
except Exception as e:
self.logger.error(f"Error reading {file_path.name}: {e}")
return None
def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
"""Determine destination paths based on client code and job category"""
client_code = json_data.get("ClientCode", "").upper()
job_category = json_data.get("JobCategory", "").upper()
destinations = []
# Check for specific client routing by ClientCode
if client_code in Config.DESTINATIONS:
destinations.append(Config.DESTINATIONS[client_code])
# Check for Celtra by JobCategory (only for eligible clients)
if job_category == "CELTRA" and client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
celtra_dest = Config.DESTINATIONS["CELTRA"]
if celtra_dest not in destinations:
destinations.append(celtra_dest)
return destinations
def store_file(self, file_path: Path, folder_name: str) -> bool:
"""Store file to JSON_STORE with 777 permissions"""
try:
store_dir = Config.JSON_STORE / folder_name
store_dir.mkdir(parents=True, exist_ok=True)
store_dir.chmod(0o777)
destination = store_dir / file_path.name
shutil.copy2(file_path, destination)
destination.chmod(0o777)
return True
except Exception as e:
self.logger.error(f"Failed to store {file_path.name}: {e}")
return False
def route_file(self, file_path: Path, destination: Path) -> bool:
"""Route file to specific destination with 777 permissions"""
try:
destination.mkdir(parents=True, exist_ok=True)
destination.chmod(0o777)
dest_file = destination / file_path.name
shutil.copy2(file_path, dest_file)
dest_file.chmod(0o777)
return True
except Exception as e:
self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
return False
def process_single_file(self, file_path: Path, is_startup: bool = False, is_periodic: bool = False) -> Dict[str, Any]:
"""Process a single JSON file with timing"""
start_time = time.time()
thread_name = threading.current_thread().name
# Create tag for logging
tags = []
if is_startup:
tags.append("STARTUP")
if is_periodic:
tags.append("PERIODIC")
tag_str = f"[{'/'.join(tags)}] " if tags else ""
try:
# Check if already processed recently (handles file versions)
if not is_startup and self.was_recently_processed(file_path):
self.logger.debug(f"[{thread_name}] {tag_str}Skipping recently processed: {file_path.name}")
return {'success': True, 'skipped': True}
# Determine folder name for logging
folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"
self.logger.info(f"[{thread_name}] {tag_str}Processing: {folder_name}/{file_path.name}")
if not is_startup and not is_periodic:
time.sleep(Config.WAIT_DELAY)
# Parse JSON
json_data = self.parse_json_file(file_path)
if json_data is None:
error_msg = "JSON parsing failed"
folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"
processing_time = time.time() - start_time
# Move failed file
self.move_failed_file(file_path, error_msg)
self.daily_stats.record_file_processed(
folder_name, {}, [], processing_time, False, error_msg, is_startup, is_periodic
)
return {'success': False, 'error': error_msg}
# folder_name already determined above for logging
# Store file
stored = self.store_file(file_path, folder_name)
# Route file
destinations = self.determine_routing(json_data, folder_name)
routed_count = 0
for destination in destinations:
if self.route_file(file_path, destination):
routed_count += 1
success = stored and (routed_count == len(destinations) if destinations else True)
processing_time = time.time() - start_time
# Record statistics
self.daily_stats.record_file_processed(
folder_name, json_data, destinations, processing_time,
success, None if success else "Processing failed", is_startup, is_periodic
)
if success:
# Mark as processed and delete original
self.add_to_recently_processed(file_path)
file_path.unlink()
route_info = f" + {len(destinations)} routes" if destinations else ""
self.logger.info(f"[{thread_name}] {tag_str}{folder_name}/{file_path.name}{route_info} ({processing_time:.2f}s)")
else:
# Move failed file
self.move_failed_file(file_path, "Processing failed")
return {'success': success}
except Exception as e:
processing_time = time.time() - start_time
folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"
# Move failed file
self.move_failed_file(file_path, str(e))
self.daily_stats.record_file_processed(
folder_name, {}, [], processing_time, False, str(e), is_startup, is_periodic
)
self.logger.error(f"[{thread_name}] {tag_str}Error processing {folder_name}/{file_path.name}: {e}")
return {'success': False, 'error': str(e)}
def process_batch(self, file_paths: List[Path], is_startup: bool = False, is_periodic: bool = False):
"""Process a batch of files concurrently"""
if not file_paths:
return
batch_start = time.time()
tags = []
if is_startup:
tags.append("STARTUP")
if is_periodic:
tags.append("PERIODIC")
tag_str = f"[{'/'.join(tags)}] " if tags else ""
self.logger.info(f"{tag_str}Processing batch of {len(file_paths)} files")
with ThreadPoolExecutor(max_workers=min(Config.MAX_WORKERS, len(file_paths))) as executor:
future_to_file = {
executor.submit(self.process_single_file, file_path, is_startup, is_periodic): file_path
for file_path in file_paths
}
for future in as_completed(future_to_file):
try:
future.result()
except Exception as e:
file_path = future_to_file[future]
self.logger.error(f"{tag_str}Batch processing error for {file_path.name}: {e}")
batch_time = time.time() - batch_start
self.daily_stats.record_batch(len(file_paths))
self.logger.info(f"{tag_str}Batch completed: {len(file_paths)} files in {batch_time:.2f}s")
def queue_file(self, file_path: Path, is_periodic: bool = False):
"""Add file to processing queue"""
if self.startup_complete:
# Add flag to indicate if this is from periodic scan
self.file_queue.put((file_path, is_periodic))
def batch_processor_worker(self):
"""Background worker for batch processing"""
while True:
files_to_process = []
periodic_flags = []
batch_start_time = time.time()
while (len(files_to_process) < Config.BATCH_SIZE and
(time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
try:
file_data = self.file_queue.get(timeout=1.0)
if isinstance(file_data, tuple):
file_path, is_periodic = file_data
else:
file_path, is_periodic = file_data, False
files_to_process.append(file_path)
periodic_flags.append(is_periodic)
self.file_queue.task_done()
except:
if files_to_process:
break
continue
if files_to_process and self.startup_complete:
# Process batch - mark as periodic if any files are from periodic scan
is_periodic_batch = any(periodic_flags)
self.process_batch(files_to_process, is_startup=False, is_periodic=is_periodic_batch)
def generate_daily_report(self):
"""Generate comprehensive daily report"""
try:
stats = self.daily_stats.get_stats_summary()
# Generate report content
report_content = self.format_daily_report(stats)
# Save to file
report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
with open(report_file, 'w') as f:
f.write(report_content)
# Email report
self.email_daily_report(report_content, stats['date'])
# Clean old reports
self.cleanup_old_reports()
# Reset stats for new day
self.daily_stats.reset_stats()
self.logger.info(f"Daily report generated: {report_file}")
except Exception as e:
self.logger.error(f"Failed to generate daily report: {e}")
def format_daily_report(self, stats: Dict[str, Any]) -> str:
"""Format daily statistics into readable report"""
startup_info = f"Startup Files Processed: {stats['startup_files_processed']}\n" if stats['startup_files_processed'] > 0 else ""
periodic_info = f"Periodic Scan Files Found: {stats['periodic_files_found']}\n" if stats['periodic_files_found'] > 0 else ""
failed_info = f"Failed Files Moved: {stats['failed_files_moved']}\n" if stats['failed_files_moved'] > 0 else ""
scan_info = ""
if stats['periodic_scans_completed'] > 0:
scan_info = f"Periodic Scans: {stats['periodic_scans_completed']} completed, {stats['periodic_scans_skipped']} skipped\n"
scan_info += f"Average Scan Time: {stats['avg_scan_time']}s\n"
if stats['slow_scans'] > 0:
scan_info += f"Slow Scans: {stats['slow_scans']}\n"
report = f"""
JSON WORKFLOW DAILY REPORT - PROTECTED HYBRID VERSION
Date: {stats['date']}
Uptime: {stats['uptime']}
=== SUMMARY ===
Total Files Processed: {stats['total_processed']}
{startup_info}{periodic_info}{failed_info}{scan_info}Total Errors: {stats['total_errors']}
Success Rate: {stats['success_rate']:.1f}%
Total Batches: {stats['total_batches']}
Average Processing Time: {stats['avg_processing_time']}s
Average Batch Size: {stats['avg_batch_size']} files
=== CLIENT BREAKDOWN ===
"""
for client, client_data in sorted(stats['client_stats'].items()):
report += f"\n{client}:\n"
report += f" Processed: {client_data['processed']}\n"
report += f" Routed: {client_data['routed']}\n"
report += f" Celtra Projects: {client_data['celtra']}\n"
report += f" Errors: {client_data['errors']}\n"
if client_data['job_categories']:
report += f" Job Categories: {dict(client_data['job_categories'])}\n"
report += "\n=== DESTINATION BREAKDOWN ===\n"
for dest, count in sorted(stats['destination_stats'].items()):
report += f"{dest}: {count} files\n"
report += "\n=== HOURLY BREAKDOWN ===\n"
for hour in range(24):
count = stats['hourly_stats'].get(hour, 0)
if count > 0:
report += f"{hour:02d}:00 - {count} files\n"
if stats['error_details']:
report += "\n=== ERROR DETAILS ===\n"
for error in stats['error_details'][-10:]:
report += f"{error['time']} - {error['client']}: {error['error']}\n"
return report
def email_daily_report(self, report_content: str, report_date: str):
"""Email the daily report"""
try:
msg = MIMEMultipart()
msg['From'] = Config.SENDER_EMAIL
msg['To'] = ', '.join(Config.REPORT_EMAILS)
msg['Subject'] = f"JSON Workflow Daily Report - {report_date}"
msg.attach(MIMEText(report_content, 'plain'))
server = smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT)
server.starttls()
server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())
server.quit()
self.logger.info("Daily report emailed successfully")
except Exception as e:
self.logger.error(f"Failed to email daily report: {e}")
def cleanup_old_reports(self):
"""Remove old report files"""
try:
cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)
for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
try:
file_date_str = report_file.stem.split('_')[-1]
file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
if file_date < cutoff_date:
report_file.unlink()
self.logger.debug(f"Deleted old report: {report_file}")
except Exception:
pass
except Exception as e:
self.logger.error(f"Failed to cleanup old reports: {e}")
def get_current_stats(self) -> str:
"""Get current statistics as formatted string"""
stats = self.daily_stats.get_stats_summary()
startup_info = f" (startup: {stats['startup_files_processed']})" if stats['startup_files_processed'] > 0 else ""
periodic_info = f" (periodic: {stats['periodic_files_found']})" if stats['periodic_files_found'] > 0 else ""
failed_info = f" (failed: {stats['failed_files_moved']})" if stats['failed_files_moved'] > 0 else ""
scan_info = f" (scans: {stats['periodic_scans_completed']}/{stats['periodic_scans_skipped']})" if stats['periodic_scans_completed'] > 0 or stats['periodic_scans_skipped'] > 0 else ""
return f"Today: {stats['total_processed']} processed{startup_info}{periodic_info}{failed_info}{scan_info}, {stats['total_errors']} errors, {stats['success_rate']:.1f}% success"
class ProtectedHybridFileHandler(FileSystemEventHandler):
"""File system event handler for protected hybrid monitoring"""
def __init__(self, processor: ProtectedHybridProcessor):
self.processor = processor
self.logger = logging.getLogger(__name__)
def on_created(self, event):
if event.is_directory or not self.processor.startup_complete:
return
file_path = Path(event.src_path)
if file_path.suffix.lower() == '.json':
self.logger.debug(f"📥 New file detected: {file_path.name}")
self.processor.queue_file(file_path, is_periodic=False)
def on_moved(self, event):
if event.is_directory or not self.processor.startup_complete:
return
dest_path = Path(event.dest_path)
if dest_path.suffix.lower() == '.json':
self.logger.debug(f"📁 File moved to hot folder: {dest_path.name}")
self.processor.queue_file(dest_path, is_periodic=False)
def main():
"""Main entry point with protected hybrid monitoring"""
processor = ProtectedHybridProcessor()
# Step 1: Scan and process existing files
existing_files = processor.scan_existing_files()
processor.process_startup_files(existing_files)
# Step 2: Mark startup complete and begin real-time monitoring
processor.startup_complete = True
processor.logger.info("🎯 Startup scan complete - switching to protected hybrid monitoring")
# Start batch processing worker for real-time files
batch_worker = threading.Thread(
target=processor.batch_processor_worker,
name="BatchWorker",
daemon=True
)
batch_worker.start()
# Start protected periodic scanning thread
periodic_scanner = threading.Thread(
target=processor.periodic_scan,
name="PeriodicScanner",
daemon=True
)
periodic_scanner.start()
# Setup file monitoring for new files
event_handler = ProtectedHybridFileHandler(processor)
observer = Observer()
observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)
# Start real-time monitoring
observer.start()
processor.logger.info(f"🛡️ Protected hybrid monitoring active: Events + {Config.PERIODIC_SCAN_INTERVAL}s protected scans")
try:
while True:
time.sleep(60)
current_stats = processor.get_current_stats()
processor.logger.info(f"Stats: {current_stats}")
except KeyboardInterrupt:
processor.logger.info("Stopping Protected Hybrid JSON Workflow Processor...")
observer.stop()
observer.join()
processor.logger.info("Protected Hybrid JSON Workflow Processor stopped")
if __name__ == "__main__":
main()