json-parser-twist-metaserver/OLD/json_workflow_processor-reporting.py
Dave Porter af8acbd986 Add all project files including previous versions and documentation
- Added INSTALL_GUIDE.md and README.md documentation
- Added OLD/ folder with previous script versions for reference
- Added data/ folder with sample JSON test files
- Added older json_workflow_processor-hybrid-protected.py version
- Excludes venv and .DS_Store (per .gitignore)

Complete project backup with full history and test data.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-02-06 11:07:22 -05:00

604 lines
No EOL
23 KiB
Python

#!/usr/bin/env python3
"""
JSON Workflow Processor - REPORTING VERSION
Converted from TWIST workflow definition
High-performance version with comprehensive daily reporting.
Monitors dynamic client hot folders and generates detailed workflow statistics.
Features:
- Concurrent processing (5-10 files at once)
- Daily workflow reports with client breakdown
- Real-time statistics tracking
- Automatic report generation and email delivery
- Performance metrics and trending
- Error tracking and analysis
"""
import json
import logging
import smtplib
import shutil
import time
import threading
import schedule
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, date, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from pathlib import Path
from typing import Dict, Optional, Any, List
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class Config:
"""Configuration settings"""
# Paths
HOT_FOLDER = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/JSON")
JSON_STORE = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/JSON_STORE/")
SYNC_BASE = Path("/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/SYNC/MAKE")
REPORTS_DIR = Path("/PRODUCTION/JSON_PARSER_LOGS")
# Clients eligible for Celtra projects (based on JobCategory)
CELTRA_ELIGIBLE_CLIENTS = {
"CIBC", "OLIVER", "ADIDAS", "PAYPAL",
"RECKITTBENCKISER", "BAYER", "3M", "RANK"
}
# Client destinations
DESTINATIONS = {
"CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
"RECKITTBENCKISER": SYNC_BASE / "Monday RB",
"RANK": SYNC_BASE / "Monday Rank"
}
# Email settings
SMTP_SERVER = "smtp.mailgun.org"
SMTP_PORT = 587
SMTP_USER = "twist@mail.dev.oliver.solutions"
SMTP_PASSWORD = "102115e9f3b9d7332d0cd1d4329bc0d4-77751bfc-ca066b71"
SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
ERROR_EMAIL = "daveporter@oliver.agency"
REPORT_EMAILS = ["daveporter@oliver.agency"] # Add more recipients as needed
# Batch processing settings
BATCH_SIZE = 10
MAX_WORKERS = 10
BATCH_TIMEOUT = 30
# Monitoring settings
POLL_INTERVAL = 2
WAIT_DELAY = 3
# Reporting settings
DAILY_REPORT_TIME = "00:00" # Generate daily reports at midnight
KEEP_REPORTS_DAYS = 30 # Keep report files for 30 days
class DailyStats:
"""Thread-safe daily statistics collector"""
def __init__(self):
self.lock = threading.Lock()
self.reset_stats()
self.start_time = datetime.now()
def reset_stats(self):
"""Reset daily statistics"""
with self.lock:
self.date = date.today()
self.total_processed = 0
self.total_errors = 0
self.total_batches = 0
# Client breakdown
self.client_stats = defaultdict(lambda: {
'processed': 0,
'routed': 0,
'celtra': 0,
'errors': 0,
'job_categories': Counter(),
'studio_codes': Counter()
})
# Destination breakdown
self.destination_stats = defaultdict(int)
# Performance metrics
self.processing_times = []
self.batch_sizes = []
# Error tracking
self.error_details = []
# Hourly breakdown
self.hourly_stats = defaultdict(int)
def record_file_processed(self, client: str, json_data: Dict[str, Any],
destinations: List[Path], processing_time: float,
success: bool = True, error: str = None):
"""Record a processed file"""
with self.lock:
hour = datetime.now().hour
self.hourly_stats[hour] += 1
if success:
self.total_processed += 1
self.client_stats[client]['processed'] += 1
# Track job categories and studio codes
job_category = json_data.get('JobCategory', 'Unknown')
studio_code = json_data.get('StudioCode', 'Unknown')
self.client_stats[client]['job_categories'][job_category] += 1
self.client_stats[client]['studio_codes'][studio_code] += 1
# Track routing
if destinations:
self.client_stats[client]['routed'] += 1
for dest in destinations:
self.destination_stats[dest.name] += 1
if 'Celtra' in dest.name:
self.client_stats[client]['celtra'] += 1
# Track performance
self.processing_times.append(processing_time)
else:
self.total_errors += 1
self.client_stats[client]['errors'] += 1
if error:
self.error_details.append({
'time': datetime.now().strftime('%H:%M:%S'),
'client': client,
'error': error
})
def record_batch(self, batch_size: int):
"""Record a completed batch"""
with self.lock:
self.total_batches += 1
self.batch_sizes.append(batch_size)
def get_stats_summary(self) -> Dict[str, Any]:
"""Get current statistics summary"""
with self.lock:
avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0
avg_batch_size = sum(self.batch_sizes) / len(self.batch_sizes) if self.batch_sizes else 0
return {
'date': self.date.strftime('%Y-%m-%d'),
'uptime': str(datetime.now() - self.start_time).split('.')[0],
'total_processed': self.total_processed,
'total_errors': self.total_errors,
'total_batches': self.total_batches,
'success_rate': (self.total_processed / (self.total_processed + self.total_errors) * 100) if (self.total_processed + self.total_errors) > 0 else 0,
'avg_processing_time': round(avg_processing_time, 2),
'avg_batch_size': round(avg_batch_size, 1),
'client_stats': dict(self.client_stats),
'destination_stats': dict(self.destination_stats),
'hourly_stats': dict(self.hourly_stats),
'error_details': self.error_details.copy()
}
class ReportingJSONProcessor:
"""JSON processor with comprehensive reporting capabilities"""
def __init__(self):
self.setup_logging()
self.ensure_directories()
self.file_queue = Queue()
self.daily_stats = DailyStats()
self.setup_daily_reporting()
def setup_logging(self):
"""Setup logging configuration"""
log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] %(message)s'
# Create log directory if needed
log_dir = Path("/PRODUCTION/JSON_PARSER_LOGS")
log_dir.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[
logging.FileHandler(log_dir / 'json_workflow_reporting.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
self.logger.info("Reporting JSON Workflow Processor initialized")
def ensure_directories(self):
"""Create necessary directories"""
directories = [
Config.HOT_FOLDER,
Config.JSON_STORE,
Config.SYNC_BASE,
Config.REPORTS_DIR
] + list(Config.DESTINATIONS.values())
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
def setup_daily_reporting(self):
"""Setup scheduled daily reporting"""
schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)
# Start scheduler thread
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
scheduler_thread = threading.Thread(target=run_scheduler, name="Scheduler", daemon=True)
scheduler_thread.start()
self.logger.info(f"Daily reports scheduled for {Config.DAILY_REPORT_TIME}")
def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
"""Parse JSON file and extract key fields"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
job_details = data.get("JobSpecification", {}).get("JobDetails", {})
extracted = {
"JobCategory": job_details.get("JobCategory", ""),
"StudioCode": job_details.get("StudioCode", ""),
"Title": job_details.get("Title", ""),
"ClientCode": job_details.get("ClientCode", "")
}
return extracted
except json.JSONDecodeError as e:
self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
return None
except Exception as e:
self.logger.error(f"Error reading {file_path.name}: {e}")
return None
def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
"""Determine destination paths based on client code and job category"""
client_code = json_data.get("ClientCode", "").upper()
job_category = json_data.get("JobCategory", "").upper()
destinations = []
# Check for specific client routing by ClientCode
if client_code in Config.DESTINATIONS:
destinations.append(Config.DESTINATIONS[client_code])
# Check for Celtra by JobCategory (only for eligible clients)
if job_category == "CELTRA" and client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
celtra_dest = Config.DESTINATIONS["CELTRA"]
if celtra_dest not in destinations:
destinations.append(celtra_dest)
return destinations
def store_file(self, file_path: Path, folder_name: str) -> bool:
"""Store file to JSON_STORE"""
try:
store_dir = Config.JSON_STORE / folder_name
store_dir.mkdir(parents=True, exist_ok=True)
destination = store_dir / file_path.name
shutil.copy2(file_path, destination)
return True
except Exception as e:
self.logger.error(f"Failed to store {file_path.name}: {e}")
return False
def route_file(self, file_path: Path, destination: Path) -> bool:
"""Route file to specific destination"""
try:
destination.mkdir(parents=True, exist_ok=True)
dest_file = destination / file_path.name
shutil.copy2(file_path, dest_file)
return True
except Exception as e:
self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
return False
def process_single_file(self, file_path: Path) -> Dict[str, Any]:
"""Process a single JSON file with timing"""
start_time = time.time()
thread_name = threading.current_thread().name
try:
self.logger.info(f"[{thread_name}] Processing: {file_path.name}")
time.sleep(Config.WAIT_DELAY)
# Parse JSON
json_data = self.parse_json_file(file_path)
if json_data is None:
error_msg = "JSON parsing failed"
folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"
processing_time = time.time() - start_time
self.daily_stats.record_file_processed(
folder_name, {}, [], processing_time, False, error_msg
)
return {'success': False, 'error': error_msg}
# Determine folder name
folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"
# Store file
stored = self.store_file(file_path, folder_name)
# Route file
destinations = self.determine_routing(json_data, folder_name)
routed_count = 0
for destination in destinations:
if self.route_file(file_path, destination):
routed_count += 1
success = stored and (routed_count == len(destinations) if destinations else True)
processing_time = time.time() - start_time
# Record statistics
self.daily_stats.record_file_processed(
folder_name, json_data, destinations, processing_time,
success, None if success else "Processing failed"
)
if success:
file_path.unlink()
self.logger.info(f"[{thread_name}] Successfully processed {file_path.name} in {processing_time:.2f}s")
return {'success': success}
except Exception as e:
processing_time = time.time() - start_time
folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"
self.daily_stats.record_file_processed(
folder_name, {}, [], processing_time, False, str(e)
)
self.logger.error(f"[{thread_name}] Error processing {file_path.name}: {e}")
return {'success': False, 'error': str(e)}
def process_batch(self, file_paths: List[Path]):
"""Process a batch of files concurrently"""
if not file_paths:
return
batch_start = time.time()
self.logger.info(f"Processing batch of {len(file_paths)} files")
with ThreadPoolExecutor(max_workers=min(Config.MAX_WORKERS, len(file_paths))) as executor:
future_to_file = {
executor.submit(self.process_single_file, file_path): file_path
for file_path in file_paths
}
for future in as_completed(future_to_file):
try:
future.result()
except Exception as e:
file_path = future_to_file[future]
self.logger.error(f"Batch processing error for {file_path.name}: {e}")
batch_time = time.time() - batch_start
self.daily_stats.record_batch(len(file_paths))
self.logger.info(f"Batch completed: {len(file_paths)} files in {batch_time:.2f}s")
def queue_file(self, file_path: Path):
"""Add file to processing queue"""
self.file_queue.put(file_path)
def batch_processor_worker(self):
"""Background worker for batch processing"""
while True:
files_to_process = []
batch_start_time = time.time()
while (len(files_to_process) < Config.BATCH_SIZE and
(time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
try:
file_path = self.file_queue.get(timeout=1.0)
files_to_process.append(file_path)
self.file_queue.task_done()
except:
if files_to_process:
break
continue
if files_to_process:
self.process_batch(files_to_process)
def generate_daily_report(self):
"""Generate comprehensive daily report"""
try:
stats = self.daily_stats.get_stats_summary()
# Generate report content
report_content = self.format_daily_report(stats)
# Save to file
report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
with open(report_file, 'w') as f:
f.write(report_content)
# Email report
self.email_daily_report(report_content, stats['date'])
# Clean old reports
self.cleanup_old_reports()
# Reset stats for new day
self.daily_stats.reset_stats()
self.logger.info(f"Daily report generated: {report_file}")
except Exception as e:
self.logger.error(f"Failed to generate daily report: {e}")
def format_daily_report(self, stats: Dict[str, Any]) -> str:
"""Format daily statistics into readable report"""
report = f"""
JSON WORKFLOW DAILY REPORT
Date: {stats['date']}
Uptime: {stats['uptime']}
=== SUMMARY ===
Total Files Processed: {stats['total_processed']}
Total Errors: {stats['total_errors']}
Success Rate: {stats['success_rate']:.1f}%
Total Batches: {stats['total_batches']}
Average Processing Time: {stats['avg_processing_time']}s
Average Batch Size: {stats['avg_batch_size']} files
=== CLIENT BREAKDOWN ===
"""
for client, client_data in sorted(stats['client_stats'].items()):
report += f"\n{client}:\n"
report += f" Processed: {client_data['processed']}\n"
report += f" Routed: {client_data['routed']}\n"
report += f" Celtra Projects: {client_data['celtra']}\n"
report += f" Errors: {client_data['errors']}\n"
if client_data['job_categories']:
report += f" Job Categories: {dict(client_data['job_categories'])}\n"
if client_data['studio_codes']:
report += f" Studio Codes: {dict(client_data['studio_codes'])}\n"
report += "\n=== DESTINATION BREAKDOWN ===\n"
for dest, count in sorted(stats['destination_stats'].items()):
report += f"{dest}: {count} files\n"
report += "\n=== HOURLY BREAKDOWN ===\n"
for hour in range(24):
count = stats['hourly_stats'].get(hour, 0)
if count > 0:
report += f"{hour:02d}:00 - {count} files\n"
if stats['error_details']:
report += "\n=== ERROR DETAILS ===\n"
for error in stats['error_details'][-10:]: # Show last 10 errors
report += f"{error['time']} - {error['client']}: {error['error']}\n"
return report
def email_daily_report(self, report_content: str, report_date: str):
"""Email the daily report"""
try:
msg = MIMEMultipart()
msg['From'] = Config.SENDER_EMAIL
msg['To'] = ', '.join(Config.REPORT_EMAILS)
msg['Subject'] = f"JSON Workflow Daily Report - {report_date}"
msg.attach(MIMEText(report_content, 'plain'))
server = smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT)
server.starttls()
server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())
server.quit()
self.logger.info("Daily report emailed successfully")
except Exception as e:
self.logger.error(f"Failed to email daily report: {e}")
def cleanup_old_reports(self):
"""Remove old report files"""
try:
cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)
for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
try:
file_date_str = report_file.stem.split('_')[-1]
file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
if file_date < cutoff_date:
report_file.unlink()
self.logger.debug(f"Deleted old report: {report_file}")
except Exception as e:
self.logger.warning(f"Could not process report file {report_file}: {e}")
except Exception as e:
self.logger.error(f"Failed to cleanup old reports: {e}")
def get_current_stats(self) -> str:
"""Get current statistics as formatted string"""
stats = self.daily_stats.get_stats_summary()
return f"Today: {stats['total_processed']} processed, {stats['total_errors']} errors, {stats['success_rate']:.1f}% success"
class ReportingFileHandler(FileSystemEventHandler):
"""File system event handler with reporting"""
def __init__(self, processor: ReportingJSONProcessor):
self.processor = processor
self.logger = logging.getLogger(__name__)
def on_created(self, event):
if event.is_directory:
return
file_path = Path(event.src_path)
if file_path.suffix.lower() == '.json':
self.processor.queue_file(file_path)
def on_moved(self, event):
if event.is_directory:
return
dest_path = Path(event.dest_path)
if dest_path.suffix.lower() == '.json':
self.processor.queue_file(dest_path)
def main():
"""Main entry point"""
processor = ReportingJSONProcessor()
# Start batch processing worker
batch_worker = threading.Thread(
target=processor.batch_processor_worker,
name="BatchWorker",
daemon=True
)
batch_worker.start()
# Setup file monitoring
event_handler = ReportingFileHandler(processor)
observer = Observer()
observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)
# Start monitoring
observer.start()
processor.logger.info(f"Started reporting workflow processor on {Config.HOT_FOLDER}")
try:
while True:
time.sleep(60) # Log stats every minute
current_stats = processor.get_current_stats()
processor.logger.info(f"Stats: {current_stats}")
except KeyboardInterrupt:
processor.logger.info("Stopping Reporting JSON Workflow Processor...")
observer.stop()
observer.join()
processor.logger.info("Reporting JSON Workflow Processor stopped")
if __name__ == "__main__":
main()