From 255c03b65de79cc8e5aed794e36e4252b27d3245 Mon Sep 17 00:00:00 2001 From: Dave Porter Date: Fri, 10 Oct 2025 14:31:19 -0400 Subject: [PATCH] Add real-time monitoring service (v2.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major additions: - wrike_monitor.py: Real-time folder monitoring with watchdog - Daily email reports at 7PM with comprehensive statistics - Failed file handling with error logs - Periodic scanning for missed files - systemd service support for production deployment - Sequential processing to prevent race conditions - Proper parent/child folder matching using childIds Configuration: - Easy path configuration for local/production - Configurable Wrike space ID - Email settings for daily reports - Auto-cleanup of processed files (24h retention) Documentation: - INSTALLATION.md: Complete systemd service setup guide - QUICKSTART.md: Quick reference for both tools - Updated README.md with tool comparison - requirements.txt with clear dependencies Bug fixes: - Fixed duplicate folder/project creation via childIds matching - Added logging for skipped deliverables - Improved cache management 🤖 Generated with Claude Code Co-Authored-By: Claude --- INSTALLATION.md | 359 ++++++++++++++++ QUICKSTART.md | 221 ++++++++++ README.md | 107 ++++- requirements.txt | 12 + wrike-monitor.service | 23 ++ wrike_monitor.py | 940 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 1650 insertions(+), 12 deletions(-) create mode 100644 INSTALLATION.md create mode 100644 QUICKSTART.md create mode 100644 requirements.txt create mode 100644 wrike-monitor.service create mode 100644 wrike_monitor.py diff --git a/INSTALLATION.md b/INSTALLATION.md new file mode 100644 index 0000000..5efca2d --- /dev/null +++ b/INSTALLATION.md @@ -0,0 +1,359 @@ +# Wrike Monitor Installation Guide + +## Overview + +This guide covers installing `wrike_monitor.py` as a systemd service that runs automatically on server boot, monitors a folder 24/7, and sends daily email reports at 7PM. + +**For simple batch processing**, use `wrike_import.py` instead (no installation needed, just run it). + +## Prerequisites + +- Python 3.6+ +- Root/sudo access +- Wrike API token with write permissions + +## Installation Steps + +### 1. Install Python Dependencies + +```bash +pip3 install -r requirements.txt +``` + +Or manually: +```bash +pip3 install requests watchdog schedule +``` + +### 2. Configure the Script + +Edit `wrike_monitor.py` and update the `Config` class (starting at line 36): + +#### A. Set Paths (Lines 41-44) + +**For Local Testing:** +```python +HOT_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/json_files") +PROCESSED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed") +FAILED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/Failed") +REPORTS_DIR = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/reports") +``` + +**For Production Server:** +```python +HOT_FOLDER = Path("/data/PRODUCTION/WRIKE_JSON") +PROCESSED_FOLDER = Path("/data/PRODUCTION/WRIKE_JSON/Processed") +FAILED_FOLDER = Path("/data/PRODUCTION/WRIKE_JSON/Failed") +REPORTS_DIR = Path("/PRODUCTION/WRIKE_LOGS") +``` + +#### B. Set Wrike Space (Lines 52-53) + +**For Staging:** +```python +WRIKE_SPACE_ID = "MQAAAABpz7l_" +WRIKE_SPACE_NAME = "Staging" +``` + +**For Production:** +```python +WRIKE_SPACE_ID = "MQAAAABoHcTY" +WRIKE_SPACE_NAME = "LGL Team" +``` + +#### C. Update Email Settings (Lines 79, 84) + +```python +REPORT_EMAILS = ["your-email@oliver.agency", "another@oliver.agency"] +DAILY_REPORT_TIME = "19:00" # 7PM daily report +``` + +#### D. Update API Token (Line 49) + +```python +WRIKE_TOKEN = "your_production_api_token_here" +``` + +### 3. Copy Files to Server + +```bash +# Create directory +sudo mkdir -p /root/wrike-import +sudo chmod 755 /root/wrike-import + +# Copy script +sudo cp wrike_monitor.py /root/wrike-import/ +sudo chmod +x /root/wrike-import/wrike_monitor.py + +# Create required directories +sudo mkdir -p /data/PRODUCTION/WRIKE_JSON +sudo mkdir -p /data/PRODUCTION/WRIKE_JSON/Processed +sudo mkdir -p /data/PRODUCTION/WRIKE_JSON/Failed +sudo mkdir -p /PRODUCTION/WRIKE_LOGS +sudo chmod -R 777 /data/PRODUCTION/WRIKE_JSON +sudo chmod -R 777 /PRODUCTION/WRIKE_LOGS +``` + +### 4. Install systemd Service + +```bash +# Copy service file +sudo cp wrike-monitor.service /etc/systemd/system/ + +# Reload systemd +sudo systemctl daemon-reload + +# Enable service (start on boot) +sudo systemctl enable wrike-monitor + +# Start service +sudo systemctl start wrike-monitor +``` + +### 5. Verify Installation + +```bash +# Check service status +sudo systemctl status wrike-monitor + +# View real-time logs +sudo journalctl -u wrike-monitor -f + +# Check if monitoring is working +echo '{"test": "data"}' > /data/PRODUCTION/WRIKE_JSON/test.json +sudo journalctl -u wrike-monitor -f +``` + +## Service Management Commands + +### Check Status +```bash +sudo systemctl status wrike-monitor +``` + +### Start Service +```bash +sudo systemctl start wrike-monitor +``` + +### Stop Service +```bash +sudo systemctl stop wrike-monitor +``` + +### Restart Service +```bash +sudo systemctl restart wrike-monitor +``` + +### View Logs +```bash +# Live tail +sudo journalctl -u wrike-monitor -f + +# Last 100 lines +sudo journalctl -u wrike-monitor -n 100 + +# Since today +sudo journalctl -u wrike-monitor --since today + +# Specific time range +sudo journalctl -u wrike-monitor --since "2025-10-09 09:00" --until "2025-10-09 17:00" +``` + +### Disable Service +```bash +sudo systemctl disable wrike-monitor +sudo systemctl stop wrike-monitor +``` + +## Configuration Changes + +After modifying `wrike_monitor.py`: + +```bash +# Restart service to apply changes +sudo systemctl restart wrike-monitor + +# Verify it started correctly +sudo systemctl status wrike-monitor +``` + +## Troubleshooting + +### Service Won't Start + +1. Check Python path: +```bash +which python3 +# Update ExecStart in wrike-monitor.service if different +``` + +2. Check file permissions: +```bash +ls -la /root/wrike-import/wrike_monitor.py +sudo chmod +x /root/wrike-import/wrike_monitor.py +``` + +3. Check logs: +```bash +sudo journalctl -u wrike-monitor -n 50 +``` + +### No Files Being Processed + +1. Verify folder exists and has correct permissions: +```bash +ls -la /data/PRODUCTION/WRIKE_JSON +sudo chmod 777 /data/PRODUCTION/WRIKE_JSON +``` + +2. Test manually: +```bash +sudo systemctl stop wrike-monitor +cd /root/wrike-import +python3 wrike_monitor.py +# Watch for errors, then Ctrl+C +sudo systemctl start wrike-monitor +``` + +### Email Reports Not Sending + +1. Check SMTP credentials in Config class +2. Verify email addresses in `REPORT_EMAILS` +3. Check logs for email errors: +```bash +sudo journalctl -u wrike-monitor | grep -i email +``` + +### High CPU/Memory Usage + +1. Reduce batch size and workers in Config: +```python +BATCH_SIZE = 3 +MAX_WORKERS = 3 +``` + +2. Increase scan interval: +```python +PERIODIC_SCAN_INTERVAL = 120 # Scan every 2 minutes instead of 1 +``` + +## Testing Before Production + +### Local Testing + +1. Keep local paths in Config +2. Run manually: +```bash +python3 wrike_monitor.py +``` + +3. Drop test JSON files into the hot folder +4. Verify they're processed and moved to Processed/ + +### Server Testing (without service) + +```bash +# Run in foreground +sudo python3 /root/wrike-import/wrike_monitor.py + +# Test file processing +sudo cp test.json /data/PRODUCTION/WRIKE_JSON/ + +# Watch logs +# Ctrl+C to stop when satisfied +``` + +## Monitoring Production + +### Check Daily Reports + +```bash +# View today's report +cat /PRODUCTION/WRIKE_LOGS/daily_report_$(date +%Y-%m-%d).txt + +# List all reports +ls -lh /PRODUCTION/WRIKE_LOGS/ +``` + +### Monitor Performance + +```bash +# View stats every minute (in logs) +sudo journalctl -u wrike-monitor -f | grep "📊" + +# Check for errors +sudo journalctl -u wrike-monitor | grep -i error + +# Check for slow scans +sudo journalctl -u wrike-monitor | grep -i "slow" +``` + +### Check Processed/Failed Files + +```bash +# Processed files (will auto-delete after 24h) +ls -lh /data/PRODUCTION/WRIKE_JSON/Processed/ + +# Failed files (need manual review) +ls -lh /data/PRODUCTION/WRIKE_JSON/Failed/ + +# View error logs +cat /data/PRODUCTION/WRIKE_JSON/Failed/*_ERROR.txt +``` + +## Maintenance + +### Manually Trigger Daily Report + +```bash +# The report runs automatically at 7PM, but to test: +sudo systemctl restart wrike-monitor +# Then wait or modify DAILY_REPORT_TIME in config +``` + +### Clear Failed Files After Review + +```bash +# After reviewing and fixing failed files +sudo rm -rf /data/PRODUCTION/WRIKE_JSON/Failed/* +``` + +### Backup Configuration + +```bash +# Backup before changes +sudo cp /root/wrike-import/wrike_monitor.py /root/wrike-import/wrike_monitor.py.backup +``` + +## Uninstallation + +```bash +# Stop and disable service +sudo systemctl stop wrike-monitor +sudo systemctl disable wrike-monitor + +# Remove service file +sudo rm /etc/systemd/system/wrike-monitor.service +sudo systemctl daemon-reload + +# Remove script (optional) +sudo rm -rf /root/wrike-import + +# Remove data directories (optional - be careful!) +# sudo rm -rf /data/PRODUCTION/WRIKE_JSON +# sudo rm -rf /PRODUCTION/WRIKE_LOGS +``` + +## Support + +For issues: +1. Check logs: `sudo journalctl -u wrike-monitor -f` +2. Verify configuration in `wrike_monitor.py` +3. Test manually before running as service +4. Review daily reports for patterns + +--- + +**Last Updated**: October 2025 diff --git a/QUICKSTART.md b/QUICKSTART.md new file mode 100644 index 0000000..a3d3003 --- /dev/null +++ b/QUICKSTART.md @@ -0,0 +1,221 @@ +# Quick Start Guide + +## Choose Your Tool + +### Option 1: Simple Batch Import (wrike_import.py) + +**Best for:** One-time imports, manual runs, testing + +```bash +# Install +pip install requests + +# Run +python wrike_import.py /path/to/json/files/ + +# That's it! Files processed and moved to Processed/ +``` + +--- + +### Option 2: Real-time Monitor Service (wrike_monitor.py) + +**Best for:** Production, 24/7 monitoring, automatic processing + +```bash +# Install +pip install -r requirements.txt + +# Configure (edit these lines in wrike_monitor.py): +# Line 41: HOT_FOLDER = Path("your/input/folder") +# Line 42: PROCESSED_FOLDER = Path("your/processed/folder") +# Line 52: WRIKE_SPACE_ID = "your_space_id" +# Line 79: REPORT_EMAILS = ["your@email.com"] + +# Run locally for testing +python wrike_monitor.py + +# Deploy as service (see INSTALLATION.md) +``` + +--- + +## Configuration Quick Reference + +### wrike_monitor.py - Key Settings + +| Setting | Line | Description | +|---------|------|-------------| +| `HOT_FOLDER` | 41 | Input folder to watch | +| `PROCESSED_FOLDER` | 42 | Where processed files go | +| `FAILED_FOLDER` | 43 | Where failed files go | +| `REPORTS_DIR` | 44 | Where reports are saved | +| `WRIKE_TOKEN` | 49 | Your Wrike API token | +| `WRIKE_SPACE_ID` | 52 | Target Wrike space ID | +| `WRIKE_SPACE_NAME` | 53 | Space name (for logs) | +| `REPORT_EMAILS` | 79 | Who gets daily reports | +| `DAILY_REPORT_TIME` | 84 | When to send report (24h format) | +| `CLEANUP_PROCESSED_HOURS` | 98 | Delete processed files after X hours | +| `PERIODIC_SCAN_INTERVAL` | 88 | How often to scan for missed files | + +### Custom Field IDs (Lines 57-68) + +Update these if you're using a different Wrike space with different custom field IDs. + +--- + +## Example Workflows + +### Workflow 1: Local Testing + +```bash +# 1. Create test folder +mkdir -p ~/wrike-test/json_files + +# 2. Edit wrike_monitor.py +# Set HOT_FOLDER = Path("/Users/yourname/wrike-test/json_files") + +# 3. Run monitor +python wrike_monitor.py + +# 4. Drop JSON files into json_files/ +# Watch them get processed automatically! +``` + +### Workflow 2: Production Deployment + +```bash +# 1. Configure paths for production in wrike_monitor.py +# 2. Copy to server +scp wrike_monitor.py wrike-monitor.service server:/root/wrike-import/ + +# 3. SSH to server and install service +ssh server +sudo cp /root/wrike-import/wrike-monitor.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable wrike-monitor +sudo systemctl start wrike-monitor + +# 4. Monitor logs +sudo journalctl -u wrike-monitor -f + +# 5. Get daily reports at 7PM via email! +``` + +### Workflow 3: Batch Import Existing Files + +```bash +# Use wrike_import.py for one-time batch +python wrike_import.py /path/to/existing/json/files/ + +# All files processed and moved to Processed/ +# Switch to wrike_monitor.py for ongoing monitoring +``` + +--- + +## Testing Checklist + +Before deploying to production: + +- [ ] Test with sample JSON files locally +- [ ] Verify folders/projects created correctly in Wrike +- [ ] Check duplicate detection works (reprocess same file) +- [ ] Confirm files move to Processed/ folder +- [ ] Test failed file handling (drop invalid JSON) +- [ ] Verify email settings (check inbox at report time) +- [ ] Review custom field IDs match your space +- [ ] Test with production paths (but staging space first!) + +--- + +## Common Configuration Scenarios + +### Scenario: Multiple Environments + +Create separate config files: + +```python +# wrike_monitor_staging.py - points to Staging space +# wrike_monitor_production.py - points to LGL Team space +``` + +Or use environment variables: + +```python +import os +WRIKE_SPACE_ID = os.environ.get('WRIKE_SPACE_ID', 'MQAAAABpz7l_') +``` + +### Scenario: Different Report Times + +```python +# Morning report at 9AM +DAILY_REPORT_TIME = "09:00" + +# Multiple reports per day - use schedule in setup_daily_reporting(): +schedule.every().day.at("09:00").do(self.generate_daily_report) +schedule.every().day.at("17:00").do(self.generate_daily_report) +schedule.every().day.at("19:00").do(self.generate_daily_report) +``` + +### Scenario: Longer File Retention + +```python +# Keep processed files for 7 days instead of 24 hours +CLEANUP_PROCESSED_HOURS = 168 # 7 days * 24 hours +``` + +--- + +## Troubleshooting Quick Fixes + +### Files not being processed +```bash +# Check folder permissions +chmod 777 /your/input/folder + +# Check if monitor is running +ps aux | grep wrike_monitor +``` + +### Duplicates being created +- Sequential processing is enabled (BATCH_SIZE=1, MAX_WORKERS=1) +- Caches persist during runtime +- Restart service if caches get stale: `sudo systemctl restart wrike-monitor` + +### Email not sending +- Check SMTP credentials in Config +- Test manually: send test email from server +- Check firewall allows SMTP port 587 + +### High memory usage +- Reduce BATCH_SIZE (line 82) +- Increase PERIODIC_SCAN_INTERVAL (line 88) +- Clear old logs: `rm /PRODUCTION/WRIKE_LOGS/*.log.old` + +--- + +## Getting Space IDs + +```bash +# Get all spaces +curl -X GET "https://www.wrike.com/api/v4/spaces" \ + -H "Authorization: Bearer YOUR_TOKEN" + +# Find your space and copy the ID +``` + +## Getting Custom Field IDs + +```bash +# Get all custom fields +curl -X GET "https://www.wrike.com/api/v4/customfields" \ + -H "Authorization: Bearer YOUR_TOKEN" + +# Update CUSTOM_FIELDS in Config with your IDs +``` + +--- + +**Need Help?** Check the main [README.md](README.md) for detailed documentation. diff --git a/README.md b/README.md index 5d37564..47ad60f 100644 --- a/README.md +++ b/README.md @@ -1,35 +1,91 @@ -# Wrike Import Script +# Wrike Import Tools -A Python script to automatically import project structures and deliverable tasks into Wrike from JSON files. +Two Python tools for automatically importing project structures and deliverable tasks into Wrike from JSON files. -## Overview +## Tools Overview -This script processes JSON files containing job specifications and creates the corresponding structure in Wrike: +### 1. wrike_import.py - Simple Batch Processor +A standalone script for one-time or manual batch imports. Processes all JSON files in a directory and exits. + +### 2. wrike_monitor.py - Real-time Monitor Service +A continuous monitoring service that watches a folder 24/7, processes files as they arrive, and sends daily email reports. Designed to run as a systemd service. + +## Common Features + +Both tools create the following Wrike structure: - **Folders** (product categories like "Dry Specialty", "Air", "PDC") - **Projects** (campaigns with timelines and metadata) - **Tasks** (deliverables with custom fields and due dates) -The script intelligently handles duplicates by checking OMG numbers before creating new items, updating existing ones instead. +Both intelligently handle duplicates by checking OMG numbers before creating new items. -## Features +## Features Comparison +### wrike_import.py (Batch Script) +- ✅ Simple command-line usage +- ✅ Process all files and exit +- ✅ Good for one-time imports or manual runs +- ✅ Moves processed files to Processed/ +- ✅ Auto-cleanup of old files + +### wrike_monitor.py (Service) +- ✅ **Real-time folder monitoring** with watchdog +- ✅ **Automatic processing** as files arrive +- ✅ **Daily email reports** at 7PM +- ✅ **Periodic scanning** for missed files +- ✅ **Failed file handling** with error logs +- ✅ **Auto-cleanup** of processed files (24h) +- ✅ **Statistics tracking** and performance monitoring +- ✅ **Systemd service** support for production +- ✅ **Sequential processing** to prevent race conditions + +### Common Features (Both) - ✅ **Automatic folder creation** from BusinessArea hierarchy - ✅ **Project management** with start/end dates and campaign codes -- ✅ **Task creation/updates** with comprehensive custom fields +- ✅ **Task creation** with comprehensive custom fields - ✅ **Duplicate prevention** via OMG number checking - ✅ **Smart caching** to minimize API calls - ✅ **Detailed logging** with progress tracking -- ✅ **Batch processing** of multiple JSON files -- ✅ **Automatic file management** - moves processed files to Processed subfolder -- ✅ **Auto-cleanup** - deletes files older than 24 hours from Processed folder + +## Quick Start + +### For Batch Processing (wrike_import.py) +```bash +# Install dependencies +pip install requests + +# Run on a folder +python wrike_import.py /path/to/json/files/ +``` + +### For Real-time Monitoring (wrike_monitor.py) +```bash +# Install dependencies +pip install -r requirements.txt + +# Configure paths in wrike_monitor.py (lines 41-44) +# Run the monitor +python wrike_monitor.py +``` + +See [INSTALLATION.md](INSTALLATION.md) for systemd service setup. ## Requirements ### Python Dependencies + +**For wrike_import.py:** ```bash pip install requests ``` +**For wrike_monitor.py:** +```bash +pip install requests watchdog schedule +# Or use requirements.txt: +pip install -r requirements.txt +``` + ### Python Version - Python 3.6 or higher @@ -469,9 +525,36 @@ When reporting issues, include: This script is provided as-is for internal use. +## Which Tool Should I Use? + +### Use wrike_import.py if you want to: +- Import a batch of JSON files once +- Run manually when needed +- Test imports locally +- Simple command-line operation + +### Use wrike_monitor.py if you want to: +- Continuous 24/7 monitoring +- Automatic processing as files arrive +- Daily email reports +- Production deployment as a service +- Detailed statistics and monitoring + ## Changelog -### Version 1.2 (Current) +### Version 2.0 (Current) +- **NEW**: Added wrike_monitor.py - real-time monitoring service +- **NEW**: Folder watching with watchdog +- **NEW**: Daily email reports at 7PM +- **NEW**: Failed file handling with error logs +- **NEW**: Periodic scanning for missed files +- **NEW**: Statistics tracking and performance monitoring +- **NEW**: systemd service support +- **FIXED**: Sequential processing to prevent race conditions +- **FIXED**: Proper parent/child folder matching to avoid duplicates +- **IMPROVED**: Logging for skipped deliverables + +### Version 1.2 - Added automatic file movement to Processed subfolder - Implemented auto-cleanup of files older than 24 hours - Improved file management and organization @@ -493,4 +576,4 @@ This script is provided as-is for internal use. **Last Updated**: October 2025 **Author**: Dave Porter -**Wrike Space**: Staging (MQAAAABpz7l_) +**Repository**: https://bitbucket.org/zlalani/bissell-wrike-python diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a64ccec --- /dev/null +++ b/requirements.txt @@ -0,0 +1,12 @@ +# Core dependencies (required for both scripts) +requests>=2.31.0 + +# Additional dependencies for wrike_monitor.py only +watchdog>=3.0.0 +schedule>=1.2.0 + +# Install all: +# pip install -r requirements.txt + +# Install only for wrike_import.py: +# pip install requests diff --git a/wrike-monitor.service b/wrike-monitor.service new file mode 100644 index 0000000..69e4be8 --- /dev/null +++ b/wrike-monitor.service @@ -0,0 +1,23 @@ +[Unit] +Description=Wrike Import Monitor Service +After=network.target + +[Service] +Type=simple +User=root +WorkingDirectory=/root/wrike-import +ExecStart=/usr/bin/python3 /root/wrike-import/wrike_monitor.py +Restart=always +RestartSec=10 + +# Logging +StandardOutput=journal +StandardError=journal +SyslogIdentifier=wrike-monitor + +# Resource limits +MemoryLimit=1G +CPUQuota=50% + +[Install] +WantedBy=multi-user.target diff --git a/wrike_monitor.py b/wrike_monitor.py new file mode 100644 index 0000000..97da710 --- /dev/null +++ b/wrike_monitor.py @@ -0,0 +1,940 @@ +#!/usr/bin/env python3 +""" +Wrike Import Monitor - Real-time JSON processor with daily reporting +Monitors a folder for JSON files and automatically creates Wrike projects/tasks + +Features: +- Real-time folder monitoring with watchdog +- Periodic scanning for missed files +- Duplicate detection via OMG numbers +- Daily email reports at 7PM +- Concurrent batch processing +- Failed file handling +- Auto-cleanup of old processed files +""" + +import json +import logging +import smtplib +import shutil +import time +import threading +import schedule +import requests +from collections import defaultdict, Counter +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, date, timedelta +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from pathlib import Path +from typing import Dict, Optional, Any, List, Set +from queue import Queue +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler + + +class Config: + """Configuration settings - EDIT THESE FOR YOUR ENVIRONMENT""" + + # === PATHS (EDIT FOR LOCAL/SERVER) === + # development paths + HOT_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/json_files") + PROCESSED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed") + FAILED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/Failed") + REPORTS_DIR = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/reports") + + + # === WRIKE API SETTINGS === + WRIKE_API_BASE = "https://www.wrike.com/api/v4" + WRIKE_TOKEN = "eyJ0dCI6InAiLCJhbGciOiJIUzI1NiIsInR2IjoiMiJ9.eyJkIjoie1wiYVwiOjY5NzM0OTgsXCJpXCI6OTUyOTY3MCxcImNcIjo0Njk5NTI3LFwidVwiOjIzMjcyNDA0LFwiclwiOlwiVVNcIixcInNcIjpbXCJXXCIsXCJGXCIsXCJJXCIsXCJVXCIsXCJLXCIsXCJDXCIsXCJEXCIsXCJNXCIsXCJBXCIsXCJMXCIsXCJQXCJdLFwielwiOltdLFwidFwiOjB9IiwiaWF0IjoxNzYwMDI4ODIzfQ.9f4t15LycpoH-NlzQC3s1K19fVqnAwcahG2D-J5E8dg" + + # Wrike Space - EDIT FOR DIFFERENT ENVIRONMENTS + WRIKE_SPACE_ID = "MQAAAABpz7l_" # Staging + WRIKE_SPACE_NAME = "Staging" + + # Production space (uncomment for production) + # WRIKE_SPACE_ID = "MQAAAABoHcTY" # LGL Team + # WRIKE_SPACE_NAME = "LGL Team" + + # Custom field IDs in Wrike + CUSTOM_FIELDS = { + "budget": "IEAGU2B2JUAJRZ7P", + "impact": "IEAGU2B2JUAJRZ7Q", + "notes": "IEAGU2B2JUAJRZ7R", + "rag": "IEAGU2B2JUAJRZ7S", + "deliverable_category": "IEAGU2B2JUAJRZ7T", + "actions": "IEAGU2B2JUAJRZ7W", + "shoot_date": "IEAGU2B2JUAJRZ7X", + "omg_number": "IEAGU2B2JUAJRZ7Y", + "box_link": "IEAGU2B2JUAJRZ7Z", + "owner": "IEAGU2B2JUAJRZ72" + } + + # === EMAIL SETTINGS === + SMTP_SERVER = "smtp.mailgun.org" + SMTP_PORT = 587 + SMTP_USER = "twist@mail.dev.oliver.solutions" + SMTP_PASSWORD = "102115e9f3b9d7332d0cd1d4329bc0d4-77751bfc-ca066b71" + SENDER_EMAIL = "WRIKE-MONITOR@oliver.agency" + REPORT_EMAILS = ["daveporter@oliver.agency"] + + # === PROCESSING SETTINGS === + BATCH_SIZE = 1 # Process one file at a time to avoid race conditions + MAX_WORKERS = 1 # No concurrent processing + BATCH_TIMEOUT = 30 + WAIT_DELAY = 2 + + # === MONITORING SETTINGS === + PERIODIC_SCAN_INTERVAL = 60 # Scan every 60 seconds + PERIODIC_SCAN_TIMEOUT = 120 # Max scan time + SLOW_SCAN_THRESHOLD = 30 + + # === REPORTING SETTINGS === + DAILY_REPORT_TIME = "19:00" # 7PM daily report + KEEP_REPORTS_DAYS = 30 + CLEANUP_PROCESSED_HOURS = 24 # Delete processed files after 24 hours + + +class DailyStats: + """Thread-safe daily statistics collector""" + + def __init__(self): + self.lock = threading.Lock() + self.reset_stats() + self.start_time = datetime.now() + + def reset_stats(self): + """Reset daily statistics""" + with self.lock: + self.date = date.today() + self.total_processed = 0 + self.total_created = 0 + self.total_skipped = 0 + self.total_errors = 0 + self.startup_files_processed = 0 + self.periodic_files_found = 0 + self.failed_files_moved = 0 + self.periodic_scans_completed = 0 + self.slow_scans = 0 + + # Folder/Project breakdown + self.folder_stats = defaultdict(lambda: { + 'projects': Counter(), + 'tasks_created': 0, + 'tasks_skipped': 0, + 'errors': 0 + }) + + # Performance metrics + self.processing_times = [] + self.hourly_stats = defaultdict(int) + self.error_details = [] + + def record_file_processed(self, folder_name: str, project_name: str, + task_created: bool, skipped: bool, processing_time: float, + success: bool = True, error: str = None, + is_startup: bool = False, is_periodic: bool = False): + """Record a processed file""" + with self.lock: + hour = datetime.now().hour + self.hourly_stats[hour] += 1 + + if is_startup: + self.startup_files_processed += 1 + elif is_periodic: + self.periodic_files_found += 1 + + if success: + self.total_processed += 1 + if task_created: + self.total_created += 1 + self.folder_stats[folder_name]['tasks_created'] += 1 + if skipped: + self.total_skipped += 1 + self.folder_stats[folder_name]['tasks_skipped'] += 1 + + self.folder_stats[folder_name]['projects'][project_name] += 1 + self.processing_times.append(processing_time) + else: + self.total_errors += 1 + self.folder_stats[folder_name]['errors'] += 1 + if error: + self.error_details.append({ + 'time': datetime.now().strftime('%H:%M:%S'), + 'folder': folder_name, + 'project': project_name, + 'error': error + }) + + def record_failed_file_moved(self): + """Record a failed file being moved""" + with self.lock: + self.failed_files_moved += 1 + + def record_periodic_scan(self, duration: float, files_found: int): + """Record periodic scan statistics""" + with self.lock: + self.periodic_scans_completed += 1 + if duration > Config.SLOW_SCAN_THRESHOLD: + self.slow_scans += 1 + + def get_stats_summary(self) -> Dict[str, Any]: + """Get current statistics summary""" + with self.lock: + avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0 + + return { + 'date': self.date.strftime('%Y-%m-%d'), + 'uptime': str(datetime.now() - self.start_time).split('.')[0], + 'total_processed': self.total_processed, + 'total_created': self.total_created, + 'total_skipped': self.total_skipped, + 'startup_files_processed': self.startup_files_processed, + 'periodic_files_found': self.periodic_files_found, + 'failed_files_moved': self.failed_files_moved, + 'periodic_scans_completed': self.periodic_scans_completed, + 'slow_scans': self.slow_scans, + 'total_errors': self.total_errors, + 'success_rate': (self.total_processed / (self.total_processed + self.total_errors) * 100) if (self.total_processed + self.total_errors) > 0 else 0, + 'avg_processing_time': round(avg_processing_time, 2), + 'folder_stats': dict(self.folder_stats), + 'hourly_stats': dict(self.hourly_stats), + 'error_details': self.error_details.copy() + } + + +class WrikeMonitor: + """Wrike import processor with real-time monitoring""" + + def __init__(self): + self.setup_logging() + self.ensure_directories() + self.file_queue = Queue() + self.daily_stats = DailyStats() + self.startup_complete = False + + # Caches + self.folder_cache = {} + self.project_cache = {} + + # Track recently processed files + self.recently_processed: Set[str] = set() + self.processed_lock = threading.Lock() + + # Scan protection + self.scan_in_progress = False + self.scan_lock = threading.Lock() + + self.setup_daily_reporting() + + def setup_logging(self): + """Setup logging configuration""" + log_format = '%(asctime)s - %(levelname)s - [%(threadName)s] %(message)s' + Config.REPORTS_DIR.mkdir(parents=True, exist_ok=True) + + logging.basicConfig( + level=logging.INFO, + format=log_format, + handlers=[ + logging.FileHandler(Config.REPORTS_DIR / 'wrike_monitor.log'), + logging.StreamHandler() + ] + ) + self.logger = logging.getLogger(__name__) + self.logger.info(f"Wrike Monitor initialized - Target: {Config.WRIKE_SPACE_NAME}") + + def ensure_directories(self): + """Create necessary directories""" + for directory in [Config.HOT_FOLDER, Config.PROCESSED_FOLDER, + Config.FAILED_FOLDER, Config.REPORTS_DIR]: + directory.mkdir(parents=True, exist_ok=True) + + def setup_daily_reporting(self): + """Setup scheduled daily reporting""" + schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report) + + def run_scheduler(): + while True: + schedule.run_pending() + time.sleep(60) + + scheduler_thread = threading.Thread(target=run_scheduler, name="Scheduler", daemon=True) + scheduler_thread.start() + + def make_wrike_request(self, method, endpoint, data=None): + """Make a request to the Wrike API""" + url = f"{Config.WRIKE_API_BASE}{endpoint}" + headers = { + "Authorization": f"Bearer {Config.WRIKE_TOKEN}", + "Content-Type": "application/json" + } + + try: + if method == "GET": + response = requests.get(url, headers=headers, timeout=30) + elif method == "POST": + response = requests.post(url, headers=headers, json=data, timeout=30) + elif method == "PUT": + response = requests.put(url, headers=headers, json=data, timeout=30) + + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + self.logger.error(f"Wrike API error: {e}") + return None + + def parse_business_area(self, business_area): + """Extract folder name from BusinessArea""" + parts = business_area.split(">") + return parts[-1].strip() if parts else business_area.strip() + + def parse_date(self, date_string): + """Convert date string to YYYY-MM-DD format""" + if not date_string: + return None + try: + dt = datetime.strptime(date_string.split('+')[0].strip(), "%Y-%m-%d %H:%M:%S") + return dt.strftime("%Y-%m-%d") + except: + return None + + def get_or_create_folder(self, folder_name, parent_id=None): + """Get existing folder or create new one""" + if parent_id is None: + parent_id = Config.WRIKE_SPACE_ID + + cache_key = f"{parent_id}:{folder_name}" + if cache_key in self.folder_cache: + return self.folder_cache[cache_key] + + # Get folder tree + result = self.make_wrike_request("GET", f"/folders/{parent_id}/folders") + if result and "data" in result: + # First entry is the parent folder itself with childIds + parent_folder = None + for item in result["data"]: + if item["id"] == parent_id: + parent_folder = item + break + + if parent_folder: + child_ids = parent_folder.get("childIds", []) + # Search only in direct children + for folder in result["data"]: + if folder["id"] in child_ids and folder["title"] == folder_name: + folder_id = folder["id"] + self.folder_cache[cache_key] = folder_id + self.logger.debug(f"Found existing folder: {folder_name} ({folder_id})") + return folder_id + + # Create new folder + self.logger.info(f"Creating new folder: {folder_name}") + data = {"title": folder_name, "description": f"{folder_name} category folder"} + result = self.make_wrike_request("POST", f"/folders/{parent_id}/folders", data) + if result and "data" in result and len(result["data"]) > 0: + folder_id = result["data"][0]["id"] + self.folder_cache[cache_key] = folder_id + self.logger.info(f"Created folder: {folder_name} ({folder_id})") + return folder_id + + return None + + def get_or_create_project(self, project_title, folder_id, project_details, campaign_code): + """Get existing project or create new one""" + cache_key = f"{folder_id}:{project_title}" + if cache_key in self.project_cache: + return self.project_cache[cache_key] + + # Get folder tree + result = self.make_wrike_request("GET", f"/folders/{folder_id}/folders") + if result and "data" in result: + # Find parent folder with childIds + parent_folder = None + for item in result["data"]: + if item["id"] == folder_id: + parent_folder = item + break + + if parent_folder: + child_ids = parent_folder.get("childIds", []) + # Search only in direct children + for item in result["data"]: + if item["id"] in child_ids and item["title"] == project_title and "project" in item: + project_id = item["id"] + self.project_cache[cache_key] = project_id + self.logger.debug(f"Found existing project: {project_title} ({project_id})") + return project_id + + # Create new project + description = project_details.get("Description", "") + if description: + description = description.replace("

", "").replace("

", "") + + data = {"title": project_title, "description": description} + result = self.make_wrike_request("POST", f"/folders/{folder_id}/folders", data) + + if result and "data" in result and len(result["data"]) > 0: + project_id = result["data"][0]["id"] + + # Convert to project with dates + start_date = self.parse_date(project_details.get("StartDate")) + end_date = self.parse_date(project_details.get("EndDate")) + + project_data = {"project": {}} + if start_date: + project_data["project"]["startDate"] = start_date + if end_date: + project_data["project"]["endDate"] = end_date + + # Add campaign code + if campaign_code: + project_data["customFields"] = [ + {"id": Config.CUSTOM_FIELDS["omg_number"], "value": campaign_code} + ] + + self.make_wrike_request("PUT", f"/folders/{project_id}", project_data) + self.project_cache[cache_key] = project_id + self.logger.info(f"Created project: {project_title} ({campaign_code})") + return project_id + + return None + + def find_task_by_omg_number(self, project_id, omg_number): + """Find task with matching OMG number""" + if not omg_number: + return None + + result = self.make_wrike_request("GET", f"/folders/{project_id}/tasks?fields=[\"customFields\"]") + + if result and "data" in result: + for task in result["data"]: + custom_fields = task.get("customFields", []) + for field in custom_fields: + if field.get("id") == Config.CUSTOM_FIELDS["omg_number"]: + if field.get("value") == omg_number: + return task["id"] + return None + + def create_deliverable_task(self, project_id, job_details): + """Create deliverable task (skip if exists)""" + task_title = job_details.get("Title", "Untitled Task") + job_number = job_details.get("Number", "") + + # Check if exists + if job_number: + existing_task_id = self.find_task_by_omg_number(project_id, job_number) + if existing_task_id: + self.logger.info(f"⊙ Task already exists: {task_title} (#{job_number}) - skipping") + return existing_task_id, True # Return (task_id, skipped=True) + + # Parse dates + due_date = self.parse_date(job_details.get("DueDate")) + brief_date = self.parse_date(job_details.get("BriefDate")) + + # Create task + task_data = { + "title": task_title, + "description": job_details.get("Notes", ""), + } + + if due_date: + task_data["dates"] = {"due": due_date} + if brief_date: + task_data["dates"]["start"] = brief_date + + result = self.make_wrike_request("POST", f"/folders/{project_id}/tasks", task_data) + if result and "data" in result and len(result["data"]) > 0: + task_id = result["data"][0]["id"] + + # Add custom fields + custom_fields = [] + + job_category = job_details.get("JobCategory", job_details.get("MediaType", "")) + if job_category: + custom_fields.append({ + "id": Config.CUSTOM_FIELDS["deliverable_category"], + "value": job_category + }) + + if job_number: + custom_fields.append({ + "id": Config.CUSTOM_FIELDS["omg_number"], + "value": job_number + }) + + notes_parts = [] + if job_details.get("Type"): + notes_parts.append(f"Type: {job_details['Type']}") + if job_details.get("Details"): + notes_parts.append(job_details["Details"]) + if notes_parts: + custom_fields.append({ + "id": Config.CUSTOM_FIELDS["notes"], + "value": " | ".join(notes_parts) + }) + + if custom_fields: + self.make_wrike_request("PUT", f"/tasks/{task_id}", {"customFields": custom_fields}) + + self.logger.info(f"Created task: {task_title} (#{job_number})") + return task_id, False # Return (task_id, skipped=False) + + return None, False + + def process_json_file(self, file_path: Path, is_startup: bool = False, is_periodic: bool = False): + """Process single JSON file""" + start_time = time.time() + + try: + # Parse JSON + with open(file_path, 'r') as f: + data = json.load(f) + + job_spec = data.get("JobSpecification", {}) + project_details = job_spec.get("ProjectDetails", {}) + job_details = job_spec.get("JobDetails", {}) + + # Extract folder name + business_area = project_details.get("BusinessArea", job_details.get("BusinessArea", "")) + folder_name = self.parse_business_area(business_area) + + if not folder_name: + raise ValueError("No BusinessArea found") + + # Get/Create folder + folder_id = self.get_or_create_folder(folder_name) + if not folder_id: + raise ValueError(f"Failed to create folder: {folder_name}") + + # Get/Create project + project_title = project_details.get("Title", "Untitled Project") + campaign_code = job_details.get("CampaignCode", "") + + project_id = self.get_or_create_project(project_title, folder_id, project_details, campaign_code) + if not project_id: + raise ValueError(f"Failed to create project: {project_title}") + + # Create task + task_id, skipped = self.create_deliverable_task(project_id, job_details) + if not task_id: + raise ValueError("Failed to create task") + + processing_time = time.time() - start_time + + # Record stats + self.daily_stats.record_file_processed( + folder_name, project_title, not skipped, skipped, + processing_time, True, None, is_startup, is_periodic + ) + + # Move to processed + self.move_to_processed(file_path) + self.add_to_recently_processed(file_path) + + return True + + except Exception as e: + processing_time = time.time() - start_time + self.logger.error(f"Error processing {file_path.name}: {e}") + + # Move to failed + self.move_to_failed(file_path, str(e)) + + # Record error + try: + folder_name = self.parse_business_area( + job_spec.get("ProjectDetails", {}).get("BusinessArea", "Unknown") + ) + project_title = job_spec.get("ProjectDetails", {}).get("Title", "Unknown") + except: + folder_name = "Unknown" + project_title = "Unknown" + + self.daily_stats.record_file_processed( + folder_name, project_title, False, False, + processing_time, False, str(e), is_startup, is_periodic + ) + + return False + + def add_to_recently_processed(self, file_path: Path): + """Add file to recently processed set""" + with self.processed_lock: + try: + stat_info = file_path.stat() + file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}" + self.recently_processed.add(file_id) + + if len(self.recently_processed) > 1000: + oldest = list(self.recently_processed)[:200] + for entry in oldest: + self.recently_processed.discard(entry) + except: + pass + + def was_recently_processed(self, file_path: Path) -> bool: + """Check if file was recently processed""" + with self.processed_lock: + try: + stat_info = file_path.stat() + file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}" + return file_id in self.recently_processed + except: + return False + + def move_to_processed(self, file_path: Path): + """Move file to Processed folder""" + try: + destination = Config.PROCESSED_FOLDER / file_path.name + shutil.move(str(file_path), str(destination)) + except Exception as e: + self.logger.error(f"Failed to move to processed: {e}") + + def move_to_failed(self, file_path: Path, error_msg: str): + """Move file to Failed folder with error log""" + try: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + failed_filename = f"{timestamp}_{file_path.name}" + failed_path = Config.FAILED_FOLDER / failed_filename + + shutil.move(str(file_path), str(failed_path)) + + # Create error log + error_log_path = Config.FAILED_FOLDER / f"{timestamp}_{file_path.stem}_ERROR.txt" + with open(error_log_path, 'w') as f: + f.write(f"File: {file_path.name}\n") + f.write(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write(f"Error: {error_msg}\n") + + self.daily_stats.record_failed_file_moved() + self.logger.warning(f"Moved to failed: {file_path.name}") + + except Exception as e: + self.logger.error(f"Failed to move failed file: {e}") + + def scan_existing_files(self) -> List[Path]: + """Scan for existing JSON files""" + self.logger.info("🔍 Scanning for existing JSON files...") + json_files = list(Config.HOT_FOLDER.glob("*.json")) + self.logger.info(f"Found {len(json_files)} existing files") + return json_files + + def periodic_scan(self): + """Protected periodic scan for missed files""" + while True: + try: + if self.startup_complete: + time.sleep(Config.PERIODIC_SCAN_INTERVAL) + + with self.scan_lock: + if self.scan_in_progress: + self.logger.warning("Skipping scan - already in progress") + continue + self.scan_in_progress = True + + self._perform_periodic_scan() + + with self.scan_lock: + self.scan_in_progress = False + else: + time.sleep(5) + + except Exception as e: + self.logger.error(f"Periodic scan error: {e}") + with self.scan_lock: + self.scan_in_progress = False + time.sleep(Config.PERIODIC_SCAN_INTERVAL) + + def _perform_periodic_scan(self): + """Perform actual periodic scan""" + scan_start = time.time() + missed_files = [] + + try: + for json_file in Config.HOT_FOLDER.glob("*.json"): + if not self.was_recently_processed(json_file): + missed_files.append(json_file) + + scan_duration = time.time() - scan_start + + if missed_files: + self.logger.info(f"🔄 Periodic scan found {len(missed_files)} files ({scan_duration:.1f}s)") + for file_path in missed_files: + self.queue_file(file_path, is_periodic=True) + + self.daily_stats.record_periodic_scan(scan_duration, len(missed_files)) + + except Exception as e: + self.logger.error(f"Periodic scan error: {e}") + + def process_startup_files(self, files: List[Path]): + """Process existing files from startup scan""" + if not files: + self.logger.info("✅ No existing files to process") + return + + self.logger.info(f"🚀 Processing {len(files)} existing files...") + + for i in range(0, len(files), Config.BATCH_SIZE): + batch = files[i:i + Config.BATCH_SIZE] + self.process_batch(batch, is_startup=True) + + self.logger.info(f"✅ Startup processing complete") + + def process_batch(self, file_paths: List[Path], is_startup: bool = False, is_periodic: bool = False): + """Process files sequentially (one at a time)""" + if not file_paths: + return + + # Process sequentially to avoid race conditions with Wrike API + for file_path in file_paths: + try: + self.process_json_file(file_path, is_startup, is_periodic) + except Exception as e: + self.logger.error(f"Error processing {file_path.name}: {e}") + + def queue_file(self, file_path: Path, is_periodic: bool = False): + """Add file to processing queue""" + if self.startup_complete: + self.file_queue.put((file_path, is_periodic)) + + def batch_processor_worker(self): + """Background worker for batch processing""" + while True: + files_to_process = [] + periodic_flags = [] + batch_start_time = time.time() + + while (len(files_to_process) < Config.BATCH_SIZE and + (time.time() - batch_start_time) < Config.BATCH_TIMEOUT): + try: + file_data = self.file_queue.get(timeout=1.0) + file_path, is_periodic = file_data if isinstance(file_data, tuple) else (file_data, False) + files_to_process.append(file_path) + periodic_flags.append(is_periodic) + self.file_queue.task_done() + except: + if files_to_process: + break + continue + + if files_to_process and self.startup_complete: + is_periodic_batch = any(periodic_flags) + self.process_batch(files_to_process, is_startup=False, is_periodic=is_periodic_batch) + + def cleanup_old_processed_files(self): + """Delete files older than configured hours from Processed folder""" + try: + cutoff_time = time.time() - (Config.CLEANUP_PROCESSED_HOURS * 3600) + deleted = 0 + + for file in Config.PROCESSED_FOLDER.glob("*.json"): + if file.stat().st_mtime < cutoff_time: + file.unlink() + deleted += 1 + + if deleted > 0: + self.logger.info(f"🗑️ Cleaned up {deleted} old processed files") + + except Exception as e: + self.logger.error(f"Cleanup error: {e}") + + def generate_daily_report(self): + """Generate and email daily report""" + try: + stats = self.daily_stats.get_stats_summary() + report_content = self.format_daily_report(stats) + + # Save report + report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt" + with open(report_file, 'w') as f: + f.write(report_content) + + # Email report + self.email_daily_report(report_content, stats['date']) + + # Cleanup old reports + self.cleanup_old_reports() + + # Cleanup old processed files + self.cleanup_old_processed_files() + + # Reset stats + self.daily_stats.reset_stats() + + self.logger.info(f"Daily report generated: {report_file}") + + except Exception as e: + self.logger.error(f"Failed to generate daily report: {e}") + + def format_daily_report(self, stats: Dict[str, Any]) -> str: + """Format daily statistics into readable report""" + startup_info = f"Startup Files: {stats['startup_files_processed']}\n" if stats['startup_files_processed'] > 0 else "" + periodic_info = f"Periodic Scan Files: {stats['periodic_files_found']}\n" if stats['periodic_files_found'] > 0 else "" + failed_info = f"Failed Files: {stats['failed_files_moved']}\n" if stats['failed_files_moved'] > 0 else "" + + report = f""" +WRIKE IMPORT DAILY REPORT +Date: {stats['date']} +Uptime: {stats['uptime']} +Space: {Config.WRIKE_SPACE_NAME} + +=== SUMMARY === +Total Files Processed: {stats['total_processed']} +Tasks Created: {stats['total_created']} +Tasks Skipped (duplicates): {stats['total_skipped']} +{startup_info}{periodic_info}{failed_info}Errors: {stats['total_errors']} +Success Rate: {stats['success_rate']:.1f}% +Average Processing Time: {stats['avg_processing_time']}s +Periodic Scans: {stats['periodic_scans_completed']} ({stats['slow_scans']} slow) + +=== FOLDER BREAKDOWN === +""" + + for folder, folder_data in sorted(stats['folder_stats'].items()): + report += f"\n{folder}:\n" + report += f" Tasks Created: {folder_data['tasks_created']}\n" + report += f" Tasks Skipped: {folder_data['tasks_skipped']}\n" + report += f" Errors: {folder_data['errors']}\n" + + if folder_data['projects']: + report += f" Projects:\n" + for project, count in folder_data['projects'].most_common(10): + report += f" - {project}: {count} deliverable(s)\n" + + report += "\n=== HOURLY BREAKDOWN ===\n" + for hour in range(24): + count = stats['hourly_stats'].get(hour, 0) + if count > 0: + report += f"{hour:02d}:00 - {count} files\n" + + if stats['error_details']: + report += "\n=== RECENT ERRORS ===\n" + for error in stats['error_details'][-10:]: + report += f"{error['time']} - {error['folder']}/{error['project']}: {error['error']}\n" + + return report + + def email_daily_report(self, report_content: str, report_date: str): + """Email the daily report""" + try: + msg = MIMEMultipart() + msg['From'] = Config.SENDER_EMAIL + msg['To'] = ', '.join(Config.REPORT_EMAILS) + msg['Subject'] = f"Wrike Import Daily Report - {Config.WRIKE_SPACE_NAME} - {report_date}" + + msg.attach(MIMEText(report_content, 'plain')) + + server = smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT) + server.starttls() + server.login(Config.SMTP_USER, Config.SMTP_PASSWORD) + server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string()) + server.quit() + + self.logger.info("Daily report emailed successfully") + + except Exception as e: + self.logger.error(f"Failed to email report: {e}") + + def cleanup_old_reports(self): + """Remove old report files""" + try: + cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS) + + for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"): + try: + file_date_str = report_file.stem.split('_')[-1] + file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date() + + if file_date < cutoff_date: + report_file.unlink() + except: + pass + + except Exception as e: + self.logger.error(f"Failed to cleanup old reports: {e}") + + def get_current_stats(self) -> str: + """Get current statistics as string""" + stats = self.daily_stats.get_stats_summary() + return f"Today: {stats['total_processed']} processed ({stats['total_created']} created, {stats['total_skipped']} skipped), {stats['total_errors']} errors" + + +class WrikeFileHandler(FileSystemEventHandler): + """File system event handler for Wrike monitor""" + + def __init__(self, monitor: WrikeMonitor): + self.monitor = monitor + self.logger = logging.getLogger(__name__) + + def on_created(self, event): + if event.is_directory or not self.monitor.startup_complete: + return + + file_path = Path(event.src_path) + if file_path.suffix.lower() == '.json': + self.logger.debug(f"📥 New file detected: {file_path.name}") + self.monitor.queue_file(file_path, is_periodic=False) + + def on_moved(self, event): + if event.is_directory or not self.monitor.startup_complete: + return + + dest_path = Path(event.dest_path) + if dest_path.suffix.lower() == '.json': + self.logger.debug(f"📁 File moved: {dest_path.name}") + self.monitor.queue_file(dest_path, is_periodic=False) + + +def main(): + """Main entry point with real-time monitoring""" + monitor = WrikeMonitor() + + # Step 1: Process existing files + existing_files = monitor.scan_existing_files() + monitor.process_startup_files(existing_files) + + # Step 2: Start real-time monitoring + monitor.startup_complete = True + monitor.logger.info("🎯 Startup complete - switching to real-time monitoring") + + # Start batch processing worker + batch_worker = threading.Thread( + target=monitor.batch_processor_worker, + name="BatchWorker", + daemon=True + ) + batch_worker.start() + + # Start periodic scanner + periodic_scanner = threading.Thread( + target=monitor.periodic_scan, + name="PeriodicScanner", + daemon=True + ) + periodic_scanner.start() + + # Setup file monitoring + event_handler = WrikeFileHandler(monitor) + observer = Observer() + observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=False) + observer.start() + + monitor.logger.info(f"🛡️ Real-time monitoring active on: {Config.HOT_FOLDER}") + monitor.logger.info(f"📧 Daily reports at {Config.DAILY_REPORT_TIME} to {', '.join(Config.REPORT_EMAILS)}") + + try: + while True: + time.sleep(60) + current_stats = monitor.get_current_stats() + monitor.logger.info(f"📊 {current_stats}") + except KeyboardInterrupt: + monitor.logger.info("Stopping Wrike Monitor...") + observer.stop() + + observer.join() + monitor.logger.info("Wrike Monitor stopped") + + +if __name__ == "__main__": + main()