#!/usr/bin/env python3
"""
One-shot backfill: Populate creativex_scores with status='b1-master-cx-score'
for B1→B2 global masters already in master_assets that don't yet have a row.

Identification rule:
    tracking_id LIKE 'M%' AND local_campaign_id IS NULL AND status = 'active'

B1→B2 stores masters without local_campaign_id; A1→A2 always sets it, so this
cleanly separates global from local masters that share the M-prefix.

The CX score is read out of master_assets.full_metadata JSONB. Rows where the
DAM metadata has no CreativeX score AND no URL are reported but skipped.

db.store_creativex_score(..., status='b1-master-cx-score') already dedupes by
tracking_id, so re-running is safe.

Usage:
    python scripts/backfill_b1_creativex_scores.py              # apply
    python scripts/backfill_b1_creativex_scores.py --dry-run    # preview only
"""

import sys
import os
import argparse
import logging

# Make the repository root importable so the `shared` package resolves when
# this file is run directly from the scripts/ directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from shared.config_loader import load_config
from shared.database import Database

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('B1CXBackfill')


def _walk_metadata_elements(elements):
    """Recursively yield every element in nested metadata_element_list arrays.

    Non-dict entries are skipped. Each dict is yielded before its own
    'metadata_element_list' children (pre-order). A falsy `elements`
    (None, empty list) yields nothing.
    """
    for element in elements or []:
        if not isinstance(element, dict):
            continue
        yield element
        nested = element.get('metadata_element_list')
        if isinstance(nested, list):
            yield from _walk_metadata_elements(nested)


def extract_creativex_from_dam_metadata(asset_metadata):
    """Extract the CreativeX score and URL from a DAM metadata document.

    Mirror of the extractor in b1_to_b2_download.py — duplicated here to keep
    the backfill script self-contained (avoids triggering b1_to_b2_download's
    module-level logging setup on import).

    Walks recursively: the score field is at depth 2 (nested inside
    FERRERO.TABULAR.FIELD.CREATIVEX, which lives inside a category).

    Args:
        asset_metadata: The metadata document; elements are read from
            asset_metadata['metadata']['metadata_element_list'].
            None or an empty value is treated as "no metadata".

    Returns:
        A dict {'score': str | None, 'url': <value> | None}. Any exception
        raised while walking the document is logged as a warning and an
        all-None result is returned instead of propagating.
    """
    # NOTE(review): the docstring names the container
    # FERRERO.TABULAR.FIELD.CREATIVEX while the code matches the field id
    # FERRERO.TAB.FIELD.CREATIVEX — presumably two distinct ids; confirm.
    try:
        top = (asset_metadata or {}).get('metadata', {}).get('metadata_element_list', [])
        cx = {'score': None, 'url': None}

        for element in _walk_metadata_elements(top):
            element_id = element.get('id')

            if element_id == 'FERRERO.TAB.FIELD.CREATIVEX':
                # Score path: values[0].value.field_value.value
                values = element.get('values', [])
                if values:
                    value_obj = values[0].get('value', {})
                    if isinstance(value_obj, dict):
                        field_value = value_obj.get('field_value', {})
                        if isinstance(field_value, dict):
                            score = field_value.get('value')
                            if score:
                                cx['score'] = str(score)

            elif element_id == 'FERRERO.FIELD.CREATIVEX LINK':
                # URL path: value.value.value
                value_obj = element.get('value', {})
                if isinstance(value_obj, dict):
                    nested = value_obj.get('value', {})
                    if isinstance(nested, dict):
                        url = nested.get('value')
                        if url:
                            cx['url'] = url

        return cx
    except Exception as e:
        # Deliberate best-effort: one malformed document must not abort the
        # whole backfill run.
        logger.warning('Failed to extract CreativeX from metadata: %s', e)
        return {'score': None, 'url': None}


def _fetch_all(db, sql):
    """Run `sql` on a connection borrowed from `db` and return all rows.

    The connection is always handed back via db.put_connection(), including
    when creating the cursor, executing the query, or closing the cursor
    raises.
    """
    conn = db.get_connection()
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        try:
            # cursor stays None when conn.cursor() itself failed; closing
            # unconditionally would raise NameError and mask the real error.
            if cursor is not None:
                cursor.close()
        finally:
            db.put_connection(conn)


def fetch_b1_masters(db):
    """Return the B1 global masters from master_assets, oldest first.

    Selects rows matching the identification rule from the module docstring
    (M-prefixed tracking_id, no local_campaign_id, status 'active').

    Returns:
        A list of dicts with keys:
            'tracking_id'   - master_assets.tracking_id
            'filename'      - original_filename + file_extension (NULLs
                              treated as empty strings)
            'full_metadata' - the full_metadata column, or {} when empty
            'box_file_id'   - 'box_file_id' parsed out of the description by
                              Database.parse_box_info_from_description, or ''
    """
    rows = _fetch_all(db, """
        SELECT tracking_id, original_filename, file_extension,
               full_metadata, description
        FROM master_assets
        WHERE tracking_id LIKE 'M%'
          AND local_campaign_id IS NULL
          AND status = 'active'
        ORDER BY created_at
    """)
    return [
        {
            'tracking_id': r[0],
            'filename': (r[1] or '') + (r[2] or ''),
            'full_metadata': r[3] or {},
            'box_file_id': Database.parse_box_info_from_description(r[4]).get('box_file_id') or '',
        }
        for r in rows
    ]


def existing_cx_tracking_ids(db):
    """Return set of tracking_ids that already have a b1-master-cx-score row."""
    rows = _fetch_all(db, """
        SELECT DISTINCT tracking_id
        FROM creativex_scores
        WHERE status = 'b1-master-cx-score'
          AND tracking_id IS NOT NULL
    """)
    return {row[0] for row in rows}


def main():
    """Command-line entry point: backfill (or preview) B1 master CX scores.

    Loads config/config.yaml, exits with status 1 if the database connection
    test fails, then walks every B1 global master and stores a
    'b1-master-cx-score' row for each one that has CreativeX data and no
    existing row. With --dry-run nothing is written; candidates are only
    logged. A summary of all counters is logged at the end.
    """
    parser = argparse.ArgumentParser(description='Backfill B1 master CreativeX scores')
    parser.add_argument('--dry-run', action='store_true',
                        help='Report what would be inserted without touching the DB')
    args = parser.parse_args()

    config = load_config('config/config.yaml')
    db = Database(config)
    if not db.test_connection():
        logger.error('Database connection failed')
        sys.exit(1)

    try:
        masters = fetch_b1_masters(db)
        already_have = existing_cx_tracking_ids(db)
        logger.info('Scanned %d B1 global masters in master_assets', len(masters))
        logger.info('Existing b1-master-cx-score rows: %d', len(already_have))

        inserted = 0
        skipped_no_cx = 0
        skipped_already = 0
        failed = 0

        for m in masters:
            if m['tracking_id'] in already_have:
                skipped_already += 1
                continue

            cx = extract_creativex_from_dam_metadata(m['full_metadata'])
            if not (cx['score'] or cx['url']):
                skipped_no_cx += 1
                logger.debug('No CX in metadata for %s (%s)', m['tracking_id'], m['filename'])
                continue

            if args.dry_run:
                logger.info('[DRY-RUN] Would insert: %s | %s | score=%s url=%s',
                            m['tracking_id'], m['filename'], cx['score'], cx['url'])
                inserted += 1
                continue

            result = db.store_creativex_score(
                filename=m['filename'],
                creativex_id='',
                creativex_url=cx['url'] or '',
                quality_score=cx['score'] or '',
                box_file_id=m['box_file_id'],
                full_extraction_data={'master_metadata': True, 'source': 'b1_backfill', 'data': cx},
                tracking_id=m['tracking_id'],
                status='b1-master-cx-score'
            )
            if result.get('success'):
                if result.get('already_exists'):
                    # Race or stale already_have set — count as already
                    skipped_already += 1
                else:
                    inserted += 1
                    logger.info('Inserted: %s | %s | score=%s',
                                m['tracking_id'], m['filename'], cx['score'])
            else:
                # Counted so the summary totals add up to the scanned count.
                failed += 1
                logger.error('Failed for %s: %s', m['tracking_id'], result.get('error'))

        logger.info('=' * 60)
        logger.info('Backfill summary%s:', ' (DRY-RUN)' if args.dry_run else '')
        logger.info(' Scanned B1 masters: %d', len(masters))
        logger.info(' Already had CX row: %d', skipped_already)
        logger.info(' No CX in metadata: %d', skipped_no_cx)
        logger.info(' %s: %d', 'Would insert' if args.dry_run else 'Inserted', inserted)
        logger.info(' Failed: %d', failed)
        logger.info('=' * 60)
    finally:
        # Release database resources even when the run aborts part-way.
        db.close()


if __name__ == '__main__':
    main()