MAJOR MILESTONE: Complete Python automation system created! Components Implemented: ✅ Box Client (box_client.py) - JWT authentication via boxsdk - Upload with tracking ID suffix - Download files - Campaign folder creation - Connection testing ✅ Database Client (database.py) - PostgreSQL connection pooling - generate_unique_tracking_id() - store_master_asset() with full_metadata JSONB - get_master_asset(tracking_id) - check_campaign_upload_complete() - ALL-DONE CHECK! - store_derivative_asset() - Connection testing ✅ Filename Parser (filename_parser.py) - V2 naming convention parser (ported from PHP) - parse_filename() - 10 components - strip_upload_components() - Remove Job# and Tracking ID - Strict validation with detailed errors ✅ Metadata Extractor MVP (metadata_extractor_mvp.py) - Extract 28 MVP fields from master - Update fields from V2 filename (Description, Language, State) - Add missing fields with defaults - Build asset representation for upload ✅ Notifier (notifier.py) - Mailgun email integration - Outgoing webhook sender - Email templates (success, error, partial, critical) - Configurable recipients Main Scripts: ✅ A1→A2 Download (a1_to_a2_download.py) - Poll DAM every 5 minutes for A1 campaigns - Download all master assets - Upload to Box with tracking IDs - Store in DB with full metadata - ALL-DONE CHECK before status update - Update A1→A2 only if all assets successful - Send webhook with campaign ID/number - Email notifications ✅ A2→A3 Upload (a2_to_a3_upload.py) - Flask webhook receiver for Box uploads - Signature validation - Async task queue processing - Parse V2 filenames - Load master metadata - Extract MVP fields - Upload to DAM - ALL-DONE CHECK for campaign - Update A2→A3 when all assets uploaded - Send webhook notifications ✅ Test Connection Script (test_connection.py) - Verify DAM, Box, Database connectivity - Quick health check ✅ README.md - Complete setup guide - Usage instructions - Configuration examples - Troubleshooting Key Features: - Python 3.6+ compatible (server requirement) - Virtual environment isolated - Configuration-driven (YAML files) - Easy field updates (no code changes) - Environment switching (staging/production) - Comprehensive error handling - Email + webhook notifications - Retry logic - All-done checks before status updates - Campaign webhook notifications Ready for testing locally with Python 3.10! 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
221 lines
8.8 KiB
Python
221 lines
8.8 KiB
Python
"""
|
|
Metadata Extractor MVP - Extract MVP fields from master metadata
|
|
Ported from PHP MetadataExtractorMVP.php
|
|
Compatible with Python 3.6+
|
|
"""
|
|
|
|
import logging
|
|
|
|
logger = logging.getLogger('MetadataExtractorMVP')
|
|
|
|
class MetadataExtractorMVP:
|
|
def __init__(self, field_mappings):
|
|
"""
|
|
Initialize with field mappings from config
|
|
|
|
Args:
|
|
field_mappings: dict from field_mappings.yaml
|
|
"""
|
|
self.mvp_field_ids = field_mappings['mvp_fields']
|
|
self.filename_updates = field_mappings.get('filename_updates', {})
|
|
self.forced_values = field_mappings.get('forced_values', {})
|
|
self.defaults = field_mappings.get('defaults', {})
|
|
|
|
def extract_mvp_fields(self, master_metadata):
|
|
"""
|
|
Extract only MVP fields from full master metadata
|
|
|
|
Args:
|
|
master_metadata: Complete DAM asset metadata
|
|
|
|
Returns:
|
|
List of MVP field objects
|
|
"""
|
|
extracted_fields = []
|
|
found_field_ids = []
|
|
|
|
# Navigate to metadata structure
|
|
# master_metadata is the full asset, need to go to: metadata.metadata_element_list
|
|
metadata_list = []
|
|
|
|
if isinstance(master_metadata, dict):
|
|
if 'metadata' in master_metadata and 'metadata_element_list' in master_metadata['metadata']:
|
|
metadata_list = master_metadata['metadata']['metadata_element_list']
|
|
logger.info("Using master_metadata['metadata']['metadata_element_list']")
|
|
|
|
logger.info("Searching through {} categories for MVP fields".format(len(metadata_list)))
|
|
|
|
# Search through categories for MVP fields
|
|
for item in metadata_list:
|
|
if 'metadata_element_list' in item:
|
|
# Category with nested fields
|
|
for field in item['metadata_element_list']:
|
|
field_id = field.get('id')
|
|
if field_id in self.mvp_field_ids:
|
|
extracted_fields.append(field)
|
|
found_field_ids.append(field_id)
|
|
logger.debug("Found MVP field: {}".format(field_id))
|
|
elif 'id' in item and item['id'] in self.mvp_field_ids:
|
|
# Direct field
|
|
extracted_fields.append(item)
|
|
found_field_ids.append(item['id'])
|
|
logger.debug("Found direct MVP field: {}".format(item['id']))
|
|
|
|
# Log results
|
|
missing = [f for f in self.mvp_field_ids if f not in found_field_ids]
|
|
logger.info("Found {}/{} MVP fields".format(len(found_field_ids), len(self.mvp_field_ids)))
|
|
|
|
if missing:
|
|
logger.info("Missing fields: {}".format(', '.join(missing[:5])))
|
|
|
|
return extracted_fields
|
|
|
|
def build_mvp_asset_representation(self, master_metadata, clean_filename, parsed_filename):
|
|
"""
|
|
Build asset representation with MVP fields + updates from filename
|
|
|
|
Args:
|
|
master_metadata: Full master asset metadata
|
|
clean_filename: Clean filename (stripped)
|
|
parsed_filename: Parsed V2 filename dict
|
|
|
|
Returns:
|
|
Asset representation dict ready for upload
|
|
"""
|
|
# Extract MVP fields from master
|
|
mvp_fields = self.extract_mvp_fields(master_metadata)
|
|
|
|
# Update fields from filename and forced values
|
|
mvp_fields = self._update_fields(mvp_fields, clean_filename, parsed_filename)
|
|
|
|
# Add missing MVP fields with defaults
|
|
mvp_fields = self._add_missing_fields(mvp_fields, parsed_filename)
|
|
|
|
# Build asset representation
|
|
asset_rep = {
|
|
'asset_resource': {
|
|
'asset': {
|
|
'metadata': {
|
|
'metadata_element_list': mvp_fields
|
|
},
|
|
'metadata_model_id': 'ECOMMERCE',
|
|
'security_policy_list': [
|
|
{'id': 1594}
|
|
]
|
|
}
|
|
}
|
|
}
|
|
|
|
logger.info("Built MVP asset representation with {} fields".format(len(mvp_fields)))
|
|
|
|
return asset_rep
|
|
|
|
def _update_fields(self, mvp_fields, clean_filename, parsed_filename):
|
|
"""Update specific fields from filename and forced values"""
|
|
|
|
# Update ASSET NAME
|
|
for field in mvp_fields:
|
|
if field.get('id') == 'ARTESIA.FIELD.ASSET NAME':
|
|
self._set_field_value(field, clean_filename)
|
|
logger.info("Updated ASSET NAME: {}".format(clean_filename))
|
|
|
|
# Update DESCRIPTION from subject_title
|
|
if parsed_filename and parsed_filename.get('subject_title'):
|
|
for field in mvp_fields:
|
|
if field.get('id') == 'ARTESIA.FIELD.ASSET DESCRIPTION':
|
|
self._set_field_value(field, parsed_filename['subject_title'])
|
|
logger.info("Updated DESCRIPTION: {}".format(parsed_filename['subject_title']))
|
|
|
|
# Force STATE to Local
|
|
for field in mvp_fields:
|
|
if field.get('id') == 'FERRERO.FIELD.STATE':
|
|
self._set_field_value(field, 'Local')
|
|
logger.info("Set STATE to Local")
|
|
|
|
return mvp_fields
|
|
|
|
def _add_missing_fields(self, mvp_fields, parsed_filename):
|
|
"""Add missing MVP fields from filename or defaults"""
|
|
field_ids = [f.get('id') for f in mvp_fields]
|
|
|
|
# Add MAIN_LANGUAGES if missing
|
|
if 'MAIN_LANGUAGES' not in field_ids and parsed_filename:
|
|
if parsed_filename.get('language_code'):
|
|
language = parsed_filename['language_code'].upper()
|
|
logger.info("Adding MAIN_LANGUAGES: {}".format(language))
|
|
|
|
mvp_fields.append({
|
|
'id': 'MAIN_LANGUAGES',
|
|
'parent_table_id': 'FERRERO.TABULAR.FIELD.MAIN LANGUAGES',
|
|
'type': 'com.artesia.metadata.MetadataTableField',
|
|
'values': [
|
|
{
|
|
'cascading_domain_value': False,
|
|
'domain_value': True,
|
|
'value': {
|
|
'field_value': {
|
|
'type': 'string',
|
|
'value': language
|
|
},
|
|
'type': 'com.artesia.metadata.DomainValue'
|
|
}
|
|
}
|
|
]
|
|
})
|
|
|
|
# Add other missing fields with defaults
|
|
field_ids = [f.get('id') for f in mvp_fields]
|
|
|
|
for field_id, default_value in self.defaults.items():
|
|
if field_id not in field_ids:
|
|
logger.info("Adding {} with default: {}".format(field_id, default_value))
|
|
|
|
# Check if it's a tabular field (contains .TABULAR. in parent table ID)
|
|
is_tabular = 'TABULAR' in field_id or field_id in [
|
|
'FERRERO.FIELD.ASSETCOMPLIANCE', 'MARKETING_TAG'
|
|
]
|
|
|
|
if is_tabular:
|
|
mvp_fields.append({
|
|
'id': field_id,
|
|
'parent_table_id': 'FERRERO.TABULAR.FIELD.' + field_id.split('.')[-1],
|
|
'type': 'com.artesia.metadata.MetadataTableField',
|
|
'values': [
|
|
{
|
|
'cascading_domain_value': False,
|
|
'domain_value': True,
|
|
'value': {
|
|
'field_value': {
|
|
'type': 'string',
|
|
'value': default_value
|
|
},
|
|
'type': 'com.artesia.metadata.DomainValue'
|
|
}
|
|
}
|
|
]
|
|
})
|
|
else:
|
|
mvp_fields.append({
|
|
'id': field_id,
|
|
'type': 'com.artesia.metadata.MetadataField',
|
|
'value': {
|
|
'cascading_domain_value': False,
|
|
'domain_value': True,
|
|
'value': {
|
|
'type': 'string',
|
|
'value': default_value
|
|
}
|
|
}
|
|
})
|
|
|
|
return mvp_fields
|
|
|
|
def _set_field_value(self, field, value):
|
|
"""Set field value handling different structures"""
|
|
if 'value' in field:
|
|
if isinstance(field['value'], dict):
|
|
if 'value' in field['value'] and isinstance(field['value']['value'], dict):
|
|
if 'value' in field['value']['value']:
|
|
field['value']['value']['value'] = value
|
|
elif 'field_value' in field['value']['value']:
|
|
field['value']['value']['field_value']['value'] = value
|