ferrero-opentext/Python-Version/scripts/shared/metadata_extractor_mvp.py
DJP 222a53f466 Fix date field type error for ASSET VALIDITY START/END PERIOD
- Convert dates to milliseconds since epoch (Unix timestamp × 1000)
- Change field type from 'string' to 'long' for DATE fields
- Add _set_date_field_value() helper method for proper date handling
- Fixes 'java.lang.String was specified. Expecting java.util.Date' error
- Applies to A2->A3 uploads
2025-12-19 23:02:55 -05:00

694 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Metadata Extractor MVP - Extract MVP fields from master metadata
Ported from PHP MetadataExtractorMVP.php
Compatible with Python 3.6+
"""
import logging
from datetime import datetime, timedelta
from shared.config_loader import load_country_code_mappings
# Module-level logger shared by every method of MetadataExtractorMVP; records
# are emitted under the fixed logger name 'MetadataExtractorMVP'.
logger = logging.getLogger('MetadataExtractorMVP')
class MetadataExtractorMVP:
    """
    Build the reduced ("MVP") metadata payload for a DAM asset upload.

    The extractor copies the configured MVP fields out of a master asset's
    metadata, overrides selected fields from the parsed filename and from
    configured forced values, adds defaults for fields that are still
    missing, and wraps the result in the ``asset_resource`` structure
    returned by build_mvp_asset_representation().
    """

    def __init__(self, field_mappings):
        """
        Initialize with field mappings from config

        Args:
            field_mappings: dict from field_mappings.yaml; must contain
                'mvp_fields', may contain 'filename_updates',
                'forced_values' and 'defaults'

        Raises:
            KeyError: if 'mvp_fields' is missing from field_mappings
        """
        # "or" turns a key that is present but empty (None) into an empty
        # container so later iteration does not fail.
        self.mvp_field_ids = field_mappings['mvp_fields'] or []
        self.filename_updates = field_mappings.get('filename_updates') or {}
        self.forced_values = field_mappings.get('forced_values') or {}
        self.defaults = field_mappings.get('defaults') or {}

        # Load country code mappings (ISO -> DAM codes)
        self.country_mappings = load_country_code_mappings()
        if self.country_mappings:
            logger.info("Loaded {} country code mappings (ISO->DAM)".format(len(self.country_mappings)))

        # Load asset type mappings (3-letter codes -> DAM codes)
        self.asset_type_mappings = self._load_asset_type_mappings()
        if self.asset_type_mappings:
            logger.info("Loaded {} asset type mappings (3-letter->DAM)".format(len(self.asset_type_mappings)))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def extract_mvp_fields(self, master_metadata):
        """
        Extract only MVP fields from full master metadata

        The returned field dicts are deep copies, so updating them later
        never modifies the caller's master_metadata.

        Args:
            master_metadata: Complete DAM asset metadata; fields are read
                from master_metadata['metadata']['metadata_element_list']

        Returns:
            List of MVP field objects (empty if the structure is absent)
        """
        import copy

        categories = []
        if isinstance(master_metadata, dict):
            metadata = master_metadata.get('metadata')
            if isinstance(metadata, dict) and 'metadata_element_list' in metadata:
                categories = metadata['metadata_element_list'] or []
                logger.info("Using master_metadata['metadata']['metadata_element_list']")
        logger.info("Searching through {} categories for MVP fields".format(len(categories)))

        extracted_fields = []
        found_field_ids = []
        for item in categories:
            if not isinstance(item, dict):
                continue
            if 'metadata_element_list' in item:
                # Category with nested fields
                for field in item['metadata_element_list'] or []:
                    if not isinstance(field, dict):
                        continue
                    field_id = field.get('id')
                    if field_id in self.mvp_field_ids:
                        extracted_fields.append(copy.deepcopy(field))
                        found_field_ids.append(field_id)
                        logger.debug("Found MVP field: {}".format(field_id))
            elif 'id' in item and item['id'] in self.mvp_field_ids:
                # Direct field
                extracted_fields.append(copy.deepcopy(item))
                found_field_ids.append(item['id'])
                logger.debug("Found direct MVP field: {}".format(item['id']))

        # Log results (only the first five missing IDs are listed)
        found_lookup = set(found_field_ids)
        missing = [f for f in self.mvp_field_ids if f not in found_lookup]
        logger.info("Found {}/{} MVP fields".format(len(found_field_ids), len(self.mvp_field_ids)))
        if missing:
            logger.info("Missing fields: {}".format(', '.join(str(m) for m in missing[:5])))
        return extracted_fields

    def build_mvp_asset_representation(self, master_metadata, clean_filename, parsed_filename, box_metadata=None, tracking_mode='full', master_opentext_id=None):
        """
        Build asset representation with MVP fields + updates from filename

        Args:
            master_metadata: Full master asset metadata
            clean_filename: Clean filename (stripped)
            parsed_filename: Parsed V2 filename dict
            box_metadata: Optional Box metadata
            tracking_mode: 'full' (inherit all metadata) or 'folder_only' (only use folder)
            master_opentext_id: Optional DAM Asset ID of master asset (for derivative tracking)

        Returns:
            Asset representation dict ready for upload

        Raises:
            ValueError: if tracking_mode is neither 'full' nor 'folder_only'
        """
        if tracking_mode == 'full':
            # FULL INHERITANCE MODE - Standard behavior
            logger.info("Full inheritance mode - using master metadata")
            mvp_fields = self.extract_mvp_fields(master_metadata)
            mvp_fields = self._update_fields(mvp_fields, clean_filename, parsed_filename)
        elif tracking_mode == 'folder_only':
            # FOLDER ONLY MODE - New asset, only use upload folder
            logger.info("Folder-only mode (-N suffix) - building metadata from filename only")
            logger.warning("Note: Upload folder comes from master, all other metadata from filename")
            mvp_fields = self._build_fields_from_filename(parsed_filename, clean_filename)
        else:
            # Fail with a clear message instead of an UnboundLocalError below.
            raise ValueError(
                "Unknown tracking_mode: {!r} (expected 'full' or 'folder_only')".format(tracking_mode)
            )

        # Add missing MVP fields with defaults (both modes)
        mvp_fields = self._add_missing_fields(mvp_fields, parsed_filename)

        # Update CreativeX fields from Box metadata if provided
        if box_metadata:
            mvp_fields = self._update_creativex_fields(mvp_fields, box_metadata)

        # Add Master Asset ID field if provided (derivative tracking)
        if master_opentext_id:
            mvp_fields = self._add_master_asset_id_field(mvp_fields, master_opentext_id)
            logger.info("Added Master Asset ID field: {}".format(master_opentext_id))

        asset_rep = {
            'asset_resource': {
                'asset': {
                    'metadata': {
                        'metadata_element_list': mvp_fields
                    },
                    'metadata_model_id': 'ECOMMERCE',
                    'security_policy_list': [
                        {'id': 1594}
                    ]
                }
            }
        }
        logger.info("Built MVP asset representation with {} fields".format(len(mvp_fields)))
        return asset_rep

    # ------------------------------------------------------------------
    # Field updates (full inheritance mode)
    # ------------------------------------------------------------------

    def _update_fields(self, mvp_fields, clean_filename, parsed_filename):
        """
        Update specific fields from filename and forced values

        Runs, in order: filename-driven updates, country code mapping,
        forced values, and the asset validity dates. mvp_fields is updated
        in place and also returned.
        """
        self._apply_filename_updates(mvp_fields, clean_filename, parsed_filename)
        self._apply_country_mapping(mvp_fields)
        self._apply_forced_values(mvp_fields)
        self._apply_validity_dates(mvp_fields)
        return mvp_fields

    def _apply_filename_updates(self, mvp_fields, clean_filename, parsed_filename):
        """Apply the 'filename_updates' configuration to the matching fields."""
        for field_id, config in self.filename_updates.items():
            config = config or {}
            source = config.get('source')
            transform = config.get('transform', '')

            # Get value from appropriate source
            if source == 'clean_filename':
                value = clean_filename
            elif source and parsed_filename:
                value = parsed_filename.get(source)
            else:
                continue
            if not value:
                continue

            # Case transforms only make sense for text values
            if isinstance(value, str):
                if transform == 'uppercase':
                    value = value.upper()
                elif transform == 'lowercase':
                    value = value.lower()

            # NOTE(review): this ID differs from 'FERRERO.FIELD.ASSET TYPE'
            # used by _build_fields_from_filename - confirm which is correct.
            if field_id == 'FERRERO.FIELD.MKTG.ASSET TYPE' and source == 'asset_type':
                value = self._map_asset_type(value)

            field = self._find_field(mvp_fields, field_id)
            if field is None:
                continue
            if self._set_field_value(field, value):
                logger.info("Updated {} from filename: {}".format(field_id, value))
            else:
                logger.warning("Could not update {} from filename (no scalar value structure)".format(field_id))

    def _apply_country_mapping(self, mvp_fields):
        """Translate every FERRERO.FIELD.COUNTRY value from ISO to DAM code."""
        for field in mvp_fields:
            if field.get('id') != 'FERRERO.FIELD.COUNTRY':
                continue
            current_value = self._get_field_value(field)
            if not current_value:
                continue
            mapped_value = self._map_country_code(current_value)
            if mapped_value != current_value:
                self._set_field_value(field, mapped_value)
                logger.info("Mapped country code: {} -> {}".format(current_value, mapped_value))

    def _apply_forced_values(self, mvp_fields):
        """Overwrite fields listed in the 'forced_values' configuration."""
        for field_id, forced_value in self.forced_values.items():
            field = self._find_field(mvp_fields, field_id)
            if field is None:
                continue
            if self._set_field_value(field, forced_value):
                logger.info("Set {} to {}".format(field_id, forced_value))
            else:
                logger.warning("Could not force {} (no scalar value structure)".format(field_id))

    def _apply_validity_dates(self, mvp_fields):
        """
        Set Asset Validity Dates (Start = now, End = now + 365 days)

        Values are written as integer milliseconds since the epoch with
        type 'long'. Failures are logged and otherwise ignored so the rest
        of the payload is still built.
        """
        try:
            today = datetime.now()
            # A fixed 365 days, not a calendar year.
            one_year_later = today + timedelta(days=365)
            date_fields = (
                ('FERRERO.FIELD.ASSET VALIDITY START PERIOD', int(today.timestamp() * 1000)),
                ('FERRERO.FIELD.ASSET VALIDITY END PERIOD', int(one_year_later.timestamp() * 1000)),
            )
            for field_id, value in date_fields:
                field = self._find_field(mvp_fields, field_id)
                if field is not None:
                    self._set_date_field_value(field, value)
                    logger.info("Set {} to {} ms (Upload Date Logic)".format(field_id, value))
                else:
                    # Add new date field with proper structure
                    mvp_fields.append({
                        'id': field_id,
                        'type': 'com.artesia.metadata.MetadataField',
                        'value': {
                            'value': {
                                'type': 'long',
                                'value': value
                            }
                        }
                    })
                    logger.info("Added {} with value {} ms (Upload Date Logic)".format(field_id, value))
        except Exception as e:
            logger.error("Failed to set validity dates: {}".format(str(e)))

    # ------------------------------------------------------------------
    # Defaults
    # ------------------------------------------------------------------

    def _add_missing_fields(self, mvp_fields, parsed_filename):
        """
        Add missing MVP fields from filename or defaults

        Args:
            mvp_fields: List of field dicts; extended in place
            parsed_filename: Parsed filename dict, or None

        Returns:
            The same mvp_fields list
        """
        field_ids = [f.get('id') for f in mvp_fields]

        # Add MAIN_LANGUAGES if missing
        if 'MAIN_LANGUAGES' not in field_ids and parsed_filename:
            if parsed_filename.get('language_code'):
                language = parsed_filename['language_code'].upper()
                logger.info("Adding MAIN_LANGUAGES: {}".format(language))
                mvp_fields.append(self._domain_table_field(
                    'MAIN_LANGUAGES', 'FERRERO.TABULAR.FIELD.MAIN LANGUAGES', language
                ))

        # Add other missing fields with defaults
        field_ids = [f.get('id') for f in mvp_fields]
        for field_id, default_value in self.defaults.items():
            if field_id in field_ids:
                continue
            logger.info("Adding {} with default: {}".format(field_id, default_value))

            # Tabular when the ID mentions TABULAR or is one of the known table fields
            is_tabular = 'TABULAR' in field_id or field_id in [
                'FERRERO.FIELD.ASSETCOMPLIANCE', 'MARKETING_TAG'
            ]
            if is_tabular:
                mvp_fields.append(self._domain_table_field(
                    field_id,
                    'FERRERO.TABULAR.FIELD.' + field_id.split('.')[-1],
                    default_value
                ))
            else:
                mvp_fields.append({
                    'id': field_id,
                    'type': 'com.artesia.metadata.MetadataField',
                    'value': {
                        'cascading_domain_value': False,
                        'domain_value': True,
                        'value': {
                            'type': 'string',
                            'value': default_value
                        }
                    }
                })
        return mvp_fields

    @staticmethod
    def _domain_table_field(field_id, parent_table_id, value):
        """Build a single-row table field holding one domain value."""
        return {
            'id': field_id,
            'parent_table_id': parent_table_id,
            'type': 'com.artesia.metadata.MetadataTableField',
            'values': [
                {
                    'cascading_domain_value': False,
                    'domain_value': True,
                    'value': {
                        'field_value': {
                            'type': 'string',
                            'value': value
                        },
                        'type': 'com.artesia.metadata.DomainValue'
                    }
                }
            ]
        }

    # ------------------------------------------------------------------
    # Code mappings
    # ------------------------------------------------------------------

    def _map_country_code(self, iso_code):
        """
        Map ISO country code to DAM country code

        Args:
            iso_code: ISO 3166-1 Alpha-2 code (e.g., 'BD', 'DE')

        Returns:
            str: DAM country code when a mapping exists, otherwise the
            upper-cased input. Empty or non-string input is returned as-is.
        """
        if not iso_code or not isinstance(iso_code, str):
            return iso_code
        iso_upper = iso_code.upper()

        # The loader may have produced nothing usable; treat that as "no mappings".
        mappings = self.country_mappings if isinstance(self.country_mappings, dict) else {}
        if iso_upper in mappings:
            dam_code = mappings[iso_upper]
            if dam_code != iso_upper:
                logger.info("Country code mapping: {} (ISO) -> {} (DAM)".format(iso_upper, dam_code))
            return dam_code

        # No mapping found, use ISO code as-is
        logger.debug("No mapping for country code: {} (using as-is)".format(iso_upper))
        return iso_upper

    def _load_asset_type_mappings(self):
        """
        Load asset type mappings: 3-letter codes -> DAM codes

        Reads 'config/asset_type_mappings.yaml' relative to the current
        working directory.

        Returns:
            dict: 3-letter code -> DAM code mapping; empty if the file
            cannot be read or does not contain a mapping
        """
        import yaml

        mapping_path = 'config/asset_type_mappings.yaml'
        try:
            with open(mapping_path, 'r', encoding='utf-8') as f:
                mappings = yaml.safe_load(f)
        except Exception as e:
            logger.warning("Could not load asset type mappings: {}".format(str(e)))
            return {}

        if not mappings:
            return {}
        if not isinstance(mappings, dict):
            logger.warning("Could not load asset type mappings: {} does not contain a mapping".format(mapping_path))
            return {}
        return mappings

    def _map_asset_type(self, three_letter_code):
        """
        Map 3-letter asset type code to DAM code

        Args:
            three_letter_code: 3-letter code (e.g., 'EHI', 'IMG', 'TVC')

        Returns:
            DAM code when a mapping exists, otherwise the input unchanged
        """
        if not three_letter_code or not isinstance(three_letter_code, str):
            return three_letter_code
        code_upper = three_letter_code.upper()

        mappings = self.asset_type_mappings if isinstance(self.asset_type_mappings, dict) else {}
        if code_upper in mappings:
            dam_code = mappings[code_upper]
            logger.info("Asset type mapping: {} -> {}".format(code_upper, dam_code))
            return dam_code

        # No mapping - return as-is
        logger.warning("No mapping for asset type: {} - using as-is (may fail DAM validation)".format(code_upper))
        return three_letter_code

    # ------------------------------------------------------------------
    # Folder-only mode
    # ------------------------------------------------------------------

    def _build_fields_from_filename(self, parsed_filename, clean_filename):
        """
        Build ALL metadata fields from parsed filename

        Used in folder-only mode (tracking ID with -N suffix)
        Note: Uses codes directly for now. Can add lookup tables later
        for brand_code->brand_name, country_code->country_name, etc.

        Args:
            parsed_filename: Parsed filename dict; None is treated as empty
            clean_filename: Value for the asset name field

        Returns:
            List of field dicts (asset name and state are always present)
        """
        parsed = parsed_filename or {}

        # (parsed filename key, DAM field ID, optional value converter)
        optional_fields = (
            ('subject_title', 'ARTESIA.FIELD.ASSET DESCRIPTION', None),
            ('brand_code', 'FERRERO.FIELD.BRAND', None),
            ('country_code', 'FERRERO.FIELD.COUNTRY', self._map_country_code),
            ('language_code', 'FERRERO.FIELD.LANGUAGES', None),
            ('asset_type', 'FERRERO.FIELD.ASSET TYPE', None),
        )

        fields = [self._bare_value_field('ARTESIA.FIELD.ASSET NAME', clean_filename)]
        for key, field_id, convert in optional_fields:
            raw = parsed.get(key)
            if not raw:
                continue
            fields.append(self._bare_value_field(field_id, convert(raw) if convert else raw))

        # STATE (force to Local)
        fields.append(self._bare_value_field('FERRERO.FIELD.STATE', 'Local'))

        logger.info("Built {} fields from filename (folder-only mode)".format(len(fields)))
        return fields

    @staticmethod
    def _bare_value_field(field_id, value):
        """Build the minimal scalar field structure used in folder-only mode."""
        return {
            'id': field_id,
            'value': {'value': {'value': value}}
        }

    # ------------------------------------------------------------------
    # Field value access
    # ------------------------------------------------------------------

    @staticmethod
    def _find_field(fields, field_id):
        """Return the first field dict whose 'id' equals field_id, else None."""
        for field in fields:
            if isinstance(field, dict) and field.get('id') == field_id:
                return field
        return None

    @staticmethod
    def _dump_for_log(value):
        """Render a field value for logging; never raises on odd types."""
        import json

        if not value:
            return 'None'
        return json.dumps(value, indent=2, default=str)

    def _get_field_value(self, field):
        """
        Get field value handling different structures

        Reads field['value']['value']['value'], or failing that
        field['value']['value']['field_value']['value'].

        Returns:
            The stored value, or None when neither structure is present
        """
        container = field.get('value') if 'value' in field else None
        if not isinstance(container, dict):
            return None
        inner = container.get('value')
        if not isinstance(inner, dict):
            return None
        if 'value' in inner:
            return inner['value']
        if 'field_value' in inner and isinstance(inner['field_value'], dict):
            return inner['field_value'].get('value')
        return None

    def _set_field_value(self, field, value):
        """
        Set field value handling different structures

        Args:
            field: Field dict to update in place
            value: New value

        Returns:
            bool: True if the value was written; False if the field has no
            dict under 'value' (e.g. a table field), in which case the
            field is left untouched
        """
        field_id = field.get('id', 'UNKNOWN')
        logger.info("_set_field_value called for: {} with value: {}".format(field_id, value))
        logger.info("Current field['value']: {}".format(self._dump_for_log(field.get('value'))))

        container = field.get('value')
        if not isinstance(container, dict):
            logger.warning("Field {} has no scalar value structure - value not set".format(field_id))
            applied = False
        else:
            inner = container.get('value')
            if isinstance(inner, dict):
                # Try nested structure first (most common)
                if 'value' in inner:
                    inner['value'] = value
                    # Ensure type is set for CreativeX URL field
                    if field_id == 'FERRERO.FIELD.CREATIVEX LINK' and 'type' not in inner:
                        inner['type'] = 'string'
                    logger.info("Set via field['value']['value']['value']")
                elif 'field_value' in inner:
                    if isinstance(inner['field_value'], dict):
                        inner['field_value']['value'] = value
                    else:
                        inner['field_value'] = {'type': 'string', 'value': value}
                    logger.info("Set via field['value']['value']['field_value']['value']")
                else:
                    # Nested dict carries no value yet: create it with type
                    container['value'] = {'type': 'string', 'value': value}
                    logger.info("Created field['value']['value'] = {{'type': 'string', 'value': {}}}".format(value))
            else:
                # Value dict is empty or has no nested dict: create it with type
                field['value'] = {'value': {'type': 'string', 'value': value}}
                logger.info("Created field['value'] = {{'value': {{'type': 'string', 'value': {}}}}}".format(value))
            applied = True

        logger.info("After setting, field['value']: {}".format(self._dump_for_log(field.get('value'))))
        return applied

    def _set_date_field_value(self, field, milliseconds):
        """
        Set date field value with proper type for DAM API

        Whatever the field currently holds, it ends up with
        field['value']['value'] carrying type 'long' and the given value.

        Args:
            field: Field dict to update
            milliseconds: Date as milliseconds since epoch (int)
        """
        field_id = field.get('id', 'UNKNOWN')
        logger.info("_set_date_field_value called for: {} with value: {} ms".format(
            field_id, milliseconds
        ))

        container = field.get('value')
        if isinstance(container, dict) and isinstance(container.get('value'), dict):
            # Update existing nested structure, keeping its other keys
            container['value']['type'] = 'long'
            container['value']['value'] = milliseconds
            logger.info("Set via field['value']['value'] with type 'long'")
            return

        created = {
            'value': {
                'type': 'long',
                'value': milliseconds
            }
        }
        if isinstance(container, dict):
            field['value'] = created
            logger.info("Created field['value'] with type 'long'")
        else:
            # 'value' is missing or not a dict
            field['value'] = created
            logger.info("Created field['value'] from scratch with type 'long'")

    # ------------------------------------------------------------------
    # CreativeX and master asset ID
    # ------------------------------------------------------------------

    def _update_creativex_fields(self, mvp_fields, box_metadata):
        """
        Update CreativeX fields from Box metadata template

        Args:
            mvp_fields: List of MVP fields
            box_metadata: dict with 'score', 'url' and optionally
                'platforms' from Box template

        Returns:
            Updated mvp_fields list
        """
        if not box_metadata:
            return mvp_fields

        score_val = box_metadata.get('score')
        # A numeric score of 0 is a real score, not a missing one.
        is_numeric_zero = (
            isinstance(score_val, (int, float))
            and not isinstance(score_val, bool)
            and score_val == 0
        )
        if score_val or is_numeric_zero:
            # New structure: Platform^Score (e.g., "Google Ads^100")
            platforms = box_metadata.get('platforms', [])
            if isinstance(platforms, str):
                # A single platform name must not be split into characters
                platforms = [platforms]
            if not platforms:
                logger.warning("No Platforms mapped for CreativeX score - using 'Unknown'")
                platforms = ["Unknown"]

            # Construct value objects for each platform
            value_objects = []
            for platform in platforms:
                combined_value = "{}^{}".format(platform, score_val)
                value_objects.append({
                    "cascading_domain_value": True,
                    "domain_value": False,
                    "is_locked": False,
                    "value": {
                        "type": "com.artesia.metadata.CascadingDomainValue",
                        "field_value": {
                            "type": "string",
                            "value": combined_value
                        }
                    }
                })
                logger.info("Constructed CreativeX value: {}".format(combined_value))

            score_field = self._find_field(mvp_fields, 'FERRERO.TAB.FIELD.CREATIVEX')
            if score_field is not None:
                # Replace values list with new list of objects
                score_field['values'] = value_objects
                logger.info("Set CREATIVEX field with {} values".format(len(value_objects)))
            else:
                logger.warning("CREATIVEX Score field not found in master metadata - adding it now")
                mvp_fields.append({
                    "type": "com.artesia.metadata.MetadataTableField",
                    "id": "FERRERO.TAB.FIELD.CREATIVEX",
                    "parent_table_id": "FERRERO.TABULAR.FIELD.CREATIVEX",
                    "values": value_objects
                })
                logger.info("Added CREATIVEX Score field with {} values".format(len(value_objects)))

        url = box_metadata.get('url')
        if url:
            logger.info("Updating CreativeX URL from database: {}".format(url))
            url_field = self._find_field(mvp_fields, 'FERRERO.FIELD.CREATIVEX LINK')
            if url_field is not None:
                try:
                    current = url_field.get('value')
                    logger.info("CREATIVEX URL field structure: {}".format(
                        current.keys() if isinstance(current, dict) else 'not a dict'
                    ))
                    if self._set_field_value(url_field, url):
                        logger.info("Set CREATIVEX LINK to: {}".format(url))
                    else:
                        logger.warning("CREATIVEX LINK has no scalar value structure - URL not set")
                except Exception as e:
                    # Best effort: a bad URL field must not abort the upload payload
                    logger.exception("Failed to set CreativeX URL: {}".format(str(e)))
            else:
                logger.warning("CREATIVEX URL field not found in master metadata - adding it now")
                mvp_fields.append({
                    'id': 'FERRERO.FIELD.CREATIVEX LINK',
                    'name': 'CreativeX Hyperlink',
                    'type': 'com.artesia.metadata.MetadataField',
                    'value': {
                        'value': {
                            'type': 'string',
                            'value': url
                        }
                    },
                    'data_type': 'CHAR',
                    'required': False
                })
                logger.info("Added CREATIVEX URL field with value: {}".format(url))
        return mvp_fields

    def _add_master_asset_id_field(self, mvp_fields, master_opentext_id):
        """
        Add ARTESIA.FIELD.ASSET_ID with master asset's DAM ID

        Args:
            mvp_fields: List of MVP fields
            master_opentext_id: DAM Asset ID of the master asset

        Returns:
            Updated mvp_fields list
        """
        # Update the field in place when it is already present
        for field in mvp_fields:
            if self._get_field_id(field) == 'ARTESIA.FIELD.ASSET_ID':
                self._set_field_value(field, master_opentext_id)
                logger.info("Updated existing ARTESIA.FIELD.ASSET_ID: {}".format(master_opentext_id))
                return mvp_fields

        # Field doesn't exist - add new field
        mvp_fields.append({
            'id': 'ARTESIA.FIELD.ASSET_ID',
            'type': 'com.artesia.metadata.MetadataField',
            'value': {
                'value': {
                    'type': 'string',
                    'value': master_opentext_id
                }
            }
        })
        logger.info("Added new ARTESIA.FIELD.ASSET_ID: {}".format(master_opentext_id))
        return mvp_fields

    def _get_field_id(self, field):
        """Extract field ID from field dict ('' when absent or not a dict)"""
        if isinstance(field, dict):
            return field.get('id', '')
        return ''