ferrero-opentext/Python-Version/scripts/shared/dam_client.py
nickviljoen 695eefadf3 Fix: Recurse into subfolders with numeric extensions (e.g. "2.0")
DAM subfolder "WND_PCS 2026 2.0" was being treated as a file because
".0" was not in the known extensions list and defaulted to is_folder=False.
This caused an HTTP 404 on download since it's a folder, not a file.

Added numeric-only extension check (.0, .1, etc.) to the folder detection
logic so the script correctly recurses into versioned subfolders and
downloads the assets inside them.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 09:46:32 +02:00

1505 lines
60 KiB
Python

"""
DAM Client - OpenText DAM API Integration
Handles OAuth2 and mTLS certificate authentication
Compatible with Python 3.6+
"""
import requests
import time
import logging
import os
import urllib3
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
from .common import sanitize_box_item_name
# Disable SSL warnings when verify=False
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger('DAMClient')
@contextmanager
def pfx_to_pem(pfx_path, pfx_password):
"""
Convert PFX certificate to temporary PEM file for requests library
Args:
pfx_path: Path to PFX/P12 certificate file
pfx_password: Certificate password
Yields:
str: Path to temporary PEM file
"""
try:
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
# Load PFX file
pfx = Path(pfx_path).read_bytes()
private_key, main_cert, add_certs = load_key_and_certificates(
pfx,
pfx_password.encode('utf-8') if pfx_password else None,
None
)
# Create temporary PEM file
with NamedTemporaryFile(suffix='.pem', delete=False) as t_pem:
with open(t_pem.name, 'wb') as pem_file:
# Write private key
pem_file.write(private_key.private_bytes(
Encoding.PEM,
PrivateFormat.PKCS8,
NoEncryption()
))
# Write main certificate
pem_file.write(main_cert.public_bytes(Encoding.PEM))
# Write additional certificates in chain
for cert in add_certs:
pem_file.write(cert.public_bytes(Encoding.PEM))
yield t_pem.name
# Cleanup temporary file
try:
os.unlink(t_pem.name)
except Exception:
pass
except Exception as e:
logger.error("Failed to convert PFX to PEM: {}".format(str(e)))
raise
class DAMClient:
def __init__(self, config, use_mtls=False, auth_mode='oauth'):
"""
Initialize DAM Client
Args:
config: Configuration dict
use_mtls: Legacy boolean flag (deprecated, use auth_mode)
auth_mode: Authentication mode ('oauth', 'mtls', 'mtls_v2')
'oauth': Standard OAuth2 (default)
'mtls': Legacy mTLS via APIM
'mtls_v2': Hybrid mTLS -> OAuth token -> Direct DAM
"""
self.timeout = config['dam']['timeout_seconds']
# Handle legacy flag
if use_mtls:
self.auth_mode = 'mtls'
else:
self.auth_mode = auth_mode
if self.auth_mode == 'mtls':
# Legacy mTLS - Uses APIM URL
self.base_url = config['dam'].get('mtls_base_url', '').rstrip('/')
self.mtls_cert_path = config['dam'].get('mtls_cert_path')
self.mtls_cert_password = config['dam'].get('mtls_cert_password')
if not self.base_url:
raise Exception("mTLS base URL not configured (DAM_MTLS_BASE_URL)")
if not self.mtls_cert_path or not os.path.exists(self.mtls_cert_path):
raise Exception("mTLS cert path not found: {}".format(self.mtls_cert_path))
self.session = None
self.session_token = None
logger.info("🔒 Using mTLS authentication (Legacy APIM)")
elif self.auth_mode == 'mtls_v2':
# New mTLS V2 - Uses Direct DAM URL + OAuth via Cert
self.base_url = config['dam']['base_url'].rstrip('/') # Direct URL
self.mtls_oauth_url = config['dam'].get('mtls_oauth_url')
self.mtls_cert_path = config['dam'].get('mtls_cert_path')
self.mtls_cert_password = config['dam'].get('mtls_cert_password')
if not self.mtls_oauth_url:
raise Exception("mTLS OAuth URL not configured (DAM_MTLS_OAUTH_URL)")
if not self.mtls_cert_path or not os.path.exists(self.mtls_cert_path):
raise Exception("mTLS cert path not found: {}".format(self.mtls_cert_path))
self.access_token = None
self.token_expiry = 0
logger.info("🔐 Using mTLS V2 authentication (Hybrid)")
logger.info("OAuth Endpoint: {}".format(self.mtls_oauth_url))
else:
# Standard OAuth2
self.base_url = config['dam']['base_url'].rstrip('/')
self.auth_url = config['dam']['auth_url']
self.client_id = config['dam']['client_id']
self.client_secret = config['dam']['client_secret']
self.access_token = None
self.token_expiry = 0
logger.info("🔑 Using OAuth2 authentication")
logger.info("Base URL: {}".format(self.base_url))
def get_access_token(self):
"""Get OAuth2 access token with auto-refresh"""
if self.access_token and time.time() < self.token_expiry:
return self.access_token
if self.auth_mode == 'mtls_v2':
return self._get_oauth_token_via_mtls()
else:
return self._request_new_token()
def _get_oauth_token_via_mtls(self):
"""Request OAuth token using Client Certificate (mTLS V2)"""
try:
logger.debug("Requesting OAuth token via mTLS from: {}".format(self.mtls_oauth_url))
with pfx_to_pem(self.mtls_cert_path, self.mtls_cert_password) as cert:
# Use requests with cert to call OAuth endpoint
response = requests.post(
self.mtls_oauth_url,
cert=cert,
verify=True,
timeout=30
)
logger.debug("OAuth response: HTTP {}".format(response.status_code))
if response.status_code == 200:
data = response.json()
self.access_token = data['access_token']
expires_in = data.get('expires_in', 3600)
self.token_expiry = time.time() + expires_in - 60
# Log full token for debugging
logger.info("mTLS OAuth token obtained: {} (expires in {}s)".format(self.access_token, expires_in))
return self.access_token
else:
raise Exception("mTLS OAuth failed: HTTP {} - {}".format(
response.status_code, response.text
))
except Exception as e:
logger.error("Failed to get mTLS OAuth token: {}".format(str(e)))
raise
def _request_new_token(self):
"""Request new OAuth2 token (Standard Flow)"""
try:
# Debug logging
logger.debug("Requesting OAuth token from: {}".format(self.auth_url))
response = requests.post(
self.auth_url,
data={
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret
},
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
},
verify=False,
timeout=30
)
if response.status_code == 200:
data = response.json()
self.access_token = data['access_token']
expires_in = data.get('expires_in', 3600)
self.token_expiry = time.time() + expires_in - 60
# Log full token for debugging
logger.info("OAuth token obtained: {} (expires in {}s)".format(self.access_token, expires_in))
return self.access_token
else:
raise Exception("OAuth failed: HTTP {} - {}".format(
response.status_code, response.text
))
except Exception as e:
logger.error("Failed to get OAuth token: {}".format(str(e)))
raise
def _get_mtls_session(self):
"""Get or create mTLS session (Legacy Mode Only)"""
if self.session is None:
import requests as requests_lib
self.session = requests_lib.Session()
with pfx_to_pem(self.mtls_cert_path, self.mtls_cert_password) as cert:
self.session.cert = cert
self.session.verify = True
try:
login_url = self.base_url.replace('/otmmapi', '/otdsws/login')
response = self.session.get(login_url, timeout=self.timeout)
if response.status_code == 200 and 'OTDSTicket' in response.cookies:
self.session_token = response.cookies['OTDSTicket']
except Exception:
pass
return self.session
def _make_api_request(self, method, url, **kwargs):
"""Make API request with appropriate authentication"""
# Verbose logging for all DAM API calls
logger.info("=" * 80)
logger.info("DAM API REQUEST")
logger.info(" Method: {}".format(method))
logger.info(" URL: {}".format(url))
# Log headers (sanitize auth tokens)
headers = kwargs.get('headers', {})
if headers:
sanitized_headers = headers.copy()
if 'Authorization' in sanitized_headers:
sanitized_headers['Authorization'] = 'Bearer ***REDACTED***'
logger.info(" Headers: {}".format(sanitized_headers))
# Log request body/data if present
if 'json' in kwargs:
logger.info(" JSON Payload: {}".format(kwargs['json']))
elif 'data' in kwargs:
logger.info(" Data Payload: {}".format(kwargs['data']))
# Log query params if present
if 'params' in kwargs:
logger.info(" Query Params: {}".format(kwargs['params']))
logger.info("=" * 80)
if self.auth_mode == 'mtls':
# Legacy mTLS (APIM)
with pfx_to_pem(self.mtls_cert_path, self.mtls_cert_password) as cert:
import requests as requests_lib
session = requests_lib.Session()
session.cert = cert
session.verify = True
if self.session_token:
if 'cookies' not in kwargs: kwargs['cookies'] = {}
kwargs['cookies']['OTDSTicket'] = self.session_token
response = session.request(
method, url, timeout=kwargs.pop('timeout', self.timeout), **kwargs
)
# Log response status
logger.info("DAM API RESPONSE: {} - Status: {}".format(url, response.status_code))
return response
else:
# OAuth2 (Standard) OR mTLS V2 (Hybrid)
# Both use Bearer token
token = self.get_access_token()
if 'headers' not in kwargs:
kwargs['headers'] = {}
kwargs['headers']['Authorization'] = 'Bearer {}'.format(token)
response = requests.request(
method,
url,
verify=False, # Disable SSL verification for OAuth/Hybrid
timeout=kwargs.pop('timeout', self.timeout),
**kwargs
)
# Log response status
logger.info("DAM API RESPONSE: {} - Status: {}".format(url, response.status_code))
return response
def search_campaigns(self, status=None, campaign_type='Local Adaptation'):
"""
Search for campaigns with optional status filter
Args:
status: Content Scaling Status (A1, A2, B1, B2, etc.) or None for all
campaign_type: 'Local Adaptation' for A-series or 'Global comm' for B-series
Returns:
List of campaign dictionaries
"""
try:
import json as json_module
import urllib.parse
# Build search condition (like Postman collection)
search_condition = {
"search_condition_list": {
"search_condition": [
{
"type": "com.artesia.search.SearchScalarCondition",
"metadata_field_id": "ARTESIA.FIELD.CONTAINER TYPE NAME",
"relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS",
"value": "GLOBALCAMPAING",
"left_paren": "(",
"right_paren": ")"
},
{
"type": "com.artesia.search.SearchScalarCondition",
"metadata_field_id": "FERRERO.FIELD.CAMPAIGN TYPE",
"relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS",
"value": campaign_type, # 'Local Adaptation' or 'Global comm'
"relational_operator": "and"
}
]
}
}
# URL encode search condition
search_condition_str = json_module.dumps(search_condition)
search_condition_encoded = urllib.parse.quote(search_condition_str)
# Use GET with query parameters (matching Postman)
url = "{}/v6/search/text?load_type=metadata&search_config_id=18&search_condition_list={}".format(
self.base_url,
search_condition_encoded
)
response = self._make_api_request(
'GET',
url,
headers={'Accept': 'application/json'}
)
if response.status_code != 200:
raise Exception("Search failed: HTTP {} - {}".format(
response.status_code, response.text[:200]
))
data = response.json()
all_campaigns_raw = []
# Extract asset list from response
if 'search_result_resource' in data:
# Try asset_list first (like PHP version)
all_campaigns_raw = data['search_result_resource'].get('asset_list', [])
# Fallback to search_result_element_list
if not all_campaigns_raw:
results = data['search_result_resource'].get('search_result', {}).get('search_result_element_list', [])
all_campaigns_raw = [r.get('resource', {}) for r in results]
logger.info("Found {} total campaigns in search".format(len(all_campaigns_raw)))
# Extract campaign info and filter by status
all_campaigns = []
for asset in all_campaigns_raw:
campaign = self._extract_campaign_info(asset)
# Debug log the status
logger.debug("Campaign: {} - Status: {}".format(
campaign.get('campaign_name', 'Unknown'),
campaign.get('status', 'NO STATUS')
))
# Filter by status if provided
if status is None or campaign.get('status') == status:
all_campaigns.append(campaign)
logger.info("Found {} campaigns{}".format(
len(all_campaigns),
" with status {}".format(status) if status else ""
))
return all_campaigns
except Exception as e:
logger.error("Failed to search campaigns: {}".format(str(e)))
raise
def _extract_campaign_info(self, asset):
"""Extract campaign information from asset data"""
campaign = {
'asset_id': asset.get('asset_id'),
'campaign_name': asset.get('name'), # Try name field first
'campaign_id': None,
'status': None,
'brand': None,
'market': None
}
# Extract metadata fields
if 'metadata' in asset and 'metadata_element_list' in asset['metadata']:
for category in asset['metadata']['metadata_element_list']:
if 'metadata_element_list' in category:
for field in category['metadata_element_list']:
field_id = field.get('id')
value = self._extract_field_value(field)
if field_id == 'ARTESIA.FIELD.NAME' or field_id == 'INER_NAME_GENERIC':
campaign['campaign_name'] = value
elif field_id == 'FERRERO.FIELD.CAMPAIGN ID' or field_id == 'FERRERO.FIELD.CAMPAIGN_ID':
campaign['campaign_id'] = value
elif field_id == 'CONTENT.SCALING.STATUS':
campaign['status'] = value
elif field_id == 'FERRERO.FIELD.CAMPAIGN_BRAND':
campaign['brand'] = value
elif field_id == 'FERRERO.FIELD.CAMPAIGN_MARKET':
campaign['market'] = value
return campaign
def _extract_field_value(self, field):
"""Extract value from field structure"""
if 'value' in field:
val = field['value']
if isinstance(val, dict):
if 'value' in val and isinstance(val['value'], dict):
if 'value' in val['value']:
return val['value']['value']
elif 'field_value' in val['value'] and 'value' in val['value']['field_value']:
return val['value']['field_value']['value']
return None
def get_master_assets(self, campaign_id, is_global=False):
"""
Get master assets from Master Assets folder (A1) or Final Assets folder (B1)
Searches recursively through subfolders
Args:
campaign_id: Campaign folder asset ID
is_global: True for B1 Global campaigns (use Final Assets), False for A1 Local (use Master Assets)
Returns:
List of asset dictionaries with full metadata, including 'folder_path' for subfolder structure
"""
try:
# B1 Global campaigns use "05. Final Assets" folder
# A1 Local campaigns use "01. Master Assets" folder
if is_global:
master_folder_id = self.find_final_assets_folder(campaign_id)
logger.info("Looking for Final Assets folder (B1 Global campaign)")
else:
master_folder_id = self._find_master_assets_folder(campaign_id)
logger.info("Looking for Master Assets folder (A1 Local campaign)")
if not master_folder_id:
raise Exception("Assets folder not found (tried {})".format(
"Final Assets" if is_global else "Master Assets"
))
# Recursively get all assets from folder and subfolders
all_assets = self._get_assets_recursive(master_folder_id, folder_path="")
logger.info("Found {} master assets (recursive) in campaign {}".format(len(all_assets), campaign_id))
return all_assets
except Exception as e:
logger.error("Failed to get master assets: {}".format(str(e)))
raise
def _get_assets_recursive(self, folder_id, folder_path=""):
"""
Recursively get all assets from a folder and its subfolders
Args:
folder_id: Folder asset ID to search
folder_path: Current path (e.g., "Subfolder1/Subfolder2")
Returns:
List of assets with 'folder_path' attribute added
"""
assets = []
try:
response = self._make_api_request(
'GET',
"{}/v6/folders/{}/children?load_type=full".format(
self.base_url, folder_id
),
headers={'Accept': 'application/json'}
)
if response.status_code != 200:
logger.warning("Failed to get folder {} children: HTTP {}".format(folder_id, response.status_code))
return assets
data = response.json()
children = data.get('folder_children', {}).get('asset_list', [])
for child in children:
# Check if it's a folder or an asset
asset_type = child.get('asset_type', {})
type_name = asset_type.get('name', '') if isinstance(asset_type, dict) else str(asset_type)
child_name = child.get('name', '')
# Multiple checks to determine if this is a folder:
# 1. Explicit folder type in asset_type name
is_explicit_folder = 'folder' in type_name.lower()
# 2. Check resource_type field (more reliable than asset_type)
resource_type = child.get('resource_type', '')
is_resource_folder = resource_type.lower() in ['folder', 'container']
# 3. Check for known file extensions (common media files)
# If it has a recognized file extension, it's definitely a file, not a folder
_, ext = os.path.splitext(child_name)
ext_lower = ext.lower() if ext else ''
# Known media/document file extensions
known_file_extensions = {
# Video
'.mp4', '.mov', '.avi', '.mkv', '.wmv', '.flv', '.webm', '.m4v',
# Audio
'.mp3', '.wav', '.aac', '.flac', '.m4a', '.wma', '.ogg',
# Images
'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp', '.svg', '.psd', '.ai', '.eps',
# Documents
'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.rtf',
# Archives
'.zip', '.rar', '.7z', '.tar', '.gz',
# Other
'.xml', '.json', '.csv', '.srt', '.vtt'
}
has_known_file_extension = ext_lower in known_file_extensions
# Decision logic:
# - If it has a known file extension, it's definitely a file
# - If it's explicitly marked as a folder/container, it's a folder
# - Otherwise, check if the "extension" looks like a real extension
if has_known_file_extension:
is_folder = False
elif is_explicit_folder or is_resource_folder:
is_folder = True
else:
# Check if this looks like a real file extension or a false positive
# Real extensions: .mp4, .jpg, .pdf (lowercase, no spaces)
# False positives: ". REFERENCE FILES", ". OPEN FILES" (space after period, or uppercase start)
# If there's no extension, it's a folder
if not ext or len(ext) < 2:
is_folder = True
# If extension starts with period + space/uppercase, it's a false positive (folder name)
elif len(ext) > 1 and ext[1] in (' ', '\t') or (len(ext) > 1 and ext[1].isupper()):
is_folder = True
# If extension has spaces in it, it's not a real extension
elif ' ' in ext:
is_folder = True
# Numeric-only extension = version number (e.g. "WND_PCS 2026 2.0"), not a file
elif ext[1:].isdigit():
is_folder = True
else:
# Has an extension-like string, but not in our known list
# Could be an uncommon file type - assume it's a file to be safe
is_folder = False
logger.debug("Item: {} - Type: {} - Resource: {} - Ext: '{}' - Is Folder: {}".format(
child_name, type_name, resource_type, ext_lower, is_folder
))
if is_folder:
# It's a folder (or container) - recurse into it
folder_name = child.get('name', 'Unknown')
# SANITIZE: Use shared utility to handle all Box constraints (slashes, spaces, non-printable)
safe_folder_name = sanitize_box_item_name(folder_name)
subfolder_path = folder_path + "/" + safe_folder_name if folder_path else safe_folder_name
logger.info("Recursing into subfolder: {} (Type: {})".format(subfolder_path, type_name))
sub_assets = self._get_assets_recursive(child['asset_id'], subfolder_path)
assets.extend(sub_assets)
else:
# It's an asset - add it with folder path
child['folder_path'] = folder_path
assets.append(child)
return assets
except Exception as e:
logger.error("Error in recursive asset search for folder {}: {}".format(folder_id, str(e)))
return assets
def is_asset_not_approved(self, asset_data):
"""
Check if asset has ECOMMERCE STATUS = "NOT APPROVED"
Used for A5→A6 rework workflow to filter rejected assets
Args:
asset_data: Complete asset JSON with metadata
Returns:
bool: True if status is "NOT APPROVED", False otherwise
"""
try:
metadata = asset_data.get('metadata', {})
metadata_elements = metadata.get('metadata_element_list', [])
for category in metadata_elements:
for element in category.get('metadata_element_list', []):
if element.get('id') == 'FERRERO.FIELD.ECOMMERCE STATUS':
status_value = self._extract_field_value(element)
if status_value:
# Case-insensitive comparison
return status_value.strip().upper() == 'NOT APPROVED'
return False
except Exception as e:
logger.error("Error checking NOT APPROVED status: {}".format(str(e)))
return False
def extract_rejection_details(self, asset_data):
"""
Extract all rejection comments from a NOT APPROVED asset
Extracts details from Approver, Legal, and IA&CC reviewers
Args:
asset_data: Complete asset JSON with metadata
Returns:
dict with rejection details from all reviewers, or None if not rejected
"""
if not self.is_asset_not_approved(asset_data):
return None
metadata = asset_data.get('metadata', {})
metadata_elements = metadata.get('metadata_element_list', [])
# Initialize rejection details structure
details = {
'asset_id': asset_data.get('asset_id'),
'asset_name': asset_data.get('name'),
'status': 'NOT APPROVED',
'approver': {
'comment': None,
'certifier_name': None,
'date': None
},
'legal': {
'comment': None,
'certifier_name': None,
'date': None
},
'ia_cc': {
'comment': None,
'certifier_name': None,
'date': None
}
}
# Extract all rejection fields
for category in metadata_elements:
for element in category.get('metadata_element_list', []):
field_id = element.get('id', '')
value = self._extract_field_value(element)
# Approver rejection details
if field_id == 'FERRERO.MARKETING.FIELD.CERTIFIER COMMENT':
details['approver']['comment'] = value
elif field_id == 'FERRERO.FIELD.ECOMMERCE CERTIFIER':
details['approver']['certifier_name'] = value
elif field_id == 'FERRERO.MARKETING.FIELD.APPROVAL DATE':
details['approver']['date'] = value
# Legal rejection details
elif field_id == 'FERRERO.MARKETING.FIELD.LEGAL COMMENT':
details['legal']['comment'] = value
elif field_id == 'FERRERO.FIELD.LEGAL CERTIFER': # Note: typo in field ID
details['legal']['certifier_name'] = value
elif field_id == 'FERRERO.MARKETING.FIELD.LEGAL APPROVAL DATE':
details['legal']['date'] = value
# IA&CC rejection details
elif field_id == 'FERRERO.MARKETING.FIELD.IA CC COMMENT':
details['ia_cc']['comment'] = value
elif field_id == 'FERRERO.MARKETING.FIELD.IA CERTIFIER':
details['ia_cc']['certifier_name'] = value
elif field_id == 'FERRERO.MARKETING.FIELD.IA CC APPROVAL DATE':
details['ia_cc']['date'] = value
return details
def _find_master_assets_folder(self, campaign_id):
"""Find '01. Master Assets' folder in campaign"""
try:
response = self._make_api_request(
'GET',
"{}/v6/folders/{}/children".format(self.base_url, campaign_id)
)
if response.status_code != 200:
return None
data = response.json()
folders = data.get('folder_children', {}).get('asset_list', [])
for folder in folders:
name = folder.get('name', '')
if 'Master' in name and 'Assets' in name:
return folder['asset_id']
return None
except Exception as e:
logger.error("Error finding Master Assets folder: {}".format(str(e)))
return None
def find_final_assets_folder(self, campaign_id):
"""Find '01. Final Assets' folder in campaign"""
try:
response = self._make_api_request(
'GET',
"{}/v6/folders/{}/children".format(self.base_url, campaign_id)
)
if response.status_code != 200:
return None
data = response.json()
folders = data.get('folder_children', {}).get('asset_list', [])
for folder in folders:
name = folder.get('name', '')
if 'Final' in name and 'Assets' in name:
return folder['asset_id']
return None
except Exception as e:
logger.error("Error finding Final Assets folder: {}".format(str(e)))
return None
def download_asset(self, asset_id, output_dir='.'):
"""
Download asset content
Args:
asset_id: Asset ID to download
output_dir: Directory to save file
Returns:
Path to downloaded file
"""
try:
# Get asset metadata first to get filename
metadata_response = self._make_api_request(
'GET',
"{}/v6/assets/{}".format(self.base_url, asset_id)
)
if metadata_response.status_code != 200:
raise Exception("Failed to get asset metadata: HTTP {}".format(
metadata_response.status_code
))
asset_data = metadata_response.json()
asset = asset_data.get('asset_resource', {}).get('asset', asset_data)
filename = asset.get('name', 'download_{}'.format(asset_id))
# Download content
content_response = self._make_api_request(
'GET',
"{}/v6/assets/{}/contents".format(self.base_url, asset_id),
stream=True
)
if content_response.status_code != 200:
raise Exception("Download failed: HTTP {}".format(content_response.status_code))
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Save file
filepath = os.path.join(output_dir, filename)
with open(filepath, 'wb') as f:
for chunk in content_response.iter_content(chunk_size=8192):
f.write(chunk)
file_size = os.path.getsize(filepath)
logger.info("Downloaded: {} ({} bytes)".format(filename, file_size))
return filepath
except Exception as e:
logger.error("Failed to download asset {}: {}".format(asset_id, str(e)))
raise
def upload_asset(self, file_path, folder_id, asset_representation, video_metadata=None):
"""
Upload asset to DAM
Args:
file_path: Path to file to upload
folder_id: Parent folder ID
asset_representation: Asset metadata JSON structure
video_metadata: Optional video metadata (width, height, etc.)
Returns:
dict with success, asset_id, http_code
"""
try:
filename = os.path.basename(file_path)
mime_type = self._get_mime_type(file_path)
# Build upload manifest
file_info = {
'file_name': filename,
'file_type': mime_type
}
# Add video dimensions if available
if video_metadata:
if video_metadata.get('width', 0) > 0:
file_info['width'] = video_metadata['width']
if video_metadata.get('height', 0) > 0:
file_info['height'] = video_metadata['height']
upload_manifest = {
'upload_manifest': {
'master_files': [
{'file': file_info}
]
}
}
# Prepare multipart upload
import json
files = {
'files': (filename, open(file_path, 'rb'), mime_type)
}
data = {
'asset_representation': json.dumps(asset_representation),
'parent_folder_id': folder_id,
'manifest': json.dumps(upload_manifest)
}
# Log asset representation being sent
logger.info("Uploading: {}".format(filename))
logger.info(" Parent Folder ID: {}".format(folder_id))
logger.info("=" * 60)
logger.info("FULL ASSET REPRESENTATION (JSON):")
logger.info("=" * 60)
logger.info(json.dumps(asset_representation, indent=2))
logger.info("=" * 60)
logger.info(" Summary:")
logger.info(" Model ID: {}".format(asset_representation.get('metadata_model_id', 'N/A')))
logger.info(" Security Policies: {}".format(len(asset_representation.get('security_policy_list', []))))
if 'metadata' in asset_representation:
metadata_fields = asset_representation['metadata'].get('metadata_element_list', [])
total_fields = sum(len(cat.get('metadata_element_list', [])) for cat in metadata_fields)
logger.info(" Metadata Fields: {}".format(total_fields))
response = self._make_api_request(
'POST',
"{}/v6/assets".format(self.base_url),
data=data,
files=files
)
# Close file
files['files'][1].close()
http_code = response.status_code
if http_code in [201, 202]:
result_data = response.json()
asset_id = None
job_id = None
if 'asset_resource_list' in result_data:
# Direct response with asset ID
asset_id = result_data['asset_resource_list']['asset_resource'][0]['asset']['asset_id']
logger.info("Upload successful (immediate): {} → Asset ID: {}".format(filename, asset_id))
elif 'job_handle' in result_data:
# Async job response - store job_id, don't poll (too slow!)
job_id = result_data['job_handle'].get('job_id')
logger.info("Upload accepted (async): {} → Job ID: {}".format(filename, job_id))
logger.info("Note: Job processing in background. Asset ID can be found later by searching folder.")
# Use job_id temporarily (can be updated later by searching folder)
asset_id = job_id
return {
'success': True,
'asset_id': asset_id,
'job_id': job_id,
'http_code': http_code,
'is_async': job_id is not None
}
else:
error_msg = "Upload failed: HTTP {}".format(http_code)
if response.text:
try:
error_data = response.json()
if 'exception_body' in error_data:
error_msg = error_data['exception_body'].get('message', error_msg)
except:
pass
logger.error(error_msg)
return {
'success': False,
'error': error_msg,
'http_code': http_code
}
except Exception as e:
logger.error("Upload exception: {}".format(str(e)))
return {
'success': False,
'error': str(e),
'http_code': 0
}
def find_asset_by_filename_in_folder(self, folder_id, filename):
"""
Find asset in folder by filename (useful after async upload)
Args:
folder_id: Folder ID to search
filename: Filename to find
Returns:
str: Asset ID if found, None otherwise
"""
try:
response = self._make_api_request(
'GET',
"{}/v6/folders/{}/children?load_type=full".format(self.base_url, folder_id),
headers={'Accept': 'application/json'}
)
if response.status_code == 200:
data = response.json()
assets = data.get('folder_children', {}).get('asset_list', [])
for asset in assets:
if asset.get('name') == filename:
asset_id = asset.get('asset_id')
logger.info("Found uploaded asset: {} → Asset ID: {}".format(filename, asset_id))
return asset_id
logger.warning("Asset not found in folder: {}".format(filename))
return None
else:
logger.warning("Failed to search folder: HTTP {}".format(response.status_code))
return None
except Exception as e:
logger.error("Error searching for asset: {}".format(str(e)))
return None
def _poll_job_for_asset_id(self, job_id, filename, max_attempts=10, delay=2):
"""
Poll job status to get actual asset ID after async upload
Args:
job_id: Job ID from upload response
filename: Filename being uploaded (for logging)
max_attempts: Maximum polling attempts (default 10)
delay: Seconds between attempts (default 2)
Returns:
str: Asset ID if found, None otherwise
"""
import time as time_module
for attempt in range(max_attempts):
try:
# Wait before polling
if attempt > 0:
time_module.sleep(delay)
# Get job status
response = self._make_api_request(
'GET',
"{}/v6/jobs/{}".format(self.base_url, job_id)
)
if response.status_code == 200:
job_data = response.json()
# Check job status
status = job_data.get('job', {}).get('status', {}).get('status', 'unknown')
logger.info(" Job status (attempt {}): {}".format(attempt + 1, status))
# If completed, look for asset ID
if status.lower() in ['completed', 'success', 'done']:
# Try to find asset ID in response
asset_id = None
# Check various locations where asset ID might be
if 'job' in job_data:
job = job_data['job']
# Check result
if 'result' in job:
result = job['result']
if isinstance(result, dict):
asset_id = result.get('asset_id') or result.get('created_asset_id')
# Check created_assets
if not asset_id and 'created_assets' in job:
created = job['created_assets']
if isinstance(created, list) and len(created) > 0:
asset_id = created[0].get('asset_id')
elif isinstance(created, dict):
asset_id = created.get('asset_id')
# Check asset_list
if not asset_id and 'asset_list' in job:
assets = job['asset_list']
if isinstance(assets, list) and len(assets) > 0:
asset_id = assets[0].get('asset_id')
return asset_id
elif status.lower() in ['failed', 'error']:
logger.error("Job failed: {}".format(job_data.get('job', {}).get('status', {}).get('message', 'Unknown error')))
return None
# Still running, continue polling
else:
logger.warning("Job status check failed: HTTP {}".format(response.status_code))
except Exception as e:
logger.warning("Job polling error (attempt {}): {}".format(attempt + 1, str(e)))
logger.warning("Max polling attempts reached ({}) for job {}".format(max_attempts, job_id))
return None
def update_campaign_status(self, campaign_id, new_status):
"""
Update Content Scaling Status field
Args:
campaign_id: Campaign folder ID
new_status: New status value (A2, A3, etc.)
Returns:
dict with success boolean
"""
try:
payload = {
"edited_folder": {
"data": {
"metadata": [
{
"id": "CONTENT.SCALING.STATUS",
"type": "com.artesia.metadata.MetadataField",
"value": {
"cascading_domain_value": False,
"domain_value": True,
"value": {
"type": "string",
"value": new_status
}
}
}
]
}
}
}
response = self._make_api_request(
'PATCH',
"{}/v6/folders/{}?lock_strategy=optimistic".format(
self.base_url, campaign_id
),
json=payload,
headers={
'Content-Type': 'application/json',
'Accept': 'application/json'
}
)
if response.status_code == 200:
logger.info("Updated campaign {} status to {}".format(campaign_id, new_status))
return {'success': True}
else:
error_msg = "Status update failed: HTTP {}".format(response.status_code)
logger.error(error_msg)
return {'success': False, 'error': error_msg}
except Exception as e:
logger.error("Status update exception: {}".format(str(e)))
return {'success': False, 'error': str(e)}
def update_folder_scaling_status(self, folder_id, scaling_status):
"""
Update folder content scaling status via folders_scaling_update API
Args:
folder_id: Folder ID to update
scaling_status: New scaling status (A3, B2, etc.)
Returns:
dict with success boolean and response data
"""
try:
payload = {
"folderId": folder_id,
"contentScalingStatus": scaling_status
}
logger.info("Calling folders_scaling_update API for folder {} with status {}".format(
folder_id, scaling_status
))
response = self._make_api_request(
'POST',
"{}/v6/folders_scaling_update".format(self.base_url),
json=payload,
headers={
'Content-Type': 'application/json',
'Accept': 'application/json'
}
)
if response.status_code in [200, 201, 202]:
logger.info("Folder scaling status update successful: HTTP {}".format(response.status_code))
return {'success': True, 'status_code': response.status_code, 'response': response.json() if response.text else {}}
else:
error_msg = "Folder scaling update failed: HTTP {} - {}".format(
response.status_code,
response.text[:200] if response.text else 'No response'
)
logger.error(error_msg)
return {'success': False, 'error': error_msg, 'status_code': response.status_code}
except Exception as e:
logger.error("Folder scaling update exception: {}".format(str(e)))
return {'success': False, 'error': str(e)}
def test_connection(self):
"""Test DAM connection with current auth method"""
try:
if self.auth_mode in ['mtls', 'mtls_v2']:
# Test mTLS/Hybrid by getting OAuth token
# For mtls_v2, this will trigger _get_oauth_token_via_mtls()
# For mtls, we just verify the certificate exists
token = self.get_access_token()
return token is not None
else:
# Test OAuth2 by getting token
token = self.get_access_token()
return token is not None
except Exception as e:
logger.error("Connection test failed: {}".format(str(e)))
return False
def _get_mime_type(self, file_path):
"""Get MIME type for file"""
import mimetypes
mime_type, _ = mimetypes.guess_type(file_path)
return mime_type or 'application/octet-stream'
def register_master_asset_id_domain_value(self, master_asset_id):
"""
Register a master asset ID in the FERRERO_MASTER_ASSET_ID lookup domain.
Required in PPR environment before using the ID in asset creation.
The OpenText API does not support creating new domain values during asset
creation, so this must be called before the create asset API.
Args:
master_asset_id: The master asset ID to register
Returns:
dict with success, http_code, and optional error
"""
# Only for PPR environment
if 'ppr' not in self.base_url.lower():
return {'success': True, 'skipped': True, 'reason': 'Not PPR environment'}
try:
payload = {
"domain_value_resource": {
"domain_value": {
"description": master_asset_id,
"display_value": master_asset_id,
"field_value": {
"type": "string",
"value": master_asset_id
}
}
}
}
logger.info("PPR: Registering master asset ID '{}' in lookup domain...".format(master_asset_id))
response = self._make_api_request(
'POST',
"{}/v6/lookupdomains/FERRERO_MASTER_ASSET_ID/lookupvalues".format(self.base_url),
json=payload,
headers={
'Content-Type': 'application/json',
'Accept': 'application/json'
}
)
# Success cases
if response.status_code in [200, 201, 202]:
logger.info("PPR: Master asset ID '{}' registered successfully".format(master_asset_id))
return {
'success': True,
'http_code': response.status_code,
'already_existed': False
}
# Already exists - OpenText returns 409 OR 500 with "duplicate code" message
if response.status_code == 409:
logger.info("PPR: Master asset ID '{}' already exists in lookup domain".format(master_asset_id))
return {
'success': True,
'http_code': response.status_code,
'already_existed': True
}
# Check for duplicate error in 500 response (OpenText quirk)
if response.status_code == 500:
try:
error_data = response.json()
error_msg = error_data.get('exception_body', {}).get('message', '')
if 'duplicate' in error_msg.lower():
logger.info("PPR: Master asset ID '{}' already exists in lookup domain".format(master_asset_id))
return {
'success': True,
'http_code': response.status_code,
'already_existed': True
}
except:
pass
# Actual failure
error_msg = "Failed to register master asset ID '{}': HTTP {} - {}".format(
master_asset_id,
response.status_code,
response.text[:200] if response.text else 'No response'
)
logger.warning(error_msg)
return {
'success': False,
'http_code': response.status_code,
'error': error_msg
}
except Exception as e:
error_msg = "Exception registering master asset ID '{}': {}".format(master_asset_id, str(e))
logger.error(error_msg)
return {
'success': False,
'error': error_msg
}
def register_master_asset_ids_for_ppr(self, master_asset_ids):
"""
Register all master asset IDs in the lookup domain.
Call this before creating an asset that references these IDs.
The OpenText DAM API does not support creating new domain values during
asset creation. We must first add each master asset ID to the
FERRERO_MASTER_ASSET_ID domain value table before the create asset call.
Args:
master_asset_ids: List of master asset IDs to register
Returns:
dict with success, registered_ids, failed_ids
"""
if not master_asset_ids:
return {'success': True, 'registered_ids': [], 'failed_ids': []}
logger.info("=" * 60)
logger.info("Registering {} master asset ID(s) in lookup domain".format(len(master_asset_ids)))
logger.info(" IDs: {}".format(', '.join(master_asset_ids)))
logger.info("=" * 60)
registered = []
failed = []
for master_id in master_asset_ids:
result = self.register_master_asset_id_domain_value(master_id)
if result.get('success'):
registered.append(master_id)
else:
failed.append({'id': master_id, 'error': result.get('error')})
logger.info("Domain registration complete - {}/{} succeeded".format(
len(registered), len(master_asset_ids)))
if failed:
logger.warning("Failed to register: {}".format(
', '.join([f['id'] for f in failed])))
# Return success even if some failed (better to try the upload and see)
return {
'success': len(failed) == 0,
'registered_ids': registered,
'failed_ids': failed
}
def get_or_create_subfolder_path(self, base_folder_id, subfolder_path):
"""
Create or find subfolder structure in DAM matching Box structure
Args:
base_folder_id: Base folder (e.g., "01. Final Assets")
subfolder_path: Path like "Europe/Germany"
Returns:
str: Folder ID of the deepest subfolder
Example:
Base folder: "89fa..." (01. Final Assets)
Path: "Europe/Germany"
Creates: 01. Final Assets/Europe/Germany
Returns: ID of "Germany" folder
"""
if not subfolder_path:
return base_folder_id
# Split path into parts
parts = subfolder_path.split('/')
current_folder_id = base_folder_id
for folder_name in parts:
logger.info("Looking for/creating folder: {}".format(folder_name))
# Check if exists
existing = self._find_subfolder_by_name(current_folder_id, folder_name)
if existing:
current_folder_id = existing
logger.info("Found existing folder: {} (ID: {})".format(folder_name, current_folder_id))
else:
# Folder doesn't exist - DAM doesn't allow folder creation via API
# Upload to parent folder instead
logger.warning("Folder '{}' not found in DAM. DAM does not allow folder creation. Files will be uploaded to parent folder.".format(folder_name))
return current_folder_id # Return current parent folder instead of trying to create
return current_folder_id
def _find_subfolder_by_name(self, parent_folder_id, folder_name):
"""Find subfolder by name, return ID or None"""
try:
# Get folder contents with full details (load_type=full returns folder objects with names)
url = '{}/v6/folders/{}/children?load_type=full'.format(self.base_url, parent_folder_id)
response = self._make_api_request('GET', url)
if response.status_code == 200:
data = response.json()
# Debug: Log response structure
logger.info("Folder children response keys: {}".format(data.keys()))
# Get folder_children and check its type
folder_children = data.get('folder_children')
logger.info("folder_children type: {}".format(type(folder_children).__name__))
# If it's a dict, it might have folder objects as values or nested structure
if isinstance(folder_children, dict):
logger.info("folder_children is a dict with keys: {}".format(folder_children.keys()))
# Try to get the actual list of folders
# Common patterns: {'folder_list': [...]} or {'folder': [...]} or {'folders': [...]}
if 'folder_list' in folder_children:
items = folder_children['folder_list']
elif 'folder' in folder_children:
items = folder_children['folder']
elif 'folders' in folder_children:
items = folder_children['folders']
else:
# If keys are folder IDs, values are folder objects
items = list(folder_children.values())
# If first value is a list, extract it
if items and isinstance(items[0], list):
items = items[0]
elif isinstance(folder_children, list):
items = folder_children
else:
items = []
logger.info("Found {} items in folder".format(len(items)))
# Debug: Log first item structure
if items and len(items) > 0:
logger.info("First item type: {}".format(type(items[0]).__name__))
if isinstance(items[0], dict):
logger.info("First item keys: {}".format(items[0].keys()))
logger.info("First item name: {}".format(items[0].get('name', items[0].get('folder_name', 'N/A'))))
# Look for folders
for item in items:
if not isinstance(item, dict):
continue
# Check different possible field names for type and name
item_type = item.get('type', item.get('resource_type', item.get('object_type')))
item_name = item.get('name', item.get('folder_name', ''))
logger.info("Checking item: type='{}', name='{}'".format(item_type, item_name))
# Check if this is a folder/container
is_folder = (
item_type == 'folder' or
item_type == 'com.artesia.container.Container' or
(item_type and 'container' in item_type.lower())
)
if is_folder and item_name and item_name.lower() == folder_name.lower():
folder_id = item.get('container_id', item.get('id', item.get('folder_id', item.get('resource_id'))))
logger.info("Found matching folder: '{}' (ID: {})".format(item_name, folder_id))
return folder_id
return None
except Exception as e:
logger.warning("Error finding subfolder: {}".format(str(e)))
return None
def _create_folder(self, parent_folder_id, folder_name):
"""
Create folder in DAM, return new folder ID
NOTE: OTMM v6 API may not support dynamic folder creation.
If folder creation fails, folders may need to be pre-created in DAM UI.
"""
try:
url = '{}/v6/folders/{}/children'.format(self.base_url, parent_folder_id)
payload = {
'name': folder_name,
'type': 'folder'
}
response = self._make_api_request('POST', url, json=payload)
if response.status_code in [200, 201]:
data = response.json()
folder_id = data.get('id') or data.get('container_id') or data.get('folder_id')
logger.info("Successfully created folder '{}' (ID: {})".format(folder_name, folder_id))
return folder_id
elif response.status_code == 405:
logger.warning("DAM v6 API does not support folder creation (HTTP 405)")
logger.warning("Folder '{}' must be pre-created manually in DAM UI".format(folder_name))
logger.warning("Files will be uploaded to parent folder instead")
return None
else:
logger.error("Failed to create folder: HTTP {} - {}".format(
response.status_code,
response.text[:200] if response.text else 'No response'
))
return None
except Exception as e:
logger.error("Error creating folder: {}".format(str(e)))
return None