Changes: - Added DELIVERABLE_ITEM_TYPE_ID config (5793903 for Staging) - Updated create_deliverable_project() to use customItemTypeId parameter - Deliverables now created with proper "Deliverable" custom item type - Results in scope=WsFolder (instead of RbFolder) - Matches manually created deliverables in Wrike UI This ensures deliverables display correctly as "Deliverable" type in the Wrike interface, not generic "Project" type. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1018 lines
40 KiB
Python
1018 lines
40 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Wrike Import Monitor - Real-time JSON processor with daily reporting
|
|
Monitors a folder for JSON files and automatically creates Wrike projects/tasks
|
|
|
|
Features:
|
|
- Real-time folder monitoring with watchdog
|
|
- Periodic scanning for missed files
|
|
- Duplicate detection via OMG numbers
|
|
- Daily email reports at 7PM
|
|
- Concurrent batch processing
|
|
- Failed file handling
|
|
- Auto-cleanup of old processed files
|
|
"""
|
|
|
|
import json
import logging
import os
import shutil
import smtplib
import threading
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict, List, Optional, Set

import requests
import schedule
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
|
|
|
|
|
|
class Config:
    """Configuration settings - EDIT THESE FOR YOUR ENVIRONMENT.

    Machine/environment specific settings may also be supplied through an
    environment variable with the same name as the attribute (for example
    ``HOT_FOLDER``, ``WRIKE_SPACE_ID``; ``REPORT_EMAILS`` as a comma-separated
    list). Secrets (``WRIKE_TOKEN``, ``SMTP_PASSWORD``) are read ONLY from the
    environment and default to an empty string.
    """

    # === PATHS (EDIT FOR LOCAL/SERVER) ===
    # Development defaults; override per machine via the environment.
    HOT_FOLDER = Path(os.environ.get(
        "HOT_FOLDER", "/Users/daveporter/Desktop/CODING-2024/Wrike/json_files"))
    PROCESSED_FOLDER = Path(os.environ.get(
        "PROCESSED_FOLDER", "/Users/daveporter/Desktop/CODING-2024/Wrike/Processed"))
    FAILED_FOLDER = Path(os.environ.get(
        "FAILED_FOLDER", "/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/Failed"))
    REPORTS_DIR = Path(os.environ.get(
        "REPORTS_DIR", "/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/reports"))

    # === WRIKE API SETTINGS ===
    WRIKE_API_BASE = "https://www.wrike.com/api/v4"
    # Permanent API token - never commit it to source. The token that used to
    # be hard-coded here was committed to version control and must be revoked.
    WRIKE_TOKEN = os.environ.get("WRIKE_TOKEN", "")

    # Wrike Space - EDIT FOR DIFFERENT ENVIRONMENTS
    WRIKE_SPACE_ID = os.environ.get("WRIKE_SPACE_ID", "MQAAAABpz7l_")  # Staging
    WRIKE_SPACE_NAME = os.environ.get("WRIKE_SPACE_NAME", "Staging")
    # Custom item type "Deliverable"
    DELIVERABLE_ITEM_TYPE_ID = os.environ.get("DELIVERABLE_ITEM_TYPE_ID", "5793903")

    # Production space (uncomment for production, or set the env vars above)
    # WRIKE_SPACE_ID = "MQAAAABoHcTY"  # LGL Team
    # WRIKE_SPACE_NAME = "LGL Team"
    # DELIVERABLE_ITEM_TYPE_ID = "XXXXXX"  # Update with production custom item type ID

    # Custom field IDs in Wrike
    CUSTOM_FIELDS = {
        "budget": "IEAGU2B2JUAJRZ7P",
        "impact": "IEAGU2B2JUAJRZ7Q",
        "notes": "IEAGU2B2JUAJRZ7R",
        "rag": "IEAGU2B2JUAJRZ7S",
        "deliverable_category": "IEAGU2B2JUAJRZ7T",
        "actions": "IEAGU2B2JUAJRZ7W",
        "shoot_date": "IEAGU2B2JUAJRZ7X",
        "omg_number": "IEAGU2B2JUAJV4P7",  # Plain OMG# number
        "omg_url": "IEAGU2B2JUAJRZ7Y",  # OMG URL field
        "box_link": "IEAGU2B2JUAJRZ7Z",
        "owner": "IEAGU2B2JUAJRZ72",
    }

    # === EMAIL SETTINGS ===
    SMTP_SERVER = os.environ.get("SMTP_SERVER", "smtp.mailgun.org")
    SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
    SMTP_USER = os.environ.get("SMTP_USER", "twist@mail.dev.oliver.solutions")
    # Secret - environment only. The previously hard-coded password was
    # committed to version control and must be rotated.
    SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "")
    SENDER_EMAIL = os.environ.get("SENDER_EMAIL", "WRIKE-MONITOR@oliver.agency")
    REPORT_EMAILS = [
        address.strip()
        for address in os.environ.get("REPORT_EMAILS", "daveporter@oliver.agency").split(",")
        if address.strip()
    ]

    # === PROCESSING SETTINGS ===
    BATCH_SIZE = 1  # Process one file at a time to avoid race conditions
    MAX_WORKERS = 1  # No concurrent processing
    BATCH_TIMEOUT = 30  # Seconds the worker waits to fill a batch
    WAIT_DELAY = 2  # Seconds between size checks while a new file settles

    # === MONITORING SETTINGS ===
    PERIODIC_SCAN_INTERVAL = 60  # Scan every 60 seconds
    PERIODIC_SCAN_TIMEOUT = 120  # Max scan time
    SLOW_SCAN_THRESHOLD = 30  # Scans longer than this (seconds) count as slow

    # === REPORTING SETTINGS ===
    DAILY_REPORT_TIME = "19:00"  # 7PM daily report
    KEEP_REPORTS_DAYS = 30
    CLEANUP_PROCESSED_HOURS = 24  # Delete processed files after 24 hours
|
|
|
|
|
|
class DailyStats:
    """Thread-safe daily statistics collector.

    Every public method acquires ``self.lock``, so the batch worker, the
    periodic scanner and the scheduler thread may call them concurrently.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.reset_stats()
        # Process start time; reset_stats() does not touch it, so the reported
        # uptime covers the whole run rather than the current day.
        self.start_time = datetime.now()

    def reset_stats(self):
        """Reset all per-day counters (at startup and after each daily report)."""
        with self.lock:
            self.date = date.today()
            self.total_processed = 0
            self.total_created = 0
            self.total_skipped = 0
            self.total_errors = 0
            self.startup_files_processed = 0
            self.periodic_files_found = 0
            self.failed_files_moved = 0
            self.periodic_scans_completed = 0
            self.slow_scans = 0

            # Folder/Project breakdown: 'projects' counts files per project title
            self.folder_stats = defaultdict(lambda: {
                'projects': Counter(),
                'tasks_created': 0,
                'tasks_skipped': 0,
                'errors': 0,
            })

            # Performance metrics
            self.processing_times = []
            self.hourly_stats = defaultdict(int)  # hour of day (0-23) -> file count
            self.error_details = []

    def record_file_processed(self, folder_name: str, project_name: str,
                              task_created: bool, skipped: bool, processing_time: float,
                              success: bool = True, error: Optional[str] = None,
                              is_startup: bool = False, is_periodic: bool = False):
        """Record the outcome of one processed file.

        Args:
            folder_name: Wrike folder (BusinessArea) the file belongs to.
            project_name: Project title the deliverable was filed under.
            task_created: A new deliverable was created.
            skipped: The deliverable already existed (duplicate).
            processing_time: Seconds spent on the file.
            success: False records an error instead of a processed file.
            error: Error message kept for the report when ``success`` is False.
            is_startup: File came from the startup backlog.
            is_periodic: File was found by a periodic scan.
        """
        with self.lock:
            self.hourly_stats[datetime.now().hour] += 1

            if is_startup:
                self.startup_files_processed += 1
            elif is_periodic:
                self.periodic_files_found += 1

            folder = self.folder_stats[folder_name]
            if success:
                self.total_processed += 1
                if task_created:
                    self.total_created += 1
                    folder['tasks_created'] += 1
                if skipped:
                    self.total_skipped += 1
                    folder['tasks_skipped'] += 1
                folder['projects'][project_name] += 1
                self.processing_times.append(processing_time)
            else:
                self.total_errors += 1
                folder['errors'] += 1
                if error:
                    self.error_details.append({
                        'time': datetime.now().strftime('%H:%M:%S'),
                        'folder': folder_name,
                        'project': project_name,
                        'error': error,
                    })

    def record_failed_file_moved(self):
        """Record that a file was moved to the Failed folder."""
        with self.lock:
            self.failed_files_moved += 1

    def record_periodic_scan(self, duration: float, files_found: int):
        """Record one completed periodic scan.

        ``files_found`` is accepted for API compatibility; the files themselves
        are counted when they are processed (``is_periodic=True``).
        """
        with self.lock:
            self.periodic_scans_completed += 1
            if duration > Config.SLOW_SCAN_THRESHOLD:
                self.slow_scans += 1

    def get_stats_summary(self) -> Dict[str, Any]:
        """Return an independent snapshot of the current statistics.

        The per-folder records (including their project Counters) are copied
        so the caller can read them after the lock is released while worker
        threads keep recording.
        """
        with self.lock:
            times = self.processing_times
            avg_processing_time = sum(times) / len(times) if times else 0
            attempts = self.total_processed + self.total_errors
            folder_snapshot = {
                name: {**entry, 'projects': Counter(entry['projects'])}
                for name, entry in self.folder_stats.items()
            }

            return {
                'date': self.date.strftime('%Y-%m-%d'),
                'uptime': str(datetime.now() - self.start_time).split('.')[0],
                'total_processed': self.total_processed,
                'total_created': self.total_created,
                'total_skipped': self.total_skipped,
                'startup_files_processed': self.startup_files_processed,
                'periodic_files_found': self.periodic_files_found,
                'failed_files_moved': self.failed_files_moved,
                'periodic_scans_completed': self.periodic_scans_completed,
                'slow_scans': self.slow_scans,
                'total_errors': self.total_errors,
                'success_rate': (self.total_processed / attempts * 100) if attempts > 0 else 0,
                'avg_processing_time': round(avg_processing_time, 2),
                'folder_stats': folder_snapshot,
                'hourly_stats': dict(self.hourly_stats),
                'error_details': self.error_details.copy(),
            }
|
|
|
|
|
|
class WrikeMonitor:
    """Wrike import processor with real-time monitoring.

    Each JSON job file in ``Config.HOT_FOLDER`` is mirrored into Wrike as
    folder (BusinessArea) -> project (ProjectDetails) -> deliverable
    (JobDetails, created as a project with the "Deliverable" custom item
    type). Imported files move to ``Config.PROCESSED_FOLDER``; failures move
    to ``Config.FAILED_FOLDER`` together with an ``*_ERROR.txt`` note.
    """

    # Wrike's GET /folders/{id},{id},... accepts at most 100 IDs per call.
    MAX_IDS_PER_REQUEST = 100
    # Bound for the recently-processed memory and how many of the oldest
    # entries are evicted once it is exceeded.
    RECENT_LIMIT = 1000
    RECENT_EVICT = 200
    # Maximum number of size polls while waiting for a new file to settle.
    STABLE_CHECKS = 5

    def __init__(self):
        self.setup_logging()
        self.ensure_directories()
        # Items are (Path, is_periodic) tuples produced by queue_file().
        self.file_queue = Queue()
        self.daily_stats = DailyStats()
        self.startup_complete = False

        # Caches: "parent_id:title" -> Wrike folder/project ID
        self.folder_cache = {}
        self.project_cache = {}

        # Identities (path/size/mtime) of files already handled. A dict keeps
        # insertion order, so eviction really drops the oldest entries.
        self.recently_processed: Dict[str, None] = {}
        self.processed_lock = threading.Lock()

        # Paths currently waiting in file_queue; stops the watcher and the
        # periodic scan from queueing the same file twice.
        self.pending_files: Set[str] = set()
        self.pending_lock = threading.Lock()

        # Scan protection
        self.scan_in_progress = False
        self.scan_lock = threading.Lock()

        if not Config.WRIKE_TOKEN:
            self.logger.warning("WRIKE_TOKEN is not set - Wrike API calls will fail")

        self.setup_daily_reporting()

    # ------------------------------------------------------------------ setup

    def setup_logging(self):
        """Configure logging to ``REPORTS_DIR/wrike_monitor.log`` and the console."""
        log_format = '%(asctime)s - %(levelname)s - [%(threadName)s] %(message)s'
        Config.REPORTS_DIR.mkdir(parents=True, exist_ok=True)

        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                # Explicit UTF-8: messages contain emoji and arbitrary project
                # titles that the platform default encoding may not support.
                logging.FileHandler(Config.REPORTS_DIR / 'wrike_monitor.log', encoding='utf-8'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info(f"Wrike Monitor initialized - Target: {Config.WRIKE_SPACE_NAME}")

    def ensure_directories(self):
        """Create the hot, processed, failed and reports directories if missing."""
        for directory in (Config.HOT_FOLDER, Config.PROCESSED_FOLDER,
                          Config.FAILED_FOLDER, Config.REPORTS_DIR):
            directory.mkdir(parents=True, exist_ok=True)

    def setup_daily_reporting(self):
        """Schedule generate_daily_report() and start a daemon thread that runs it."""
        schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)

        def run_scheduler():
            # schedule only fires jobs from run_pending(); minute granularity
            # is enough for a once-a-day report.
            while True:
                schedule.run_pending()
                time.sleep(60)

        threading.Thread(target=run_scheduler, name="Scheduler", daemon=True).start()

    # ------------------------------------------------------------- Wrike API

    def make_wrike_request(self, method, endpoint, data=None):
        """Call the Wrike REST API and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. "GET", "POST", "PUT".
            endpoint: Path appended to ``Config.WRIKE_API_BASE``.
            data: Optional JSON-serialisable payload (not sent for GET).

        Returns:
            The parsed response, or None on any HTTP, network or JSON-decoding
            error (the error is logged).
        """
        url = f"{Config.WRIKE_API_BASE}{endpoint}"
        headers = {
            "Authorization": f"Bearer {Config.WRIKE_TOKEN}",
            "Content-Type": "application/json",
        }
        method = method.upper()
        payload = None if method == "GET" else data

        try:
            response = requests.request(method, url, headers=headers, json=payload, timeout=30)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            self.logger.error(f"Wrike API error: {e}")
            return None

    @staticmethod
    def _first_id(result):
        """Return ``result["data"][0]["id"]`` from a Wrike response, or None."""
        if result and result.get("data"):
            return result["data"][0]["id"]
        return None

    def _get_direct_children(self, parent_id):
        """Return the direct child folder/project records of ``parent_id``.

        GET /folders/{id}/folders returns the whole subtree, including the
        parent itself; the parent's ``childIds`` identifies direct children.

        Returns:
            None if the API call failed, otherwise a (possibly empty) list.
        """
        result = self.make_wrike_request("GET", f"/folders/{parent_id}/folders")
        if not result or "data" not in result:
            return None
        items = result["data"]
        parent = next((item for item in items if item.get("id") == parent_id), None)
        if parent is None:
            return []
        child_ids = set(parent.get("childIds", []))
        return [item for item in items if item.get("id") in child_ids]

    def _matches_omg_number(self, record, omg_number):
        """True if ``record`` carries the OMG# custom field equal to ``omg_number``.

        Values are compared as stripped text: the JSON ``Number`` may be
        numeric while Wrike custom-field values are strings.
        """
        target = str(omg_number).strip()
        field_id = Config.CUSTOM_FIELDS["omg_number"]
        return any(
            field.get("id") == field_id
            and field.get("value") is not None
            and str(field["value"]).strip() == target
            for field in record.get("customFields", [])
        )

    # ------------------------------------------------------------- parsing

    def parse_business_area(self, business_area):
        """Return the last ">"-separated segment of a BusinessArea path.

        "Brand > Region > Team" -> "Team". Returns "" for None/empty input.
        """
        if not business_area:
            return ""
        return business_area.rsplit(">", 1)[-1].strip()

    def parse_date(self, date_string):
        """Convert an OMG timestamp to Wrike's "YYYY-MM-DD" date format.

        Primary format is "YYYY-MM-DD HH:MM:SS" optionally followed by a
        "+HH:MM" offset (the offset is dropped). Otherwise a leading ISO
        "YYYY-MM-DD" is accepted (date-only values, "T" separators, negative
        offsets). Returns None for empty or unparseable input.
        """
        if not date_string:
            return None
        try:
            dt = datetime.strptime(date_string.split('+')[0].strip(), "%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError, AttributeError):
            try:
                dt = datetime.strptime(str(date_string).strip()[:10], "%Y-%m-%d")
            except ValueError:
                return None
        return dt.strftime("%Y-%m-%d")

    def generate_omg_url(self, omg_number):
        """Generate the OMG job URL from an OMG number (job id = number - 999999).

        Returns None when ``omg_number`` is not an integer.
        """
        try:
            job_id = int(omg_number) - 999999
        except (ValueError, TypeError):
            return None
        return f"https://bissell.omg.oliver.solutions/jobs/{job_id}"

    # ---------------------------------------------------- folders & projects

    def get_or_create_folder(self, folder_name, parent_id=None):
        """Return the ID of child folder ``folder_name``, creating it if needed.

        Args:
            folder_name: Exact folder title to find or create.
            parent_id: Parent folder ID; defaults to ``Config.WRIKE_SPACE_ID``.

        Returns:
            The Wrike folder ID, or None if it could not be found or created.
        """
        if parent_id is None:
            parent_id = Config.WRIKE_SPACE_ID

        cache_key = f"{parent_id}:{folder_name}"
        if cache_key in self.folder_cache:
            return self.folder_cache[cache_key]

        for folder in self._get_direct_children(parent_id) or []:
            if folder.get("title") == folder_name:
                folder_id = folder["id"]
                self.folder_cache[cache_key] = folder_id
                self.logger.debug(f"Found existing folder: {folder_name} ({folder_id})")
                return folder_id

        self.logger.info(f"Creating new folder: {folder_name}")
        data = {"title": folder_name, "description": f"{folder_name} category folder"}
        folder_id = self._first_id(
            self.make_wrike_request("POST", f"/folders/{parent_id}/folders", data))
        if folder_id:
            self.folder_cache[cache_key] = folder_id
            self.logger.info(f"Created folder: {folder_name} ({folder_id})")
        return folder_id

    def get_or_create_project(self, project_title, folder_id, project_details, campaign_code):
        """Return the ID of project ``project_title`` in ``folder_id``, creating it if needed.

        A new project gets the description (``<p>`` tags stripped), the
        StartDate/EndDate from ``project_details`` and ``campaign_code`` in
        the OMG# custom field.

        Returns:
            The Wrike project ID, or None if the folder could not be created.
        """
        cache_key = f"{folder_id}:{project_title}"
        if cache_key in self.project_cache:
            return self.project_cache[cache_key]

        for item in self._get_direct_children(folder_id) or []:
            # Only real projects match; a plain folder with the same title is ignored.
            if item.get("title") == project_title and "project" in item:
                project_id = item["id"]
                self.project_cache[cache_key] = project_id
                self.logger.debug(f"Found existing project: {project_title} ({project_id})")
                return project_id

        description = project_details.get("Description") or ""
        description = description.replace("<p>", "").replace("</p>", "")
        data = {"title": project_title, "description": description}
        project_id = self._first_id(
            self.make_wrike_request("POST", f"/folders/{folder_id}/folders", data))
        if not project_id:
            return None

        # Convert the new folder to a project with dates
        project_data = {"project": {}}
        start_date = self.parse_date(project_details.get("StartDate"))
        end_date = self.parse_date(project_details.get("EndDate"))
        if start_date:
            project_data["project"]["startDate"] = start_date
        if end_date:
            project_data["project"]["endDate"] = end_date
        if campaign_code:
            project_data["customFields"] = [
                {"id": Config.CUSTOM_FIELDS["omg_number"], "value": campaign_code}
            ]

        if self.make_wrike_request("PUT", f"/folders/{project_id}", project_data) is None:
            self.logger.warning(
                f"Folder {project_id} created for '{project_title}' but converting it "
                f"to a project failed")

        self.project_cache[cache_key] = project_id
        self.logger.info(f"Created project: {project_title} ({campaign_code})")
        return project_id

    # ---------------------------------------------------- duplicate lookup

    def find_task_by_omg_number(self, project_id, omg_number):
        """Return the ID of a task in ``project_id`` whose OMG# equals ``omg_number``."""
        if not omg_number:
            return None

        result = self.make_wrike_request(
            "GET", f"/folders/{project_id}/tasks?fields=[\"customFields\"]")
        for task in (result or {}).get("data", []):
            if self._matches_omg_number(task, omg_number):
                return task["id"]
        return None

    def find_deliverable_by_omg_number(self, parent_project_id, omg_number):
        """Return the ID of a deliverable project under ``parent_project_id`` with this OMG#.

        Child details (which include custom fields) are fetched in batches of
        up to ``MAX_IDS_PER_REQUEST`` IDs instead of one request per child.
        Returns None if nothing matches or the lookup fails.
        """
        if not omg_number:
            return None

        try:
            child_ids = [child["id"] for child in self._get_direct_children(parent_project_id) or []]
            for start in range(0, len(child_ids), self.MAX_IDS_PER_REQUEST):
                id_list = ",".join(child_ids[start:start + self.MAX_IDS_PER_REQUEST])
                result = self.make_wrike_request("GET", f"/folders/{id_list}")
                for child in (result or {}).get("data", []):
                    # Only projects are deliverables
                    if "project" in child and self._matches_omg_number(child, omg_number):
                        self.logger.debug(
                            f"Found existing deliverable with OMG# {omg_number}: {child['id']}")
                        return child["id"]
        except Exception as e:
            self.logger.error(f"Error searching for deliverable: {e}")

        return None

    # ----------------------------------------------------------- deliverable

    def _build_deliverable_description(self, job_details):
        """Join Notes, Type and MediaType into the deliverable description."""
        parts = []
        if job_details.get("Notes"):
            parts.append(job_details["Notes"])
        if job_details.get("Type"):
            parts.append(f"Type: {job_details['Type']}")
        if job_details.get("MediaType"):
            parts.append(f"Media: {job_details['MediaType']}")
        return " | ".join(parts)

    def _build_deliverable_custom_fields(self, job_details, job_number):
        """Build the customFields payload: OMG#, OMG URL, category and notes."""
        custom_fields = []

        if job_number:
            # Plain OMG# number, used for duplicate detection
            custom_fields.append({"id": Config.CUSTOM_FIELDS["omg_number"], "value": job_number})
            omg_url = self.generate_omg_url(job_number)
            if omg_url:
                custom_fields.append({"id": Config.CUSTOM_FIELDS["omg_url"], "value": omg_url})

        # Fall back to MediaType when JobCategory is missing or empty
        job_category = job_details.get("JobCategory") or job_details.get("MediaType", "")
        if job_category:
            custom_fields.append({
                "id": Config.CUSTOM_FIELDS["deliverable_category"],
                "value": job_category,
            })

        notes_parts = []
        if job_details.get("Type"):
            notes_parts.append(f"Type: {job_details['Type']}")
        if job_details.get("Details"):
            notes_parts.append(job_details["Details"])
        if notes_parts:
            custom_fields.append({
                "id": Config.CUSTOM_FIELDS["notes"],
                "value": " | ".join(notes_parts),
            })

        return custom_fields

    def create_deliverable_project(self, parent_project_id, job_details):
        """Create the deliverable project for ``job_details`` unless it already exists.

        Returns:
            ``(deliverable_id, skipped)`` where ``skipped`` is True if a
            deliverable with the same OMG# already existed; ``(None, False)``
            if creation failed.
        """
        deliverable_title = job_details.get("Title", "Untitled Deliverable")
        # Normalised to text so it matches Wrike's string custom-field values.
        job_number = str(job_details.get("Number") or "").strip()

        if job_number:
            existing_deliverable_id = self.find_deliverable_by_omg_number(
                parent_project_id, job_number)
            if existing_deliverable_id:
                self.logger.info(
                    f"⊙ Deliverable already exists: {deliverable_title} (#{job_number}) - skipping")
                return existing_deliverable_id, True

        # BriefDate is the start; LiveDate (or DueDate) is the end
        brief_date = self.parse_date(job_details.get("BriefDate"))
        live_date = (self.parse_date(job_details.get("LiveDate"))
                     or self.parse_date(job_details.get("DueDate")))

        deliverable_data = {
            "title": deliverable_title,
            "description": self._build_deliverable_description(job_details),
        }
        deliverable_id = self._first_id(self.make_wrike_request(
            "POST", f"/folders/{parent_project_id}/folders", deliverable_data))
        if not deliverable_id:
            return None, False

        # Convert to a project with dates and the "Deliverable" custom item type
        project_data = {"project": {"customItemTypeId": Config.DELIVERABLE_ITEM_TYPE_ID}}
        if brief_date:
            project_data["project"]["startDate"] = brief_date
        if live_date:
            project_data["project"]["endDate"] = live_date
        custom_fields = self._build_deliverable_custom_fields(job_details, job_number)
        if custom_fields:
            project_data["customFields"] = custom_fields

        if self.make_wrike_request("PUT", f"/folders/{deliverable_id}", project_data) is None:
            # Left as a plain folder, later duplicate checks (which require a
            # project) will not find it - surface this loudly.
            self.logger.error(
                f"Deliverable folder {deliverable_id} created but converting it to a "
                f"Deliverable project failed: {deliverable_title} (#{job_number})")

        self.logger.info(f"✓ Created deliverable (as project): {deliverable_title} (#{job_number})")
        return deliverable_id, False

    # ------------------------------------------------------- file handling

    def process_json_file(self, file_path: Path, is_startup: bool = False, is_periodic: bool = False):
        """Import one JSON job file into Wrike and move it to Processed or Failed.

        Args:
            file_path: JSON file in the hot folder.
            is_startup: File came from the startup backlog scan.
            is_periodic: File was found by the periodic scan.

        Returns:
            True on success; False on failure or if the file no longer exists.
        """
        if not file_path.exists():
            # Already handled by an earlier queue entry (watcher and scan can
            # both see the same file) - not an error.
            self.logger.debug(f"Skipping {file_path.name}: no longer in hot folder")
            return False

        start_time = time.time()
        # Reported in the error path if parsing fails before they are known
        folder_name = "Unknown"
        project_title = "Unknown"

        try:
            # utf-8-sig also accepts files written with a UTF-8 byte-order mark
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)

            job_spec = data.get("JobSpecification", {})
            project_details = job_spec.get("ProjectDetails", {})
            job_details = job_spec.get("JobDetails", {})

            business_area = project_details.get(
                "BusinessArea", job_details.get("BusinessArea", ""))
            parsed_folder = self.parse_business_area(business_area)
            if not parsed_folder:
                raise ValueError("No BusinessArea found")
            folder_name = parsed_folder

            folder_id = self.get_or_create_folder(folder_name)
            if not folder_id:
                raise ValueError(f"Failed to create folder: {folder_name}")

            project_title = project_details.get("Title", "Untitled Project")
            campaign_code = job_details.get("CampaignCode", "")
            project_id = self.get_or_create_project(
                project_title, folder_id, project_details, campaign_code)
            if not project_id:
                raise ValueError(f"Failed to create project: {project_title}")

            deliverable_id, skipped = self.create_deliverable_project(project_id, job_details)
            if not deliverable_id:
                raise ValueError("Failed to create deliverable")

            processing_time = time.time() - start_time
            self.daily_stats.record_file_processed(
                folder_name, project_title, not skipped, skipped,
                processing_time, True, None, is_startup, is_periodic
            )

            self.move_to_processed(file_path)
            if file_path.exists():
                # Move failed: remember the file so periodic scans don't re-import it
                self.add_to_recently_processed(file_path)
            return True

        except Exception as e:
            processing_time = time.time() - start_time
            self.logger.error(f"Error processing {file_path.name}: {e}")

            self.move_to_failed(file_path, str(e))
            if file_path.exists():
                # Could not move it aside; don't let periodic scans retry it forever
                self.add_to_recently_processed(file_path)

            self.daily_stats.record_file_processed(
                folder_name, project_title, False, False,
                processing_time, False, str(e), is_startup, is_periodic
            )
            return False

    @staticmethod
    def _file_identity(file_path: Path) -> str:
        """Key from path, size and mtime, so a rewritten file counts as new.

        Raises OSError if the file does not exist.
        """
        stat_info = file_path.stat()
        return f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"

    def add_to_recently_processed(self, file_path: Path):
        """Remember ``file_path`` as handled; silently ignores missing files."""
        with self.processed_lock:
            try:
                file_id = self._file_identity(file_path)
            except OSError:
                return
            self.recently_processed[file_id] = None
            if len(self.recently_processed) > self.RECENT_LIMIT:
                # Insertion order: the first keys are the oldest
                for entry in list(self.recently_processed)[:self.RECENT_EVICT]:
                    del self.recently_processed[entry]

    def was_recently_processed(self, file_path: Path) -> bool:
        """True if this exact file version was already handled."""
        with self.processed_lock:
            try:
                return self._file_identity(file_path) in self.recently_processed
            except OSError:
                return False

    def move_to_processed(self, file_path: Path):
        """Move ``file_path`` into the Processed folder (errors are logged, not raised)."""
        destination = Config.PROCESSED_FOLDER / file_path.name
        try:
            shutil.move(str(file_path), str(destination))
        except Exception as e:
            self.logger.error(f"Failed to move to processed: {e}")
            return
        try:
            # A move keeps the old mtime; refresh it so CLEANUP_PROCESSED_HOURS
            # counts from processing time.
            destination.touch()
        except OSError as e:
            self.logger.warning(f"Could not refresh timestamp of {destination.name}: {e}")

    def move_to_failed(self, file_path: Path, error_msg: str):
        """Move ``file_path`` to the Failed folder and write a ``*_ERROR.txt`` note."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            failed_path = Config.FAILED_FOLDER / f"{timestamp}_{file_path.name}"
            shutil.move(str(file_path), str(failed_path))

            error_log_path = Config.FAILED_FOLDER / f"{timestamp}_{file_path.stem}_ERROR.txt"
            with open(error_log_path, 'w', encoding='utf-8') as f:
                f.write(f"File: {file_path.name}\n")
                f.write(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Error: {error_msg}\n")

            self.daily_stats.record_failed_file_moved()
            self.logger.warning(f"Moved to failed: {file_path.name}")
        except Exception as e:
            self.logger.error(f"Failed to move failed file: {e}")

    # ------------------------------------------------------------- scanning

    def scan_existing_files(self) -> List[Path]:
        """Return the JSON files already waiting in the hot folder."""
        self.logger.info("🔍 Scanning for existing JSON files...")
        json_files = list(Config.HOT_FOLDER.glob("*.json"))
        self.logger.info(f"Found {len(json_files)} existing files")
        return json_files

    def periodic_scan(self):
        """Thread loop: every PERIODIC_SCAN_INTERVAL seconds queue missed files."""
        while True:
            try:
                if not self.startup_complete:
                    time.sleep(5)
                    continue

                time.sleep(Config.PERIODIC_SCAN_INTERVAL)

                with self.scan_lock:
                    if self.scan_in_progress:
                        self.logger.warning("Skipping scan - already in progress")
                        continue
                    self.scan_in_progress = True

                try:
                    self._perform_periodic_scan()
                finally:
                    with self.scan_lock:
                        self.scan_in_progress = False

            except Exception as e:
                self.logger.error(f"Periodic scan error: {e}")
                time.sleep(Config.PERIODIC_SCAN_INTERVAL)

    def _perform_periodic_scan(self):
        """Queue hot-folder JSON files that are neither pending nor already handled."""
        scan_start = time.time()
        try:
            with self.pending_lock:
                pending = set(self.pending_files)
            missed_files = [
                json_file for json_file in Config.HOT_FOLDER.glob("*.json")
                if str(json_file) not in pending and not self.was_recently_processed(json_file)
            ]
            scan_duration = time.time() - scan_start

            if missed_files:
                self.logger.info(
                    f"🔄 Periodic scan found {len(missed_files)} files ({scan_duration:.1f}s)")
                for file_path in missed_files:
                    self.queue_file(file_path, is_periodic=True)

            self.daily_stats.record_periodic_scan(scan_duration, len(missed_files))
        except Exception as e:
            self.logger.error(f"Periodic scan error: {e}")

    # ----------------------------------------------------------- processing

    def process_startup_files(self, files: List[Path]):
        """Process the startup backlog directly, BATCH_SIZE files at a time."""
        if not files:
            self.logger.info("✅ No existing files to process")
            return

        self.logger.info(f"🚀 Processing {len(files)} existing files...")
        for i in range(0, len(files), Config.BATCH_SIZE):
            self.process_batch(files[i:i + Config.BATCH_SIZE], is_startup=True)
        self.logger.info("✅ Startup processing complete")

    def process_batch(self, file_paths: List[Path], is_startup: bool = False, is_periodic: bool = False):
        """Process files sequentially (avoids races between Wrike API lookups and creates)."""
        for file_path in file_paths or []:
            try:
                self.process_json_file(file_path, is_startup, is_periodic)
            except Exception as e:
                self.logger.error(f"Error processing {file_path.name}: {e}")

    def queue_file(self, file_path: Path, is_periodic: bool = False):
        """Queue ``file_path`` for the batch worker.

        Returns:
            True if queued; False during startup or if the file is already pending.
        """
        if not self.startup_complete:
            return False
        key = str(file_path)
        with self.pending_lock:
            if key in self.pending_files:
                return False
            self.pending_files.add(key)
        self.file_queue.put((file_path, is_periodic))
        return True

    def _wait_until_stable(self, file_path: Path):
        """Wait until ``file_path``'s size stops changing, or the file disappears.

        Watchdog reports a file as soon as it appears, possibly before the
        writer finishes; reading it then yields truncated JSON. Polls every
        ``Config.WAIT_DELAY`` seconds, at most ``STABLE_CHECKS`` times.
        """
        previous_size = -1
        for _ in range(self.STABLE_CHECKS):
            try:
                size = file_path.stat().st_size
            except OSError:
                return  # Gone - process_json_file handles that case
            if size == previous_size:
                return
            previous_size = size
            time.sleep(Config.WAIT_DELAY)

    def batch_processor_worker(self):
        """Thread loop: collect up to BATCH_SIZE queued files and process them."""
        while True:
            files_to_process = []
            periodic_flags = []
            batch_start_time = time.time()

            while (len(files_to_process) < Config.BATCH_SIZE and
                   (time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
                try:
                    file_data = self.file_queue.get(timeout=1.0)
                except Empty:
                    if files_to_process:
                        break
                    continue
                file_path, is_periodic = (
                    file_data if isinstance(file_data, tuple) else (file_data, False))
                files_to_process.append(file_path)
                periodic_flags.append(is_periodic)
                self.file_queue.task_done()

            if not files_to_process:
                continue

            try:
                if self.startup_complete:
                    for file_path in files_to_process:
                        self._wait_until_stable(file_path)
                    self.process_batch(files_to_process, is_startup=False,
                                       is_periodic=any(periodic_flags))
            except Exception as e:
                # Keep the worker thread alive whatever happens
                self.logger.error(f"Batch worker error: {e}")
            finally:
                with self.pending_lock:
                    for file_path in files_to_process:
                        self.pending_files.discard(str(file_path))

    # ------------------------------------------------------------ reporting

    def cleanup_old_processed_files(self):
        """Delete Processed/*.json files older than CLEANUP_PROCESSED_HOURS (by mtime)."""
        try:
            cutoff_time = time.time() - Config.CLEANUP_PROCESSED_HOURS * 3600
            deleted = 0
            for file in Config.PROCESSED_FOLDER.glob("*.json"):
                try:
                    if file.stat().st_mtime < cutoff_time:
                        file.unlink()
                        deleted += 1
                except OSError as e:
                    self.logger.error(f"Cleanup error: {e}")

            if deleted > 0:
                self.logger.info(f"🗑️ Cleaned up {deleted} old processed files")
        except Exception as e:
            self.logger.error(f"Cleanup error: {e}")

    def generate_daily_report(self):
        """Save and email the daily report, run cleanups, then reset the counters."""
        try:
            stats = self.daily_stats.get_stats_summary()
            report_content = self.format_daily_report(stats)

            report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write(report_content)

            self.email_daily_report(report_content, stats['date'])
            self.cleanup_old_reports()
            self.cleanup_old_processed_files()
            self.daily_stats.reset_stats()

            self.logger.info(f"Daily report generated: {report_file}")
        except Exception as e:
            self.logger.error(f"Failed to generate daily report: {e}")

    def format_daily_report(self, stats: Dict[str, Any]) -> str:
        """Render a ``get_stats_summary()`` dict as a plain-text report."""
        parts = [
            "\nWRIKE IMPORT DAILY REPORT\n",
            f"Date: {stats['date']}\n",
            f"Uptime: {stats['uptime']}\n",
            f"Space: {Config.WRIKE_SPACE_NAME}\n",
            "\n=== SUMMARY ===\n",
            f"Total Files Processed: {stats['total_processed']}\n",
            f"Tasks Created: {stats['total_created']}\n",
            f"Tasks Skipped (duplicates): {stats['total_skipped']}\n",
        ]
        # Optional lines only appear when non-zero
        if stats['startup_files_processed'] > 0:
            parts.append(f"Startup Files: {stats['startup_files_processed']}\n")
        if stats['periodic_files_found'] > 0:
            parts.append(f"Periodic Scan Files: {stats['periodic_files_found']}\n")
        if stats['failed_files_moved'] > 0:
            parts.append(f"Failed Files: {stats['failed_files_moved']}\n")
        parts += [
            f"Errors: {stats['total_errors']}\n",
            f"Success Rate: {stats['success_rate']:.1f}%\n",
            f"Average Processing Time: {stats['avg_processing_time']}s\n",
            f"Periodic Scans: {stats['periodic_scans_completed']} ({stats['slow_scans']} slow)\n",
            "\n=== FOLDER BREAKDOWN ===\n",
        ]

        for folder, folder_data in sorted(stats['folder_stats'].items()):
            parts.append(f"\n{folder}:\n")
            parts.append(f" Tasks Created: {folder_data['tasks_created']}\n")
            parts.append(f" Tasks Skipped: {folder_data['tasks_skipped']}\n")
            parts.append(f" Errors: {folder_data['errors']}\n")
            if folder_data['projects']:
                parts.append(" Projects:\n")
                for project, count in folder_data['projects'].most_common(10):
                    parts.append(f" - {project}: {count} deliverable(s)\n")

        parts.append("\n=== HOURLY BREAKDOWN ===\n")
        for hour in range(24):
            count = stats['hourly_stats'].get(hour, 0)
            if count > 0:
                parts.append(f"{hour:02d}:00 - {count} files\n")

        if stats['error_details']:
            parts.append("\n=== RECENT ERRORS ===\n")
            for error in stats['error_details'][-10:]:
                parts.append(
                    f"{error['time']} - {error['folder']}/{error['project']}: {error['error']}\n")

        return "".join(parts)

    def email_daily_report(self, report_content: str, report_date: str):
        """Email the report to ``Config.REPORT_EMAILS`` via SMTP with STARTTLS."""
        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = ', '.join(Config.REPORT_EMAILS)
            msg['Subject'] = (
                f"Wrike Import Daily Report - {Config.WRIKE_SPACE_NAME} - {report_date}")
            msg.attach(MIMEText(report_content, 'plain'))

            # Context manager closes the connection even if login/send fails
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT, timeout=30) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())

            self.logger.info("Daily report emailed successfully")
        except Exception as e:
            self.logger.error(f"Failed to email report: {e}")

    def cleanup_old_reports(self):
        """Delete daily_report_YYYY-MM-DD.txt files older than KEEP_REPORTS_DAYS."""
        try:
            cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)
            for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
                try:
                    file_date_str = report_file.stem.split('_')[-1]
                    file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
                    if file_date < cutoff_date:
                        report_file.unlink()
                except (ValueError, OSError):
                    # Unexpected name or file vanished - leave it alone
                    continue
        except Exception as e:
            self.logger.error(f"Failed to cleanup old reports: {e}")

    def get_current_stats(self) -> str:
        """One-line summary of today's counters for the periodic status log."""
        stats = self.daily_stats.get_stats_summary()
        return (f"Today: {stats['total_processed']} processed "
                f"({stats['total_created']} created, {stats['total_skipped']} skipped), "
                f"{stats['total_errors']} errors")
|
|
|
|
|
|
class WrikeFileHandler(FileSystemEventHandler):
    """Watchdog handler that queues new or renamed JSON files in the hot folder.

    Events arriving before ``monitor.startup_complete`` is set are ignored;
    the startup scan and the periodic scan cover that window.
    """

    def __init__(self, monitor: WrikeMonitor):
        super().__init__()
        self.monitor = monitor
        self.logger = logging.getLogger(__name__)

    def _is_hot_json(self, path: Path) -> bool:
        """True if ``path`` is a .json file directly inside ``Config.HOT_FOLDER``.

        Guards against move events whose destination lies outside the hot
        folder (e.g. files the monitor itself moves to Processed/Failed).
        """
        if path.suffix.lower() != '.json':
            return False
        try:
            return path.parent.resolve() == Config.HOT_FOLDER.resolve()
        except OSError:
            return False

    def on_created(self, event):
        """Queue a newly created JSON file."""
        if event.is_directory or not self.monitor.startup_complete:
            return

        file_path = Path(event.src_path)
        if self._is_hot_json(file_path):
            self.logger.debug(f"📥 New file detected: {file_path.name}")
            self.monitor.queue_file(file_path, is_periodic=False)

    def on_moved(self, event):
        """Queue a file renamed/moved into the hot folder with a .json name."""
        if event.is_directory or not self.monitor.startup_complete:
            return

        dest_path = Path(event.dest_path)
        if self._is_hot_json(dest_path):
            self.logger.debug(f"📁 File moved: {dest_path.name}")
            self.monitor.queue_file(dest_path, is_periodic=False)
|
|
|
|
|
|
def main():
    """Run the monitor: import the backlog, then watch the hot folder until Ctrl+C.

    The watchdog observer is always stopped and joined on exit, including
    when an unexpected exception escapes the status loop.
    """
    monitor = WrikeMonitor()

    # Step 1: Process existing files
    existing_files = monitor.scan_existing_files()
    monitor.process_startup_files(existing_files)

    # Step 2: Start real-time monitoring
    monitor.startup_complete = True
    monitor.logger.info("🎯 Startup complete - switching to real-time monitoring")

    # Background worker that drains the file queue
    threading.Thread(
        target=monitor.batch_processor_worker,
        name="BatchWorker",
        daemon=True,
    ).start()

    # Safety net for files the watcher misses
    threading.Thread(
        target=monitor.periodic_scan,
        name="PeriodicScanner",
        daemon=True,
    ).start()

    # File system watcher
    event_handler = WrikeFileHandler(monitor)
    observer = Observer()
    observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=False)
    observer.start()

    monitor.logger.info(f"🛡️ Real-time monitoring active on: {Config.HOT_FOLDER}")
    monitor.logger.info(
        f"📧 Daily reports at {Config.DAILY_REPORT_TIME} to {', '.join(Config.REPORT_EMAILS)}")

    try:
        while True:
            time.sleep(60)
            monitor.logger.info(f"📊 {monitor.get_current_stats()}")
    except KeyboardInterrupt:
        monitor.logger.info("Stopping Wrike Monitor...")
    finally:
        observer.stop()
        observer.join()

    monitor.logger.info("Wrike Monitor stopped")


if __name__ == "__main__":
    main()
|