""" Box Client - Box.com API Integration Handles JWT authentication and Box operations Compatible with Python 3.6+ """ import json import logging from boxsdk import Client, JWTAuth logger = logging.getLogger('BoxClient') class BoxClient: def __init__(self, config, root_folder_id=None): self.config = config # Use provided folder ID or default to A1→A2 folder if root_folder_id: self.root_folder_id = root_folder_id else: self.root_folder_id = config['box'].get('root_folder_a1_a2', config['box'].get('root_folder_id', '348304357505')) # Load Box config for JWT box_config_path = config['box']['rsa_private_key_path'] try: with open(box_config_path, 'r') as f: box_config = json.load(f) # Initialize JWT authentication auth = JWTAuth.from_settings_dictionary(box_config) self.client = Client(auth) logger.info("Box client initialized with JWT auth") logger.info("Using Box root folder: {}".format(self.root_folder_id)) except Exception as e: logger.error("Failed to initialize Box client: {}".format(str(e))) raise def upload_with_tracking_id(self, file_path, campaign_id, campaign_name, tracking_id, subfolder_path=None): """ Upload file to Box with tracking ID in filename Preserves folder structure from DAM if subfolder_path provided Args: file_path: Path to local file campaign_id: Campaign ID campaign_name: Campaign name tracking_id: 6-character tracking ID subfolder_path: Optional subfolder path (e.g., "Subfolder1/Subfolder2") Returns: dict with file_id, url, folder_id """ try: import os # Create or find campaign folder campaign_folder = self._get_or_create_campaign_folder(campaign_id, campaign_name) # If subfolder path provided, create/navigate to subfolder structure if subfolder_path: target_folder = self._get_or_create_subfolder_path(campaign_folder, subfolder_path) else: target_folder = campaign_folder # Get original filename original_filename = os.path.basename(file_path) name_without_ext, ext = os.path.splitext(original_filename) # Add tracking ID to filename box_filename = "{}_{}{}".format(name_without_ext, tracking_id, ext) # Upload file uploaded_file = target_folder.upload(file_path, box_filename) # Try to set description (API may vary by SDK version) try: description = "Tracking ID: {}\nOriginal: {}".format( tracking_id, original_filename ) if subfolder_path: description += "\nDAM Path: {}".format(subfolder_path) # boxsdk 3.x API uploaded_file = uploaded_file.update_info(data={'description': description}) except Exception as e: # Description update failed - not critical, file is uploaded logger.warning("Could not set Box file description: {}".format(str(e))) logger.info("Uploaded to Box: {} → File ID: {}{}".format( box_filename, uploaded_file.id, " (in {})".format(subfolder_path) if subfolder_path else "" )) # Get folder ID safely folder_id = target_folder.object_id if hasattr(target_folder, 'object_id') else str(target_folder) return { 'file_id': uploaded_file.id, 'url': 'https://app.box.com/file/{}'.format(uploaded_file.id), 'folder_id': folder_id, 'box_filename': box_filename } except Exception as e: logger.error("Box upload failed: {}".format(str(e))) raise def upload_file(self, file_path, folder_id, target_filename=None): """ Upload arbitrary file to specific Box folder Args: file_path: Path to local file folder_id: Target Box folder ID target_filename: Optional target filename (defaults to local filename) Returns: dict with file_id, url """ try: import os folder = self.client.folder(folder_id) if not target_filename: target_filename = os.path.basename(file_path) logger.info("Uploading {} to folder {} as {}".format(file_path, folder_id, target_filename)) uploaded_file = folder.upload(file_path, target_filename) return { 'file_id': uploaded_file.id, 'url': 'https://app.box.com/file/{}'.format(uploaded_file.id) } except Exception as e: logger.error("Box generic upload failed: {}".format(str(e))) raise def move_file(self, file_id, target_folder_id): """ Move a file to a different folder Args: file_id: Box file ID to move target_folder_id: Destination folder ID Returns: Updated file object """ try: file_obj = self.client.file(file_id) target_folder = self.client.folder(target_folder_id) moved_file = file_obj.move(target_folder) logger.info("Moved file {} to folder {}".format(file_id, target_folder_id)) return moved_file except Exception as e: logger.error("Failed to move file {}: {}".format(file_id, str(e))) raise def _get_or_create_campaign_folder(self, campaign_id, campaign_name): """Get or create campaign folder in Box""" try: root_folder = self.client.folder(self.root_folder_id) # Folder name format: C000000078-Campaign_Name folder_name = "{}-{}".format(campaign_id, campaign_name.replace(' ', '_')) # Check if folder exists items = root_folder.get_items() for item in items: if item.type == 'folder' and item.name == folder_name: logger.info("Using existing Box folder: {}".format(folder_name)) return self.client.folder(item.id) # Create new folder new_folder = root_folder.create_subfolder(folder_name) logger.info("Created new Box folder: {}".format(folder_name)) return new_folder except Exception as e: logger.error("Failed to get/create Box folder: {}".format(str(e))) raise def _get_or_create_subfolder_path(self, parent_folder, subfolder_path): """ Create or navigate to subfolder path in Box, preserving DAM structure Args: parent_folder: Parent Box folder object subfolder_path: Path string (e.g., "Subfolder1/Subfolder2") Returns: Box folder object at the end of the path """ try: if not subfolder_path: return parent_folder # Split path into components path_parts = subfolder_path.split('/') current_folder = parent_folder # Navigate/create each folder in the path for folder_name in path_parts: if not folder_name: # Skip empty parts continue # Check if subfolder exists found = False items = current_folder.get_items() for item in items: if item.type == 'folder' and item.name == folder_name: current_folder = self.client.folder(item.id) found = True break # Create if doesn't exist if not found: current_folder = current_folder.create_subfolder(folder_name) logger.info("Created Box subfolder: {}".format(folder_name)) return current_folder except Exception as e: logger.error("Failed to create subfolder path '{}': {}".format(subfolder_path, str(e))) raise def get_file_metadata(self, file_id, template_name='ferrerodammetadata'): """ Get metadata from Box file using metadata template Args: file_id: Box file ID template_name: Metadata template name (default: ferrerodammetadata) Returns: dict with metadata fields or empty dict if not found """ try: file_obj = self.client.file(file_id) # Try to get metadata from template (scope is enterprise_ENTERPRISE_ID) try: metadata_dict = file_obj.metadata(scope='enterprise', template=template_name).get() logger.info("Retrieved Box metadata from template: {}".format(template_name)) # Extract CreativeX fields (camelCase field names) creativex_data = {} if 'creativexScore' in metadata_dict: creativex_data['score'] = metadata_dict['creativexScore'] logger.info("CreativeX Score: {}".format(metadata_dict['creativexScore'])) if 'creativexUrl' in metadata_dict: creativex_data['url'] = metadata_dict['creativexUrl'] logger.info("CreativeX URL: {}".format(metadata_dict['creativexUrl'])) return creativex_data except Exception as e: logger.warning("No metadata template found on file ({}): {}".format(template_name, str(e))) return {} except Exception as e: logger.error("Failed to get file metadata: {}".format(str(e))) return {} def download_file(self, file_id, output_path): """ Download file from Box Args: file_id: Box file ID output_path: Path to save file Returns: Path to downloaded file """ try: import os file_obj = self.client.file(file_id) file_info = file_obj.get() # Ensure output directory exists os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True) # Download file with open(output_path, 'wb') as f: file_obj.download_to(f) file_size = os.path.getsize(output_path) logger.info("Downloaded from Box: {} ({} bytes)".format(file_info.name, file_size)) return output_path except Exception as e: logger.error("Box download failed: {}".format(str(e))) raise def list_folder_files(self, folder_id): """ List all files in a Box folder Args: folder_id: Box folder ID Returns: List of file dictionaries """ try: folder = self.client.folder(folder_id) items = folder.get_items() files = [] for item in items: if item.type == 'file': # Get full item details (boxsdk 3.x requires explicit .get()) try: file_info = item.get() files.append({ 'id': file_info.id, 'name': file_info.name, 'size': getattr(file_info, 'size', 0), 'modified_at': getattr(file_info, 'modified_at', None), 'url': 'https://app.box.com/file/{}'.format(file_info.id) }) except Exception as e: # Fallback to basic info logger.warning("Could not get full file info for {}: {}".format(item.name, str(e))) files.append({ 'id': item.id, 'name': item.name, 'size': 0, 'modified_at': None, 'url': 'https://app.box.com/file/{}'.format(item.id) }) logger.info("Found {} files in Box folder {}".format(len(files), folder_id)) return files except Exception as e: logger.error("Failed to list Box folder: {}".format(str(e))) raise def list_folder_files_recursive(self, folder_id, parent_path='', current_depth=0): """ Recursively list all files in Box folder Skips first-level folders (job/batch identifiers) Preserves structure from 2nd level onwards for DAM upload Args: folder_id: Box folder ID to scan parent_path: Internal - tracks subfolder path (excludes 1st level) current_depth: Internal - tracks depth (0=root, 1=job folders, 2+=preserve) Returns: List of dicts with 'subfolder_path' key - subfolder_path will be None for root/job-level files - subfolder_path will be "Europe/Germany" for nested files Example: DAM-UPLOAD/1234567/Europe/Germany/file.mp4 -> subfolder_path = "Europe/Germany" """ try: files = [] folder = self.client.folder(folder_id) items = folder.get_items() for item in items: item_name = item.name # Skip hidden/system folders (start with . or _) if item_name.startswith('.') or item_name.startswith('_'): logger.debug("Skipping hidden/system: {}".format(item_name)) continue if item.type == 'file': # Add file with current subfolder path try: file_info = item.get() files.append({ 'id': file_info.id, 'name': file_info.name, 'size': getattr(file_info, 'size', 0), 'subfolder_path': parent_path if parent_path else None, 'modified_at': getattr(file_info, 'modified_at', None), 'url': 'https://app.box.com/file/{}'.format(file_info.id) }) if current_depth == 1: logger.debug("File at job level: {} (will go to DAM root)".format(file_info.name)) elif parent_path: logger.debug("File in subfolder: {} -> {}".format(file_info.name, parent_path)) else: logger.debug("File at root: {}".format(file_info.name)) except Exception as e: logger.warning("Could not get file info for {}: {}".format(item_name, str(e))) files.append({ 'id': item.id, 'name': item_name, 'size': 0, 'subfolder_path': parent_path if parent_path else None, 'url': 'https://app.box.com/file/{}'.format(item.id) }) elif item.type == 'folder': subfolder_name = item.name if current_depth == 0: # Depth 0 = Root of DAM-UPLOAD # Next level (depth 1) will be job folders - don't add to path yet logger.info("Scanning job/batch folder: {}".format(subfolder_name)) subfolder_files = self.list_folder_files_recursive( item.id, parent_path='', # Don't start path yet current_depth=1 ) files.extend(subfolder_files) elif current_depth == 1: # Depth 1 = Inside job folder (e.g., 1234567/) # Start building path from here logger.info("Scanning subfolder: {}".format(subfolder_name)) subfolder_files = self.list_folder_files_recursive( item.id, parent_path=subfolder_name, # Start path here current_depth=2 ) files.extend(subfolder_files) else: # Depth 2+ = Deeper subfolders (e.g., Europe/Germany/) # Append to existing path new_path = '{}/{}'.format(parent_path, subfolder_name) if parent_path else subfolder_name logger.info("Scanning nested subfolder: {}".format(new_path)) subfolder_files = self.list_folder_files_recursive( item.id, parent_path=new_path, current_depth=current_depth + 1 ) files.extend(subfolder_files) return files except Exception as e: logger.error("Recursive folder scan failed at depth {}: {}".format(current_depth, str(e))) raise def test_connection(self): """Test Box connection""" try: user = self.client.user().get() logger.info("Box connection OK - User: {}".format(user.name)) return True except Exception as e: logger.error("Box connection failed: {}".format(str(e))) return False