# Commit notes (project backup snapshot):
# - Added INSTALL_GUIDE.md and README.md documentation
# - Added OLD/ folder with previous script versions for reference
# - Added data/ folder with sample JSON test files
# - Added older json_workflow_processor-hybrid-protected.py version
# - Excludes venv and .DS_Store (per .gitignore)
# Complete project backup with full history and test data.
# Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
# File info: 928 lines · no trailing newline · 38 KiB · Python
#!/usr/bin/env python3
"""
JSON Workflow Processor - HYBRID PROTECTED VERSION

Real-time monitoring + protected periodic scanning + failed-file handling.

Features:
- File system events for immediate detection
- Protected periodic scanning (prevents overlapping scans)
- Failed-file handling (failed files are moved to the failed folder)
- File version tracking (handles file updates properly)
- Startup scan for crash recovery
- Concurrent processing with detailed reporting
- Scan duration monitoring (slow scans are logged as warnings)
"""

import json
import logging
import os
import shutil
import smtplib
import threading
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict, List, Optional, Set

import schedule
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class Config:
    """Configuration settings (paths, routing, email, batching, monitoring, reporting)."""

    # Paths
    HOT_FOLDER = Path("/data/PRODUCTION/JSON")
    JSON_STORE = Path("/data/PRODUCTION/JSON_STORE/")
    JSON_FAILED = Path("/data/PRODUCTION/JSON_FAILED/")
    SYNC_BASE = Path("/data/PRODUCTION/SYNC/MAKE")
    REPORTS_DIR = Path("/PRODUCTION/JSON_PARSER_LOGS")

    # Clients eligible for Celtra projects (based on JobCategory)
    CELTRA_ELIGIBLE_CLIENTS = {
        "CIBC", "OLIVER", "ADIDAS", "PAYPAL",
        "RECKITTBENCKISER", "BAYER", "3M", "RANK",
    }

    # Client destinations, keyed by upper-case ClientCode (plus "CELTRA")
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
    }

    # Email settings
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 587
    SMTP_USER = "twist@mail.dev.oliver.solutions"
    # The SMTP secret must not live in source control. It is read from the
    # environment; an empty value means "not configured". The key that was
    # previously committed here should be treated as leaked and rotated.
    SMTP_PASSWORD = os.environ.get("JSON_WORKFLOW_SMTP_PASSWORD", "")
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"
    REPORT_EMAILS = ["daveporter@oliver.agency"]

    # Batch processing settings
    BATCH_SIZE = 10
    MAX_WORKERS = 10
    BATCH_TIMEOUT = 30  # Max seconds the batch worker spends collecting one batch

    # Monitoring settings
    POLL_INTERVAL = 2
    WAIT_DELAY = 3  # Seconds to let an event-detected file settle before reading it
    PERIODIC_SCAN_INTERVAL = 60  # Scan every 60 seconds for missed files
    PERIODIC_SCAN_TIMEOUT = 120  # Maximum time allowed for a periodic scan
    SLOW_SCAN_THRESHOLD = 30  # Log warning if scan takes longer than this

    # Reporting settings
    DAILY_REPORT_TIME = "00:00"
    KEEP_REPORTS_DAYS = 30

class DailyStats:
    """Thread-safe daily statistics collector.

    Every mutator and the summary snapshot hold ``self.lock``. ``reset_stats``
    clears all per-day counters. ``start_time`` is set once in ``__init__`` and
    is not reset, so the reported uptime is measured from construction.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.reset_stats()
        self.start_time = datetime.now()

    def reset_stats(self):
        """Reset daily statistics and stamp them with today's date."""
        with self.lock:
            self.date = date.today()
            self.total_processed = 0
            self.total_errors = 0
            self.total_batches = 0
            self.startup_files_processed = 0
            self.periodic_files_found = 0
            self.failed_files_moved = 0
            self.periodic_scans_completed = 0
            self.periodic_scans_skipped = 0
            self.slow_scans = 0

            # Client breakdown
            self.client_stats = defaultdict(lambda: {
                'processed': 0,
                'routed': 0,
                'celtra': 0,
                'errors': 0,
                'job_categories': Counter(),
                'studio_codes': Counter(),
            })

            # Destination breakdown (keyed by destination folder name)
            self.destination_stats = defaultdict(int)

            # Performance metrics
            self.processing_times = []
            self.batch_sizes = []
            self.scan_times = []

            # Error tracking
            self.error_details = []

            # Hourly breakdown (hour of day -> files seen, successes and failures)
            self.hourly_stats = defaultdict(int)

    def record_file_processed(self, client: str, json_data: Dict[str, Any],
                              destinations: List[Path], processing_time: float,
                              success: bool = True, error: Optional[str] = None,
                              is_startup: bool = False, is_periodic: bool = False):
        """Record the outcome of processing one file.

        Args:
            client: Client folder name the file came from.
            json_data: Extracted fields (``JobCategory``/``StudioCode`` are read).
            destinations: Paths the file was routed to; their ``.name`` is counted.
            processing_time: Seconds spent; only recorded for successes.
            success: Whether processing succeeded.
            error: Error text stored in ``error_details`` on failure (if given).
            is_startup / is_periodic: Which discovery path found the file.
        """
        with self.lock:
            hour = datetime.now().hour
            self.hourly_stats[hour] += 1

            if is_startup:
                self.startup_files_processed += 1
            elif is_periodic:
                self.periodic_files_found += 1

            if success:
                self.total_processed += 1
                self.client_stats[client]['processed'] += 1

                # Track job categories and studio codes. The parser fills missing
                # fields with "", so fall back to 'Unknown' for any empty value.
                job_category = json_data.get('JobCategory') or 'Unknown'
                studio_code = json_data.get('StudioCode') or 'Unknown'
                self.client_stats[client]['job_categories'][job_category] += 1
                self.client_stats[client]['studio_codes'][studio_code] += 1

                # Track routing
                if destinations:
                    self.client_stats[client]['routed'] += 1
                    for dest in destinations:
                        self.destination_stats[dest.name] += 1
                        if 'Celtra' in dest.name:
                            self.client_stats[client]['celtra'] += 1

                # Track performance
                self.processing_times.append(processing_time)
            else:
                self.total_errors += 1
                self.client_stats[client]['errors'] += 1
                if error:
                    self.error_details.append({
                        'time': datetime.now().strftime('%H:%M:%S'),
                        'client': client,
                        'error': error,
                    })

    def record_failed_file_moved(self):
        """Record a failed file being moved"""
        with self.lock:
            self.failed_files_moved += 1

    def record_periodic_scan(self, duration: float, files_found: int, skipped: bool = False):
        """Record periodic scan statistics (``files_found`` is currently unused)."""
        with self.lock:
            if skipped:
                self.periodic_scans_skipped += 1
            else:
                self.periodic_scans_completed += 1
                self.scan_times.append(duration)
                if duration > Config.SLOW_SCAN_THRESHOLD:
                    self.slow_scans += 1

    def record_batch(self, batch_size: int):
        """Record a completed batch"""
        with self.lock:
            self.total_batches += 1
            self.batch_sizes.append(batch_size)

    def get_stats_summary(self) -> Dict[str, Any]:
        """Return a point-in-time snapshot of the current statistics.

        The snapshot is fully detached from the live counters: per-client dicts
        and their Counters are copied under the lock, so worker threads that
        keep recording cannot mutate the data while a report is formatted.
        """
        with self.lock:
            avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0
            avg_batch_size = sum(self.batch_sizes) / len(self.batch_sizes) if self.batch_sizes else 0
            avg_scan_time = sum(self.scan_times) / len(self.scan_times) if self.scan_times else 0
            attempts = self.total_processed + self.total_errors

            client_snapshot = {
                client: {
                    **data,
                    'job_categories': Counter(data['job_categories']),
                    'studio_codes': Counter(data['studio_codes']),
                }
                for client, data in self.client_stats.items()
            }

            return {
                'date': self.date.strftime('%Y-%m-%d'),
                'uptime': str(datetime.now() - self.start_time).split('.')[0],
                'total_processed': self.total_processed,
                'startup_files_processed': self.startup_files_processed,
                'periodic_files_found': self.periodic_files_found,
                'failed_files_moved': self.failed_files_moved,
                'periodic_scans_completed': self.periodic_scans_completed,
                'periodic_scans_skipped': self.periodic_scans_skipped,
                'slow_scans': self.slow_scans,
                'avg_scan_time': round(avg_scan_time, 2),
                'total_errors': self.total_errors,
                'total_batches': self.total_batches,
                'success_rate': (self.total_processed / attempts * 100) if attempts > 0 else 0,
                'avg_processing_time': round(avg_processing_time, 2),
                'avg_batch_size': round(avg_batch_size, 1),
                'client_stats': client_snapshot,
                'destination_stats': dict(self.destination_stats),
                'hourly_stats': dict(self.hourly_stats),
                'error_details': self.error_details.copy(),
            }

class ProtectedHybridProcessor:
    """JSON processor with protected hybrid monitoring.

    JSON files in ``Config.HOT_FOLDER/<client>/`` are discovered by a startup
    scan, by watchdog events (via ``queue_file``) and by a periodic rescan.
    Each file is copied to ``Config.JSON_STORE/<client>/``, copied to every
    destination chosen by ``determine_routing`` and then deleted. Files that
    fail are moved to ``Config.JSON_FAILED/<client>/`` with an ``_ERROR.txt``
    note beside them.
    """

    # Size cap for the recently-processed cache, and how many of the oldest
    # entries are dropped once the cap is exceeded.
    RECENT_CACHE_MAX = 2000
    RECENT_CACHE_EVICT = 500

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()
        self.file_queue = Queue()
        self.daily_stats = DailyStats()
        self.startup_complete = False

        # Recently processed file versions: "filepath_size_mtime" -> time recorded.
        # A dict keeps insertion order, so eviction really drops the oldest
        # entries (slicing a set, as before, dropped arbitrary ones).
        self.recently_processed: Dict[str, float] = {}
        self.processed_lock = threading.Lock()

        # Paths currently queued or being processed. Prevents the periodic scan
        # (or a repeated watchdog event) from queueing a file that is already
        # in flight, which let two workers race on the same file.
        self.pending_files: Set[Path] = set()

        # Periodic scan protection
        self.scan_in_progress = False
        self.scan_lock = threading.Lock()
        self.last_scan_start: Optional[float] = None

        self.setup_daily_reporting()

    def setup_logging(self):
        """Configure root logging to a file in Config.REPORTS_DIR and to the console."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] %(message)s'
        log_dir = Config.REPORTS_DIR
        log_dir.mkdir(parents=True, exist_ok=True)

        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler(log_dir / 'json_workflow_hybrid_protected.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("Protected Hybrid JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create every working/destination directory and make it world-writable (0o777)."""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.JSON_FAILED,
            Config.SYNC_BASE,
            Config.REPORTS_DIR,
        ] + list(Config.DESTINATIONS.values())

        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            directory.chmod(0o777)

    def setup_daily_reporting(self):
        """Schedule generate_daily_report and run the scheduler on a daemon thread."""
        schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)

        def run_scheduler():
            while True:
                try:
                    schedule.run_pending()
                except Exception as e:
                    # An uncaught error would kill this thread and silently stop
                    # all future daily reports.
                    self.logger.error(f"Scheduler error: {e}")
                time.sleep(60)

        scheduler_thread = threading.Thread(target=run_scheduler, name="Scheduler", daemon=True)
        scheduler_thread.start()

    @staticmethod
    def _file_version_id(file_path: Path) -> str:
        """Return an id for this version of the file (path + size + mtime); raises OSError if missing."""
        stat_info = file_path.stat()
        return f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"

    @staticmethod
    def _tag_string(is_startup: bool, is_periodic: bool) -> str:
        """Return the "[STARTUP/PERIODIC] " log prefix ("" when neither flag is set)."""
        tags = []
        if is_startup:
            tags.append("STARTUP")
        if is_periodic:
            tags.append("PERIODIC")
        return f"[{'/'.join(tags)}] " if tags else ""

    @staticmethod
    def _folder_name(file_path: Path) -> str:
        """Return the client folder of file_path ("DEFAULT" for files directly in HOT_FOLDER)."""
        return file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"

    def _remember_file_id(self, file_id: str):
        """Record file_id as processed, evicting the oldest entries once over the cap."""
        with self.processed_lock:
            # Re-insert so a re-processed id moves to the newest position.
            self.recently_processed.pop(file_id, None)
            self.recently_processed[file_id] = time.time()

            if len(self.recently_processed) > self.RECENT_CACHE_MAX:
                for old_id in list(self.recently_processed)[:self.RECENT_CACHE_EVICT]:
                    del self.recently_processed[old_id]

    def add_to_recently_processed(self, file_path: Path):
        """Add file to recently processed list with file version tracking"""
        try:
            self._remember_file_id(self._file_version_id(file_path))
        except Exception as e:
            self.logger.debug(f"Error adding to recently processed: {e}")

    def was_recently_processed(self, file_path: Path) -> bool:
        """Return True if this exact version of the file was processed recently.

        Returns False if the file cannot be stat'ed (e.g. it no longer exists).
        """
        try:
            file_id = self._file_version_id(file_path)
        except Exception as e:
            self.logger.debug(f"Error checking recently processed: {e}")
            return False
        with self.processed_lock:
            return file_id in self.recently_processed

    def _is_pending(self, file_path: Path) -> bool:
        """Return True if file_path is currently queued or being processed."""
        with self.processed_lock:
            return file_path in self.pending_files

    def _release_pending(self, file_path: Path):
        """Forget that file_path is in flight so it may be queued again."""
        with self.processed_lock:
            self.pending_files.discard(file_path)

    def scan_existing_files(self) -> List[Path]:
        """Return all ``*.json`` files one level below HOT_FOLDER (``<client>/*.json``).

        Errors are logged and whatever was collected so far is returned.
        """
        existing_files: List[Path] = []

        try:
            self.logger.info("🔍 Scanning for existing JSON files...")

            if not Config.HOT_FOLDER.exists():
                self.logger.warning(f"Hot folder does not exist: {Config.HOT_FOLDER}")
                return existing_files

            # Scan all subdirectories for JSON files
            for client_folder in Config.HOT_FOLDER.iterdir():
                if client_folder.is_dir():
                    client_name = client_folder.name
                    json_files = list(client_folder.glob("*.json"))

                    if json_files:
                        self.logger.info(f"📁 Found {len(json_files)} JSON files in {client_name}/")
                        existing_files.extend(json_files)
                    else:
                        self.logger.debug(f"📁 No JSON files in {client_name}/")

            self.logger.info(f"🔍 Startup scan complete: Found {len(existing_files)} existing JSON files")
            return existing_files

        except Exception as e:
            self.logger.error(f"Error during startup scan: {e}")
            return existing_files

    def periodic_scan(self):
        """Run ``_perform_periodic_scan`` every PERIODIC_SCAN_INTERVAL seconds, forever.

        Waits (polling every 5s) until startup processing has finished. A scan
        is skipped while another one is marked in progress, unless that one
        started more than PERIODIC_SCAN_TIMEOUT seconds ago, in which case its
        flag is treated as stale and overridden.
        """
        while True:
            try:
                if not self.startup_complete:
                    time.sleep(5)  # Wait for startup to complete
                    continue

                time.sleep(Config.PERIODIC_SCAN_INTERVAL)

                with self.scan_lock:
                    if self.scan_in_progress:
                        started = self.last_scan_start or 0.0
                        if (time.time() - started) <= Config.PERIODIC_SCAN_TIMEOUT:
                            self.logger.warning("⏳ Skipping periodic scan - previous scan still in progress")
                            self.daily_stats.record_periodic_scan(0, 0, skipped=True)
                            continue
                        self.logger.error(f"🚨 Previous scan timed out after {Config.PERIODIC_SCAN_TIMEOUT}s - forcing reset")

                    # Start new scan
                    self.scan_in_progress = True
                    self.last_scan_start = time.time()

                try:
                    self._perform_periodic_scan()
                finally:
                    # Always clear the flag, even if the scan raised.
                    with self.scan_lock:
                        self.scan_in_progress = False
                        self.last_scan_start = None

            except Exception as e:
                self.logger.error(f"Error in periodic scan thread: {e}")
                time.sleep(Config.PERIODIC_SCAN_INTERVAL)

    def _perform_periodic_scan(self):
        """Queue ``<client>/*.json`` files not yet processed or already in flight, and record timing."""
        scan_start = time.time()
        missed_files: List[Path] = []

        try:
            self.logger.debug("🔄 Periodic scan starting...")

            # Scan for files that weren't caught by file system events
            for client_folder in Config.HOT_FOLDER.iterdir():
                if not client_folder.is_dir():
                    continue
                for json_file in client_folder.glob("*.json"):
                    # Skip files already handled (same version) or already queued
                    if self.was_recently_processed(json_file) or self._is_pending(json_file):
                        continue
                    missed_files.append(json_file)

            scan_duration = time.time() - scan_start

            # Log scan results
            if missed_files:
                self.logger.info(f"🔄 Periodic scan found {len(missed_files)} missed files ({scan_duration:.1f}s)")
                for file_path in missed_files:
                    self.queue_file(file_path, is_periodic=True)
            else:
                self.logger.debug(f"🔄 Periodic scan: no missed files ({scan_duration:.1f}s)")

            # Monitor scan performance
            if scan_duration > Config.SLOW_SCAN_THRESHOLD:
                self.logger.warning(f"🐌 Slow periodic scan: {scan_duration:.1f}s (threshold: {Config.SLOW_SCAN_THRESHOLD}s)")

            self.daily_stats.record_periodic_scan(scan_duration, len(missed_files), skipped=False)

        except Exception as e:
            scan_duration = time.time() - scan_start
            self.logger.error(f"Error during periodic scan: {e} (duration: {scan_duration:.1f}s)")
            self.daily_stats.record_periodic_scan(scan_duration, 0, skipped=False)

    def move_failed_file(self, file_path: Path, error_msg: str):
        """Move file_path to JSON_FAILED/<client>/ with a timestamp prefix and write an _ERROR.txt note.

        Failures to move are logged, not raised.
        """
        try:
            # NOTE(review): root-level files go to "UNKNOWN" here, while
            # stats/store use "DEFAULT" for the same case.
            client_folder = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "UNKNOWN"

            failed_client_dir = Config.JSON_FAILED / client_folder
            failed_client_dir.mkdir(parents=True, exist_ok=True)
            failed_client_dir.chmod(0o777)

            # Timestamped filename to avoid conflicts
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            failed_filename = f"{timestamp}_{file_path.name}"
            failed_path = failed_client_dir / failed_filename

            shutil.move(str(file_path), str(failed_path))
            failed_path.chmod(0o777)

            # Create error log file
            error_log_path = failed_client_dir / f"{timestamp}_{file_path.stem}_ERROR.txt"
            with open(error_log_path, 'w', encoding='utf-8') as f:
                f.write(f"File: {file_path.name}\n")
                f.write(f"Error Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Error: {error_msg}\n")
                f.write(f"Client Folder: {client_folder}\n")
                f.write(f"Original Path: {file_path}\n")
            error_log_path.chmod(0o777)

            self.logger.warning(f"❌ Moved failed file: {file_path.name} → {failed_path}")
            self.daily_stats.record_failed_file_moved()

        except Exception as e:
            self.logger.error(f"Failed to move failed file {file_path.name}: {e}")

    def process_startup_files(self, files: List[Path]):
        """Process files found by the startup scan in batches of Config.BATCH_SIZE."""
        if not files:
            self.logger.info("✅ No existing files to process")
            return

        self.logger.info(f"🚀 Processing {len(files)} existing files from startup scan...")

        batch_size = Config.BATCH_SIZE
        for i in range(0, len(files), batch_size):
            batch = files[i:i + batch_size]
            self.logger.info(f"Processing startup batch {i//batch_size + 1}: {len(batch)} files")
            self.process_batch(batch, is_startup=True)

        self.logger.info(f"✅ Startup file processing complete: {len(files)} files processed")

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Parse file_path and extract JobCategory/StudioCode/Title/ClientCode.

        Fields are read from ``JobSpecification.JobDetails``; missing or null
        values become "". Returns None (after logging) if the file cannot be
        read or parsed.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            job_details = data.get("JobSpecification", {}).get("JobDetails", {})

            extracted = {}
            for key in ("JobCategory", "StudioCode", "Title", "ClientCode"):
                value = job_details.get(key, "")
                # Explicit JSON nulls would otherwise reach .upper() in routing
                extracted[key] = "" if value is None else value
            return extracted

        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None

    def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
        """Return destination folders for a file.

        A ClientCode listed in Config.DESTINATIONS routes there; JobCategory
        "CELTRA" additionally routes to the Celtra folder when the client folder
        is in Config.CELTRA_ELIGIBLE_CLIENTS. Comparisons are case-insensitive.
        """
        client_code = str(json_data.get("ClientCode") or "").upper()
        job_category = str(json_data.get("JobCategory") or "").upper()
        destinations: List[Path] = []

        if client_code in Config.DESTINATIONS:
            destinations.append(Config.DESTINATIONS[client_code])

        if job_category == "CELTRA" and client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
            celtra_dest = Config.DESTINATIONS["CELTRA"]
            if celtra_dest not in destinations:
                destinations.append(celtra_dest)

        return destinations

    def store_file(self, file_path: Path, folder_name: str) -> bool:
        """Copy file_path into JSON_STORE/<folder_name>/ (0o777); return False on error."""
        try:
            store_dir = Config.JSON_STORE / folder_name
            store_dir.mkdir(parents=True, exist_ok=True)
            store_dir.chmod(0o777)

            destination = store_dir / file_path.name
            shutil.copy2(file_path, destination)
            destination.chmod(0o777)
            return True

        except Exception as e:
            self.logger.error(f"Failed to store {file_path.name}: {e}")
            return False

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Copy file_path into destination (0o777); return False on error."""
        try:
            destination.mkdir(parents=True, exist_ok=True)
            destination.chmod(0o777)

            dest_file = destination / file_path.name
            shutil.copy2(file_path, dest_file)
            dest_file.chmod(0o777)
            return True

        except Exception as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False

    def process_single_file(self, file_path: Path, is_startup: bool = False, is_periodic: bool = False) -> Dict[str, Any]:
        """Process one JSON file: parse, store, route, then delete it (or move it to failed).

        Args:
            file_path: JSON file inside the hot folder.
            is_startup: File came from the startup scan (skips the
                recently-processed check and the settle delay).
            is_periodic: File came from the periodic scan (skips the settle delay).

        Returns:
            ``{'success': bool}``. Adds ``'skipped': True`` when nothing was
            done (already handled or no longer present), or ``'error': str``
            on failure.
        """
        start_time = time.time()
        thread_name = threading.current_thread().name
        tag_str = self._tag_string(is_startup, is_periodic)
        folder_name = self._folder_name(file_path)

        try:
            if not is_startup and self.was_recently_processed(file_path):
                self.logger.debug(f"[{thread_name}] {tag_str}Skipping recently processed: {file_path.name}")
                return {'success': True, 'skipped': True}

            self.logger.info(f"[{thread_name}] {tag_str}Processing: {folder_name}/{file_path.name}")

            if not is_startup and not is_periodic:
                # Give the writer time to finish before reading event-driven files
                time.sleep(Config.WAIT_DELAY)

            # The file may already have been handled (deleted or moved) by an
            # earlier queue entry; don't record a spurious failure for it.
            if not file_path.exists():
                self.logger.debug(f"[{thread_name}] {tag_str}File no longer present, skipping: {folder_name}/{file_path.name}")
                return {'success': True, 'skipped': True}

            json_data = self.parse_json_file(file_path)
            if json_data is None:
                error_msg = "JSON parsing failed"
                processing_time = time.time() - start_time
                self.move_failed_file(file_path, error_msg)
                self.daily_stats.record_file_processed(
                    folder_name, {}, [], processing_time, False, error_msg, is_startup, is_periodic
                )
                return {'success': False, 'error': error_msg}

            stored = self.store_file(file_path, folder_name)

            destinations = self.determine_routing(json_data, folder_name)
            routed_count = 0
            for destination in destinations:
                if self.route_file(file_path, destination):
                    routed_count += 1

            # With no destinations, routed_count == len(destinations) == 0
            success = stored and routed_count == len(destinations)
            processing_time = time.time() - start_time

            self.daily_stats.record_file_processed(
                folder_name, json_data, destinations, processing_time,
                success, None if success else "Processing failed", is_startup, is_periodic
            )

            if success:
                self.add_to_recently_processed(file_path)
                try:
                    file_path.unlink()
                except OSError as e:
                    # Already counted as processed; don't fall through to the
                    # generic handler, which would also move it and count an error.
                    self.logger.warning(
                        f"[{thread_name}] {tag_str}Processed but could not delete {folder_name}/{file_path.name}: {e}"
                    )

                route_info = f" + {len(destinations)} routes" if destinations else ""
                self.logger.info(f"[{thread_name}] {tag_str}✅ {folder_name}/{file_path.name}{route_info} ({processing_time:.2f}s)")
            else:
                self.move_failed_file(file_path, "Processing failed")

            return {'success': success}

        except Exception as e:
            processing_time = time.time() - start_time
            self.move_failed_file(file_path, str(e))
            self.daily_stats.record_file_processed(
                folder_name, {}, [], processing_time, False, str(e), is_startup, is_periodic
            )
            self.logger.error(f"[{thread_name}] {tag_str}Error processing {folder_name}/{file_path.name}: {e}")
            return {'success': False, 'error': str(e)}

        finally:
            # Allow this path to be queued again (e.g. if it is re-created later)
            self._release_pending(file_path)

    def process_batch(self, file_paths: List[Path], is_startup: bool = False, is_periodic: bool = False):
        """Process file_paths concurrently (up to Config.MAX_WORKERS threads) and record the batch."""
        if not file_paths:
            return

        batch_start = time.time()
        tag_str = self._tag_string(is_startup, is_periodic)
        self.logger.info(f"{tag_str}Processing batch of {len(file_paths)} files")

        with ThreadPoolExecutor(max_workers=min(Config.MAX_WORKERS, len(file_paths))) as executor:
            future_to_file = {
                executor.submit(self.process_single_file, file_path, is_startup, is_periodic): file_path
                for file_path in file_paths
            }

            for future in as_completed(future_to_file):
                try:
                    future.result()
                except Exception as e:
                    file_path = future_to_file[future]
                    self.logger.error(f"{tag_str}Batch processing error for {file_path.name}: {e}")

        batch_time = time.time() - batch_start
        self.daily_stats.record_batch(len(file_paths))
        self.logger.info(f"{tag_str}Batch completed: {len(file_paths)} files in {batch_time:.2f}s")

    def queue_file(self, file_path: Path, is_periodic: bool = False):
        """Queue file_path for the batch worker (ignored before startup completes).

        A path that is already queued or being processed is not queued again.
        """
        if not self.startup_complete:
            return

        with self.processed_lock:
            already_pending = file_path in self.pending_files
            if not already_pending:
                self.pending_files.add(file_path)

        if already_pending:
            self.logger.debug(f"Already queued, ignoring duplicate: {file_path.name}")
            return

        # Tuple flag records whether this came from the periodic scan
        self.file_queue.put((file_path, is_periodic))

    def batch_processor_worker(self):
        """Background loop: collect up to BATCH_SIZE queued files and process them.

        Collection stops when the batch is full, when the queue has been empty
        for ~1s after at least one file arrived, or after BATCH_TIMEOUT seconds.
        Event-driven and periodic-scan files are processed as separate batches
        so each keeps its own settle-delay behaviour and statistics.
        """
        while True:
            files_to_process: List[Path] = []
            periodic_flags: List[bool] = []
            batch_start_time = time.time()

            while (len(files_to_process) < Config.BATCH_SIZE and
                   (time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
                try:
                    file_data = self.file_queue.get(timeout=1.0)
                except Empty:
                    if files_to_process:
                        break
                    continue

                if isinstance(file_data, tuple):
                    file_path, is_periodic = file_data
                else:
                    file_path, is_periodic = file_data, False

                files_to_process.append(file_path)
                periodic_flags.append(is_periodic)
                self.file_queue.task_done()

            if not files_to_process or not self.startup_complete:
                continue

            realtime_files = [p for p, flag in zip(files_to_process, periodic_flags) if not flag]
            periodic_files = [p for p, flag in zip(files_to_process, periodic_flags) if flag]

            try:
                if realtime_files:
                    self.process_batch(realtime_files, is_startup=False, is_periodic=False)
                if periodic_files:
                    self.process_batch(periodic_files, is_startup=False, is_periodic=True)
            except Exception as e:
                # Keep the worker alive; a dead daemon thread would stop all processing.
                self.logger.error(f"Batch worker error: {e}")
            finally:
                for file_path in files_to_process:
                    self._release_pending(file_path)

    def generate_daily_report(self):
        """Write, email and rotate the daily report, then reset the daily stats."""
        try:
            stats = self.daily_stats.get_stats_summary()
            report_content = self.format_daily_report(stats)

            report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write(report_content)

            self.email_daily_report(report_content, stats['date'])
            self.cleanup_old_reports()

            # Reset stats for new day
            self.daily_stats.reset_stats()

            self.logger.info(f"Daily report generated: {report_file}")

        except Exception as e:
            self.logger.error(f"Failed to generate daily report: {e}")

    def format_daily_report(self, stats: Dict[str, Any]) -> str:
        """Format a stats summary (from DailyStats.get_stats_summary) as plain text."""
        startup_info = f"Startup Files Processed: {stats['startup_files_processed']}\n" if stats['startup_files_processed'] > 0 else ""
        periodic_info = f"Periodic Scan Files Found: {stats['periodic_files_found']}\n" if stats['periodic_files_found'] > 0 else ""
        failed_info = f"Failed Files Moved: {stats['failed_files_moved']}\n" if stats['failed_files_moved'] > 0 else ""

        scan_info = ""
        if stats['periodic_scans_completed'] > 0:
            scan_info = f"Periodic Scans: {stats['periodic_scans_completed']} completed, {stats['periodic_scans_skipped']} skipped\n"
            scan_info += f"Average Scan Time: {stats['avg_scan_time']}s\n"
            if stats['slow_scans'] > 0:
                scan_info += f"Slow Scans: {stats['slow_scans']}\n"

        report = f"""
JSON WORKFLOW DAILY REPORT - PROTECTED HYBRID VERSION
Date: {stats['date']}
Uptime: {stats['uptime']}

=== SUMMARY ===
Total Files Processed: {stats['total_processed']}
{startup_info}{periodic_info}{failed_info}{scan_info}Total Errors: {stats['total_errors']}
Success Rate: {stats['success_rate']:.1f}%
Total Batches: {stats['total_batches']}
Average Processing Time: {stats['avg_processing_time']}s
Average Batch Size: {stats['avg_batch_size']} files

=== CLIENT BREAKDOWN ===
"""

        for client, client_data in sorted(stats['client_stats'].items()):
            report += f"\n{client}:\n"
            report += f"  Processed: {client_data['processed']}\n"
            report += f"  Routed: {client_data['routed']}\n"
            report += f"  Celtra Projects: {client_data['celtra']}\n"
            report += f"  Errors: {client_data['errors']}\n"
            if client_data['job_categories']:
                report += f"  Job Categories: {dict(client_data['job_categories'])}\n"

        report += "\n=== DESTINATION BREAKDOWN ===\n"
        for dest, count in sorted(stats['destination_stats'].items()):
            report += f"{dest}: {count} files\n"

        report += "\n=== HOURLY BREAKDOWN ===\n"
        for hour in range(24):
            count = stats['hourly_stats'].get(hour, 0)
            if count > 0:
                report += f"{hour:02d}:00 - {count} files\n"

        if stats['error_details']:
            report += "\n=== ERROR DETAILS ===\n"
            # Only the 10 most recent errors
            for error in stats['error_details'][-10:]:
                report += f"{error['time']} - {error['client']}: {error['error']}\n"

        return report

    def email_daily_report(self, report_content: str, report_date: str):
        """Email the report to Config.REPORT_EMAILS via STARTTLS SMTP; errors are logged."""
        if not Config.SMTP_PASSWORD:
            self.logger.warning("SMTP password not configured - skipping daily report email")
            return

        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = ', '.join(Config.REPORT_EMAILS)
            msg['Subject'] = f"JSON Workflow Daily Report - {report_date}"
            msg.attach(MIMEText(report_content, 'plain'))

            # Context manager closes the connection even if login/send fails;
            # the timeout stops a stuck server from hanging the scheduler thread.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT, timeout=60) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())

            self.logger.info("Daily report emailed successfully")

        except Exception as e:
            self.logger.error(f"Failed to email daily report: {e}")

    def cleanup_old_reports(self):
        """Delete daily_report_YYYY-MM-DD.txt files older than KEEP_REPORTS_DAYS."""
        try:
            cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)

            for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
                try:
                    file_date_str = report_file.stem.split('_')[-1]
                    file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()

                    if file_date < cutoff_date:
                        report_file.unlink()
                        self.logger.debug(f"Deleted old report: {report_file}")

                except (ValueError, OSError) as e:
                    # Unparseable name or undeletable file: skip it, but leave a trace
                    self.logger.debug(f"Skipping report {report_file.name}: {e}")

        except Exception as e:
            self.logger.error(f"Failed to cleanup old reports: {e}")

    def get_current_stats(self) -> str:
        """Return a one-line summary of today's statistics."""
        stats = self.daily_stats.get_stats_summary()
        startup_info = f" (startup: {stats['startup_files_processed']})" if stats['startup_files_processed'] > 0 else ""
        periodic_info = f" (periodic: {stats['periodic_files_found']})" if stats['periodic_files_found'] > 0 else ""
        failed_info = f" (failed: {stats['failed_files_moved']})" if stats['failed_files_moved'] > 0 else ""
        scan_info = f" (scans: {stats['periodic_scans_completed']}/{stats['periodic_scans_skipped']})" if stats['periodic_scans_completed'] > 0 or stats['periodic_scans_skipped'] > 0 else ""
        return f"Today: {stats['total_processed']} processed{startup_info}{periodic_info}{failed_info}{scan_info}, {stats['total_errors']} errors, {stats['success_rate']:.1f}% success"

class ProtectedHybridFileHandler(FileSystemEventHandler):
    """Watchdog handler that hands new or moved-in ``.json`` files to the processor.

    Directory events are ignored, and so is every event that arrives before
    the processor reports ``startup_complete``.
    """

    def __init__(self, processor: ProtectedHybridProcessor):
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def _accepting(self, event) -> bool:
        """True when the event is for a file and startup processing has finished."""
        return self.processor.startup_complete and not event.is_directory

    def on_created(self, event):
        if not self._accepting(event):
            return

        created = Path(event.src_path)
        if created.suffix.lower() != '.json':
            return

        self.logger.debug(f"📥 New file detected: {created.name}")
        self.processor.queue_file(created, is_periodic=False)

    def on_moved(self, event):
        if not self._accepting(event):
            return

        # Only the destination matters: the file now lives at dest_path
        arrived = Path(event.dest_path)
        if arrived.suffix.lower() != '.json':
            return

        self.logger.debug(f"📁 File moved to hot folder: {arrived.name}")
        self.processor.queue_file(arrived, is_periodic=False)

def main():
    """Run the processor until Ctrl+C.

    Order of operations: process files already in the hot folder, mark startup
    complete, start the batch worker and periodic scanner threads, start the
    watchdog observer, then log a stats line every 60 seconds.
    """
    processor = ProtectedHybridProcessor()
    log = processor.logger

    # Step 1: catch up on anything left in the hot folder (crash recovery)
    processor.process_startup_files(processor.scan_existing_files())

    # Step 2: from here on, events and periodic scans feed the queue
    processor.startup_complete = True
    log.info("🎯 Startup scan complete - switching to protected hybrid monitoring")

    background_jobs = (
        (processor.batch_processor_worker, "BatchWorker"),
        (processor.periodic_scan, "PeriodicScanner"),
    )
    for job, thread_name in background_jobs:
        threading.Thread(target=job, name=thread_name, daemon=True).start()

    observer = Observer()
    observer.schedule(ProtectedHybridFileHandler(processor), str(Config.HOT_FOLDER), recursive=True)
    observer.start()
    log.info(f"🛡️ Protected hybrid monitoring active: Events + {Config.PERIODIC_SCAN_INTERVAL}s protected scans")

    try:
        while True:
            time.sleep(60)
            log.info(f"Stats: {processor.get_current_stats()}")
    except KeyboardInterrupt:
        log.info("Stopping Protected Hybrid JSON Workflow Processor...")
        observer.stop()

    observer.join()
    log.info("Protected Hybrid JSON Workflow Processor stopped")


if __name__ == "__main__":
    main()