""" DAM Client - OpenText DAM API Integration Handles OAuth2 and mTLS certificate authentication Compatible with Python 3.6+ """ import requests import time import logging import os import urllib3 from contextlib import contextmanager from pathlib import Path from tempfile import NamedTemporaryFile from .common import sanitize_box_item_name # Disable SSL warnings when verify=False urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logger = logging.getLogger('DAMClient') @contextmanager def pfx_to_pem(pfx_path, pfx_password): """ Convert PFX certificate to temporary PEM file for requests library Args: pfx_path: Path to PFX/P12 certificate file pfx_password: Certificate password Yields: str: Path to temporary PEM file """ try: from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates # Load PFX file pfx = Path(pfx_path).read_bytes() private_key, main_cert, add_certs = load_key_and_certificates( pfx, pfx_password.encode('utf-8') if pfx_password else None, None ) # Create temporary PEM file with NamedTemporaryFile(suffix='.pem', delete=False) as t_pem: with open(t_pem.name, 'wb') as pem_file: # Write private key pem_file.write(private_key.private_bytes( Encoding.PEM, PrivateFormat.PKCS8, NoEncryption() )) # Write main certificate pem_file.write(main_cert.public_bytes(Encoding.PEM)) # Write additional certificates in chain for cert in add_certs: pem_file.write(cert.public_bytes(Encoding.PEM)) yield t_pem.name # Cleanup temporary file try: os.unlink(t_pem.name) except Exception: pass except Exception as e: logger.error("Failed to convert PFX to PEM: {}".format(str(e))) raise class DAMClient: def __init__(self, config, use_mtls=False, auth_mode='oauth'): """ Initialize DAM Client Args: config: Configuration dict use_mtls: Legacy boolean flag (deprecated, use auth_mode) auth_mode: Authentication mode ('oauth', 'mtls', 'mtls_v2') 'oauth': Standard OAuth2 (default) 'mtls': Legacy mTLS via APIM 'mtls_v2': Hybrid mTLS -> OAuth token -> Direct DAM """ self.timeout = config['dam']['timeout_seconds'] # Handle legacy flag if use_mtls: self.auth_mode = 'mtls' else: self.auth_mode = auth_mode if self.auth_mode == 'mtls': # Legacy mTLS - Uses APIM URL self.base_url = config['dam'].get('mtls_base_url', '').rstrip('/') self.mtls_cert_path = config['dam'].get('mtls_cert_path') self.mtls_cert_password = config['dam'].get('mtls_cert_password') if not self.base_url: raise Exception("mTLS base URL not configured (DAM_MTLS_BASE_URL)") if not self.mtls_cert_path or not os.path.exists(self.mtls_cert_path): raise Exception("mTLS cert path not found: {}".format(self.mtls_cert_path)) self.session = None self.session_token = None logger.info("🔒 Using mTLS authentication (Legacy APIM)") elif self.auth_mode == 'mtls_v2': # New mTLS V2 - Uses Direct DAM URL + OAuth via Cert self.base_url = config['dam']['base_url'].rstrip('/') # Direct URL self.mtls_oauth_url = config['dam'].get('mtls_oauth_url') self.mtls_cert_path = config['dam'].get('mtls_cert_path') self.mtls_cert_password = config['dam'].get('mtls_cert_password') if not self.mtls_oauth_url: raise Exception("mTLS OAuth URL not configured (DAM_MTLS_OAUTH_URL)") if not self.mtls_cert_path or not os.path.exists(self.mtls_cert_path): raise Exception("mTLS cert path not found: {}".format(self.mtls_cert_path)) self.access_token = None self.token_expiry = 0 logger.info("🔐 Using mTLS V2 authentication (Hybrid)") logger.info("OAuth Endpoint: {}".format(self.mtls_oauth_url)) else: # Standard OAuth2 self.base_url = config['dam']['base_url'].rstrip('/') self.auth_url = config['dam']['auth_url'] self.client_id = config['dam']['client_id'] self.client_secret = config['dam']['client_secret'] self.access_token = None self.token_expiry = 0 logger.info("🔑 Using OAuth2 authentication") logger.info("Base URL: {}".format(self.base_url)) def get_access_token(self): """Get OAuth2 access token with auto-refresh""" if self.access_token and time.time() < self.token_expiry: return self.access_token if self.auth_mode == 'mtls_v2': return self._get_oauth_token_via_mtls() else: return self._request_new_token() def _get_oauth_token_via_mtls(self): """Request OAuth token using Client Certificate (mTLS V2)""" try: logger.debug("Requesting OAuth token via mTLS from: {}".format(self.mtls_oauth_url)) with pfx_to_pem(self.mtls_cert_path, self.mtls_cert_password) as cert: # Use requests with cert to call OAuth endpoint response = requests.post( self.mtls_oauth_url, cert=cert, verify=True, timeout=30 ) logger.debug("OAuth response: HTTP {}".format(response.status_code)) if response.status_code == 200: data = response.json() self.access_token = data['access_token'] expires_in = data.get('expires_in', 3600) self.token_expiry = time.time() + expires_in - 60 # Log full token for debugging logger.info("mTLS OAuth token obtained: {} (expires in {}s)".format(self.access_token, expires_in)) return self.access_token else: raise Exception("mTLS OAuth failed: HTTP {} - {}".format( response.status_code, response.text )) except Exception as e: logger.error("Failed to get mTLS OAuth token: {}".format(str(e))) raise def _request_new_token(self): """Request new OAuth2 token (Standard Flow)""" try: # Debug logging logger.debug("Requesting OAuth token from: {}".format(self.auth_url)) response = requests.post( self.auth_url, data={ 'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': self.client_secret }, headers={ 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json' }, verify=False, timeout=30 ) if response.status_code == 200: data = response.json() self.access_token = data['access_token'] expires_in = data.get('expires_in', 3600) self.token_expiry = time.time() + expires_in - 60 # Log full token for debugging logger.info("OAuth token obtained: {} (expires in {}s)".format(self.access_token, expires_in)) return self.access_token else: raise Exception("OAuth failed: HTTP {} - {}".format( response.status_code, response.text )) except Exception as e: logger.error("Failed to get OAuth token: {}".format(str(e))) raise def _get_mtls_session(self): """Get or create mTLS session (Legacy Mode Only)""" if self.session is None: import requests as requests_lib self.session = requests_lib.Session() with pfx_to_pem(self.mtls_cert_path, self.mtls_cert_password) as cert: self.session.cert = cert self.session.verify = True try: login_url = self.base_url.replace('/otmmapi', '/otdsws/login') response = self.session.get(login_url, timeout=self.timeout) if response.status_code == 200 and 'OTDSTicket' in response.cookies: self.session_token = response.cookies['OTDSTicket'] except Exception: pass return self.session def _make_api_request(self, method, url, **kwargs): """Make API request with appropriate authentication""" # Verbose logging for all DAM API calls logger.info("=" * 80) logger.info("DAM API REQUEST") logger.info(" Method: {}".format(method)) logger.info(" URL: {}".format(url)) # Log headers (sanitize auth tokens) headers = kwargs.get('headers', {}) if headers: sanitized_headers = headers.copy() if 'Authorization' in sanitized_headers: sanitized_headers['Authorization'] = 'Bearer ***REDACTED***' logger.info(" Headers: {}".format(sanitized_headers)) # Log request body/data if present if 'json' in kwargs: logger.info(" JSON Payload: {}".format(kwargs['json'])) elif 'data' in kwargs: logger.info(" Data Payload: {}".format(kwargs['data'])) # Log query params if present if 'params' in kwargs: logger.info(" Query Params: {}".format(kwargs['params'])) logger.info("=" * 80) if self.auth_mode == 'mtls': # Legacy mTLS (APIM) with pfx_to_pem(self.mtls_cert_path, self.mtls_cert_password) as cert: import requests as requests_lib session = requests_lib.Session() session.cert = cert session.verify = True if self.session_token: if 'cookies' not in kwargs: kwargs['cookies'] = {} kwargs['cookies']['OTDSTicket'] = self.session_token response = session.request( method, url, timeout=kwargs.pop('timeout', self.timeout), **kwargs ) # Log response status logger.info("DAM API RESPONSE: {} - Status: {}".format(url, response.status_code)) return response else: # OAuth2 (Standard) OR mTLS V2 (Hybrid) # Both use Bearer token token = self.get_access_token() if 'headers' not in kwargs: kwargs['headers'] = {} kwargs['headers']['Authorization'] = 'Bearer {}'.format(token) response = requests.request( method, url, verify=False, # Disable SSL verification for OAuth/Hybrid timeout=kwargs.pop('timeout', self.timeout), **kwargs ) # Log response status logger.info("DAM API RESPONSE: {} - Status: {}".format(url, response.status_code)) return response def search_campaigns(self, status=None, campaign_type='Local Adaptation'): """ Search for campaigns with optional status filter Args: status: Content Scaling Status (A1, A2, B1, B2, etc.) or None for all campaign_type: 'Local Adaptation' for A-series or 'Global comm' for B-series Returns: List of campaign dictionaries """ try: import json as json_module import urllib.parse # Build search condition (like Postman collection) search_condition = { "search_condition_list": { "search_condition": [ { "type": "com.artesia.search.SearchScalarCondition", "metadata_field_id": "ARTESIA.FIELD.CONTAINER TYPE NAME", "relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS", "value": "GLOBALCAMPAING", "left_paren": "(", "right_paren": ")" }, { "type": "com.artesia.search.SearchScalarCondition", "metadata_field_id": "FERRERO.FIELD.CAMPAIGN TYPE", "relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS", "value": campaign_type, # 'Local Adaptation' or 'Global comm' "relational_operator": "and" } ] } } # URL encode search condition search_condition_str = json_module.dumps(search_condition) search_condition_encoded = urllib.parse.quote(search_condition_str) # Use GET with query parameters (matching Postman) url = "{}/v6/search/text?load_type=metadata&search_config_id=18&search_condition_list={}".format( self.base_url, search_condition_encoded ) response = self._make_api_request( 'GET', url, headers={'Accept': 'application/json'} ) if response.status_code != 200: raise Exception("Search failed: HTTP {} - {}".format( response.status_code, response.text[:200] )) data = response.json() all_campaigns_raw = [] # Extract asset list from response if 'search_result_resource' in data: # Try asset_list first (like PHP version) all_campaigns_raw = data['search_result_resource'].get('asset_list', []) # Fallback to search_result_element_list if not all_campaigns_raw: results = data['search_result_resource'].get('search_result', {}).get('search_result_element_list', []) all_campaigns_raw = [r.get('resource', {}) for r in results] logger.info("Found {} total campaigns in search".format(len(all_campaigns_raw))) # Extract campaign info and filter by status all_campaigns = [] for asset in all_campaigns_raw: campaign = self._extract_campaign_info(asset) # Debug log the status logger.debug("Campaign: {} - Status: {}".format( campaign.get('campaign_name', 'Unknown'), campaign.get('status', 'NO STATUS') )) # Filter by status if provided if status is None or campaign.get('status') == status: all_campaigns.append(campaign) logger.info("Found {} campaigns{}".format( len(all_campaigns), " with status {}".format(status) if status else "" )) return all_campaigns except Exception as e: logger.error("Failed to search campaigns: {}".format(str(e))) raise def _extract_campaign_info(self, asset): """Extract campaign information from asset data""" campaign = { 'asset_id': asset.get('asset_id'), 'campaign_name': asset.get('name'), # Try name field first 'campaign_id': None, 'status': None, 'brand': None, 'market': None } # Extract metadata fields if 'metadata' in asset and 'metadata_element_list' in asset['metadata']: for category in asset['metadata']['metadata_element_list']: if 'metadata_element_list' in category: for field in category['metadata_element_list']: field_id = field.get('id') value = self._extract_field_value(field) if field_id == 'ARTESIA.FIELD.NAME' or field_id == 'INER_NAME_GENERIC': campaign['campaign_name'] = value elif field_id == 'FERRERO.FIELD.CAMPAIGN ID' or field_id == 'FERRERO.FIELD.CAMPAIGN_ID': campaign['campaign_id'] = value elif field_id == 'CONTENT.SCALING.STATUS': campaign['status'] = value elif field_id == 'FERRERO.FIELD.CAMPAIGN_BRAND': campaign['brand'] = value elif field_id == 'FERRERO.FIELD.CAMPAIGN_MARKET': campaign['market'] = value return campaign def _extract_field_value(self, field): """Extract value from field structure""" if 'value' in field: val = field['value'] if isinstance(val, dict): if 'value' in val and isinstance(val['value'], dict): if 'value' in val['value']: return val['value']['value'] elif 'field_value' in val['value'] and 'value' in val['value']['field_value']: return val['value']['field_value']['value'] return None def get_master_assets(self, campaign_id, is_global=False): """ Get master assets from Master Assets folder (A1) or Final Assets folder (B1) Searches recursively through subfolders Args: campaign_id: Campaign folder asset ID is_global: True for B1 Global campaigns (use Final Assets), False for A1 Local (use Master Assets) Returns: List of asset dictionaries with full metadata, including 'folder_path' for subfolder structure """ try: # B1 Global campaigns use "05. Final Assets" folder # A1 Local campaigns use "01. Master Assets" folder if is_global: master_folder_id = self.find_final_assets_folder(campaign_id) logger.info("Looking for Final Assets folder (B1 Global campaign)") else: master_folder_id = self._find_master_assets_folder(campaign_id) logger.info("Looking for Master Assets folder (A1 Local campaign)") if not master_folder_id: raise Exception("Assets folder not found (tried {})".format( "Final Assets" if is_global else "Master Assets" )) # Recursively get all assets from folder and subfolders all_assets = self._get_assets_recursive(master_folder_id, folder_path="") logger.info("Found {} master assets (recursive) in campaign {}".format(len(all_assets), campaign_id)) return all_assets except Exception as e: logger.error("Failed to get master assets: {}".format(str(e))) raise def _get_assets_recursive(self, folder_id, folder_path=""): """ Recursively get all assets from a folder and its subfolders Args: folder_id: Folder asset ID to search folder_path: Current path (e.g., "Subfolder1/Subfolder2") Returns: List of assets with 'folder_path' attribute added """ assets = [] try: response = self._make_api_request( 'GET', "{}/v6/folders/{}/children?load_type=full".format( self.base_url, folder_id ), headers={'Accept': 'application/json'} ) if response.status_code != 200: logger.warning("Failed to get folder {} children: HTTP {}".format(folder_id, response.status_code)) return assets data = response.json() children = data.get('folder_children', {}).get('asset_list', []) for child in children: # Check if it's a folder or an asset asset_type = child.get('asset_type', {}) type_name = asset_type.get('name', '') if isinstance(asset_type, dict) else str(asset_type) child_name = child.get('name', '') # Multiple checks to determine if this is a folder: # 1. Explicit folder type in asset_type name is_explicit_folder = 'folder' in type_name.lower() # 2. Check resource_type field (more reliable than asset_type) resource_type = child.get('resource_type', '') is_resource_folder = resource_type.lower() in ['folder', 'container'] # 3. Check for known file extensions (common media files) # If it has a recognized file extension, it's definitely a file, not a folder _, ext = os.path.splitext(child_name) ext_lower = ext.lower() if ext else '' # Known media/document file extensions known_file_extensions = { # Video '.mp4', '.mov', '.avi', '.mkv', '.wmv', '.flv', '.webm', '.m4v', # Audio '.mp3', '.wav', '.aac', '.flac', '.m4a', '.wma', '.ogg', # Images '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp', '.svg', '.psd', '.ai', '.eps', # Documents '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.rtf', # Archives '.zip', '.rar', '.7z', '.tar', '.gz', # Other '.xml', '.json', '.csv', '.srt', '.vtt' } has_known_file_extension = ext_lower in known_file_extensions # Decision logic: # - If it has a known file extension, it's definitely a file # - If it's explicitly marked as a folder/container, it's a folder # - Otherwise, check if the "extension" looks like a real extension if has_known_file_extension: is_folder = False elif is_explicit_folder or is_resource_folder: is_folder = True else: # Check if this looks like a real file extension or a false positive # Real extensions: .mp4, .jpg, .pdf (lowercase, no spaces) # False positives: ". REFERENCE FILES", ". OPEN FILES" (space after period, or uppercase start) # If there's no extension, it's a folder if not ext or len(ext) < 2: is_folder = True # If extension starts with period + space/uppercase, it's a false positive (folder name) elif len(ext) > 1 and ext[1] in (' ', '\t') or (len(ext) > 1 and ext[1].isupper()): is_folder = True # If extension has spaces in it, it's not a real extension elif ' ' in ext: is_folder = True # Numeric-only extension = version number (e.g. "WND_PCS 2026 2.0"), not a file elif ext[1:].isdigit(): is_folder = True else: # Has an extension-like string, but not in our known list # Could be an uncommon file type - assume it's a file to be safe is_folder = False logger.debug("Item: {} - Type: {} - Resource: {} - Ext: '{}' - Is Folder: {}".format( child_name, type_name, resource_type, ext_lower, is_folder )) if is_folder: # It's a folder (or container) - recurse into it folder_name = child.get('name', 'Unknown') # SANITIZE: Use shared utility to handle all Box constraints (slashes, spaces, non-printable) safe_folder_name = sanitize_box_item_name(folder_name) subfolder_path = folder_path + "/" + safe_folder_name if folder_path else safe_folder_name logger.info("Recursing into subfolder: {} (Type: {})".format(subfolder_path, type_name)) sub_assets = self._get_assets_recursive(child['asset_id'], subfolder_path) assets.extend(sub_assets) else: # It's an asset - add it with folder path child['folder_path'] = folder_path assets.append(child) return assets except Exception as e: logger.error("Error in recursive asset search for folder {}: {}".format(folder_id, str(e))) return assets def is_asset_not_approved(self, asset_data): """ Check if asset has ECOMMERCE STATUS = "NOT APPROVED" Used for A5→A6 rework workflow to filter rejected assets Args: asset_data: Complete asset JSON with metadata Returns: bool: True if status is "NOT APPROVED", False otherwise """ try: metadata = asset_data.get('metadata', {}) metadata_elements = metadata.get('metadata_element_list', []) for category in metadata_elements: for element in category.get('metadata_element_list', []): if element.get('id') == 'FERRERO.FIELD.ECOMMERCE STATUS': status_value = self._extract_field_value(element) if status_value: # Case-insensitive comparison return status_value.strip().upper() == 'NOT APPROVED' return False except Exception as e: logger.error("Error checking NOT APPROVED status: {}".format(str(e))) return False def extract_rejection_details(self, asset_data): """ Extract all rejection comments from a NOT APPROVED asset Extracts details from Approver, Legal, and IA&CC reviewers Args: asset_data: Complete asset JSON with metadata Returns: dict with rejection details from all reviewers, or None if not rejected """ if not self.is_asset_not_approved(asset_data): return None metadata = asset_data.get('metadata', {}) metadata_elements = metadata.get('metadata_element_list', []) # Initialize rejection details structure details = { 'asset_id': asset_data.get('asset_id'), 'asset_name': asset_data.get('name'), 'status': 'NOT APPROVED', 'approver': { 'comment': None, 'certifier_name': None, 'date': None }, 'legal': { 'comment': None, 'certifier_name': None, 'date': None }, 'ia_cc': { 'comment': None, 'certifier_name': None, 'date': None } } # Extract all rejection fields for category in metadata_elements: for element in category.get('metadata_element_list', []): field_id = element.get('id', '') value = self._extract_field_value(element) # Approver rejection details if field_id == 'FERRERO.MARKETING.FIELD.CERTIFIER COMMENT': details['approver']['comment'] = value elif field_id == 'FERRERO.FIELD.ECOMMERCE CERTIFIER': details['approver']['certifier_name'] = value elif field_id == 'FERRERO.MARKETING.FIELD.APPROVAL DATE': details['approver']['date'] = value # Legal rejection details elif field_id == 'FERRERO.MARKETING.FIELD.LEGAL COMMENT': details['legal']['comment'] = value elif field_id == 'FERRERO.FIELD.LEGAL CERTIFER': # Note: typo in field ID details['legal']['certifier_name'] = value elif field_id == 'FERRERO.MARKETING.FIELD.LEGAL APPROVAL DATE': details['legal']['date'] = value # IA&CC rejection details elif field_id == 'FERRERO.MARKETING.FIELD.IA CC COMMENT': details['ia_cc']['comment'] = value elif field_id == 'FERRERO.MARKETING.FIELD.IA CERTIFIER': details['ia_cc']['certifier_name'] = value elif field_id == 'FERRERO.MARKETING.FIELD.IA CC APPROVAL DATE': details['ia_cc']['date'] = value return details def _find_master_assets_folder(self, campaign_id): """Find '01. Master Assets' folder in campaign""" try: response = self._make_api_request( 'GET', "{}/v6/folders/{}/children".format(self.base_url, campaign_id) ) if response.status_code != 200: return None data = response.json() folders = data.get('folder_children', {}).get('asset_list', []) for folder in folders: name = folder.get('name', '') if 'Master' in name and 'Assets' in name: return folder['asset_id'] return None except Exception as e: logger.error("Error finding Master Assets folder: {}".format(str(e))) return None def find_final_assets_folder(self, campaign_id): """Find '01. Final Assets' folder in campaign""" try: response = self._make_api_request( 'GET', "{}/v6/folders/{}/children".format(self.base_url, campaign_id) ) if response.status_code != 200: return None data = response.json() folders = data.get('folder_children', {}).get('asset_list', []) for folder in folders: name = folder.get('name', '') if 'Final' in name and 'Assets' in name: return folder['asset_id'] return None except Exception as e: logger.error("Error finding Final Assets folder: {}".format(str(e))) return None def download_asset(self, asset_id, output_dir='.'): """ Download asset content Args: asset_id: Asset ID to download output_dir: Directory to save file Returns: Path to downloaded file """ try: # Get asset metadata first to get filename metadata_response = self._make_api_request( 'GET', "{}/v6/assets/{}".format(self.base_url, asset_id) ) if metadata_response.status_code != 200: raise Exception("Failed to get asset metadata: HTTP {}".format( metadata_response.status_code )) asset_data = metadata_response.json() asset = asset_data.get('asset_resource', {}).get('asset', asset_data) filename = asset.get('name', 'download_{}'.format(asset_id)) # Download content content_response = self._make_api_request( 'GET', "{}/v6/assets/{}/contents".format(self.base_url, asset_id), stream=True ) if content_response.status_code != 200: raise Exception("Download failed: HTTP {}".format(content_response.status_code)) # Ensure output directory exists os.makedirs(output_dir, exist_ok=True) # Save file filepath = os.path.join(output_dir, filename) with open(filepath, 'wb') as f: for chunk in content_response.iter_content(chunk_size=8192): f.write(chunk) file_size = os.path.getsize(filepath) logger.info("Downloaded: {} ({} bytes)".format(filename, file_size)) return filepath except Exception as e: logger.error("Failed to download asset {}: {}".format(asset_id, str(e))) raise def upload_asset(self, file_path, folder_id, asset_representation, video_metadata=None): """ Upload asset to DAM Args: file_path: Path to file to upload folder_id: Parent folder ID asset_representation: Asset metadata JSON structure video_metadata: Optional video metadata (width, height, etc.) Returns: dict with success, asset_id, http_code """ try: filename = os.path.basename(file_path) mime_type = self._get_mime_type(file_path) # Build upload manifest file_info = { 'file_name': filename, 'file_type': mime_type } # Add video dimensions if available if video_metadata: if video_metadata.get('width', 0) > 0: file_info['width'] = video_metadata['width'] if video_metadata.get('height', 0) > 0: file_info['height'] = video_metadata['height'] upload_manifest = { 'upload_manifest': { 'master_files': [ {'file': file_info} ] } } # Prepare multipart upload import json files = { 'files': (filename, open(file_path, 'rb'), mime_type) } data = { 'asset_representation': json.dumps(asset_representation), 'parent_folder_id': folder_id, 'manifest': json.dumps(upload_manifest) } # Log asset representation being sent logger.info("Uploading: {}".format(filename)) logger.info(" Parent Folder ID: {}".format(folder_id)) logger.info("=" * 60) logger.info("FULL ASSET REPRESENTATION (JSON):") logger.info("=" * 60) logger.info(json.dumps(asset_representation, indent=2)) logger.info("=" * 60) logger.info(" Summary:") logger.info(" Model ID: {}".format(asset_representation.get('metadata_model_id', 'N/A'))) logger.info(" Security Policies: {}".format(len(asset_representation.get('security_policy_list', [])))) if 'metadata' in asset_representation: metadata_fields = asset_representation['metadata'].get('metadata_element_list', []) total_fields = sum(len(cat.get('metadata_element_list', [])) for cat in metadata_fields) logger.info(" Metadata Fields: {}".format(total_fields)) response = self._make_api_request( 'POST', "{}/v6/assets".format(self.base_url), data=data, files=files ) # Close file files['files'][1].close() http_code = response.status_code if http_code in [201, 202]: result_data = response.json() asset_id = None job_id = None if 'asset_resource_list' in result_data: # Direct response with asset ID asset_id = result_data['asset_resource_list']['asset_resource'][0]['asset']['asset_id'] logger.info("Upload successful (immediate): {} → Asset ID: {}".format(filename, asset_id)) elif 'job_handle' in result_data: # Async job response - store job_id, don't poll (too slow!) job_id = result_data['job_handle'].get('job_id') logger.info("Upload accepted (async): {} → Job ID: {}".format(filename, job_id)) logger.info("Note: Job processing in background. Asset ID can be found later by searching folder.") # Use job_id temporarily (can be updated later by searching folder) asset_id = job_id return { 'success': True, 'asset_id': asset_id, 'job_id': job_id, 'http_code': http_code, 'is_async': job_id is not None } else: error_msg = "Upload failed: HTTP {}".format(http_code) if response.text: try: error_data = response.json() if 'exception_body' in error_data: error_msg = error_data['exception_body'].get('message', error_msg) except: pass logger.error(error_msg) return { 'success': False, 'error': error_msg, 'http_code': http_code } except Exception as e: logger.error("Upload exception: {}".format(str(e))) return { 'success': False, 'error': str(e), 'http_code': 0 } def find_asset_by_filename_in_folder(self, folder_id, filename): """ Find asset in folder by filename (useful after async upload) Args: folder_id: Folder ID to search filename: Filename to find Returns: str: Asset ID if found, None otherwise """ try: response = self._make_api_request( 'GET', "{}/v6/folders/{}/children?load_type=full".format(self.base_url, folder_id), headers={'Accept': 'application/json'} ) if response.status_code == 200: data = response.json() assets = data.get('folder_children', {}).get('asset_list', []) for asset in assets: if asset.get('name') == filename: asset_id = asset.get('asset_id') logger.info("Found uploaded asset: {} → Asset ID: {}".format(filename, asset_id)) return asset_id logger.warning("Asset not found in folder: {}".format(filename)) return None else: logger.warning("Failed to search folder: HTTP {}".format(response.status_code)) return None except Exception as e: logger.error("Error searching for asset: {}".format(str(e))) return None def _poll_job_for_asset_id(self, job_id, filename, max_attempts=10, delay=2): """ Poll job status to get actual asset ID after async upload Args: job_id: Job ID from upload response filename: Filename being uploaded (for logging) max_attempts: Maximum polling attempts (default 10) delay: Seconds between attempts (default 2) Returns: str: Asset ID if found, None otherwise """ import time as time_module for attempt in range(max_attempts): try: # Wait before polling if attempt > 0: time_module.sleep(delay) # Get job status response = self._make_api_request( 'GET', "{}/v6/jobs/{}".format(self.base_url, job_id) ) if response.status_code == 200: job_data = response.json() # Check job status status = job_data.get('job', {}).get('status', {}).get('status', 'unknown') logger.info(" Job status (attempt {}): {}".format(attempt + 1, status)) # If completed, look for asset ID if status.lower() in ['completed', 'success', 'done']: # Try to find asset ID in response asset_id = None # Check various locations where asset ID might be if 'job' in job_data: job = job_data['job'] # Check result if 'result' in job: result = job['result'] if isinstance(result, dict): asset_id = result.get('asset_id') or result.get('created_asset_id') # Check created_assets if not asset_id and 'created_assets' in job: created = job['created_assets'] if isinstance(created, list) and len(created) > 0: asset_id = created[0].get('asset_id') elif isinstance(created, dict): asset_id = created.get('asset_id') # Check asset_list if not asset_id and 'asset_list' in job: assets = job['asset_list'] if isinstance(assets, list) and len(assets) > 0: asset_id = assets[0].get('asset_id') return asset_id elif status.lower() in ['failed', 'error']: logger.error("Job failed: {}".format(job_data.get('job', {}).get('status', {}).get('message', 'Unknown error'))) return None # Still running, continue polling else: logger.warning("Job status check failed: HTTP {}".format(response.status_code)) except Exception as e: logger.warning("Job polling error (attempt {}): {}".format(attempt + 1, str(e))) logger.warning("Max polling attempts reached ({}) for job {}".format(max_attempts, job_id)) return None def update_campaign_status(self, campaign_id, new_status): """ Update Content Scaling Status field Args: campaign_id: Campaign folder ID new_status: New status value (A2, A3, etc.) Returns: dict with success boolean """ try: payload = { "edited_folder": { "data": { "metadata": [ { "id": "CONTENT.SCALING.STATUS", "type": "com.artesia.metadata.MetadataField", "value": { "cascading_domain_value": False, "domain_value": True, "value": { "type": "string", "value": new_status } } } ] } } } response = self._make_api_request( 'PATCH', "{}/v6/folders/{}?lock_strategy=optimistic".format( self.base_url, campaign_id ), json=payload, headers={ 'Content-Type': 'application/json', 'Accept': 'application/json' } ) if response.status_code == 200: logger.info("Updated campaign {} status to {}".format(campaign_id, new_status)) return {'success': True} else: error_msg = "Status update failed: HTTP {}".format(response.status_code) logger.error(error_msg) return {'success': False, 'error': error_msg} except Exception as e: logger.error("Status update exception: {}".format(str(e))) return {'success': False, 'error': str(e)} def update_folder_scaling_status(self, folder_id, scaling_status): """ Update folder content scaling status via folders_scaling_update API Args: folder_id: Folder ID to update scaling_status: New scaling status (A3, B2, etc.) Returns: dict with success boolean and response data """ try: payload = { "folderId": folder_id, "contentScalingStatus": scaling_status } logger.info("Calling folders_scaling_update API for folder {} with status {}".format( folder_id, scaling_status )) response = self._make_api_request( 'POST', "{}/v6/folders_scaling_update".format(self.base_url), json=payload, headers={ 'Content-Type': 'application/json', 'Accept': 'application/json' } ) if response.status_code in [200, 201, 202]: logger.info("Folder scaling status update successful: HTTP {}".format(response.status_code)) return {'success': True, 'status_code': response.status_code, 'response': response.json() if response.text else {}} else: error_msg = "Folder scaling update failed: HTTP {} - {}".format( response.status_code, response.text[:200] if response.text else 'No response' ) logger.error(error_msg) return {'success': False, 'error': error_msg, 'status_code': response.status_code} except Exception as e: logger.error("Folder scaling update exception: {}".format(str(e))) return {'success': False, 'error': str(e)} def test_connection(self): """Test DAM connection with current auth method""" try: if self.auth_mode in ['mtls', 'mtls_v2']: # Test mTLS/Hybrid by getting OAuth token # For mtls_v2, this will trigger _get_oauth_token_via_mtls() # For mtls, we just verify the certificate exists token = self.get_access_token() return token is not None else: # Test OAuth2 by getting token token = self.get_access_token() return token is not None except Exception as e: logger.error("Connection test failed: {}".format(str(e))) return False def _get_mime_type(self, file_path): """Get MIME type for file""" import mimetypes mime_type, _ = mimetypes.guess_type(file_path) return mime_type or 'application/octet-stream' def register_master_asset_id_domain_value(self, master_asset_id): """ Register a master asset ID in the FERRERO_MASTER_ASSET_ID lookup domain. Required in PPR environment before using the ID in asset creation. The OpenText API does not support creating new domain values during asset creation, so this must be called before the create asset API. Args: master_asset_id: The master asset ID to register Returns: dict with success, http_code, and optional error """ # Only for PPR environment if 'ppr' not in self.base_url.lower(): return {'success': True, 'skipped': True, 'reason': 'Not PPR environment'} try: payload = { "domain_value_resource": { "domain_value": { "description": master_asset_id, "display_value": master_asset_id, "field_value": { "type": "string", "value": master_asset_id } } } } logger.info("PPR: Registering master asset ID '{}' in lookup domain...".format(master_asset_id)) response = self._make_api_request( 'POST', "{}/v6/lookupdomains/FERRERO_MASTER_ASSET_ID/lookupvalues".format(self.base_url), json=payload, headers={ 'Content-Type': 'application/json', 'Accept': 'application/json' } ) # Success cases if response.status_code in [200, 201, 202]: logger.info("PPR: Master asset ID '{}' registered successfully".format(master_asset_id)) return { 'success': True, 'http_code': response.status_code, 'already_existed': False } # Already exists - OpenText returns 409 OR 500 with "duplicate code" message if response.status_code == 409: logger.info("PPR: Master asset ID '{}' already exists in lookup domain".format(master_asset_id)) return { 'success': True, 'http_code': response.status_code, 'already_existed': True } # Check for duplicate error in 500 response (OpenText quirk) if response.status_code == 500: try: error_data = response.json() error_msg = error_data.get('exception_body', {}).get('message', '') if 'duplicate' in error_msg.lower(): logger.info("PPR: Master asset ID '{}' already exists in lookup domain".format(master_asset_id)) return { 'success': True, 'http_code': response.status_code, 'already_existed': True } except: pass # Actual failure error_msg = "Failed to register master asset ID '{}': HTTP {} - {}".format( master_asset_id, response.status_code, response.text[:200] if response.text else 'No response' ) logger.warning(error_msg) return { 'success': False, 'http_code': response.status_code, 'error': error_msg } except Exception as e: error_msg = "Exception registering master asset ID '{}': {}".format(master_asset_id, str(e)) logger.error(error_msg) return { 'success': False, 'error': error_msg } def register_master_asset_ids_for_ppr(self, master_asset_ids): """ Register all master asset IDs in the lookup domain. Call this before creating an asset that references these IDs. The OpenText DAM API does not support creating new domain values during asset creation. We must first add each master asset ID to the FERRERO_MASTER_ASSET_ID domain value table before the create asset call. Args: master_asset_ids: List of master asset IDs to register Returns: dict with success, registered_ids, failed_ids """ if not master_asset_ids: return {'success': True, 'registered_ids': [], 'failed_ids': []} logger.info("=" * 60) logger.info("Registering {} master asset ID(s) in lookup domain".format(len(master_asset_ids))) logger.info(" IDs: {}".format(', '.join(master_asset_ids))) logger.info("=" * 60) registered = [] failed = [] for master_id in master_asset_ids: result = self.register_master_asset_id_domain_value(master_id) if result.get('success'): registered.append(master_id) else: failed.append({'id': master_id, 'error': result.get('error')}) logger.info("Domain registration complete - {}/{} succeeded".format( len(registered), len(master_asset_ids))) if failed: logger.warning("Failed to register: {}".format( ', '.join([f['id'] for f in failed]))) # Return success even if some failed (better to try the upload and see) return { 'success': len(failed) == 0, 'registered_ids': registered, 'failed_ids': failed } def get_or_create_subfolder_path(self, base_folder_id, subfolder_path): """ Create or find subfolder structure in DAM matching Box structure Args: base_folder_id: Base folder (e.g., "01. Final Assets") subfolder_path: Path like "Europe/Germany" Returns: str: Folder ID of the deepest subfolder Example: Base folder: "89fa..." (01. Final Assets) Path: "Europe/Germany" Creates: 01. Final Assets/Europe/Germany Returns: ID of "Germany" folder """ if not subfolder_path: return base_folder_id # Split path into parts parts = subfolder_path.split('/') current_folder_id = base_folder_id for folder_name in parts: logger.info("Looking for/creating folder: {}".format(folder_name)) # Check if exists existing = self._find_subfolder_by_name(current_folder_id, folder_name) if existing: current_folder_id = existing logger.info("Found existing folder: {} (ID: {})".format(folder_name, current_folder_id)) else: # Folder doesn't exist - DAM doesn't allow folder creation via API # Upload to parent folder instead logger.warning("Folder '{}' not found in DAM. DAM does not allow folder creation. Files will be uploaded to parent folder.".format(folder_name)) return current_folder_id # Return current parent folder instead of trying to create return current_folder_id def _find_subfolder_by_name(self, parent_folder_id, folder_name): """Find subfolder by name, return ID or None""" try: # Get folder contents with full details (load_type=full returns folder objects with names) url = '{}/v6/folders/{}/children?load_type=full'.format(self.base_url, parent_folder_id) response = self._make_api_request('GET', url) if response.status_code == 200: data = response.json() # Debug: Log response structure logger.info("Folder children response keys: {}".format(data.keys())) # Get folder_children and check its type folder_children = data.get('folder_children') logger.info("folder_children type: {}".format(type(folder_children).__name__)) # If it's a dict, it might have folder objects as values or nested structure if isinstance(folder_children, dict): logger.info("folder_children is a dict with keys: {}".format(folder_children.keys())) # Try to get the actual list of folders # Common patterns: {'folder_list': [...]} or {'folder': [...]} or {'folders': [...]} if 'folder_list' in folder_children: items = folder_children['folder_list'] elif 'folder' in folder_children: items = folder_children['folder'] elif 'folders' in folder_children: items = folder_children['folders'] else: # If keys are folder IDs, values are folder objects items = list(folder_children.values()) # If first value is a list, extract it if items and isinstance(items[0], list): items = items[0] elif isinstance(folder_children, list): items = folder_children else: items = [] logger.info("Found {} items in folder".format(len(items))) # Debug: Log first item structure if items and len(items) > 0: logger.info("First item type: {}".format(type(items[0]).__name__)) if isinstance(items[0], dict): logger.info("First item keys: {}".format(items[0].keys())) logger.info("First item name: {}".format(items[0].get('name', items[0].get('folder_name', 'N/A')))) # Look for folders for item in items: if not isinstance(item, dict): continue # Check different possible field names for type and name item_type = item.get('type', item.get('resource_type', item.get('object_type'))) item_name = item.get('name', item.get('folder_name', '')) logger.info("Checking item: type='{}', name='{}'".format(item_type, item_name)) # Check if this is a folder/container is_folder = ( item_type == 'folder' or item_type == 'com.artesia.container.Container' or (item_type and 'container' in item_type.lower()) ) if is_folder and item_name and item_name.lower() == folder_name.lower(): folder_id = item.get('container_id', item.get('id', item.get('folder_id', item.get('resource_id')))) logger.info("Found matching folder: '{}' (ID: {})".format(item_name, folder_id)) return folder_id return None except Exception as e: logger.warning("Error finding subfolder: {}".format(str(e))) return None def _create_folder(self, parent_folder_id, folder_name): """ Create folder in DAM, return new folder ID NOTE: OTMM v6 API may not support dynamic folder creation. If folder creation fails, folders may need to be pre-created in DAM UI. """ try: url = '{}/v6/folders/{}/children'.format(self.base_url, parent_folder_id) payload = { 'name': folder_name, 'type': 'folder' } response = self._make_api_request('POST', url, json=payload) if response.status_code in [200, 201]: data = response.json() folder_id = data.get('id') or data.get('container_id') or data.get('folder_id') logger.info("Successfully created folder '{}' (ID: {})".format(folder_name, folder_id)) return folder_id elif response.status_code == 405: logger.warning("DAM v6 API does not support folder creation (HTTP 405)") logger.warning("Folder '{}' must be pre-created manually in DAM UI".format(folder_name)) logger.warning("Files will be uploaded to parent folder instead") return None else: logger.error("Failed to create folder: HTTP {} - {}".format( response.status_code, response.text[:200] if response.text else 'No response' )) return None except Exception as e: logger.error("Error creating folder: {}".format(str(e))) return None