Created Python-Version/ directory structure: ✅ Complete folder hierarchy (scripts, config, logs, temp, tests) ✅ Virtual environment setup script ✅ Python 3.6+ compatible dependencies ✅ Configuration system with env var substitution ✅ DAM API client (complete) Components Implemented: 1. setup.sh - venv creation and dependency installation 2. requirements.txt - Python 3.6/3.10 compatible packages 3. config/config.yaml - Main configuration (URLs, credentials, settings) 4. config/field_mappings.yaml - MVP fields list (easy to edit!) 5. config_loader.py - YAML config with ${VAR} substitution 6. dam_client.py - Complete DAM API wrapper: - OAuth2 with auto-refresh - search_campaigns(status) - get_master_assets(campaign_id) - download_asset(asset_id) - upload_asset() with video metadata - update_campaign_status() - Helper methods Features: - Python 3.6 compatible (shared hosting requirement) - Python 3.10 compatible (local development) - Configuration-driven (no hardcoded values) - Environment-specific configs (staging/production) - Comprehensive error handling - Logging built-in Next: Box client, Database client, FilenameParser, MetadataExtractorMVP, Notifier, then main scripts (A1→A2, A2→A3) 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
504 lines
17 KiB
Python
504 lines
17 KiB
Python
"""
|
|
DAM Client - OpenText DAM API Integration
|
|
Handles OAuth2 authentication and all DAM operations
|
|
Compatible with Python 3.6+
|
|
"""
|
|
|
|
import requests
|
|
import time
|
|
import logging
|
|
import os
|
|
|
|
logger = logging.getLogger('DAMClient')
|
|
|
|
class DAMClient:
|
|
def __init__(self, config):
|
|
self.base_url = config['dam']['base_url'].rstrip('/')
|
|
self.auth_url = config['dam']['auth_url']
|
|
self.client_id = config['dam']['client_id']
|
|
self.client_secret = config['dam']['client_secret']
|
|
self.timeout = config['dam']['timeout_seconds']
|
|
|
|
self.access_token = None
|
|
self.token_expiry = 0
|
|
|
|
def get_access_token(self):
|
|
"""Get OAuth2 access token with auto-refresh"""
|
|
if self.access_token and time.time() < self.token_expiry:
|
|
return self.access_token
|
|
|
|
return self._request_new_token()
|
|
|
|
def _request_new_token(self):
|
|
"""Request new OAuth2 token"""
|
|
try:
|
|
response = requests.post(
|
|
self.auth_url,
|
|
data={
|
|
'grant_type': 'client_credentials',
|
|
'client_id': self.client_id,
|
|
'client_secret': self.client_secret
|
|
},
|
|
timeout=30
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
self.access_token = data['access_token']
|
|
# Set expiry with 60 second buffer
|
|
expires_in = data.get('expires_in', 3600)
|
|
self.token_expiry = time.time() + expires_in - 60
|
|
|
|
logger.info("OAuth token obtained, expires in {}s".format(expires_in))
|
|
return self.access_token
|
|
else:
|
|
raise Exception("OAuth failed: HTTP {} - {}".format(
|
|
response.status_code, response.text
|
|
))
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get OAuth token: {}".format(str(e)))
|
|
raise
|
|
|
|
def search_campaigns(self, status=None):
|
|
"""
|
|
Search for campaigns with optional status filter
|
|
|
|
Args:
|
|
status: Content Scaling Status (A1, A2, A3, etc.) or None for all
|
|
|
|
Returns:
|
|
List of campaign dictionaries
|
|
"""
|
|
try:
|
|
token = self.get_access_token()
|
|
|
|
# Search for Local Adaptation campaigns
|
|
search_payload = {
|
|
"text_search_resource": {
|
|
"boolean_clause_list": [
|
|
{
|
|
"field_name": "NAME",
|
|
"field_value": "Local",
|
|
"operator": "CONTAINS"
|
|
}
|
|
]
|
|
}
|
|
}
|
|
|
|
response = requests.post(
|
|
"{}/v6/search/text".format(self.base_url),
|
|
json=search_payload,
|
|
headers={
|
|
'Authorization': 'Bearer {}'.format(token),
|
|
'Content-Type': 'application/json'
|
|
},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
raise Exception("Search failed: HTTP {}".format(response.status_code))
|
|
|
|
data = response.json()
|
|
all_campaigns = []
|
|
|
|
if 'search_result_resource' in data:
|
|
results = data['search_result_resource']['search_result'].get('search_result_element_list', [])
|
|
|
|
for result in results:
|
|
asset = result.get('resource', {})
|
|
|
|
# Extract campaign info
|
|
campaign = self._extract_campaign_info(asset)
|
|
|
|
# Filter by status if provided
|
|
if status is None or campaign.get('status') == status:
|
|
all_campaigns.append(campaign)
|
|
|
|
logger.info("Found {} campaigns{}".format(
|
|
len(all_campaigns),
|
|
" with status {}".format(status) if status else ""
|
|
))
|
|
|
|
return all_campaigns
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to search campaigns: {}".format(str(e)))
|
|
raise
|
|
|
|
def _extract_campaign_info(self, asset):
|
|
"""Extract campaign information from asset data"""
|
|
campaign = {
|
|
'asset_id': asset.get('asset_id'),
|
|
'campaign_name': None,
|
|
'campaign_id': None,
|
|
'status': None,
|
|
'brand': None,
|
|
'market': None
|
|
}
|
|
|
|
# Extract metadata fields
|
|
if 'metadata' in asset and 'metadata_element_list' in asset['metadata']:
|
|
for category in asset['metadata']['metadata_element_list']:
|
|
if 'metadata_element_list' in category:
|
|
for field in category['metadata_element_list']:
|
|
field_id = field.get('id')
|
|
value = self._extract_field_value(field)
|
|
|
|
if field_id == 'ARTESIA.FIELD.NAME' or field_id == 'INER_NAME_GENERIC':
|
|
campaign['campaign_name'] = value
|
|
elif field_id == 'FERRERO.FIELD.CAMPAIGN_ID':
|
|
campaign['campaign_id'] = value
|
|
elif field_id == 'CONTENT.SCALING.STATUS':
|
|
campaign['status'] = value
|
|
elif field_id == 'FERRERO.FIELD.CAMPAIGN_BRAND':
|
|
campaign['brand'] = value
|
|
elif field_id == 'FERRERO.FIELD.CAMPAIGN_MARKET':
|
|
campaign['market'] = value
|
|
|
|
return campaign
|
|
|
|
def _extract_field_value(self, field):
|
|
"""Extract value from field structure"""
|
|
if 'value' in field:
|
|
val = field['value']
|
|
if isinstance(val, dict):
|
|
if 'value' in val and isinstance(val['value'], dict):
|
|
if 'value' in val['value']:
|
|
return val['value']['value']
|
|
elif 'field_value' in val['value'] and 'value' in val['value']['field_value']:
|
|
return val['value']['field_value']['value']
|
|
return None
|
|
|
|
def get_master_assets(self, campaign_id):
|
|
"""
|
|
Get master assets from Master Assets folder
|
|
|
|
Args:
|
|
campaign_id: Campaign folder asset ID
|
|
|
|
Returns:
|
|
List of asset dictionaries with full metadata
|
|
"""
|
|
try:
|
|
# Find Master Assets folder
|
|
master_folder_id = self._find_master_assets_folder(campaign_id)
|
|
|
|
if not master_folder_id:
|
|
raise Exception("Master Assets folder not found")
|
|
|
|
# Get assets from folder
|
|
token = self.get_access_token()
|
|
|
|
response = requests.get(
|
|
"{}/v6/folders/{}/children?load_type=full".format(
|
|
self.base_url, master_folder_id
|
|
),
|
|
headers={'Authorization': 'Bearer {}'.format(token)},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
raise Exception("Failed to get assets: HTTP {}".format(response.status_code))
|
|
|
|
data = response.json()
|
|
assets = data.get('folder_children', {}).get('asset_list', [])
|
|
|
|
logger.info("Found {} master assets in campaign {}".format(len(assets), campaign_id))
|
|
|
|
return assets
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to get master assets: {}".format(str(e)))
|
|
raise
|
|
|
|
def _find_master_assets_folder(self, campaign_id):
|
|
"""Find '01. Master Assets' folder in campaign"""
|
|
try:
|
|
token = self.get_access_token()
|
|
|
|
response = requests.get(
|
|
"{}/v6/folders/{}/children".format(self.base_url, campaign_id),
|
|
headers={'Authorization': 'Bearer {}'.format(token)},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
return None
|
|
|
|
data = response.json()
|
|
folders = data.get('folder_children', {}).get('asset_list', [])
|
|
|
|
for folder in folders:
|
|
name = folder.get('name', '')
|
|
if 'Master' in name and 'Assets' in name:
|
|
return folder['asset_id']
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error("Error finding Master Assets folder: {}".format(str(e)))
|
|
return None
|
|
|
|
def find_final_assets_folder(self, campaign_id):
|
|
"""Find '01. Final Assets' folder in campaign"""
|
|
try:
|
|
token = self.get_access_token()
|
|
|
|
response = requests.get(
|
|
"{}/v6/folders/{}/children".format(self.base_url, campaign_id),
|
|
headers={'Authorization': 'Bearer {}'.format(token)},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
return None
|
|
|
|
data = response.json()
|
|
folders = data.get('folder_children', {}).get('asset_list', [])
|
|
|
|
for folder in folders:
|
|
name = folder.get('name', '')
|
|
if 'Final' in name and 'Assets' in name:
|
|
return folder['asset_id']
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error("Error finding Final Assets folder: {}".format(str(e)))
|
|
return None
|
|
|
|
def download_asset(self, asset_id, output_dir='.'):
|
|
"""
|
|
Download asset content
|
|
|
|
Args:
|
|
asset_id: Asset ID to download
|
|
output_dir: Directory to save file
|
|
|
|
Returns:
|
|
Path to downloaded file
|
|
"""
|
|
try:
|
|
# Get asset metadata first to get filename
|
|
token = self.get_access_token()
|
|
|
|
metadata_response = requests.get(
|
|
"{}/v6/assets/{}".format(self.base_url, asset_id),
|
|
headers={'Authorization': 'Bearer {}'.format(token)},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
if metadata_response.status_code != 200:
|
|
raise Exception("Failed to get asset metadata: HTTP {}".format(
|
|
metadata_response.status_code
|
|
))
|
|
|
|
asset_data = metadata_response.json()
|
|
asset = asset_data.get('asset_resource', {}).get('asset', asset_data)
|
|
filename = asset.get('name', 'download_{}'.format(asset_id))
|
|
|
|
# Download content
|
|
content_response = requests.get(
|
|
"{}/v6/assets/{}/contents".format(self.base_url, asset_id),
|
|
headers={'Authorization': 'Bearer {}'.format(token)},
|
|
timeout=self.timeout,
|
|
stream=True
|
|
)
|
|
|
|
if content_response.status_code != 200:
|
|
raise Exception("Download failed: HTTP {}".format(content_response.status_code))
|
|
|
|
# Ensure output directory exists
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
# Save file
|
|
filepath = os.path.join(output_dir, filename)
|
|
|
|
with open(filepath, 'wb') as f:
|
|
for chunk in content_response.iter_content(chunk_size=8192):
|
|
f.write(chunk)
|
|
|
|
file_size = os.path.getsize(filepath)
|
|
logger.info("Downloaded: {} ({} bytes)".format(filename, file_size))
|
|
|
|
return filepath
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to download asset {}: {}".format(asset_id, str(e)))
|
|
raise
|
|
|
|
def upload_asset(self, file_path, folder_id, asset_representation, video_metadata=None):
|
|
"""
|
|
Upload asset to DAM
|
|
|
|
Args:
|
|
file_path: Path to file to upload
|
|
folder_id: Parent folder ID
|
|
asset_representation: Asset metadata JSON structure
|
|
video_metadata: Optional video metadata (width, height, etc.)
|
|
|
|
Returns:
|
|
dict with success, asset_id, http_code
|
|
"""
|
|
try:
|
|
token = self.get_access_token()
|
|
filename = os.path.basename(file_path)
|
|
mime_type = self._get_mime_type(file_path)
|
|
|
|
# Build upload manifest
|
|
file_info = {
|
|
'file_name': filename,
|
|
'file_type': mime_type
|
|
}
|
|
|
|
# Add video dimensions if available
|
|
if video_metadata:
|
|
if video_metadata.get('width', 0) > 0:
|
|
file_info['width'] = video_metadata['width']
|
|
if video_metadata.get('height', 0) > 0:
|
|
file_info['height'] = video_metadata['height']
|
|
|
|
upload_manifest = {
|
|
'upload_manifest': {
|
|
'master_files': [
|
|
{'file': file_info}
|
|
]
|
|
}
|
|
}
|
|
|
|
# Prepare multipart upload
|
|
import json
|
|
files = {
|
|
'files': (filename, open(file_path, 'rb'), mime_type)
|
|
}
|
|
|
|
data = {
|
|
'asset_representation': json.dumps(asset_representation),
|
|
'parent_folder_id': folder_id,
|
|
'manifest': json.dumps(upload_manifest)
|
|
}
|
|
|
|
response = requests.post(
|
|
"{}/v6/assets".format(self.base_url),
|
|
data=data,
|
|
files=files,
|
|
headers={'Authorization': 'Bearer {}'.format(token)},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
# Close file
|
|
files['files'][1].close()
|
|
|
|
http_code = response.status_code
|
|
|
|
if http_code in [201, 202]:
|
|
result_data = response.json()
|
|
asset_id = None
|
|
|
|
if 'asset_resource_list' in result_data:
|
|
asset_id = result_data['asset_resource_list']['asset_resource'][0]['asset']['asset_id']
|
|
elif 'job_handle' in result_data:
|
|
asset_id = result_data['job_handle'].get('job_id')
|
|
|
|
logger.info("Upload successful: {} → Asset ID: {}".format(filename, asset_id))
|
|
|
|
return {
|
|
'success': True,
|
|
'asset_id': asset_id,
|
|
'http_code': http_code
|
|
}
|
|
else:
|
|
error_msg = "Upload failed: HTTP {}".format(http_code)
|
|
if response.text:
|
|
try:
|
|
error_data = response.json()
|
|
if 'exception_body' in error_data:
|
|
error_msg = error_data['exception_body'].get('message', error_msg)
|
|
except:
|
|
pass
|
|
|
|
logger.error(error_msg)
|
|
return {
|
|
'success': False,
|
|
'error': error_msg,
|
|
'http_code': http_code
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Upload exception: {}".format(str(e)))
|
|
return {
|
|
'success': False,
|
|
'error': str(e),
|
|
'http_code': 0
|
|
}
|
|
|
|
def update_campaign_status(self, campaign_id, new_status):
|
|
"""
|
|
Update Content Scaling Status field
|
|
|
|
Args:
|
|
campaign_id: Campaign folder ID
|
|
new_status: New status value (A2, A3, etc.)
|
|
|
|
Returns:
|
|
dict with success boolean
|
|
"""
|
|
try:
|
|
token = self.get_access_token()
|
|
|
|
payload = {
|
|
"edited_folder": {
|
|
"data": {
|
|
"metadata": [
|
|
{
|
|
"id": "CONTENT.SCALING.STATUS",
|
|
"value": {
|
|
"domain_value": True,
|
|
"value": {
|
|
"value": new_status
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
}
|
|
}
|
|
|
|
response = requests.patch(
|
|
"{}/v6/folders/{}?lock_strategy=optimistic".format(
|
|
self.base_url, campaign_id
|
|
),
|
|
json=payload,
|
|
headers={
|
|
'Authorization': 'Bearer {}'.format(token),
|
|
'Content-Type': 'application/json'
|
|
},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
logger.info("Updated campaign {} status to {}".format(campaign_id, new_status))
|
|
return {'success': True}
|
|
else:
|
|
error_msg = "Status update failed: HTTP {}".format(response.status_code)
|
|
logger.error(error_msg)
|
|
return {'success': False, 'error': error_msg}
|
|
|
|
except Exception as e:
|
|
logger.error("Status update exception: {}".format(str(e)))
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def test_connection(self):
|
|
"""Test DAM connection by getting a token"""
|
|
try:
|
|
token = self.get_access_token()
|
|
return token is not None
|
|
except:
|
|
return False
|
|
|
|
def _get_mime_type(self, file_path):
|
|
"""Get MIME type for file"""
|
|
import mimetypes
|
|
mime_type, _ = mimetypes.guess_type(file_path)
|
|
return mime_type or 'application/octet-stream'
|