bissell-wrike-python/wrike_monitor.py
Dave Porter bd092e75cd Use custom 'Deliverable' item type when creating deliverables
Changes:
- Added DELIVERABLE_ITEM_TYPE_ID config (5793903 for Staging)
- Updated create_deliverable_project() to use customItemTypeId parameter
- Deliverables now created with proper "Deliverable" custom item type
- Results in scope=WsFolder (instead of RbFolder)
- Matches manually created deliverables in Wrike UI

This ensures deliverables display correctly as "Deliverable" type
in the Wrike interface, not generic "Project" type.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-20 15:16:24 -04:00

1018 lines
40 KiB
Python

#!/usr/bin/env python3
"""
Wrike Import Monitor - Real-time JSON processor with daily reporting
Monitors a folder for JSON files and automatically creates Wrike projects/tasks
Features:
- Real-time folder monitoring with watchdog
- Periodic scanning for missed files
- Duplicate detection via OMG numbers
- Daily email reports at 7PM
- Sequential batch processing (one file at a time; see BATCH_SIZE / MAX_WORKERS)
- Failed file handling
- Auto-cleanup of old processed files
"""
import json
import logging
import os
import shutil
import smtplib
import threading
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict, List, Optional, Set

import requests
import schedule
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class Config:
    """Configuration settings - EDIT THESE FOR YOUR ENVIRONMENT.

    Credentials are NOT stored in source; they are read from the environment:

    * ``WRIKE_TOKEN``                 - Wrike API bearer token
    * ``WRIKE_MONITOR_SMTP_PASSWORD`` - password for ``SMTP_USER``

    An unset variable becomes an empty string, so the corresponding Wrike or
    SMTP calls fail authentication instead of silently using a baked-in secret.

    Folder locations may be overridden with ``WRIKE_HOT_FOLDER``,
    ``WRIKE_PROCESSED_FOLDER``, ``WRIKE_FAILED_FOLDER`` and
    ``WRIKE_REPORTS_DIR``; the defaults are the development paths.
    """

    # === PATHS (EDIT FOR LOCAL/SERVER) ===
    # Development defaults, overridable per machine via environment variables.
    HOT_FOLDER = Path(os.environ.get(
        "WRIKE_HOT_FOLDER",
        "/Users/daveporter/Desktop/CODING-2024/Wrike/json_files"))
    PROCESSED_FOLDER = Path(os.environ.get(
        "WRIKE_PROCESSED_FOLDER",
        "/Users/daveporter/Desktop/CODING-2024/Wrike/Processed"))
    FAILED_FOLDER = Path(os.environ.get(
        "WRIKE_FAILED_FOLDER",
        "/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/Failed"))
    REPORTS_DIR = Path(os.environ.get(
        "WRIKE_REPORTS_DIR",
        "/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/reports"))

    # === WRIKE API SETTINGS ===
    WRIKE_API_BASE = "https://www.wrike.com/api/v4"
    # SECURITY: a permanent token used to be hard-coded here and is still in
    # version-control history - revoke it in Wrike and issue a new one.
    WRIKE_TOKEN = os.environ.get("WRIKE_TOKEN", "")

    # Wrike Space - EDIT FOR DIFFERENT ENVIRONMENTS
    WRIKE_SPACE_ID = "MQAAAABpz7l_"  # Staging
    WRIKE_SPACE_NAME = "Staging"
    DELIVERABLE_ITEM_TYPE_ID = "5793903"  # Custom item type "Deliverable"
    # Production space (uncomment for production)
    # WRIKE_SPACE_ID = "MQAAAABoHcTY"  # LGL Team
    # WRIKE_SPACE_NAME = "LGL Team"
    # DELIVERABLE_ITEM_TYPE_ID = "XXXXXX"  # Update with production custom item type ID

    # Custom field IDs in Wrike
    CUSTOM_FIELDS = {
        "budget": "IEAGU2B2JUAJRZ7P",
        "impact": "IEAGU2B2JUAJRZ7Q",
        "notes": "IEAGU2B2JUAJRZ7R",
        "rag": "IEAGU2B2JUAJRZ7S",
        "deliverable_category": "IEAGU2B2JUAJRZ7T",
        "actions": "IEAGU2B2JUAJRZ7W",
        "shoot_date": "IEAGU2B2JUAJRZ7X",
        "omg_number": "IEAGU2B2JUAJV4P7",  # Plain OMG# number
        "omg_url": "IEAGU2B2JUAJRZ7Y",  # OMG URL field
        "box_link": "IEAGU2B2JUAJRZ7Z",
        "owner": "IEAGU2B2JUAJRZ72",
    }

    # === EMAIL SETTINGS ===
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 587
    SMTP_USER = "twist@mail.dev.oliver.solutions"
    # SECURITY: the previously hard-coded SMTP password is in version-control
    # history - rotate it with the mail provider.
    SMTP_PASSWORD = os.environ.get("WRIKE_MONITOR_SMTP_PASSWORD", "")
    SENDER_EMAIL = "WRIKE-MONITOR@oliver.agency"
    REPORT_EMAILS = ["daveporter@oliver.agency"]

    # === PROCESSING SETTINGS ===
    BATCH_SIZE = 1  # Process one file at a time to avoid race conditions
    MAX_WORKERS = 1  # No concurrent processing
    BATCH_TIMEOUT = 30  # Seconds spent collecting one batch from the queue
    WAIT_DELAY = 2

    # === MONITORING SETTINGS ===
    PERIODIC_SCAN_INTERVAL = 60  # Scan every 60 seconds
    PERIODIC_SCAN_TIMEOUT = 120  # Max scan time
    SLOW_SCAN_THRESHOLD = 30  # Seconds; longer scans are counted as "slow"

    # === REPORTING SETTINGS ===
    DAILY_REPORT_TIME = "19:00"  # 7PM daily report
    KEEP_REPORTS_DAYS = 30
    CLEANUP_PROCESSED_HOURS = 24  # Delete processed files after 24 hours
class DailyStats:
    """Thread-safe daily statistics collector.

    Every method takes ``self.lock``, so the batch worker, periodic scanner
    and report scheduler threads can share a single instance.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.reset_stats()
        # Set once per process (reset_stats does not touch it), so "uptime" in
        # the summary is process uptime, not time since the last reset.
        self.start_time = datetime.now()

    def reset_stats(self):
        """Zero every counter and stamp the statistics with today's date."""
        with self.lock:
            self.date = date.today()
            self.total_processed = 0
            self.total_created = 0
            self.total_skipped = 0
            self.total_errors = 0
            self.startup_files_processed = 0
            self.periodic_files_found = 0
            self.failed_files_moved = 0
            self.periodic_scans_completed = 0
            self.slow_scans = 0
            # Folder/Project breakdown: folder name -> per-folder counters
            self.folder_stats = defaultdict(lambda: {
                'projects': Counter(),
                'tasks_created': 0,
                'tasks_skipped': 0,
                'errors': 0,
            })
            # Performance metrics
            self.processing_times = []
            self.hourly_stats = defaultdict(int)  # hour of day (0-23) -> files
            self.error_details = []

    def record_file_processed(self, folder_name: str, project_name: str,
                              task_created: bool, skipped: bool, processing_time: float,
                              success: bool = True, error: Optional[str] = None,
                              is_startup: bool = False, is_periodic: bool = False):
        """Record the outcome of one processed file.

        Args:
            folder_name: Wrike folder (BusinessArea) the file belonged to.
            project_name: Wrike project title.
            task_created: A new deliverable was created.
            skipped: The deliverable already existed (duplicate OMG#).
            processing_time: Seconds spent on the file.
            success: False records an error instead of a success.
            error: Error message kept for the report when ``success`` is False.
            is_startup / is_periodic: Which discovery path found the file.
        """
        with self.lock:
            self.hourly_stats[datetime.now().hour] += 1
            if is_startup:
                self.startup_files_processed += 1
            elif is_periodic:
                self.periodic_files_found += 1

            folder = self.folder_stats[folder_name]
            if success:
                self.total_processed += 1
                if task_created:
                    self.total_created += 1
                    folder['tasks_created'] += 1
                if skipped:
                    self.total_skipped += 1
                    folder['tasks_skipped'] += 1
                folder['projects'][project_name] += 1
                self.processing_times.append(processing_time)
            else:
                self.total_errors += 1
                folder['errors'] += 1
                if error:
                    self.error_details.append({
                        'time': datetime.now().strftime('%H:%M:%S'),
                        'folder': folder_name,
                        'project': project_name,
                        'error': error,
                    })

    def record_failed_file_moved(self):
        """Count one file moved to the Failed folder."""
        with self.lock:
            self.failed_files_moved += 1

    def record_periodic_scan(self, duration: float, files_found: int):
        """Count one periodic scan; scans over Config.SLOW_SCAN_THRESHOLD seconds count as slow.

        ``files_found`` is accepted for interface compatibility but not stored;
        files found by scans are counted via ``record_file_processed``.
        """
        with self.lock:
            self.periodic_scans_completed += 1
            if duration > Config.SLOW_SCAN_THRESHOLD:
                self.slow_scans += 1

    def get_stats_summary(self) -> Dict[str, Any]:
        """Return an independent snapshot of the current statistics.

        The per-folder records (including their ``Counter``) are copied, so the
        snapshot neither changes when other threads keep recording nor races
        with them while a report is being formatted.
        """
        with self.lock:
            times = self.processing_times
            avg_processing_time = sum(times) / len(times) if times else 0
            attempts = self.total_processed + self.total_errors
            folder_snapshot = {
                name: {**entry, 'projects': Counter(entry['projects'])}
                for name, entry in self.folder_stats.items()
            }
            return {
                'date': self.date.strftime('%Y-%m-%d'),
                'uptime': str(datetime.now() - self.start_time).split('.')[0],
                'total_processed': self.total_processed,
                'total_created': self.total_created,
                'total_skipped': self.total_skipped,
                'startup_files_processed': self.startup_files_processed,
                'periodic_files_found': self.periodic_files_found,
                'failed_files_moved': self.failed_files_moved,
                'periodic_scans_completed': self.periodic_scans_completed,
                'slow_scans': self.slow_scans,
                'total_errors': self.total_errors,
                'success_rate': (self.total_processed / attempts * 100) if attempts > 0 else 0,
                'avg_processing_time': round(avg_processing_time, 2),
                'folder_stats': folder_snapshot,
                'hourly_stats': dict(self.hourly_stats),
                # Entries are never mutated after being appended; a list copy suffices.
                'error_details': self.error_details.copy(),
            }
class WrikeMonitor:
    """Wrike import processor with real-time monitoring.

    Each JSON job file is mapped onto Wrike as
    BusinessArea folder -> project (ProjectDetails) -> deliverable (JobDetails).
    A deliverable whose OMG# already exists under the project is skipped.
    Handled files are moved to Config.PROCESSED_FOLDER; failures are moved to
    Config.FAILED_FOLDER together with an ``*_ERROR.txt`` note.
    """

    # Size bound for ``recently_processed``; when exceeded, the oldest
    # ``_RECENT_TRIM`` identities are dropped.
    _RECENT_LIMIT = 1000
    _RECENT_TRIM = 200
    # Max number of Config.WAIT_DELAY polls spent waiting for a queued file to
    # stop growing before it is processed anyway.
    _STABILITY_MAX_CHECKS = 10

    def __init__(self):
        """Set up logging and directories, shared state, and the report scheduler thread."""
        self.setup_logging()
        self.ensure_directories()
        # (file_path, is_periodic) tuples fed by watchdog events and periodic scans
        self.file_queue = Queue()
        self.daily_stats = DailyStats()
        # Flipped by main() after the startup pass; queue_file() ignores files until then.
        self.startup_complete = False
        # Caches: "<parent id>:<title>" -> Wrike folder / project id
        self.folder_cache = {}
        self.project_cache = {}
        # Identities ("<path>_<size>_<mtime>") of successfully processed files.
        # A dict is used as an insertion-ordered set so trimming really drops
        # the oldest entries (iteration order of a set is arbitrary).
        self.recently_processed: Dict[str, None] = {}
        self.processed_lock = threading.Lock()
        # Scan protection
        self.scan_in_progress = False
        self.scan_lock = threading.Lock()
        if not Config.WRIKE_TOKEN:
            self.logger.warning("Config.WRIKE_TOKEN is empty - Wrike API calls will fail")
        self.setup_daily_reporting()

    def setup_logging(self):
        """Log to the console and to REPORTS_DIR/wrike_monitor.log."""
        log_format = '%(asctime)s - %(levelname)s - [%(threadName)s] %(message)s'
        Config.REPORTS_DIR.mkdir(parents=True, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                # Explicit UTF-8: messages contain emoji, which cannot be
                # encoded under non-UTF-8 locale defaults.
                logging.FileHandler(Config.REPORTS_DIR / 'wrike_monitor.log', encoding='utf-8'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info(f"Wrike Monitor initialized - Target: {Config.WRIKE_SPACE_NAME}")

    def ensure_directories(self):
        """Create the hot, processed, failed and reports directories if missing."""
        for directory in [Config.HOT_FOLDER, Config.PROCESSED_FOLDER,
                          Config.FAILED_FOLDER, Config.REPORTS_DIR]:
            directory.mkdir(parents=True, exist_ok=True)

    def setup_daily_reporting(self):
        """Schedule generate_daily_report() and run the scheduler in a daemon thread.

        Pending jobs are checked once a minute, so the report may fire up to
        ~60 s after Config.DAILY_REPORT_TIME.
        """
        schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)

        def run_scheduler():
            while True:
                schedule.run_pending()
                time.sleep(60)

        scheduler_thread = threading.Thread(target=run_scheduler, name="Scheduler", daemon=True)
        scheduler_thread.start()

    def make_wrike_request(self, method, endpoint, data=None):
        """Call the Wrike REST API and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. "GET", "POST" or "PUT".
            endpoint: Path appended to Config.WRIKE_API_BASE, e.g. "/folders/ID".
            data: Optional JSON-serialisable request body.

        Returns:
            The parsed JSON response, or None if the request failed, returned
            an HTTP error status, or the body was not valid JSON.
        """
        url = f"{Config.WRIKE_API_BASE}{endpoint}"
        headers = {
            "Authorization": f"Bearer {Config.WRIKE_TOKEN}",
            "Content-Type": "application/json",
        }
        try:
            # requests.request accepts any verb; the former if/elif chain left
            # `response` unbound (UnboundLocalError) for anything else.
            response = requests.request(method, url, headers=headers, json=data, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            detail = ""
            # `is not None`: a Response for an error status is falsy.
            if getattr(e, "response", None) is not None:
                detail = f" - {e.response.text[:500]}"
            self.logger.error(f"Wrike API error: {e}{detail}")
            return None
        except ValueError as e:
            # Non-JSON body (older requests versions raise a plain ValueError).
            self.logger.error(f"Wrike API returned invalid JSON for {method} {endpoint}: {e}")
            return None

    def parse_business_area(self, business_area):
        """Return the last ">"-separated segment of a BusinessArea path.

        "Bissell > Floorcare" -> "Floorcare". Empty or None input yields "".
        """
        if not business_area:
            return ""
        return business_area.split(">")[-1].strip()

    def parse_date(self, date_string):
        """Convert a source timestamp to a "YYYY-MM-DD" string.

        The primary format is "YYYY-MM-DD HH:MM:SS" with an optional "+..."
        offset suffix, which is discarded. As a fallback, any form accepted by
        ``datetime.fromisoformat`` is taken (e.g. "YYYY-MM-DD",
        "YYYY-MM-DDTHH:MM:SS", "-05:00" offsets); its date part is returned.

        Returns None for empty, non-string or unparseable input.
        """
        if not date_string or not isinstance(date_string, str):
            return None
        text = date_string.strip()
        try:
            dt = datetime.strptime(text.split('+')[0].strip(), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            try:
                dt = datetime.fromisoformat(text)
            except ValueError:
                return None
        return dt.strftime("%Y-%m-%d")

    @staticmethod
    def _direct_child_ids(tree_result, parent_id):
        """Return the childIds of ``parent_id`` from a /folders/{id}/folders response.

        That response contains the parent entry (carrying ``childIds``) plus
        other entries that are not necessarily direct children. Returns [] if
        the parent entry is absent.
        """
        for item in (tree_result or {}).get("data", []):
            if item.get("id") == parent_id:
                return item.get("childIds", [])
        return []

    @staticmethod
    def _first_id(result):
        """Return ``data[0]["id"]`` from a Wrike response, or None if there is none."""
        if result and result.get("data"):
            return result["data"][0].get("id")
        return None

    def get_or_create_folder(self, folder_name, parent_id=None):
        """Return the id of folder ``folder_name`` directly under ``parent_id``, creating it if missing.

        ``parent_id`` defaults to Config.WRIKE_SPACE_ID. Ids are cached per
        (parent, name). Returns None when the folder list could not be fetched
        or the folder could not be created.
        """
        if parent_id is None:
            parent_id = Config.WRIKE_SPACE_ID
        cache_key = f"{parent_id}:{folder_name}"
        if cache_key in self.folder_cache:
            return self.folder_cache[cache_key]

        result = self.make_wrike_request("GET", f"/folders/{parent_id}/folders")
        if result is None:
            # The existence check failed; creating now could duplicate a folder
            # that already exists, so let the caller fail this file instead.
            self.logger.error(f"Could not list folders under {parent_id}; not creating '{folder_name}'")
            return None

        child_ids = set(self._direct_child_ids(result, parent_id))
        for item in result.get("data", []):
            if item.get("id") in child_ids and item.get("title") == folder_name:
                folder_id = item["id"]
                self.folder_cache[cache_key] = folder_id
                self.logger.debug(f"Found existing folder: {folder_name} ({folder_id})")
                return folder_id

        self.logger.info(f"Creating new folder: {folder_name}")
        data = {"title": folder_name, "description": f"{folder_name} category folder"}
        folder_id = self._first_id(self.make_wrike_request("POST", f"/folders/{parent_id}/folders", data))
        if folder_id:
            self.folder_cache[cache_key] = folder_id
            self.logger.info(f"Created folder: {folder_name} ({folder_id})")
        return folder_id

    def get_or_create_project(self, project_title, folder_id, project_details, campaign_code):
        """Return the id of project ``project_title`` directly under ``folder_id``, creating it if missing.

        A new project is created as a folder and then converted with a PUT that
        sets StartDate/EndDate and, if given, the OMG# custom field to
        ``campaign_code``. Returns None when the folder list could not be
        fetched or creation failed.
        """
        cache_key = f"{folder_id}:{project_title}"
        if cache_key in self.project_cache:
            return self.project_cache[cache_key]

        result = self.make_wrike_request("GET", f"/folders/{folder_id}/folders")
        if result is None:
            # See get_or_create_folder: never create after a failed lookup.
            self.logger.error(f"Could not list projects under {folder_id}; not creating '{project_title}'")
            return None

        child_ids = set(self._direct_child_ids(result, folder_id))
        for item in result.get("data", []):
            if (item.get("id") in child_ids and item.get("title") == project_title
                    and "project" in item):
                project_id = item["id"]
                self.project_cache[cache_key] = project_id
                self.logger.debug(f"Found existing project: {project_title} ({project_id})")
                return project_id

        description = project_details.get("Description", "")
        if description:
            description = description.replace("<p>", "").replace("</p>", "")
        data = {"title": project_title, "description": description}
        project_id = self._first_id(self.make_wrike_request("POST", f"/folders/{folder_id}/folders", data))
        if not project_id:
            return None

        # Convert the new folder to a project with dates
        start_date = self.parse_date(project_details.get("StartDate"))
        end_date = self.parse_date(project_details.get("EndDate"))
        project_data = {"project": {}}
        if start_date:
            project_data["project"]["startDate"] = start_date
        if end_date:
            project_data["project"]["endDate"] = end_date
        if campaign_code:
            project_data["customFields"] = [
                {"id": Config.CUSTOM_FIELDS["omg_number"], "value": campaign_code}
            ]
        if self.make_wrike_request("PUT", f"/folders/{project_id}", project_data) is None:
            self.logger.warning(
                f"Folder {project_id} created for '{project_title}' but converting it to a project failed")
        self.project_cache[cache_key] = project_id
        self.logger.info(f"Created project: {project_title} ({campaign_code})")
        return project_id

    def find_task_by_omg_number(self, project_id, omg_number):
        """Return the id of a task in ``project_id`` whose OMG# custom field equals ``omg_number``.

        Returns None when ``omg_number`` is empty, the request fails, or nothing matches.
        """
        if not omg_number:
            return None
        result = self.make_wrike_request("GET", f"/folders/{project_id}/tasks?fields=[\"customFields\"]")
        for task in (result or {}).get("data", []):
            for field in task.get("customFields", []):
                if (field.get("id") == Config.CUSTOM_FIELDS["omg_number"]
                        and field.get("value") == omg_number):
                    return task["id"]
        return None

    def find_deliverable_by_omg_number(self, parent_project_id, omg_number):
        """Return the id of a deliverable (child project) whose OMG# equals ``omg_number``.

        Lists the direct children of ``parent_project_id`` and fetches each one
        individually (one request per child) to read its customFields. Returns
        None when nothing matches or the lookup fails; failures are logged
        because they mean a duplicate may go undetected.
        """
        if not omg_number:
            return None
        try:
            omg_field_id = Config.CUSTOM_FIELDS["omg_number"]
            result = self.make_wrike_request("GET", f"/folders/{parent_project_id}/folders")
            if result is None:
                self.logger.warning(
                    f"Could not list children of {parent_project_id}; "
                    f"duplicate check for OMG# {omg_number} skipped")
                return None
            for child_id in self._direct_child_ids(result, parent_project_id):
                child_result = self.make_wrike_request("GET", f"/folders/{child_id}")
                if not (child_result and child_result.get("data")):
                    self.logger.warning(
                        f"Could not fetch folder {child_id}; "
                        f"duplicate check for OMG# {omg_number} may be incomplete")
                    continue
                child = child_result["data"][0]
                if "project" not in child:
                    continue  # plain folder, not a deliverable
                for field in child.get("customFields", []):
                    if field.get("id") == omg_field_id and field.get("value") == omg_number:
                        self.logger.debug(f"Found existing deliverable with OMG# {omg_number}: {child_id}")
                        return child_id
        except Exception as e:
            self.logger.error(f"Error searching for deliverable: {e}")
        return None

    def generate_omg_url(self, omg_number):
        """Generate OMG URL from OMG number (subtract 999999); None if not an integer."""
        try:
            job_id = int(omg_number) - 999999
        except (ValueError, TypeError):
            return None
        return f"https://bissell.omg.oliver.solutions/jobs/{job_id}"

    def _deliverable_custom_fields(self, job_details, job_number):
        """Build the customFields list (OMG#, OMG URL, category, notes) for a deliverable."""
        custom_fields = []
        if job_number:
            # Plain OMG# (used for duplicate detection) plus the OMG job URL
            custom_fields.append({"id": Config.CUSTOM_FIELDS["omg_number"], "value": job_number})
            omg_url = self.generate_omg_url(job_number)
            if omg_url:
                custom_fields.append({"id": Config.CUSTOM_FIELDS["omg_url"], "value": omg_url})
        job_category = job_details.get("JobCategory", job_details.get("MediaType", ""))
        if job_category:
            custom_fields.append({"id": Config.CUSTOM_FIELDS["deliverable_category"], "value": job_category})
        notes_parts = []
        if job_details.get("Type"):
            notes_parts.append(f"Type: {job_details['Type']}")
        if job_details.get("Details"):
            notes_parts.append(job_details["Details"])
        if notes_parts:
            custom_fields.append({"id": Config.CUSTOM_FIELDS["notes"], "value": " | ".join(notes_parts)})
        return custom_fields

    def create_deliverable_project(self, parent_project_id, job_details):
        """Create a deliverable (project of custom item type) under a project unless it exists.

        Existence is decided by the OMG# custom field (``job_details["Number"]``).

        Returns:
            ``(deliverable_id, skipped)``: ``skipped`` is True when an existing
            deliverable was found; ``(None, False)`` when creation failed.
        """
        deliverable_title = job_details.get("Title", "Untitled Deliverable")
        job_number = job_details.get("Number", "")

        if job_number:
            existing_deliverable_id = self.find_deliverable_by_omg_number(parent_project_id, job_number)
            if existing_deliverable_id:
                self.logger.info(f"⊙ Deliverable already exists: {deliverable_title} (#{job_number}) - skipping")
                return existing_deliverable_id, True

        # BriefDate -> start; LiveDate (falling back to DueDate) -> end
        brief_date = self.parse_date(job_details.get("BriefDate"))
        live_date = self.parse_date(job_details.get("LiveDate")) or self.parse_date(job_details.get("DueDate"))

        description_parts = []
        if job_details.get("Notes"):
            description_parts.append(job_details["Notes"])
        if job_details.get("Type"):
            description_parts.append(f"Type: {job_details['Type']}")
        if job_details.get("MediaType"):
            description_parts.append(f"Media: {job_details['MediaType']}")
        deliverable_data = {
            "title": deliverable_title,
            "description": " | ".join(description_parts),
        }
        result = self.make_wrike_request("POST", f"/folders/{parent_project_id}/folders", deliverable_data)
        deliverable_id = self._first_id(result)
        if not deliverable_id:
            return None, False

        # Convert to project with dates and the custom "Deliverable" item type
        project_data = {"project": {"customItemTypeId": Config.DELIVERABLE_ITEM_TYPE_ID}}
        if brief_date:
            project_data["project"]["startDate"] = brief_date
        if live_date:
            project_data["project"]["endDate"] = live_date
        custom_fields = self._deliverable_custom_fields(job_details, job_number)
        if custom_fields:
            project_data["customFields"] = custom_fields
        if self.make_wrike_request("PUT", f"/folders/{deliverable_id}", project_data) is None:
            # Without this update the folder has no OMG# field, so later
            # duplicate checks will not recognise it.
            self.logger.warning(
                f"Deliverable folder {deliverable_id} created but its project/custom-field update failed")

        self.logger.info(f"✓ Created deliverable (as project): {deliverable_title} (#{job_number})")
        return deliverable_id, False

    @staticmethod
    def _job_sections(job_spec):
        """Return (ProjectDetails, JobDetails) from a JobSpecification, using {} for missing/non-dict parts."""
        if not isinstance(job_spec, dict):
            return {}, {}
        project_details = job_spec.get("ProjectDetails")
        job_details = job_spec.get("JobDetails")
        return (project_details if isinstance(project_details, dict) else {},
                job_details if isinstance(job_details, dict) else {})

    def _folder_name_for(self, project_details, job_details):
        """Folder name from ProjectDetails.BusinessArea, falling back to JobDetails.BusinessArea."""
        business_area = project_details.get("BusinessArea") or job_details.get("BusinessArea") or ""
        return self.parse_business_area(business_area)

    def _describe_job(self, job_spec):
        """Best-effort (folder, project title) labels for error statistics."""
        project_details, job_details = self._job_sections(job_spec)
        try:
            folder_name = self._folder_name_for(project_details, job_details) or "Unknown"
        except AttributeError:  # BusinessArea present but not a string
            folder_name = "Unknown"
        return folder_name, project_details.get("Title", "Unknown")

    def process_json_file(self, file_path: Path, is_startup: bool = False, is_periodic: bool = False):
        """Import one JSON job file into Wrike, then move it out of the hot folder.

        Args:
            file_path: JSON file to import.
            is_startup: File came from the startup scan (statistics only).
            is_periodic: File came from a periodic scan (statistics only).

        Returns:
            True when the deliverable was created or already existed (file
            moved to Processed); False on failure (file moved to Failed) or
            when the file no longer exists.
        """
        if not file_path.exists():
            # Usually a second queue entry (watchdog event + periodic scan) for a
            # file that was already handled and moved away - not an error.
            self.logger.debug(f"Skipping {file_path.name}: file no longer exists")
            return False

        start_time = time.time()
        job_spec: Dict[str, Any] = {}  # defined up front for the error path
        try:
            # utf-8-sig also accepts files written with a UTF-8 BOM.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)
            job_spec = data.get("JobSpecification", {})
            project_details, job_details = self._job_sections(job_spec)

            folder_name = self._folder_name_for(project_details, job_details)
            if not folder_name:
                raise ValueError("No BusinessArea found")
            folder_id = self.get_or_create_folder(folder_name)
            if not folder_id:
                raise ValueError(f"Failed to create folder: {folder_name}")

            project_title = project_details.get("Title", "Untitled Project")
            campaign_code = job_details.get("CampaignCode", "")
            project_id = self.get_or_create_project(project_title, folder_id, project_details, campaign_code)
            if not project_id:
                raise ValueError(f"Failed to create project: {project_title}")

            deliverable_id, skipped = self.create_deliverable_project(project_id, job_details)
            if not deliverable_id:
                raise ValueError("Failed to create deliverable")

            processing_time = time.time() - start_time
            self.daily_stats.record_file_processed(
                folder_name, project_title, not skipped, skipped,
                processing_time, True, None, is_startup, is_periodic
            )
            # Record the identity BEFORE moving: it is derived from stat() on the
            # hot-folder path, which no longer exists after the move.
            self.add_to_recently_processed(file_path)
            self.move_to_processed(file_path)
            return True
        except Exception as e:
            processing_time = time.time() - start_time
            self.logger.error(f"Error processing {file_path.name}: {e}")
            self.move_to_failed(file_path, str(e))
            folder_name, project_title = self._describe_job(job_spec)
            self.daily_stats.record_file_processed(
                folder_name, project_title, False, False,
                processing_time, False, str(e), is_startup, is_periodic
            )
            return False

    @staticmethod
    def _file_identity(file_path: Path) -> str:
        """Identity string "<path>_<size>_<mtime>"; raises OSError if the file is missing."""
        stat_info = file_path.stat()
        return f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"

    def add_to_recently_processed(self, file_path: Path):
        """Remember ``file_path`` (by path, size and mtime) so periodic scans skip it.

        Silently does nothing if the file cannot be stat()-ed. Keeps at most
        ``_RECENT_LIMIT`` identities, evicting the oldest ones first.
        """
        try:
            file_id = self._file_identity(file_path)
        except OSError:
            return
        with self.processed_lock:
            # Re-insert so a repeated entry counts as the newest.
            self.recently_processed.pop(file_id, None)
            self.recently_processed[file_id] = None
            if len(self.recently_processed) > self._RECENT_LIMIT:
                for stale_id in list(self.recently_processed)[:self._RECENT_TRIM]:
                    del self.recently_processed[stale_id]

    def was_recently_processed(self, file_path: Path) -> bool:
        """True if this exact file (path, size, mtime) was recorded; False if unknown or missing."""
        try:
            file_id = self._file_identity(file_path)
        except OSError:
            return False
        with self.processed_lock:
            return file_id in self.recently_processed

    def move_to_processed(self, file_path: Path):
        """Move the file into Config.PROCESSED_FOLDER (errors are logged, not raised)."""
        try:
            destination = Config.PROCESSED_FOLDER / file_path.name
            shutil.move(str(file_path), str(destination))
        except Exception as e:
            self.logger.error(f"Failed to move to processed: {e}")

    def move_to_failed(self, file_path: Path, error_msg: str):
        """Move the file to Config.FAILED_FOLDER with a timestamp prefix and write an error note."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            failed_path = Config.FAILED_FOLDER / f"{timestamp}_{file_path.name}"
            shutil.move(str(file_path), str(failed_path))
            error_log_path = Config.FAILED_FOLDER / f"{timestamp}_{file_path.stem}_ERROR.txt"
            with open(error_log_path, 'w', encoding='utf-8') as f:
                f.write(f"File: {file_path.name}\n")
                f.write(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Error: {error_msg}\n")
            self.daily_stats.record_failed_file_moved()
            self.logger.warning(f"Moved to failed: {file_path.name}")
        except Exception as e:
            self.logger.error(f"Failed to move failed file: {e}")

    def scan_existing_files(self) -> List[Path]:
        """Return the *.json files currently in the hot folder."""
        self.logger.info("🔍 Scanning for existing JSON files...")
        json_files = list(Config.HOT_FOLDER.glob("*.json"))
        self.logger.info(f"Found {len(json_files)} existing files")
        return json_files

    def periodic_scan(self):
        """Thread body: every PERIODIC_SCAN_INTERVAL seconds, queue files the watcher missed."""
        while True:
            try:
                if not self.startup_complete:
                    time.sleep(5)
                    continue
                time.sleep(Config.PERIODIC_SCAN_INTERVAL)
                with self.scan_lock:
                    if self.scan_in_progress:
                        self.logger.warning("Skipping scan - already in progress")
                        continue
                    self.scan_in_progress = True
                # Scan outside the lock: scan_lock is not re-entrant.
                self._perform_periodic_scan()
                with self.scan_lock:
                    self.scan_in_progress = False
            except Exception as e:
                self.logger.error(f"Periodic scan error: {e}")
                with self.scan_lock:
                    self.scan_in_progress = False
                time.sleep(Config.PERIODIC_SCAN_INTERVAL)

    def _perform_periodic_scan(self):
        """Queue every hot-folder *.json file not in ``recently_processed``."""
        scan_start = time.time()
        try:
            missed_files = [json_file for json_file in Config.HOT_FOLDER.glob("*.json")
                            if not self.was_recently_processed(json_file)]
            scan_duration = time.time() - scan_start
            if missed_files:
                self.logger.info(f"🔄 Periodic scan found {len(missed_files)} files ({scan_duration:.1f}s)")
                for file_path in missed_files:
                    self.queue_file(file_path, is_periodic=True)
            self.daily_stats.record_periodic_scan(scan_duration, len(missed_files))
        except Exception as e:
            self.logger.error(f"Periodic scan error: {e}")

    def process_startup_files(self, files: List[Path]):
        """Process the files found at startup, Config.BATCH_SIZE at a time."""
        if not files:
            self.logger.info("✅ No existing files to process")
            return
        self.logger.info(f"🚀 Processing {len(files)} existing files...")
        for i in range(0, len(files), Config.BATCH_SIZE):
            self.process_batch(files[i:i + Config.BATCH_SIZE], is_startup=True)
        self.logger.info("✅ Startup processing complete")

    def process_batch(self, file_paths: List[Path], is_startup: bool = False, is_periodic: bool = False):
        """Process files sequentially (one at a time) to avoid races on the Wrike side."""
        for file_path in file_paths:
            try:
                self.process_json_file(file_path, is_startup, is_periodic)
            except Exception as e:
                self.logger.error(f"Error processing {file_path.name}: {e}")

    def queue_file(self, file_path: Path, is_periodic: bool = False):
        """Queue a file for the batch worker (ignored until startup is complete)."""
        if self.startup_complete:
            self.file_queue.put((file_path, is_periodic))

    @staticmethod
    def _file_size(file_path: Path) -> Optional[int]:
        """Size in bytes, or None if the file cannot be stat()-ed."""
        try:
            return file_path.stat().st_size
        except OSError:
            return None

    def _wait_until_stable(self, file_path: Path) -> bool:
        """Wait until ``file_path`` stops growing, i.e. its writer has finished.

        Polls every Config.WAIT_DELAY seconds until two consecutive sizes
        match, giving up waiting after ``_STABILITY_MAX_CHECKS`` polls.

        Returns:
            False if the file disappeared (e.g. already processed), else True.
        """
        last_size = self._file_size(file_path)
        if last_size is None:
            return False
        for _ in range(self._STABILITY_MAX_CHECKS):
            time.sleep(Config.WAIT_DELAY)
            size = self._file_size(file_path)
            if size is None:
                return False
            if size == last_size:
                return True
            last_size = size
        self.logger.warning(f"{file_path.name} still changing after "
                            f"{self._STABILITY_MAX_CHECKS} checks; processing anyway")
        return True

    def batch_processor_worker(self):
        """Thread body: collect queued files (up to BATCH_SIZE, at most BATCH_TIMEOUT s) and process them."""
        while True:
            files_to_process = []
            periodic_flags = []
            batch_start_time = time.time()
            while (len(files_to_process) < Config.BATCH_SIZE and
                   (time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
                try:
                    file_data = self.file_queue.get(timeout=1.0)
                except Empty:
                    if files_to_process:
                        break
                    continue
                file_path, is_periodic = file_data if isinstance(file_data, tuple) else (file_data, False)
                files_to_process.append(file_path)
                periodic_flags.append(is_periodic)
                self.file_queue.task_done()

            if files_to_process and self.startup_complete:
                # watchdog reports files as soon as they appear, possibly before
                # the writer is done; don't parse half-written JSON.
                ready = [path for path in files_to_process if self._wait_until_stable(path)]
                self.process_batch(ready, is_startup=False, is_periodic=any(periodic_flags))

    def cleanup_old_processed_files(self):
        """Delete *.json files in Processed older than Config.CLEANUP_PROCESSED_HOURS."""
        try:
            cutoff_time = time.time() - (Config.CLEANUP_PROCESSED_HOURS * 3600)
            deleted = 0
            for processed_file in Config.PROCESSED_FOLDER.glob("*.json"):
                if processed_file.stat().st_mtime < cutoff_time:
                    processed_file.unlink()
                    deleted += 1
            if deleted > 0:
                self.logger.info(f"🗑️ Cleaned up {deleted} old processed files")
        except Exception as e:
            self.logger.error(f"Cleanup error: {e}")

    def generate_daily_report(self):
        """Save and email the daily report, run cleanups, then reset the counters.

        Scheduled at Config.DAILY_REPORT_TIME by setup_daily_reporting().
        NOTE: files recorded between get_stats_summary() and reset_stats() end
        up in no report.
        """
        try:
            stats = self.daily_stats.get_stats_summary()
            report_content = self.format_daily_report(stats)
            report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
            report_file.write_text(report_content, encoding='utf-8')
            self.email_daily_report(report_content, stats['date'])
            self.cleanup_old_reports()
            self.cleanup_old_processed_files()
            self.daily_stats.reset_stats()
            self.logger.info(f"Daily report generated: {report_file}")
        except Exception as e:
            self.logger.error(f"Failed to generate daily report: {e}")

    def format_daily_report(self, stats: Dict[str, Any]) -> str:
        """Render a get_stats_summary() dict as the plain-text daily report."""
        startup_info = f"Startup Files: {stats['startup_files_processed']}\n" if stats['startup_files_processed'] > 0 else ""
        periodic_info = f"Periodic Scan Files: {stats['periodic_files_found']}\n" if stats['periodic_files_found'] > 0 else ""
        failed_info = f"Failed Files: {stats['failed_files_moved']}\n" if stats['failed_files_moved'] > 0 else ""
        report = f"""
WRIKE IMPORT DAILY REPORT
Date: {stats['date']}
Uptime: {stats['uptime']}
Space: {Config.WRIKE_SPACE_NAME}
=== SUMMARY ===
Total Files Processed: {stats['total_processed']}
Tasks Created: {stats['total_created']}
Tasks Skipped (duplicates): {stats['total_skipped']}
{startup_info}{periodic_info}{failed_info}Errors: {stats['total_errors']}
Success Rate: {stats['success_rate']:.1f}%
Average Processing Time: {stats['avg_processing_time']}s
Periodic Scans: {stats['periodic_scans_completed']} ({stats['slow_scans']} slow)
=== FOLDER BREAKDOWN ===
"""
        for folder, folder_data in sorted(stats['folder_stats'].items()):
            report += f"\n{folder}:\n"
            report += f" Tasks Created: {folder_data['tasks_created']}\n"
            report += f" Tasks Skipped: {folder_data['tasks_skipped']}\n"
            report += f" Errors: {folder_data['errors']}\n"
            if folder_data['projects']:
                report += " Projects:\n"
                for project, count in folder_data['projects'].most_common(10):
                    report += f" - {project}: {count} deliverable(s)\n"
        report += "\n=== HOURLY BREAKDOWN ===\n"
        for hour in range(24):
            count = stats['hourly_stats'].get(hour, 0)
            if count > 0:
                report += f"{hour:02d}:00 - {count} files\n"
        if stats['error_details']:
            report += "\n=== RECENT ERRORS ===\n"
            for error in stats['error_details'][-10:]:
                report += f"{error['time']} - {error['folder']}/{error['project']}: {error['error']}\n"
        return report

    def email_daily_report(self, report_content: str, report_date: str):
        """Email the report to Config.REPORT_EMAILS via STARTTLS SMTP (errors are logged)."""
        try:
            msg = MIMEMultipart()
            msg['From'] = Config.SENDER_EMAIL
            msg['To'] = ', '.join(Config.REPORT_EMAILS)
            msg['Subject'] = f"Wrike Import Daily Report - {Config.WRIKE_SPACE_NAME} - {report_date}"
            msg.attach(MIMEText(report_content, 'plain'))
            # The context manager closes the connection even when starttls,
            # login or sendmail raises; the timeout keeps the scheduler thread
            # from hanging on an unresponsive server.
            with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT, timeout=30) as server:
                server.starttls()
                server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
                server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())
            self.logger.info("Daily report emailed successfully")
        except Exception as e:
            self.logger.error(f"Failed to email report: {e}")

    def cleanup_old_reports(self):
        """Delete daily_report_YYYY-MM-DD.txt files older than Config.KEEP_REPORTS_DAYS."""
        try:
            cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)
            for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
                try:
                    file_date_str = report_file.stem.split('_')[-1]
                    file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
                    if file_date < cutoff_date:
                        report_file.unlink()
                except (ValueError, OSError):
                    # Name without a parsable date, or file vanished/locked: skip it.
                    continue
        except Exception as e:
            self.logger.error(f"Failed to cleanup old reports: {e}")

    def get_current_stats(self) -> str:
        """One-line summary of today's counters for the periodic status log."""
        stats = self.daily_stats.get_stats_summary()
        return f"Today: {stats['total_processed']} processed ({stats['total_created']} created, {stats['total_skipped']} skipped), {stats['total_errors']} errors"
class WrikeFileHandler(FileSystemEventHandler):
    """watchdog handler that queues new or renamed .json files in the hot folder.

    Events are ignored until the monitor has finished its startup pass, since
    files present at startup are processed separately by main().
    """

    def __init__(self, monitor: WrikeMonitor):
        super().__init__()
        self.monitor = monitor
        self.logger = logging.getLogger(__name__)

    def _is_hot_folder_json(self, path: Path) -> bool:
        """True if ``path`` is a .json file directly inside Config.HOT_FOLDER."""
        if path.suffix.lower() != '.json':
            return False
        try:
            return path.parent.resolve() == Config.HOT_FOLDER.resolve()
        except (OSError, RuntimeError):  # RuntimeError: symlink loop (Python < 3.13)
            return False

    def on_created(self, event):
        """Queue a newly created .json file."""
        if event.is_directory or not self.monitor.startup_complete:
            return
        file_path = Path(event.src_path)
        if self._is_hot_folder_json(file_path):
            self.logger.debug(f"📥 New file detected: {file_path.name}")
            self.monitor.queue_file(file_path, is_periodic=False)

    def on_moved(self, event):
        """Queue a file renamed/moved to a .json name inside the hot folder.

        A move OUT of the hot folder (e.g. the monitor moving a file to
        Processed/Failed) may be reported with a destination outside
        HOT_FOLDER by some watchdog backends; such files must not be re-queued.
        """
        if event.is_directory or not self.monitor.startup_complete:
            return
        dest_path = Path(event.dest_path)
        if self._is_hot_folder_json(dest_path):
            self.logger.debug(f"📁 File moved: {dest_path.name}")
            self.monitor.queue_file(dest_path, is_periodic=False)
def main():
    """Run the monitor: drain the hot folder once, then watch it until Ctrl+C.

    Existing files are processed synchronously first; only afterwards is
    ``startup_complete`` set, which lets watchdog events and periodic scans
    start feeding the queue. A status line is logged every 60 seconds.
    """
    monitor = WrikeMonitor()
    log = monitor.logger

    # Phase 1: whatever is already sitting in the hot folder.
    monitor.process_startup_files(monitor.scan_existing_files())

    # Phase 2: switch over to event-driven processing.
    monitor.startup_complete = True
    log.info("🎯 Startup complete - switching to real-time monitoring")

    background_jobs = (
        (monitor.batch_processor_worker, "BatchWorker"),
        (monitor.periodic_scan, "PeriodicScanner"),
    )
    for job_target, job_name in background_jobs:
        threading.Thread(target=job_target, name=job_name, daemon=True).start()

    observer = Observer()
    observer.schedule(WrikeFileHandler(monitor), str(Config.HOT_FOLDER), recursive=False)
    observer.start()
    log.info(f"🛡️ Real-time monitoring active on: {Config.HOT_FOLDER}")
    log.info(f"📧 Daily reports at {Config.DAILY_REPORT_TIME} to {', '.join(Config.REPORT_EMAILS)}")

    try:
        while True:
            time.sleep(60)
            log.info(f"📊 {monitor.get_current_stats()}")
    except KeyboardInterrupt:
        log.info("Stopping Wrike Monitor...")
        observer.stop()
        observer.join()
        log.info("Wrike Monitor stopped")


if __name__ == "__main__":
    main()