Full-stack Amazon AI Transcreation Platform with: - FastAPI backend (async, PostgreSQL, Redis, Celery) with 11 DB tables - JWT auth (SSO-ready abstract provider pattern) - 6-agent pipeline orchestrator with deterministic modules - Next.js 14 frontend with Amazon branding (Ember fonts, orange/dark theme) - Job wizard, monitoring HUD, output review, admin screens - 154 TM/reference files imported, 12 locales configured - Docker Compose for all services Agents 2-5 (TM retrieval, ranker, transcreator, compliance) are stubs pending Phase 3 LLM integration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
204 lines
6 KiB
Python
204 lines
6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
TM File Validator
|
|
=================
|
|
Reads all TM files from a directory and reports on format, line counts,
|
|
locale codes, channels, and parse errors.
|
|
|
|
Usage:
|
|
python scripts/validate_tm_files.py <tm_directory>
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
# Matches a lowercase locale code of the form "xx-yy" (two letters, a hyphen,
# two letters) delimited by word boundaries, e.g. "de-de". Group 1 is the code.
# NOTE(review): any lowercase "xx-yy" token in free text also matches; confirm
# that compact entries never contain such words outside the locale tag.
LOCALE_PATTERN = re.compile(r"\b([a-z]{2}-[a-z]{2})\b")
|
|
|
|
|
|
def detect_format(entry: dict) -> str:
    """Classify a parsed TM entry as compact, multi-field, or unknown.

    Args:
        entry: One decoded JSON value from a TM file. A JSON object (dict)
            is expected, but any decoded value is tolerated.

    Returns:
        "compact" if the entry has a string-valued "t" key,
        "multi-field" if it has a "seg_key" key, otherwise "unknown".
        Non-dict values (lists, strings, numbers, null) are always "unknown".
    """
    # json.loads can yield any JSON value, not just objects. Without this
    # guard a bare number raises TypeError on the membership test, a bare
    # string containing "t" raises TypeError on the subscript, and a list
    # containing "seg_key" would be misreported as multi-field.
    if not isinstance(entry, dict):
        return "unknown"

    # .get() returns None for a missing key, which is not a str, so this is
    # equivalent to checking presence and type together.
    if isinstance(entry.get("t"), str):
        return "compact"
    if "seg_key" in entry:
        return "multi-field"
    return "unknown"
|
|
|
|
|
|
def extract_channel_from_filename(filename: str) -> str | None:
|
|
"""
|
|
Extract channel name from a TM filename.
|
|
|
|
Expected pattern: flat_{Channel}_{locale}.json(l)
|
|
Examples: flat_MASS_de-de.json -> MASS, flat_PrimeSpeed_ca-es.json -> PrimeSpeed
|
|
"""
|
|
match = re.match(r"flat_(.+?)_[a-z]{2}-[a-z]{2}\.jsonl?$", filename, re.IGNORECASE)
|
|
if match:
|
|
return match.group(1)
|
|
return None
|
|
|
|
|
|
def validate_file(file_path: Path) -> dict:
    """Validate a single TM file and return statistics.

    The file is read as UTF-8 and treated as one JSON value per non-blank
    line. Problems with individual lines are collected rather than raised.

    Args:
        file_path: Path of the TM file to read.

    Returns:
        A dict with these keys:
            "filename":     the file's base name.
            "path":         the path as a string.
            "line_count":   number of non-blank lines, including lines that
                            failed to parse.
            "empty_lines":  number of blank (whitespace-only) lines.
            "format":       the most frequent detected format, or "empty"
                            when no line was classified.
            "formats_seen": defaultdict mapping format name to line count.
            "locales":      set of locale strings found.
            "parse_errors": list of human-readable error messages.

    Raises:
        OSError / UnicodeDecodeError: if the file cannot be opened or is not
            valid UTF-8 (these are not caught here).
    """
    stats = {
        "filename": file_path.name,
        "path": str(file_path),
        "line_count": 0,
        "format": None,
        "formats_seen": defaultdict(int),
        "locales": set(),
        "parse_errors": [],
        "empty_lines": 0,
    }

    with open(file_path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                stats["empty_lines"] += 1
                continue

            stats["line_count"] += 1

            try:
                entry = json.loads(line)
            except json.JSONDecodeError as exc:
                stats["parse_errors"].append(
                    f"Line {line_num}: Invalid JSON - {exc}"
                )
                continue

            # A line can be valid JSON without being an object (e.g. a bare
            # number, string, or array). Report it instead of letting the
            # dict-only code below raise and abort the whole run.
            if not isinstance(entry, dict):
                stats["formats_seen"]["unknown"] += 1
                stats["parse_errors"].append(
                    f"Line {line_num}: Entry is not a JSON object "
                    f"(got {type(entry).__name__})"
                )
                continue

            fmt = detect_format(entry)
            stats["formats_seen"][fmt] += 1

            # Extract locale
            if fmt == "compact":
                # Compact entries carry the locale inside the "t" text.
                text = entry.get("t", "")
                match = LOCALE_PATTERN.search(text)
                if match:
                    stats["locales"].add(match.group(1))
                else:
                    stats["parse_errors"].append(
                        f"Line {line_num}: No locale found in compact entry"
                    )
            elif fmt == "multi-field":
                # Multi-field entries carry the locale in the "lc" field.
                # A missing or empty "lc" is skipped without an error.
                lc = entry.get("lc", "")
                if isinstance(lc, str):
                    if lc:
                        stats["locales"].add(lc)
                elif lc:
                    # A non-string locale would either be unhashable or
                    # break later sorting/joining of the locale set.
                    stats["parse_errors"].append(
                        f"Line {line_num}: Non-string 'lc' value in multi-field entry"
                    )
            else:
                stats["parse_errors"].append(
                    f"Line {line_num}: Unknown format (no 't' or 'seg_key' field)"
                )

    # Determine dominant format
    if stats["formats_seen"]:
        stats["format"] = max(stats["formats_seen"], key=stats["formats_seen"].get)
    else:
        stats["format"] = "empty"

    return stats
|
|
|
|
|
|
def validate_directory(tm_dir: Path) -> None:
    """Scan a directory tree for TM files and print a validation report.

    Every regular file under ``tm_dir`` (at any depth) whose suffix is
    ``.json`` or ``.jsonl`` is validated. For each file a short report is
    printed, listing at most five of its errors, followed by an aggregate
    summary of files, entries, errors, formats, locales, and channels.

    Args:
        tm_dir: Root directory to search.
    """
    # Gather candidate files first so the count can be printed up front.
    tm_files = [
        candidate
        for candidate in tm_dir.rglob("*")
        if candidate.suffix in (".json", ".jsonl") and candidate.is_file()
    ]
    tm_files.sort()

    if len(tm_files) == 0:
        print(f"No .json or .jsonl files found in {tm_dir}")
        return

    print(f"Scanning {len(tm_files)} file(s) in {tm_dir}\n")
    print("=" * 80)

    # Running totals across all files.
    format_counts = defaultdict(int)
    all_channels = set()
    all_locales = set()
    total_errors = 0
    total_entries = 0

    for file_path in tm_files:
        stats = validate_file(file_path)
        channel = extract_channel_from_filename(file_path.name)
        error_count = len(stats["parse_errors"])

        # Fold this file's results into the totals.
        if channel:
            all_channels.add(channel)
        all_locales |= stats["locales"]
        total_entries += stats["line_count"]
        total_errors += error_count
        for fmt, count in stats["formats_seen"].items():
            format_counts[fmt] += count

        # Per-file report
        if file_path.is_relative_to(tm_dir):
            relative = file_path.relative_to(tm_dir)
        else:
            relative = file_path

        locales_str = ", ".join(sorted(stats["locales"])) or "none"

        if error_count == 0:
            status = "OK"
        else:
            status = f"{error_count} ERROR(S)"

        print(f"\n File: {relative}")
        print(f" Format: {stats['format']}")
        print(f" Lines: {stats['line_count']} (+ {stats['empty_lines']} empty)")
        print(f" Locales: {locales_str}")
        if channel:
            print(f" Channel: {channel}")
        print(f" Status: {status}")

        # Show only the first five errors; slicing an empty list prints nothing.
        for err in stats["parse_errors"][:5]:
            print(f" - {err}")
        if len(stats["parse_errors"]) > 5:
            print(f" ... and {len(stats['parse_errors']) - 5} more errors")

    # Summary
    print("\n" + "=" * 80)
    print("VALIDATION SUMMARY")
    print("=" * 80)
    print(f" Total files: {len(tm_files)}")
    print(f" Total entries: {total_entries}")
    print(f" Total errors: {total_errors}")
    print()
    print(" Format breakdown:")
    for fmt, count in sorted(format_counts.items()):
        print(f" {fmt:12s} {count:>6d} entries")
    print()
    print(f" Locales covered ({len(all_locales)}):")
    for lc in sorted(all_locales):
        print(f" - {lc}")
    print()
    print(f" Channels covered ({len(all_channels)}):")
    for ch in sorted(all_channels):
        print(f" - {ch}")
    print("=" * 80)
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and validate the given TM directory.

    Takes one positional argument, ``tm_directory``. If it is not an existing
    directory, an error is written to stderr and the process exits with
    status 1; otherwise the directory is validated and a report is printed.
    """
    parser = argparse.ArgumentParser(
        description="Validate TM files and report on format, locales, and errors.",
    )
    parser.add_argument(
        "tm_directory",
        type=Path,
        help="Directory containing TM files (.json or .jsonl)",
    )
    args = parser.parse_args()

    # Happy path first: a real directory gets validated and we are done.
    if args.tm_directory.is_dir():
        validate_directory(args.tm_directory)
        return

    print(f"Error: Directory does not exist: {args.tm_directory}", file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
|
# Run the validator only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|