Add real-time monitoring service (v2.0)

Major additions:
- wrike_monitor.py: Real-time folder monitoring with watchdog
- Daily email reports at 7PM with comprehensive statistics
- Failed file handling with error logs
- Periodic scanning for missed files
- systemd service support for production deployment
- Sequential processing to prevent race conditions
- Proper parent/child folder matching using childIds

Configuration:
- Easy path configuration for local/production
- Configurable Wrike space ID
- Email settings for daily reports
- Auto-cleanup of processed files (24h retention)

Documentation:
- INSTALLATION.md: Complete systemd service setup guide
- QUICKSTART.md: Quick reference for both tools
- Updated README.md with tool comparison
- requirements.txt with clear dependencies

Bug fixes:
- Fixed duplicate folder/project creation via childIds matching
- Added logging for skipped deliverables
- Improved cache management

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Dave Porter 2025-10-10 14:31:19 -04:00
parent 06c472c748
commit 255c03b65d
6 changed files with 1650 additions and 12 deletions

359
INSTALLATION.md Normal file
View file

@ -0,0 +1,359 @@
# Wrike Monitor Installation Guide
## Overview
This guide covers installing `wrike_monitor.py` as a systemd service that runs automatically on server boot, monitors a folder 24/7, and sends daily email reports at 7PM.
**For simple batch processing**, use `wrike_import.py` instead (no installation needed, just run it).
## Prerequisites
- Python 3.6+
- Root/sudo access
- Wrike API token with write permissions
## Installation Steps
### 1. Install Python Dependencies
```bash
pip3 install -r requirements.txt
```
Or manually:
```bash
pip3 install requests watchdog schedule
```
### 2. Configure the Script
Edit `wrike_monitor.py` and update the `Config` class (starting at line 36):
#### A. Set Paths (Lines 41-44)
**For Local Testing:**
```python
HOT_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/json_files")
PROCESSED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed")
FAILED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/Failed")
REPORTS_DIR = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/reports")
```
**For Production Server:**
```python
HOT_FOLDER = Path("/data/PRODUCTION/WRIKE_JSON")
PROCESSED_FOLDER = Path("/data/PRODUCTION/WRIKE_JSON/Processed")
FAILED_FOLDER = Path("/data/PRODUCTION/WRIKE_JSON/Failed")
REPORTS_DIR = Path("/PRODUCTION/WRIKE_LOGS")
```
#### B. Set Wrike Space (Lines 52-53)
**For Staging:**
```python
WRIKE_SPACE_ID = "MQAAAABpz7l_"
WRIKE_SPACE_NAME = "Staging"
```
**For Production:**
```python
WRIKE_SPACE_ID = "MQAAAABoHcTY"
WRIKE_SPACE_NAME = "LGL Team"
```
#### C. Update Email Settings (Lines 79, 84)
```python
REPORT_EMAILS = ["your-email@oliver.agency", "another@oliver.agency"]
DAILY_REPORT_TIME = "19:00" # 7PM daily report
```
#### D. Update API Token (Line 49)
```python
WRIKE_TOKEN = "your_production_api_token_here"
```
### 3. Copy Files to Server
```bash
# Create directory
sudo mkdir -p /root/wrike-import
sudo chmod 755 /root/wrike-import
# Copy script
sudo cp wrike_monitor.py /root/wrike-import/
sudo chmod +x /root/wrike-import/wrike_monitor.py
# Create required directories
sudo mkdir -p /data/PRODUCTION/WRIKE_JSON
sudo mkdir -p /data/PRODUCTION/WRIKE_JSON/Processed
sudo mkdir -p /data/PRODUCTION/WRIKE_JSON/Failed
sudo mkdir -p /PRODUCTION/WRIKE_LOGS
sudo chmod -R 777 /data/PRODUCTION/WRIKE_JSON
sudo chmod -R 777 /PRODUCTION/WRIKE_LOGS
```
### 4. Install systemd Service
```bash
# Copy service file
sudo cp wrike-monitor.service /etc/systemd/system/
# Reload systemd
sudo systemctl daemon-reload
# Enable service (start on boot)
sudo systemctl enable wrike-monitor
# Start service
sudo systemctl start wrike-monitor
```
### 5. Verify Installation
```bash
# Check service status
sudo systemctl status wrike-monitor
# View real-time logs
sudo journalctl -u wrike-monitor -f
# Check if monitoring is working
echo '{"test": "data"}' > /data/PRODUCTION/WRIKE_JSON/test.json
sudo journalctl -u wrike-monitor -f
```
## Service Management Commands
### Check Status
```bash
sudo systemctl status wrike-monitor
```
### Start Service
```bash
sudo systemctl start wrike-monitor
```
### Stop Service
```bash
sudo systemctl stop wrike-monitor
```
### Restart Service
```bash
sudo systemctl restart wrike-monitor
```
### View Logs
```bash
# Live tail
sudo journalctl -u wrike-monitor -f
# Last 100 lines
sudo journalctl -u wrike-monitor -n 100
# Since today
sudo journalctl -u wrike-monitor --since today
# Specific time range
sudo journalctl -u wrike-monitor --since "2025-10-09 09:00" --until "2025-10-09 17:00"
```
### Disable Service
```bash
sudo systemctl disable wrike-monitor
sudo systemctl stop wrike-monitor
```
## Configuration Changes
After modifying `wrike_monitor.py`:
```bash
# Restart service to apply changes
sudo systemctl restart wrike-monitor
# Verify it started correctly
sudo systemctl status wrike-monitor
```
## Troubleshooting
### Service Won't Start
1. Check Python path:
```bash
which python3
# Update ExecStart in wrike-monitor.service if different
```
2. Check file permissions:
```bash
ls -la /root/wrike-import/wrike_monitor.py
sudo chmod +x /root/wrike-import/wrike_monitor.py
```
3. Check logs:
```bash
sudo journalctl -u wrike-monitor -n 50
```
### No Files Being Processed
1. Verify folder exists and has correct permissions:
```bash
ls -la /data/PRODUCTION/WRIKE_JSON
sudo chmod 777 /data/PRODUCTION/WRIKE_JSON
```
2. Test manually:
```bash
sudo systemctl stop wrike-monitor
cd /root/wrike-import
python3 wrike_monitor.py
# Watch for errors, then Ctrl+C
sudo systemctl start wrike-monitor
```
### Email Reports Not Sending
1. Check SMTP credentials in Config class
2. Verify email addresses in `REPORT_EMAILS`
3. Check logs for email errors:
```bash
sudo journalctl -u wrike-monitor | grep -i email
```
### High CPU/Memory Usage
1. Reduce batch size and workers in Config:
```python
BATCH_SIZE = 3
MAX_WORKERS = 3
```
2. Increase scan interval:
```python
PERIODIC_SCAN_INTERVAL = 120 # Scan every 2 minutes instead of 1
```
## Testing Before Production
### Local Testing
1. Keep local paths in Config
2. Run manually:
```bash
python3 wrike_monitor.py
```
3. Drop test JSON files into the hot folder
4. Verify they're processed and moved to Processed/
### Server Testing (without service)
```bash
# Run in foreground
sudo python3 /root/wrike-import/wrike_monitor.py
# Test file processing
sudo cp test.json /data/PRODUCTION/WRIKE_JSON/
# Watch logs
# Ctrl+C to stop when satisfied
```
## Monitoring Production
### Check Daily Reports
```bash
# View today's report
cat /PRODUCTION/WRIKE_LOGS/daily_report_$(date +%Y-%m-%d).txt
# List all reports
ls -lh /PRODUCTION/WRIKE_LOGS/
```
### Monitor Performance
```bash
# View stats every minute (in logs)
sudo journalctl -u wrike-monitor -f | grep "📊"
# Check for errors
sudo journalctl -u wrike-monitor | grep -i error
# Check for slow scans
sudo journalctl -u wrike-monitor | grep -i "slow"
```
### Check Processed/Failed Files
```bash
# Processed files (will auto-delete after 24h)
ls -lh /data/PRODUCTION/WRIKE_JSON/Processed/
# Failed files (need manual review)
ls -lh /data/PRODUCTION/WRIKE_JSON/Failed/
# View error logs
cat /data/PRODUCTION/WRIKE_JSON/Failed/*_ERROR.txt
```
## Maintenance
### Manually Trigger Daily Report
```bash
# The report runs automatically at 7PM, but to test:
sudo systemctl restart wrike-monitor
# Then wait or modify DAILY_REPORT_TIME in config
```
### Clear Failed Files After Review
```bash
# After reviewing and fixing failed files
sudo rm -rf /data/PRODUCTION/WRIKE_JSON/Failed/*
```
### Backup Configuration
```bash
# Backup before changes
sudo cp /root/wrike-import/wrike_monitor.py /root/wrike-import/wrike_monitor.py.backup
```
## Uninstallation
```bash
# Stop and disable service
sudo systemctl stop wrike-monitor
sudo systemctl disable wrike-monitor
# Remove service file
sudo rm /etc/systemd/system/wrike-monitor.service
sudo systemctl daemon-reload
# Remove script (optional)
sudo rm -rf /root/wrike-import
# Remove data directories (optional - be careful!)
# sudo rm -rf /data/PRODUCTION/WRIKE_JSON
# sudo rm -rf /PRODUCTION/WRIKE_LOGS
```
## Support
For issues:
1. Check logs: `sudo journalctl -u wrike-monitor -f`
2. Verify configuration in `wrike_monitor.py`
3. Test manually before running as service
4. Review daily reports for patterns
---
**Last Updated**: October 2025

221
QUICKSTART.md Normal file
View file

@ -0,0 +1,221 @@
# Quick Start Guide
## Choose Your Tool
### Option 1: Simple Batch Import (wrike_import.py)
**Best for:** One-time imports, manual runs, testing
```bash
# Install
pip install requests
# Run
python wrike_import.py /path/to/json/files/
# That's it! Files processed and moved to Processed/
```
---
### Option 2: Real-time Monitor Service (wrike_monitor.py)
**Best for:** Production, 24/7 monitoring, automatic processing
```bash
# Install
pip install -r requirements.txt
# Configure (edit these lines in wrike_monitor.py):
# Line 41: HOT_FOLDER = Path("your/input/folder")
# Line 42: PROCESSED_FOLDER = Path("your/processed/folder")
# Line 52: WRIKE_SPACE_ID = "your_space_id"
# Line 79: REPORT_EMAILS = ["your@email.com"]
# Run locally for testing
python wrike_monitor.py
# Deploy as service (see INSTALLATION.md)
```
---
## Configuration Quick Reference
### wrike_monitor.py - Key Settings
| Setting | Line | Description |
|---------|------|-------------|
| `HOT_FOLDER` | 41 | Input folder to watch |
| `PROCESSED_FOLDER` | 42 | Where processed files go |
| `FAILED_FOLDER` | 43 | Where failed files go |
| `REPORTS_DIR` | 44 | Where reports are saved |
| `WRIKE_TOKEN` | 49 | Your Wrike API token |
| `WRIKE_SPACE_ID` | 52 | Target Wrike space ID |
| `WRIKE_SPACE_NAME` | 53 | Space name (for logs) |
| `REPORT_EMAILS` | 79 | Who gets daily reports |
| `DAILY_REPORT_TIME` | 84 | When to send report (24h format) |
| `CLEANUP_PROCESSED_HOURS` | 98 | Delete processed files after X hours |
| `PERIODIC_SCAN_INTERVAL` | 88 | How often to scan for missed files |
### Custom Field IDs (Lines 57-68)
Update these if you're using a different Wrike space with different custom field IDs.
---
## Example Workflows
### Workflow 1: Local Testing
```bash
# 1. Create test folder
mkdir -p ~/wrike-test/json_files
# 2. Edit wrike_monitor.py
# Set HOT_FOLDER = Path("/Users/yourname/wrike-test/json_files")
# 3. Run monitor
python wrike_monitor.py
# 4. Drop JSON files into json_files/
# Watch them get processed automatically!
```
### Workflow 2: Production Deployment
```bash
# 1. Configure paths for production in wrike_monitor.py
# 2. Copy to server
scp wrike_monitor.py wrike-monitor.service server:/root/wrike-import/
# 3. SSH to server and install service
ssh server
sudo cp /root/wrike-import/wrike-monitor.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable wrike-monitor
sudo systemctl start wrike-monitor
# 4. Monitor logs
sudo journalctl -u wrike-monitor -f
# 5. Get daily reports at 7PM via email!
```
### Workflow 3: Batch Import Existing Files
```bash
# Use wrike_import.py for one-time batch
python wrike_import.py /path/to/existing/json/files/
# All files processed and moved to Processed/
# Switch to wrike_monitor.py for ongoing monitoring
```
---
## Testing Checklist
Before deploying to production:
- [ ] Test with sample JSON files locally
- [ ] Verify folders/projects created correctly in Wrike
- [ ] Check duplicate detection works (reprocess same file)
- [ ] Confirm files move to Processed/ folder
- [ ] Test failed file handling (drop invalid JSON)
- [ ] Verify email settings (check inbox at report time)
- [ ] Review custom field IDs match your space
- [ ] Test with production paths (but staging space first!)
---
## Common Configuration Scenarios
### Scenario: Multiple Environments
Create separate config files:
```python
# wrike_monitor_staging.py - points to Staging space
# wrike_monitor_production.py - points to LGL Team space
```
Or use environment variables:
```python
import os
WRIKE_SPACE_ID = os.environ.get('WRIKE_SPACE_ID', 'MQAAAABpz7l_')
```
### Scenario: Different Report Times
```python
# Morning report at 9AM
DAILY_REPORT_TIME = "09:00"
# Multiple reports per day - use schedule in setup_daily_reporting():
schedule.every().day.at("09:00").do(self.generate_daily_report)
schedule.every().day.at("17:00").do(self.generate_daily_report)
schedule.every().day.at("19:00").do(self.generate_daily_report)
```
### Scenario: Longer File Retention
```python
# Keep processed files for 7 days instead of 24 hours
CLEANUP_PROCESSED_HOURS = 168 # 7 days * 24 hours
```
---
## Troubleshooting Quick Fixes
### Files not being processed
```bash
# Check folder permissions
chmod 777 /your/input/folder
# Check if monitor is running
ps aux | grep wrike_monitor
```
### Duplicates being created
- Sequential processing is enabled (BATCH_SIZE=1, MAX_WORKERS=1)
- Caches persist during runtime
- Restart service if caches get stale: `sudo systemctl restart wrike-monitor`
### Email not sending
- Check SMTP credentials in Config
- Test manually: send test email from server
- Check firewall allows SMTP port 587
### High memory usage
- Reduce BATCH_SIZE (line 82)
- Increase PERIODIC_SCAN_INTERVAL (line 88)
- Clear old logs: `rm /PRODUCTION/WRIKE_LOGS/*.log.old`
---
## Getting Space IDs
```bash
# Get all spaces
curl -X GET "https://www.wrike.com/api/v4/spaces" \
-H "Authorization: Bearer YOUR_TOKEN"
# Find your space and copy the ID
```
## Getting Custom Field IDs
```bash
# Get all custom fields
curl -X GET "https://www.wrike.com/api/v4/customfields" \
-H "Authorization: Bearer YOUR_TOKEN"
# Update CUSTOM_FIELDS in Config with your IDs
```
---
**Need Help?** Check the main [README.md](README.md) for detailed documentation.

107
README.md
View file

@ -1,35 +1,91 @@
# Wrike Import Script
# Wrike Import Tools
A Python script to automatically import project structures and deliverable tasks into Wrike from JSON files.
Two Python tools for automatically importing project structures and deliverable tasks into Wrike from JSON files.
## Overview
## Tools Overview
This script processes JSON files containing job specifications and creates the corresponding structure in Wrike:
### 1. wrike_import.py - Simple Batch Processor
A standalone script for one-time or manual batch imports. Processes all JSON files in a directory and exits.
### 2. wrike_monitor.py - Real-time Monitor Service
A continuous monitoring service that watches a folder 24/7, processes files as they arrive, and sends daily email reports. Designed to run as a systemd service.
## Common Features
Both tools create the following Wrike structure:
- **Folders** (product categories like "Dry Specialty", "Air", "PDC")
- **Projects** (campaigns with timelines and metadata)
- **Tasks** (deliverables with custom fields and due dates)
The script intelligently handles duplicates by checking OMG numbers before creating new items, updating existing ones instead.
Both intelligently handle duplicates by checking OMG numbers before creating new items.
## Features
## Features Comparison
### wrike_import.py (Batch Script)
- ✅ Simple command-line usage
- ✅ Process all files and exit
- ✅ Good for one-time imports or manual runs
- ✅ Moves processed files to Processed/
- ✅ Auto-cleanup of old files
### wrike_monitor.py (Service)
- ✅ **Real-time folder monitoring** with watchdog
- ✅ **Automatic processing** as files arrive
- ✅ **Daily email reports** at 7PM
- ✅ **Periodic scanning** for missed files
- ✅ **Failed file handling** with error logs
- ✅ **Auto-cleanup** of processed files (24h)
- ✅ **Statistics tracking** and performance monitoring
- ✅ **Systemd service** support for production
- ✅ **Sequential processing** to prevent race conditions
### Common Features (Both)
- ✅ **Automatic folder creation** from BusinessArea hierarchy
- ✅ **Project management** with start/end dates and campaign codes
- ✅ **Task creation/updates** with comprehensive custom fields
- ✅ **Task creation** with comprehensive custom fields
- ✅ **Duplicate prevention** via OMG number checking
- ✅ **Smart caching** to minimize API calls
- ✅ **Detailed logging** with progress tracking
- ✅ **Batch processing** of multiple JSON files
- ✅ **Automatic file management** - moves processed files to Processed subfolder
- ✅ **Auto-cleanup** - deletes files older than 24 hours from Processed folder
## Quick Start
### For Batch Processing (wrike_import.py)
```bash
# Install dependencies
pip install requests
# Run on a folder
python wrike_import.py /path/to/json/files/
```
### For Real-time Monitoring (wrike_monitor.py)
```bash
# Install dependencies
pip install -r requirements.txt
# Configure paths in wrike_monitor.py (lines 41-44)
# Run the monitor
python wrike_monitor.py
```
See [INSTALLATION.md](INSTALLATION.md) for systemd service setup.
## Requirements
### Python Dependencies
**For wrike_import.py:**
```bash
pip install requests
```
**For wrike_monitor.py:**
```bash
pip install requests watchdog schedule
# Or use requirements.txt:
pip install -r requirements.txt
```
### Python Version
- Python 3.6 or higher
@ -469,9 +525,36 @@ When reporting issues, include:
This script is provided as-is for internal use.
## Which Tool Should I Use?
### Use wrike_import.py if you want to:
- Import a batch of JSON files once
- Run manually when needed
- Test imports locally
- Simple command-line operation
### Use wrike_monitor.py if you want to:
- Continuous 24/7 monitoring
- Automatic processing as files arrive
- Daily email reports
- Production deployment as a service
- Detailed statistics and monitoring
## Changelog
### Version 1.2 (Current)
### Version 2.0 (Current)
- **NEW**: Added wrike_monitor.py - real-time monitoring service
- **NEW**: Folder watching with watchdog
- **NEW**: Daily email reports at 7PM
- **NEW**: Failed file handling with error logs
- **NEW**: Periodic scanning for missed files
- **NEW**: Statistics tracking and performance monitoring
- **NEW**: systemd service support
- **FIXED**: Sequential processing to prevent race conditions
- **FIXED**: Proper parent/child folder matching to avoid duplicates
- **IMPROVED**: Logging for skipped deliverables
### Version 1.2
- Added automatic file movement to Processed subfolder
- Implemented auto-cleanup of files older than 24 hours
- Improved file management and organization
@ -493,4 +576,4 @@ This script is provided as-is for internal use.
**Last Updated**: October 2025
**Author**: Dave Porter
**Wrike Space**: Staging (MQAAAABpz7l_)
**Repository**: https://bitbucket.org/zlalani/bissell-wrike-python

12
requirements.txt Normal file
View file

@ -0,0 +1,12 @@
# Core dependencies (required for both scripts)
requests>=2.31.0
# Additional dependencies for wrike_monitor.py only
watchdog>=3.0.0
schedule>=1.2.0
# Install all:
# pip install -r requirements.txt
# Install only for wrike_import.py:
# pip install requests

23
wrike-monitor.service Normal file
View file

@ -0,0 +1,23 @@
[Unit]
Description=Wrike Import Monitor Service
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/root/wrike-import
ExecStart=/usr/bin/python3 /root/wrike-import/wrike_monitor.py
Restart=always
RestartSec=10
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=wrike-monitor
# Resource limits
MemoryLimit=1G
CPUQuota=50%
[Install]
WantedBy=multi-user.target

940
wrike_monitor.py Normal file
View file

@ -0,0 +1,940 @@
#!/usr/bin/env python3
"""
Wrike Import Monitor - Real-time JSON processor with daily reporting
Monitors a folder for JSON files and automatically creates Wrike projects/tasks
Features:
- Real-time folder monitoring with watchdog
- Periodic scanning for missed files
- Duplicate detection via OMG numbers
- Daily email reports at 7PM
- Concurrent batch processing
- Failed file handling
- Auto-cleanup of old processed files
"""
import json
import logging
import smtplib
import shutil
import time
import threading
import schedule
import requests
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, date, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Dict, Optional, Any, List, Set
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class Config:
"""Configuration settings - EDIT THESE FOR YOUR ENVIRONMENT"""
# === PATHS (EDIT FOR LOCAL/SERVER) ===
# development paths
HOT_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/json_files")
PROCESSED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed")
FAILED_FOLDER = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/Failed")
REPORTS_DIR = Path("/Users/daveporter/Desktop/CODING-2024/Wrike/Processed/reports")
# === WRIKE API SETTINGS ===
WRIKE_API_BASE = "https://www.wrike.com/api/v4"
WRIKE_TOKEN = "eyJ0dCI6InAiLCJhbGciOiJIUzI1NiIsInR2IjoiMiJ9.eyJkIjoie1wiYVwiOjY5NzM0OTgsXCJpXCI6OTUyOTY3MCxcImNcIjo0Njk5NTI3LFwidVwiOjIzMjcyNDA0LFwiclwiOlwiVVNcIixcInNcIjpbXCJXXCIsXCJGXCIsXCJJXCIsXCJVXCIsXCJLXCIsXCJDXCIsXCJEXCIsXCJNXCIsXCJBXCIsXCJMXCIsXCJQXCJdLFwielwiOltdLFwidFwiOjB9IiwiaWF0IjoxNzYwMDI4ODIzfQ.9f4t15LycpoH-NlzQC3s1K19fVqnAwcahG2D-J5E8dg"
# Wrike Space - EDIT FOR DIFFERENT ENVIRONMENTS
WRIKE_SPACE_ID = "MQAAAABpz7l_" # Staging
WRIKE_SPACE_NAME = "Staging"
# Production space (uncomment for production)
# WRIKE_SPACE_ID = "MQAAAABoHcTY" # LGL Team
# WRIKE_SPACE_NAME = "LGL Team"
# Custom field IDs in Wrike
CUSTOM_FIELDS = {
"budget": "IEAGU2B2JUAJRZ7P",
"impact": "IEAGU2B2JUAJRZ7Q",
"notes": "IEAGU2B2JUAJRZ7R",
"rag": "IEAGU2B2JUAJRZ7S",
"deliverable_category": "IEAGU2B2JUAJRZ7T",
"actions": "IEAGU2B2JUAJRZ7W",
"shoot_date": "IEAGU2B2JUAJRZ7X",
"omg_number": "IEAGU2B2JUAJRZ7Y",
"box_link": "IEAGU2B2JUAJRZ7Z",
"owner": "IEAGU2B2JUAJRZ72"
}
# === EMAIL SETTINGS ===
SMTP_SERVER = "smtp.mailgun.org"
SMTP_PORT = 587
SMTP_USER = "twist@mail.dev.oliver.solutions"
SMTP_PASSWORD = "102115e9f3b9d7332d0cd1d4329bc0d4-77751bfc-ca066b71"
SENDER_EMAIL = "WRIKE-MONITOR@oliver.agency"
REPORT_EMAILS = ["daveporter@oliver.agency"]
# === PROCESSING SETTINGS ===
BATCH_SIZE = 1 # Process one file at a time to avoid race conditions
MAX_WORKERS = 1 # No concurrent processing
BATCH_TIMEOUT = 30
WAIT_DELAY = 2
# === MONITORING SETTINGS ===
PERIODIC_SCAN_INTERVAL = 60 # Scan every 60 seconds
PERIODIC_SCAN_TIMEOUT = 120 # Max scan time
SLOW_SCAN_THRESHOLD = 30
# === REPORTING SETTINGS ===
DAILY_REPORT_TIME = "19:00" # 7PM daily report
KEEP_REPORTS_DAYS = 30
CLEANUP_PROCESSED_HOURS = 24 # Delete processed files after 24 hours
class DailyStats:
"""Thread-safe daily statistics collector"""
def __init__(self):
self.lock = threading.Lock()
self.reset_stats()
self.start_time = datetime.now()
def reset_stats(self):
"""Reset daily statistics"""
with self.lock:
self.date = date.today()
self.total_processed = 0
self.total_created = 0
self.total_skipped = 0
self.total_errors = 0
self.startup_files_processed = 0
self.periodic_files_found = 0
self.failed_files_moved = 0
self.periodic_scans_completed = 0
self.slow_scans = 0
# Folder/Project breakdown
self.folder_stats = defaultdict(lambda: {
'projects': Counter(),
'tasks_created': 0,
'tasks_skipped': 0,
'errors': 0
})
# Performance metrics
self.processing_times = []
self.hourly_stats = defaultdict(int)
self.error_details = []
def record_file_processed(self, folder_name: str, project_name: str,
task_created: bool, skipped: bool, processing_time: float,
success: bool = True, error: str = None,
is_startup: bool = False, is_periodic: bool = False):
"""Record a processed file"""
with self.lock:
hour = datetime.now().hour
self.hourly_stats[hour] += 1
if is_startup:
self.startup_files_processed += 1
elif is_periodic:
self.periodic_files_found += 1
if success:
self.total_processed += 1
if task_created:
self.total_created += 1
self.folder_stats[folder_name]['tasks_created'] += 1
if skipped:
self.total_skipped += 1
self.folder_stats[folder_name]['tasks_skipped'] += 1
self.folder_stats[folder_name]['projects'][project_name] += 1
self.processing_times.append(processing_time)
else:
self.total_errors += 1
self.folder_stats[folder_name]['errors'] += 1
if error:
self.error_details.append({
'time': datetime.now().strftime('%H:%M:%S'),
'folder': folder_name,
'project': project_name,
'error': error
})
def record_failed_file_moved(self):
"""Record a failed file being moved"""
with self.lock:
self.failed_files_moved += 1
def record_periodic_scan(self, duration: float, files_found: int):
"""Record periodic scan statistics"""
with self.lock:
self.periodic_scans_completed += 1
if duration > Config.SLOW_SCAN_THRESHOLD:
self.slow_scans += 1
def get_stats_summary(self) -> Dict[str, Any]:
"""Get current statistics summary"""
with self.lock:
avg_processing_time = sum(self.processing_times) / len(self.processing_times) if self.processing_times else 0
return {
'date': self.date.strftime('%Y-%m-%d'),
'uptime': str(datetime.now() - self.start_time).split('.')[0],
'total_processed': self.total_processed,
'total_created': self.total_created,
'total_skipped': self.total_skipped,
'startup_files_processed': self.startup_files_processed,
'periodic_files_found': self.periodic_files_found,
'failed_files_moved': self.failed_files_moved,
'periodic_scans_completed': self.periodic_scans_completed,
'slow_scans': self.slow_scans,
'total_errors': self.total_errors,
'success_rate': (self.total_processed / (self.total_processed + self.total_errors) * 100) if (self.total_processed + self.total_errors) > 0 else 0,
'avg_processing_time': round(avg_processing_time, 2),
'folder_stats': dict(self.folder_stats),
'hourly_stats': dict(self.hourly_stats),
'error_details': self.error_details.copy()
}
class WrikeMonitor:
"""Wrike import processor with real-time monitoring"""
def __init__(self):
self.setup_logging()
self.ensure_directories()
self.file_queue = Queue()
self.daily_stats = DailyStats()
self.startup_complete = False
# Caches
self.folder_cache = {}
self.project_cache = {}
# Track recently processed files
self.recently_processed: Set[str] = set()
self.processed_lock = threading.Lock()
# Scan protection
self.scan_in_progress = False
self.scan_lock = threading.Lock()
self.setup_daily_reporting()
def setup_logging(self):
"""Setup logging configuration"""
log_format = '%(asctime)s - %(levelname)s - [%(threadName)s] %(message)s'
Config.REPORTS_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[
logging.FileHandler(Config.REPORTS_DIR / 'wrike_monitor.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
self.logger.info(f"Wrike Monitor initialized - Target: {Config.WRIKE_SPACE_NAME}")
def ensure_directories(self):
"""Create necessary directories"""
for directory in [Config.HOT_FOLDER, Config.PROCESSED_FOLDER,
Config.FAILED_FOLDER, Config.REPORTS_DIR]:
directory.mkdir(parents=True, exist_ok=True)
def setup_daily_reporting(self):
"""Setup scheduled daily reporting"""
schedule.every().day.at(Config.DAILY_REPORT_TIME).do(self.generate_daily_report)
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(60)
scheduler_thread = threading.Thread(target=run_scheduler, name="Scheduler", daemon=True)
scheduler_thread.start()
def make_wrike_request(self, method, endpoint, data=None):
"""Make a request to the Wrike API"""
url = f"{Config.WRIKE_API_BASE}{endpoint}"
headers = {
"Authorization": f"Bearer {Config.WRIKE_TOKEN}",
"Content-Type": "application/json"
}
try:
if method == "GET":
response = requests.get(url, headers=headers, timeout=30)
elif method == "POST":
response = requests.post(url, headers=headers, json=data, timeout=30)
elif method == "PUT":
response = requests.put(url, headers=headers, json=data, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
self.logger.error(f"Wrike API error: {e}")
return None
def parse_business_area(self, business_area):
"""Extract folder name from BusinessArea"""
parts = business_area.split(">")
return parts[-1].strip() if parts else business_area.strip()
def parse_date(self, date_string):
"""Convert date string to YYYY-MM-DD format"""
if not date_string:
return None
try:
dt = datetime.strptime(date_string.split('+')[0].strip(), "%Y-%m-%d %H:%M:%S")
return dt.strftime("%Y-%m-%d")
except:
return None
def get_or_create_folder(self, folder_name, parent_id=None):
"""Get existing folder or create new one"""
if parent_id is None:
parent_id = Config.WRIKE_SPACE_ID
cache_key = f"{parent_id}:{folder_name}"
if cache_key in self.folder_cache:
return self.folder_cache[cache_key]
# Get folder tree
result = self.make_wrike_request("GET", f"/folders/{parent_id}/folders")
if result and "data" in result:
# First entry is the parent folder itself with childIds
parent_folder = None
for item in result["data"]:
if item["id"] == parent_id:
parent_folder = item
break
if parent_folder:
child_ids = parent_folder.get("childIds", [])
# Search only in direct children
for folder in result["data"]:
if folder["id"] in child_ids and folder["title"] == folder_name:
folder_id = folder["id"]
self.folder_cache[cache_key] = folder_id
self.logger.debug(f"Found existing folder: {folder_name} ({folder_id})")
return folder_id
# Create new folder
self.logger.info(f"Creating new folder: {folder_name}")
data = {"title": folder_name, "description": f"{folder_name} category folder"}
result = self.make_wrike_request("POST", f"/folders/{parent_id}/folders", data)
if result and "data" in result and len(result["data"]) > 0:
folder_id = result["data"][0]["id"]
self.folder_cache[cache_key] = folder_id
self.logger.info(f"Created folder: {folder_name} ({folder_id})")
return folder_id
return None
def get_or_create_project(self, project_title, folder_id, project_details, campaign_code):
"""Get existing project or create new one"""
cache_key = f"{folder_id}:{project_title}"
if cache_key in self.project_cache:
return self.project_cache[cache_key]
# Get folder tree
result = self.make_wrike_request("GET", f"/folders/{folder_id}/folders")
if result and "data" in result:
# Find parent folder with childIds
parent_folder = None
for item in result["data"]:
if item["id"] == folder_id:
parent_folder = item
break
if parent_folder:
child_ids = parent_folder.get("childIds", [])
# Search only in direct children
for item in result["data"]:
if item["id"] in child_ids and item["title"] == project_title and "project" in item:
project_id = item["id"]
self.project_cache[cache_key] = project_id
self.logger.debug(f"Found existing project: {project_title} ({project_id})")
return project_id
# Create new project
description = project_details.get("Description", "")
if description:
description = description.replace("<p>", "").replace("</p>", "")
data = {"title": project_title, "description": description}
result = self.make_wrike_request("POST", f"/folders/{folder_id}/folders", data)
if result and "data" in result and len(result["data"]) > 0:
project_id = result["data"][0]["id"]
# Convert to project with dates
start_date = self.parse_date(project_details.get("StartDate"))
end_date = self.parse_date(project_details.get("EndDate"))
project_data = {"project": {}}
if start_date:
project_data["project"]["startDate"] = start_date
if end_date:
project_data["project"]["endDate"] = end_date
# Add campaign code
if campaign_code:
project_data["customFields"] = [
{"id": Config.CUSTOM_FIELDS["omg_number"], "value": campaign_code}
]
self.make_wrike_request("PUT", f"/folders/{project_id}", project_data)
self.project_cache[cache_key] = project_id
self.logger.info(f"Created project: {project_title} ({campaign_code})")
return project_id
return None
def find_task_by_omg_number(self, project_id, omg_number):
"""Find task with matching OMG number"""
if not omg_number:
return None
result = self.make_wrike_request("GET", f"/folders/{project_id}/tasks?fields=[\"customFields\"]")
if result and "data" in result:
for task in result["data"]:
custom_fields = task.get("customFields", [])
for field in custom_fields:
if field.get("id") == Config.CUSTOM_FIELDS["omg_number"]:
if field.get("value") == omg_number:
return task["id"]
return None
def create_deliverable_task(self, project_id, job_details):
"""Create deliverable task (skip if exists)"""
task_title = job_details.get("Title", "Untitled Task")
job_number = job_details.get("Number", "")
# Check if exists
if job_number:
existing_task_id = self.find_task_by_omg_number(project_id, job_number)
if existing_task_id:
self.logger.info(f"⊙ Task already exists: {task_title} (#{job_number}) - skipping")
return existing_task_id, True # Return (task_id, skipped=True)
# Parse dates
due_date = self.parse_date(job_details.get("DueDate"))
brief_date = self.parse_date(job_details.get("BriefDate"))
# Create task
task_data = {
"title": task_title,
"description": job_details.get("Notes", ""),
}
if due_date:
task_data["dates"] = {"due": due_date}
if brief_date:
task_data["dates"]["start"] = brief_date
result = self.make_wrike_request("POST", f"/folders/{project_id}/tasks", task_data)
if result and "data" in result and len(result["data"]) > 0:
task_id = result["data"][0]["id"]
# Add custom fields
custom_fields = []
job_category = job_details.get("JobCategory", job_details.get("MediaType", ""))
if job_category:
custom_fields.append({
"id": Config.CUSTOM_FIELDS["deliverable_category"],
"value": job_category
})
if job_number:
custom_fields.append({
"id": Config.CUSTOM_FIELDS["omg_number"],
"value": job_number
})
notes_parts = []
if job_details.get("Type"):
notes_parts.append(f"Type: {job_details['Type']}")
if job_details.get("Details"):
notes_parts.append(job_details["Details"])
if notes_parts:
custom_fields.append({
"id": Config.CUSTOM_FIELDS["notes"],
"value": " | ".join(notes_parts)
})
if custom_fields:
self.make_wrike_request("PUT", f"/tasks/{task_id}", {"customFields": custom_fields})
self.logger.info(f"Created task: {task_title} (#{job_number})")
return task_id, False # Return (task_id, skipped=False)
return None, False
def process_json_file(self, file_path: Path, is_startup: bool = False, is_periodic: bool = False):
"""Process single JSON file"""
start_time = time.time()
try:
# Parse JSON
with open(file_path, 'r') as f:
data = json.load(f)
job_spec = data.get("JobSpecification", {})
project_details = job_spec.get("ProjectDetails", {})
job_details = job_spec.get("JobDetails", {})
# Extract folder name
business_area = project_details.get("BusinessArea", job_details.get("BusinessArea", ""))
folder_name = self.parse_business_area(business_area)
if not folder_name:
raise ValueError("No BusinessArea found")
# Get/Create folder
folder_id = self.get_or_create_folder(folder_name)
if not folder_id:
raise ValueError(f"Failed to create folder: {folder_name}")
# Get/Create project
project_title = project_details.get("Title", "Untitled Project")
campaign_code = job_details.get("CampaignCode", "")
project_id = self.get_or_create_project(project_title, folder_id, project_details, campaign_code)
if not project_id:
raise ValueError(f"Failed to create project: {project_title}")
# Create task
task_id, skipped = self.create_deliverable_task(project_id, job_details)
if not task_id:
raise ValueError("Failed to create task")
processing_time = time.time() - start_time
# Record stats
self.daily_stats.record_file_processed(
folder_name, project_title, not skipped, skipped,
processing_time, True, None, is_startup, is_periodic
)
# Move to processed
self.move_to_processed(file_path)
self.add_to_recently_processed(file_path)
return True
except Exception as e:
processing_time = time.time() - start_time
self.logger.error(f"Error processing {file_path.name}: {e}")
# Move to failed
self.move_to_failed(file_path, str(e))
# Record error
try:
folder_name = self.parse_business_area(
job_spec.get("ProjectDetails", {}).get("BusinessArea", "Unknown")
)
project_title = job_spec.get("ProjectDetails", {}).get("Title", "Unknown")
except:
folder_name = "Unknown"
project_title = "Unknown"
self.daily_stats.record_file_processed(
folder_name, project_title, False, False,
processing_time, False, str(e), is_startup, is_periodic
)
return False
def add_to_recently_processed(self, file_path: Path):
"""Add file to recently processed set"""
with self.processed_lock:
try:
stat_info = file_path.stat()
file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"
self.recently_processed.add(file_id)
if len(self.recently_processed) > 1000:
oldest = list(self.recently_processed)[:200]
for entry in oldest:
self.recently_processed.discard(entry)
except:
pass
def was_recently_processed(self, file_path: Path) -> bool:
"""Check if file was recently processed"""
with self.processed_lock:
try:
stat_info = file_path.stat()
file_id = f"{file_path}_{stat_info.st_size}_{stat_info.st_mtime}"
return file_id in self.recently_processed
except:
return False
def move_to_processed(self, file_path: Path):
"""Move file to Processed folder"""
try:
destination = Config.PROCESSED_FOLDER / file_path.name
shutil.move(str(file_path), str(destination))
except Exception as e:
self.logger.error(f"Failed to move to processed: {e}")
def move_to_failed(self, file_path: Path, error_msg: str):
"""Move file to Failed folder with error log"""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
failed_filename = f"{timestamp}_{file_path.name}"
failed_path = Config.FAILED_FOLDER / failed_filename
shutil.move(str(file_path), str(failed_path))
# Create error log
error_log_path = Config.FAILED_FOLDER / f"{timestamp}_{file_path.stem}_ERROR.txt"
with open(error_log_path, 'w') as f:
f.write(f"File: {file_path.name}\n")
f.write(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Error: {error_msg}\n")
self.daily_stats.record_failed_file_moved()
self.logger.warning(f"Moved to failed: {file_path.name}")
except Exception as e:
self.logger.error(f"Failed to move failed file: {e}")
def scan_existing_files(self) -> List[Path]:
"""Scan for existing JSON files"""
self.logger.info("🔍 Scanning for existing JSON files...")
json_files = list(Config.HOT_FOLDER.glob("*.json"))
self.logger.info(f"Found {len(json_files)} existing files")
return json_files
def periodic_scan(self):
"""Protected periodic scan for missed files"""
while True:
try:
if self.startup_complete:
time.sleep(Config.PERIODIC_SCAN_INTERVAL)
with self.scan_lock:
if self.scan_in_progress:
self.logger.warning("Skipping scan - already in progress")
continue
self.scan_in_progress = True
self._perform_periodic_scan()
with self.scan_lock:
self.scan_in_progress = False
else:
time.sleep(5)
except Exception as e:
self.logger.error(f"Periodic scan error: {e}")
with self.scan_lock:
self.scan_in_progress = False
time.sleep(Config.PERIODIC_SCAN_INTERVAL)
def _perform_periodic_scan(self):
"""Perform actual periodic scan"""
scan_start = time.time()
missed_files = []
try:
for json_file in Config.HOT_FOLDER.glob("*.json"):
if not self.was_recently_processed(json_file):
missed_files.append(json_file)
scan_duration = time.time() - scan_start
if missed_files:
self.logger.info(f"🔄 Periodic scan found {len(missed_files)} files ({scan_duration:.1f}s)")
for file_path in missed_files:
self.queue_file(file_path, is_periodic=True)
self.daily_stats.record_periodic_scan(scan_duration, len(missed_files))
except Exception as e:
self.logger.error(f"Periodic scan error: {e}")
def process_startup_files(self, files: List[Path]):
"""Process existing files from startup scan"""
if not files:
self.logger.info("✅ No existing files to process")
return
self.logger.info(f"🚀 Processing {len(files)} existing files...")
for i in range(0, len(files), Config.BATCH_SIZE):
batch = files[i:i + Config.BATCH_SIZE]
self.process_batch(batch, is_startup=True)
self.logger.info(f"✅ Startup processing complete")
def process_batch(self, file_paths: List[Path], is_startup: bool = False, is_periodic: bool = False):
"""Process files sequentially (one at a time)"""
if not file_paths:
return
# Process sequentially to avoid race conditions with Wrike API
for file_path in file_paths:
try:
self.process_json_file(file_path, is_startup, is_periodic)
except Exception as e:
self.logger.error(f"Error processing {file_path.name}: {e}")
def queue_file(self, file_path: Path, is_periodic: bool = False):
"""Add file to processing queue"""
if self.startup_complete:
self.file_queue.put((file_path, is_periodic))
def batch_processor_worker(self):
"""Background worker for batch processing"""
while True:
files_to_process = []
periodic_flags = []
batch_start_time = time.time()
while (len(files_to_process) < Config.BATCH_SIZE and
(time.time() - batch_start_time) < Config.BATCH_TIMEOUT):
try:
file_data = self.file_queue.get(timeout=1.0)
file_path, is_periodic = file_data if isinstance(file_data, tuple) else (file_data, False)
files_to_process.append(file_path)
periodic_flags.append(is_periodic)
self.file_queue.task_done()
except:
if files_to_process:
break
continue
if files_to_process and self.startup_complete:
is_periodic_batch = any(periodic_flags)
self.process_batch(files_to_process, is_startup=False, is_periodic=is_periodic_batch)
def cleanup_old_processed_files(self):
"""Delete files older than configured hours from Processed folder"""
try:
cutoff_time = time.time() - (Config.CLEANUP_PROCESSED_HOURS * 3600)
deleted = 0
for file in Config.PROCESSED_FOLDER.glob("*.json"):
if file.stat().st_mtime < cutoff_time:
file.unlink()
deleted += 1
if deleted > 0:
self.logger.info(f"🗑️ Cleaned up {deleted} old processed files")
except Exception as e:
self.logger.error(f"Cleanup error: {e}")
def generate_daily_report(self):
"""Generate and email daily report"""
try:
stats = self.daily_stats.get_stats_summary()
report_content = self.format_daily_report(stats)
# Save report
report_file = Config.REPORTS_DIR / f"daily_report_{stats['date']}.txt"
with open(report_file, 'w') as f:
f.write(report_content)
# Email report
self.email_daily_report(report_content, stats['date'])
# Cleanup old reports
self.cleanup_old_reports()
# Cleanup old processed files
self.cleanup_old_processed_files()
# Reset stats
self.daily_stats.reset_stats()
self.logger.info(f"Daily report generated: {report_file}")
except Exception as e:
self.logger.error(f"Failed to generate daily report: {e}")
def format_daily_report(self, stats: Dict[str, Any]) -> str:
"""Format daily statistics into readable report"""
startup_info = f"Startup Files: {stats['startup_files_processed']}\n" if stats['startup_files_processed'] > 0 else ""
periodic_info = f"Periodic Scan Files: {stats['periodic_files_found']}\n" if stats['periodic_files_found'] > 0 else ""
failed_info = f"Failed Files: {stats['failed_files_moved']}\n" if stats['failed_files_moved'] > 0 else ""
report = f"""
WRIKE IMPORT DAILY REPORT
Date: {stats['date']}
Uptime: {stats['uptime']}
Space: {Config.WRIKE_SPACE_NAME}
=== SUMMARY ===
Total Files Processed: {stats['total_processed']}
Tasks Created: {stats['total_created']}
Tasks Skipped (duplicates): {stats['total_skipped']}
{startup_info}{periodic_info}{failed_info}Errors: {stats['total_errors']}
Success Rate: {stats['success_rate']:.1f}%
Average Processing Time: {stats['avg_processing_time']}s
Periodic Scans: {stats['periodic_scans_completed']} ({stats['slow_scans']} slow)
=== FOLDER BREAKDOWN ===
"""
for folder, folder_data in sorted(stats['folder_stats'].items()):
report += f"\n{folder}:\n"
report += f" Tasks Created: {folder_data['tasks_created']}\n"
report += f" Tasks Skipped: {folder_data['tasks_skipped']}\n"
report += f" Errors: {folder_data['errors']}\n"
if folder_data['projects']:
report += f" Projects:\n"
for project, count in folder_data['projects'].most_common(10):
report += f" - {project}: {count} deliverable(s)\n"
report += "\n=== HOURLY BREAKDOWN ===\n"
for hour in range(24):
count = stats['hourly_stats'].get(hour, 0)
if count > 0:
report += f"{hour:02d}:00 - {count} files\n"
if stats['error_details']:
report += "\n=== RECENT ERRORS ===\n"
for error in stats['error_details'][-10:]:
report += f"{error['time']} - {error['folder']}/{error['project']}: {error['error']}\n"
return report
def email_daily_report(self, report_content: str, report_date: str):
"""Email the daily report"""
try:
msg = MIMEMultipart()
msg['From'] = Config.SENDER_EMAIL
msg['To'] = ', '.join(Config.REPORT_EMAILS)
msg['Subject'] = f"Wrike Import Daily Report - {Config.WRIKE_SPACE_NAME} - {report_date}"
msg.attach(MIMEText(report_content, 'plain'))
server = smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT)
server.starttls()
server.login(Config.SMTP_USER, Config.SMTP_PASSWORD)
server.sendmail(Config.SENDER_EMAIL, Config.REPORT_EMAILS, msg.as_string())
server.quit()
self.logger.info("Daily report emailed successfully")
except Exception as e:
self.logger.error(f"Failed to email report: {e}")
def cleanup_old_reports(self):
"""Remove old report files"""
try:
cutoff_date = date.today() - timedelta(days=Config.KEEP_REPORTS_DAYS)
for report_file in Config.REPORTS_DIR.glob("daily_report_*.txt"):
try:
file_date_str = report_file.stem.split('_')[-1]
file_date = datetime.strptime(file_date_str, '%Y-%m-%d').date()
if file_date < cutoff_date:
report_file.unlink()
except:
pass
except Exception as e:
self.logger.error(f"Failed to cleanup old reports: {e}")
def get_current_stats(self) -> str:
"""Get current statistics as string"""
stats = self.daily_stats.get_stats_summary()
return f"Today: {stats['total_processed']} processed ({stats['total_created']} created, {stats['total_skipped']} skipped), {stats['total_errors']} errors"
class WrikeFileHandler(FileSystemEventHandler):
"""File system event handler for Wrike monitor"""
def __init__(self, monitor: WrikeMonitor):
self.monitor = monitor
self.logger = logging.getLogger(__name__)
def on_created(self, event):
if event.is_directory or not self.monitor.startup_complete:
return
file_path = Path(event.src_path)
if file_path.suffix.lower() == '.json':
self.logger.debug(f"📥 New file detected: {file_path.name}")
self.monitor.queue_file(file_path, is_periodic=False)
def on_moved(self, event):
if event.is_directory or not self.monitor.startup_complete:
return
dest_path = Path(event.dest_path)
if dest_path.suffix.lower() == '.json':
self.logger.debug(f"📁 File moved: {dest_path.name}")
self.monitor.queue_file(dest_path, is_periodic=False)
def main():
"""Main entry point with real-time monitoring"""
monitor = WrikeMonitor()
# Step 1: Process existing files
existing_files = monitor.scan_existing_files()
monitor.process_startup_files(existing_files)
# Step 2: Start real-time monitoring
monitor.startup_complete = True
monitor.logger.info("🎯 Startup complete - switching to real-time monitoring")
# Start batch processing worker
batch_worker = threading.Thread(
target=monitor.batch_processor_worker,
name="BatchWorker",
daemon=True
)
batch_worker.start()
# Start periodic scanner
periodic_scanner = threading.Thread(
target=monitor.periodic_scan,
name="PeriodicScanner",
daemon=True
)
periodic_scanner.start()
# Setup file monitoring
event_handler = WrikeFileHandler(monitor)
observer = Observer()
observer.schedule(event_handler, str(Config.HOT_FOLDER), recursive=False)
observer.start()
monitor.logger.info(f"🛡️ Real-time monitoring active on: {Config.HOT_FOLDER}")
monitor.logger.info(f"📧 Daily reports at {Config.DAILY_REPORT_TIME} to {', '.join(Config.REPORT_EMAILS)}")
try:
while True:
time.sleep(60)
current_stats = monitor.get_current_stats()
monitor.logger.info(f"📊 {current_stats}")
except KeyboardInterrupt:
monitor.logger.info("Stopping Wrike Monitor...")
observer.stop()
observer.join()
monitor.logger.info("Wrike Monitor stopped")
if __name__ == "__main__":
main()