#!/usr/bin/env python3
"""
Diagnostic: Inspect what metadata B1 global masters actually carry in
master_assets.full_metadata, so we can tell why the CX backfill found 0.

Two checks:
  1. Top-level keys of full_metadata (does the structure even contain
     metadata.metadata_element_list?).
  2. Across a larger sample, count occurrences of any element_id that looks
     CX/score/quality-related (case-insensitive) — surfaces the actual
     element IDs used by client B1 masters, in case they differ from the A1
     IDs the extractor expects.

Read-only. Safe to run any time.

Usage:
    python scripts/diagnose_b1_master_metadata.py
    python scripts/diagnose_b1_master_metadata.py --sample 200
"""

import sys
import os
import json
import argparse
import logging
from collections import Counter

# Make the repository root importable so the project-local `shared` package
# resolves when this file is run directly from the scripts/ directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('B1MetaDiag')

# Case-insensitive substrings that mark an element id as CX-related.
CX_HINTS = ('creativex', 'cx', 'score', 'quality')

# Maximum number of characters of each JSON blob written to the log.
_BLOB_DUMP_LIMIT = 4096


def walk_elements(elements, depth=0):
    """Recursively yield (depth, element) for every element in a nested
    metadata_element_list.

    Categories and tables both contain nested metadata_element_list arrays —
    flat iteration misses everything below the top level.

    Args:
        elements: A list of element dicts. Anything that is not a list or
            tuple (None, a string, a number, ...) yields nothing.
        depth: Nesting level reported for the items of ``elements``.

    Yields:
        ``(depth, element)`` for each dict found, parents before children.
        Entries that are not dicts are skipped.
    """
    if not isinstance(elements, (list, tuple)):
        return
    for element in elements:
        if not isinstance(element, dict):
            continue
        yield depth, element
        nested = element.get('metadata_element_list')
        if isinstance(nested, list):
            yield from walk_elements(nested, depth + 1)


def _coerce_metadata(raw):
    """Return a full_metadata column value as a dict.

    A str/bytes value is parsed as JSON first. Anything that is missing,
    unparseable, or not a JSON object becomes ``{}`` so callers can use
    ``.get`` / ``.keys`` without type checks.
    """
    if isinstance(raw, (str, bytes, bytearray)):
        try:
            raw = json.loads(raw)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError.
            return {}
    return raw if isinstance(raw, dict) else {}


def _summarize_top_level(fulls):
    """Summarise the top-level structure of the coerced metadata dicts.

    Returns:
        ``(top_key_counter, has_meta_list, empty_full_meta)`` where
        ``top_key_counter`` counts the rows containing each top-level key,
        ``has_meta_list`` counts rows whose ``metadata.metadata_element_list``
        is a list, and ``empty_full_meta`` counts rows with an empty dict.
    """
    top_key_counter = Counter()
    has_meta_list = 0
    empty_full_meta = 0
    for full in fulls:
        if not full:
            empty_full_meta += 1
            continue
        # Pass the keys view, not the dict itself: Counter.update() on a
        # mapping would add the mapping's *values* as counts.
        top_key_counter.update(full.keys())
        meta = full.get('metadata')
        if isinstance(meta, dict) and isinstance(meta.get('metadata_element_list'), list):
            has_meta_list += 1
    return top_key_counter, has_meta_list, empty_full_meta


def _collect_element_ids(fulls):
    """Walk every nested element list and tally the element ids found.

    Returns:
        ``(id_counter, cx_id_counter, cx_id_depth, rows_with_cx_hint,
        max_depth_seen)``. ``cx_id_depth`` maps each CX-flavored id to the
        depth at which it was first seen.
    """
    id_counter = Counter()
    cx_id_counter = Counter()
    cx_id_depth = {}
    rows_with_cx_hint = 0
    max_depth_seen = 0
    for full in fulls:
        meta = full.get('metadata')
        top_list = meta.get('metadata_element_list') if isinstance(meta, dict) else None
        row_had_hint = False
        for depth, element in walk_elements(top_list):
            max_depth_seen = max(max_depth_seen, depth)
            raw_id = element.get('id')
            # str() keeps a numeric id from crashing .strip().
            eid = '' if raw_id is None else str(raw_id).strip()
            if not eid:
                continue
            id_counter[eid] += 1
            lowered = eid.lower()
            if any(hint in lowered for hint in CX_HINTS):
                cx_id_counter[eid] += 1
                cx_id_depth.setdefault(eid, depth)
                row_had_hint = True
        if row_had_hint:
            rows_with_cx_hint += 1
    return id_counter, cx_id_counter, cx_id_depth, rows_with_cx_hint, max_depth_seen


def _fetch_rows(db, sample):
    """Fetch up to ``sample`` of the most recently created B1 global masters.

    Each row is (tracking_id, original_filename, full_metadata). The
    connection is always handed back to ``db``, even when the query fails.
    """
    conn = db.get_connection()
    cursor = None
    try:
        cursor = conn.cursor()
        # NOTE(review): '%%' / '%s' assume a driver using the format
        # paramstyle (e.g. psycopg2), where '%%' is a literal percent sign
        # when parameters are supplied — confirm against shared.database.
        cursor.execute(
            """
            SELECT tracking_id, original_filename, full_metadata
            FROM master_assets
            WHERE tracking_id LIKE 'M%%'
              AND local_campaign_id IS NULL
              AND status = 'active'
            ORDER BY created_at DESC
            LIMIT %s
            """,
            (sample,),
        )
        return cursor.fetchall()
    finally:
        # cursor is still None if conn.cursor() itself raised; guarding here
        # keeps the cleanup from masking the original error with a NameError.
        if cursor is not None:
            cursor.close()
        db.put_connection(conn)


def main():
    """Sample B1 global masters and log a structural summary of their metadata.

    Exits with status 1 when the database connection test fails.
    """
    # Imported here rather than at module level so the pure helpers above
    # can be imported (and unit-tested) without the project package present.
    # When run as a script the imports still happen before anything else.
    from shared.config_loader import load_config
    from shared.database import Database

    parser = argparse.ArgumentParser()
    parser.add_argument('--sample', type=int, default=100,
                        help='How many B1 masters to scan for element-ID counts (default 100)')
    parser.add_argument('--show-full', type=int, default=2,
                        help='How many sample full_metadata blobs to dump in full (default 2)')
    args = parser.parse_args()

    config = load_config('config/config.yaml')
    db = Database(config)
    if not db.test_connection():
        logger.error('Database connection test failed; aborting.')
        sys.exit(1)

    try:
        rows = _fetch_rows(db, args.sample)
        logger.info('Sampled %d B1 global masters', len(rows))

        # Coerce each row's metadata once; every check below reuses the result.
        fulls = [_coerce_metadata(r[2]) for r in rows]

        # 1. Top-level structure check
        top_key_counter, has_meta_list, empty_full_meta = _summarize_top_level(fulls)
        logger.info('=' * 60)
        logger.info('Top-level keys present in full_metadata (count of rows containing the key):')
        for key, count in top_key_counter.most_common():
            logger.info(' %-30s %d', key, count)
        logger.info('Rows with empty full_metadata: %d', empty_full_meta)
        logger.info('Rows with metadata.metadata_element_list: %d', has_meta_list)
        logger.info('=' * 60)

        # 2. Recursive hunt for CX-flavored element IDs (nested metadata_element_list)
        (id_counter, cx_id_counter, cx_id_depth,
         rows_with_cx_hint, max_depth_seen) = _collect_element_ids(fulls)
        logger.info('Distinct element_ids seen across sample (any depth): %d', len(id_counter))
        logger.info('Max nesting depth observed: %d', max_depth_seen)
        logger.info('Rows containing at least one CX-flavored element_id: %d / %d',
                    rows_with_cx_hint, len(rows))
        logger.info('-' * 60)
        if cx_id_counter:
            logger.info('CX/score/quality-flavored element_ids found (id @ depth, count):')
            for eid, count in cx_id_counter.most_common():
                logger.info(' %-50s @depth %d %d', eid, cx_id_depth[eid], count)
        else:
            logger.info('NO CX/score/quality-flavored element_ids found at any depth.')
            logger.info('Likely: client B1 masters were uploaded before CX scoring ran on them.')
        logger.info('=' * 60)

        # 3. Dump first few full blobs verbatim for manual inspection
        if args.show_full > 0:
            logger.info('First %d full_metadata blobs (truncated to 4KB each):', args.show_full)
            for row, full in zip(rows[:args.show_full], fulls):
                # Fall back to the raw column value when it could not be
                # coerced to a dict, so the unusable content stays visible.
                blob = json.dumps(full or row[2] or {}, indent=2, default=str)
                if len(blob) > _BLOB_DUMP_LIMIT:
                    blob = blob[:_BLOB_DUMP_LIMIT] + '\n... [truncated]'
                logger.info('--- %s (%s) ---\n%s', row[0], row[1], blob)
    finally:
        # Release database resources even when a check above raises.
        db.close()


if __name__ == '__main__':
    main()