- Added LOREAL client routing to /data/PRODUCTION/CLIENT-STORES/LOREAL/IN - Enhanced daily email reports with HTML formatting - Implemented Oliver brand colors (black and #FFC407 yellow) - Added Montserrat font with fallback fonts - Created color-coded metric cards and visual bar charts - Maintained plain text fallback for email compatibility Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
1123 lines
No EOL
52 KiB
Python
1123 lines
No EOL
52 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
JSON Workflow Processor - HYBRID PROTECTED VERSION
|
|
Real-time monitoring + protected periodic scanning + failed file handling
|
|
|
|
Features:
|
|
- File system events for immediate detection
|
|
- Protected periodic scanning (prevents overlapping scans)
|
|
- Failed file handling (moved to failed folder)
|
|
- File version tracking (handles file updates properly)
|
|
- Startup scan for crash recovery
|
|
- Concurrent processing with detailed reporting
|
|
- Scan duration monitoring and alerting
|
|
"""
|
|
|
|
import json
import logging
import os
import shutil
import smtplib
import threading
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Queue
from typing import Any, Dict, List, Optional, Set

import schedule
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
|
|
|
|
|
|
class Config:
    """Configuration settings for the JSON workflow processor.

    Paths are fixed production locations. The SMTP password is read from the
    ``JSON_WORKFLOW_SMTP_PASSWORD`` environment variable instead of being
    stored in source control.
    """

    # Paths
    HOT_FOLDER = Path("/data/PRODUCTION/JSON")
    JSON_STORE = Path("/data/PRODUCTION/JSON_STORE/")
    JSON_FAILED = Path("/data/PRODUCTION/JSON_FAILED/")
    SYNC_BASE = Path("/data/PRODUCTION/SYNC/MAKE")
    REPORTS_DIR = Path("/data/PRODUCTION/JSON_PARSER_LOGS")
    CLIENT_SYNC_BASE = Path("/data/PRODUCTION/CLIENT-STORES/")

    # Upper-cased client folder names whose JobCategory == "CELTRA" jobs may be
    # routed to the Celtra destination.
    CELTRA_ELIGIBLE_CLIENTS = {
        "CIBC", "OLIVER", "ADIDAS", "PAYPAL",
        "RECKITTBENCKISER", "BAYER", "3M", "RANK",
    }

    # Routing destinations keyed by upper-cased ClientCode; the "CELTRA" entry
    # is the JobCategory-based Celtra destination.
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
        "BISSELL": CLIENT_SYNC_BASE / "BISSELL-WRIKE/IN",
        "LOREAL": CLIENT_SYNC_BASE / "LOREAL/IN",
    }

    # Email settings
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 587
    SMTP_USER = "twist@mail.dev.oliver.solutions"
    # SECURITY: credentials must not live in source. A Mailgun key was
    # previously hard-coded here; treat it as compromised, rotate it with the
    # provider and supply the new one via the environment. Empty when unset.
    SMTP_PASSWORD = os.environ.get("JSON_WORKFLOW_SMTP_PASSWORD", "")
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"
    REPORT_EMAILS = ["daveporter@oliver.agency"]

    # Batch processing settings
    BATCH_SIZE = 10       # max files collected into one batch
    MAX_WORKERS = 10      # upper bound on the per-batch thread pool
    BATCH_TIMEOUT = 30    # seconds spent collecting one batch from the queue

    # Monitoring settings
    POLL_INTERVAL = 2     # NOTE(review): not referenced in the visible code
    WAIT_DELAY = 3        # seconds to wait before reading an event-detected file
    PERIODIC_SCAN_INTERVAL = 60  # Scan every 60 seconds for missed files
    PERIODIC_SCAN_TIMEOUT = 120  # Maximum time allowed for a periodic scan
    SLOW_SCAN_THRESHOLD = 30     # Log warning if scan takes longer than this

    # Reporting settings
    DAILY_REPORT_TIME = "00:00"  # "HH:MM" passed to schedule's .at()
    KEEP_REPORTS_DAYS = 30       # presumably used by cleanup_old_reports (not visible here)
|
|
|
|
|
|
class DailyStats:
    """Thread-safe collector of one day's processing statistics.

    Every mutator and ``get_stats_summary`` hold ``self.lock``. The summary is
    a snapshot: its nested per-client dicts and Counters are copies, so a
    caller can iterate them while worker threads keep recording.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.reset_stats()
        # Set once; reset_stats() does not touch it, so "uptime" in the summary
        # is measured from object creation.
        self.start_time = datetime.now()

    def reset_stats(self):
        """Zero every counter and stamp the collector with today's date."""
        with self.lock:
            self.date = date.today()
            self.total_processed = 0
            self.total_errors = 0
            self.total_batches = 0
            self.startup_files_processed = 0
            self.periodic_files_found = 0
            self.failed_files_moved = 0
            self.periodic_scans_completed = 0
            self.periodic_scans_skipped = 0
            self.slow_scans = 0

            # Client breakdown (entries created on first use)
            self.client_stats = defaultdict(lambda: {
                'processed': 0,
                'routed': 0,
                'celtra': 0,
                'errors': 0,
                'job_categories': Counter(),
                'studio_codes': Counter(),
            })

            # Destination breakdown, keyed by _destination_key()
            self.destination_stats = defaultdict(int)

            # Performance metrics (raw samples for the averages)
            self.processing_times = []
            self.batch_sizes = []
            self.scan_times = []

            # Error tracking: {'time': 'HH:MM:SS', 'client': ..., 'error': ...}
            self.error_details = []

            # Hourly breakdown: files seen per hour of day, successes and failures
            self.hourly_stats = defaultdict(int)

    @staticmethod
    def _destination_key(dest: Path) -> str:
        """Return a report label for *dest* that stays unique across destinations.

        Several destinations share a leaf folder name (``BISSELL-WRIKE/IN`` and
        ``LOREAL/IN``), so keying by ``dest.name`` alone merged them into one
        "IN" row; the parent folder is included to keep them apart.
        """
        parent = dest.parent.name
        return f"{parent}/{dest.name}" if parent else dest.name

    def record_file_processed(self, client: str, json_data: Dict[str, Any],
                              destinations: List[Path], processing_time: float,
                              success: bool = True, error: Optional[str] = None,
                              is_startup: bool = False, is_periodic: bool = False):
        """Record the outcome of processing one file.

        Args:
            client: Client folder name the file came from.
            json_data: Parsed fields; ``JobCategory`` and ``StudioCode`` are counted
                (``'Unknown'`` when absent).
            destinations: Folders the file was routed to.
            processing_time: Seconds spent; only successful files feed the average.
            success: Whether processing succeeded.
            error: Failure description kept in ``error_details`` (failures only).
            is_startup: File came from the startup scan.
            is_periodic: File came from a periodic scan (ignored if is_startup).
        """
        with self.lock:
            self.hourly_stats[datetime.now().hour] += 1

            if is_startup:
                self.startup_files_processed += 1
            elif is_periodic:
                self.periodic_files_found += 1

            per_client = self.client_stats[client]
            if success:
                self.total_processed += 1
                per_client['processed'] += 1

                # Track job categories and studio codes
                per_client['job_categories'][json_data.get('JobCategory', 'Unknown')] += 1
                per_client['studio_codes'][json_data.get('StudioCode', 'Unknown')] += 1

                # Track routing
                if destinations:
                    per_client['routed'] += 1
                    for dest in destinations:
                        self.destination_stats[self._destination_key(dest)] += 1
                        if 'Celtra' in dest.name:
                            per_client['celtra'] += 1

                self.processing_times.append(processing_time)
            else:
                self.total_errors += 1
                per_client['errors'] += 1
                if error:
                    self.error_details.append({
                        'time': datetime.now().strftime('%H:%M:%S'),
                        'client': client,
                        'error': error,
                    })

    def record_failed_file_moved(self):
        """Count one file moved to the failed folder."""
        with self.lock:
            self.failed_files_moved += 1

    def record_periodic_scan(self, duration: float, files_found: int, skipped: bool = False):
        """Record one periodic scan.

        Skipped scans only bump the skip counter. Completed scans store their
        duration and count as slow above ``Config.SLOW_SCAN_THRESHOLD``.
        ``files_found`` is accepted for callers but not currently stored.
        """
        with self.lock:
            if skipped:
                self.periodic_scans_skipped += 1
            else:
                self.periodic_scans_completed += 1
                self.scan_times.append(duration)
                if duration > Config.SLOW_SCAN_THRESHOLD:
                    self.slow_scans += 1

    def record_batch(self, batch_size: int):
        """Record a completed batch of *batch_size* files."""
        with self.lock:
            self.total_batches += 1
            self.batch_sizes.append(batch_size)

    def get_stats_summary(self) -> Dict[str, Any]:
        """Return an independent snapshot of the current statistics."""
        with self.lock:
            def _mean(values):
                return sum(values) / len(values) if values else 0

            attempted = self.total_processed + self.total_errors
            # Copy nested dicts/Counters: sharing them with live state let the
            # report iterate Counters that workers were still mutating.
            client_stats = {
                client: {
                    **counters,
                    'job_categories': Counter(counters['job_categories']),
                    'studio_codes': Counter(counters['studio_codes']),
                }
                for client, counters in self.client_stats.items()
            }

            return {
                'date': self.date.strftime('%Y-%m-%d'),
                'uptime': str(datetime.now() - self.start_time).split('.')[0],
                'total_processed': self.total_processed,
                'startup_files_processed': self.startup_files_processed,
                'periodic_files_found': self.periodic_files_found,
                'failed_files_moved': self.failed_files_moved,
                'periodic_scans_completed': self.periodic_scans_completed,
                'periodic_scans_skipped': self.periodic_scans_skipped,
                'slow_scans': self.slow_scans,
                'avg_scan_time': round(_mean(self.scan_times), 2),
                'total_errors': self.total_errors,
                'total_batches': self.total_batches,
                'success_rate': (self.total_processed / attempted * 100) if attempted > 0 else 0,
                'avg_processing_time': round(_mean(self.processing_times), 2),
                'avg_batch_size': round(_mean(self.batch_sizes), 1),
                'client_stats': client_stats,
                'destination_stats': dict(self.destination_stats),
                'hourly_stats': dict(self.hourly_stats),
                'error_details': self.error_details.copy(),
            }
|
|
|
|
|
|
class ProtectedHybridProcessor:
|
|
"""JSON processor with protected hybrid monitoring"""
|
|
|
|
def __init__(self):
    """Set up logging, working directories, shared state and the report scheduler."""
    # Logging comes first so every later step can report problems.
    self.setup_logging()
    self.ensure_directories()

    # Hand-off from the detectors to the batch worker, plus daily counters.
    self.file_queue = Queue()
    self.daily_stats = DailyStats()
    self.startup_complete = False

    # De-duplication cache of "<path>_<size>_<mtime>" identifiers for file
    # versions already processed (a plain set; no timestamps are stored).
    self.processed_lock = threading.Lock()
    self.recently_processed: Set[str] = set()

    # State that keeps periodic scans from overlapping.
    self.scan_lock = threading.Lock()
    self.last_scan_start = None
    self.scan_in_progress = False

    self.setup_daily_reporting()
|
|
|
|
def setup_logging(self):
    """Configure root logging to a file in the reports directory and to the console.

    The file handler is opened as UTF-8 because log messages contain emoji
    status markers; with a non-UTF-8 locale the default encoding cannot write
    them and every such record fails with a logging error.
    """
    log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] %(message)s'
    # Same directory as before; read from Config instead of repeating the literal.
    log_dir = Config.REPORTS_DIR
    log_dir.mkdir(parents=True, exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=[
            logging.FileHandler(log_dir / 'json_workflow_hybrid_protected.log', encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
    self.logger = logging.getLogger(__name__)
    self.logger.info("Protected Hybrid JSON Workflow Processor initialized")
|
|
|
|
def ensure_directories(self):
    """Create every working and destination directory and set its mode to 0o777."""
    fixed_dirs = (
        Config.HOT_FOLDER,
        Config.JSON_STORE,
        Config.JSON_FAILED,
        Config.SYNC_BASE,
        Config.REPORTS_DIR,
    )
    for folder in (*fixed_dirs, *Config.DESTINATIONS.values()):
        folder.mkdir(parents=True, exist_ok=True)
        folder.chmod(0o777)
|
|
|
|
def setup_daily_reporting(self):
    """Schedule generate_daily_report() once a day and start a daemon thread to drive it."""
    schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)

    def _poll_schedule():
        # schedule only fires jobs when run_pending() is called; polling once a
        # minute runs the report within about a minute of the target time.
        while True:
            schedule.run_pending()
            time.sleep(60)

    threading.Thread(target=_poll_schedule, name="Scheduler", daemon=True).start()
|
|
|
|
def add_to_recently_processed(self, file_path: Path, max_entries: int = 2000, evict_count: int = 500):
    """Remember the current version of *file_path* as processed.

    A version is identified as ``"<path>_<size>_<mtime>"``, so a file that is
    rewritten in place counts as new. The cache is bounded: once it holds more
    than *max_entries* identifiers, the *evict_count* oldest are dropped.

    ``self.recently_processed`` is a set and has no order, so the old
    ``list(set)[:500]`` eviction removed arbitrary entries, possibly the one
    just added. Insertion order is now tracked in a side deque.

    Errors (e.g. the file vanished before it could be stat'ed) are logged at
    debug level and otherwise ignored.
    """
    from collections import deque

    try:
        # stat() is filesystem I/O; do it before taking the shared lock.
        stat_info = file_path.stat()
        file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"

        with self.processed_lock:
            order = getattr(self, "_recently_processed_order", None)
            if order is None:
                # Seed with any ids already cached (their relative order is unknown).
                order = deque(self.recently_processed)
                self._recently_processed_order = order

            if file_id not in self.recently_processed:
                self.recently_processed.add(file_id)
                order.append(file_id)

            if len(self.recently_processed) > max_entries:
                for _ in range(min(evict_count, len(order))):
                    self.recently_processed.discard(order.popleft())
    except Exception as e:
        self.logger.debug(f"Error adding to recently processed: {e}")
|
|
|
|
def was_recently_processed(self, file_path: Path) -> bool:
    """Return True if this exact version (path, size, mtime) of *file_path* was processed.

    Any error — typically the file no longer existing — is logged at debug
    level and reported as False.
    """
    try:
        info = file_path.stat()
    except Exception as exc:
        self.logger.debug(f"Error checking recently processed: {exc}")
        return False

    version_key = f"{file_path}_{info.st_size}_{info.st_mtime}"
    with self.processed_lock:
        return version_key in self.recently_processed
|
|
|
|
def scan_existing_files(self) -> List[Path]:
    """Collect ``*.json`` files that already sit in the hot folder's client subfolders.

    Only first-level subdirectories of Config.HOT_FOLDER are searched; files
    placed directly in the hot folder are not returned. On an error the
    files found so far are returned and the error is logged.
    """
    found: List[Path] = []
    try:
        self.logger.info("🔍 Scanning for existing JSON files...")

        if not Config.HOT_FOLDER.exists():
            self.logger.warning(f"Hot folder does not exist: {Config.HOT_FOLDER}")
            return found

        for entry in Config.HOT_FOLDER.iterdir():
            if not entry.is_dir():
                continue
            matches = list(entry.glob("*.json"))
            if not matches:
                self.logger.debug(f"📁 No JSON files in {entry.name}/")
                continue
            self.logger.info(f"📁 Found {len(matches)} JSON files in {entry.name}/")
            found.extend(matches)

        self.logger.info(f"🔍 Startup scan complete: Found {len(found)} existing JSON files")
        return found
    except Exception as e:
        self.logger.error(f"Error during startup scan: {e}")
        return found
|
|
|
|
def periodic_scan(self):
    """Sweep the hot folder for missed files, forever, on a fixed interval.

    Polls every 5 s until ``startup_complete`` is True, then runs
    ``_perform_periodic_scan`` every Config.PERIODIC_SCAN_INTERVAL seconds.

    ``scan_in_progress``/``last_scan_start`` (guarded by ``scan_lock``) stop
    sweeps overlapping if this loop runs in more than one thread. A sweep is
    skipped, and counted as skipped, while another is running. The exception
    is when the running sweep started more than Config.PERIODIC_SCAN_TIMEOUT
    seconds ago: its state is then reset and a new sweep starts. Previously
    this branch only logged "forcing reset" and, being checked only when no
    scan was running, could never fire.

    The flags are always cleared in a ``finally``. Errors are logged and the
    loop continues after the normal interval.
    """
    while True:
        if not self.startup_complete:
            time.sleep(5)  # Wait for startup to complete
            continue

        time.sleep(Config.PERIODIC_SCAN_INTERVAL)
        try:
            with self.scan_lock:
                if self.scan_in_progress:
                    started = self.last_scan_start
                    timed_out = (started is not None
                                 and (time.time() - started) > Config.PERIODIC_SCAN_TIMEOUT)
                    if not timed_out:
                        self.logger.warning("⏳ Skipping periodic scan - previous scan still in progress")
                        self.daily_stats.record_periodic_scan(0, 0, skipped=True)
                        continue
                    # Take over the stale scan's slot instead of only logging.
                    self.logger.error(f"🚨 Previous scan timed out after {Config.PERIODIC_SCAN_TIMEOUT}s - forcing reset")

                self.scan_in_progress = True
                self.last_scan_start = time.time()

            try:
                self._perform_periodic_scan()
            finally:
                # Release the slot even if the sweep raised.
                with self.scan_lock:
                    self.scan_in_progress = False
                    self.last_scan_start = None
        except Exception as e:
            self.logger.error(f"Error in periodic scan thread: {e}")
|
|
|
|
def _perform_periodic_scan(self):
    """Run one sweep of the hot folder and queue JSON files not yet processed.

    The sweep's duration and the number of queued files go to the daily
    stats. A warning is logged when the sweep exceeds
    Config.SLOW_SCAN_THRESHOLD. On error, the failure is logged and recorded
    as a completed scan with zero files.
    """
    started = time.time()
    pending = []

    try:
        self.logger.debug("🔄 Periodic scan starting...")

        client_dirs = (entry for entry in Config.HOT_FOLDER.iterdir() if entry.is_dir())
        for client_dir in client_dirs:
            # Skip versions already handled (file identity includes size and mtime).
            pending.extend(
                candidate for candidate in client_dir.glob("*.json")
                if not self.was_recently_processed(candidate)
            )

        elapsed = time.time() - started

        if not pending:
            self.logger.debug(f"🔄 Periodic scan: no missed files ({elapsed:.1f}s)")
        else:
            self.logger.info(f"🔄 Periodic scan found {len(pending)} missed files ({elapsed:.1f}s)")
            for candidate in pending:
                self.queue_file(candidate, is_periodic=True)

        if elapsed > Config.SLOW_SCAN_THRESHOLD:
            self.logger.warning(f"🐌 Slow periodic scan: {elapsed:.1f}s (threshold: {Config.SLOW_SCAN_THRESHOLD}s)")

        self.daily_stats.record_periodic_scan(elapsed, len(pending), skipped=False)

    except Exception as e:
        elapsed = time.time() - started
        self.logger.error(f"Error during periodic scan: {e} (duration: {elapsed:.1f}s)")
        self.daily_stats.record_periodic_scan(elapsed, 0, skipped=False)
|
|
|
|
def move_failed_file(self, file_path: Path, error_msg: str):
    """Move a file that could not be processed into Config.JSON_FAILED.

    The file goes to ``JSON_FAILED/<client folder>/<timestamp>_<name>``.
    Beside it, a ``<timestamp>_<stem>_ERROR.txt`` note records the error.
    Files sitting directly in the hot folder are filed under ``UNKNOWN``.

    The timestamp has one-second resolution. If that target name is already
    taken (two same-named files failing within one second), a counter is
    appended to the timestamp; previously the move silently overwrote the
    earlier failed file. Errors are logged, never raised.
    """
    try:
        client_folder = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "UNKNOWN"

        failed_client_dir = Config.JSON_FAILED / client_folder
        failed_client_dir.mkdir(parents=True, exist_ok=True)
        failed_client_dir.chmod(0o777)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        prefix = timestamp
        failed_path = failed_client_dir / f"{prefix}_{file_path.name}"
        counter = 1
        while failed_path.exists():
            prefix = f"{timestamp}_{counter}"
            failed_path = failed_client_dir / f"{prefix}_{file_path.name}"
            counter += 1

        shutil.move(str(file_path), str(failed_path))
        failed_path.chmod(0o777)

        # Explicit UTF-8: error text and file names are not guaranteed ASCII.
        error_log_path = failed_client_dir / f"{prefix}_{file_path.stem}_ERROR.txt"
        with open(error_log_path, 'w', encoding='utf-8') as f:
            f.write(f"File: {file_path.name}\n")
            f.write(f"Error Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Error: {error_msg}\n")
            f.write(f"Client Folder: {client_folder}\n")
            f.write(f"Original Path: {file_path}\n")
        error_log_path.chmod(0o777)

        self.logger.warning(f"❌ Moved failed file: {file_path.name} → {failed_path}")
        self.daily_stats.record_failed_file_moved()

    except Exception as e:
        self.logger.error(f"Failed to move failed file {file_path.name}: {e}")
|
|
|
|
def process_startup_files(self, files: List[Path]):
    """Process the files found by the startup scan in chunks of Config.BATCH_SIZE."""
    if not files:
        self.logger.info("✅ No existing files to process")
        return

    total = len(files)
    self.logger.info(f"🚀 Processing {total} existing files from startup scan...")

    chunk_size = Config.BATCH_SIZE
    for batch_no, offset in enumerate(range(0, total, chunk_size), start=1):
        chunk = files[offset:offset + chunk_size]
        self.logger.info(f"Processing startup batch {batch_no}: {len(chunk)} files")
        self.process_batch(chunk, is_startup=True)

    self.logger.info(f"✅ Startup file processing complete: {total} files processed")
|
|
|
|
def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
    """Read *file_path* and extract the routing fields from ``JobSpecification.JobDetails``.

    Returns:
        A dict with the keys ``JobCategory``, ``StudioCode``, ``Title`` and
        ``ClientCode``, each a string. A missing or ``null`` value becomes
        ``""`` and any other non-string value is converted with ``str``;
        previously such values crashed ``.upper()`` during routing.
        A missing or ``null`` ``JobSpecification``/``JobDetails`` is treated
        as empty.
        Returns ``None`` (after logging) when the file cannot be read, is not
        valid JSON, or has a non-object top level, ``JobSpecification`` or
        ``JobDetails``.

    The file is decoded as ``utf-8-sig``, which accepts plain UTF-8 and also
    UTF-8 with a leading byte-order mark. Plain ``utf-8`` makes ``json``
    reject BOM files.
    """
    try:
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
        return None
    except Exception as e:
        self.logger.error(f"Error reading {file_path.name}: {e}")
        return None

    def _section(container, key):
        """Return container[key] as a dict: missing/null -> {}, non-object -> None."""
        value = container.get(key)
        if value is None:
            return {}
        return value if isinstance(value, dict) else None

    job_details = None
    if isinstance(data, dict):
        job_spec = _section(data, "JobSpecification")
        if job_spec is not None:
            job_details = _section(job_spec, "JobDetails")
    if job_details is None:
        self.logger.error(f"JSON parsing error in {file_path.name}: unexpected structure "
                          f"(expected object at JobSpecification.JobDetails)")
        return None

    def _text(key):
        value = job_details.get(key)
        return "" if value is None else str(value)

    return {key: _text(key) for key in ("JobCategory", "StudioCode", "Title", "ClientCode")}
|
|
|
|
def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
    """Return the destination folders a parsed job should be copied to.

    - A ClientCode matching a key of Config.DESTINATIONS (case-insensitive)
      adds that destination.
    - JobCategory "CELTRA" adds the Celtra destination, but only when the
      hot-folder client name is in Config.CELTRA_ELIGIBLE_CLIENTS.

    A destination is never listed twice. Field values are normalized first:
    ``None`` becomes ``""``, other values go through ``str``, and surrounding
    whitespace is ignored. Previously a JSON ``null`` raised AttributeError
    on ``.upper()``.
    """
    def _norm(value) -> str:
        return "" if value is None else str(value).strip().upper()

    client_code = _norm(json_data.get("ClientCode"))
    job_category = _norm(json_data.get("JobCategory"))
    destinations = []

    # Check for specific client routing by ClientCode
    if client_code in Config.DESTINATIONS:
        destinations.append(Config.DESTINATIONS[client_code])

    # Check for Celtra by JobCategory (only for eligible clients)
    if job_category == "CELTRA" and client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
        celtra_dest = Config.DESTINATIONS["CELTRA"]
        if celtra_dest not in destinations:
            destinations.append(celtra_dest)

    return destinations
|
|
|
|
def store_file(self, file_path: Path, folder_name: str) -> bool:
    """Copy *file_path* into ``JSON_STORE/<folder_name>/``, setting dir and copy to 0o777.

    Returns True on success; on any error logs it and returns False.
    """
    try:
        archive_dir = Config.JSON_STORE / folder_name
        archive_dir.mkdir(parents=True, exist_ok=True)
        archive_dir.chmod(0o777)

        archived_copy = archive_dir / file_path.name
        shutil.copy2(file_path, archived_copy)
        archived_copy.chmod(0o777)
    except Exception as exc:
        self.logger.error(f"Failed to store {file_path.name}: {exc}")
        return False
    else:
        return True
|
|
|
|
def route_file(self, file_path: Path, destination: Path) -> bool:
    """Copy *file_path* into the *destination* directory.

    The directory is created if needed; it and the copied file are set to
    mode 0o777. Returns True on success; on any error logs it and returns
    False. The source file is left in place.
    """
    try:
        destination.mkdir(parents=True, exist_ok=True)
        destination.chmod(0o777)

        copied = destination / file_path.name
        shutil.copy2(file_path, copied)
        copied.chmod(0o777)
    except Exception as exc:
        self.logger.error(f"Failed to route {file_path.name} to {destination}: {exc}")
        return False
    else:
        return True
|
|
|
|
def process_single_file(self, file_path: Path, is_startup: bool = False, is_periodic: bool = False) -> Dict[str, Any]:
    """Process one hot-folder JSON file end to end.

    Steps:
    1. Skip the file if this version was already processed (not checked for
       startup files) or if it has disappeared.
    2. Wait Config.WAIT_DELAY seconds if the file came from a file-system event.
    3. Parse it, copy it to JSON_STORE and copy it to each routing destination.
    4. On success delete the original; on failure move it to JSON_FAILED.

    Daily stats are recorded for every processed or failed file.

    Returns:
        ``{'success': bool}``, plus ``'skipped': True`` when nothing was done,
        or ``'error': str`` for parse failures and unexpected exceptions.
        Never raises.
    """
    start_time = time.time()
    thread_name = threading.current_thread().name

    # Create tag for logging
    tags = []
    if is_startup:
        tags.append("STARTUP")
    if is_periodic:
        tags.append("PERIODIC")
    tag_str = f"[{'/'.join(tags)}] " if tags else ""

    # Client folder name used for logging, statistics and the JSON_STORE layout.
    folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"

    def _vanished() -> bool:
        """Return True (after logging) if the file is no longer in place."""
        if file_path.exists():
            return False
        self.logger.debug(f"[{thread_name}] {tag_str}Skipping vanished file: {file_path.name}")
        return True

    try:
        # Check if already processed recently (handles file versions)
        if not is_startup and self.was_recently_processed(file_path):
            self.logger.debug(f"[{thread_name}] {tag_str}Skipping recently processed: {file_path.name}")
            return {'success': True, 'skipped': True}

        # A path can be queued twice (file-system event + periodic scan). If
        # another worker already handled it the file is gone; skip instead of
        # reporting a parse failure and a failed move for a file that succeeded.
        if _vanished():
            return {'success': True, 'skipped': True}

        self.logger.info(f"[{thread_name}] {tag_str}Processing: {folder_name}/{file_path.name}")

        if not is_startup and not is_periodic:
            # Give event-detected files time to settle (they may still be being written).
            time.sleep(Config.WAIT_DELAY)
            if _vanished():
                return {'success': True, 'skipped': True}

        # Parse JSON
        json_data = self.parse_json_file(file_path)
        if json_data is None:
            error_msg = "JSON parsing failed"
            processing_time = time.time() - start_time
            self.move_failed_file(file_path, error_msg)
            self.daily_stats.record_file_processed(
                folder_name, {}, [], processing_time, False, error_msg, is_startup, is_periodic
            )
            return {'success': False, 'error': error_msg}

        stored = self.store_file(file_path, folder_name)

        destinations = self.determine_routing(json_data, folder_name)
        routed_count = sum(1 for destination in destinations if self.route_file(file_path, destination))

        # With no destinations, routed_count == len(destinations) == 0 holds trivially.
        success = stored and routed_count == len(destinations)
        processing_time = time.time() - start_time

        self.daily_stats.record_file_processed(
            folder_name, json_data, destinations, processing_time,
            success, None if success else "Processing failed", is_startup, is_periodic
        )

        if not success:
            self.move_failed_file(file_path, "Processing failed")
            return {'success': False}

        # Mark as processed and delete original
        self.add_to_recently_processed(file_path)
        try:
            file_path.unlink()
        except FileNotFoundError:
            # Another worker removed it first (duplicate queue entry); our copies are in place.
            pass

        route_info = f" + {len(destinations)} routes" if destinations else ""
        self.logger.info(f"[{thread_name}] {tag_str}✅ {folder_name}/{file_path.name}{route_info} ({processing_time:.2f}s)")
        return {'success': True}

    except Exception as e:
        processing_time = time.time() - start_time
        self.move_failed_file(file_path, str(e))
        self.daily_stats.record_file_processed(
            folder_name, {}, [], processing_time, False, str(e), is_startup, is_periodic
        )
        self.logger.error(f"[{thread_name}] {tag_str}Error processing {folder_name}/{file_path.name}: {e}")
        return {'success': False, 'error': str(e)}
|
|
|
|
def process_batch(self, file_paths: List[Path], is_startup: bool = False, is_periodic: bool = False):
    """Run process_single_file over *file_paths* on a thread pool, then record the batch.

    Pool size is ``min(Config.MAX_WORKERS, len(file_paths))``. Exceptions
    escaping a worker are logged per file; an empty list is a no-op.
    """
    if not file_paths:
        return

    began = time.time()
    labels = [label for label, on in (("STARTUP", is_startup), ("PERIODIC", is_periodic)) if on]
    tag_str = f"[{'/'.join(labels)}] " if labels else ""
    count = len(file_paths)

    self.logger.info(f"{tag_str}Processing batch of {count} files")

    with ThreadPoolExecutor(max_workers=min(Config.MAX_WORKERS, count)) as pool:
        in_flight = {
            pool.submit(self.process_single_file, path, is_startup, is_periodic): path
            for path in file_paths
        }
        for finished in as_completed(in_flight):
            try:
                finished.result()
            except Exception as exc:
                self.logger.error(f"{tag_str}Batch processing error for {in_flight[finished].name}: {exc}")

    elapsed = time.time() - began
    self.daily_stats.record_batch(count)
    self.logger.info(f"{tag_str}Batch completed: {count} files in {elapsed:.2f}s")
|
|
|
|
def queue_file(self, file_path: Path, is_periodic: bool = False):
    """Enqueue ``(file_path, is_periodic)`` for the batch worker.

    Does nothing until ``startup_complete`` is True.
    """
    if not self.startup_complete:
        return
    # The flag tells the worker whether the file came from a periodic scan.
    self.file_queue.put((file_path, is_periodic))
|
|
|
|
def batch_processor_worker(self):
    """Run forever, turning queued files into processing batches.

    A collection round ends on any of these:
    - Config.BATCH_SIZE distinct files have been gathered;
    - Config.BATCH_TIMEOUT seconds have passed;
    - the queue has been empty for 1 s after at least one file arrived.

    Queue items are ``(path, is_periodic)`` tuples; a bare path means
    ``is_periodic=False``.

    Event-detected and periodic-scan files go to process_batch() separately.
    process_batch() applies one ``is_periodic`` flag to the whole batch, and
    periodic batches skip the Config.WAIT_DELAY settle time. Under the old
    ``any(periodic_flags)``, event files in a mixed batch could be read while
    still being written. A path queued twice in one round is processed once,
    as an event file if either entry was one.
    """
    from queue import Empty

    while True:
        collected: Dict[Path, bool] = {}
        batch_start_time = time.time()

        while (len(collected) < Config.BATCH_SIZE
               and (time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
            try:
                file_data = self.file_queue.get(timeout=1.0)
            except Empty:
                # Idle: flush what we have, otherwise keep waiting.
                if collected:
                    break
                continue

            try:
                if isinstance(file_data, tuple):
                    file_path, is_periodic = file_data
                else:
                    file_path, is_periodic = file_data, False
                collected[file_path] = collected.get(file_path, True) and bool(is_periodic)
            except Exception as e:
                self.logger.error(f"Ignoring malformed queue item {file_data!r}: {e}")
            finally:
                self.file_queue.task_done()

        if not collected or not self.startup_complete:
            continue

        event_files = [path for path, periodic in collected.items() if not periodic]
        periodic_files = [path for path, periodic in collected.items() if periodic]
        try:
            if event_files:
                self.process_batch(event_files, is_startup=False, is_periodic=False)
            if periodic_files:
                self.process_batch(periodic_files, is_startup=False, is_periodic=True)
        except Exception:
            # An escaped error would end this consumer thread and silently stop processing.
            self.logger.exception("Unexpected error in batch processor worker")
|
|
|
|
def generate_daily_report(self):
    """Produce the daily report: save it, email it, prune old reports.

    Runs on the scheduler thread; any error is logged and swallowed.

    The stats are snapshotted and reset back to back, before the slow work
    (formatting, writing, SMTP). Previously the reset came after the email,
    so every file recorded while the report was being sent was wiped without
    appearing in any report. A failed email also used to leave the counters
    (and their date) un-reset, so they bled into the next day. If a later
    step fails now, that day's figures are not carried into the next report.
    """
    try:
        stats = self.daily_stats.get_stats_summary()
        # Start the new day's counters immediately (see docstring).
        self.daily_stats.reset_stats()

        report_content = self.format_daily_report(stats)

        # Explicit UTF-8: client names and error messages are not guaranteed ASCII.
        report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report_content)

        self.email_daily_report(report_content, stats['date'], stats)
        self.cleanup_old_reports()

        self.logger.info(f"Daily report generated: {report_file}")

    except Exception as e:
        self.logger.exception(f"Failed to generate daily report: {e}")
|
|
|
|
def format_daily_report(self, stats: Dict[str, Any]) -> str:
    """Format daily statistics into readable report.

    Builds the plain-text daily report from the dict produced by
    ``DailyStats.get_stats_summary()``.

    Args:
        stats: Statistics snapshot. Each ``client_stats`` entry is read for
            'processed', 'routed', 'celtra', 'errors' and 'job_categories'.

    Returns:
        The report text.
    """
    # Optional summary lines appear only when their counter is non-zero.
    startup_info = f"Startup Files Processed: {stats['startup_files_processed']}\n" if stats['startup_files_processed'] > 0 else ""
    periodic_info = f"Periodic Scan Files Found: {stats['periodic_files_found']}\n" if stats['periodic_files_found'] > 0 else ""
    failed_info = f"Failed Files Moved: {stats['failed_files_moved']}\n" if stats['failed_files_moved'] > 0 else ""

    # Scan statistics are shown only if at least one periodic scan completed.
    scan_info = ""
    if stats['periodic_scans_completed'] > 0:
        scan_info = f"Periodic Scans: {stats['periodic_scans_completed']} completed, {stats['periodic_scans_skipped']} skipped\n"
        scan_info += f"Average Scan Time: {stats['avg_scan_time']}s\n"
        if stats['slow_scans'] > 0:
            scan_info += f"Slow Scans: {stats['slow_scans']}\n"

    report = f"""
JSON WORKFLOW DAILY REPORT - PROTECTED HYBRID VERSION
Date: {stats['date']}
Uptime: {stats['uptime']}

=== SUMMARY ===
Total Files Processed: {stats['total_processed']}
{startup_info}{periodic_info}{failed_info}{scan_info}Total Errors: {stats['total_errors']}
Success Rate: {stats['success_rate']:.1f}%
Total Batches: {stats['total_batches']}
Average Processing Time: {stats['avg_processing_time']}s
Average Batch Size: {stats['avg_batch_size']} files

=== CLIENT BREAKDOWN ===
"""

    # One section per client, in alphabetical order of client name.
    for client, client_data in sorted(stats['client_stats'].items()):
        report += f"\n{client}:\n"
        report += f" Processed: {client_data['processed']}\n"
        report += f" Routed: {client_data['routed']}\n"
        report += f" Celtra Projects: {client_data['celtra']}\n"
        report += f" Errors: {client_data['errors']}\n"

        if client_data['job_categories']:
            report += f" Job Categories: {dict(client_data['job_categories'])}\n"

    report += "\n=== DESTINATION BREAKDOWN ===\n"
    for dest, count in sorted(stats['destination_stats'].items()):
        report += f"{dest}: {count} files\n"

    # Only hours that saw at least one file are listed.
    report += "\n=== HOURLY BREAKDOWN ===\n"
    for hour in range(24):
        count = stats['hourly_stats'].get(hour, 0)
        if count > 0:
            report += f"{hour:02d}:00 - {count} files\n"

    # At most the 10 most recent errors are included.
    if stats['error_details']:
        report += "\n=== ERROR DETAILS ===\n"
        for error in stats['error_details'][-10:]:
            report += f"{error['time']} - {error['client']}: {error['error']}\n"

    return report
|
|
|
|
def format_html_report(self, stats: Dict[str, Any]) -> str:
    """Format daily statistics into an HTML report with Oliver branding.

    The result is a complete HTML document using inline styles only. It is
    meant to be the ``text/html`` alternative of the daily report email
    (black and ``#FFC407`` brand colours, Montserrat with system-font
    fallbacks).

    Args:
        stats: Daily statistics summary. Keys read: ``date``, ``uptime``,
            ``total_processed``, ``total_errors``, ``success_rate``,
            ``total_batches``, ``startup_files_processed``,
            ``periodic_files_found``, ``failed_files_moved``,
            ``periodic_scans_completed``, ``periodic_scans_skipped``,
            ``avg_scan_time``, ``slow_scans``, ``avg_processing_time``,
            ``avg_batch_size``, ``client_stats`` (client -> dict with
            ``processed``, ``routed``, ``celtra``, ``errors``,
            ``job_categories``), ``destination_stats`` (destination -> count),
            ``hourly_stats`` (hour 0-23 -> count) and ``error_details``
            (list of dicts with ``time``, ``client`` and ``error``).

    Returns:
        The HTML document as a string.

    Every dynamic value is HTML-escaped before it is interpolated. Client
    names, destination paths and error messages (exception text, file names)
    may contain ``<``, ``>`` or ``&``. Unescaped, these would corrupt the
    layout or inject markup into the email.
    """
    from html import escape

    def esc(value: Any) -> str:
        """Return *value* as text that is safe to embed in HTML content or attributes."""
        return escape(str(value), quote=True)

    def success_color(rate):
        """Traffic-light colour for the success rate: green >= 95, amber >= 80, else red."""
        if rate >= 95:
            return '#10b981'  # Green
        if rate >= 80:
            return '#f59e0b'  # Amber
        return '#ef4444'  # Red

    def stat_badge(value, label, color='#FFC407'):
        """Render one metric card; *value* and *label* are escaped, *color* is an internal constant."""
        return (
            '<div style="background: #ffffff; padding: 20px; border-radius: 8px; text-align: center; '
            f'border: 3px solid {color}; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">\n'
            f'    <div style="font-size: 32px; font-weight: bold; color: {color}; margin-bottom: 8px; '
            f'font-family: Montserrat, sans-serif;">{esc(value)}</div>\n'
            '    <div style="color: #6b7280; font-size: 14px; font-weight: 600; text-transform: uppercase; '
            f'letter-spacing: 0.5px;">{esc(label)}</div>\n'
            '</div>'
        )

    def metric_row(label, value, value_style=''):
        """Render one label/value row of the performance table (*value_style* is appended CSS)."""
        return (
            '<tr><td style="padding: 12px; border-bottom: 1px solid #e5e7eb; font-weight: 600;">'
            f'{esc(label)}</td>'
            f'<td style="padding: 12px; border-bottom: 1px solid #e5e7eb; text-align: right;{value_style}">'
            f'{esc(value)}</td></tr>\n'
        )

    # Optional badges: only rendered when the counter is non-zero.
    startup_badge = (stat_badge(stats['startup_files_processed'], 'Startup Files', '#8b5cf6')
                     if stats['startup_files_processed'] > 0 else '')
    periodic_badge = (stat_badge(stats['periodic_files_found'], 'Periodic Scans', '#0ea5e9')
                      if stats['periodic_files_found'] > 0 else '')
    failed_badge = (stat_badge(stats['failed_files_moved'], 'Failed Files', '#ef4444')
                    if stats['failed_files_moved'] > 0 else '')

    success_rate_color = success_color(stats['success_rate'])
    core_badges = '\n'.join([
        stat_badge(stats['total_processed'], 'Files Processed', '#10b981'),
        stat_badge(stats['total_errors'], 'Errors', '#ef4444'),
        stat_badge(f"{stats['success_rate']:.1f}%", 'Success Rate', success_rate_color),
        stat_badge(stats['total_batches'], 'Batches', '#FFC407'),
        startup_badge,
        periodic_badge,
        failed_badge,
    ])

    # Scan rows are only shown once at least one periodic scan has completed.
    scan_section = ''
    if stats['periodic_scans_completed'] > 0:
        scan_section = (
            metric_row('Periodic Scans Completed', stats['periodic_scans_completed'])
            + metric_row('Periodic Scans Skipped', stats['periodic_scans_skipped'])
            + metric_row('Average Scan Time', f"{stats['avg_scan_time']}s")
        )
        if stats['slow_scans'] > 0:
            scan_section += metric_row('Slow Scans', stats['slow_scans'], ' color: #f59e0b;')

    performance_rows = (
        metric_row('Average Processing Time', f"{stats['avg_processing_time']}s")
        + metric_row('Average Batch Size', f"{stats['avg_batch_size']} files")
        + scan_section
    )

    # Client breakdown table rows.
    client_rows = []
    for client, client_data in sorted(stats['client_stats'].items()):
        categories = client_data['job_categories']
        job_cats = (', '.join(f"{esc(name)} ({esc(total)})" for name, total in categories.items())
                    if categories else 'None')
        error_color = '#ef4444' if client_data['errors'] > 0 else '#10b981'
        client_name = esc(client)
        processed = esc(client_data['processed'])
        routed = esc(client_data['routed'])
        celtra = esc(client_data['celtra'])
        errors = esc(client_data['errors'])
        client_rows.append(f'''
<tr style="border-bottom: 1px solid #e5e7eb;">
    <td style="padding: 12px; font-weight: 600; color: #FFC407;">{client_name}</td>
    <td style="padding: 12px; text-align: center;">{processed}</td>
    <td style="padding: 12px; text-align: center;">{routed}</td>
    <td style="padding: 12px; text-align: center;">{celtra}</td>
    <td style="padding: 12px; text-align: center; color: {error_color};">{errors}</td>
    <td style="padding: 12px; font-size: 12px; color: #6b7280;">{job_cats}</td>
</tr>''')
    client_rows_html = ''.join(client_rows)

    # Destination breakdown table rows.
    dest_rows = ''.join(
        f'<tr><td style="padding: 10px; border-bottom: 1px solid #e5e7eb;">{esc(dest)}</td>'
        '<td style="padding: 10px; border-bottom: 1px solid #e5e7eb; text-align: right; '
        f'font-weight: 600; color: #FFC407;">{esc(count)} files</td></tr>\n'
        for dest, count in sorted(stats['destination_stats'].items())
    )

    # Hourly breakdown rendered as horizontal bars scaled to the busiest hour.
    hourly = stats['hourly_stats']
    # `or 1` keeps the divisor non-zero even if every recorded hour has a zero count.
    max_hourly = max(hourly.values(), default=0) or 1
    hourly_rows = []
    for hour in range(24):
        count = hourly.get(hour, 0)
        if count <= 0:
            continue
        bar_width = int((count / max_hourly) * 100)
        hourly_rows.append(f'''
<tr>
    <td style="padding: 8px; border-bottom: 1px solid #e5e7eb; font-family: monospace;">{hour:02d}:00</td>
    <td style="padding: 8px; border-bottom: 1px solid #e5e7eb;">
        <div style="background: linear-gradient(90deg, #FFC407 0%, #FFD84D 100%); height: 24px; width: {bar_width}%; border-radius: 4px; display: flex; align-items: center; padding: 0 8px; min-width: 40px;">
            <span style="color: #000000; font-weight: 700; font-size: 12px;">{esc(count)}</span>
        </div>
    </td>
</tr>''')
    hourly_rows_html = ''.join(hourly_rows)

    # Error details: only the ten most recent entries are shown.
    error_section = ''
    if stats['error_details']:
        error_rows = ''.join(
            '<tr><td style="padding: 10px; border-bottom: 1px solid #e5e7eb; font-family: monospace; '
            f'font-size: 12px;">{esc(error["time"])}</td>'
            '<td style="padding: 10px; border-bottom: 1px solid #e5e7eb; font-weight: 600;">'
            f'{esc(error["client"])}</td>'
            '<td style="padding: 10px; border-bottom: 1px solid #e5e7eb; color: #6b7280; font-size: 12px;">'
            f'{esc(error["error"])}</td></tr>\n'
            for error in stats['error_details'][-10:]
        )
        error_section = f'''
<div style="background: white; border-radius: 10px; padding: 25px; margin-top: 30px; box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
    <h2 style="color: #dc2626; margin: 0 0 20px 0; font-size: 20px; border-bottom: 3px solid #ef4444; padding-bottom: 10px; font-family: Montserrat, sans-serif;">⚠️ Error Details (Last 10)</h2>
    <table style="width: 100%; border-collapse: collapse;">
        <thead>
            <tr style="background: #fef2f2; border-bottom: 2px solid #dc2626;">
                <th style="padding: 12px; text-align: left; font-weight: 600;">Time</th>
                <th style="padding: 12px; text-align: left; font-weight: 600;">Client</th>
                <th style="padding: 12px; text-align: left; font-weight: 600;">Error</th>
            </tr>
        </thead>
        <tbody>{error_rows}</tbody>
    </table>
</div>
'''

    report_date = esc(stats['date'])
    uptime = esc(stats['uptime'])

    # Main HTML template with Oliver branding.
    document = f'''<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link href="https://fonts.googleapis.com/css2?family=Montserrat:wght@400;600;700&display=swap" rel="stylesheet">
    <title>JSON Workflow Daily Report</title>
</head>
<body style="margin: 0; padding: 0; font-family: Montserrat, -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif; background: #f3f4f6; color: #1f2937;">
<div style="max-width: 900px; margin: 0 auto; padding: 30px 20px;">

    <!-- Header with Oliver Branding -->
    <div style="background: #000000; border-radius: 15px; padding: 40px; text-align: center; color: #FFC407; margin-bottom: 30px; box-shadow: 0 4px 6px rgba(0,0,0,0.2); border: 3px solid #FFC407;">
        <h1 style="margin: 0 0 10px 0; font-size: 36px; font-weight: 700; font-family: Montserrat, sans-serif; letter-spacing: 1px;">JSON WORKFLOW DAILY REPORT</h1>
        <p style="margin: 0; font-size: 18px; color: #ffffff; font-weight: 400;">Protected Hybrid Version</p>
        <div style="margin-top: 15px; font-size: 16px; color: #FFC407; font-weight: 600;">
            <div>{report_date} • Uptime: {uptime}</div>
        </div>
    </div>

    <!-- Key Metrics Grid -->
    <div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; margin-bottom: 30px;">
        {core_badges}
    </div>

    <!-- Performance Metrics -->
    <div style="background: white; border-radius: 10px; padding: 25px; margin-bottom: 30px; box-shadow: 0 1px 3px rgba(0,0,0,0.1); border-top: 4px solid #FFC407;">
        <h2 style="color: #000000; margin: 0 0 20px 0; font-size: 22px; border-bottom: 3px solid #FFC407; padding-bottom: 10px; font-family: Montserrat, sans-serif; font-weight: 700;">📊 Performance Metrics</h2>
        <table style="width: 100%; border-collapse: collapse;">
            {performance_rows}
        </table>
    </div>

    <!-- Client Breakdown -->
    <div style="background: white; border-radius: 10px; padding: 25px; margin-bottom: 30px; box-shadow: 0 1px 3px rgba(0,0,0,0.1); border-top: 4px solid #FFC407;">
        <h2 style="color: #000000; margin: 0 0 20px 0; font-size: 22px; border-bottom: 3px solid #FFC407; padding-bottom: 10px; font-family: Montserrat, sans-serif; font-weight: 700;">👥 Client Breakdown</h2>
        <div style="overflow-x: auto;">
            <table style="width: 100%; border-collapse: collapse; min-width: 600px;">
                <thead>
                    <tr style="background: #fffbeb; border-bottom: 3px solid #FFC407;">
                        <th style="padding: 12px; text-align: left; font-weight: 700; font-family: Montserrat, sans-serif;">Client</th>
                        <th style="padding: 12px; text-align: center; font-weight: 700; font-family: Montserrat, sans-serif;">Processed</th>
                        <th style="padding: 12px; text-align: center; font-weight: 700; font-family: Montserrat, sans-serif;">Routed</th>
                        <th style="padding: 12px; text-align: center; font-weight: 700; font-family: Montserrat, sans-serif;">Celtra</th>
                        <th style="padding: 12px; text-align: center; font-weight: 700; font-family: Montserrat, sans-serif;">Errors</th>
                        <th style="padding: 12px; text-align: left; font-weight: 700; font-family: Montserrat, sans-serif;">Job Categories</th>
                    </tr>
                </thead>
                <tbody>{client_rows_html}</tbody>
            </table>
        </div>
    </div>

    <!-- Destination Breakdown -->
    <div style="background: white; border-radius: 10px; padding: 25px; margin-bottom: 30px; box-shadow: 0 1px 3px rgba(0,0,0,0.1); border-top: 4px solid #FFC407;">
        <h2 style="color: #000000; margin: 0 0 20px 0; font-size: 22px; border-bottom: 3px solid #FFC407; padding-bottom: 10px; font-family: Montserrat, sans-serif; font-weight: 700;">📁 Destination Breakdown</h2>
        <table style="width: 100%; border-collapse: collapse;">
            {dest_rows}
        </table>
    </div>

    <!-- Hourly Activity -->
    <div style="background: white; border-radius: 10px; padding: 25px; margin-bottom: 30px; box-shadow: 0 1px 3px rgba(0,0,0,0.1); border-top: 4px solid #FFC407;">
        <h2 style="color: #000000; margin: 0 0 20px 0; font-size: 22px; border-bottom: 3px solid #FFC407; padding-bottom: 10px; font-family: Montserrat, sans-serif; font-weight: 700;">⏰ Hourly Activity</h2>
        <table style="width: 100%; border-collapse: collapse;">
            {hourly_rows_html}
        </table>
    </div>

    {error_section}

    <!-- Footer -->
    <div style="text-align: center; margin-top: 40px; padding: 20px; color: #6b7280; font-size: 13px;">
        <p style="margin: 0; font-weight: 600;">Generated by JSON Workflow Processor - Protected Hybrid Version</p>
        <p style="margin: 5px 0 0 0; color: #FFC407; font-weight: 700; font-family: Montserrat, sans-serif;">TWIST Automation System • OLIVER</p>
    </div>

</div>
</body>
</html>
'''

    return document
|
|
|
|
def email_daily_report(self, report_content: str, report_date: str, stats: Dict[str, Any]):
    """Email the daily report as multipart/alternative (plain text + HTML).

    Args:
        report_content: Plain-text report body, always attached.
        report_date: Date string used in the subject line.
        stats: Statistics summary rendered via ``self.format_html_report``.

    If building the HTML part fails, the email is still sent with the
    plain-text part only instead of being dropped. Rendering, SMTP connection,
    authentication and sending failures are logged, never raised to the
    caller.
    """
    try:
        msg = MIMEMultipart('alternative')
        msg['From'] = Config.SENDER_EMAIL
        msg['To'] = ', '.join(Config.REPORT_EMAILS)
        msg['Subject'] = f"JSON Workflow Daily Report - {report_date}"

        # Plain text goes first: in multipart/alternative the last part is the
        # preferred one, so HTML-capable clients show HTML and others the text.
        msg.attach(MIMEText(report_content, 'plain'))

        try:
            html_content = self.format_html_report(stats)
        except Exception as e:
            # A rendering problem must not cost us the whole daily email.
            self.logger.error(f"Failed to build HTML daily report, sending plain text only: {e}")
            html_content = None

        if html_content is not None:
            msg.attach(MIMEText(html_content, 'html'))

        # The context manager closes the connection even when starttls, login
        # or sendmail raise.
        with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT) as server:
            server.starttls()
            server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
            server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())

        if html_content is not None:
            self.logger.info("Daily report emailed successfully (plain text + HTML)")
        else:
            self.logger.info("Daily report emailed successfully (plain text only)")

    except Exception as e:
        self.logger.error(f"Failed to email daily report: {e}")
|
|
|
|
def cleanup_old_reports(self):
    """Delete dated daily report files older than the retention window.

    Scans ``Config.REPORTS_DIR`` for ``daily_report_*.txt``. It removes those
    whose trailing ``YYYY-MM-DD`` date is strictly before
    ``today - Config.KEEP_REPORTS_DAYS``.

    Files whose name does not end in a parseable date are left alone. A file
    that cannot be deleted is logged and skipped, so one bad file does not
    stop the rest of the cleanup. Unexpected errors (e.g. the directory
    cannot be listed) are logged, not raised.
    """
    try:
        cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)

        for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
            # "daily_report_2024-01-31" -> "2024-01-31"
            file_date_str = report_file.stem.rsplit('_', 1)[-1]
            try:
                file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
            except ValueError:
                # Not a dated report (e.g. a manually named copy): keep it.
                self.logger.debug(f"Skipping report without a valid date: {report_file}")
                continue

            if file_date >= cutoff_date:
                continue

            try:
                report_file.unlink()
            except OSError as e:
                self.logger.warning(f"Could not delete old report {report_file}: {e}")
            else:
                self.logger.debug(f"Deleted old report: {report_file}")

    except Exception as e:
        self.logger.error(f"Failed to cleanup old reports: {e}")
|
|
|
|
def get_current_stats(self) -> str:
    """Return a one-line summary of today's statistics, suitable for logging.

    The parenthesised startup/periodic/failed counters are appended only when
    non-zero. The ``scans: completed/skipped`` counter appears when either
    value is non-zero.
    """
    summary = self.daily_stats.get_stats_summary()

    extras = []
    if summary['startup_files_processed'] > 0:
        extras.append(f" (startup: {summary['startup_files_processed']})")
    if summary['periodic_files_found'] > 0:
        extras.append(f" (periodic: {summary['periodic_files_found']})")
    if summary['failed_files_moved'] > 0:
        extras.append(f" (failed: {summary['failed_files_moved']})")

    completed = summary['periodic_scans_completed']
    skipped = summary['periodic_scans_skipped']
    if completed > 0 or skipped > 0:
        extras.append(f" (scans: {completed}/{skipped})")

    extras_text = ''.join(extras)
    return (
        f"Today: {summary['total_processed']} processed{extras_text}, "
        f"{summary['total_errors']} errors, {summary['success_rate']:.1f}% success"
    )
|
|
|
|
|
|
class ProtectedHybridFileHandler(FileSystemEventHandler):
    """Watchdog event handler that hands new ``.json`` files to the processor.

    Newly created files and files moved/renamed into a watched location are
    queued as real-time (non-periodic) work. Directory events are ignored, and
    so is everything that arrives before ``processor.startup_complete`` is set.
    """

    def __init__(self, processor: ProtectedHybridProcessor):
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def _ignored(self, event) -> bool:
        """True for directory events, or for any event before startup has finished."""
        return event.is_directory or not self.processor.startup_complete

    def _queue_if_json(self, path: Path, log_prefix: str) -> None:
        """Queue *path* for real-time processing when it has a ``.json`` suffix (any case)."""
        if path.suffix.lower() != '.json':
            return
        self.logger.debug(f"{log_prefix}: {path.name}")
        self.processor.queue_file(path, is_periodic=False)

    def on_created(self, event):
        """Handle a file created inside the watched folder."""
        if self._ignored(event):
            return
        self._queue_if_json(Path(event.src_path), "📥 New file detected")

    def on_moved(self, event):
        """Handle a file moved/renamed to a path inside the watched folder."""
        if self._ignored(event):
            return
        self._queue_if_json(Path(event.dest_path), "📁 File moved to hot folder")
|
|
|
|
|
|
def main():
    """Main entry point with protected hybrid monitoring.

    Steps:
      1. Process every JSON file already present in the hot folder (crash
         recovery / startup scan).
      2. Mark startup complete; start the batch worker and periodic scanner
         as daemon threads.
      3. Start a watchdog observer on ``Config.HOT_FOLDER`` (recursive).
      4. Log a stats line every 60 seconds until interrupted.

    The observer is stopped and joined on every exit path: Ctrl+C, or an
    unexpected error in the monitoring loop, which is logged and re-raised.
    """
    processor = ProtectedHybridProcessor()

    # Step 1: Scan and process existing files
    existing_files = processor.scan_existing_files()
    processor.process_startup_files(existing_files)

    # Step 2: Mark startup complete and begin real-time monitoring
    processor.startup_complete = True
    processor.logger.info("🎯 Startup scan complete - switching to protected hybrid monitoring")

    # Start batch processing worker for real-time files
    batch_worker = threading.Thread(
        target=processor.batch_processor_worker,
        name="BatchWorker",
        daemon=True
    )
    batch_worker.start()

    # Start protected periodic scanning thread
    periodic_scanner = threading.Thread(
        target=processor.periodic_scan,
        name="PeriodicScanner",
        daemon=True
    )
    periodic_scanner.start()

    # Setup file monitoring for new files
    event_handler = ProtectedHybridFileHandler(processor)
    observer = Observer()
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)

    # Start real-time monitoring
    observer.start()
    processor.logger.info(f"🛡️ Protected hybrid monitoring active: Events + {Config.PERIODIC_SCAN_INTERVAL}s protected scans")

    try:
        while True:
            time.sleep(60)
            current_stats = processor.get_current_stats()
            processor.logger.info(f"Stats: {current_stats}")
    except KeyboardInterrupt:
        processor.logger.info("Stopping Protected Hybrid JSON Workflow Processor...")
    except Exception:
        processor.logger.exception("Monitoring loop failed unexpectedly")
        raise
    finally:
        # Always shut the watcher down cleanly, whatever ended the loop.
        observer.stop()
        observer.join()
        processor.logger.info("Protected Hybrid JSON Workflow Processor stopped")
|
|
|
|
|
|
# Start the processor only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()