# ferrero-opentext/Python-Version/scripts/shared/box_client.py
#
# 467 lines
# 18 KiB
# Python

"""
Box Client - Box.com API Integration
Handles JWT authentication and Box operations
Compatible with Python 3.6+
"""
import json
import logging
from boxsdk import Client, JWTAuth
logger = logging.getLogger('BoxClient')


class BoxClient:
    """
    Wrapper around the Box SDK client used by the campaign scripts.

    Authenticates with JWT on construction and exposes helpers to upload,
    move, download and list files, creating campaign/subfolder structures
    on demand.
    """

    # Root folder used when neither the caller nor the config names one.
    DEFAULT_ROOT_FOLDER_ID = '348304357505'

    def __init__(self, config, root_folder_id=None):
        """
        Build an authenticated Box client.

        Args:
            config: Mapping with a 'box' section. Reads
                'rsa_private_key_path' (required) and, when no explicit
                root_folder_id is given, 'root_folder_a1_a2' then
                'root_folder_id'.
            root_folder_id: Optional Box folder ID overriding the config.

        Raises:
            Exception: Any error while reading the settings file or
                building the JWT client is logged and re-raised.
        """
        self.config = config
        box_settings = config['box']

        # Use provided folder ID or default to A1→A2 folder
        if root_folder_id:
            self.root_folder_id = root_folder_id
        else:
            self.root_folder_id = box_settings.get(
                'root_folder_a1_a2',
                box_settings.get('root_folder_id', self.DEFAULT_ROOT_FOLDER_ID)
            )

        # Despite the key name, this path is parsed as a JSON settings
        # document and handed to JWTAuth as a dictionary.
        box_config_path = box_settings['rsa_private_key_path']
        try:
            # JSON is UTF-8 by specification; do not rely on the locale.
            with open(box_config_path, 'r', encoding='utf-8') as f:
                box_config = json.load(f)
            auth = JWTAuth.from_settings_dictionary(box_config)
            self.client = Client(auth)
            logger.info("Box client initialized with JWT auth")
            logger.info("Using Box root folder: {}".format(self.root_folder_id))
        except Exception as e:
            logger.error("Failed to initialize Box client: {}".format(str(e)))
            raise

    @staticmethod
    def _file_url(file_id):
        """Return the Box web-app URL for a file ID."""
        return 'https://app.box.com/file/{}'.format(file_id)

    def _find_child_folder(self, parent_folder, folder_name):
        """Return the direct child folder called folder_name, or None."""
        for item in parent_folder.get_items():
            if item.type == 'folder' and item.name == folder_name:
                return self.client.folder(item.id)
        return None

    def upload_with_tracking_id(self, file_path, campaign_id, campaign_name, tracking_id, subfolder_path=None):
        """
        Upload file to Box with tracking ID in filename
        Preserves folder structure from DAM if subfolder_path provided

        Args:
            file_path: Path to local file
            campaign_id: Campaign ID
            campaign_name: Campaign name
            tracking_id: Tracking ID inserted before the file extension
            subfolder_path: Optional subfolder path (e.g., "Subfolder1/Subfolder2")

        Returns:
            dict with file_id, url, folder_id, box_filename

        Raises:
            Exception: Folder lookup/creation or upload errors are logged
                and re-raised. A failed description update is only logged.
        """
        import os

        try:
            # Create or find campaign folder
            campaign_folder = self._get_or_create_campaign_folder(campaign_id, campaign_name)

            # If subfolder path provided, create/navigate to subfolder structure
            if subfolder_path:
                target_folder = self._get_or_create_subfolder_path(campaign_folder, subfolder_path)
            else:
                target_folder = campaign_folder

            # "name.ext" becomes "name_<tracking_id>.ext"
            original_filename = os.path.basename(file_path)
            name_without_ext, ext = os.path.splitext(original_filename)
            box_filename = "{}_{}{}".format(name_without_ext, tracking_id, ext)

            uploaded_file = target_folder.upload(file_path, box_filename)

            # Best effort: the file is already uploaded, so a failure to
            # set the description must not fail the whole operation.
            try:
                description = "Tracking ID: {}\nOriginal: {}".format(
                    tracking_id, original_filename
                )
                if subfolder_path:
                    description += "\nDAM Path: {}".format(subfolder_path)
                # boxsdk 3.x API
                uploaded_file = uploaded_file.update_info(data={'description': description})
            except Exception as e:
                logger.warning("Could not set Box file description: {}".format(str(e)))

            logger.info("Uploaded to Box: {} → File ID: {}{}".format(
                box_filename, uploaded_file.id,
                " (in {})".format(subfolder_path) if subfolder_path else ""
            ))

            # Get folder ID safely
            folder_id = target_folder.object_id if hasattr(target_folder, 'object_id') else str(target_folder)
            return {
                'file_id': uploaded_file.id,
                'url': self._file_url(uploaded_file.id),
                'folder_id': folder_id,
                'box_filename': box_filename
            }
        except Exception as e:
            logger.error("Box upload failed: {}".format(str(e)))
            raise

    def upload_file(self, file_path, folder_id, target_filename=None):
        """
        Upload arbitrary file to specific Box folder

        Args:
            file_path: Path to local file
            folder_id: Target Box folder ID
            target_filename: Optional target filename (defaults to local filename)

        Returns:
            dict with file_id, url

        Raises:
            Exception: Upload errors are logged and re-raised.
        """
        import os

        try:
            folder = self.client.folder(folder_id)
            if not target_filename:
                target_filename = os.path.basename(file_path)
            logger.info("Uploading {} to folder {} as {}".format(file_path, folder_id, target_filename))
            uploaded_file = folder.upload(file_path, target_filename)
            return {
                'file_id': uploaded_file.id,
                'url': self._file_url(uploaded_file.id)
            }
        except Exception as e:
            logger.error("Box generic upload failed: {}".format(str(e)))
            raise

    def move_file(self, file_id, target_folder_id):
        """
        Move a file to a different folder

        Args:
            file_id: Box file ID to move
            target_folder_id: Destination folder ID

        Returns:
            Updated file object

        Raises:
            Exception: Move errors are logged and re-raised.
        """
        try:
            file_obj = self.client.file(file_id)
            target_folder = self.client.folder(target_folder_id)
            moved_file = file_obj.move(target_folder)
            logger.info("Moved file {} to folder {}".format(file_id, target_folder_id))
            return moved_file
        except Exception as e:
            logger.error("Failed to move file {}: {}".format(file_id, str(e)))
            raise

    def _get_or_create_campaign_folder(self, campaign_id, campaign_name):
        """
        Get or create campaign folder in Box

        The folder lives directly under the configured root folder and is
        named "<campaign_id>-<campaign_name>" with spaces in the name
        replaced by underscores (e.g. C000000078-Campaign_Name).

        Returns:
            Box folder object for the campaign.

        Raises:
            Exception: Lookup/creation errors are logged and re-raised.
        """
        try:
            root_folder = self.client.folder(self.root_folder_id)
            folder_name = "{}-{}".format(campaign_id, campaign_name.replace(' ', '_'))

            existing = self._find_child_folder(root_folder, folder_name)
            if existing is not None:
                logger.info("Using existing Box folder: {}".format(folder_name))
                return existing

            new_folder = root_folder.create_subfolder(folder_name)
            logger.info("Created new Box folder: {}".format(folder_name))
            return new_folder
        except Exception as e:
            logger.error("Failed to get/create Box folder: {}".format(str(e)))
            raise

    def _get_or_create_subfolder_path(self, parent_folder, subfolder_path):
        """
        Create or navigate to subfolder path in Box, preserving DAM structure

        Args:
            parent_folder: Parent Box folder object
            subfolder_path: Path string (e.g., "Subfolder1/Subfolder2")

        Returns:
            Box folder object at the end of the path (parent_folder itself
            when subfolder_path is empty)

        Raises:
            Exception: Lookup/creation errors are logged and re-raised.
        """
        try:
            if not subfolder_path:
                return parent_folder

            current_folder = parent_folder
            for folder_name in subfolder_path.split('/'):
                # Leading, trailing or doubled slashes yield empty parts.
                if not folder_name:
                    continue
                existing = self._find_child_folder(current_folder, folder_name)
                if existing is not None:
                    current_folder = existing
                else:
                    current_folder = current_folder.create_subfolder(folder_name)
                    logger.info("Created Box subfolder: {}".format(folder_name))
            return current_folder
        except Exception as e:
            logger.error("Failed to create subfolder path '{}': {}".format(subfolder_path, str(e)))
            raise

    def get_file_metadata(self, file_id, template_name='ferrerodammetadata'):
        """
        Get CreativeX metadata from a Box file using a metadata template

        Args:
            file_id: Box file ID
            template_name: Metadata template name (default: ferrerodammetadata)

        Returns:
            dict with 'score' and/or 'url' when the template instance
            carries 'creativexScore' / 'creativexUrl'; empty dict when the
            metadata cannot be read. This method never raises.
        """
        try:
            file_obj = self.client.file(file_id)
            # The template is looked up in the 'enterprise' scope.
            try:
                metadata_dict = file_obj.metadata(scope='enterprise', template=template_name).get()
                logger.info("Retrieved Box metadata from template: {}".format(template_name))

                # Extract CreativeX fields (camelCase field names)
                creativex_data = {}
                if 'creativexScore' in metadata_dict:
                    creativex_data['score'] = metadata_dict['creativexScore']
                    logger.info("CreativeX Score: {}".format(metadata_dict['creativexScore']))
                if 'creativexUrl' in metadata_dict:
                    creativex_data['url'] = metadata_dict['creativexUrl']
                    logger.info("CreativeX URL: {}".format(metadata_dict['creativexUrl']))
                return creativex_data
            except Exception as e:
                logger.warning("No metadata template found on file ({}): {}".format(template_name, str(e)))
                return {}
        except Exception as e:
            logger.error("Failed to get file metadata: {}".format(str(e)))
            return {}

    def download_file(self, file_id, output_path):
        """
        Download file from Box

        Args:
            file_id: Box file ID
            output_path: Path to save file (parent directories are created)

        Returns:
            Path to downloaded file

        Raises:
            Exception: Download errors are logged and re-raised. A file
                that was only partially written is removed first so that
                no truncated asset is left at output_path.
        """
        import os

        try:
            file_obj = self.client.file(file_id)
            file_info = file_obj.get()

            # Ensure output directory exists ('.' when the path is bare)
            os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)

            # Only clean up a file this call actually opened (and thereby
            # truncated); if open() itself fails, leave the path alone.
            opened = False
            try:
                with open(output_path, 'wb') as f:
                    opened = True
                    file_obj.download_to(f)
            except Exception:
                if opened:
                    try:
                        os.remove(output_path)
                    except OSError as cleanup_error:
                        logger.warning("Could not remove partial download {}: {}".format(
                            output_path, str(cleanup_error)))
                raise

            file_size = os.path.getsize(output_path)
            logger.info("Downloaded from Box: {} ({} bytes)".format(file_info.name, file_size))
            return output_path
        except Exception as e:
            logger.error("Box download failed: {}".format(str(e)))
            raise

    def list_folder_files(self, folder_id):
        """
        List all files directly inside a Box folder (non-recursive)

        Args:
            folder_id: Box folder ID

        Returns:
            List of dicts with id, name, size, modified_at, url. When the
            full file details cannot be fetched, size is 0 and modified_at
            is None.

        Raises:
            Exception: Errors listing the folder are logged and re-raised.
        """
        try:
            folder = self.client.folder(folder_id)
            files = []
            for item in folder.get_items():
                if item.type != 'file':
                    continue
                # Get full item details (boxsdk 3.x requires explicit .get())
                try:
                    file_info = item.get()
                    files.append({
                        'id': file_info.id,
                        'name': file_info.name,
                        'size': getattr(file_info, 'size', 0),
                        'modified_at': getattr(file_info, 'modified_at', None),
                        'url': self._file_url(file_info.id)
                    })
                except Exception as e:
                    # Fallback to basic info
                    logger.warning("Could not get full file info for {}: {}".format(item.name, str(e)))
                    files.append({
                        'id': item.id,
                        'name': item.name,
                        'size': 0,
                        'modified_at': None,
                        'url': self._file_url(item.id)
                    })
            logger.info("Found {} files in Box folder {}".format(len(files), folder_id))
            return files
        except Exception as e:
            logger.error("Failed to list Box folder: {}".format(str(e)))
            raise

    def list_folder_files_recursive(self, folder_id, parent_path='', current_depth=0):
        """
        Recursively list all files in Box folder

        First-level folders (job/batch identifiers) are scanned but their
        names are left out of the reported path; structure is preserved
        from the 2nd level onwards for DAM upload. Items (files and
        folders) whose name starts with '.' or '_' are skipped.

        Args:
            folder_id: Box folder ID to scan
            parent_path: Internal - tracks subfolder path (excludes 1st level)
            current_depth: Internal - tracks depth (0=root, 1=job folders, 2+=preserve)

        Returns:
            List of dicts with id, name, size, subfolder_path,
            modified_at, url
            - subfolder_path will be None for root/job-level files
            - subfolder_path will be "Europe/Germany" for nested files

        Example:
            DAM-UPLOAD/1234567/Europe/Germany/file.mp4
            -> subfolder_path = "Europe/Germany"

        Raises:
            Exception: Errors listing a folder are logged and re-raised.
        """
        try:
            files = []
            folder = self.client.folder(folder_id)
            for item in folder.get_items():
                item_name = item.name

                # Skip hidden/system items (start with . or _)
                if item_name.startswith(('.', '_')):
                    logger.debug("Skipping hidden/system: {}".format(item_name))
                    continue

                if item.type == 'file':
                    subfolder = parent_path if parent_path else None
                    try:
                        file_info = item.get()
                        files.append({
                            'id': file_info.id,
                            'name': file_info.name,
                            'size': getattr(file_info, 'size', 0),
                            'subfolder_path': subfolder,
                            'modified_at': getattr(file_info, 'modified_at', None),
                            'url': self._file_url(file_info.id)
                        })
                        if current_depth == 1:
                            logger.debug("File at job level: {} (will go to DAM root)".format(file_info.name))
                        elif parent_path:
                            logger.debug("File in subfolder: {} -> {}".format(file_info.name, parent_path))
                        else:
                            logger.debug("File at root: {}".format(file_info.name))
                    except Exception as e:
                        logger.warning("Could not get file info for {}: {}".format(item_name, str(e)))
                        # Same keys as the detailed entry so consumers can
                        # index 'modified_at' unconditionally.
                        files.append({
                            'id': item.id,
                            'name': item_name,
                            'size': 0,
                            'subfolder_path': subfolder,
                            'modified_at': None,
                            'url': self._file_url(item.id)
                        })
                elif item.type == 'folder':
                    if current_depth == 0:
                        # Children of the root are job/batch folders:
                        # don't start the path yet.
                        child_path = ''
                        logger.info("Scanning job/batch folder: {}".format(item_name))
                    elif current_depth == 1:
                        # Inside a job folder: the path starts here.
                        child_path = item_name
                        logger.info("Scanning subfolder: {}".format(item_name))
                    else:
                        # Deeper levels append to the existing path.
                        child_path = '{}/{}'.format(parent_path, item_name) if parent_path else item_name
                        logger.info("Scanning nested subfolder: {}".format(child_path))
                    files.extend(self.list_folder_files_recursive(
                        item.id,
                        parent_path=child_path,
                        current_depth=current_depth + 1
                    ))
            return files
        except Exception as e:
            logger.error("Recursive folder scan failed at depth {}: {}".format(current_depth, str(e)))
            raise

    def test_connection(self):
        """
        Test Box connection

        Returns:
            True when the current user can be fetched, False otherwise
            (the failure is logged, not raised).
        """
        try:
            user = self.client.user().get()
            logger.info("Box connection OK - User: {}".format(user.name))
            return True
        except Exception as e:
            logger.error("Box connection failed: {}".format(str(e)))
            return False