Adds session management for mTLS to handle "No session exists" errors.

UPDATES:
- Added session storage in __init__ for mTLS mode
- Updated _make_api_request to use requests.Session with mTLS
- Session persists certificate and cookies across requests
- Added OTDSTicket cookie handling

CURRENT STATUS:
✓ Certificate loads successfully
✓ Connection test passes
⚠️ Search campaigns returns HTTP 401 "No session exists"

This suggests mTLS may need:
1. Different API endpoints than OAuth2
2. Additional session initialization step
3. Specific headers or authentication flow
4. Contact DAM API team for mTLS documentation

OAuth2 remains default and fully functional. Use --auth-pfx flag to test mTLS when ready.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
873 lines
32 KiB
Python
873 lines
32 KiB
Python
"""
DAM Client - OpenText DAM API Integration
Handles OAuth2 and mTLS certificate authentication
Compatible with Python 3.6+
"""
|
|
|
|
import requests
|
|
import time
|
|
import logging
|
|
import os
|
|
import urllib3
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from tempfile import NamedTemporaryFile
|
|
|
|
# Module-wide logger used by pfx_to_pem and DAMClient.
logger = logging.getLogger('DAMClient')

# The OAuth2 code paths issue requests with verify=False; silence the
# InsecureRequestWarning urllib3 would otherwise emit for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
@contextmanager
def pfx_to_pem(pfx_path, pfx_password):
    """
    Convert a PFX certificate to a temporary PEM file for the requests library.

    The PEM file holds the *unencrypted* private key followed by the main
    certificate and any additional chain certificates. It is removed when the
    context exits, whether the ``with`` body finishes normally or raises.

    Args:
        pfx_path: Path to PFX/P12 certificate file
        pfx_password: Certificate password (str or bytes); falsy means no password

    Yields:
        str: Path to temporary PEM file

    Raises:
        ValueError: If the PFX does not contain both a private key and a certificate
        Exception: Any error from reading or decoding the PFX is logged and re-raised
    """
    pem_path = None
    try:
        # Only the conversion is wrapped by the logging handler, so errors
        # raised by the caller's ``with`` body are not misreported as
        # conversion failures.
        try:
            from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
            from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates

            if pfx_password and not isinstance(pfx_password, bytes):
                password_bytes = pfx_password.encode('utf-8')
            else:
                password_bytes = pfx_password or None

            # Load PFX file
            pfx = Path(pfx_path).read_bytes()
            private_key, main_cert, add_certs = load_key_and_certificates(
                pfx,
                password_bytes,
                None
            )

            if private_key is None or main_cert is None:
                raise ValueError("PFX file does not contain both a private key and a certificate")

            # delete=False: the file must outlive this handle so that requests
            # can open it by name; it is unlinked in the finally clause below.
            with NamedTemporaryFile(suffix='.pem', delete=False) as pem_file:
                pem_path = pem_file.name
                # Write private key
                pem_file.write(private_key.private_bytes(
                    Encoding.PEM,
                    PrivateFormat.PKCS8,
                    NoEncryption()
                ))
                # Write main certificate
                pem_file.write(main_cert.public_bytes(Encoding.PEM))
                # Write additional certificates in chain
                for cert in add_certs or []:
                    pem_file.write(cert.public_bytes(Encoding.PEM))
        except Exception as e:
            logger.error("Failed to convert PFX to PEM: {}".format(str(e)))
            raise

        yield pem_path

    finally:
        # Always remove the private key material from disk (best effort).
        if pem_path is not None:
            try:
                os.unlink(pem_path)
            except OSError:
                pass
|
|
|
|
class DAMClient:
    """
    Client for the OpenText DAM REST API: campaign search, asset
    download/upload and campaign status updates.

    Authentication is selected at construction time: OAuth2 client
    credentials (default) or mTLS with a PFX/P12 client certificate.
    In mTLS mode one requests.Session is created lazily and reused for every
    call; call close() to release it and delete its temporary PEM file.
    """

    # Metadata field id -> key of the campaign dict built by _extract_campaign_info
    _CAMPAIGN_FIELD_MAP = {
        'ARTESIA.FIELD.NAME': 'campaign_name',
        'INER_NAME_GENERIC': 'campaign_name',
        'FERRERO.FIELD.CAMPAIGN ID': 'campaign_id',
        'FERRERO.FIELD.CAMPAIGN_ID': 'campaign_id',
        'CONTENT.SCALING.STATUS': 'status',
        'FERRERO.FIELD.CAMPAIGN_BRAND': 'brand',
        'FERRERO.FIELD.CAMPAIGN_MARKET': 'market',
    }

    # Metadata field id -> (reviewer section, key) used by extract_rejection_details
    _REJECTION_FIELD_MAP = {
        # Approver rejection details
        'FERRERO.MARKETING.FIELD.CERTIFIER COMMENT': ('approver', 'comment'),
        'FERRERO.FIELD.ECOMMERCE CERTIFIER': ('approver', 'certifier_name'),
        'FERRERO.MARKETING.FIELD.APPROVAL DATE': ('approver', 'date'),
        # Legal rejection details
        'FERRERO.MARKETING.FIELD.LEGAL COMMENT': ('legal', 'comment'),
        'FERRERO.FIELD.LEGAL CERTIFER': ('legal', 'certifier_name'),  # Note: typo in field ID
        'FERRERO.MARKETING.FIELD.LEGAL APPROVAL DATE': ('legal', 'date'),
        # IA&CC rejection details
        'FERRERO.MARKETING.FIELD.IA CC COMMENT': ('ia_cc', 'comment'),
        'FERRERO.MARKETING.FIELD.IA CERTIFIER': ('ia_cc', 'certifier_name'),
        'FERRERO.MARKETING.FIELD.IA CC APPROVAL DATE': ('ia_cc', 'date'),
    }

    def __init__(self, config, use_mtls=False):
        """
        Read connection settings from config['dam'].

        Args:
            config: Mapping with a 'dam' section. 'base_url' and
                'timeout_seconds' are always read; mTLS mode reads
                'mtls_cert_path' and 'mtls_cert_password'; OAuth2 mode reads
                'auth_url', 'client_id' and 'client_secret'.
            use_mtls: True to authenticate with the client certificate
                instead of OAuth2.

        Raises:
            Exception: In mTLS mode, if the certificate path is not
                configured or the file does not exist.
            KeyError: If a required config key is missing.
        """
        self.base_url = config['dam']['base_url'].rstrip('/')
        self.timeout = config['dam']['timeout_seconds']
        self.use_mtls = use_mtls

        # State for both auth modes is always defined so that close() and
        # attribute checks never raise AttributeError in the other mode.
        self.session = None
        self.session_token = None
        self.access_token = None
        self.token_expiry = 0
        self._mtls_finalizer = None

        if self.use_mtls:
            # mTLS Certificate Authentication
            self.mtls_cert_path = config['dam'].get('mtls_cert_path')
            self.mtls_cert_password = config['dam'].get('mtls_cert_password')

            if not self.mtls_cert_path or not os.path.exists(self.mtls_cert_path):
                raise Exception("mTLS cert path not configured or file not found: {}".format(self.mtls_cert_path))

            logger.info("🔒 Using mTLS certificate authentication")
            logger.info("Certificate: {}".format(self.mtls_cert_path))
        else:
            # OAuth2 Authentication (default)
            self.auth_url = config['dam']['auth_url']
            self.client_id = config['dam']['client_id']
            self.client_secret = config['dam']['client_secret']

            logger.info("🔑 Using OAuth2 authentication")

    def close(self):
        """
        Release the mTLS session and delete its temporary PEM file.

        Safe to call more than once and a no-op when no mTLS session was
        created (including OAuth2 mode). A later mTLS request transparently
        creates a fresh session.
        """
        finalizer = getattr(self, '_mtls_finalizer', None)
        if finalizer is not None:
            finalizer()  # closes the session, then removes the PEM file
        self._mtls_finalizer = None
        self.session = None
        self.session_token = None

    def get_access_token(self):
        """Get OAuth2 access token, requesting a new one when missing or expired"""
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token

        return self._request_new_token()

    def _request_new_token(self):
        """
        Request a new OAuth2 token with the client-credentials grant.

        Returns:
            The access token string (also cached on the instance).

        Raises:
            Exception: On a non-200 response or any transport/parsing error
                (logged, then re-raised).
        """
        try:
            # Debug logging
            logger.debug("Requesting OAuth token from: {}".format(self.auth_url))
            logger.debug("Client ID: {}".format(self.client_id))
            logger.debug("Client Secret length: {}".format(len(self.client_secret)))

            response = requests.post(
                self.auth_url,
                data={
                    'grant_type': 'client_credentials',
                    'client_id': self.client_id,
                    'client_secret': self.client_secret
                },
                headers={
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Accept': 'application/json'
                },
                # NOTE(security): TLS verification is disabled to match the
                # PHP version; enable it once the server chain is trusted.
                verify=False,
                timeout=30
            )

            # The response body is deliberately not logged: on success it
            # contains the access token itself.
            logger.debug("OAuth response: HTTP {}".format(response.status_code))

            if response.status_code != 200:
                raise Exception("OAuth failed: HTTP {} - {}".format(
                    response.status_code, response.text
                ))

            data = response.json()
            self.access_token = data['access_token']
            # Some servers send expires_in as a string; a missing/null value
            # falls back to one hour.
            expires_in = int(data.get('expires_in') or 3600)
            # Set expiry with 60 second buffer
            self.token_expiry = time.time() + expires_in - 60

            logger.info("OAuth token obtained, expires in {}s".format(expires_in))
            return self.access_token

        except Exception as e:
            logger.error("Failed to get OAuth token: {}".format(str(e)))
            raise

    def _get_mtls_session(self):
        """
        Get or create the mTLS session with the client certificate.

        The session keeps cookies and connections between requests. The
        temporary PEM file produced from the PFX is kept on disk for as long
        as the session exists, because requests re-reads it whenever it opens
        a new connection; it is removed by close(), when this client is
        garbage collected, or at interpreter exit.

        Returns:
            requests.Session object with certificate configured
        """
        if self.session is not None:
            return self.session

        from contextlib import ExitStack
        import weakref

        # The stack owns both resources; callbacks run in reverse order, so
        # the session is closed before the PEM file is deleted.
        stack = ExitStack()
        try:
            cert = stack.enter_context(pfx_to_pem(self.mtls_cert_path, self.mtls_cert_password))
            session = requests.Session()
            stack.callback(session.close)
        except Exception:
            stack.close()
            raise

        session.cert = cert
        session.verify = True

        # stack.close holds no reference to self, so the finalizer does not
        # keep this client alive.
        self._mtls_finalizer = weakref.finalize(self, stack.close)
        self.session = session

        # Try to establish session with login endpoint (best effort: a
        # failure is logged and API calls are attempted anyway).
        try:
            login_url = self.base_url.replace('/otmmapi', '/otdsws/login')
            logger.info("Attempting mTLS session login: {}".format(login_url))

            response = session.get(login_url, timeout=self.timeout)

            if response.status_code == 200:
                logger.info("mTLS session established successfully")
                # Store any session token/cookie
                if 'OTDSTicket' in response.cookies:
                    self.session_token = response.cookies['OTDSTicket']
                    logger.info("Session ticket obtained")
            else:
                logger.warning("Session login returned HTTP {}, continuing anyway".format(response.status_code))

        except Exception as e:
            logger.warning("Session login failed: {}, will try direct API access".format(str(e)))

        return self.session

    def _make_api_request(self, method, url, **kwargs):
        """
        Make API request with appropriate authentication (OAuth2 or mTLS)

        Args:
            method: HTTP method (GET, POST, etc.)
            url: Full URL to request
            **kwargs: Additional arguments for requests library. 'timeout'
                defaults to the configured timeout. Caller-supplied 'headers'
                and 'cookies' dicts are copied, never modified.

        Returns:
            requests.Response object
        """
        timeout = kwargs.pop('timeout', self.timeout)

        if self.use_mtls:
            # Reuse the persistent session so the certificate, connections
            # and cookies carry over between requests.
            session = self._get_mtls_session()

            # Add session token if we have one
            if self.session_token:
                cookies = dict(kwargs.get('cookies') or {})
                cookies['OTDSTicket'] = self.session_token
                kwargs['cookies'] = cookies

            return session.request(method, url, timeout=timeout, **kwargs)

        # OAuth2 Authentication (default)
        token = self.get_access_token()

        # Add Authorization header
        headers = dict(kwargs.get('headers') or {})
        headers['Authorization'] = 'Bearer {}'.format(token)
        kwargs['headers'] = headers

        # SSL verification is disabled by default for OAuth2 (as before); a
        # caller may now override it by passing verify=...
        kwargs.setdefault('verify', False)

        return requests.request(method, url, timeout=timeout, **kwargs)

    def search_campaigns(self, status=None, campaign_type='Local Adaptation'):
        """
        Search for campaigns with optional status filter

        Args:
            status: Content Scaling Status (A1, A2, B1, B2, etc.) or None for all
            campaign_type: 'Local Adaptation' for A-series or 'Global comm' for B-series

        Returns:
            List of campaign dictionaries (see _extract_campaign_info)

        Raises:
            Exception: On a non-200 response or any request/parsing error
                (logged, then re-raised).
        """
        try:
            import json as json_module
            import urllib.parse

            # Build search condition (like Postman collection)
            search_condition = {
                "search_condition_list": {
                    "search_condition": [
                        {
                            "type": "com.artesia.search.SearchScalarCondition",
                            "metadata_field_id": "ARTESIA.FIELD.CONTAINER TYPE NAME",
                            "relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS",
                            "value": "GLOBALCAMPAING",
                            "left_paren": "(",
                            "right_paren": ")"
                        },
                        {
                            "type": "com.artesia.search.SearchScalarCondition",
                            "metadata_field_id": "FERRERO.FIELD.CAMPAIGN TYPE",
                            "relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS",
                            "value": campaign_type,  # 'Local Adaptation' or 'Global comm'
                            "relational_operator": "and"
                        }
                    ]
                }
            }

            # URL encode search condition
            search_condition_encoded = urllib.parse.quote(json_module.dumps(search_condition))

            # Use GET with query parameters (matching Postman)
            url = "{}/v6/search/text?load_type=metadata&search_config_id=18&search_condition_list={}".format(
                self.base_url,
                search_condition_encoded
            )

            response = self._make_api_request(
                'GET',
                url,
                headers={'Accept': 'application/json'}
            )

            if response.status_code != 200:
                raise Exception("Search failed: HTTP {} - {}".format(
                    response.status_code, response.text[:200]
                ))

            data = response.json()
            all_campaigns_raw = []

            # Extract asset list from response
            if 'search_result_resource' in data:
                resource = data['search_result_resource']

                # Try asset_list first (like PHP version)
                all_campaigns_raw = resource.get('asset_list', [])

                # Fallback to search_result_element_list
                if not all_campaigns_raw:
                    results = resource.get('search_result', {}).get('search_result_element_list', [])
                    all_campaigns_raw = [r.get('resource', {}) for r in results]

            logger.info("Found {} total campaigns in search".format(len(all_campaigns_raw)))

            # Extract campaign info and filter by status
            all_campaigns = []
            for asset in all_campaigns_raw:
                campaign = self._extract_campaign_info(asset)

                # Debug log the status
                logger.debug("Campaign: {} - Status: {}".format(
                    campaign.get('campaign_name', 'Unknown'),
                    campaign.get('status', 'NO STATUS')
                ))

                # Filter by status if provided
                if status is None or campaign.get('status') == status:
                    all_campaigns.append(campaign)

            logger.info("Found {} campaigns{}".format(
                len(all_campaigns),
                " with status {}".format(status) if status else ""
            ))

            return all_campaigns

        except Exception as e:
            logger.error("Failed to search campaigns: {}".format(str(e)))
            raise

    @staticmethod
    def _iter_metadata_fields(asset):
        """Yield every metadata field dict across all categories of an asset (null-tolerant)"""
        metadata = asset.get('metadata') or {}
        for category in metadata.get('metadata_element_list') or []:
            for field in category.get('metadata_element_list') or []:
                yield field

    def _extract_campaign_info(self, asset):
        """
        Extract campaign information from asset data.

        Args:
            asset: Asset dict from a search result

        Returns:
            dict with keys asset_id, campaign_name, campaign_id, status,
            brand, market. A metadata field without an extractable value
            never overwrites a value that was already found (e.g. the asset's
            own 'name').
        """
        campaign = {
            'asset_id': asset.get('asset_id'),
            'campaign_name': asset.get('name'),  # Try name field first
            'campaign_id': None,
            'status': None,
            'brand': None,
            'market': None
        }

        # Extract metadata fields
        for field in self._iter_metadata_fields(asset):
            key = self._CAMPAIGN_FIELD_MAP.get(field.get('id'))
            if key is None:
                continue
            value = self._extract_field_value(field)
            if value is not None:
                campaign[key] = value

        return campaign

    def _extract_field_value(self, field):
        """
        Extract value from field structure.

        Handles field['value']['value']['value'] and
        field['value']['value']['field_value']['value']; returns None for
        any other shape.
        """
        outer = field.get('value') if 'value' in field else None
        if not isinstance(outer, dict):
            return None

        inner = outer.get('value')
        if not isinstance(inner, dict):
            return None

        if 'value' in inner:
            return inner['value']
        if 'field_value' in inner and 'value' in inner['field_value']:
            return inner['field_value']['value']
        return None

    def get_master_assets(self, campaign_id, is_global=False):
        """
        Get master assets from Master Assets folder (A1) or Final Assets folder (B1)
        Searches recursively through subfolders

        Args:
            campaign_id: Campaign folder asset ID
            is_global: True for B1 Global campaigns (use Final Assets), False for A1 Local (use Master Assets)

        Returns:
            List of asset dictionaries with full metadata, including 'folder_path' for subfolder structure

        Raises:
            Exception: If the assets folder cannot be found (logged, then re-raised).
        """
        try:
            # B1 Global campaigns use "05. Final Assets" folder
            # A1 Local campaigns use "01. Master Assets" folder
            if is_global:
                logger.info("Looking for Final Assets folder (B1 Global campaign)")
                master_folder_id = self.find_final_assets_folder(campaign_id)
            else:
                logger.info("Looking for Master Assets folder (A1 Local campaign)")
                master_folder_id = self._find_master_assets_folder(campaign_id)

            if not master_folder_id:
                raise Exception("Assets folder not found (tried {})".format(
                    "Final Assets" if is_global else "Master Assets"
                ))

            # Recursively get all assets from folder and subfolders
            all_assets = self._get_assets_recursive(master_folder_id, folder_path="")

            logger.info("Found {} master assets (recursive) in campaign {}".format(len(all_assets), campaign_id))

            return all_assets

        except Exception as e:
            logger.error("Failed to get master assets: {}".format(str(e)))
            raise

    def _get_assets_recursive(self, folder_id, folder_path=""):
        """
        Recursively get all assets from a folder and its subfolders

        Args:
            folder_id: Folder asset ID to search
            folder_path: Current path (e.g., "Subfolder1/Subfolder2")

        Returns:
            List of assets with 'folder_path' attribute added. Best effort:
            on an HTTP or parsing error the assets collected so far for this
            folder are returned and the error is logged.
        """
        assets = []

        try:
            response = self._make_api_request(
                'GET',
                "{}/v6/folders/{}/children?load_type=full".format(
                    self.base_url, folder_id
                ),
                headers={'Accept': 'application/json'}
            )

            if response.status_code != 200:
                logger.warning("Failed to get folder {} children: HTTP {}".format(folder_id, response.status_code))
                return assets

            data = response.json()
            children = data.get('folder_children', {}).get('asset_list', [])

            for child in children:
                # Check if it's a folder or an asset
                asset_type = child.get('asset_type', {})
                type_name = asset_type.get('name', '') if isinstance(asset_type, dict) else str(asset_type)

                if type_name.lower() == 'folder':
                    # It's a folder - recurse into it
                    folder_name = child.get('name', 'Unknown')
                    subfolder_path = folder_path + "/" + folder_name if folder_path else folder_name

                    logger.info("Recursing into subfolder: {}".format(subfolder_path))
                    assets.extend(self._get_assets_recursive(child['asset_id'], subfolder_path))
                else:
                    # It's an asset - add it with folder path
                    child['folder_path'] = folder_path
                    assets.append(child)

            return assets

        except Exception as e:
            logger.error("Error in recursive asset search for folder {}: {}".format(folder_id, str(e)))
            return assets

    def is_asset_not_approved(self, asset_data):
        """
        Check if asset has ECOMMERCE STATUS = "NOT APPROVED"
        Used for A5→A6 rework workflow to filter rejected assets

        Args:
            asset_data: Complete asset JSON with metadata

        Returns:
            bool: True if status is "NOT APPROVED", False otherwise
                (including when the metadata cannot be read)
        """
        try:
            for element in self._iter_metadata_fields(asset_data):
                if element.get('id') != 'FERRERO.FIELD.ECOMMERCE STATUS':
                    continue

                status_value = self._extract_field_value(element)
                if status_value:
                    # Case-insensitive comparison
                    return status_value.strip().upper() == 'NOT APPROVED'

            return False

        except Exception as e:
            logger.error("Error checking NOT APPROVED status: {}".format(str(e)))
            return False

    def extract_rejection_details(self, asset_data):
        """
        Extract all rejection comments from a NOT APPROVED asset
        Extracts details from Approver, Legal, and IA&CC reviewers

        Args:
            asset_data: Complete asset JSON with metadata

        Returns:
            dict with rejection details from all reviewers, or None if not rejected
        """
        if not self.is_asset_not_approved(asset_data):
            return None

        # Initialize rejection details structure
        details = {
            'asset_id': asset_data.get('asset_id'),
            'asset_name': asset_data.get('name'),
            'status': 'NOT APPROVED',
            'approver': {'comment': None, 'certifier_name': None, 'date': None},
            'legal': {'comment': None, 'certifier_name': None, 'date': None},
            'ia_cc': {'comment': None, 'certifier_name': None, 'date': None}
        }

        # Extract all rejection fields
        for element in self._iter_metadata_fields(asset_data):
            target = self._REJECTION_FIELD_MAP.get(element.get('id', ''))
            if target is not None:
                section, key = target
                details[section][key] = self._extract_field_value(element)

        return details

    def _find_child_folder(self, campaign_id, name_parts, label):
        """
        Find the first direct child of a campaign folder whose name contains
        every string in name_parts.

        Args:
            campaign_id: Campaign folder asset ID
            name_parts: Substrings that must all appear in the folder name
            label: Human-readable folder label used in the error log

        Returns:
            The child's asset ID, or None when not found, on a non-200
            response, or on any error (which is logged).
        """
        try:
            response = self._make_api_request(
                'GET',
                "{}/v6/folders/{}/children".format(self.base_url, campaign_id)
            )

            if response.status_code != 200:
                return None

            data = response.json()
            folders = data.get('folder_children', {}).get('asset_list', [])

            for folder in folders:
                name = folder.get('name', '')
                if all(part in name for part in name_parts):
                    return folder['asset_id']

            return None

        except Exception as e:
            logger.error("Error finding {} folder: {}".format(label, str(e)))
            return None

    def _find_master_assets_folder(self, campaign_id):
        """Find '01. Master Assets' folder in campaign (name contains 'Master' and 'Assets')"""
        return self._find_child_folder(campaign_id, ('Master', 'Assets'), 'Master Assets')

    def find_final_assets_folder(self, campaign_id):
        """Find '05. Final Assets' folder in campaign (name contains 'Final' and 'Assets')"""
        return self._find_child_folder(campaign_id, ('Final', 'Assets'), 'Final Assets')

    @staticmethod
    def _sanitize_filename(name, fallback):
        """
        Reduce a server-supplied asset name to a bare file name.

        Directory components (with either slash style) are stripped so the
        name cannot escape the download directory; an empty, '.' or '..'
        result is replaced by fallback.
        """
        candidate = os.path.basename(str(name or '').replace('\\', '/'))
        if candidate in ('', '.', '..'):
            return fallback
        return candidate

    def download_asset(self, asset_id, output_dir='.'):
        """
        Download asset content

        Args:
            asset_id: Asset ID to download
            output_dir: Directory to save file (created if missing)

        Returns:
            Path to downloaded file

        Raises:
            Exception: On a non-200 metadata/content response or any I/O
                error (logged, then re-raised).
        """
        try:
            # Get asset metadata first to get filename
            metadata_response = self._make_api_request(
                'GET',
                "{}/v6/assets/{}".format(self.base_url, asset_id)
            )

            if metadata_response.status_code != 200:
                raise Exception("Failed to get asset metadata: HTTP {}".format(
                    metadata_response.status_code
                ))

            asset_data = metadata_response.json()
            asset = asset_data.get('asset_resource', {}).get('asset', asset_data)
            # The name comes from the server: never let it carry path components.
            filename = self._sanitize_filename(
                asset.get('name'), 'download_{}'.format(asset_id)
            )

            # Download content
            content_response = self._make_api_request(
                'GET',
                "{}/v6/assets/{}/contents".format(self.base_url, asset_id),
                stream=True
            )

            try:
                if content_response.status_code != 200:
                    raise Exception("Download failed: HTTP {}".format(content_response.status_code))

                # Ensure output directory exists
                os.makedirs(output_dir, exist_ok=True)

                # Save file
                filepath = os.path.join(output_dir, filename)

                with open(filepath, 'wb') as f:
                    for chunk in content_response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            finally:
                # A streamed response holds its connection until closed.
                content_response.close()

            file_size = os.path.getsize(filepath)
            logger.info("Downloaded: {} ({} bytes)".format(filename, file_size))

            return filepath

        except Exception as e:
            logger.error("Failed to download asset {}: {}".format(asset_id, str(e)))
            raise

    def upload_asset(self, file_path, folder_id, asset_representation, video_metadata=None):
        """
        Upload asset to DAM

        Args:
            file_path: Path to file to upload
            folder_id: Parent folder ID
            asset_representation: Asset metadata JSON structure
            video_metadata: Optional video metadata (width, height, etc.)

        Returns:
            dict with success, asset_id, http_code on success, or
            success=False, error, http_code on failure (http_code is 0 when
            the failure was an exception rather than an HTTP error).
            For a response carrying a 'job_handle', asset_id is the job ID.
        """
        try:
            import json

            filename = os.path.basename(file_path)
            mime_type = self._get_mime_type(file_path)

            # Build upload manifest
            file_info = {
                'file_name': filename,
                'file_type': mime_type
            }

            # Add video dimensions if available (missing/None counts as 0)
            if video_metadata:
                for dimension in ('width', 'height'):
                    if (video_metadata.get(dimension) or 0) > 0:
                        file_info[dimension] = video_metadata[dimension]

            upload_manifest = {
                'upload_manifest': {
                    'master_files': [
                        {'file': file_info}
                    ]
                }
            }

            data = {
                'asset_representation': json.dumps(asset_representation),
                'parent_folder_id': folder_id,
                'manifest': json.dumps(upload_manifest)
            }

            # Prepare multipart upload; the with-block closes the file even
            # when the request raises.
            with open(file_path, 'rb') as upload_file:
                response = self._make_api_request(
                    'POST',
                    "{}/v6/assets".format(self.base_url),
                    data=data,
                    files={'files': (filename, upload_file, mime_type)}
                )

            http_code = response.status_code

            if http_code in [201, 202]:
                result_data = response.json()
                asset_id = None

                if 'asset_resource_list' in result_data:
                    asset_id = result_data['asset_resource_list']['asset_resource'][0]['asset']['asset_id']
                elif 'job_handle' in result_data:
                    asset_id = result_data['job_handle'].get('job_id')

                logger.info("Upload successful: {} → Asset ID: {}".format(filename, asset_id))

                return {
                    'success': True,
                    'asset_id': asset_id,
                    'http_code': http_code
                }

            error_msg = "Upload failed: HTTP {}".format(http_code)
            if response.text:
                # Prefer the server's own message when the body is JSON of
                # the expected shape; otherwise keep the generic message.
                try:
                    error_data = response.json()
                except ValueError:
                    error_data = None
                if isinstance(error_data, dict):
                    exception_body = error_data.get('exception_body')
                    if isinstance(exception_body, dict):
                        error_msg = exception_body.get('message', error_msg)

            logger.error(error_msg)
            return {
                'success': False,
                'error': error_msg,
                'http_code': http_code
            }

        except Exception as e:
            logger.error("Upload exception: {}".format(str(e)))
            return {
                'success': False,
                'error': str(e),
                'http_code': 0
            }

    def update_campaign_status(self, campaign_id, new_status):
        """
        Update Content Scaling Status field

        Args:
            campaign_id: Campaign folder ID
            new_status: New status value (A2, A3, etc.)

        Returns:
            dict with success boolean, plus 'error' on failure
        """
        try:
            payload = {
                "edited_folder": {
                    "data": {
                        "metadata": [
                            {
                                "id": "CONTENT.SCALING.STATUS",
                                "type": "com.artesia.metadata.MetadataField",
                                "value": {
                                    "cascading_domain_value": False,
                                    "domain_value": True,
                                    "value": {
                                        "type": "string",
                                        "value": new_status
                                    }
                                }
                            }
                        ]
                    }
                }
            }

            response = self._make_api_request(
                'PATCH',
                "{}/v6/folders/{}?lock_strategy=optimistic".format(
                    self.base_url, campaign_id
                ),
                json=payload,
                headers={
                    'Content-Type': 'application/json',
                    'Accept': 'application/json'
                }
            )

            if response.status_code == 200:
                logger.info("Updated campaign {} status to {}".format(campaign_id, new_status))
                return {'success': True}

            error_msg = "Status update failed: HTTP {}".format(response.status_code)
            logger.error(error_msg)
            return {'success': False, 'error': error_msg}

        except Exception as e:
            logger.error("Status update exception: {}".format(str(e)))
            return {'success': False, 'error': str(e)}

    def test_connection(self):
        """
        Test DAM connection with current auth method.

        Returns:
            bool: In mTLS mode, True when the API answers with any status
            below 500 (so a 401/403 still counts as "reachable"); in OAuth2
            mode, True when a token could be obtained. False on any error.
        """
        try:
            if self.use_mtls:
                # Test mTLS by making a simple API call
                response = self._make_api_request('GET', "{}/v6".format(self.base_url))
                return response.status_code < 500

            # Test OAuth2 by getting token
            token = self.get_access_token()
            return token is not None
        except Exception as e:
            logger.error("Connection test failed: {}".format(str(e)))
            return False

    def _get_mime_type(self, file_path):
        """Get MIME type for file, defaulting to 'application/octet-stream'"""
        import mimetypes
        mime_type, _ = mimetypes.guess_type(file_path)
        return mime_type or 'application/octet-stream'
|