ferrero-opentext/Python-Version/scripts/shared/dam_client.py
DJP 055fc9ad16 Add recursive folder search, NOT APPROVED filtering, and rejection details for A5→A6
Major enhancements to all workflow scripts with recursive search and detailed rejection tracking.

NEW FEATURES:
1. Recursive Folder Search (ALL workflows: A1→A2, A5→A6, B1→B2)
   - Searches subfolders within Master/Final Assets folders
   - Preserves folder structure in Box
   - Adds 'folder_path' attribute to each asset

2. NOT APPROVED Filtering (A5→A6 ONLY)
   - Only downloads assets with ECOMMERCE STATUS = "NOT APPROVED"
   - Skips approved/other status assets
   - Logs rejected vs skipped counts

3. Rejection Details Extraction (A5→A6)
   - Extracts comments from 3 reviewers: Approver, Legal, IA&CC
   - Includes certifier names and dates
   - Displays in detailed email notifications
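
The recursive search in feature 1 can be sketched against an in-memory tree rather than the DAM API. This is a minimal illustration only: TREE and walk() are hypothetical stand-ins, while the real _get_assets_recursive() fetches each folder's children over HTTP.

```python
# Hypothetical in-memory stand-in for the DAM folder hierarchy (not real data).
TREE = {
    "root": [
        {"asset_id": "a1", "name": "hero.mp4", "asset_type": "asset"},
        {"asset_id": "f1", "name": "Social", "asset_type": "folder"},
    ],
    "f1": [
        {"asset_id": "a2", "name": "story.mp4", "asset_type": "asset"},
        {"asset_id": "f2", "name": "Cutdowns", "asset_type": "folder"},
    ],
    "f2": [
        {"asset_id": "a3", "name": "6s.mp4", "asset_type": "asset"},
    ],
}


def walk(folder_id, folder_path=""):
    """Return every non-folder child, tagged with its relative folder_path."""
    assets = []
    for child in TREE.get(folder_id, []):
        if child["asset_type"] == "folder":
            # Build the path the same way the commit describes: parent/child
            sub_path = folder_path + "/" + child["name"] if folder_path else child["name"]
            assets.extend(walk(child["asset_id"], sub_path))
        else:
            tagged = dict(child)  # copy so the sample tree is not mutated
            tagged["folder_path"] = folder_path
            assets.append(tagged)
    return assets


found = walk("root")
paths = {a["name"]: a["folder_path"] for a in found}
```

Assets directly inside the starting folder get an empty folder_path, so the Box upload can treat "" as "no subfolder".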

CHANGES BY FILE:

dam_client.py:
- NEW: _get_assets_recursive() - Recursively searches folders
- UPDATED: get_master_assets() - Now uses recursive search, adds folder_path to assets
- NEW: is_asset_not_approved() - Checks FERRERO.FIELD.ECOMMERCE STATUS
- NEW: extract_rejection_details() - Extracts all rejection comments from 10 fields

box_client.py:
- UPDATED: upload_with_tracking_id() - Added subfolder_path parameter
- NEW: _get_or_create_subfolder_path() - Creates/navigates Box subfolders
- Preserves DAM folder structure in Box uploads
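
box_client.py is not shown in this file, so the following is only a sketch of the get-or-create idea behind _get_or_create_subfolder_path(), assuming folder_path is a "/"-separated string. The folders dictionary and the generated IDs are hypothetical and stand in for Box API calls.

```python
def get_or_create_path(folders, root_id, subfolder_path):
    """Walk subfolder_path one segment at a time, creating missing folders.

    folders maps (parent_id, name) -> folder_id and simulates remote state.
    Returns the ID of the deepest folder (root_id for an empty path).
    """
    current = root_id
    for name in [part for part in subfolder_path.split("/") if part]:
        key = (current, name)
        if key not in folders:
            # A real client would call the storage API here; we fake an ID.
            folders[key] = "id-{}".format(len(folders) + 1)
        current = folders[key]
    return current


folders = {}
leaf = get_or_create_path(folders, "0", "Social/Cutdowns")
again = get_or_create_path(folders, "0", "Social/Cutdowns")
top = get_or_create_path(folders, "0", "")
```

Calling the helper twice with the same path reuses the folders created the first time, which is what keeps repeated uploads from duplicating the structure.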

a1_to_a2_download.py:
- Added folder_path extraction from assets
- Pass subfolder_path to Box upload
- Logs subfolder info during processing

b1_to_b2_download.py:
- Added folder_path extraction from assets
- Pass subfolder_path to Box upload
- Logs subfolder info during processing

a5_to_a6_download.py:
- Filter assets for NOT APPROVED status ONLY
- Extract rejection details for each asset
- Pass subfolder_path to Box upload
- Updated email data with rejection_details
- Handle "no rejections" scenario with email
- Updated logging to show rejected vs skipped counts
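
A minimal sketch of the NOT APPROVED filter and the rejected vs skipped counts, assuming the nested value shape that dam_client.py reads. make_asset(), ecommerce_status() and split_rejected() are hypothetical helpers for illustration, not functions from a5_to_a6_download.py.

```python
STATUS_FIELD = "FERRERO.FIELD.ECOMMERCE STATUS"


def make_asset(name, status):
    """Build a sample asset with the status nested as value -> value -> value."""
    return {
        "name": name,
        "metadata": {"metadata_element_list": [{"metadata_element_list": [
            {"id": STATUS_FIELD, "value": {"value": {"value": status}}},
        ]}]},
    }


def ecommerce_status(asset):
    """Return the ECOMMERCE STATUS value, or None when the field is absent."""
    for category in asset.get("metadata", {}).get("metadata_element_list", []):
        for element in category.get("metadata_element_list", []):
            if element.get("id") == STATUS_FIELD:
                return element["value"]["value"]["value"]
    return None


def split_rejected(assets):
    """Partition assets into (rejected, skipped) with a case-insensitive check."""
    rejected, skipped = [], []
    for asset in assets:
        status = ecommerce_status(asset) or ""
        if status.strip().upper() == "NOT APPROVED":
            rejected.append(asset)
        else:
            skipped.append(asset)
    return rejected, skipped


sample = [
    make_asset("a.jpg", "NOT APPROVED"),
    make_asset("b.jpg", "Approved"),
    make_asset("c.jpg", " not approved "),
    make_asset("d.jpg", None),
]
rejected, skipped = split_rejected(sample)
print("rejected={} skipped={}".format(len(rejected), len(skipped)))
```

An empty rejected list corresponds to the "no rejections" scenario, where the script sends the no-rejections email instead of downloading anything.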

notifier.py:
- REPLACED: a5_to_a6_complete → a5_to_a6_rejections
- Detailed HTML template with rejection sections
- Shows Approver, Legal, and IA&CC rejections
- Styled with red warnings and bordered sections
- NEW: a5_to_a6_no_rejections template
- Green success message when no rejected assets found
- UPDATED: a5_to_a6_partial - Now uses rejected_assets

FIELD IDs EXTRACTED (A5→A6):
- FERRERO.FIELD.ECOMMERCE STATUS (primary check)
- FERRERO.MARKETING.FIELD.CERTIFIER COMMENT
- FERRERO.FIELD.ECOMMERCE CERTIFIER
- FERRERO.MARKETING.FIELD.APPROVAL DATE
- FERRERO.MARKETING.FIELD.LEGAL COMMENT
- FERRERO.FIELD.LEGAL CERTIFER (typo in field ID)
- FERRERO.MARKETING.FIELD.LEGAL APPROVAL DATE
- FERRERO.MARKETING.FIELD.IA CC COMMENT
- FERRERO.MARKETING.FIELD.IA CERTIFIER
- FERRERO.MARKETING.FIELD.IA CC APPROVAL DATE
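
The nine detail fields above can also be read as a lookup table from field ID to reviewer and attribute. The sketch below is an alternative, table-driven formulation and not the if/elif chain used in dam_client.py; collect_details() and the sample values are hypothetical.

```python
# Field ID -> (reviewer key, attribute). IDs are copied from the list above,
# including the misspelled "CERTIFER", which must match the DAM field ID.
REJECTION_FIELDS = {
    "FERRERO.MARKETING.FIELD.CERTIFIER COMMENT": ("approver", "comment"),
    "FERRERO.FIELD.ECOMMERCE CERTIFIER": ("approver", "certifier_name"),
    "FERRERO.MARKETING.FIELD.APPROVAL DATE": ("approver", "date"),
    "FERRERO.MARKETING.FIELD.LEGAL COMMENT": ("legal", "comment"),
    "FERRERO.FIELD.LEGAL CERTIFER": ("legal", "certifier_name"),
    "FERRERO.MARKETING.FIELD.LEGAL APPROVAL DATE": ("legal", "date"),
    "FERRERO.MARKETING.FIELD.IA CC COMMENT": ("ia_cc", "comment"),
    "FERRERO.MARKETING.FIELD.IA CERTIFIER": ("ia_cc", "certifier_name"),
    "FERRERO.MARKETING.FIELD.IA CC APPROVAL DATE": ("ia_cc", "date"),
}


def collect_details(flat_fields):
    """Group already-extracted {field_id: value} pairs by reviewer."""
    details = {
        reviewer: {"comment": None, "certifier_name": None, "date": None}
        for reviewer in ("approver", "legal", "ia_cc")
    }
    for field_id, value in flat_fields.items():
        target = REJECTION_FIELDS.get(field_id)
        if target is not None:
            reviewer, attribute = target
            details[reviewer][attribute] = value
    return details


# Made-up sample values, for illustration only.
details = collect_details({
    "FERRERO.MARKETING.FIELD.LEGAL COMMENT": "Claim needs a disclaimer",
    "FERRERO.FIELD.LEGAL CERTIFER": "Sample Reviewer",
    "FERRERO.FIELD.ECOMMERCE STATUS": "NOT APPROVED",  # not a detail field, ignored
})
```

Reviewers who left no comment keep None in every attribute, so the email template can skip their section.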

TESTING:
✓ All connections working (DAM, Box, Database)
✓ A5→A6 script executes correctly
✓ Recursive search working
✓ NOT APPROVED filtering working
✓ "No rejections" email sent successfully
✓ Folder structure preserved in logs

WORKFLOW IMPACTS:
- A1→A2: Now searches recursively, preserves folder structure
- A5→A6: Filters for NOT APPROVED only, shows rejection details
- B1→B2: Now searches recursively, preserves folder structure

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-04 14:49:19 -05:00


"""
DAM Client - OpenText DAM API Integration
Handles OAuth2 authentication and all DAM operations
Compatible with Python 3.6+
"""
import requests
import time
import logging
import os
import urllib3
# Disable SSL warnings when verify=False
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger('DAMClient')
class DAMClient:
    def __init__(self, config):
        self.base_url = config['dam']['base_url'].rstrip('/')
        self.auth_url = config['dam']['auth_url']
        self.client_id = config['dam']['client_id']
        self.client_secret = config['dam']['client_secret']
        self.timeout = config['dam']['timeout_seconds']
        self.access_token = None
        self.token_expiry = 0
    def get_access_token(self):
        """Get OAuth2 access token with auto-refresh"""
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token
        return self._request_new_token()
    def _request_new_token(self):
        """Request new OAuth2 token"""
        try:
            # Debug logging
            logger.debug("Requesting OAuth token from: {}".format(self.auth_url))
            logger.debug("Client ID: {}".format(self.client_id))
            logger.debug("Client Secret length: {}".format(len(self.client_secret)))
            response = requests.post(
                self.auth_url,
                data={
                    'grant_type': 'client_credentials',
                    'client_id': self.client_id,
                    'client_secret': self.client_secret
                },
                headers={
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Accept': 'application/json'
                },
                verify=False,  # Disable SSL verification like PHP version
                timeout=30
            )
            logger.debug("OAuth response: HTTP {}".format(response.status_code))
            logger.debug("Response body: {}".format(response.text[:200]))
            if response.status_code == 200:
                data = response.json()
                self.access_token = data['access_token']
                # Set expiry with 60 second buffer
                expires_in = data.get('expires_in', 3600)
                self.token_expiry = time.time() + expires_in - 60
                logger.info("OAuth token obtained, expires in {}s".format(expires_in))
                return self.access_token
            else:
                raise Exception("OAuth failed: HTTP {} - {}".format(
                    response.status_code, response.text
                ))
        except Exception as e:
            logger.error("Failed to get OAuth token: {}".format(str(e)))
            raise
    def search_campaigns(self, status=None, campaign_type='Local Adaptation'):
        """
        Search for campaigns with optional status filter

        Args:
            status: Content Scaling Status (A1, A2, B1, B2, etc.) or None for all
            campaign_type: 'Local Adaptation' for A-series or 'Global comm' for B-series
        Returns:
            List of campaign dictionaries
        """
        try:
            import json as json_module
            import urllib.parse
            token = self.get_access_token()
            # Build search condition (like Postman collection)
            search_condition = {
                "search_condition_list": {
                    "search_condition": [
                        {
                            "type": "com.artesia.search.SearchScalarCondition",
                            "metadata_field_id": "ARTESIA.FIELD.CONTAINER TYPE NAME",
                            "relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS",
                            "value": "GLOBALCAMPAING",
                            "left_paren": "(",
                            "right_paren": ")"
                        },
                        {
                            "type": "com.artesia.search.SearchScalarCondition",
                            "metadata_field_id": "FERRERO.FIELD.CAMPAIGN TYPE",
                            "relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS",
                            "value": campaign_type,  # 'Local Adaptation' or 'Global comm'
                            "relational_operator": "and"
                        }
                    ]
                }
            }
            # URL encode search condition
            search_condition_str = json_module.dumps(search_condition)
            search_condition_encoded = urllib.parse.quote(search_condition_str)
            # Use GET with query parameters (matching Postman)
            url = "{}/v6/search/text?load_type=metadata&search_config_id=18&search_condition_list={}".format(
                self.base_url,
                search_condition_encoded
            )
            response = requests.get(
                url,
                headers={
                    'Authorization': 'Bearer {}'.format(token),
                    'Accept': 'application/json'
                },
                verify=False,
                timeout=self.timeout
            )
            if response.status_code != 200:
                raise Exception("Search failed: HTTP {} - {}".format(
                    response.status_code, response.text[:200]
                ))
            data = response.json()
            all_campaigns_raw = []
            # Extract asset list from response
            if 'search_result_resource' in data:
                # Try asset_list first (like PHP version)
                all_campaigns_raw = data['search_result_resource'].get('asset_list', [])
                # Fallback to search_result_element_list
                if not all_campaigns_raw:
                    results = data['search_result_resource'].get('search_result', {}).get('search_result_element_list', [])
                    all_campaigns_raw = [r.get('resource', {}) for r in results]
            logger.info("Found {} total campaigns in search".format(len(all_campaigns_raw)))
            # Extract campaign info and filter by status
            all_campaigns = []
            for asset in all_campaigns_raw:
                campaign = self._extract_campaign_info(asset)
                # Debug log the status
                logger.debug("Campaign: {} - Status: {}".format(
                    campaign.get('campaign_name', 'Unknown'),
                    campaign.get('status', 'NO STATUS')
                ))
                # Filter by status if provided
                if status is None or campaign.get('status') == status:
                    all_campaigns.append(campaign)
            logger.info("Found {} campaigns{}".format(
                len(all_campaigns),
                " with status {}".format(status) if status else ""
            ))
            return all_campaigns
        except Exception as e:
            logger.error("Failed to search campaigns: {}".format(str(e)))
            raise
    def _extract_campaign_info(self, asset):
        """Extract campaign information from asset data"""
        campaign = {
            'asset_id': asset.get('asset_id'),
            'campaign_name': asset.get('name'),  # Try name field first
            'campaign_id': None,
            'status': None,
            'brand': None,
            'market': None
        }
        # Extract metadata fields
        if 'metadata' in asset and 'metadata_element_list' in asset['metadata']:
            for category in asset['metadata']['metadata_element_list']:
                if 'metadata_element_list' in category:
                    for field in category['metadata_element_list']:
                        field_id = field.get('id')
                        value = self._extract_field_value(field)
                        if field_id == 'ARTESIA.FIELD.NAME' or field_id == 'INER_NAME_GENERIC':
                            campaign['campaign_name'] = value
                        elif field_id == 'FERRERO.FIELD.CAMPAIGN ID' or field_id == 'FERRERO.FIELD.CAMPAIGN_ID':
                            campaign['campaign_id'] = value
                        elif field_id == 'CONTENT.SCALING.STATUS':
                            campaign['status'] = value
                        elif field_id == 'FERRERO.FIELD.CAMPAIGN_BRAND':
                            campaign['brand'] = value
                        elif field_id == 'FERRERO.FIELD.CAMPAIGN_MARKET':
                            campaign['market'] = value
        return campaign
    def _extract_field_value(self, field):
        """Extract value from field structure"""
        if 'value' in field:
            val = field['value']
            if isinstance(val, dict):
                if 'value' in val and isinstance(val['value'], dict):
                    if 'value' in val['value']:
                        return val['value']['value']
                    elif 'field_value' in val['value'] and 'value' in val['value']['field_value']:
                        return val['value']['field_value']['value']
        return None
    def get_master_assets(self, campaign_id, is_global=False):
        """
        Get master assets from Master Assets folder (A1) or Final Assets folder (B1)
        Searches recursively through subfolders

        Args:
            campaign_id: Campaign folder asset ID
            is_global: True for B1 Global campaigns (use Final Assets), False for A1 Local (use Master Assets)
        Returns:
            List of asset dictionaries with full metadata, including 'folder_path' for subfolder structure
        """
        try:
            # B1 Global campaigns use "05. Final Assets" folder
            # A1 Local campaigns use "01. Master Assets" folder
            if is_global:
                master_folder_id = self.find_final_assets_folder(campaign_id)
                logger.info("Looking for Final Assets folder (B1 Global campaign)")
            else:
                master_folder_id = self._find_master_assets_folder(campaign_id)
                logger.info("Looking for Master Assets folder (A1 Local campaign)")
            if not master_folder_id:
                raise Exception("Assets folder not found (tried {})".format(
                    "Final Assets" if is_global else "Master Assets"
                ))
            # Recursively get all assets from folder and subfolders
            all_assets = self._get_assets_recursive(master_folder_id, folder_path="")
            logger.info("Found {} master assets (recursive) in campaign {}".format(len(all_assets), campaign_id))
            return all_assets
        except Exception as e:
            logger.error("Failed to get master assets: {}".format(str(e)))
            raise
    def _get_assets_recursive(self, folder_id, folder_path=""):
        """
        Recursively get all assets from a folder and its subfolders

        Args:
            folder_id: Folder asset ID to search
            folder_path: Current path (e.g., "Subfolder1/Subfolder2")
        Returns:
            List of assets with 'folder_path' attribute added
        """
        assets = []
        try:
            token = self.get_access_token()
            response = requests.get(
                "{}/v6/folders/{}/children?load_type=full".format(
                    self.base_url, folder_id
                ),
                headers={
                    'Authorization': 'Bearer {}'.format(token),
                    'Accept': 'application/json'
                },
                verify=False,
                timeout=self.timeout
            )
            if response.status_code != 200:
                logger.warning("Failed to get folder {} children: HTTP {}".format(folder_id, response.status_code))
                return assets
            data = response.json()
            children = data.get('folder_children', {}).get('asset_list', [])
            for child in children:
                # Check if it's a folder or an asset
                asset_type = child.get('asset_type', {})
                type_name = asset_type.get('name', '') if isinstance(asset_type, dict) else str(asset_type)
                if type_name.lower() == 'folder':
                    # It's a folder - recurse into it
                    folder_name = child.get('name', 'Unknown')
                    subfolder_path = folder_path + "/" + folder_name if folder_path else folder_name
                    logger.info("Recursing into subfolder: {}".format(subfolder_path))
                    sub_assets = self._get_assets_recursive(child['asset_id'], subfolder_path)
                    assets.extend(sub_assets)
                else:
                    # It's an asset - add it with folder path
                    child['folder_path'] = folder_path
                    assets.append(child)
            return assets
        except Exception as e:
            logger.error("Error in recursive asset search for folder {}: {}".format(folder_id, str(e)))
            return assets
    def is_asset_not_approved(self, asset_data):
        """
        Check if asset has ECOMMERCE STATUS = "NOT APPROVED"
        Used for A5→A6 rework workflow to filter rejected assets

        Args:
            asset_data: Complete asset JSON with metadata
        Returns:
            bool: True if status is "NOT APPROVED", False otherwise
        """
        try:
            metadata = asset_data.get('metadata', {})
            metadata_elements = metadata.get('metadata_element_list', [])
            for category in metadata_elements:
                for element in category.get('metadata_element_list', []):
                    if element.get('id') == 'FERRERO.FIELD.ECOMMERCE STATUS':
                        status_value = self._extract_field_value(element)
                        if status_value:
                            # Case-insensitive comparison
                            return status_value.strip().upper() == 'NOT APPROVED'
            return False
        except Exception as e:
            logger.error("Error checking NOT APPROVED status: {}".format(str(e)))
            return False
    def extract_rejection_details(self, asset_data):
        """
        Extract all rejection comments from a NOT APPROVED asset
        Extracts details from Approver, Legal, and IA&CC reviewers

        Args:
            asset_data: Complete asset JSON with metadata
        Returns:
            dict with rejection details from all reviewers, or None if not rejected
        """
        if not self.is_asset_not_approved(asset_data):
            return None
        metadata = asset_data.get('metadata', {})
        metadata_elements = metadata.get('metadata_element_list', [])
        # Initialize rejection details structure
        details = {
            'asset_id': asset_data.get('asset_id'),
            'asset_name': asset_data.get('name'),
            'status': 'NOT APPROVED',
            'approver': {
                'comment': None,
                'certifier_name': None,
                'date': None
            },
            'legal': {
                'comment': None,
                'certifier_name': None,
                'date': None
            },
            'ia_cc': {
                'comment': None,
                'certifier_name': None,
                'date': None
            }
        }
        # Extract all rejection fields
        for category in metadata_elements:
            for element in category.get('metadata_element_list', []):
                field_id = element.get('id', '')
                value = self._extract_field_value(element)
                # Approver rejection details
                if field_id == 'FERRERO.MARKETING.FIELD.CERTIFIER COMMENT':
                    details['approver']['comment'] = value
                elif field_id == 'FERRERO.FIELD.ECOMMERCE CERTIFIER':
                    details['approver']['certifier_name'] = value
                elif field_id == 'FERRERO.MARKETING.FIELD.APPROVAL DATE':
                    details['approver']['date'] = value
                # Legal rejection details
                elif field_id == 'FERRERO.MARKETING.FIELD.LEGAL COMMENT':
                    details['legal']['comment'] = value
                elif field_id == 'FERRERO.FIELD.LEGAL CERTIFER':  # Note: typo in field ID
                    details['legal']['certifier_name'] = value
                elif field_id == 'FERRERO.MARKETING.FIELD.LEGAL APPROVAL DATE':
                    details['legal']['date'] = value
                # IA&CC rejection details
                elif field_id == 'FERRERO.MARKETING.FIELD.IA CC COMMENT':
                    details['ia_cc']['comment'] = value
                elif field_id == 'FERRERO.MARKETING.FIELD.IA CERTIFIER':
                    details['ia_cc']['certifier_name'] = value
                elif field_id == 'FERRERO.MARKETING.FIELD.IA CC APPROVAL DATE':
                    details['ia_cc']['date'] = value
        return details
    def _find_master_assets_folder(self, campaign_id):
        """Find '01. Master Assets' folder in campaign"""
        try:
            token = self.get_access_token()
            response = requests.get(
                "{}/v6/folders/{}/children".format(self.base_url, campaign_id),
                headers={'Authorization': 'Bearer {}'.format(token)},
                timeout=self.timeout
            )
            if response.status_code != 200:
                return None
            data = response.json()
            folders = data.get('folder_children', {}).get('asset_list', [])
            for folder in folders:
                name = folder.get('name', '')
                if 'Master' in name and 'Assets' in name:
                    return folder['asset_id']
            return None
        except Exception as e:
            logger.error("Error finding Master Assets folder: {}".format(str(e)))
            return None
    def find_final_assets_folder(self, campaign_id):
        """Find '05. Final Assets' folder in campaign"""
        try:
            token = self.get_access_token()
            response = requests.get(
                "{}/v6/folders/{}/children".format(self.base_url, campaign_id),
                headers={'Authorization': 'Bearer {}'.format(token)},
                timeout=self.timeout
            )
            if response.status_code != 200:
                return None
            data = response.json()
            folders = data.get('folder_children', {}).get('asset_list', [])
            for folder in folders:
                name = folder.get('name', '')
                if 'Final' in name and 'Assets' in name:
                    return folder['asset_id']
            return None
        except Exception as e:
            logger.error("Error finding Final Assets folder: {}".format(str(e)))
            return None
    def download_asset(self, asset_id, output_dir='.'):
        """
        Download asset content

        Args:
            asset_id: Asset ID to download
            output_dir: Directory to save file
        Returns:
            Path to downloaded file
        """
        try:
            # Get asset metadata first to get filename
            token = self.get_access_token()
            metadata_response = requests.get(
                "{}/v6/assets/{}".format(self.base_url, asset_id),
                headers={'Authorization': 'Bearer {}'.format(token)},
                timeout=self.timeout
            )
            if metadata_response.status_code != 200:
                raise Exception("Failed to get asset metadata: HTTP {}".format(
                    metadata_response.status_code
                ))
            asset_data = metadata_response.json()
            asset = asset_data.get('asset_resource', {}).get('asset', asset_data)
            filename = asset.get('name', 'download_{}'.format(asset_id))
            # Download content
            content_response = requests.get(
                "{}/v6/assets/{}/contents".format(self.base_url, asset_id),
                headers={'Authorization': 'Bearer {}'.format(token)},
                timeout=self.timeout,
                stream=True
            )
            if content_response.status_code != 200:
                raise Exception("Download failed: HTTP {}".format(content_response.status_code))
            # Ensure output directory exists
            os.makedirs(output_dir, exist_ok=True)
            # Save file
            filepath = os.path.join(output_dir, filename)
            with open(filepath, 'wb') as f:
                for chunk in content_response.iter_content(chunk_size=8192):
                    f.write(chunk)
            file_size = os.path.getsize(filepath)
            logger.info("Downloaded: {} ({} bytes)".format(filename, file_size))
            return filepath
        except Exception as e:
            logger.error("Failed to download asset {}: {}".format(asset_id, str(e)))
            raise
    def upload_asset(self, file_path, folder_id, asset_representation, video_metadata=None):
        """
        Upload asset to DAM

        Args:
            file_path: Path to file to upload
            folder_id: Parent folder ID
            asset_representation: Asset metadata JSON structure
            video_metadata: Optional video metadata (width, height, etc.)
        Returns:
            dict with success, asset_id, http_code
        """
        try:
            token = self.get_access_token()
            filename = os.path.basename(file_path)
            mime_type = self._get_mime_type(file_path)
            # Build upload manifest
            file_info = {
                'file_name': filename,
                'file_type': mime_type
            }
            # Add video dimensions if available
            if video_metadata:
                if video_metadata.get('width', 0) > 0:
                    file_info['width'] = video_metadata['width']
                if video_metadata.get('height', 0) > 0:
                    file_info['height'] = video_metadata['height']
            upload_manifest = {
                'upload_manifest': {
                    'master_files': [
                        {'file': file_info}
                    ]
                }
            }
            # Prepare multipart upload
            import json
            data = {
                'asset_representation': json.dumps(asset_representation),
                'parent_folder_id': folder_id,
                'manifest': json.dumps(upload_manifest)
            }
            # Open the file in a context manager so the handle is closed
            # even when the request raises (e.g. on a timeout)
            with open(file_path, 'rb') as file_handle:
                response = requests.post(
                    "{}/v6/assets".format(self.base_url),
                    data=data,
                    files={'files': (filename, file_handle, mime_type)},
                    headers={'Authorization': 'Bearer {}'.format(token)},
                    timeout=self.timeout
                )
            http_code = response.status_code
            if http_code in [201, 202]:
                result_data = response.json()
                asset_id = None
                if 'asset_resource_list' in result_data:
                    asset_id = result_data['asset_resource_list']['asset_resource'][0]['asset']['asset_id']
                elif 'job_handle' in result_data:
                    asset_id = result_data['job_handle'].get('job_id')
                logger.info("Upload successful: {} → Asset ID: {}".format(filename, asset_id))
                return {
                    'success': True,
                    'asset_id': asset_id,
                    'http_code': http_code
                }
            else:
                error_msg = "Upload failed: HTTP {}".format(http_code)
                if response.text:
                    try:
                        error_data = response.json()
                        if 'exception_body' in error_data:
                            error_msg = error_data['exception_body'].get('message', error_msg)
                    except (ValueError, AttributeError, TypeError):
                        # Body was not JSON or not in the expected shape; keep the generic message
                        pass
                logger.error(error_msg)
                return {
                    'success': False,
                    'error': error_msg,
                    'http_code': http_code
                }
        except Exception as e:
            logger.error("Upload exception: {}".format(str(e)))
            return {
                'success': False,
                'error': str(e),
                'http_code': 0
            }
    def update_campaign_status(self, campaign_id, new_status):
        """
        Update Content Scaling Status field

        Args:
            campaign_id: Campaign folder ID
            new_status: New status value (A2, A3, etc.)
        Returns:
            dict with success boolean
        """
        try:
            token = self.get_access_token()
            payload = {
                "edited_folder": {
                    "data": {
                        "metadata": [
                            {
                                "id": "CONTENT.SCALING.STATUS",
                                "type": "com.artesia.metadata.MetadataField",
                                "value": {
                                    "cascading_domain_value": False,
                                    "domain_value": True,
                                    "value": {
                                        "type": "string",
                                        "value": new_status
                                    }
                                }
                            }
                        ]
                    }
                }
            }
            response = requests.patch(
                "{}/v6/folders/{}?lock_strategy=optimistic".format(
                    self.base_url, campaign_id
                ),
                json=payload,
                headers={
                    'Authorization': 'Bearer {}'.format(token),
                    'Content-Type': 'application/json',
                    'Accept': 'application/json'
                },
                verify=False,
                timeout=self.timeout
            )
            if response.status_code == 200:
                logger.info("Updated campaign {} status to {}".format(campaign_id, new_status))
                return {'success': True}
            else:
                error_msg = "Status update failed: HTTP {}".format(response.status_code)
                logger.error(error_msg)
                return {'success': False, 'error': error_msg}
        except Exception as e:
            logger.error("Status update exception: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
    def test_connection(self):
        """Test DAM connection by getting a token"""
        try:
            token = self.get_access_token()
            return token is not None
        except Exception:
            # Any failure to obtain a token counts as a failed connection test
            return False
    def _get_mime_type(self, file_path):
        """Get MIME type for file"""
        import mimetypes
        mime_type, _ = mimetypes.guess_type(file_path)
        return mime_type or 'application/octet-stream'