ferrero-opentext/Python-Version/scripts/diagnose_b1_master_metadata.py
nickviljoen a463eb42f8 Diagnostic: Recursively walk nested metadata_element_list for CX search
Previous version only looked at top-level metadata_element_list, which
contains categories — actual fields nest under each category. Now
recursively descends through all nested metadata_element_list arrays
and counts every element_id at any depth, then searches the full set
for CX/score/quality hints. Reports max nesting depth and the depth at
which each CX-flavored ID was found.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 11:49:54 +02:00

162 lines
5.8 KiB
Python

#!/usr/bin/env python3
"""
Diagnostic: Inspect what metadata B1 global masters actually carry in
master_assets.full_metadata, so we can tell why the CX backfill found 0.
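Three steps, all run over the same sample of rows:
1. Top-level keys of full_metadata (does the structure even contain
   metadata.metadata_element_list?).
2. Walk every nested metadata_element_list (the top level holds
   categories; the actual fields nest underneath) and count occurrences
   of any element_id that looks CX/score/quality-related
   (case-insensitive substring match) — surfaces the actual element IDs
   used by client B1 masters, in case they differ from the A1 IDs the
   extractor expects.
3. Dump the first few full_metadata blobs (see --show-full), truncated
   to 4KB each, for manual inspection.
Read-only. Safe to run any time.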
Usage:
python scripts/diagnose_b1_master_metadata.py
python scripts/diagnose_b1_master_metadata.py --sample 200
"""
import sys
import os
import json
import argparse
import logging
from collections import Counter
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from shared.config_loader import load_config
from shared.database import Database
# Configure root logging once for the whole script: INFO and above, with
# every line stamped with its time and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('B1MetaDiag')

# Lower-case substrings that flag an element ID as CX-related. They are
# compared against the lower-cased ID, so matching is case-insensitive.
CX_HINTS = (
    'creativex',
    'cx',
    'score',
    'quality',
)
def walk_elements(elements, depth=0):
    """Yield ``(depth, element)`` pairs for a nested metadata_element_list.

    The traversal is depth-first and pre-order: each dict element is
    yielded before the contents of its own ``metadata_element_list``.
    Entries that are not dicts are skipped, and a falsy ``elements``
    (``None``, empty list) yields nothing.

    Args:
        elements: Iterable of element dicts, or a falsy value.
        depth: Depth reported for the entries of ``elements``; children
            are reported at ``depth + 1``.

    Yields:
        Tuples of ``(depth, element_dict)``.
    """
    if not elements:
        return
    for item in elements:
        if isinstance(item, dict):
            yield (depth, item)
            children = item.get('metadata_element_list')
            # Only a real list is descended into; any other value is ignored.
            if isinstance(children, list):
                yield from walk_elements(children, depth + 1)
def _load_full_metadata(raw):
    """Normalise one ``full_metadata`` column value to a dict.

    Returns the value itself when it is already a dict, the decoded object
    when it is JSON text (str/bytes) holding an object, ``{}`` when the
    value is empty/NULL, and ``None`` when it is present but cannot be
    interpreted as a JSON object.
    """
    if isinstance(raw, (str, bytes, bytearray)):
        if not raw.strip():
            return {}
        try:
            raw = json.loads(raw)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            return None
    if isinstance(raw, dict):
        return raw
    return None if raw else {}


def _top_level_elements(full):
    """Return ``full['metadata']['metadata_element_list']`` if it is a list, else None."""
    meta = full.get('metadata')
    if not isinstance(meta, dict):
        return None
    elements = meta.get('metadata_element_list')
    return elements if isinstance(elements, list) else None


def _fetch_b1_masters(db, limit):
    """Fetch up to ``limit`` newest active rows with an 'M…' tracking_id and no local campaign.

    The connection is always handed back via ``db.put_connection`` and the
    cursor is only closed if it was actually created.
    """
    conn = db.get_connection()
    try:
        cursor = conn.cursor()
        try:
            # '%%' is a literal '%' escaped for the driver's %s parameter style.
            cursor.execute("""
                SELECT tracking_id, original_filename, full_metadata
                FROM master_assets
                WHERE tracking_id LIKE 'M%%'
                  AND local_campaign_id IS NULL
                  AND status = 'active'
                ORDER BY created_at DESC
                LIMIT %s
            """, (limit,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        db.put_connection(conn)


def main():
    """Sample B1 global masters and log what their full_metadata contains.

    Command-line options:
        --sample N     number of rows to scan (default 100)
        --show-full N  number of full_metadata blobs to dump (default 2)

    Steps:
        1. Count the top-level keys of each row's full_metadata and how
           many rows carry ``metadata.metadata_element_list``.
        2. Walk every nested metadata_element_list and count element IDs,
           highlighting those containing one of ``CX_HINTS``.
        3. Dump the first ``--show-full`` blobs, truncated to 4KB each.

    Only a SELECT is issued. Exits with status 1 if the database
    connection test fails.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--sample', type=int, default=100,
                        help='How many B1 masters to scan for element-ID counts (default 100)')
    parser.add_argument('--show-full', type=int, default=2,
                        help='How many sample full_metadata blobs to dump in full (default 2)')
    args = parser.parse_args()

    config = load_config('config/config.yaml')
    db = Database(config)
    if not db.test_connection():
        logger.error('Database connection test failed; aborting.')
        sys.exit(1)

    # Everything after the fetch works on in-memory rows, so the database
    # handle is released right away — even when the query itself fails.
    try:
        rows = _fetch_b1_masters(db, args.sample)
    finally:
        db.close()

    logger.info('Sampled %d B1 global masters', len(rows))

    # Parse each row's full_metadata exactly once. The column may arrive as
    # an already-decoded dict or as JSON text depending on the column type
    # and driver; anything that is not a JSON object is treated as empty.
    blobs = []
    unusable_full_meta = 0
    for row in rows:
        parsed = _load_full_metadata(row[2])
        if parsed is None:
            unusable_full_meta += 1
            parsed = {}
        blobs.append(parsed)
    if unusable_full_meta:
        logger.warning('Rows whose full_metadata is not a JSON object (treated as empty): %d',
                       unusable_full_meta)

    # 1. Top-level structure check
    top_key_counter = Counter()
    has_meta_list = 0
    empty_full_meta = 0
    for full in blobs:
        if not full:
            empty_full_meta += 1
            continue
        top_key_counter.update(full.keys())
        if _top_level_elements(full) is not None:
            has_meta_list += 1

    logger.info('=' * 60)
    logger.info('Top-level keys present in full_metadata (count of rows containing the key):')
    for key, count in top_key_counter.most_common():
        logger.info(' %-30s %d', key, count)
    logger.info('Rows with empty full_metadata: %d', empty_full_meta)
    logger.info('Rows with metadata.metadata_element_list: %d', has_meta_list)
    logger.info('=' * 60)

    # 2. Recursive hunt for CX-flavored element IDs (nested metadata_element_list)
    id_counter = Counter()
    cx_id_depth = {}  # eid -> depth at which it was first seen
    cx_id_counter = Counter()
    rows_with_cx_hint = 0
    max_depth_seen = 0
    for full in blobs:
        row_had_hint = False
        for depth, element in walk_elements(_top_level_elements(full) or []):
            max_depth_seen = max(max_depth_seen, depth)
            raw_id = element.get('id')
            if raw_id is None:
                continue
            # IDs are expected to be strings, but stringify defensively so a
            # numeric ID cannot abort the whole diagnostic.
            eid = str(raw_id).strip()
            if not eid:
                continue
            id_counter[eid] += 1
            lowered = eid.lower()
            if any(hint in lowered for hint in CX_HINTS):
                cx_id_counter[eid] += 1
                cx_id_depth.setdefault(eid, depth)
                row_had_hint = True
        if row_had_hint:
            rows_with_cx_hint += 1

    logger.info('Distinct element_ids seen across sample (any depth): %d', len(id_counter))
    logger.info('Max nesting depth observed: %d', max_depth_seen)
    logger.info('Rows containing at least one CX-flavored element_id: %d / %d',
                rows_with_cx_hint, len(rows))
    logger.info('-' * 60)
    if cx_id_counter:
        logger.info('CX/score/quality-flavored element_ids found (id @ depth, count):')
        for eid, count in cx_id_counter.most_common():
            logger.info(' %-50s @depth %d %d', eid, cx_id_depth[eid], count)
    else:
        logger.info('NO CX/score/quality-flavored element_ids found at any depth.')
        logger.info('Likely: client B1 masters were uploaded before CX scoring ran on them.')
    logger.info('=' * 60)

    # 3. Dump first few full blobs verbatim for manual inspection
    if args.show_full > 0:
        logger.info('First %d full_metadata blobs (truncated to 4KB each):', args.show_full)
        for row, full in zip(rows[:args.show_full], blobs[:args.show_full]):
            # default=str keeps the dump going on values json cannot encode natively.
            blob = json.dumps(full, indent=2, default=str)
            if len(blob) > 4096:
                blob = blob[:4096] + '\n... [truncated]'
            logger.info('--- %s (%s) ---\n%s', row[0], row[1], blob)
# Script entry point: run the diagnostic only when this file is executed
# directly, never as a side effect of importing it.
if __name__ == '__main__':
    main()