- Added INSTALL_GUIDE.md and README.md documentation - Added OLD/ folder with previous script versions for reference - Added data/ folder with sample JSON test files - Added older json_workflow_processor-hybrid-protected.py version - Excludes venv and .DS_Store (per .gitignore) Complete project backup with full history and test data. Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
459 lines
No EOL
18 KiB
Python
459 lines
No EOL
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
JSON Workflow Processor - BATCH VERSION
|
|
Converted from TWIST workflow definition
|
|
|
|
High-performance version that processes multiple JSON files concurrently.
|
|
Monitors dynamic client hot folders and routes them based on client codes and job categories.
|
|
|
|
Features:
|
|
- Concurrent processing (5-10 files at once)
|
|
- Dynamic hot folder monitoring (hundreds of client folders)
|
|
- JSON validation and parsing
|
|
- Smart routing for special clients
|
|
- File processing with automatic cleanup
|
|
- Error notifications
|
|
- Activity logging
|
|
"""
|
|
|
|
import json
import logging
import os
import shutil
import smtplib
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict, List, Optional

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
|
|
|
|
|
|
class Config:
    """Configuration settings.

    Paths and SMTP settings may be overridden with ``JSON_WORKFLOW_*``
    environment variables; the defaults reproduce the previous hard-coded
    values. The SMTP password has NO default: credentials must not live in
    source control. The value that was previously committed here should be
    treated as compromised and rotated.
    """

    # Paths
    HOT_FOLDER = Path(os.environ.get(
        "JSON_WORKFLOW_HOT_FOLDER",
        "/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/JSON",
    ))
    JSON_STORE = Path(os.environ.get(
        "JSON_WORKFLOW_JSON_STORE",
        "/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/JSON_STORE/",
    ))
    SYNC_BASE = Path(os.environ.get(
        "JSON_WORKFLOW_SYNC_BASE",
        "/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION/SYNC/MAKE",
    ))

    # Clients eligible for Celtra projects (based on JobCategory).
    # Only these client folders can have JobCategory="Celtra" - add others as needed.
    CELTRA_ELIGIBLE_CLIENTS = {
        "CIBC", "OLIVER", "ADIDAS", "PAYPAL",
        "RECKITTBENCKISER", "BAYER", "3M", "RANK",
    }

    # Routing destinations. "CELTRA" is selected by JobCategory; the other
    # keys are matched against the (upper-cased) ClientCode.
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
    }

    # Email settings (from original TWIST config)
    SMTP_SERVER = os.environ.get("JSON_WORKFLOW_SMTP_SERVER", "smtp.mailgun.org")
    SMTP_PORT = int(os.environ.get("JSON_WORKFLOW_SMTP_PORT", "25"))
    SMTP_USER = os.environ.get("JSON_WORKFLOW_SMTP_USER", "twist@mail.dev.oliver.solutions")
    # Secret: supply via the environment only (empty if unset, so login will fail loudly).
    SMTP_PASSWORD = os.environ.get("JSON_WORKFLOW_SMTP_PASSWORD", "")
    SENDER_EMAIL = os.environ.get("JSON_WORKFLOW_SENDER_EMAIL", "TWIST-UK-SERVER@oliver.agency")
    ERROR_EMAIL = os.environ.get("JSON_WORKFLOW_ERROR_EMAIL", "daveporter@oliver.agency")

    # Batch processing settings
    BATCH_SIZE = 10  # Process up to 10 files concurrently
    MAX_WORKERS = 10  # Maximum thread pool size
    BATCH_TIMEOUT = 30  # Seconds to wait for batch to fill before processing

    # Monitoring settings
    POLL_INTERVAL = 2  # seconds (faster polling for batch mode)
    WAIT_DELAY = 3  # seconds to wait for file stability (reduced for speed)
|
|
|
|
|
|
class BatchJSONProcessor:
    """Batch JSON file processor with concurrent processing.

    File paths are pushed onto ``file_queue`` by ``queue_file`` (called from
    the watchdog handler) and drained by ``batch_processor_worker``, which
    hands groups of paths to a thread pool via ``process_batch``. The
    counters in ``stats`` are updated under ``processing_lock``.
    """

    # Seconds before an SMTP connect/command gives up, so an unresponsive mail
    # server cannot block a worker thread indefinitely.
    SMTP_TIMEOUT = 30

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()
        self.file_queue = Queue()
        # Guards self.stats, written by the batch worker and read elsewhere.
        self.processing_lock = threading.Lock()
        self.stats = {
            'processed': 0,
            'routed': 0,
            'errors': 0,
            'batches': 0,
        }

    def setup_logging(self):
        """Configure root logging to ``json_workflow_batch.log`` (relative to
        the current working directory) and to a stream handler."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler('json_workflow_batch.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("Batch JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create the hot folder, store, sync base and every destination folder."""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.SYNC_BASE,
            *Config.DESTINATIONS.values(),
        ]
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            self.logger.debug(f"Ensured directory exists: {directory}")

    @staticmethod
    def _as_text(value: Any) -> str:
        """Coerce a JSON field value to ``str``; ``None`` becomes ``""``."""
        return "" if value is None else str(value)

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Parse *file_path* and extract the routing fields.

        Reads ``JobSpecification.JobDetails`` and returns a dict with the keys
        ``JobCategory``, ``StudioCode``, ``Title`` and ``ClientCode`` (missing
        or null values become ``""``; other scalars are converted to ``str``).

        Returns ``None`` (after logging an error) when the file cannot be
        read, is not valid JSON, or ``JobSpecification``/``JobDetails`` is
        present but not a JSON object.
        """
        try:
            # utf-8-sig also accepts files that begin with a UTF-8 BOM, which
            # plain utf-8 + json.load rejects as invalid JSON.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None

        # A missing key falls back to {} (all fields empty); a present key
        # that is not an object (null, list, ...) is treated as a parse error.
        job_spec = data.get("JobSpecification", {}) if isinstance(data, dict) else None
        job_details = job_spec.get("JobDetails", {}) if isinstance(job_spec, dict) else None
        if not isinstance(job_details, dict):
            self.logger.error(
                f"Unexpected JSON structure in {file_path.name}: "
                "JobSpecification.JobDetails is not an object"
            )
            return None

        return {
            key: self._as_text(job_details.get(key))
            for key in ("JobCategory", "StudioCode", "Title", "ClientCode")
        }

    def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
        """Return the destination folders for a parsed file.

        * ClientCode (case-insensitive) matching a client key in
          ``Config.DESTINATIONS`` adds that client's destination.
        * JobCategory "Celtra" adds the Celtra destination, but only when
          *client_folder* is in ``Config.CELTRA_ELIGIBLE_CLIENTS``.

        The returned list has no duplicates and may be empty.
        """
        client_code = self._as_text(json_data.get("ClientCode")).upper()
        job_category = self._as_text(json_data.get("JobCategory")).upper()
        destinations: List[Path] = []

        # "CELTRA" is a JobCategory destination, not a client; matching it by
        # ClientCode would bypass the eligibility check below.
        if client_code != "CELTRA" and client_code in Config.DESTINATIONS:
            destinations.append(Config.DESTINATIONS[client_code])

        if job_category == "CELTRA":
            if client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
                celtra_dest = Config.DESTINATIONS["CELTRA"]
                if celtra_dest not in destinations:
                    destinations.append(celtra_dest)
                self.logger.info(f"Celtra project detected for eligible client: {client_folder}")
            else:
                self.logger.warning(f"Celtra JobCategory found but {client_folder} not in eligible clients list")

        return destinations

    @staticmethod
    def _copy_into(file_path: Path, directory: Path) -> None:
        """Copy *file_path* into *directory* (created if needed) with ``copy2``.

        An existing file of the same name in *directory* is overwritten.
        """
        directory.mkdir(parents=True, exist_ok=True)
        shutil.copy2(file_path, directory / file_path.name)

    def store_file(self, file_path: Path, folder_name: str) -> bool:
        """Copy the file to ``JSON_STORE/folder_name/``; return True on success."""
        try:
            self._copy_into(file_path, Config.JSON_STORE / folder_name)
        except Exception as e:
            self.logger.error(f"Failed to store {file_path.name}: {e}")
            return False
        return True

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Copy the file into *destination*; return True on success."""
        try:
            self._copy_into(file_path, destination)
        except Exception as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False
        return True

    def send_error_email(self, file_path: Path, error_msg: str):
        """Email an error notification about *file_path* to ``Config.ERROR_EMAIL``.

        The offending file is attached when it can still be read. There is
        no rate limiting: every call sends one message. Any failure is logged
        and swallowed so notification problems never abort processing.
        """
        try:
            stats = dict(self.stats)  # snapshot for the message body
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = Config.ERROR_EMAIL
            msg['Subject'] = f"JSON Batch Processing Error - {file_path.name}"

            body = "\n".join([
                "Workflow: JSON_Parser_BATCH",
                "Process: JSON_PARSE",
                f"File: {file_path.name} will not parse",
                "",
                f"Error: {error_msg}",
                "",
                "Batch Statistics:",
                f"- Processed: {stats['processed']}",
                f"- Routed: {stats['routed']}",
                f"- Errors: {stats['errors']}",
                f"- Batches: {stats['batches']}",
                "",
            ])
            msg.attach(MIMEText(body, 'plain'))

            # Attach the problematic file if possible; a missing/unreadable
            # file should not prevent the notification itself.
            try:
                payload = file_path.read_bytes()
            except OSError as e:
                self.logger.warning(f"Could not attach {file_path.name} to error email: {e}")
            else:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(payload)
                encoders.encode_base64(part)
                # Passing filename as a parameter lets the email package quote it.
                part.add_header('Content-Disposition', 'attachment', filename=file_path.name)
                msg.attach(part)

            # The context manager always QUITs/closes the connection, even on error.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT, timeout=self.SMTP_TIMEOUT) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.ERROR_EMAIL, msg.as_string())

        except Exception as e:
            self.logger.error(f"Failed to send error email: {e}")

    def process_single_file(self, file_path: Path) -> Dict[str, Any]:
        """Parse, store, route and finally delete one hot-folder file.

        Intended to run concurrently for *different* paths (``process_batch``
        removes duplicates within a batch).

        Returns a dict with keys ``file``, ``success``, ``stored``,
        ``routed``, ``skipped`` and ``error``. The original file is deleted
        only when storing and every routing copy succeeded; ``skipped`` is
        True when the file had already disappeared before processing.
        """
        thread_name = threading.current_thread().name
        result = {
            'file': file_path.name,
            'success': False,
            'stored': False,
            'routed': False,
            'skipped': False,
            'error': None,
        }

        try:
            self.logger.info(f"[{thread_name}] Processing: {file_path.name}")

            # Wait for file stability (give the writer time to finish).
            time.sleep(Config.WAIT_DELAY)

            # The same path can be queued by more than one watchdog event; a
            # previous run may already have processed and removed it.
            if not file_path.exists():
                self.logger.info(f"[{thread_name}] Skipping {file_path.name}: no longer present (already processed or removed)")
                result['skipped'] = True
                return result

            json_data = self.parse_json_file(file_path)
            if json_data is None:
                result['error'] = "JSON parsing failed"
                self.send_error_email(file_path, "JSON parsing failed")
                return result

            # Files directly in HOT_FOLDER use "DEFAULT"; otherwise the
            # immediate parent folder name is the client folder.
            # NOTE(review): with recursive monitoring, deeper nesting yields the
            # innermost folder name rather than the top-level client folder.
            folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"

            # Store file to JSON_STORE (always done for all clients).
            stored = self.store_file(file_path, folder_name)
            result['stored'] = stored

            destinations = self.determine_routing(json_data, folder_name)
            if destinations:
                routed_count = 0
                for destination in destinations:
                    if self.route_file(file_path, destination):
                        routed_count += 1
                        self.logger.info(f"[{thread_name}] Routed {file_path.name} to {destination.name}")
                    else:
                        self.logger.error(f"[{thread_name}] Routing failed for {file_path.name} to {destination.name}")

                result['routed'] = routed_count == len(destinations)
                if result['routed']:
                    self.logger.info(f"[{thread_name}] Successfully routed {file_path.name} to {len(destinations)} destination(s)")
                else:
                    self.logger.error(f"[{thread_name}] Only {routed_count}/{len(destinations)} destinations succeeded for {file_path.name}")
            else:
                self.logger.info(f"[{thread_name}] Regular client: {file_path.name} stored to JSON_STORE only")

            processing_successful = stored and (not destinations or result['routed'])
            result['success'] = processing_successful

            if processing_successful:
                try:
                    file_path.unlink()
                    self.logger.info(f"[{thread_name}] Removed original: {file_path.name}")
                except Exception as e:
                    self.logger.error(f"[{thread_name}] Failed to remove {file_path.name}: {e}")
                    result['error'] = f"Failed to remove original: {e}"
            else:
                result['error'] = "Processing failed - file NOT removed"
                self.logger.error(f"[{thread_name}] Processing failed for {file_path.name}")

        except Exception as e:
            result['error'] = str(e)
            self.logger.error(f"[{thread_name}] Unexpected error processing {file_path.name}: {e}")

        return result

    def process_batch(self, file_paths: List[Path]):
        """Process a batch of files concurrently, then update ``self.stats``.

        Duplicate paths in *file_paths* are processed once. Skipped results
        (file already gone) are not counted as processed.
        """
        # Two threads handling the same path would race on read/unlink.
        unique_paths = list(dict.fromkeys(file_paths))
        if not unique_paths:
            return

        batch_start = time.time()
        self.logger.info(f"Processing batch of {len(unique_paths)} files")

        batch_results = []
        with ThreadPoolExecutor(max_workers=min(Config.MAX_WORKERS, len(unique_paths))) as executor:
            future_to_file = {
                executor.submit(self.process_single_file, file_path): file_path
                for file_path in unique_paths
            }
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    batch_results.append(future.result())
                except Exception as e:
                    self.logger.error(f"Batch processing error for {file_path.name}: {e}")
                    batch_results.append({
                        'file': file_path.name,
                        'success': False,
                        'error': str(e),
                    })

        with self.processing_lock:
            self.stats['batches'] += 1
            for result in batch_results:
                if result.get('skipped'):
                    continue
                self.stats['processed'] += 1
                if result.get('routed'):
                    self.stats['routed'] += 1
                if result.get('error'):
                    self.stats['errors'] += 1
            stats_snapshot = dict(self.stats)

        batch_time = time.time() - batch_start
        successful = sum(1 for r in batch_results if r.get('success'))
        self.logger.info(f"Batch completed: {successful}/{len(unique_paths)} successful in {batch_time:.2f}s")
        self.logger.info(f"Overall stats: {stats_snapshot}")

    def queue_file(self, file_path: Path):
        """Add *file_path* to the processing queue (thread-safe)."""
        self.file_queue.put(file_path)
        self.logger.debug(f"Queued file: {file_path.name} (Queue size: {self.file_queue.qsize()})")

    def batch_processor_worker(self):
        """Background loop that drains ``file_queue`` in batches; never returns.

        A batch is dispatched when it reaches ``Config.BATCH_SIZE`` files, when
        the queue stays empty for ~1 s after at least one file has arrived, or
        when ``Config.BATCH_TIMEOUT`` seconds have elapsed with files pending.
        """
        while True:
            files_to_process = []
            batch_start_time = time.time()

            while (len(files_to_process) < Config.BATCH_SIZE
                   and (time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
                try:
                    file_path = self.file_queue.get(timeout=1.0)
                except Empty:
                    if files_to_process:
                        break  # Process what we have
                    continue
                files_to_process.append(file_path)
                self.file_queue.task_done()

            if files_to_process:
                try:
                    self.process_batch(files_to_process)
                except Exception:
                    # This runs in a daemon thread: an escaping exception would
                    # silently stop all further processing.
                    self.logger.exception("Unexpected error while processing batch")
|
|
|
|
|
|
class BatchJSONFileHandler(FileSystemEventHandler):
    """Watchdog handler that forwards ``.json`` files to a BatchJSONProcessor.

    Directory events are ignored. For creations the new path is queued; for
    moves the destination path is queued. The suffix check is
    case-insensitive.
    """

    def __init__(self, processor: BatchJSONProcessor):
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def _enqueue_if_json(self, raw_path, description: str) -> None:
        """Queue *raw_path* on the processor when its suffix is ``.json``."""
        candidate = Path(raw_path)
        if candidate.suffix.lower() != '.json':
            return
        self.logger.debug(f"{description}: {candidate.name}")
        self.processor.queue_file(candidate)

    def on_created(self, event):
        """Queue newly created JSON files."""
        if not event.is_directory:
            self._enqueue_if_json(event.src_path, "New JSON file detected")

    def on_moved(self, event):
        """Queue JSON files that were moved/renamed (uses the destination path)."""
        if not event.is_directory:
            self._enqueue_if_json(event.dest_path, "JSON file moved to hot folder")
|
|
|
|
|
|
def main():
    """Main entry point.

    Starts the background batch worker, watches ``Config.HOT_FOLDER``
    recursively with a watchdog Observer, and runs until interrupted with
    Ctrl+C. The observer is always stopped and joined on exit.
    """
    processor = BatchJSONProcessor()

    # Start batch processing worker thread (daemon: exits with the process).
    batch_worker = threading.Thread(
        target=processor.batch_processor_worker,
        name="BatchWorker",
        daemon=True,
    )
    batch_worker.start()

    # Setup file monitoring
    event_handler = BatchJSONFileHandler(processor)
    observer = Observer()
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)
    observer.start()
    processor.logger.info(f"Started batch monitoring {Config.HOT_FOLDER} (batch size: {Config.BATCH_SIZE})")

    # Log stats once each time another 10 batches complete. (Checking
    # `batches % 10 == 0` on every poll repeated the log line every
    # POLL_INTERVAL for as long as the count sat on a multiple of 10.)
    last_logged_milestone = 0
    try:
        while True:
            time.sleep(Config.POLL_INTERVAL)
            with processor.processing_lock:
                stats_snapshot = dict(processor.stats)
            milestone = stats_snapshot['batches'] // 10
            if milestone > last_logged_milestone:
                last_logged_milestone = milestone
                processor.logger.info(f"Periodic stats: {stats_snapshot}")
    except KeyboardInterrupt:
        processor.logger.info("Stopping Batch JSON Workflow Processor...")
    finally:
        # Also runs on unexpected errors, so the observer thread is not leaked.
        observer.stop()
        observer.join()

    processor.logger.info("Batch JSON Workflow Processor stopped")


if __name__ == "__main__":
    main()