MAJOR MILESTONE: Complete Python automation system created!

Components Implemented:

✅ Box Client (box_client.py)
- JWT authentication via boxsdk
- Upload with tracking ID suffix
- Download files
- Campaign folder creation
- Connection testing

✅ Database Client (database.py)
- PostgreSQL connection pooling
- generate_unique_tracking_id()
- store_master_asset() with full_metadata JSONB
- get_master_asset(tracking_id)
- check_campaign_upload_complete() - ALL-DONE CHECK!
- store_derivative_asset()
- Connection testing

✅ Filename Parser (filename_parser.py)
- V2 naming convention parser (ported from PHP)
- parse_filename() - 10 components
- strip_upload_components() - Remove Job# and Tracking ID
- Strict validation with detailed errors

✅ Metadata Extractor MVP (metadata_extractor_mvp.py)
- Extract 28 MVP fields from master
- Update fields from V2 filename (Description, Language, State)
- Add missing fields with defaults
- Build asset representation for upload

✅ Notifier (notifier.py)
- Mailgun email integration
- Outgoing webhook sender
- Email templates (success, error, partial, critical)
- Configurable recipients

Main Scripts:

✅ A1→A2 Download (a1_to_a2_download.py)
- Poll DAM every 5 minutes for A1 campaigns
- Download all master assets
- Upload to Box with tracking IDs
- Store in DB with full metadata
- ALL-DONE CHECK before status update
- Update A1→A2 only if all assets successful
- Send webhook with campaign ID/number
- Email notifications

✅ A2→A3 Upload (a2_to_a3_upload.py)
- Flask webhook receiver for Box uploads
- Signature validation
- Async task queue processing
- Parse V2 filenames
- Load master metadata
- Extract MVP fields
- Upload to DAM
- ALL-DONE CHECK for campaign
- Update A2→A3 when all assets uploaded
- Send webhook notifications

✅ Test Connection Script (test_connection.py)
- Verify DAM, Box, Database connectivity
- Quick health check

✅ README.md
- Complete setup guide
- Usage instructions
- Configuration examples
- Troubleshooting

Key Features:
- Python 3.6+ compatible (server requirement)
- Virtual environment isolated
- Configuration-driven (YAML files)
- Easy field updates (no code changes)
- Environment switching (staging/production)
- Comprehensive error handling
- Email + webhook notifications
- Retry logic
- All-done checks before status updates
- Campaign webhook notifications

Ready for testing locally with Python 3.10!

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
227 lines
7.6 KiB
Python
227 lines
7.6 KiB
Python
"""
|
|
Filename Parser - V2 Naming Convention Parser
|
|
Ported from PHP FilenameParser.php
|
|
Compatible with Python 3.6+
|
|
"""
|
|
|
|
import re
|
|
import logging
|
|
|
|
logger = logging.getLogger('FilenameParser')
|
|
|
|
class FilenameParser:
    """
    Parse V2 naming convention filenames:
    [OMG_JOB]_[BRAND]_[COUNTRY]_[LANG]_[TITLE]_[TYPE]_[VERSION]_[SEC]S_[RATIO]_[TRACKING]

    Example: 1234567_RAF_DE_de_TEST-JOB_OLV_001_6S_16x9_TaNu6a.mp4

    The parser keeps no state between calls. Components are separated by
    underscores; the subject title may itself contain underscores, so its end
    is located by searching for the asset type / spot version pair.
    """

    # Compiled once; both patterns are case-insensitive ("6s", "16X9" pass).
    _DURATION_RE = re.compile(r'^(\d+)S$', re.IGNORECASE)
    _ASPECT_RATIO_RE = re.compile(r'^\d+x\d+$', re.IGNORECASE)

    def parse_filename(self, filename):
        """
        Parse V2 filename into components

        Args:
            filename: Filename to parse (with or without extension)

        Returns:
            dict with parsed components and validation results. Keys:
            original_filename, filename_without_ext, extension,
            omg_job_number, brand_code, country_code, language_code,
            subject_title, asset_type, spot_version, has_master, seconds,
            aspect_ratio, tracking_id, validation_errors, warnings, is_valid.
            Components that could not be parsed stay None. 'seconds' is the
            digit string without the trailing 'S'. 'is_valid' is True only
            when 'validation_errors' is empty; warnings do not affect it.
        """
        validation_errors = []
        warnings = []

        # Remove extension: everything after the last dot.
        if '.' in filename:
            filename_without_ext, extension = filename.rsplit('.', 1)
            extension = '.' + extension
        else:
            filename_without_ext = filename
            extension = ''

        # Split by underscore
        parts = filename_without_ext.split('_')

        parsed = {
            'original_filename': filename,
            'filename_without_ext': filename_without_ext,
            'extension': extension,
            'omg_job_number': None,
            'brand_code': None,
            'country_code': None,
            'language_code': None,
            'subject_title': None,
            'asset_type': None,
            'spot_version': None,
            'has_master': False,
            'seconds': None,
            'aspect_ratio': None,
            'tracking_id': None,
            'validation_errors': validation_errors,
            'warnings': warnings,
            'is_valid': False,
        }

        if len(parts) < 9:
            validation_errors.append(
                "Invalid structure: expected min 9 parts, got {}".format(len(parts)))
            return parsed

        # With at least 9 parts, the first four positional components below
        # are guaranteed to exist, so they need no bounds checks.
        index = 0

        # 1. OMG Job Number (digits only, max 10)
        if parts[index].isdigit():
            omg = parts[index]
            if len(omg) > 10:
                validation_errors.append(
                    "OMG Job Number too long: {} (max 10)".format(omg))
            else:
                parsed['omg_job_number'] = omg
            index += 1
        else:
            # The token is not consumed: it is re-read as the brand code, so a
            # name that simply lacks the job number still yields its fields.
            validation_errors.append(
                "OMG Job Number missing or invalid: {}".format(parts[index]))

        # 2. Brand Code (2-5 chars)
        brand = parts[index].upper()
        if 2 <= len(brand) <= 5:
            parsed['brand_code'] = brand
        else:
            validation_errors.append(
                "Brand Code invalid: {} (must be 2-5 chars)".format(brand))
        index += 1

        # 3. Country Code (2 chars)
        country = parts[index].upper()
        if len(country) == 2:
            parsed['country_code'] = country
        else:
            validation_errors.append(
                "Country Code invalid: {} (must be 2 chars)".format(country))
        index += 1

        # 4. Language Code (2-3 chars)
        lang = parts[index].lower()
        if 2 <= len(lang) <= 3:
            parsed['language_code'] = lang
        else:
            validation_errors.append(
                "Language Code invalid: {} (must be 2-3 chars)".format(lang))
        index += 1

        # 5. Subject Title (find asset type to know where title ends)
        # The asset type is the first 3-uppercase-letter part that is directly
        # followed by a 3-char part (the spot version, which includes 'MST').
        subject_parts = []
        asset_type_found = False
        for i in range(index, len(parts)):
            part = parts[i]
            is_type_like = len(part) == 3 and part.isalpha() and part.isupper()
            if is_type_like and i + 1 < len(parts) and len(parts[i + 1]) == 3:
                index = i
                asset_type_found = True
                break
            subject_parts.append(part)

        if not asset_type_found:
            # Without the anchor, the title boundary and every later position
            # are unknown. Stop here instead of guessing: guessing used to put
            # the whole remainder into the title and could report such names
            # as valid.
            validation_errors.append("Asset Type missing or invalid")
            return parsed

        if subject_parts:
            title = '_'.join(subject_parts)
            parsed['subject_title'] = title
            if len(title) > 15:
                warnings.append("Subject title exceeds 15 chars: {}".format(title))

        # 6. Asset Type (3 uppercase letters) - validated by the search above.
        parsed['asset_type'] = parts[index]
        index += 1

        # 7. Spot Version (3 chars or MST) - its presence and length were
        # checked by the search above.
        spot = parts[index].upper()
        if 'MST' in spot:
            parsed['has_master'] = True
        parsed['spot_version'] = spot
        index += 1

        # 8. Duration (format: 6S, 15S, etc.)
        if index < len(parts):
            duration = parts[index]
            match = self._DURATION_RE.match(duration)
            if match:
                parsed['seconds'] = match.group(1)
            else:
                validation_errors.append(
                    "Duration invalid: {} (must be format: 6S)".format(duration))
            index += 1
        else:
            # A long multi-part title can use up the 9-part minimum.
            validation_errors.append("Duration missing")

        # 9. Aspect Ratio (format: 16x9, 4x3, etc.)
        if index < len(parts):
            ratio = parts[index]
            if self._ASPECT_RATIO_RE.match(ratio):
                parsed['aspect_ratio'] = ratio
            else:
                validation_errors.append(
                    "Aspect Ratio invalid: {} (must be format: 16x9)".format(ratio))
            index += 1
        else:
            validation_errors.append("Aspect Ratio missing")

        # 10. Tracking ID (6 alphanumeric chars). Optional: a malformed value
        # is kept and only produces a warning.
        if index < len(parts):
            tracking = parts[index]
            if not (len(tracking) == 6 and tracking.isalnum()):
                warnings.append(
                    "Tracking ID invalid: {} (should be 6 alphanumeric)".format(tracking))
            parsed['tracking_id'] = tracking
            index += 1

        # Anything after the tracking ID is outside the convention; report it
        # rather than dropping it silently.
        if index < len(parts):
            warnings.append(
                "Unexpected trailing components ignored: {}".format(
                    '_'.join(parts[index:])))

        # Set validation status
        parsed['is_valid'] = not validation_errors

        return parsed

    def strip_upload_components(self, filename):
        """
        Strip OMG Job Number and Tracking ID from filename

        The clean name is rebuilt from the parsed components, so casing is
        normalised the same way parse_filename() does it (brand, country and
        spot version upper-cased, language lower-cased, duration suffix 'S').

        Args:
            filename: Original filename

        Returns:
            Clean filename for upload. If the filename does not validate
            against the V2 convention, it is returned unchanged; rebuilding it
            from partial results would drop components (in the worst case
            leaving only the extension).
        """
        parsed = self.parse_filename(filename)

        if not parsed['is_valid']:
            return filename

        # Build clean filename in convention order. Job number and tracking
        # ID are left out on purpose.
        seconds = parsed['seconds']
        components = [
            parsed['brand_code'],
            parsed['country_code'],
            parsed['language_code'],
            parsed['subject_title'],  # None when the name has no title
            parsed['asset_type'],
            parsed['spot_version'],
            seconds + 'S' if seconds else None,
            parsed['aspect_ratio'],
        ]
        clean_filename = '_'.join(c for c in components if c)

        if parsed['extension']:
            clean_filename += parsed['extension']

        return clean_filename
|