json-parser-twist-metaserver/OLD/json_workflow_processor.py
Dave Porter af8acbd986 Add all project files including previous versions and documentation
- Added INSTALL_GUIDE.md and README.md documentation
- Added OLD/ folder with previous script versions for reference
- Added data/ folder with sample JSON test files
- Added older json_workflow_processor-hybrid-protected.py version
- Excludes venv and .DS_Store (per .gitignore)

Complete project backup with full history and test data.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-02-06 11:07:22 -05:00

316 lines
No EOL
11 KiB
Python

#!/usr/bin/env python3
"""
JSON Workflow Processor
Converted from TWIST workflow definition
Monitors dynamic client hot folders for JSON files and routes them based on client codes:
- Structure: /data/PRODUCTION/JSON/[CLIENT_NAME]/file.json
- Celtra projects → Celtra folder
- Reckitt Benckiser → Monday RB folder
- Rank → Monday Rank folder
- All others → Archive only (to JSON_STORE/[CLIENT_NAME]/)
Features:
- Dynamic hot folder monitoring (hundreds of client folders)
- JSON validation and parsing
- Client-based routing for special clients
- File archiving for all clients
- Error notifications
- Activity logging
"""
import json
import logging
import os
import shutil
import smtplib
import time
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Any, Dict, Optional

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class Config:
    """Static configuration for the JSON workflow processor.

    Values are evaluated once, when the module is imported. SMTP credentials
    come from the environment so that no secret has to live in source control.
    """

    # Paths
    HOT_FOLDER = Path("/data/PRODUCTION/JSON/")  # watched recursively for *.json drops
    JSON_STORE = Path("/data/PRODUCTION/JSON_STORE/")  # archive root, one subfolder per client folder
    SYNC_BASE = Path("/data/PRODUCTION/SYNC/MAKE/")  # parent of the special-client routing folders

    # Client destinations, keyed by upper-cased ClientCode ("CELTRA" is also
    # used for JobCategory-based routing).
    DESTINATIONS = {
        "CELTRA": SYNC_BASE / "Celtra - Create_Rename - Project_Design File",
        "RECKITTBENCKISER": SYNC_BASE / "Monday RB",
        "RANK": SYNC_BASE / "Monday Rank",
    }

    # Email settings (from original TWIST config)
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 25
    # NOTE(security): a Mailgun password used to be hard-coded here and was
    # committed to this repository -- rotate it. Provide credentials through
    # JSON_WORKFLOW_SMTP_USER / JSON_WORKFLOW_SMTP_PASSWORD instead.
    SMTP_USER = os.environ.get("JSON_WORKFLOW_SMTP_USER", "twist@mail.dev.oliver.solutions")
    SMTP_PASSWORD = os.environ.get("JSON_WORKFLOW_SMTP_PASSWORD", "")
    SENDER_EMAIL = "TWIST-UK-SERVER@oliver.agency"
    ERROR_EMAIL = "daveporter@oliver.agency"

    # Monitoring settings
    POLL_INTERVAL = 5  # seconds between main-loop wake-ups
    WAIT_DELAY = 5  # seconds to wait for file stability
class JSONWorkflowProcessor:
    """Parse, archive and route JSON job files dropped into the hot folder.

    Every parseable file is copied to ``Config.JSON_STORE/<client folder>/``.
    Files whose ClientCode (or a JobCategory of "CELTRA") matches
    ``Config.DESTINATIONS`` are also copied to that destination. The original
    is deleted only after every required copy has succeeded.
    """

    # Fields extracted from JobSpecification.JobDetails, in output order.
    _JOB_FIELDS = ("JobCategory", "StudioCode", "Title", "ClientCode")
    # Seconds to wait on SMTP connect/IO before giving up on a notification.
    _SMTP_TIMEOUT = 30

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()

    def setup_logging(self):
        """Configure root logging to ``json_workflow.log`` (in the cwd) and a stream handler."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler('json_workflow.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("JSON Workflow Processor initialized")

    def ensure_directories(self):
        """Create the hot folder, archive root, sync base and every routing destination."""
        directories = [
            Config.HOT_FOLDER,
            Config.JSON_STORE,
            Config.SYNC_BASE,
            *Config.DESTINATIONS.values(),
        ]
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            self.logger.debug(f"Ensured directory exists: {directory}")

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return a JSON scalar as text, mapping null/missing (None) to ''."""
        return "" if value is None else str(value)

    def parse_json_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """
        Parse JSON file and extract key fields.

        Returns a dict with JobCategory, StudioCode, Title and ClientCode as
        strings ('' when absent or null), or None when the file cannot be
        read, is not valid JSON, or JobSpecification/JobDetails is present
        but not a JSON object. Missing sections count as empty fields.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing error in {file_path.name}: {e}")
            return None
        except (OSError, ValueError, RecursionError) as e:
            # OSError: vanished/unreadable file; ValueError: bad UTF-8;
            # RecursionError: pathologically nested input.
            self.logger.error(f"Error reading {file_path.name}: {e}")
            return None

        job_spec = data.get("JobSpecification", {}) if isinstance(data, dict) else None
        job_details = job_spec.get("JobDetails", {}) if isinstance(job_spec, dict) else None
        if not isinstance(job_details, dict):
            self.logger.error(
                f"Unexpected structure in {file_path.name}: "
                "expected a JSON object at JobSpecification.JobDetails"
            )
            return None

        extracted = {field: self._as_text(job_details.get(field)) for field in self._JOB_FIELDS}
        self.logger.info(f"Parsed JSON {file_path.name}: {extracted}")
        return extracted

    def determine_routing(self, json_data: Dict[str, Any]) -> Optional[Path]:
        """
        Determine destination path based on client code and job category.

        ClientCode is checked first, then a JobCategory of "CELTRA". Matching
        is case-insensitive and ignores surrounding whitespace; null or
        non-string values are tolerated. Returns the destination path, or
        None for archive-only.
        """
        client_code = self._as_text(json_data.get("ClientCode")).strip().upper()
        job_category = self._as_text(json_data.get("JobCategory")).strip().upper()

        destination = Config.DESTINATIONS.get(client_code)
        if destination is not None:
            return destination
        if job_category == "CELTRA":
            return Config.DESTINATIONS["CELTRA"]

        self.logger.info(f"No routing destination for ClientCode: {client_code}, JobCategory: {job_category}")
        return None

    def _copy_into(self, file_path: Path, directory: Path) -> Path:
        """Copy *file_path* (with metadata) into *directory*, creating it if needed; return the copy's path."""
        directory.mkdir(parents=True, exist_ok=True)
        target = directory / file_path.name
        shutil.copy2(file_path, target)
        return target

    def archive_file(self, file_path: Path, folder_name: str) -> bool:
        """Archive file to JSON_STORE/folder_name/. Returns True on success."""
        try:
            destination = self._copy_into(file_path, Config.JSON_STORE / folder_name)
        except OSError as e:
            self.logger.error(f"Failed to archive {file_path.name}: {e}")
            return False
        self.logger.info(f"Archived {file_path.name} to {destination}")
        return True

    def route_file(self, file_path: Path, destination: Path) -> bool:
        """Route file to specific destination directory. Returns True on success."""
        try:
            self._copy_into(file_path, destination)
        except OSError as e:
            self.logger.error(f"Failed to route {file_path.name} to {destination}: {e}")
            return False
        self.logger.info(f"Routed {file_path.name} to {destination}")
        return True

    def send_error_email(self, file_path: Path, error_msg: str):
        """
        Email ``Config.ERROR_EMAIL`` about a file that failed to parse.

        The file is attached when it can still be read; otherwise the notice
        is sent without it. Failures are logged, never raised.
        """
        msg = MIMEMultipart()
        msg['From'] = Config.SENDER_EMAIL
        msg['To'] = Config.ERROR_EMAIL
        msg['Subject'] = "JSON will not Parse"
        body = (
            "Workflow: JSON_Parser\n"
            "Process: JSON_PARSE\n"
            f"File: {file_path.name} will not parse\n"
            f"Error: {error_msg}\n"
        )
        msg.attach(MIMEText(body, 'plain'))

        # Attach the problematic file if it is still readable.
        try:
            payload = file_path.read_bytes()
        except OSError as e:
            self.logger.warning(f"Could not attach {file_path.name} to error email: {e}")
        else:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(payload)
            encoders.encode_base64(part)
            # Keyword form lets the email package quote/encode the filename.
            part.add_header('Content-Disposition', 'attachment', filename=file_path.name)
            msg.attach(part)

        try:
            # The context manager sends QUIT and closes the socket even on error.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT, timeout=self._SMTP_TIMEOUT) as server:
                server.starttls()
                if Config.SMTP_USER and Config.SMTP_PASSWORD:
                    server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                else:
                    self.logger.warning("SMTP credentials not configured; sending without authentication")
                server.sendmail(Config.SENDER_EMAIL, Config.ERROR_EMAIL, msg.as_string())
        except (smtplib.SMTPException, OSError) as e:
            self.logger.error(f"Failed to send error email: {e}")
            return
        self.logger.info(f"Error email sent for {file_path.name}")

    def process_file(self, file_path: Path):
        """
        Process a single JSON file: parse, archive, route, then remove it.

        On a parse failure an error email is sent and the file is left in
        place. The original is deleted only when archiving succeeded and, if
        a destination applies, routing succeeded too.
        """
        self.logger.info(f"Processing file: {file_path.name}")
        # Fixed delay to let the producer finish writing (see Config.WAIT_DELAY).
        time.sleep(Config.WAIT_DELAY)

        if not file_path.exists():
            # Moved or deleted during the wait: not a parse error.
            self.logger.warning(f"File {file_path.name} disappeared before processing; skipping")
            return

        json_data = self.parse_json_file(file_path)
        if json_data is None:
            self.send_error_email(file_path, "JSON parsing failed")
            return

        # Archive under the file's parent folder name; files dropped directly
        # in the hot folder root are archived under DEFAULT.
        folder_name = file_path.parent.name if file_path.parent != Config.HOT_FOLDER else "DEFAULT"
        archived = self.archive_file(file_path, folder_name)

        destination = self.determine_routing(json_data)
        routed = True  # archive-only files have nothing to route
        if destination is not None:
            routed = self.route_file(file_path, destination)
            if routed:
                self.logger.info(f"Successfully processed and routed {file_path.name}")
            else:
                self.logger.error(f"Routing failed for {file_path.name}")
        else:
            self.logger.info(f"File {file_path.name} archived only (no routing destination)")

        if not (archived and routed):
            self.logger.warning(
                f"Keeping original file {file_path.name} in hot folder because processing did not complete"
            )
            return

        try:
            file_path.unlink()
            self.logger.info(f"Removed original file: {file_path.name}")
        except OSError as e:
            self.logger.error(f"Failed to remove original file {file_path.name}: {e}")
class JSONFileHandler(FileSystemEventHandler):
    """Watchdog handler that passes new or moved-in ``*.json`` files to the processor.

    Processing happens inside the event callback itself (and sleeps for
    ``Config.WAIT_DELAY``), so any unexpected exception is logged here rather
    than allowed to propagate into watchdog: one bad file must not stop the
    monitor.
    """

    def __init__(self, processor: JSONWorkflowProcessor):
        super().__init__()
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def _handle(self, path_str: str, description: str):
        """Process *path_str* if it has a ``.json`` suffix (case-insensitive)."""
        file_path = Path(path_str)
        if file_path.suffix.lower() != '.json':
            return
        self.logger.info(f"{description}: {file_path.name}")
        try:
            self.processor.process_file(file_path)
        except Exception:
            # Top-level boundary for the watcher: log with traceback, keep running.
            self.logger.exception(f"Unexpected error while processing {file_path.name}")

    def on_created(self, event):
        """Handle file creation events."""
        if event.is_directory:
            return
        self._handle(event.src_path, "New JSON file detected")

    def on_moved(self, event):
        """Handle file move events (files moved into or within the hot folder)."""
        if event.is_directory:
            return
        self._handle(event.dest_path, "JSON file moved to hot folder")
def main():
    """Run the hot-folder monitor until interrupted or the observer thread dies.

    Returns a process exit status: 0 after a Ctrl-C shutdown, 1 if the
    watchdog observer stopped on its own (so a supervisor can restart us).
    """
    processor = JSONWorkflowProcessor()

    event_handler = JSONFileHandler(processor)
    observer = Observer()
    # recursive=True: each client sub-folder under HOT_FOLDER is watched too.
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=True)

    observer.start()
    processor.logger.info(f"Started monitoring {Config.HOT_FOLDER}")

    observer_died = False
    try:
        # Wake up periodically so a dead observer thread is noticed instead of
        # leaving the process idling forever.
        while observer.is_alive():
            observer.join(Config.POLL_INTERVAL)
        observer_died = True
        processor.logger.error("File observer stopped unexpectedly; shutting down")
    except KeyboardInterrupt:
        processor.logger.info("Stopping JSON Workflow Processor...")
    finally:
        observer.stop()
        observer.join()

    processor.logger.info("JSON Workflow Processor stopped")
    return 1 if observer_died else 0


if __name__ == "__main__":
    raise SystemExit(main())