Diagnostic confirmed FERRERO.TAB.FIELD.CREATIVEX (score) lives at depth 2 in B1 master metadata — nested under FERRERO.TABULAR.FIELD.CREATIVEX inside a category — and FERRERO.FIELD.CREATIVEX LINK lives at depth 1. The flat top-level walk used previously never reached them, so live B1 runs and the backfill both reported zero CX scores. Updated extractor in b1_to_b2_download.py and the inline copy in backfill_b1_creativex_scores.py to descend recursively. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
203 lines
7.3 KiB
Python
203 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
One-shot backfill: Populate creativex_scores with status='b1-master-cx-score'
|
|
for B1→B2 global masters already in master_assets that don't yet have a row.
|
|
|
|
Identification rule:
|
|
tracking_id LIKE 'M%' AND local_campaign_id IS NULL AND status = 'active'
|
|
B1→B2 stores masters without local_campaign_id; A1→A2 always sets it, so this
|
|
cleanly separates global from local masters that share the M-prefix.
|
|
|
|
The CX score is read out of master_assets.full_metadata JSONB. Rows where the
|
|
DAM metadata has no CreativeX score AND no URL are reported but skipped.
|
|
db.store_creativex_score(..., status='b1-master-cx-score') already dedupes by
|
|
tracking_id, so re-running is safe.
|
|
|
|
Usage:
|
|
python scripts/backfill_b1_creativex_scores.py # apply
|
|
python scripts/backfill_b1_creativex_scores.py --dry-run # preview only
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import logging
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from shared.config_loader import load_config
|
|
from shared.database import Database
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger('B1CXBackfill')
|
|
|
|
|
|
def _walk_metadata_elements(elements):
|
|
"""Recursively yield every element in nested metadata_element_list arrays."""
|
|
for e in elements or []:
|
|
if not isinstance(e, dict):
|
|
continue
|
|
yield e
|
|
nested = e.get('metadata_element_list')
|
|
if isinstance(nested, list):
|
|
for sub in _walk_metadata_elements(nested):
|
|
yield sub
|
|
|
|
|
|
def extract_creativex_from_dam_metadata(asset_metadata):
|
|
"""Mirror of the extractor in b1_to_b2_download.py — duplicated here
|
|
to keep the backfill script self-contained (avoids triggering
|
|
b1_to_b2_download's module-level logging setup on import).
|
|
|
|
Walks recursively: the score field is at depth 2 (nested inside
|
|
FERRERO.TABULAR.FIELD.CREATIVEX, which lives inside a category)."""
|
|
try:
|
|
top = (asset_metadata or {}).get('metadata', {}).get('metadata_element_list', [])
|
|
cx = {'score': None, 'url': None}
|
|
for element in _walk_metadata_elements(top):
|
|
element_id = element.get('id')
|
|
if element_id == 'FERRERO.TAB.FIELD.CREATIVEX':
|
|
values = element.get('values', [])
|
|
if values:
|
|
value_obj = values[0].get('value', {})
|
|
if isinstance(value_obj, dict):
|
|
field_value = value_obj.get('field_value', {})
|
|
if isinstance(field_value, dict):
|
|
score = field_value.get('value')
|
|
if score:
|
|
cx['score'] = str(score)
|
|
elif element_id == 'FERRERO.FIELD.CREATIVEX LINK':
|
|
value_obj = element.get('value', {})
|
|
if isinstance(value_obj, dict):
|
|
nested = value_obj.get('value', {})
|
|
if isinstance(nested, dict):
|
|
url = nested.get('value')
|
|
if url:
|
|
cx['url'] = url
|
|
return cx
|
|
except Exception as e:
|
|
logger.warning('Failed to extract CreativeX from metadata: %s', e)
|
|
return {'score': None, 'url': None}
|
|
|
|
|
|
def fetch_b1_masters(db):
|
|
conn = db.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT tracking_id, original_filename, file_extension,
|
|
full_metadata, description
|
|
FROM master_assets
|
|
WHERE tracking_id LIKE 'M%'
|
|
AND local_campaign_id IS NULL
|
|
AND status = 'active'
|
|
ORDER BY created_at
|
|
""")
|
|
rows = cursor.fetchall()
|
|
return [
|
|
{
|
|
'tracking_id': r[0],
|
|
'filename': (r[1] or '') + (r[2] or ''),
|
|
'full_metadata': r[3] if isinstance(r[3], dict) else (r[3] or {}),
|
|
'box_file_id': Database.parse_box_info_from_description(r[4]).get('box_file_id') or '',
|
|
}
|
|
for r in rows
|
|
]
|
|
finally:
|
|
cursor.close()
|
|
db.put_connection(conn)
|
|
|
|
|
|
def existing_cx_tracking_ids(db):
|
|
"""Return set of tracking_ids that already have a b1-master-cx-score row."""
|
|
conn = db.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT DISTINCT tracking_id
|
|
FROM creativex_scores
|
|
WHERE status = 'b1-master-cx-score'
|
|
AND tracking_id IS NOT NULL
|
|
""")
|
|
return {row[0] for row in cursor.fetchall()}
|
|
finally:
|
|
cursor.close()
|
|
db.put_connection(conn)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Backfill B1 master CreativeX scores')
|
|
parser.add_argument('--dry-run', action='store_true',
|
|
help='Report what would be inserted without touching the DB')
|
|
args = parser.parse_args()
|
|
|
|
config = load_config('config/config.yaml')
|
|
db = Database(config)
|
|
|
|
if not db.test_connection():
|
|
logger.error('Database connection failed')
|
|
sys.exit(1)
|
|
|
|
masters = fetch_b1_masters(db)
|
|
already_have = existing_cx_tracking_ids(db)
|
|
|
|
logger.info('Scanned %d B1 global masters in master_assets', len(masters))
|
|
logger.info('Existing b1-master-cx-score rows: %d', len(already_have))
|
|
|
|
inserted = 0
|
|
skipped_no_cx = 0
|
|
skipped_already = 0
|
|
|
|
for m in masters:
|
|
if m['tracking_id'] in already_have:
|
|
skipped_already += 1
|
|
continue
|
|
|
|
cx = extract_creativex_from_dam_metadata(m['full_metadata'])
|
|
if not (cx['score'] or cx['url']):
|
|
skipped_no_cx += 1
|
|
logger.debug('No CX in metadata for %s (%s)', m['tracking_id'], m['filename'])
|
|
continue
|
|
|
|
if args.dry_run:
|
|
logger.info('[DRY-RUN] Would insert: %s | %s | score=%s url=%s',
|
|
m['tracking_id'], m['filename'], cx['score'], cx['url'])
|
|
inserted += 1
|
|
continue
|
|
|
|
result = db.store_creativex_score(
|
|
filename=m['filename'],
|
|
creativex_id='',
|
|
creativex_url=cx['url'] or '',
|
|
quality_score=cx['score'] or '',
|
|
box_file_id=m['box_file_id'],
|
|
full_extraction_data={'master_metadata': True, 'source': 'b1_backfill', 'data': cx},
|
|
tracking_id=m['tracking_id'],
|
|
status='b1-master-cx-score'
|
|
)
|
|
if result.get('success'):
|
|
if result.get('already_exists'):
|
|
# Race or stale already_have set — count as already
|
|
skipped_already += 1
|
|
else:
|
|
inserted += 1
|
|
logger.info('Inserted: %s | %s | score=%s', m['tracking_id'], m['filename'], cx['score'])
|
|
else:
|
|
logger.error('Failed for %s: %s', m['tracking_id'], result.get('error'))
|
|
|
|
logger.info('=' * 60)
|
|
logger.info('Backfill summary%s:', ' (DRY-RUN)' if args.dry_run else '')
|
|
logger.info(' Scanned B1 masters: %d', len(masters))
|
|
logger.info(' Already had CX row: %d', skipped_already)
|
|
logger.info(' No CX in metadata: %d', skipped_no_cx)
|
|
logger.info(' %s: %d', 'Would insert' if args.dry_run else 'Inserted', inserted)
|
|
logger.info('=' * 60)
|
|
|
|
db.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|