json-parser-twist-metaserver/OLD/json_workflow_processor-batch.py
Dave Porter af8acbd986 Add all project files including previous versions and documentation
- Added INSTALL_GUIDE.md and README.md documentation
- Added OLD/ folder with previous script versions for reference
- Added data/ folder with sample JSON test files
- Added older json_workflow_processor-hybrid-protected.py version
- Excludes venv and .DS_Store (per .gitignore)

Complete project backup with full history and test data.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-02-06 11:07:22 -05:00

459 lines
No EOL
18 KiB
Python

#!/usr/bin/env python3
"""
JSON Workflow Processor - BATCH VERSION
Converted from TWIST workflow definition
High-performance version that processes multiple JSON files concurrently.
Monitors dynamic client hot folders and routes them based on client codes and job categories.
Features:
- Concurrent processing (5-10 files at once)
- Dynamic hot folder monitoring (hundreds of client folders)
- JSON validation and parsing
- Smart routing for special clients
- File processing with automatic cleanup
- Error notifications
- Activity logging
"""
import json
import logging
import os
import shutil
import smtplib
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict, List, Optional

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class Config:
    """Static configuration for the batch JSON workflow processor.

    All values are class attributes and are read directly as
    ``Config.NAME``.  Two settings can be overridden through environment
    variables so that the script can run on another machine without editing
    the source:

    * ``JSON_PARSER_DATA_ROOT`` - directory that contains the ``JSON``,
      ``JSON_STORE`` and ``SYNC/MAKE`` folders.
    * ``JSON_PARSER_SMTP_PASSWORD`` - password for ``SMTP_USER``.
    """

    # Paths.  The default root is the original production location.
    DATA_ROOT = Path(os.environ.get(
        "JSON_PARSER_DATA_ROOT",
        "/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION",
    ))
    HOT_FOLDER = DATA_ROOT / "JSON"
    JSON_STORE = DATA_ROOT / "JSON_STORE"
    SYNC_BASE = DATA_ROOT / "SYNC" / "MAKE"

    # Clients eligible for Celtra projects (based on JobCategory)
    # Only these clients can have JobCategory="Celtra" - add others as needed
    CELTRA_ELIGIBLE_CLIENTS = {
        "CIBC", "OLIVER", "ADIDAS", "PAYPAL",
        "RECKITTBENCKISER", "BAYER", "3M", "RANK",
    }

    # Client destinations, keyed by upper-cased ClientCode (or "CELTRA" for
    # the JobCategory-based route).
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
    }

    # Email settings (from original TWIST config)
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 25
    SMTP_USER = "twist@mail.dev.oliver.solutions"
    # SECURITY: the password must never be committed to source control.  It is
    # read from the environment; when unset, SMTP login fails and the failure
    # is logged by the caller.  NOTE(review): the credential that used to be
    # hard-coded here is still in the repository history and should be rotated.
    SMTP_PASSWORD = os.environ.get("JSON_PARSER_SMTP_PASSWORD", "")
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"

    # Batch processing settings
    BATCH_SIZE = 10  # Process up to 10 files concurrently
    MAX_WORKERS = 10  # Maximum thread pool size
    BATCH_TIMEOUT = 30  # Max seconds spent collecting one batch

    # Monitoring settings
    POLL_INTERVAL = 2  # seconds (faster polling for batch mode)
    WAIT_DELAY = 3  # seconds to wait for file stability (reduced for speed)
class BatchJSONProcessor:
    """Batch JSON file processor with concurrent processing.

    Files are queued with ``queue_file``, collected into batches by
    ``batch_processor_worker`` and processed in parallel by a thread pool.
    Every file that parses is copied to ``Config.JSON_STORE/<folder>/``;
    files for special clients are additionally copied to the destinations
    returned by ``determine_routing``.  The original file is deleted only
    when every copy succeeded.

    Attributes:
        logger: Module logger configured by ``setup_logging``.
        file_queue: Queue of ``Path`` objects waiting to be processed.
        processing_lock: Guards updates to ``stats``.
        stats: Running counters (``processed``, ``routed``, ``errors``,
            ``batches``).
    """

    # Fields extracted from JobSpecification.JobDetails, in output order.
    JOB_DETAIL_FIELDS = ("JobCategory", "StudioCode", "Title", "ClientCode")
    # Seconds before a stalled SMTP operation is abandoned, so that a dead
    # mail server cannot block a worker thread forever.
    SMTP_TIMEOUT = 30
    # Seconds a single queue read waits for a file while collecting a batch.
    QUEUE_POLL_SECONDS = 1.0

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()
        self.file_queue = Queue()
        self.processing_lock = threading.Lock()
        self.stats = {
            'processed': 0,
            'routed': 0,
            'errors': 0,
            'batches': 0
        }

    def setup_logging(self):
        """Configure root logging (file + console) and create ``self.logger``."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler('json_workflow_batch.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("Batch JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create the hot folder, store, sync base and all destinations."""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.SYNC_BASE
        ] + list(Config.DESTINATIONS.values())
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            self.logger.debug(f"Ensured directory exists: {directory}")

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return *value* as a string, mapping JSON ``null`` to ``""``."""
        return "" if value is None else str(value)

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Parse a JSON file and extract the routing fields.

        Args:
            file_path: File to read.

        Returns:
            A dict with the keys in ``JOB_DETAIL_FIELDS``.  Values are always
            strings (missing or ``null`` fields become ``""``) so callers can
            safely call string methods on them.  ``None`` is returned when
            the file cannot be read, is not valid JSON, or does not have a
            JSON object at ``JobSpecification.JobDetails``.
        """
        try:
            # utf-8-sig also accepts plain UTF-8 and strips a leading BOM,
            # which json.load() would otherwise reject.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None

        # Extract fields from JobSpecification.JobDetails
        job_spec = data.get("JobSpecification", {}) if isinstance(data, dict) else None
        job_details = job_spec.get("JobDetails", {}) if isinstance(job_spec, dict) else None
        if not isinstance(job_details, dict):
            self.logger.error(
                f"Error reading {file_path.name}: "
                "JobSpecification.JobDetails is not a JSON object"
            )
            return None

        return {
            field: self._as_text(job_details.get(field))
            for field in self.JOB_DETAIL_FIELDS
        }

    def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
        """Determine destination paths based on client code and job category.

        Args:
            json_data: Fields as returned by ``parse_json_file``.
            client_folder: Name of the hot sub-folder the file arrived in;
                used for the Celtra eligibility check.

        Returns:
            Destination directories (possibly empty, never duplicated).
        """
        # ``or ""`` tolerates None values supplied by other callers.
        client_code = str(json_data.get("ClientCode") or "").upper()
        job_category = str(json_data.get("JobCategory") or "").upper()
        destinations = []

        # Check for specific client routing by ClientCode
        if client_code in Config.DESTINATIONS:
            destinations.append(Config.DESTINATIONS[client_code])

        # Check for Celtra by JobCategory (only for eligible clients)
        if job_category == "CELTRA":
            if client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
                # Add Celtra destination if not already added
                celtra_dest = Config.DESTINATIONS["CELTRA"]
                if celtra_dest not in destinations:
                    destinations.append(celtra_dest)
                self.logger.info(f"Celtra project detected for eligible client: {client_folder}")
            else:
                self.logger.warning(f"Celtra JobCategory found but {client_folder} not in eligible clients list")
        return destinations

    @staticmethod
    def _copy_into(file_path: Path, directory: Path) -> None:
        """Copy *file_path* (with metadata) into *directory*, creating it."""
        directory.mkdir(parents=True, exist_ok=True)
        shutil.copy2(file_path, directory / file_path.name)

    def store_file(self, file_path: Path, folder_name: str) -> bool:
        """Copy the file to ``JSON_STORE/folder_name/``; return success."""
        try:
            self._copy_into(file_path, Config.JSON_STORE / folder_name)
            return True
        except Exception as e:
            self.logger.error(f"Failed to store {file_path.name}: {e}")
            return False

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Copy the file into *destination*; return success."""
        try:
            self._copy_into(file_path, destination)
            return True
        except Exception as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False

    def send_error_email(self, file_path: Path, error_msg: str):
        """Send an error notification email with the offending file attached.

        Best effort: every failure is logged and swallowed so that a mail
        problem never aborts file processing.  No rate limiting is applied;
        one email is sent per call.

        Args:
            file_path: File that failed; attached when it can be read.
            error_msg: Human-readable description of the failure.
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = Config.ERROR_EMAIL
            msg['Subject'] = f"JSON Batch Processing Error - {file_path.name}"
            body = "\n".join([
                "Workflow: JSON_Parser_BATCH",
                "Process: JSON_PARSE",
                f"File: {file_path.name} will not parse",
                f"Error: {error_msg}",
                "Batch Statistics:",
                f"- Processed: {self.stats['processed']}",
                f"- Routed: {self.stats['routed']}",
                f"- Errors: {self.stats['errors']}",
                f"- Batches: {self.stats['batches']}",
                "",
            ])
            msg.attach(MIMEText(body, 'plain'))

            # Attach the problematic file; still send the notification when
            # the file itself cannot be read.
            try:
                payload = file_path.read_bytes()
            except OSError as e:
                self.logger.warning(f"Could not attach {file_path.name} to error email: {e}")
            else:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(payload)
                encoders.encode_base64(part)
                # Let the email package quote/encode the filename parameter.
                part.add_header('Content-Disposition', 'attachment', filename=file_path.name)
                msg.attach(part)

            # The context manager closes the connection even when STARTTLS,
            # login or sending raises.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT,
                              timeout=self.SMTP_TIMEOUT) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.ERROR_EMAIL, msg.as_string())
        except Exception as e:
            self.logger.error(f"Failed to send error email: {e}")

    def process_single_file(self, file_path: Path) -> Dict[str, Any]:
        """Process a single JSON file; runs on a thread-pool worker.

        Args:
            file_path: File inside the hot folder tree.

        Returns:
            Result dict with keys ``file``, ``success``, ``stored``,
            ``routed``, ``skipped`` and ``error`` (``None`` when nothing
            went wrong).  ``skipped`` is True when the file had already
            disappeared, e.g. because a duplicate file-system event queued
            it twice.
        """
        thread_name = threading.current_thread().name
        result = {
            'file': file_path.name,
            'success': False,
            'stored': False,
            'routed': False,
            'skipped': False,
            'error': None
        }
        try:
            self.logger.info(f"[{thread_name}] Processing: {file_path.name}")
            # Wait for file stability
            time.sleep(Config.WAIT_DELAY)

            # A vanished file is not a parse failure: do not email about it.
            if not file_path.is_file():
                result['skipped'] = True
                self.logger.warning(f"[{thread_name}] {file_path.name} no longer exists - skipped")
                return result

            # Parse JSON
            json_data = self.parse_json_file(file_path)
            if json_data is None:
                result['error'] = "JSON parsing failed"
                self.send_error_email(file_path, "JSON parsing failed")
                return result

            # Files dropped directly into the hot folder have no client folder.
            folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"

            # Store file to JSON_STORE (always done for all clients)
            stored = self.store_file(file_path, folder_name)
            result['stored'] = stored

            # Determine routing for special clients
            destinations = self.determine_routing(json_data, folder_name)
            routed_count = 0
            total_destinations = len(destinations)
            if destinations:
                # Special clients: route to all applicable destinations
                for destination in destinations:
                    if self.route_file(file_path, destination):
                        routed_count += 1
                        self.logger.info(f"[{thread_name}] Routed {file_path.name} to {destination.name}")
                    else:
                        self.logger.error(f"[{thread_name}] Routing failed for {file_path.name} to {destination.name}")
                result['routed'] = routed_count == total_destinations
                if result['routed']:
                    self.logger.info(f"[{thread_name}] Successfully routed {file_path.name} to {total_destinations} destination(s)")
                else:
                    self.logger.error(f"[{thread_name}] Only {routed_count}/{total_destinations} destinations succeeded for {file_path.name}")
            else:
                self.logger.info(f"[{thread_name}] Regular client: {file_path.name} stored to JSON_STORE only")

            # Success requires the store copy and, when routing applies,
            # every routed copy.
            processing_successful = stored and (result['routed'] if destinations else True)
            result['success'] = processing_successful
            if processing_successful:
                # Remove original file
                try:
                    file_path.unlink()
                    self.logger.info(f"[{thread_name}] Removed original: {file_path.name}")
                except Exception as e:
                    self.logger.error(f"[{thread_name}] Failed to remove {file_path.name}: {e}")
                    result['error'] = f"Failed to remove original: {e}"
            else:
                result['error'] = "Processing failed - file NOT removed"
                self.logger.error(f"[{thread_name}] Processing failed for {file_path.name}")
        except Exception as e:
            result['error'] = str(e)
            self.logger.error(f"[{thread_name}] Unexpected error processing {file_path.name}: {e}")
        return result

    def process_batch(self, file_paths: List[Path]):
        """Process a batch of files concurrently and update ``stats``.

        Args:
            file_paths: Files to process; an empty list is a no-op.
        """
        if not file_paths:
            return
        batch_start = time.time()
        self.logger.info(f"Processing batch of {len(file_paths)} files")

        batch_results = []
        # Process files concurrently
        with ThreadPoolExecutor(max_workers=min(Config.MAX_WORKERS, len(file_paths))) as executor:
            # Submit all files for processing
            future_to_file = {
                executor.submit(self.process_single_file, file_path): file_path
                for file_path in file_paths
            }
            # Collect results
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    batch_results.append(future.result())
                except Exception as e:
                    self.logger.error(f"Batch processing error for {file_path.name}: {e}")
                    batch_results.append({
                        'file': file_path.name,
                        'success': False,
                        'error': str(e)
                    })

        # Update statistics
        with self.processing_lock:
            self.stats['batches'] += 1
            for result in batch_results:
                if result.get('skipped'):
                    # Nothing was processed for a vanished file.
                    continue
                self.stats['processed'] += 1
                if result.get('routed'):
                    self.stats['routed'] += 1
                if result.get('error'):
                    self.stats['errors'] += 1
            stats_snapshot = dict(self.stats)

        # Log batch summary
        batch_time = time.time() - batch_start
        successful = sum(1 for r in batch_results if r['success'])
        self.logger.info(f"Batch completed: {successful}/{len(file_paths)} successful in {batch_time:.2f}s")
        self.logger.info(f"Overall stats: {stats_snapshot}")

    def queue_file(self, file_path: Path):
        """Add file to processing queue"""
        self.file_queue.put(file_path)
        self.logger.debug(f"Queued file: {file_path.name} (Queue size: {self.file_queue.qsize()})")

    def _collect_batch(self, batch_size: Optional[int] = None,
                       batch_timeout: Optional[float] = None,
                       poll_timeout: Optional[float] = None) -> List[Path]:
        """Take up to *batch_size* files from the queue.

        Collection stops when the batch is full, when the queue stays empty
        for *poll_timeout* seconds after at least one file was taken, or
        when *batch_timeout* seconds have elapsed.  Arguments default to
        ``Config.BATCH_SIZE``, ``Config.BATCH_TIMEOUT`` and
        ``QUEUE_POLL_SECONDS``.

        Returns:
            The collected files in queue order (possibly empty).
        """
        if batch_size is None:
            batch_size = Config.BATCH_SIZE
        if batch_timeout is None:
            batch_timeout = Config.BATCH_TIMEOUT
        if poll_timeout is None:
            poll_timeout = self.QUEUE_POLL_SECONDS

        collected = []
        started = time.monotonic()
        while len(collected) < batch_size and (time.monotonic() - started) < batch_timeout:
            try:
                # Wait briefly for files to arrive
                file_path = self.file_queue.get(timeout=poll_timeout)
            except Empty:
                if collected:
                    break  # Process what we have
                continue
            collected.append(file_path)
            self.file_queue.task_done()
        return collected

    def batch_processor_worker(self):
        """Background worker that processes queued files in batches.

        Runs forever; intended to be the target of a daemon thread.
        """
        while True:
            files_to_process = self._collect_batch()
            # Process the batch if we have files
            if files_to_process:
                try:
                    self.process_batch(files_to_process)
                except Exception:
                    # Keep the worker alive; a dead worker would silently
                    # stop all processing.
                    self.logger.exception("Unhandled error while processing batch")
class BatchJSONFileHandler(FileSystemEventHandler):
    """File system event handler that queues new JSON files for processing.

    Files that are created in, or moved/renamed into, the watched tree are
    passed to ``processor.queue_file`` when their suffix is ``.json``
    (case-insensitive).  Directory events are ignored.
    """

    def __init__(self, processor: BatchJSONProcessor):
        """Remember the processor whose queue receives detected files."""
        super().__init__()
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def _queue_if_json(self, raw_path, log_prefix: str) -> None:
        """Queue *raw_path* when it names a JSON file; otherwise do nothing."""
        file_path = Path(raw_path)
        # Only process JSON files
        if file_path.suffix.lower() == '.json':
            self.logger.debug(f"{log_prefix}: {file_path.name}")
            self.processor.queue_file(file_path)

    def on_created(self, event):
        """Handle file creation events"""
        if event.is_directory:
            return
        self._queue_if_json(event.src_path, "New JSON file detected")

    def on_moved(self, event):
        """Handle file move events (the destination path is what gets queued)"""
        if event.is_directory:
            return
        self._queue_if_json(event.dest_path, "JSON file moved to hot folder")
def main():
    """Main entry point: start the batch worker and watch the hot folder.

    Blocks until interrupted with Ctrl+C.  The file-system observer is
    always stopped and joined on the way out, whatever ends the loop.
    """
    processor = BatchJSONProcessor()

    # Start batch processing worker thread
    batch_worker = threading.Thread(
        target=processor.batch_processor_worker,
        name="BatchWorker",
        daemon=True
    )
    batch_worker.start()

    # Setup file monitoring
    event_handler = BatchJSONFileHandler(processor)
    observer = Observer()
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)

    # Start monitoring
    observer.start()
    processor.logger.info(f"Started batch monitoring {Config.HOT_FOLDER} (batch size: {Config.BATCH_SIZE})")

    # Number of 10-batch milestones already reported; without it the stats
    # line would be repeated on every poll while the count sits on a
    # multiple of 10.
    logged_milestone = 0
    try:
        while True:
            time.sleep(Config.POLL_INTERVAL)
            # Log stats once per 10 completed batches
            milestone = processor.stats['batches'] // 10
            if milestone > logged_milestone:
                processor.logger.info(f"Periodic stats: {processor.stats}")
                logged_milestone = milestone
    except KeyboardInterrupt:
        processor.logger.info("Stopping Batch JSON Workflow Processor...")
    finally:
        observer.stop()
        observer.join()
        processor.logger.info("Batch JSON Workflow Processor stopped")


if __name__ == "__main__":
    main()