json-parser-twist-metaserver/OLD/json_workflow_processor-local.py
Dave Porter af8acbd986 Add all project files including previous versions and documentation
- Added INSTALL_GUIDE.md and README.md documentation
- Added OLD/ folder with previous script versions for reference
- Added data/ folder with sample JSON test files
- Added older json_workflow_processor-hybrid-protected.py version
- Excludes venv and .DS_Store (per .gitignore)

Complete project backup with full history and test data.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-02-06 11:07:22 -05:00

347 lines
No EOL
13 KiB
Python

#!/usr/bin/env python3
"""
JSON Workflow Processor
Converted from TWIST workflow definition
Monitors dynamic client hot folders for JSON files and routes them based on client codes:
- Structure: /data/PRODUCTION/JSON/[CLIENT_NAME]/file.json
- Celtra projects → Celtra folder
- Reckitt Benckiser → Monday RB folder
- Rank → Monday Rank folder
- All others → Archive only (to JSON_STORE/[CLIENT_NAME]/)
Features:
- Dynamic hot folder monitoring (hundreds of client folders)
- JSON validation and parsing
- Client-based routing for special clients
- File archiving for all clients
- Error notifications
- Activity logging
"""
import json
import logging
import os
import shutil
import smtplib
import time
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Any, Dict, List, Optional

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class Config:
    """Static configuration for the JSON workflow processor.

    All values are class attributes read directly as ``Config.NAME``.
    The data root and the SMTP credentials may be overridden through
    environment variables; when a variable is unset or empty the original
    hard-coded value is used, so existing deployments behave as before.

    Environment overrides:
        JSON_WORKFLOW_DATA_ROOT     -> DATA_ROOT (parent of JSON, JSON_STORE, SYNC)
        JSON_WORKFLOW_SMTP_USER     -> SMTP_USER
        JSON_WORKFLOW_SMTP_PASSWORD -> SMTP_PASSWORD
    """

    # Paths
    DATA_ROOT = Path(
        os.environ.get("JSON_WORKFLOW_DATA_ROOT")
        or "/Users/daveporter/Python-Enviroments/JSON-Parser/data/PRODUCTION"
    )
    HOT_FOLDER = DATA_ROOT / "JSON"
    JSON_STORE = DATA_ROOT / "JSON_STORE"
    SYNC_BASE = DATA_ROOT / "SYNC" / "MAKE"

    # Clients eligible for Celtra projects (based on JobCategory)
    # Only these clients can have JobCategory="Celtra" - add others as needed
    CELTRA_ELIGIBLE_CLIENTS = {
        "CIBC", "OLIVER", "ADIDAS", "PAYPAL",
        "RECKITTBENCKISER", "BAYER", "3M", "RANK",
    }

    # Client destinations, keyed by upper-cased client code
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
    }

    # Email settings (from original TWIST config)
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 25
    SMTP_USER = (
        os.environ.get("JSON_WORKFLOW_SMTP_USER")
        or "twist@mail.dev.oliver.solutions"
    )
    # SECURITY: this credential is committed to source control. Treat it as
    # compromised: rotate it, supply the new value through
    # JSON_WORKFLOW_SMTP_PASSWORD, and then delete this fallback literal.
    SMTP_PASSWORD = (
        os.environ.get("JSON_WORKFLOW_SMTP_PASSWORD")
        or "A604D75B0CAADB20137B85299A433588839AC735F59FB3010E71F132F887CABBEB2A1ADE627ADCD608445CB058AB40B10361D0E881D7CDA4"
    )
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"

    # Monitoring settings
    POLL_INTERVAL = 5  # seconds
    WAIT_DELAY = 5  # seconds to wait for file stability
class JSONWorkflowProcessor:
    """Parse, archive and route JSON job files found in the hot folder.

    For every file handed to :meth:`process_file` the processor:

    1. parses the JSON and extracts the routing fields,
    2. copies the file to ``Config.JSON_STORE/<folder>/``,
    3. copies it to every special-client destination that applies,
    4. deletes the original only if every copy succeeded.

    A file that cannot be parsed triggers an error email and is left in place.
    """

    # Upper bound for every blocking SMTP operation, so a stalled mail server
    # cannot hang the thread that called process_file() indefinitely.
    SMTP_TIMEOUT_SECONDS = 30

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return *value* as a string, mapping ``None`` (JSON null) to ``""``."""
        return "" if value is None else str(value)

    def setup_logging(self):
        """Configure root logging (file + console) and create ``self.logger``."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler('json_workflow.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create the hot folder, store, sync base and all destinations if missing."""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.SYNC_BASE,
        ] + list(Config.DESTINATIONS.values())
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            self.logger.debug(f"Ensured directory exists: {directory}")

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Parse a JSON file and extract the fields used for routing.

        Args:
            file_path: Path of the JSON file to read (UTF-8).

        Returns:
            A dict with the keys ``JobCategory``, ``StudioCode``, ``Title`` and
            ``ClientCode`` taken from ``JobSpecification.JobDetails``. Values
            are always strings: missing or ``null`` fields become ``""`` and
            non-string scalars are stringified, so callers can safely call
            string methods on them. Returns ``None`` if the file cannot be
            read, is not valid JSON, or does not have the expected object
            structure. Never raises.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None
        except Exception as e:
            # Broad on purpose: callers rely on "None on any read failure".
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None

        # Valid JSON can still be a list/scalar, or have null sections.
        if not isinstance(data, dict):
            self.logger.error(
                f"Error reading {file_path.name}: top-level JSON value is not an object"
            )
            return None
        job_spec = data.get("JobSpecification", {})
        job_details = job_spec.get("JobDetails", {}) if isinstance(job_spec, dict) else None
        if not isinstance(job_details, dict):
            self.logger.error(
                f"Error reading {file_path.name}: JobSpecification.JobDetails is not an object"
            )
            return None

        # Extract fields from JobSpecification.JobDetails
        extracted = {
            key: self._as_text(job_details.get(key))
            for key in ("JobCategory", "StudioCode", "Title", "ClientCode")
        }
        self.logger.info(f"Parsed JSON {file_path.name}: {extracted}")
        return extracted

    def determine_routing(self, json_data: Dict[str, Any], client_folder: str) -> List[Path]:
        """Work out which special destinations a parsed job must be copied to.

        Args:
            json_data: Dict as returned by :meth:`parse_json_file`.
            client_folder: Name of the hot-folder sub-directory the file came from.

        Returns:
            List of destination directories. It may be empty (archive only) or
            hold more than one entry, e.g. a RANK job with JobCategory "Celtra"
            goes to both the Rank and the Celtra folder.
        """
        # Tolerate None / non-string values so .upper() cannot raise.
        client_code = self._as_text(json_data.get("ClientCode")).upper()
        job_category = self._as_text(json_data.get("JobCategory")).upper()
        destinations = []

        # Check for specific client routing by ClientCode
        if client_code in Config.DESTINATIONS:
            destinations.append(Config.DESTINATIONS[client_code])

        # Check for Celtra by JobCategory (only for eligible clients)
        if job_category == "CELTRA":
            if client_folder.upper() in Config.CELTRA_ELIGIBLE_CLIENTS:
                # Add Celtra destination if not already added
                celtra_dest = Config.DESTINATIONS["CELTRA"]
                if celtra_dest not in destinations:
                    destinations.append(celtra_dest)
                self.logger.info(f"Celtra project detected for eligible client: {client_folder}")
            else:
                self.logger.warning(
                    f"Celtra JobCategory found but {client_folder} not in eligible clients list"
                )

        if not destinations:
            self.logger.info(
                f"No routing destination for ClientCode: {client_code}, "
                f"JobCategory: {job_category}, Client: {client_folder}"
            )
        return destinations

    def archive_file(self, file_path: Path, folder_name: str) -> bool:
        """Copy the file to ``JSON_STORE/folder_name/``; return True on success."""
        try:
            archive_dir = Config.JSON_STORE / folder_name
            archive_dir.mkdir(parents=True, exist_ok=True)
            destination = archive_dir / file_path.name
            shutil.copy2(file_path, destination)
            self.logger.info(f"Archived {file_path.name} to {destination}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to archive {file_path.name}: {e}")
            return False

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Copy the file into the *destination* directory; return True on success."""
        try:
            destination.mkdir(parents=True, exist_ok=True)
            dest_file = destination / file_path.name
            shutil.copy2(file_path, dest_file)
            self.logger.info(f"Routed {file_path.name} to {destination}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False

    def _attach_file(self, msg: MIMEMultipart, file_path: Path) -> None:
        """Attach *file_path* to *msg*; log and carry on if it cannot be read."""
        try:
            payload = file_path.read_bytes()
        except OSError as e:
            # The file may be the very thing that is unreadable; the
            # notification is still worth sending without it.
            self.logger.warning(f"Could not attach {file_path.name} to error email: {e}")
            return
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(payload)
        encoders.encode_base64(part)
        # Passing filename= lets the email package quote/encode the name.
        part.add_header('Content-Disposition', 'attachment', filename=file_path.name)
        msg.attach(part)

    def send_error_email(self, file_path: Path, error_msg: str):
        """Email ``Config.ERROR_EMAIL`` about a file that failed to parse.

        The offending file is attached when it can be read. Any failure to
        build or send the message is logged and swallowed: notification is
        best-effort and must not interrupt file processing.
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = Config.ERROR_EMAIL
            msg['Subject'] = "JSON will not Parse"
            body = (
                "Workflow: JSON_Parser\n"
                "Process: JSON_PARSE\n"
                f"File: {file_path.name} will not parse\n"
                f"Error: {error_msg}\n"
            )
            msg.attach(MIMEText(body, 'plain'))
            self._attach_file(msg, file_path)

            # The context manager closes the connection even when
            # starttls/login/sendmail raises.
            with smtplib.SMTP(
                Config.SMTP_SERVER, Config.SMTP_PORT, timeout=self.SMTP_TIMEOUT_SECONDS
            ) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.ERROR_EMAIL, msg.as_string())
            self.logger.info(f"Error email sent for {file_path.name}")
        except Exception as e:
            self.logger.error(f"Failed to send error email: {e}")

    def process_file(self, file_path: Path):
        """Run the full parse / archive / route / clean-up cycle for one file.

        Blocks for ``Config.WAIT_DELAY`` seconds first so a file that is still
        being written has time to settle.
        """
        self.logger.info(f"Processing file: {file_path.name}")

        # Wait for file stability
        time.sleep(Config.WAIT_DELAY)

        # Parse JSON
        json_data = self.parse_json_file(file_path)
        if json_data is None:
            self.send_error_email(file_path, "JSON parsing failed")
            return

        # Determine folder name for storing (from hot folder structure)
        folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"

        # Store file to JSON_STORE (always done for all clients)
        stored = self.archive_file(file_path, folder_name)

        # Determine routing for special clients
        destinations = self.determine_routing(json_data, folder_name)
        routed_count = 0
        total_destinations = len(destinations)
        if destinations:
            # Special clients: route to all applicable destinations
            for destination in destinations:
                if self.route_file(file_path, destination):
                    routed_count += 1
                    self.logger.info(f"Routed {file_path.name} to {destination.name}")
                else:
                    self.logger.error(f"Routing failed for {file_path.name} to {destination.name}")
            all_routed = routed_count == total_destinations
            if all_routed:
                self.logger.info(
                    f"Successfully routed {file_path.name} to {total_destinations} destination(s)"
                )
            else:
                self.logger.error(
                    f"Only {routed_count}/{total_destinations} destinations succeeded for {file_path.name}"
                )
        else:
            all_routed = True  # No routing needed
            self.logger.info(f"Regular client: {file_path.name} stored to JSON_STORE only")

        # Remove original file if processing was successful
        # For special clients: both stored AND all routing must succeed
        # For regular clients: only stored must succeed
        if stored and all_routed:
            try:
                file_path.unlink()
                self.logger.info(f"Removed original JSON file: {file_path.name}")
            except Exception as e:
                self.logger.error(f"Failed to remove original file {file_path.name}: {e}")
        else:
            self.logger.error(f"Processing failed for {file_path.name} - file NOT removed")
class JSONFileHandler(FileSystemEventHandler):
    """Watchdog event handler that passes new ``.json`` files to the processor.

    Both files created inside the watched tree and files moved into it are
    forwarded to ``processor.process_file``. Directories and files with any
    other extension are ignored.
    """

    def __init__(self, processor: JSONWorkflowProcessor):
        super().__init__()
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def on_created(self, event):
        """Handle file creation events"""
        if event.is_directory:
            return
        self._process_if_json(Path(event.src_path), "New JSON file detected")

    def on_moved(self, event):
        """Handle file move events (files moved into hot folder)"""
        if event.is_directory:
            return
        # For a move, the file now lives at dest_path, not src_path.
        self._process_if_json(Path(event.dest_path), "JSON file moved to hot folder")

    def _process_if_json(self, file_path: Path, description: str) -> None:
        """Process *file_path* if it has a ``.json`` suffix (case-insensitive)."""
        if file_path.suffix.lower() != '.json':
            return
        self.logger.info(f"{description}: {file_path.name}")
        try:
            self.processor.process_file(file_path)
        except Exception:
            # An exception escaping a handler ends the observer's dispatch
            # thread and monitoring stops without notice; log it and keep
            # watching instead.
            self.logger.exception(f"Unhandled error while processing {file_path.name}")
def main():
    """Start watching the hot folder and block until interrupted with Ctrl+C."""
    workflow = JSONWorkflowProcessor()
    log = workflow.logger

    # Watch the hot folder and every client sub-folder beneath it.
    watcher = Observer()
    watcher.schedule(JSONFileHandler(workflow), str(Config.HOT_FOLDER), recursive=True)
    watcher.start()
    log.info(f"Started monitoring {Config.HOT_FOLDER}")

    # The observer works on its own thread; this loop only keeps the process
    # alive until the user interrupts it.
    try:
        while True:
            time.sleep(Config.POLL_INTERVAL)
    except KeyboardInterrupt:
        log.info("Stopping JSON Workflow Processor...")
        watcher.stop()

    watcher.join()
    log.info("JSON Workflow Processor stopped")


if __name__ == "__main__":
    main()