bissell-wrike-python/wrike_monitor_lgl.py
Dave Porter a0cfe4b796 Add LGL Team migration with recursive duplicate detection
- Add wrike_monitor_lgl.py for LGL Team board
  * Updated API token and space configuration
  * All 11 custom fields mapped correctly
  * OMG# field now stores HTML links (matching LGL Team format)
  * Recursive duplicate detection across entire Business Areas folder
  * Handles HTML link extraction for accurate comparison

- Add discover_board_info.py
  * Automated discovery tool for board configuration
  * Finds space IDs, custom field IDs, and item types
  * Generates configuration snippets

- Add config_lgl_team.py
  * Reference configuration for LGL Team space
  * Complete field mapping documentation

- Add test_duplicate_detection.py
  * Testing tool to verify duplicate detection logic
  * Can search for specific OMG# values

- Update requirements.txt

- Remove wrike_import.py (moved to OLD/)

Key Features:
- NO DUPLICATES: Searches entire Business Areas folder before creating
- HTML Link Support: OMG# stored as clickable links matching existing format
- Global Search: Uses descendants=true for efficient recursive search
- Format Matching: Generates OMG# links identical to existing entries

🤖 Generated with Claude Code
2025-12-17 15:19:17 -05:00

1064 lines
42 KiB
Python

#!/usr/bin/env python3
"""
Wrike Import Monitor - Real-time JSON processor with daily reporting
Monitors a folder for JSON files and automatically creates Wrike projects/tasks
Features:
- Real-time folder monitoring with watchdog
- Periodic scanning for missed files
- Duplicate detection via OMG numbers
- Daily email reports at 7PM
- Sequential batch processing (one file at a time, to avoid race conditions)
- Failed file handling
- Auto-cleanup of old processed files
"""
import json
import logging
import os
import re
import shutil
import smtplib
import threading
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict, List, Optional, Set

import requests
import schedule
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class Config:
    """Configuration settings - EDIT THESE FOR YOUR ENVIRONMENT.

    Credentials are read from environment variables when this class is
    defined instead of being stored in source control:

    * ``WRIKE_TOKEN``   - bearer token sent with every Wrike API request.
    * ``SMTP_PASSWORD`` - password used to log in to ``SMTP_SERVER``.

    Both default to an empty string when the variable is unset.
    """

    # === PATHS (EDIT FOR LOCAL/SERVER) ===
    # LGL Team (Production) paths
    HOT_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Bissell-Wrike/json_files")
    PROCESSED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Bissell-Wrike/processed")
    FAILED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Bissell-Wrike/processed/Failed")
    REPORTS_DIR = Path("/Users/daveporter/Desktop/CODING-2024/Bissell-Wrike/processed/reports")

    # === WRIKE API SETTINGS ===
    WRIKE_API_BASE = "https://www.wrike.com/api/v4"
    # SECURITY: a token literal used to be committed on this line. Treat that
    # token as compromised, rotate it, and supply the new one via WRIKE_TOKEN.
    WRIKE_TOKEN = os.environ.get("WRIKE_TOKEN", "")

    # Wrike Space - LGL Team (Production)
    WRIKE_SPACE_ID = "MQAAAABoHcTY"  # LGL Team
    WRIKE_SPACE_NAME = "LGL Team"
    DELIVERABLE_ITEM_TYPE_ID = None  # Custom item types not available in LGL team space

    # Custom field IDs for LGL Team space
    CUSTOM_FIELDS = {
        "budget": "IEAGUQQ3JUAJHWE5",
        "impact": "IEAGUQQ3JUAJHWE6",  # Using "Importance" field
        "notes": "IEAGUQQ3JUAJHWNB",
        "rag": "IEAGUQQ3JUAJLB2W",
        "deliverable_category": "IEAGUQQ3JUAJLKMT",
        "actions": "IEAGUQQ3JUAJNOSK",
        "shoot_date": "IEAGUQQ3JUAJNOWG",
        "omg_number": "IEAGUQQ3JUAJL7YF",  # OMG# field
        "omg_url": "IEAGUQQ3JUAKGGLP",  # OMG URL field
        "box_link": "IEAGUQQ3JUAJNXKL",
        "owner": "IEAGUQQ3JUAJOKJU",
    }

    # === EMAIL SETTINGS ===
    SMTP_SERVER = "smtp.mailgun.org"
    SMTP_PORT = 587
    SMTP_USER = "twist@mail.dev.oliver.solutions"
    # SECURITY: previously a committed literal - rotate it and set SMTP_PASSWORD.
    SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "")
    SENDER_EMAIL = "WRIKE-MONITOR@oliver.agency"
    REPORT_EMAILS = ["daveporter@oliver.agency"]

    # === PROCESSING SETTINGS ===
    BATCH_SIZE = 1  # Process one file at a time to avoid race conditions
    MAX_WORKERS = 1  # No concurrent processing
    BATCH_TIMEOUT = 30
    WAIT_DELAY = 2

    # === MONITORING SETTINGS ===
    PERIODIC_SCAN_INTERVAL = 60  # Scan every 60 seconds
    PERIODIC_SCAN_TIMEOUT = 120  # Max scan time
    SLOW_SCAN_THRESHOLD = 30

    # === REPORTING SETTINGS ===
    DAILY_REPORT_TIME = "19:00"  # 7PM daily report
    KEEP_REPORTS_DAYS = 30
    CLEANUP_PROCESSED_HOURS = 24  # Delete processed files after 24 hours
class DailyStats:
    """Thread-safe daily statistics collector.

    Every public method takes ``self.lock`` before touching the counters, so
    a single instance can be shared between threads.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.reset_stats()
        self.start_time = datetime.now()

    def reset_stats(self):
        """Zero every counter and start a new reporting period."""
        with self.lock:
            # Day on which the counters were last reset (kept for callers
            # that read the attribute; the summary no longer reports it).
            self.date = date.today()
            self.total_processed = 0
            self.total_created = 0
            self.total_skipped = 0
            self.total_errors = 0
            self.startup_files_processed = 0
            self.periodic_files_found = 0
            self.failed_files_moved = 0
            self.periodic_scans_completed = 0
            self.slow_scans = 0
            # Folder/Project breakdown
            self.folder_stats = defaultdict(lambda: {
                'projects': Counter(),
                'tasks_created': 0,
                'tasks_skipped': 0,
                'errors': 0,
            })
            # Performance metrics
            self.processing_times = []
            self.hourly_stats = defaultdict(int)
            self.error_details = []

    def record_file_processed(self, folder_name: str, project_name: str,
                              task_created: bool, skipped: bool, processing_time: float,
                              success: bool = True, error: Optional[str] = None,
                              is_startup: bool = False, is_periodic: bool = False):
        """Record the outcome of one processed file.

        Args:
            folder_name: Key of the per-folder breakdown to update.
            project_name: Project counted under that folder on success.
            task_created: True when a new item was created.
            skipped: True when the item already existed.
            processing_time: Seconds spent on the file; only successful
                files contribute to the average.
            success: False routes the call to the error counters.
            error: Optional message appended to ``error_details`` on failure.
            is_startup: Count the file as found by the startup scan.
            is_periodic: Count the file as found by a periodic scan
                (ignored when ``is_startup`` is set).
        """
        with self.lock:
            self.hourly_stats[datetime.now().hour] += 1
            if is_startup:
                self.startup_files_processed += 1
            elif is_periodic:
                self.periodic_files_found += 1

            folder = self.folder_stats[folder_name]
            if success:
                self.total_processed += 1
                if task_created:
                    self.total_created += 1
                    folder['tasks_created'] += 1
                if skipped:
                    self.total_skipped += 1
                    folder['tasks_skipped'] += 1
                folder['projects'][project_name] += 1
                self.processing_times.append(processing_time)
            else:
                self.total_errors += 1
                folder['errors'] += 1
                if error:
                    self.error_details.append({
                        'time': datetime.now().strftime('%H:%M:%S'),
                        'folder': folder_name,
                        'project': project_name,
                        'error': error,
                    })

    def record_failed_file_moved(self):
        """Record a failed file being moved."""
        with self.lock:
            self.failed_files_moved += 1

    def record_periodic_scan(self, duration: float, files_found: int):
        """Count one periodic scan; scans slower than the threshold count as slow.

        ``files_found`` is accepted for interface compatibility but is not
        stored.
        """
        with self.lock:
            self.periodic_scans_completed += 1
            if duration > Config.SLOW_SCAN_THRESHOLD:
                self.slow_scans += 1

    def get_stats_summary(self) -> Dict[str, Any]:
        """Return a snapshot of the current statistics.

        The snapshot is detached from live state: nested per-folder dicts and
        their ``Counter`` objects are copied, so the caller can read them
        without holding the lock.
        """
        with self.lock:
            times = self.processing_times
            avg_processing_time = sum(times) / len(times) if times else 0
            attempted = self.total_processed + self.total_errors
            success_rate = (self.total_processed / attempted * 100) if attempted > 0 else 0
            folder_snapshot = {
                name: {**data, 'projects': Counter(data['projects'])}
                for name, data in self.folder_stats.items()
            }
            return {
                # Use the generation day, not the last-reset day: the reset
                # happens right after a report, so ``self.date`` would stamp
                # the next report with the previous day's date and overwrite
                # the previous day's report file.
                'date': date.today().strftime('%Y-%m-%d'),
                'uptime': str(datetime.now() - self.start_time).split('.')[0],
                'total_processed': self.total_processed,
                'total_created': self.total_created,
                'total_skipped': self.total_skipped,
                'startup_files_processed': self.startup_files_processed,
                'periodic_files_found': self.periodic_files_found,
                'failed_files_moved': self.failed_files_moved,
                'periodic_scans_completed': self.periodic_scans_completed,
                'slow_scans': self.slow_scans,
                'total_errors': self.total_errors,
                'success_rate': success_rate,
                'avg_processing_time': round(avg_processing_time, 2),
                'folder_stats': folder_snapshot,
                'hourly_stats': dict(self.hourly_stats),
                'error_details': self.error_details.copy(),
            }
class WrikeMonitor:
"""Wrike import processor with real-time monitoring"""
def __init__(self):
    """Prepare logging and folders, build runtime state, then arm the daily report."""
    self.setup_logging()
    self.ensure_directories()

    # Statistics and the hand-off queue that feeds the batch worker.
    self.daily_stats = DailyStats()
    self.file_queue = Queue()
    self.startup_complete = False

    # Caches, keyed by "<parent id>:<title>".
    self.project_cache = {}
    self.folder_cache = {}

    # Fingerprints of recently processed files, guarded by their own lock.
    self.processed_lock = threading.Lock()
    self.recently_processed: Set[str] = set()

    # Scan protection: flag + lock so scans never overlap.
    self.scan_lock = threading.Lock()
    self.scan_in_progress = False

    self.setup_daily_reporting()
def setup_logging(self):
    """Configure root logging to a file in the reports directory and to the console.

    The log file is opened as UTF-8 explicitly: log messages in this module
    contain non-ASCII symbols, which a locale-dependent default encoding may
    be unable to write.
    """
    log_format = '%(asctime)s - %(levelname)s - [%(threadName)s] %(message)s'
    # The log file lives in REPORTS_DIR, so the directory must exist first.
    Config.REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=[
            logging.FileHandler(Config.REPORTS_DIR / 'wrike_monitor.log', encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
    self.logger = logging.getLogger(__name__)
    self.logger.info(f"Wrike Monitor initialized - Target: {Config.WRIKE_SPACE_NAME}")
def ensure_directories(self):
    """Make sure the hot, processed, failed and reports folders all exist."""
    required = (
        Config.HOT_FOLDER,
        Config.PROCESSED_FOLDER,
        Config.FAILED_FOLDER,
        Config.REPORTS_DIR,
    )
    for folder in required:
        folder.mkdir(parents=True, exist_ok=True)
def setup_daily_reporting(self):
    """Register the daily report job and start the daemon thread that runs it."""

    def _scheduler_loop():
        # Poll once a minute for jobs that have become due.
        while True:
            schedule.run_pending()
            time.sleep(60)

    daily_job = schedule.every().day.at(Config.DAILY_REPORT_TIME)
    daily_job.do(self.generate_daily_report)

    threading.Thread(target=_scheduler_loop, name="Scheduler", daemon=True).start()
def make_wrike_request(self, method, endpoint, data=None):
    """Make a request to the Wrike API.

    Args:
        method: "GET", "POST" or "PUT".
        endpoint: Path appended to ``Config.WRIKE_API_BASE``.
        data: JSON-serialisable body for POST/PUT; ignored for GET.

    Returns:
        The decoded JSON response, or None when the request fails, the
        server answers with an error status, or the body is not valid JSON.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
    """
    # Validate first: previously an unknown verb left ``response`` unbound
    # and surfaced as an UnboundLocalError.
    if method not in ("GET", "POST", "PUT"):
        raise ValueError(f"Unsupported HTTP method: {method!r}")

    url = f"{Config.WRIKE_API_BASE}{endpoint}"
    headers = {
        "Authorization": f"Bearer {Config.WRIKE_TOKEN}",
        "Content-Type": "application/json"
    }
    body = None if method == "GET" else data  # GET never carries a body
    try:
        response = requests.request(method, url, headers=headers, json=body, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        self.logger.error(f"Wrike API error: {e}")
        return None
    except ValueError as e:
        # Depending on the requests version, an undecodable body is raised
        # as a plain ValueError rather than a RequestException.
        self.logger.error(f"Wrike API error: invalid JSON response: {e}")
        return None
def parse_business_area(self, business_area):
    """Extract the folder name from a BusinessArea path.

    The value is treated as a ">"-separated path and its last segment,
    stripped of surrounding whitespace, is returned.

    Args:
        business_area: e.g. ``"Brand > Region > Team"``. ``None`` or an empty
            value (for example a JSON null) yields ``""``.

    Returns:
        The last path segment, or ``""`` when there is nothing to parse.
    """
    if not business_area:
        return ""
    # str.split always returns at least one element, so [-1] is safe.
    return str(business_area).split(">")[-1].strip()
def parse_date(self, date_string):
    """Convert a ``YYYY-MM-DD HH:MM:SS[+offset]`` string to ``YYYY-MM-DD``.

    Anything after the first "+" (a UTC offset) is discarded before parsing.

    Args:
        date_string: Timestamp text; may be None or empty.

    Returns:
        The date part as ``YYYY-MM-DD``, or None when the input is missing,
        is not a string, or does not match the expected format.
    """
    if not date_string or not isinstance(date_string, str):
        return None
    timestamp = date_string.split('+')[0].strip()
    try:
        parsed = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        # Only a format mismatch is expected here; a bare except would also
        # have swallowed KeyboardInterrupt/SystemExit.
        return None
    return parsed.strftime("%Y-%m-%d")
def get_or_create_folder(self, folder_name, parent_id=None):
    """Return the id of the direct child folder ``folder_name``, creating it if absent.

    Args:
        folder_name: Title of the folder to find or create.
        parent_id: Id of the parent folder; defaults to ``Config.WRIKE_SPACE_ID``.

    Returns:
        The folder id, or None when the lookup request or the create request
        fails. A failed lookup no longer falls through to "create", because
        that would add a duplicate of a folder that may already exist.
    """
    if parent_id is None:
        parent_id = Config.WRIKE_SPACE_ID
    cache_key = f"{parent_id}:{folder_name}"
    if cache_key in self.folder_cache:
        return self.folder_cache[cache_key]

    # Get folder tree
    tree = self.make_wrike_request("GET", f"/folders/{parent_id}/folders")
    if not tree or "data" not in tree:
        self.logger.error(
            f"Could not list folders under {parent_id}; "
            f"not creating '{folder_name}' to avoid a duplicate"
        )
        return None

    entries = tree["data"]
    # The parent's own entry carries the childIds used to limit the match
    # to direct children.
    parent_folder = next((item for item in entries if item["id"] == parent_id), None)
    if parent_folder:
        child_ids = set(parent_folder.get("childIds", []))
        for folder in entries:
            if folder["id"] in child_ids and folder["title"] == folder_name:
                folder_id = folder["id"]
                self.folder_cache[cache_key] = folder_id
                self.logger.debug(f"Found existing folder: {folder_name} ({folder_id})")
                return folder_id

    # Create new folder
    self.logger.info(f"Creating new folder: {folder_name}")
    data = {"title": folder_name, "description": f"{folder_name} category folder"}
    created = self.make_wrike_request("POST", f"/folders/{parent_id}/folders", data)
    if created and created.get("data"):
        folder_id = created["data"][0]["id"]
        self.folder_cache[cache_key] = folder_id
        self.logger.info(f"Created folder: {folder_name} ({folder_id})")
        return folder_id
    return None
def get_or_create_project(self, project_title, folder_id, project_details, campaign_code):
    """Return the id of project ``project_title`` directly under ``folder_id``, creating it if absent.

    Args:
        project_title: Title to match or to give the new project.
        folder_id: Id of the parent folder.
        project_details: Mapping read for "Description", "StartDate", "EndDate".
        campaign_code: Stored in the OMG# custom field of a new project when truthy.

    Returns:
        The project id, or None when the lookup request or the create request
        fails. A failed lookup no longer falls through to "create", because
        that would add a duplicate of a project that may already exist.
    """
    cache_key = f"{folder_id}:{project_title}"
    if cache_key in self.project_cache:
        return self.project_cache[cache_key]

    # Get folder tree
    tree = self.make_wrike_request("GET", f"/folders/{folder_id}/folders")
    if not tree or "data" not in tree:
        self.logger.error(
            f"Could not list projects under {folder_id}; "
            f"not creating '{project_title}' to avoid a duplicate"
        )
        return None

    entries = tree["data"]
    parent_folder = next((item for item in entries if item["id"] == folder_id), None)
    if parent_folder:
        child_ids = set(parent_folder.get("childIds", []))
        # Only direct children that are projects qualify.
        for item in entries:
            if item["id"] in child_ids and item["title"] == project_title and "project" in item:
                project_id = item["id"]
                self.project_cache[cache_key] = project_id
                self.logger.debug(f"Found existing project: {project_title} ({project_id})")
                return project_id

    # Create new project
    description = project_details.get("Description", "")
    if description:
        description = description.replace("<p>", "").replace("</p>", "")
    data = {"title": project_title, "description": description}
    created = self.make_wrike_request("POST", f"/folders/{folder_id}/folders", data)
    if not (created and created.get("data")):
        return None
    project_id = created["data"][0]["id"]

    # Convert to project with dates
    project_data = {"project": {}}
    start_date = self.parse_date(project_details.get("StartDate"))
    end_date = self.parse_date(project_details.get("EndDate"))
    if start_date:
        project_data["project"]["startDate"] = start_date
    if end_date:
        project_data["project"]["endDate"] = end_date
    # Add campaign code
    if campaign_code:
        project_data["customFields"] = [
            {"id": Config.CUSTOM_FIELDS["omg_number"], "value": campaign_code}
        ]
    updated = self.make_wrike_request("PUT", f"/folders/{project_id}", project_data)
    if updated is None:
        # The folder exists but may not be a project yet; the lookup above
        # only matches projects, so make the failure visible.
        self.logger.warning(
            f"Created '{project_title}' ({project_id}) but setting project data failed"
        )

    self.project_cache[cache_key] = project_id
    self.logger.info(f"Created project: {project_title} ({campaign_code})")
    return project_id
def find_task_by_omg_number(self, project_id, omg_number):
    """Look through a project's tasks for one whose OMG# field equals ``omg_number``.

    Returns the id of the first matching task, or None when ``omg_number`` is
    empty, the request fails, or nothing matches.
    """
    if not omg_number:
        return None

    response = self.make_wrike_request("GET", f'/folders/{project_id}/tasks?fields=["customFields"]')
    if not (response and "data" in response):
        return None

    for task in response["data"]:
        for entry in task.get("customFields", []):
            is_omg_field = entry.get("id") == Config.CUSTOM_FIELDS["omg_number"]
            if is_omg_field and entry.get("value") == omg_number:
                return task["id"]
    return None
def find_deliverable_by_omg_number_in_business_areas(self, omg_number):
    """Find a deliverable with a matching OMG number anywhere under "Business Areas".

    Locates the folder titled "Business Areas" in the configured space,
    fetches all of its descendants with their custom fields, and compares the
    OMG# field of every project against ``omg_number``. Stored values may be
    plain text or an HTML link; both sides are compared as stripped strings.

    Args:
        omg_number: OMG number to look for (string or int).

    Returns:
        The id of the first matching project, or None when ``omg_number`` is
        empty, the "Business Areas" folder is not found, nothing matches, or
        any error occurs (errors are logged, not raised).
    """
    if not omg_number:
        return None
    wanted = str(omg_number).strip()
    try:
        # Find Business Areas folder
        space_result = self.make_wrike_request("GET", f"/folders/{Config.WRIKE_SPACE_ID}/folders")
        business_areas_id = None
        if space_result and "data" in space_result:
            for item in space_result["data"]:
                if item.get("title") == "Business Areas":
                    business_areas_id = item["id"]
                    break
        if not business_areas_id:
            self.logger.warning("Business Areas folder not found - skipping duplicate check")
            return None

        # Get ALL folders/projects recursively in Business Areas using descendants=true
        self.logger.debug(f"Searching for OMG# {omg_number} in entire Business Areas folder...")
        result = self.make_wrike_request(
            "GET",
            f"/folders/{business_areas_id}/folders?descendants=true&fields=[\"customFields\"]",
        )
        omg_field_id = Config.CUSTOM_FIELDS["omg_number"]
        if result and "data" in result:
            for item in result["data"]:
                # Only projects are deliverables.
                if "project" not in item:
                    continue
                for field in item.get("customFields", []):
                    if field.get("id") != omg_field_id:
                        continue
                    raw_value = field.get("value")
                    if raw_value is None:
                        continue
                    # Coerce to str first: a non-string value would raise
                    # inside the extractor and abort the whole search.
                    existing_omg = self.extract_omg_from_html(str(raw_value))
                    if existing_omg is not None and str(existing_omg).strip() == wanted:
                        self.logger.info(f"✓ Found existing deliverable with OMG# {omg_number} in Business Areas: {item.get('title')} ({item['id']})")
                        return item["id"]
    except Exception as e:
        self.logger.error(f"Error searching for deliverable in Business Areas: {e}")
    return None
def find_deliverable_by_omg_number(self, parent_project_id, omg_number):
    """Look up a deliverable (project) by OMG number across all of Business Areas.

    ``parent_project_id`` is accepted but not used: the search is global
    rather than limited to one parent project.
    """
    match_id = self.find_deliverable_by_omg_number_in_business_areas(omg_number)
    return match_id
def extract_omg_from_html(self, html_value):
    """Extract the plain OMG number from a field value that may be an HTML link.

    Args:
        html_value: Plain text such as ``"1988861"`` or a link such as
            ``<a href="...">1988861</a>``. Non-string values are converted
            with ``str()``.

    Returns:
        None for an empty value; the stripped text when the value is not
        HTML; the digits found immediately before ``</a>`` when it is a
        link; otherwise the stripped HTML unchanged.
    """
    if not html_value:
        return None
    text = str(html_value).strip()
    if not text:
        return None
    # If it's already plain text (no HTML), return as-is
    if not text.startswith('<'):
        return text
    # Extract number from HTML: <a href="...">1988861</a>
    match = re.search(r'>\s*(\d+)\s*</a>', text)
    return match.group(1) if match else text
def generate_omg_html_link(self, omg_number):
    """Generate the OMG# field value as an HTML link (matching LGL Team format).

    The link text is the normalised integer, not the raw input, so a value
    such as ``" 1988861 "`` still yields text that the duplicate-detection
    pattern (digits directly before ``</a>``) can read back.

    Args:
        omg_number: OMG number as a string or int.

    Returns:
        The ``<a ...>`` markup, or None when ``omg_number`` is not an integer.
    """
    try:
        omg_int = int(omg_number)
    except (ValueError, TypeError):
        return None
    # NOTE(review): the link target is derived as OMG# - 999999. An earlier
    # comment cited an observed pair (1988861 -> 1986220) that does not match
    # this formula - confirm the correct mapping against OMG.
    project_id = omg_int - 999999
    return (
        f'<a rel="nofollow noreferrer noopener" target="_blank" '
        f'href="https://bissell.omg.oliver.solutions/projects/{project_id}">'
        f'{omg_int}</a>'
    )
def generate_omg_url(self, omg_number):
    """Build the OMG job URL for an OMG number (job id = number - 999999).

    Returns None when ``omg_number`` cannot be converted to an integer.
    """
    try:
        job_id = int(omg_number) - 999999
    except (ValueError, TypeError):
        return None
    return f"https://bissell.omg.oliver.solutions/jobs/{job_id}"
def create_deliverable_project(self, parent_project_id, job_details):
    """Create a deliverable as a project (not a task) unless one with the same OMG# exists.

    Args:
        parent_project_id: Id of the project the deliverable is created under.
        job_details: Mapping read for "Title", "Number", "BriefDate",
            "LiveDate", "DueDate", "Notes", "Type", "MediaType",
            "JobCategory" and "Details".

    Returns:
        ``(deliverable_id, skipped)``: ``skipped`` is True when an existing
        deliverable was found and nothing was created. ``(None, False)``
        when the create request fails.
    """
    deliverable_title = job_details.get("Title", "Untitled Deliverable")
    job_number = job_details.get("Number", "")

    # Check if exists by searching for existing deliverable with same OMG#
    if job_number:
        existing_deliverable_id = self.find_deliverable_by_omg_number(parent_project_id, job_number)
        if existing_deliverable_id:
            self.logger.info(f"⊙ Deliverable already exists: {deliverable_title} (#{job_number}) - skipping")
            return existing_deliverable_id, True

    # Parse dates - use BriefDate as start, LiveDate (or DueDate) as end
    brief_date = self.parse_date(job_details.get("BriefDate"))
    live_date = self.parse_date(job_details.get("LiveDate")) or self.parse_date(job_details.get("DueDate"))

    # Build description
    description_parts = []
    if job_details.get("Notes"):
        description_parts.append(job_details["Notes"])
    if job_details.get("Type"):
        description_parts.append(f"Type: {job_details['Type']}")
    if job_details.get("MediaType"):
        description_parts.append(f"Media: {job_details['MediaType']}")
    deliverable_description = " | ".join(description_parts) if description_parts else ""

    # Create deliverable as a project (subfolder)
    deliverable_data = {
        "title": deliverable_title,
        "description": deliverable_description
    }
    result = self.make_wrike_request("POST", f"/folders/{parent_project_id}/folders", deliverable_data)
    if not (result and result.get("data")):
        return None, False
    deliverable_id = result["data"][0]["id"]

    # Convert to project with dates and custom item type (if available)
    project_data = {"project": {}}
    if Config.DELIVERABLE_ITEM_TYPE_ID:
        project_data["project"]["customItemTypeId"] = Config.DELIVERABLE_ITEM_TYPE_ID
    if brief_date:
        project_data["project"]["startDate"] = brief_date
    if live_date:
        project_data["project"]["endDate"] = live_date

    # Add custom fields
    custom_fields = []
    # For deliverables: OMG# = HTML link, OMG_URL = separate URL field
    if job_number:
        omg_html = self.generate_omg_html_link(job_number)
        if omg_html:
            custom_fields.append({
                "id": Config.CUSTOM_FIELDS["omg_number"],
                "value": omg_html
            })
        omg_url = self.generate_omg_url(job_number)
        if omg_url:
            custom_fields.append({
                "id": Config.CUSTOM_FIELDS["omg_url"],
                "value": omg_url
            })
    job_category = job_details.get("JobCategory", job_details.get("MediaType", ""))
    if job_category:
        custom_fields.append({
            "id": Config.CUSTOM_FIELDS["deliverable_category"],
            "value": job_category
        })
    notes_parts = []
    if job_details.get("Type"):
        notes_parts.append(f"Type: {job_details['Type']}")
    if job_details.get("Details"):
        notes_parts.append(job_details["Details"])
    if notes_parts:
        custom_fields.append({
            "id": Config.CUSTOM_FIELDS["notes"],
            "value": " | ".join(notes_parts)
        })
    if custom_fields:
        project_data["customFields"] = custom_fields

    # Update with project dates and custom fields
    updated = self.make_wrike_request("PUT", f"/folders/{deliverable_id}", project_data)
    if updated is None:
        # Without this update the item carries no OMG#, so later duplicate
        # checks cannot find it - make the failure visible.
        self.logger.error(
            f"Deliverable {deliverable_title} ({deliverable_id}) was created but "
            f"setting its dates/custom fields failed; OMG# {job_number} is not recorded on it"
        )
    self.logger.info(f"✓ Created deliverable (as project): {deliverable_title} (#{job_number})")
    return deliverable_id, False
def process_json_file(self, file_path: Path, is_startup: bool = False, is_periodic: bool = False):
    """Process a single JSON job file end to end.

    Reads the file, resolves folder -> project -> deliverable in Wrike,
    records statistics, and moves the file to the processed folder. On any
    error the file is moved to the failed folder and the error is recorded.

    Args:
        file_path: JSON file to process.
        is_startup: The file was found by the startup scan.
        is_periodic: The file was found by a periodic scan.

    Returns:
        True on success. False on failure, or when the file no longer exists
        (e.g. it was queued twice and already handled); a vanished file is
        not counted as an error.
    """
    if not file_path.exists():
        self.logger.debug(f"Skipping {file_path.name}: file no longer exists")
        return False

    start_time = time.time()
    job_spec = {}  # bound up front so the error path can always read it
    try:
        # Parse JSON (utf-8-sig also accepts files that start with a BOM)
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)
        job_spec = data.get("JobSpecification", {})
        project_details = job_spec.get("ProjectDetails", {})
        job_details = job_spec.get("JobDetails", {})

        # Extract folder name; `or` also covers null/empty values
        business_area = project_details.get("BusinessArea") or job_details.get("BusinessArea") or ""
        folder_name = self.parse_business_area(business_area)
        if not folder_name:
            raise ValueError("No BusinessArea found")

        # Get/Create folder
        folder_id = self.get_or_create_folder(folder_name)
        if not folder_id:
            raise ValueError(f"Failed to create folder: {folder_name}")

        # Get/Create project
        project_title = project_details.get("Title", "Untitled Project")
        campaign_code = job_details.get("CampaignCode", "")
        project_id = self.get_or_create_project(project_title, folder_id, project_details, campaign_code)
        if not project_id:
            raise ValueError(f"Failed to create project: {project_title}")

        # Create deliverable (as project)
        deliverable_id, skipped = self.create_deliverable_project(project_id, job_details)
        if not deliverable_id:
            raise ValueError("Failed to create deliverable")

        processing_time = time.time() - start_time
        # Record stats
        self.daily_stats.record_file_processed(
            folder_name, project_title, not skipped, skipped,
            processing_time, True, None, is_startup, is_periodic
        )
        # Fingerprint BEFORE moving: the fingerprint is built from stat(),
        # which cannot succeed once the file has left the hot folder.
        self.add_to_recently_processed(file_path)
        self.move_to_processed(file_path)
        return True
    except Exception as e:
        processing_time = time.time() - start_time
        self.logger.error(f"Error processing {file_path.name}: {e}")
        # Move to failed
        self.move_to_failed(file_path, str(e))
        # Record error
        try:
            failed_details = job_spec.get("ProjectDetails", {})
            folder_name = self.parse_business_area(failed_details.get("BusinessArea", "Unknown"))
            project_title = failed_details.get("Title", "Unknown")
        except Exception:
            folder_name = "Unknown"
            project_title = "Unknown"
        self.daily_stats.record_file_processed(
            folder_name, project_title, False, False,
            processing_time, False, str(e), is_startup, is_periodic
        )
        return False
def add_to_recently_processed(self, file_path: Path):
    """Remember a file's fingerprint (path, size, mtime) as recently processed.

    Does nothing when the file cannot be stat'ed (e.g. it no longer exists).
    When a new fingerprint would grow the set past 1000 entries, 200
    existing entries are evicted first. The set has no insertion order, so
    the evicted entries are arbitrary rather than the oldest; evicting before
    adding guarantees the new entry itself survives.
    """
    try:
        stat_info = file_path.stat()
    except OSError:
        return
    file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"
    with self.processed_lock:
        if file_id not in self.recently_processed and len(self.recently_processed) >= 1000:
            for _ in range(200):
                self.recently_processed.pop()
        self.recently_processed.add(file_id)
def was_recently_processed(self, file_path: Path) -> bool:
    """Return True when this exact file (same path, size and mtime) was recorded as processed.

    Returns False when the file cannot be stat'ed, e.g. because it has
    already been moved away.
    """
    try:
        stat_info = file_path.stat()
    except OSError:
        return False
    file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"
    with self.processed_lock:
        return file_id in self.recently_processed
def move_to_processed(self, file_path: Path):
    """Relocate a handled file into the processed folder, keeping its name.

    A failed move is logged and otherwise ignored.
    """
    try:
        source = str(file_path)
        target = str(Config.PROCESSED_FOLDER / file_path.name)
        shutil.move(source, target)
    except Exception as e:
        self.logger.error(f"Failed to move to processed: {e}")
def move_to_failed(self, file_path: Path, error_msg: str):
    """Quarantine a file in the failed folder and write a matching error note.

    Both the moved file and its ``*_ERROR.txt`` note are prefixed with the
    same timestamp. Any failure along the way is logged and swallowed.
    """
    try:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        quarantined = Config.FAILED_FOLDER / f"{stamp}_{file_path.name}"
        shutil.move(str(file_path), str(quarantined))

        # Create error log
        note_path = Config.FAILED_FOLDER / f"{stamp}_{file_path.stem}_ERROR.txt"
        note_lines = [
            f"File: {file_path.name}\n",
            f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
            f"Error: {error_msg}\n",
        ]
        with open(note_path, 'w') as note:
            note.writelines(note_lines)

        self.daily_stats.record_failed_file_moved()
        self.logger.warning(f"Moved to failed: {file_path.name}")
    except Exception as e:
        self.logger.error(f"Failed to move failed file: {e}")
def scan_existing_files(self) -> List[Path]:
    """List the ``*.json`` files currently sitting in the hot folder."""
    log = self.logger
    log.info("🔍 Scanning for existing JSON files...")
    found = [*Config.HOT_FOLDER.glob("*.json")]
    log.info(f"Found {len(found)} existing files")
    return found
def periodic_scan(self):
    """Run forever, rescanning the hot folder for missed files at a fixed interval.

    Until startup has completed the loop only polls every 5 seconds. The
    ``scan_in_progress`` flag (guarded by ``scan_lock``) prevents overlapping
    scans and is cleared again if a scan raises.
    """
    while True:
        try:
            if not self.startup_complete:
                time.sleep(5)
                continue

            time.sleep(Config.PERIODIC_SCAN_INTERVAL)
            with self.scan_lock:
                already_running = self.scan_in_progress
                if already_running:
                    self.logger.warning("Skipping scan - already in progress")
                else:
                    self.scan_in_progress = True
            if already_running:
                continue

            self._perform_periodic_scan()
            with self.scan_lock:
                self.scan_in_progress = False
        except Exception as e:
            self.logger.error(f"Periodic scan error: {e}")
            with self.scan_lock:
                self.scan_in_progress = False
            time.sleep(Config.PERIODIC_SCAN_INTERVAL)
def _perform_periodic_scan(self):
    """Do one pass over the hot folder and queue every file not yet fingerprinted.

    The scan duration and number of files found are reported to the daily
    statistics; any error is logged and swallowed.
    """
    started = time.time()
    try:
        pending = [
            candidate
            for candidate in Config.HOT_FOLDER.glob("*.json")
            if not self.was_recently_processed(candidate)
        ]
        scan_duration = time.time() - started
        if pending:
            self.logger.info(f"🔄 Periodic scan found {len(pending)} files ({scan_duration:.1f}s)")
            for candidate in pending:
                self.queue_file(candidate, is_periodic=True)
        self.daily_stats.record_periodic_scan(scan_duration, len(pending))
    except Exception as e:
        self.logger.error(f"Periodic scan error: {e}")
def process_startup_files(self, files: List[Path]):
    """Work through the files found at startup, in batches of ``Config.BATCH_SIZE``."""
    total = len(files)
    if total == 0:
        self.logger.info("✅ No existing files to process")
        return

    self.logger.info(f"🚀 Processing {total} existing files...")
    for offset in range(0, total, Config.BATCH_SIZE):
        chunk = files[offset:offset + Config.BATCH_SIZE]
        self.process_batch(chunk, is_startup=True)
    self.logger.info("✅ Startup processing complete")
def process_batch(self, file_paths: List[Path], is_startup: bool = False, is_periodic: bool = False):
    """Handle each file of a batch in order, one after another.

    Files are processed sequentially to avoid race conditions with the Wrike
    API. An exception from one file is logged and does not stop the rest.
    """
    for current in file_paths or ():
        try:
            self.process_json_file(current, is_startup, is_periodic)
        except Exception as e:
            self.logger.error(f"Error processing {current.name}: {e}")
def queue_file(self, file_path: Path, is_periodic: bool = False):
    """Enqueue ``(file_path, is_periodic)`` for the batch worker.

    Calls made before startup has completed are ignored.
    """
    if not self.startup_complete:
        return
    entry = (file_path, is_periodic)
    self.file_queue.put(entry)
def batch_processor_worker(self):
    """Background worker: drain the file queue in batches, forever.

    A batch is closed when it reaches ``Config.BATCH_SIZE`` files, when the
    queue runs dry after at least one file was collected, or when
    ``Config.BATCH_TIMEOUT`` seconds have passed. A batch is flagged as
    periodic when any of its files came from a periodic scan.
    """
    while True:
        files_to_process = []
        periodic_flags = []
        batch_start_time = time.time()
        while (len(files_to_process) < Config.BATCH_SIZE and
               (time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
            try:
                file_data = self.file_queue.get(timeout=1.0)
            except Empty:
                # Nothing arrived within the poll interval.
                if files_to_process:
                    break
                continue
            try:
                file_path, is_periodic = file_data if isinstance(file_data, tuple) else (file_data, False)
            except (TypeError, ValueError):
                # Keep the worker alive on a malformed entry, but report it
                # instead of dropping it silently.
                self.logger.warning(f"Ignoring malformed queue entry: {file_data!r}")
            else:
                files_to_process.append(file_path)
                periodic_flags.append(is_periodic)
            self.file_queue.task_done()
        if files_to_process and self.startup_complete:
            is_periodic_batch = any(periodic_flags)
            self.process_batch(files_to_process, is_startup=False, is_periodic=is_periodic_batch)
def cleanup_old_processed_files(self):
    """Purge ``*.json`` files in the processed folder older than ``Config.CLEANUP_PROCESSED_HOURS``.

    Age is judged by modification time. Any error stops the pass and is
    logged.
    """
    try:
        max_age_seconds = Config.CLEANUP_PROCESSED_HOURS * 3600
        cutoff_time = time.time() - max_age_seconds
        removed = 0
        for candidate in Config.PROCESSED_FOLDER.glob("*.json"):
            if candidate.stat().st_mtime >= cutoff_time:
                continue
            candidate.unlink()
            removed += 1
        if removed > 0:
            self.logger.info(f"🗑️ Cleaned up {removed} old processed files")
    except Exception as e:
        self.logger.error(f"Cleanup error: {e}")
def generate_daily_report(self):
    """Produce the daily report: save it, email it, tidy up, and reset the counters.

    Steps run in order: snapshot statistics, write the report file, send
    the email, delete old reports, delete old processed files, reset
    statistics. Any exception is logged and ends the run.
    """
    try:
        snapshot = self.daily_stats.get_stats_summary()
        body = self.format_daily_report(snapshot)
        report_date = snapshot['date']

        # Save report
        report_file = Config.REPORTS_DIR / f"daily_report_{report_date}.txt"
        with open(report_file, 'w') as handle:
            handle.write(body)

        # Email, then housekeeping, then start a fresh reporting period.
        self.email_daily_report(body, report_date)
        self.cleanup_old_reports()
        self.cleanup_old_processed_files()
        self.daily_stats.reset_stats()

        self.logger.info(f"Daily report generated: {report_file}")
    except Exception as e:
        self.logger.error(f"Failed to generate daily report: {e}")
def format_daily_report(self, stats: Dict[str, Any]) -> str:
    """Render a statistics snapshot as the plain-text daily report.

    Sections: header and summary, per-folder breakdown (top 10 projects per
    folder), hourly breakdown (non-zero hours only), and the last 10 errors
    when there are any.
    """
    # Optional summary lines appear only when their counter is non-zero.
    optional = ""
    if stats['startup_files_processed'] > 0:
        optional += f"Startup Files: {stats['startup_files_processed']}\n"
    if stats['periodic_files_found'] > 0:
        optional += f"Periodic Scan Files: {stats['periodic_files_found']}\n"
    if stats['failed_files_moved'] > 0:
        optional += f"Failed Files: {stats['failed_files_moved']}\n"

    header_lines = [
        "",
        "WRIKE IMPORT DAILY REPORT",
        f"Date: {stats['date']}",
        f"Uptime: {stats['uptime']}",
        f"Space: {Config.WRIKE_SPACE_NAME}",
        "=== SUMMARY ===",
        f"Total Files Processed: {stats['total_processed']}",
        f"Tasks Created: {stats['total_created']}",
        f"Tasks Skipped (duplicates): {stats['total_skipped']}",
        f"{optional}Errors: {stats['total_errors']}",
        f"Success Rate: {stats['success_rate']:.1f}%",
        f"Average Processing Time: {stats['avg_processing_time']}s",
        f"Periodic Scans: {stats['periodic_scans_completed']} ({stats['slow_scans']} slow)",
        "=== FOLDER BREAKDOWN ===",
        "",
    ]
    chunks = ["\n".join(header_lines)]

    for folder, folder_data in sorted(stats['folder_stats'].items()):
        chunks.append(f"\n{folder}:\n")
        chunks.append(f" Tasks Created: {folder_data['tasks_created']}\n")
        chunks.append(f" Tasks Skipped: {folder_data['tasks_skipped']}\n")
        chunks.append(f" Errors: {folder_data['errors']}\n")
        if folder_data['projects']:
            chunks.append(" Projects:\n")
            chunks.extend(
                f" - {project}: {count} deliverable(s)\n"
                for project, count in folder_data['projects'].most_common(10)
            )

    chunks.append("\n=== HOURLY BREAKDOWN ===\n")
    for hour in range(24):
        count = stats['hourly_stats'].get(hour, 0)
        if count > 0:
            chunks.append(f"{hour:02d}:00 - {count} files\n")

    if stats['error_details']:
        chunks.append("\n=== RECENT ERRORS ===\n")
        chunks.extend(
            f"{error['time']} - {error['folder']}/{error['project']}: {error['error']}\n"
            for error in stats['error_details'][-10:]
        )

    return "".join(chunks)
def email_daily_report(self, report_content: str, report_date: str):
    """Email the daily report to ``Config.REPORT_EMAILS`` as plain text.

    The SMTP session is opened as a context manager so the connection is
    closed even when STARTTLS, login or sending fails, and a socket timeout
    keeps the calling thread from blocking indefinitely on a stalled server.
    Failures are logged, not raised.

    Args:
        report_content: Report body.
        report_date: Date string placed in the subject line.
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = Config.SENDER_EMAIL
        msg['To'] = ', '.join(Config.REPORT_EMAILS)
        msg['Subject'] = f"Wrike Import Daily Report - {Config.WRIKE_SPACE_NAME} - {report_date}"
        msg.attach(MIMEText(report_content, 'plain'))

        with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT, timeout=30) as server:
            server.starttls()
            server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
            server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())
        self.logger.info("Daily report emailed successfully")
    except Exception as e:
        self.logger.error(f"Failed to email report: {e}")
def cleanup_old_reports(self):
    """Remove ``daily_report_<YYYY-MM-DD>.txt`` files older than ``Config.KEEP_REPORTS_DAYS``.

    A report's age is taken from the date in its file name. Files whose name
    does not end in a valid date are left alone; a file that cannot be
    deleted is logged and skipped.
    """
    try:
        cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)
        for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
            file_date_str = report_file.stem.split('_')[-1]
            try:
                file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
            except ValueError:
                continue  # not one of our dated reports
            if file_date >= cutoff_date:
                continue
            try:
                report_file.unlink()
            except OSError as e:
                self.logger.warning(f"Could not delete old report {report_file.name}: {e}")
    except Exception as e:
        self.logger.error(f"Failed to cleanup old reports: {e}")
def get_current_stats(self) -> str:
    """Summarise the current counters in one line for the periodic status log."""
    snapshot = self.daily_stats.get_stats_summary()
    breakdown = f"{snapshot['total_created']} created, {snapshot['total_skipped']} skipped"
    return (
        f"Today: {snapshot['total_processed']} processed ({breakdown}), "
        f"{snapshot['total_errors']} errors"
    )
class WrikeFileHandler(FileSystemEventHandler):
    """Watchdog handler that queues JSON files appearing in the watched folder."""

    def __init__(self, monitor: WrikeMonitor):
        self.monitor = monitor
        self.logger = logging.getLogger(__name__)

    def _ignored(self, event) -> bool:
        """Directory events, and anything arriving before startup completes, are skipped."""
        return event.is_directory or not self.monitor.startup_complete

    def on_created(self, event):
        """Queue a newly created ``.json`` file."""
        if self._ignored(event):
            return
        created = Path(event.src_path)
        if created.suffix.lower() != '.json':
            return
        self.logger.debug(f"📥 New file detected: {created.name}")
        self.monitor.queue_file(created, is_periodic=False)

    def on_moved(self, event):
        """Queue a ``.json`` file that was moved or renamed; the destination path is used."""
        if self._ignored(event):
            return
        moved_to = Path(event.dest_path)
        if moved_to.suffix.lower() != '.json':
            return
        self.logger.debug(f"📁 File moved: {moved_to.name}")
        self.monitor.queue_file(moved_to, is_periodic=False)
def main():
    """Entry point: drain existing files, then watch the hot folder until interrupted."""
    monitor = WrikeMonitor()
    log = monitor.logger

    # Step 1: Process existing files
    monitor.process_startup_files(monitor.scan_existing_files())

    # Step 2: Start real-time monitoring
    monitor.startup_complete = True
    log.info("🎯 Startup complete - switching to real-time monitoring")

    # Background daemons: the batch worker first, then the periodic scanner.
    background = (
        ("BatchWorker", monitor.batch_processor_worker),
        ("PeriodicScanner", monitor.periodic_scan),
    )
    for thread_name, target in background:
        threading.Thread(target=target, name=thread_name, daemon=True).start()

    # Setup file monitoring
    observer = Observer()
    observer.schedule(WrikeFileHandler(monitor), str(Config.HOT_FOLDER), recursive=False)
    observer.start()
    log.info(f"🛡️ Real-time monitoring active on: {Config.HOT_FOLDER}")
    log.info(f"📧 Daily reports at {Config.DAILY_REPORT_TIME} to {', '.join(Config.REPORT_EMAILS)}")

    try:
        # Log a one-line status summary every minute until Ctrl+C.
        while True:
            time.sleep(60)
            log.info(f"📊 {monitor.get_current_stats()}")
    except KeyboardInterrupt:
        log.info("Stopping Wrike Monitor...")
        observer.stop()
    observer.join()
    log.info("Wrike Monitor stopped")


if __name__ == "__main__":
    main()