"""
Box.com SDK wrapper for JWT authentication and folder operations.

Handles authentication, folder discovery, and file download/upload.
"""

import json
import os
import time
from typing import Optional, Dict, List

from boxsdk import JWTAuth, Client
from boxsdk.exception import BoxException


class BoxClient:
    """Wrapper for Box SDK with JWT authentication.

    Typical use: construct, call ``authenticate()``, then use the folder and
    file helpers. Most methods report problems by printing a message and
    returning a falsy value instead of raising.

    Attributes:
        config_path: Path to the Box JWT settings JSON file.
        as_user_id: Box user ID to impersonate; an empty string means the
            service account itself is used.
        client: The Box SDK client, or None until ``authenticate()`` succeeds.
        folder_cache: Result of the most recent ``discover_folders()`` call.
    """

    # Sub-folder names that discover_folders() looks for.
    REQUIRED_FOLDERS = ('IN', 'OUT_SUCCESS', 'OUT_FAILED')

    def __init__(self, config_path: str, as_user_id: str = ''):
        """
        Initialize Box client with JWT authentication

        No I/O happens here; call ``authenticate()`` to actually connect.

        Args:
            config_path: Path to oliver_box_config.json
            as_user_id: Optional Box user ID to impersonate (BOX_AS_USER_ID in .env)
        """
        self.config_path = config_path
        self.as_user_id = as_user_id
        self.client = None
        self.folder_cache = {}

    def _client_ready(self) -> bool:
        """Return True if a client is available; otherwise print why not."""
        if self.client is None:
            print("✗ Box client is not authenticated; call authenticate() first")
            return False
        return True

    def authenticate(self) -> bool:
        """
        Authenticate using JWT credentials, optionally impersonating an enterprise user.

        ``self.client`` is only replaced after the new client has been
        verified with a "current user" request, so a failed attempt never
        leaves an unverified client on the instance.

        Returns:
            bool: True if authentication successful
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)

            auth = JWTAuth.from_settings_dictionary(config)
            candidate = Client(auth)
            role = "service account"

            if self.as_user_id:
                # Impersonate the enterprise user who owns the VIDEO_OPTIMIZER folder
                user = candidate.user(self.as_user_id)
                candidate = candidate.as_user(user)
                role = "enterprise user"

            # Round-trip to Box so bad credentials fail here, not on first use.
            user_info = candidate.user().get()
            message = f"✓ Authenticated as {role}: {user_info.name} ({user_info.login})"

        except FileNotFoundError:
            print(f"✗ Box config file not found: {self.config_path}")
            return False
        except Exception as e:
            # Boundary handler: bad JSON, bad key material and Box API errors
            # are all reported the same way to the caller.
            print(f"✗ Box authentication failed: {str(e)}")
            return False

        self.client = candidate
        print(message)
        return True

    def list_enterprise_users(self) -> List[Dict]:
        """
        List enterprise users to help find the correct BOX_AS_USER_ID.
        Run this once to identify the user who owns the VIDEO_OPTIMIZER folder.

        Returns:
            List of dicts with 'id', 'name' and 'login' keys; an empty list
            if the client is not authenticated or Box reports an error.
        """
        if not self._client_ready():
            return []

        try:
            print("Enterprise users:")
            users = []
            for user in self.client.users():
                users.append({'id': user.id, 'name': user.name, 'login': user.login})
                print(f" ID: {user.id} | {user.name} ({user.login})")
            return users
        except BoxException as e:
            print(f"✗ Error listing users: {str(e)}")
            return []

    def discover_folders(self, video_optimizer_folder_id: str) -> Dict[str, str]:
        """
        Locate IN, OUT_SUCCESS, OUT_FAILED directly inside the VIDEO_OPTIMIZER folder.

        The result (possibly partial or empty) also replaces ``self.folder_cache``.

        Args:
            video_optimizer_folder_id: Box folder ID for VIDEO_OPTIMIZER (from env)

        Returns:
            Dict mapping folder names to folder IDs
            e.g., {'IN': '123456', 'OUT_SUCCESS': '789012', 'OUT_FAILED': '345678'}
            Names that were not found are simply absent from the dict.
        """
        folders: Dict[str, str] = {}

        if self._client_ready():
            try:
                print(f"Looking for subfolders inside VIDEO_OPTIMIZER (ID: {video_optimizer_folder_id})...")
                items = self.client.folder(video_optimizer_folder_id).get_items()

                for item in items:
                    if item.type == 'folder' and item.name in self.REQUIRED_FOLDERS:
                        folders[item.name] = item.id
                        print(f"✓ Found {item.name} (ID: {item.id})")

            except BoxException as e:
                print(f"✗ Error accessing VIDEO_OPTIMIZER folder: {str(e)}")
                print(" Make sure the service account has been shared on the VIDEO_OPTIMIZER folder.")

        self.folder_cache = folders

        missing = [name for name in self.REQUIRED_FOLDERS if name not in folders]
        if missing:
            print(f"✗ Missing required folders inside VIDEO_OPTIMIZER: {missing}")

        return folders

    def download_file(self, file_id: str, destination_path: str) -> bool:
        """
        Download file from Box to local path

        If the download fails after the destination file was opened, the
        truncated file is removed so it cannot be mistaken for a good one.

        Args:
            file_id: Box file ID
            destination_path: Local file path to save to

        Returns:
            bool: True if download successful
        """
        if not self._client_ready():
            return False

        opened = False
        try:
            # Ensure destination directory exists. dirname is '' for a bare
            # filename, and os.makedirs('') raises, so skip it in that case.
            parent_dir = os.path.dirname(destination_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            with open(destination_path, 'wb') as f:
                opened = True
                self.client.file(file_id).download_to(f)

            file_size = os.path.getsize(destination_path)
            print(f"✓ Downloaded file {file_id} ({file_size} bytes)")
            return True

        except BoxException as e:
            print(f"✗ Error downloading file {file_id}: {str(e)}")
        except Exception as e:
            print(f"✗ Unexpected error downloading file {file_id}: {str(e)}")

        if opened:
            try:
                os.remove(destination_path)
            except OSError:
                # Best-effort cleanup only; the download failure was already reported.
                pass
        return False

    def upload_file(self, folder_id: str, file_path: str, file_name: Optional[str] = None) -> Optional[str]:
        """
        Upload file to Box folder

        Args:
            folder_id: Target Box folder ID
            file_path: Local file path to upload
            file_name: Optional custom filename (defaults to original)

        Returns:
            str: Uploaded file ID, or None if failed
        """
        if not self._client_ready():
            return None

        try:
            if not os.path.exists(file_path):
                print(f"✗ File not found: {file_path}")
                return None

            if file_name is None:
                file_name = os.path.basename(file_path)

            uploaded_file = self.client.folder(folder_id).upload(file_path, file_name)

            file_size = os.path.getsize(file_path)
            print(f"✓ Uploaded {file_name} to folder {folder_id} ({file_size} bytes)")
            return uploaded_file.id

        except BoxException as e:
            print(f"✗ Error uploading {file_path} to folder {folder_id}: {str(e)}")
            return None
        except Exception as e:
            print(f"✗ Unexpected error uploading {file_path}: {str(e)}")
            return None

    def get_file_info(self, file_id: str) -> Optional[Dict]:
        """
        Get file metadata from Box

        Args:
            file_id: Box file ID

        Returns:
            Dict with 'id', 'name', 'size', 'modified_at' and 'created_at',
            or None if the lookup failed.
        """
        if not self._client_ready():
            return None

        try:
            info = self.client.file(file_id).get()
            return {
                'id': info.id,
                'name': info.name,
                'size': info.size,
                'modified_at': info.modified_at,
                'created_at': info.created_at,
            }
        except BoxException as e:
            print(f"✗ Error getting file info for {file_id}: {str(e)}")
            return None
        except Exception as e:
            print(f"✗ Unexpected error getting file info for {file_id}: {str(e)}")
            return None

    def _retry_with_backoff(self, operation, action: str, max_retries: int, base_delay: float):
        """Call operation() until it returns a truthy value or attempts run out.

        A falsy return value and a raised exception both count as a failed
        attempt. Between attempts the wait doubles each time, starting at
        base_delay seconds. Returns the truthy result, or None once all
        attempts failed; an exception raised by the final attempt is re-raised.
        """
        delay = max(0.0, base_delay)  # time.sleep() rejects negative values

        for attempt in range(max_retries):
            error = None
            try:
                result = operation()
                if result:
                    return result
            except Exception as e:
                error = e

            if attempt < max_retries - 1:
                wait_time = delay * (2 ** attempt)  # 1s, 2s, 4s with the default delay
                print(f"⚠ {action} attempt {attempt + 1} failed, retrying in {wait_time:g}s...")
                time.sleep(wait_time)
            else:
                print(f"✗ {action} failed after {max_retries} attempts")
                if error is not None:
                    raise error

        return None

    def download_with_retry(self, file_id: str, destination_path: str, max_retries: int = 3,
                            base_delay: float = 1.0) -> bool:
        """
        Download file with exponential backoff retry

        ``download_file()`` signals failure by returning False, so a False
        result is treated as a failed attempt and triggers the backoff.

        Args:
            file_id: Box file ID
            destination_path: Local file path to save to
            max_retries: Maximum number of attempts (0 or less means no attempt)
            base_delay: Seconds to wait after the first failed attempt; the
                wait doubles after each further failure

        Returns:
            bool: True if download successful
        """
        outcome = self._retry_with_backoff(
            lambda: self.download_file(file_id, destination_path),
            "Download", max_retries, base_delay,
        )
        return bool(outcome)

    def upload_with_retry(self, folder_id: str, file_path: str, file_name: Optional[str] = None,
                          max_retries: int = 3, base_delay: float = 1.0) -> Optional[str]:
        """
        Upload file with exponential backoff retry

        ``upload_file()`` signals failure by returning None, so a None result
        is treated as a failed attempt and triggers the backoff.

        Args:
            folder_id: Target Box folder ID
            file_path: Local file path to upload
            file_name: Optional custom filename
            max_retries: Maximum number of attempts (0 or less means no attempt)
            base_delay: Seconds to wait after the first failed attempt; the
                wait doubles after each further failure

        Returns:
            str: Uploaded file ID, or None if failed
        """
        # A missing local file cannot be fixed by waiting, so do not retry it.
        if not os.path.exists(file_path):
            print(f"✗ File not found: {file_path}")
            return None

        return self._retry_with_backoff(
            lambda: self.upload_file(folder_id, file_path, file_name),
            "Upload", max_retries, base_delay,
        )