bissell-wrike-python/wrike_import.py
Dave Porter ad0c512012 Initial commit: Wrike import script v1.2
- Automatic folder/project/task creation from JSON files
- Duplicate detection via OMG numbers
- Custom field mapping
- File management (Processed folder + 24hr cleanup)
- Production ready

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-10 12:52:08 -04:00

488 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Wrike Import Script
Processes JSON files to create folder structures, projects, and deliverable tasks in Wrike.
"""
import json
import os
import sys
import requests
import shutil
import time
from datetime import datetime, timedelta
from pathlib import Path
# Configuration
WRIKE_API_BASE = "https://www.wrike.com/api/v4"

# SECURITY: a live API token is committed below in plain text. Anyone with
# read access to this file can act on the Wrike account. Set the WRIKE_TOKEN
# environment variable to supply the credential instead; the literal is kept
# only as a backward-compatible fallback and should be rotated and removed.
WRIKE_TOKEN = os.environ.get("WRIKE_TOKEN") or (
    "eyJ0dCI6InAiLCJhbGciOiJIUzI1NiIsInR2IjoiMiJ9.eyJkIjoie1wiYVwiOjY5NzM0OTgsXCJpXCI6OTUyOTY3MCxcImNcIjo0Njk5NTI3LFwidVwiOjIzMjcyNDA0LFwiclwiOlwiVVNcIixcInNcIjpbXCJXXCIsXCJGXCIsXCJJXCIsXCJVXCIsXCJLXCIsXCJDXCIsXCJEXCIsXCJNXCIsXCJBXCIsXCJMXCIsXCJQXCJdLFwielwiOltdLFwidFwiOjB9IiwiaWF0IjoxNzYwMDI4ODIzfQ.9f4t15LycpoH-NlzQC3s1K19fVqnAwcahG2D-J5E8dg"
)
STAGING_SPACE_ID = "MQAAAABpz7l_"

# Custom field IDs in Staging space, keyed by the short name used in this script
CUSTOM_FIELDS = {
    "budget": "IEAGU2B2JUAJRZ7P",
    "impact": "IEAGU2B2JUAJRZ7Q",
    "notes": "IEAGU2B2JUAJRZ7R",
    "rag": "IEAGU2B2JUAJRZ7S",
    "deliverable_category": "IEAGU2B2JUAJRZ7T",
    "actions": "IEAGU2B2JUAJRZ7W",
    "shoot_date": "IEAGU2B2JUAJRZ7X",
    "omg_number": "IEAGU2B2JUAJRZ7Y",
    "box_link": "IEAGU2B2JUAJRZ7Z",
    "owner": "IEAGU2B2JUAJRZ72",
}

# Per-run caches mapping "<parent id>:<title>" to the Wrike folder/project ID
folder_cache = {}
project_cache = {}
def make_wrike_request(method, endpoint, data=None, timeout=30):
    """Make a request to the Wrike API.

    Args:
        method: HTTP verb; one of "GET", "POST" or "PUT".
        endpoint: Path appended to WRIKE_API_BASE (must start with "/").
        data: JSON-serialisable body for POST/PUT. Ignored for GET.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the whole import run.

    Returns:
        The decoded JSON response, or None if the request failed (the error
        is printed).

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
    """
    if method not in ("GET", "POST", "PUT"):
        # Previously this fell through and crashed on an unbound `response`.
        raise ValueError(f"Unsupported HTTP method: {method!r}")

    url = f"{WRIKE_API_BASE}{endpoint}"
    headers = {
        "Authorization": f"Bearer {WRIKE_TOKEN}",
        "Content-Type": "application/json"
    }
    # GET requests never carried a body; keep it that way.
    payload = None if method == "GET" else data
    try:
        response = requests.request(
            method, url, headers=headers, json=payload, timeout=timeout
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error making Wrike request: {e}")
        failed_response = getattr(e, "response", None)
        if failed_response is not None:
            print(f"Response: {failed_response.text}")
        return None
    except ValueError as e:
        # Older `requests` versions raise a plain ValueError for a non-JSON body.
        print(f"Error making Wrike request: {e}")
        return None
def parse_business_area(business_area):
    """Extract the folder name from a BusinessArea string.

    The folder name is the last ">"-separated segment, with surrounding
    whitespace removed.

    Example: 'BISSELL > PRODUCT MARKETING > Dry Specialty' -> 'Dry Specialty'

    Returns an empty string when ``business_area`` is empty or None.
    """
    if not business_area:
        return ""
    # str.split always yields at least one element, so [-1] is always safe.
    return business_area.split(">")[-1].strip()
def parse_date(date_string):
    """Convert a timestamp string to YYYY-MM-DD format.

    Accepts values like "2025-05-23 16:00:00+00". Any trailing UTC offset,
    positive or negative, is discarded; the date is taken as written and is
    not converted between time zones.

    Returns:
        The date as "YYYY-MM-DD", or None if the value is empty or cannot be
        parsed.
    """
    if not date_string:
        return None
    try:
        date_part, time_part = date_string.strip().split(None, 1)
        # Only the time part may carry an offset; the date part legitimately
        # contains "-" so it must not be split on signs.
        for sign in "+-":
            time_part = time_part.split(sign)[0]
        parsed = datetime.strptime(
            f"{date_part} {time_part.strip()}", "%Y-%m-%d %H:%M:%S"
        )
    except (AttributeError, TypeError, ValueError):
        # Non-string input or an unexpected layout: treat as "no date".
        return None
    return parsed.strftime("%Y-%m-%d")
def get_or_create_folder(folder_name, parent_id=STAGING_SPACE_ID):
    """Get an existing folder by title or create a new one.

    Looks in the in-memory cache first, then in the folder listing returned
    by Wrike for ``parent_id``, and only creates the folder when the listing
    was fetched successfully and holds no entry with that title.

    Args:
        folder_name: Title of the folder to find or create.
        parent_id: ID of the folder/space to look in and create under.

    Returns:
        The folder ID, or None if the lookup or the creation failed.
    """
    # Check cache
    cache_key = f"{parent_id}:{folder_name}"
    if cache_key in folder_cache:
        print(f" Found folder '{folder_name}' in cache: {folder_cache[cache_key]}")
        return folder_cache[cache_key]

    # Get folders in parent
    listing = make_wrike_request("GET", f"/folders/{parent_id}/folders")
    if not listing or "data" not in listing:
        # A failed lookup is not the same as "folder does not exist":
        # creating here could produce a duplicate folder in Wrike.
        print(f" ✗ Could not list folders under {parent_id}; not creating '{folder_name}'")
        return None

    for folder in listing["data"]:
        if folder.get("title") == folder_name:
            folder_id = folder["id"]
            folder_cache[cache_key] = folder_id
            print(f" Found existing folder '{folder_name}': {folder_id}")
            return folder_id

    # Create new folder
    print(f" Creating new folder '{folder_name}' under {parent_id}")
    data = {
        "title": folder_name,
        "description": f"{folder_name} category folder"
    }
    created = make_wrike_request("POST", f"/folders/{parent_id}/folders", data)
    if created and created.get("data"):
        folder_id = created["data"][0]["id"]
        folder_cache[cache_key] = folder_id
        print(f" Created folder '{folder_name}': {folder_id}")
        return folder_id
    return None
def get_or_create_project(project_title, folder_id, project_details, campaign_code):
    """Get an existing project by title or create a new one.

    A project is created in two steps: a plain folder is created under
    ``folder_id``, then it is updated with a ``project`` section (dates) and
    the campaign code custom field.

    Args:
        project_title: Title of the project to find or create.
        folder_id: ID of the folder the project lives in.
        project_details: Dict read for "Description", "StartDate" and
            "EndDate".
        campaign_code: Value stored in the OMG # custom field; skipped when
            empty.

    Returns:
        The project ID, or None if the lookup, creation or conversion failed.
    """
    # Check cache
    cache_key = f"{folder_id}:{project_title}"
    if cache_key in project_cache:
        print(f" Found project '{project_title}' in cache: {project_cache[cache_key]}")
        return project_cache[cache_key]

    # Get folders in parent
    listing = make_wrike_request("GET", f"/folders/{folder_id}/folders")
    if not listing or "data" not in listing:
        # A failed lookup is not the same as "project does not exist":
        # creating here could produce a duplicate project in Wrike.
        print(f" ✗ Could not list projects under {folder_id}; not creating '{project_title}'")
        return None

    for item in listing["data"]:
        # Only entries carrying a "project" section count as projects.
        if item.get("title") == project_title and "project" in item:
            project_id = item["id"]
            project_cache[cache_key] = project_id
            print(f" Found existing project '{project_title}': {project_id}")
            return project_id

    # Create new project (as folder first, then convert)
    print(f" Creating new project '{project_title}' under folder {folder_id}")

    # Extract description (only <p> tags are removed)
    description = project_details.get("Description", "")
    if description:
        description = description.replace("<p>", "").replace("</p>", "")
    data = {
        "title": project_title,
        "description": description
    }
    created = make_wrike_request("POST", f"/folders/{folder_id}/folders", data)
    if not created or not created.get("data"):
        return None
    project_id = created["data"][0]["id"]

    # Convert to project with dates and custom fields
    start_date = parse_date(project_details.get("StartDate"))
    end_date = parse_date(project_details.get("EndDate"))
    project_data = {"project": {}}
    if start_date:
        project_data["project"]["startDate"] = start_date
    if end_date:
        project_data["project"]["endDate"] = end_date

    # Add campaign code as OMG # custom field for the project
    if campaign_code:
        project_data["customFields"] = [
            {
                "id": CUSTOM_FIELDS["omg_number"],
                "value": campaign_code
            }
        ]

    converted = make_wrike_request("PUT", f"/folders/{project_id}", project_data)
    if converted:
        project_cache[cache_key] = project_id
        print(f" Created project '{project_title}': {project_id} (Campaign Code: {campaign_code})")
        return project_id

    # The plain folder now exists in Wrike but is not a project, so the
    # lookup above will not match it on the next run.
    print(f" ⚠ Warning: Folder {project_id} was created but could not be converted to a project")
    return None
def find_task_by_omg_number(project_id, omg_number):
    """Find a task in the project with the matching OMG number.

    Args:
        project_id: ID of the folder/project whose tasks are searched.
        omg_number: Value to look for in the OMG # custom field. May be a
            string or a number; values are compared as text.

    Returns:
        The ID of the first matching task, or None if ``omg_number`` is
        empty, the request failed, or no task matches.
    """
    if not omg_number:
        return None

    # A numeric job number from the JSON must still match the stored value,
    # so compare both sides as text.
    # NOTE(review): assumes Wrike returns custom field values as strings — confirm.
    wanted = str(omg_number)
    omg_field_id = CUSTOM_FIELDS["omg_number"]

    # Get all tasks in the project with custom fields
    # IMPORTANT: Must add fields parameter to get custom field data
    result = make_wrike_request("GET", f"/folders/{project_id}/tasks?fields=[\"customFields\"]")
    if not result or "data" not in result:
        return None

    for task in result["data"]:
        # Check custom fields for matching OMG number
        for field in task.get("customFields", []):
            if field.get("id") != omg_field_id:
                continue
            value = field.get("value")
            if value is not None and str(value) == wanted:
                return task["id"]
    return None
def create_or_update_deliverable_task(project_id, job_details):
    """Create a deliverable task in the project unless one already exists.

    Despite the name, an existing task is never modified: when a task with
    the same OMG number (the job's "Number") is found, it is left as-is and
    its ID is returned. Otherwise the task is created with title, description
    and dates, then updated in a second request with its custom fields.

    Args:
        project_id: ID of the project the task belongs to.
        job_details: Dict read for "Title", "Number", "Notes", "DueDate",
            "BriefDate", "JobCategory", "MediaType", "Type" and "Details".

    Returns:
        The ID of the existing or newly created task, or None if creation
        failed. A failed custom-field update only prints a warning.
    """
    task_title = job_details.get("Title", "Untitled Task")
    job_number = job_details.get("Number", "")
    # OMG number for deliverable is just the job number
    omg_number = job_number

    # Check if task already exists with this OMG number
    existing_task_id = None
    if omg_number:
        print(f" Checking for existing task with OMG #: {omg_number}")
        existing_task_id = find_task_by_omg_number(project_id, omg_number)
    if existing_task_id:
        print(f" Found existing task: {existing_task_id}")
        print(f" ⊙ Task '{task_title}' already exists (Job #{job_number}) - skipping")
        return existing_task_id  # Return existing ID to mark as success
    print(f" Creating new task '{task_title}' (Job #{job_number})")

    # Build the basic task (custom fields are added in a second request)
    basic_task_data = {
        "title": task_title,
        "description": job_details.get("Notes") or "",
    }
    due_date = parse_date(job_details.get("DueDate"))
    brief_date = parse_date(job_details.get("BriefDate"))
    if due_date:
        dates = {"due": due_date}
        # A start date is only sent together with a due date; writing it
        # into a missing "dates" dict used to be a KeyError risk.
        if brief_date:
            dates["start"] = brief_date
        basic_task_data["dates"] = dates

    # Collect custom fields
    custom_fields = []

    # Deliverable Category (from JobCategory, falling back to MediaType when
    # JobCategory is missing or empty)
    job_category = job_details.get("JobCategory") or job_details.get("MediaType", "")
    if job_category:
        custom_fields.append({
            "id": CUSTOM_FIELDS["deliverable_category"],
            "value": job_category
        })

    # OMG Number (Job Number only)
    if omg_number:
        custom_fields.append({
            "id": CUSTOM_FIELDS["omg_number"],
            "value": omg_number
        })

    # Notes (combine type and details)
    notes_parts = []
    if job_details.get("Type"):
        notes_parts.append(f"Type: {job_details['Type']}")
    if job_details.get("Details"):
        notes_parts.append(job_details["Details"])
    if notes_parts:
        custom_fields.append({
            "id": CUSTOM_FIELDS["notes"],
            "value": " | ".join(notes_parts)
        })

    # Create new task (without custom fields first)
    result = make_wrike_request("POST", f"/folders/{project_id}/tasks", basic_task_data)
    if not result or not result.get("data"):
        print(f" ✗ Failed to create task '{task_title}'")
        return None

    task_id = result["data"][0]["id"]
    print(f" ✓ Created task '{task_title}': {task_id}")

    # Now update with custom fields
    if custom_fields:
        print(f" Adding custom fields to task...")
        update_data = {"customFields": custom_fields}
        update_result = make_wrike_request("PUT", f"/tasks/{task_id}", update_data)
        if update_result:
            print(f" ✓ Custom fields added")
        else:
            print(f" ⚠ Warning: Failed to add custom fields")
    return task_id
def process_json_file(json_file_path):
    """Process a single JSON file and create the Wrike structure for it.

    Steps: find/create the folder named after the BusinessArea, find/create
    the project, then create the deliverable task unless one with the same
    job number already exists.

    Args:
        json_file_path: Path object pointing at the JSON file.

    Returns:
        True when a new task was created, the string "skipped" when the task
        already existed, and False on any failure.
    """
    print(f"\n{'='*80}")
    print(f"Processing: {json_file_path.name}")
    print(f"{'='*80}")
    try:
        # utf-8-sig reads plain UTF-8 and also tolerates a leading BOM.
        with open(json_file_path, 'r', encoding="utf-8-sig") as f:
            data = json.load(f)
    except Exception as e:
        print(f"Error reading JSON file: {e}")
        return False
    if not isinstance(data, dict):
        print("Error reading JSON file: top-level value is not a JSON object")
        return False

    # Extract data (a section that is missing or null is treated as empty)
    job_spec = data.get("JobSpecification") or {}
    project_details = job_spec.get("ProjectDetails") or {}
    job_details = job_spec.get("JobDetails") or {}

    # 1. Get/Create top-level folder from BusinessArea
    business_area = (
        project_details.get("BusinessArea")
        or job_details.get("BusinessArea")
        or ""
    )
    folder_name = parse_business_area(business_area)
    if not folder_name:
        print(" ✗ No BusinessArea found, skipping")
        return False
    print(f"\n1. Processing folder: '{folder_name}'")
    folder_id = get_or_create_folder(folder_name)
    if not folder_id:
        print(f" ✗ Failed to get/create folder '{folder_name}'")
        return False

    # 2. Get/Create project
    project_title = project_details.get("Title", "Untitled Project")
    campaign_code = job_details.get("CampaignCode", "")
    print(f"\n2. Processing project: '{project_title}'")
    if campaign_code:
        print(f" Campaign Code: {campaign_code}")
    project_id = get_or_create_project(project_title, folder_id, project_details, campaign_code)
    if not project_id:
        print(f" ✗ Failed to get/create project '{project_title}'")
        return False

    # 3. Create or update deliverable task
    print(f"\n3. Processing deliverable task")
    # Record whether the task exists BEFORE creating it. Looking it up
    # afterwards would also find a task created just now (it carries the same
    # job number), so every new task would be reported as skipped.
    job_number = job_details.get("Number", "")
    pre_existing_task = find_task_by_omg_number(project_id, job_number) if job_number else None
    task_id = create_or_update_deliverable_task(project_id, job_details)
    if not task_id:
        print(f"\n✗ Failed to create deliverable task for {json_file_path.name}")
        return False
    if pre_existing_task and pre_existing_task == task_id:
        # Task already existed, was skipped
        print(f"\n⊙ Successfully processed {json_file_path.name} (task already exists)")
        return "skipped"
    # New task was created
    print(f"\n✓ Successfully processed {json_file_path.name}")
    return True
def move_to_processed(json_file, json_dir):
    """Relocate ``json_file`` into the ``Processed`` directory under ``json_dir``.

    The ``Processed`` directory is created if it does not exist yet.

    Returns:
        True if the file was moved, False if the move raised an error (the
        error is printed).
    """
    archive_dir = json_dir / "Processed"
    archive_dir.mkdir(exist_ok=True)
    target = archive_dir / json_file.name
    try:
        shutil.move(str(json_file), str(target))
        print(f" → Moved to: Processed/{json_file.name}")
    except Exception as e:
        print(f" ✗ Failed to move file: {e}")
        return False
    return True
def cleanup_old_files(json_dir, hours=24):
    """Remove ``*.json`` files from ``json_dir/Processed`` older than ``hours``.

    Age is judged by each file's modification time. A file that cannot be
    inspected or deleted is reported with a warning and left in place.

    Returns:
        The number of files deleted (0 when the Processed folder is absent).
    """
    archive_dir = json_dir / "Processed"
    if not archive_dir.exists():
        return 0

    threshold = time.time() - (hours * 3600)
    removed = 0
    for entry in archive_dir.glob("*.json"):
        try:
            # Files modified at or after the threshold are still fresh.
            if entry.stat().st_mtime >= threshold:
                continue
            entry.unlink()
            removed += 1
            print(f" Deleted old file: {entry.name}")
        except Exception as e:
            print(f" Warning: Could not delete {entry.name}: {e}")
    return removed
def main():
    """Process all JSON files in the directory given on the command line.

    Each file in the top level of the directory is passed to
    process_json_file; files that succeed (including those whose task already
    existed) are moved to the Processed subfolder. Afterwards, Processed
    files older than 24 hours are deleted and a summary is printed.

    Exits with status 1 when the directory argument is missing or invalid,
    or when the directory contains no JSON files.
    """
    if len(sys.argv) < 2:
        print("Usage: python wrike_import.py <json_directory>")
        print("\nExample: python wrike_import.py ./json_files/")
        sys.exit(1)
    json_dir = Path(sys.argv[1])
    if not json_dir.exists() or not json_dir.is_dir():
        print(f"Error: Directory '{json_dir}' does not exist")
        sys.exit(1)

    # Find all JSON files. glob("*.json") is not recursive, so the Processed
    # subfolder is never entered. The previous substring test on the full
    # path dropped every file whenever the path merely contained "Processed".
    json_files = sorted(f for f in json_dir.glob("*.json") if f.is_file())
    if not json_files:
        print(f"No JSON files found in '{json_dir}'")
        sys.exit(1)
    print(f"\nFound {len(json_files)} JSON file(s) to process")
    print(f"Target: Wrike Staging Space ({STAGING_SPACE_ID})")

    # Process each file
    success_count = 0
    failed_count = 0
    moved_count = 0
    skipped_count = 0
    for json_file in json_files:
        try:
            result = process_json_file(json_file)
            if result == "skipped":
                skipped_count += 1
            if result:
                # "skipped" still counts as success for file movement
                success_count += 1
                if move_to_processed(json_file, json_dir):
                    moved_count += 1
            else:
                failed_count += 1
        except Exception as e:
            # One bad file must not abort the rest of the batch.
            print(f"\n✗ Unexpected error processing {json_file.name}: {e}")
            failed_count += 1

    # Cleanup old files in Processed folder
    print(f"\n{'='*80}")
    print("CLEANUP")
    print(f"{'='*80}")
    deleted_count = cleanup_old_files(json_dir, hours=24)
    if deleted_count > 0:
        print(f"Deleted {deleted_count} file(s) older than 24 hours from Processed folder")
    else:
        print("No old files to delete")

    # Summary
    print(f"\n{'='*80}")
    print("SUMMARY")
    print(f"{'='*80}")
    print(f"Total files: {len(json_files)}")
    print(f"Successful: {success_count}")
    print(f"Skipped (already exists): {skipped_count}")
    print(f"Failed: {failed_count}")
    print(f"Moved to Processed: {moved_count}")
    print(f"\nFolders created/found: {len(folder_cache)}")
    print(f"Projects created/found: {len(project_cache)}")


if __name__ == "__main__":
    main()