ferrero-opentext/Python-Version/scripts/backfill_b1_creativex_scores.py
nickviljoen 8bf8dc1325 Fix: Recursively walk metadata_element_list when extracting CreativeX
Diagnostic confirmed FERRERO.TAB.FIELD.CREATIVEX (score) lives at depth 2
in B1 master metadata — nested under FERRERO.TABULAR.FIELD.CREATIVEX
inside a category — and FERRERO.FIELD.CREATIVEX LINK lives at depth 1.
The flat top-level walk used previously never reached them, so live B1
runs and the backfill both reported zero CX scores. Updated extractor
in b1_to_b2_download.py and the inline copy in
backfill_b1_creativex_scores.py to descend recursively.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 11:53:15 +02:00

203 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""
One-shot backfill: Populate creativex_scores with status='b1-master-cx-score'
for B1→B2 global masters already in master_assets that don't yet have a row.
Identification rule:
tracking_id LIKE 'M%' AND local_campaign_id IS NULL AND status = 'active'
B1→B2 stores masters without local_campaign_id; A1→A2 always sets it, so this
cleanly separates global from local masters that share the M-prefix.
The CX score is read out of master_assets.full_metadata JSONB. Rows where the
DAM metadata has no CreativeX score AND no URL are reported but skipped.
db.store_creativex_score(..., status='b1-master-cx-score') already dedupes by
tracking_id, so re-running is safe.
Usage:
python scripts/backfill_b1_creativex_scores.py # apply
python scripts/backfill_b1_creativex_scores.py --dry-run # preview only
"""
import sys
import os
import argparse
import logging
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from shared.config_loader import load_config
from shared.database import Database
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('B1CXBackfill')
def _walk_metadata_elements(elements):
"""Recursively yield every element in nested metadata_element_list arrays."""
for e in elements or []:
if not isinstance(e, dict):
continue
yield e
nested = e.get('metadata_element_list')
if isinstance(nested, list):
for sub in _walk_metadata_elements(nested):
yield sub
def extract_creativex_from_dam_metadata(asset_metadata):
"""Mirror of the extractor in b1_to_b2_download.py — duplicated here
to keep the backfill script self-contained (avoids triggering
b1_to_b2_download's module-level logging setup on import).
Walks recursively: the score field is at depth 2 (nested inside
FERRERO.TABULAR.FIELD.CREATIVEX, which lives inside a category)."""
try:
top = (asset_metadata or {}).get('metadata', {}).get('metadata_element_list', [])
cx = {'score': None, 'url': None}
for element in _walk_metadata_elements(top):
element_id = element.get('id')
if element_id == 'FERRERO.TAB.FIELD.CREATIVEX':
values = element.get('values', [])
if values:
value_obj = values[0].get('value', {})
if isinstance(value_obj, dict):
field_value = value_obj.get('field_value', {})
if isinstance(field_value, dict):
score = field_value.get('value')
if score:
cx['score'] = str(score)
elif element_id == 'FERRERO.FIELD.CREATIVEX LINK':
value_obj = element.get('value', {})
if isinstance(value_obj, dict):
nested = value_obj.get('value', {})
if isinstance(nested, dict):
url = nested.get('value')
if url:
cx['url'] = url
return cx
except Exception as e:
logger.warning('Failed to extract CreativeX from metadata: %s', e)
return {'score': None, 'url': None}
def fetch_b1_masters(db):
conn = db.get_connection()
try:
cursor = conn.cursor()
cursor.execute("""
SELECT tracking_id, original_filename, file_extension,
full_metadata, description
FROM master_assets
WHERE tracking_id LIKE 'M%'
AND local_campaign_id IS NULL
AND status = 'active'
ORDER BY created_at
""")
rows = cursor.fetchall()
return [
{
'tracking_id': r[0],
'filename': (r[1] or '') + (r[2] or ''),
'full_metadata': r[3] if isinstance(r[3], dict) else (r[3] or {}),
'box_file_id': Database.parse_box_info_from_description(r[4]).get('box_file_id') or '',
}
for r in rows
]
finally:
cursor.close()
db.put_connection(conn)
def existing_cx_tracking_ids(db):
"""Return set of tracking_ids that already have a b1-master-cx-score row."""
conn = db.get_connection()
try:
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT tracking_id
FROM creativex_scores
WHERE status = 'b1-master-cx-score'
AND tracking_id IS NOT NULL
""")
return {row[0] for row in cursor.fetchall()}
finally:
cursor.close()
db.put_connection(conn)
def main():
parser = argparse.ArgumentParser(description='Backfill B1 master CreativeX scores')
parser.add_argument('--dry-run', action='store_true',
help='Report what would be inserted without touching the DB')
args = parser.parse_args()
config = load_config('config/config.yaml')
db = Database(config)
if not db.test_connection():
logger.error('Database connection failed')
sys.exit(1)
masters = fetch_b1_masters(db)
already_have = existing_cx_tracking_ids(db)
logger.info('Scanned %d B1 global masters in master_assets', len(masters))
logger.info('Existing b1-master-cx-score rows: %d', len(already_have))
inserted = 0
skipped_no_cx = 0
skipped_already = 0
for m in masters:
if m['tracking_id'] in already_have:
skipped_already += 1
continue
cx = extract_creativex_from_dam_metadata(m['full_metadata'])
if not (cx['score'] or cx['url']):
skipped_no_cx += 1
logger.debug('No CX in metadata for %s (%s)', m['tracking_id'], m['filename'])
continue
if args.dry_run:
logger.info('[DRY-RUN] Would insert: %s | %s | score=%s url=%s',
m['tracking_id'], m['filename'], cx['score'], cx['url'])
inserted += 1
continue
result = db.store_creativex_score(
filename=m['filename'],
creativex_id='',
creativex_url=cx['url'] or '',
quality_score=cx['score'] or '',
box_file_id=m['box_file_id'],
full_extraction_data={'master_metadata': True, 'source': 'b1_backfill', 'data': cx},
tracking_id=m['tracking_id'],
status='b1-master-cx-score'
)
if result.get('success'):
if result.get('already_exists'):
# Race or stale already_have set — count as already
skipped_already += 1
else:
inserted += 1
logger.info('Inserted: %s | %s | score=%s', m['tracking_id'], m['filename'], cx['score'])
else:
logger.error('Failed for %s: %s', m['tracking_id'], result.get('error'))
logger.info('=' * 60)
logger.info('Backfill summary%s:', ' (DRY-RUN)' if args.dry_run else '')
logger.info(' Scanned B1 masters: %d', len(masters))
logger.info(' Already had CX row: %d', skipped_already)
logger.info(' No CX in metadata: %d', skipped_no_cx)
logger.info(' %s: %d', 'Would insert' if args.dry_run else 'Inserted', inserted)
logger.info('=' * 60)
db.close()
if __name__ == '__main__':
main()