- Added INSTALL_GUIDE.md and README.md documentation
- Added OLD/ folder with previous script versions for reference
- Added data/ folder with sample JSON test files
- Added older json_workflow_processor-hybrid-protected.py version
- Excludes venv and .DS_Store (per .gitignore)

Complete project backup with full history and test data.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
604 lines
No EOL
23 KiB
Python
604 lines
No EOL
23 KiB
Python
#!/usr/bin/env python3
"""
JSON Workflow Processor - REPORTING VERSION
Converted from TWIST workflow definition

High-performance version with comprehensive daily reporting.
Monitors dynamic client hot folders and generates detailed workflow statistics.

Features:
- Concurrent processing (5-10 files at once)
- Daily workflow reports with client breakdown
- Real-time statistics tracking
- Automatic report generation and email delivery
- Performance metrics and trending
- Error tracking and analysis
"""

import json
import logging
import os
import shutil
import smtplib
import threading
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict, List, Optional

import schedule
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class Config:
    """Static configuration for the JSON workflow processor.

    Every setting is a class attribute and is read as ``Config.NAME``;
    the class is never instantiated.
    """

    # Paths
    HOT_FOLDER = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/JSON")
    JSON_STORE = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/JSON_STORE/")
    SYNC_BASE = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/SYNC/MAKE")
    REPORTS_DIR = Path("/PRODUCTION/JSON_PARSER_LOGS")

    # Clients eligible for Celtra projects (based on JobCategory)
    CELTRA_ELIGIBLE_CLIENTS = {
        "CIBC", "OLIVER", "ADIDAS", "PAYPAL",
        "RECKITTBENCKISER", "BAYER", "3M", "RANK",
    }

    # Client destinations, keyed by upper-cased client code (plus "CELTRA")
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
    }

    # Email settings
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 587
    SMTP_USER = "twist@mail.dev.oliver.solutions"
    # SECURITY: a literal credential used to be committed on this line. It is
    # now read from the SMTP_PASSWORD environment variable; the previously
    # committed value should be treated as leaked and rotated.
    SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "")
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"
    REPORT_EMAILS = ["daveporter@oliver.agency"]  # Add more recipients as needed

    # Batch processing settings
    BATCH_SIZE = 10      # maximum files collected into one batch
    MAX_WORKERS = 10     # upper bound on threads per batch
    BATCH_TIMEOUT = 30   # seconds to keep collecting before a batch is cut

    # Monitoring settings
    POLL_INTERVAL = 2
    WAIT_DELAY = 3       # seconds slept before each file is read

    # Reporting settings
    DAILY_REPORT_TIME = "00:00"  # Generate daily reports at midnight
    KEEP_REPORTS_DAYS = 30       # Keep report files for 30 days


class DailyStats:
    """Thread-safe collector for one reporting period's statistics.

    Every public method holds ``self.lock`` while it touches the counters,
    so one instance can be shared between worker threads.
    """

    def __init__(self):
        """Create the lock, zero all counters and note the start time."""
        self.lock = threading.Lock()
        self.reset_stats()
        # Set after reset_stats() and never cleared by it: used for "uptime".
        self.start_time = datetime.now()

    @staticmethod
    def _new_client_entry() -> Dict[str, Any]:
        """Return the zeroed per-client record used by ``client_stats``."""
        return {
            'processed': 0,
            'routed': 0,
            'celtra': 0,
            'errors': 0,
            'job_categories': Counter(),
            'studio_codes': Counter(),
        }

    def reset_stats(self):
        """Reset all counters and stamp them with today's date."""
        with self.lock:
            self.date = date.today()
            self.total_processed = 0
            self.total_errors = 0
            self.total_batches = 0

            # Client breakdown
            self.client_stats = defaultdict(self._new_client_entry)

            # Destination breakdown (keyed by destination folder name)
            self.destination_stats = defaultdict(int)

            # Performance metrics
            self.processing_times = []
            self.batch_sizes = []

            # Error tracking
            self.error_details = []

            # Hourly breakdown (keyed by hour of day, 0-23)
            self.hourly_stats = defaultdict(int)

    def record_file_processed(self, client: str, json_data: Dict[str, Any],
                              destinations: List[Path], processing_time: float,
                              success: bool = True, error: Optional[str] = None):
        """Record the outcome of processing one file.

        Args:
            client: Key under which per-client counters are kept.
            json_data: Extracted fields; ``JobCategory`` and ``StudioCode``
                are tallied on success.
            destinations: Paths the file was routed to (may be empty).
            processing_time: Seconds spent on the file; only recorded for
                successful files.
            success: Whether processing succeeded.
            error: Optional message stored in ``error_details`` on failure.
        """
        with self.lock:
            # Both successes and failures count towards the hourly total.
            self.hourly_stats[datetime.now().hour] += 1
            entry = self.client_stats[client]

            if not success:
                self.total_errors += 1
                entry['errors'] += 1
                if error:
                    self.error_details.append({
                        'time': datetime.now().strftime('%H:%M:%S'),
                        'client': client,
                        'error': error,
                    })
                return

            self.total_processed += 1
            entry['processed'] += 1

            # `or` rather than a .get() default: a key that is present but
            # empty or None must also be reported as 'Unknown'.
            job_category = json_data.get('JobCategory') or 'Unknown'
            studio_code = json_data.get('StudioCode') or 'Unknown'
            entry['job_categories'][job_category] += 1
            entry['studio_codes'][studio_code] += 1

            # Track routing
            if destinations:
                entry['routed'] += 1
                for dest in destinations:
                    self.destination_stats[dest.name] += 1
                    if 'Celtra' in dest.name:
                        entry['celtra'] += 1

            # Track performance
            self.processing_times.append(processing_time)

    def record_batch(self, batch_size: int):
        """Record a completed batch of ``batch_size`` files."""
        with self.lock:
            self.total_batches += 1
            self.batch_sizes.append(batch_size)

    def get_stats_summary(self) -> Dict[str, Any]:
        """Return a snapshot of the current statistics.

        The returned structure shares no mutable state with the collector,
        so it can be read after the lock is released while other threads
        keep recording.
        """
        with self.lock:
            times = self.processing_times
            sizes = self.batch_sizes
            avg_processing_time = sum(times) / len(times) if times else 0
            avg_batch_size = sum(sizes) / len(sizes) if sizes else 0

            attempted = self.total_processed + self.total_errors
            success_rate = (self.total_processed / attempted * 100) if attempted > 0 else 0

            # Copy nested records and Counters; a shallow dict() would hand
            # out objects that record_file_processed() keeps mutating.
            client_snapshot = {
                client: {
                    **entry,
                    'job_categories': Counter(entry['job_categories']),
                    'studio_codes': Counter(entry['studio_codes']),
                }
                for client, entry in self.client_stats.items()
            }

            return {
                'date': self.date.strftime('%Y-%m-%d'),
                'uptime': str(datetime.now() - self.start_time).split('.')[0],
                'total_processed': self.total_processed,
                'total_errors': self.total_errors,
                'total_batches': self.total_batches,
                'success_rate': success_rate,
                'avg_processing_time': round(avg_processing_time, 2),
                'avg_batch_size': round(avg_batch_size, 1),
                'client_stats': client_snapshot,
                'destination_stats': dict(self.destination_stats),
                'hourly_stats': dict(self.hourly_stats),
                'error_details': [dict(item) for item in self.error_details],
            }


class ReportingJSONProcessor:
    """JSON processor with daily statistics reporting.

    Files arrive through ``queue_file``, are drained in batches by
    ``batch_processor_worker``, copied into ``Config.JSON_STORE`` and any
    routed destinations, and counted in a ``DailyStats`` instance that
    feeds the scheduled daily report.
    """

    def __init__(self):
        """Set up logging, directories, the work queue and the scheduler."""
        self.setup_logging()
        self.ensure_directories()
        self.file_queue = Queue()
        self.daily_stats = DailyStats()
        self.setup_daily_reporting()

    def setup_logging(self):
        """Log at INFO level to a file in ``Config.REPORTS_DIR`` and to the console."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] %(message)s'
        # Taken from Config (same path as before) so the location is defined once.
        log_dir = Config.REPORTS_DIR
        log_dir.mkdir(parents=True, exist_ok=True)

        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler(log_dir / 'json_workflow_reporting.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("Reporting JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create the hot folder, store, sync base, reports dir and all destinations."""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.SYNC_BASE,
            Config.REPORTS_DIR,
        ] + list(Config.DESTINATIONS.values())

        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)

    def setup_daily_reporting(self):
        """Schedule the daily report and start the daemon thread that runs it."""
        schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)

        def run_scheduler():
            while True:
                schedule.run_pending()
                time.sleep(60)  # Check every minute

        scheduler_thread = threading.Thread(target=run_scheduler, name="Scheduler", daemon=True)
        scheduler_thread.start()

        self.logger.info(f"Daily reports scheduled for {Config.DAILY_REPORT_TIME}")

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Read ``file_path`` and extract the job fields used for routing.

        Returns:
            A dict with ``JobCategory``, ``StudioCode``, ``Title`` and
            ``ClientCode`` (each ``""`` when absent) taken from
            ``JobSpecification.JobDetails``, or ``None`` if the file cannot
            be read or parsed (the failure is logged).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            job_details = data.get("JobSpecification", {}).get("JobDetails", {})
            wanted = ("JobCategory", "StudioCode", "Title", "ClientCode")
            return {key: job_details.get(key, "") for key in wanted}

        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None
        except Exception as e:
            # Also covers unexpected JSON shapes (e.g. a top-level list).
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None

    def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
        """Return the destination folders for a parsed file.

        A file goes to ``Config.DESTINATIONS[ClientCode]`` when that client
        code has an entry, and additionally to the Celtra destination when
        its ``JobCategory`` is ``CELTRA`` and ``client_folder`` is one of
        ``Config.CELTRA_ELIGIBLE_CLIENTS``. Comparisons are case-insensitive.
        """
        # `or ""` guards against JSON null, which would break .upper().
        client_code = str(json_data.get("ClientCode") or "").upper()
        job_category = str(json_data.get("JobCategory") or "").upper()
        destinations = []

        # Specific client routing by ClientCode
        if client_code in Config.DESTINATIONS:
            destinations.append(Config.DESTINATIONS[client_code])

        # Celtra by JobCategory (only for eligible clients)
        if job_category == "CELTRA" and client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
            celtra_dest = Config.DESTINATIONS["CELTRA"]
            if celtra_dest not in destinations:
                destinations.append(celtra_dest)

        return destinations

    def store_file(self, file_path: Path, folder_name: str) -> bool:
        """Copy ``file_path`` into ``Config.JSON_STORE / folder_name``; return success."""
        try:
            store_dir = Config.JSON_STORE / folder_name
            store_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(file_path, store_dir / file_path.name)
            return True
        except Exception as e:
            self.logger.error(f"Failed to store {file_path.name}: {e}")
            return False

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Copy ``file_path`` into ``destination`` (created if needed); return success."""
        try:
            destination.mkdir(parents=True, exist_ok=True)
            shutil.copy2(file_path, destination / file_path.name)
            return True
        except Exception as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False

    @staticmethod
    def _client_folder_name(file_path: Path) -> str:
        """Return the file's parent folder name, or "DEFAULT" for the hot folder itself."""
        if file_path.parent == Config.HOT_FOLDER:
            return "DEFAULT"
        return file_path.parent.name

    def process_single_file(self, file_path: Path) -> Dict[str, Any]:
        """Parse, store and route one file, recording the outcome in the stats.

        The source file is deleted only when the store copy and every
        routed copy succeeded; otherwise it is left in place.

        Returns:
            ``{'success': bool}``, with an ``'error'`` message when parsing
            failed or an exception was raised.
        """
        start_time = time.time()
        thread_name = threading.current_thread().name
        folder_name = self._client_folder_name(file_path)

        try:
            self.logger.info(f"[{thread_name}] Processing: {file_path.name}")

            # NOTE(review): presumably lets the producer finish writing the
            # file before it is read - confirm.
            time.sleep(Config.WAIT_DELAY)

            json_data = self.parse_json_file(file_path)
            if json_data is None:
                error_msg = "JSON parsing failed"
                self.daily_stats.record_file_processed(
                    folder_name, {}, [], time.time() - start_time, False, error_msg
                )
                return {'success': False, 'error': error_msg}

            stored = self.store_file(file_path, folder_name)

            destinations = self.determine_routing(json_data, folder_name)
            routed_count = sum(
                1 for destination in destinations if self.route_file(file_path, destination)
            )

            # With no destinations both sides are 0, so storing alone decides.
            success = stored and routed_count == len(destinations)
            processing_time = time.time() - start_time

            self.daily_stats.record_file_processed(
                folder_name, json_data, destinations, processing_time,
                success, None if success else "Processing failed"
            )

            if success:
                file_path.unlink()
                self.logger.info(
                    f"[{thread_name}] Successfully processed {file_path.name} in {processing_time:.2f}s"
                )

            return {'success': success}

        except Exception as e:
            self.daily_stats.record_file_processed(
                folder_name, {}, [], time.time() - start_time, False, str(e)
            )
            self.logger.error(f"[{thread_name}] Error processing {file_path.name}: {e}")
            return {'success': False, 'error': str(e)}

    def process_batch(self, file_paths: List[Path]):
        """Process ``file_paths`` concurrently and record the batch size."""
        if not file_paths:
            return

        batch_start = time.time()
        self.logger.info(f"Processing batch of {len(file_paths)} files")

        worker_count = min(Config.MAX_WORKERS, len(file_paths))
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            future_to_file = {
                executor.submit(self.process_single_file, file_path): file_path
                for file_path in file_paths
            }

            for future in as_completed(future_to_file):
                try:
                    future.result()
                except Exception as e:
                    file_path = future_to_file[future]
                    self.logger.error(f"Batch processing error for {file_path.name}: {e}")

        batch_time = time.time() - batch_start
        self.daily_stats.record_batch(len(file_paths))
        self.logger.info(f"Batch completed: {len(file_paths)} files in {batch_time:.2f}s")

    def queue_file(self, file_path: Path):
        """Add ``file_path`` to the processing queue."""
        self.file_queue.put(file_path)

    def batch_processor_worker(self):
        """Run forever, draining the queue into batches and processing them.

        A batch is cut when it reaches ``Config.BATCH_SIZE``, when
        ``Config.BATCH_TIMEOUT`` seconds have passed, or when the queue
        stays empty for one second after at least one file was collected.
        """
        while True:
            files_to_process = []
            batch_start_time = time.time()

            while (len(files_to_process) < Config.BATCH_SIZE
                   and (time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
                try:
                    file_path = self.file_queue.get(timeout=1.0)
                except Empty:
                    # Only the "queue empty" timeout is expected here; a bare
                    # except would also hide real errors and KeyboardInterrupt.
                    if files_to_process:
                        break
                    continue
                files_to_process.append(file_path)
                self.file_queue.task_done()

            if files_to_process:
                self.process_batch(files_to_process)

    def generate_daily_report(self):
        """Snapshot and reset the stats, then write, email and prune reports.

        Any failure is logged rather than raised, so the scheduler thread
        keeps running.
        """
        try:
            stats = self.daily_stats.get_stats_summary()
            # Reset right after the snapshot: files recorded while the report
            # is being written or emailed then count towards the new period
            # instead of being wiped by a late reset.
            self.daily_stats.reset_stats()

            report_content = self.format_daily_report(stats)

            report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
            try:
                with open(report_file, 'w', encoding='utf-8') as f:
                    f.write(report_content)
            except OSError as e:
                # Keep going: the email below can still deliver the report.
                self.logger.error(f"Failed to write daily report {report_file}: {e}")
            else:
                self.logger.info(f"Daily report generated: {report_file}")

            self.email_daily_report(report_content, stats['date'])
            self.cleanup_old_reports()

        except Exception as e:
            self.logger.error(f"Failed to generate daily report: {e}")

    def format_daily_report(self, stats: Dict[str, Any]) -> str:
        """Render a ``DailyStats.get_stats_summary()`` dict as plain text.

        Only hours with at least one file are listed, and at most the last
        ten error entries are shown.
        """
        header_lines = [
            "",
            "JSON WORKFLOW DAILY REPORT",
            f"Date: {stats['date']}",
            f"Uptime: {stats['uptime']}",
            "",
            "=== SUMMARY ===",
            f"Total Files Processed: {stats['total_processed']}",
            f"Total Errors: {stats['total_errors']}",
            f"Success Rate: {stats['success_rate']:.1f}%",
            f"Total Batches: {stats['total_batches']}",
            f"Average Processing Time: {stats['avg_processing_time']}s",
            f"Average Batch Size: {stats['avg_batch_size']} files",
            "",
            "=== CLIENT BREAKDOWN ===",
            "",
        ]
        parts = ["\n".join(header_lines)]

        for client, client_data in sorted(stats['client_stats'].items()):
            parts.append(f"\n{client}:\n")
            parts.append(f" Processed: {client_data['processed']}\n")
            parts.append(f" Routed: {client_data['routed']}\n")
            parts.append(f" Celtra Projects: {client_data['celtra']}\n")
            parts.append(f" Errors: {client_data['errors']}\n")

            if client_data['job_categories']:
                parts.append(f" Job Categories: {dict(client_data['job_categories'])}\n")
            if client_data['studio_codes']:
                parts.append(f" Studio Codes: {dict(client_data['studio_codes'])}\n")

        parts.append("\n=== DESTINATION BREAKDOWN ===\n")
        for dest, count in sorted(stats['destination_stats'].items()):
            parts.append(f"{dest}: {count} files\n")

        parts.append("\n=== HOURLY BREAKDOWN ===\n")
        for hour in range(24):
            count = stats['hourly_stats'].get(hour, 0)
            if count > 0:
                parts.append(f"{hour:02d}:00 - {count} files\n")

        if stats['error_details']:
            parts.append("\n=== ERROR DETAILS ===\n")
            for error in stats['error_details'][-10:]:  # Show last 10 errors
                parts.append(f"{error['time']} - {error['client']}: {error['error']}\n")

        return "".join(parts)

    def email_daily_report(self, report_content: str, report_date: str):
        """Email the report to ``Config.REPORT_EMAILS``; failures are logged, not raised."""
        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = ', '.join(Config.REPORT_EMAILS)
            msg['Subject'] = f"JSON Workflow Daily Report - {report_date}"
            msg.attach(MIMEText(report_content, 'plain'))

            # The context manager closes the connection even when login or
            # sending fails; the timeout stops a dead server from blocking
            # the scheduler thread indefinitely.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT, timeout=30) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())

            self.logger.info("Daily report emailed successfully")

        except Exception as e:
            self.logger.error(f"Failed to email daily report: {e}")

    def cleanup_old_reports(self):
        """Delete ``daily_report_*.txt`` files older than ``Config.KEEP_REPORTS_DAYS``.

        The date is taken from the file name; files whose name does not end
        in a ``YYYY-MM-DD`` date are skipped with a warning.
        """
        try:
            cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)

            for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
                try:
                    file_date_str = report_file.stem.split('_')[-1]
                    file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()

                    if file_date < cutoff_date:
                        report_file.unlink()
                        self.logger.debug(f"Deleted old report: {report_file}")

                except Exception as e:
                    self.logger.warning(f"Could not process report file {report_file}: {e}")

        except Exception as e:
            self.logger.error(f"Failed to cleanup old reports: {e}")

    def get_current_stats(self) -> str:
        """Return a one-line summary of the current period's counters."""
        stats = self.daily_stats.get_stats_summary()
        return (
            f"Today: {stats['total_processed']} processed, "
            f"{stats['total_errors']} errors, {stats['success_rate']:.1f}% success"
        )


class ReportingFileHandler(FileSystemEventHandler):
    """Watchdog handler that queues new or moved-in ``.json`` files for processing."""

    def __init__(self, processor: ReportingJSONProcessor):
        """Remember the processor whose queue receives matching files."""
        super().__init__()
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def _queue_if_json(self, raw_path) -> None:
        """Queue ``raw_path`` when its suffix is ``.json`` (case-insensitive)."""
        file_path = Path(raw_path)
        if file_path.suffix.lower() == '.json':
            self.processor.queue_file(file_path)

    def on_created(self, event):
        """Queue a newly created JSON file; directory events are ignored."""
        if event.is_directory:
            return
        self._queue_if_json(event.src_path)

    def on_moved(self, event):
        """Queue a JSON file by its destination path after a move or rename."""
        if event.is_directory:
            return
        self._queue_if_json(event.dest_path)


def main():
    """Start the batch worker and folder observer, then log stats every minute.

    Runs until interrupted with Ctrl-C. The observer is stopped and joined
    on every exit path, not only on ``KeyboardInterrupt``.
    """
    processor = ReportingJSONProcessor()

    # Start batch processing worker
    batch_worker = threading.Thread(
        target=processor.batch_processor_worker,
        name="BatchWorker",
        daemon=True,
    )
    batch_worker.start()

    # Setup file monitoring (recursive, so client sub-folders are watched too)
    event_handler = ReportingFileHandler(processor)
    observer = Observer()
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)

    # Start monitoring
    observer.start()
    processor.logger.info(f"Started reporting workflow processor on {Config.HOT_FOLDER}")

    try:
        while True:
            time.sleep(60)  # Log stats every minute
            current_stats = processor.get_current_stats()
            processor.logger.info(f"Stats: {current_stats}")
    except KeyboardInterrupt:
        processor.logger.info("Stopping Reporting JSON Workflow Processor...")
    finally:
        # Also reached when the loop dies from an unexpected exception.
        observer.stop()
        observer.join()
        processor.logger.info("Reporting JSON Workflow Processor stopped")


# Run the processor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()