video-accessibility/tools/migrate_gcs_org_prefix.py
Vadym Samoilenko 54fcf47887 feat(mt-14): gcs_prefix on Job, gcs_path helper, rewrite path sites
- gcs_path(job, *parts) helper in gcs.py: uses job.gcs_prefix if set,
  falls back to job._id (legacy) — backward-compatible for all old jobs
- create_job: sets gcs_prefix=orgs/{org_id}/jobs/{job_id} when
  organization_id is known; legacy jobs without org get null prefix
- Rewrote hardcoded f"{job_id}/{lang}/..." paths in:
  - ingest_and_ai.py (4 upload sites)
  - translate_and_synthesize.py (9 sites via bulk regex)
  - render_accessible_video.py (3 sites: segments, video, captions)
  - rerender_accessible_video.py (3 sites)
- tools/migrate_gcs_org_prefix.py: idempotent operator script —
  preflight checks, copy→verify(count+md5)→mongo update→delete,
  ThreadPoolExecutor(4), resume file, dry-run + rollback modes

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-29 20:45:12 +01:00

283 lines
9.9 KiB
Python

#!/usr/bin/env python3
"""
Operator script: migrate GCS objects from flat job-id prefix to org-scoped prefix.
Usage:
python tools/migrate_gcs_org_prefix.py [--dry-run] [--rollback] [--limit N] [--resume]
Prerequisites (must hold BEFORE running):
1. mongodump taken and verified
2. gcloud storage ls > pre_migration_inventory.txt saved
3. Celery workers stopped / paused on ingest/translate/render/tts queues
4. celery inspect active → empty
The script copies:
gs://{BUCKET}/{job_id}/**
to:
gs://{BUCKET}/orgs/{org_id}/jobs/{job_id}/**
Then verifies object counts, spot-checks the hashes (MD5 or CRC32C, whichever
`gsutil hash` reports first) of up to 3 randomly chosen objects, updates Mongo,
then (only if verification passes) deletes the old prefix.
Writes gcs_migration_resume.json after each job so it can be resumed.
"""
import argparse
import hashlib
import json
import os
import random
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
# Job statuses that main() excludes from the migration query (pipeline still running).
ACTIVE_PIPELINE_STATUSES = {
    "ingesting", "ai_processing", "translating",
    "tts_generating", "rendering_video", "rendering_qc",
}

# Progress file, resolved relative to the current working directory.
RESUME_FILE = Path("gcs_migration_resume.json")

# Each setting has a primary and a fallback environment variable name.
GCS_BUCKET = os.getenv("GCS_BUCKET") or os.getenv("BUCKET")
MONGO_URI = os.getenv("MONGODB_URI") or os.getenv("MONGO_URI")
def require_env():
    """Abort the script unless the bucket and Mongo URI settings are both present.

    Prints a single line naming every missing setting and exits with
    status 1; returns normally when nothing is missing.
    """
    required = (
        (GCS_BUCKET, "GCS_BUCKET or BUCKET"),
        (MONGO_URI, "MONGODB_URI or MONGO_URI"),
    )
    missing = [label for value, label in required if not value]
    if not missing:
        return
    print(f"ERROR: Missing environment variables: {', '.join(missing)}")
    sys.exit(1)
def run(cmd: "str | list[str]", capture: bool = True) -> subprocess.CompletedProcess:
    """Run an external command and return its ``CompletedProcess``.

    Args:
        cmd: Either a shell command line (``str``), which is executed through
            the shell exactly as before, or an argv list, which is executed
            directly without a shell so arguments need no quoting/escaping.
        capture: When true, stdout and stderr are captured as text on the
            result; when false they are inherited from this process.

    Returns:
        The ``subprocess.CompletedProcess``; the exit status is not checked
        here, callers inspect ``returncode`` themselves.
    """
    # Only plain strings go through the shell. Passing a list together with
    # shell=True would make POSIX shells treat the extra items as arguments
    # to the shell itself rather than to the command.
    use_shell = isinstance(cmd, str)
    return subprocess.run(cmd, shell=use_shell, capture_output=capture, text=True)
def gsutil_list(prefix: str) -> list[str]:
    """List what ``gsutil ls -r`` reports under ``gs://{GCS_BUCKET}/{prefix}/**``.

    Returns the stripped, non-empty output lines, leaving out any line that
    ends with ":". A non-zero exit status from gsutil yields an empty list,
    so callers cannot tell a failed listing from an empty prefix.
    """
    listing = run(f"gsutil ls -r 'gs://{GCS_BUCKET}/{prefix}/**'")
    if listing.returncode != 0:
        return []
    return [
        entry
        for raw in listing.stdout.splitlines()
        if (entry := raw.strip()) and not entry.endswith(":")
    ]
def gsutil_hash(uri: str) -> str:
    """Return a hash string for ``uri`` as printed by ``gsutil hash -h``.

    The value is the text after the last ":" on the first output line that
    mentions either "Hash (md5)" or "Hash (crc32c)" -- whichever gsutil
    prints first. Returns "" when the command fails or no such line exists.
    """
    result = run(f"gsutil hash -h '{uri}'")
    if result.returncode == 0:
        markers = ("Hash (md5)", "Hash (crc32c)")
        for text in result.stdout.splitlines():
            if any(marker in text for marker in markers):
                return text.rsplit(":", 1)[-1].strip()
    return ""
def copy_prefix(src_prefix: str, dst_prefix: str, dry_run: bool) -> bool:
    """Copy everything under ``src_prefix`` to ``dst_prefix`` within the bucket.

    The ``gsutil -m cp -r`` command line is always printed. In dry-run mode
    nothing is executed and True is returned; otherwise the command runs with
    its output passed straight through and the result is whether it exited 0.
    """
    source = f"gs://{GCS_BUCKET}/{src_prefix}/*"
    target = f"gs://{GCS_BUCKET}/{dst_prefix}/"
    cmd = f"gsutil -m cp -r '{source}' '{target}'"
    print(f" COPY: {cmd}")
    if not dry_run:
        return run(cmd, capture=False).returncode == 0
    return True
def delete_prefix(prefix: str, dry_run: bool) -> bool:
    """Recursively remove ``gs://{GCS_BUCKET}/{prefix}/``.

    The ``gsutil -m rm -r`` command line is always printed. In dry-run mode
    nothing is executed and True is returned; otherwise the command runs with
    its output passed straight through and the result is whether it exited 0.
    """
    target = f"gs://{GCS_BUCKET}/{prefix}/"
    cmd = f"gsutil -m rm -r '{target}'"
    print(f" DELETE: {cmd}")
    if not dry_run:
        return run(cmd, capture=False).returncode == 0
    return True
def verify_copy(src_prefix: str, dst_prefix: str) -> bool:
    """Check that ``dst_prefix`` holds a faithful copy of ``src_prefix``.

    Verification passes only when all of the following hold:
      * the source listing is non-empty,
      * source and destination contain the same number of objects,
      * the object names relative to each prefix are identical,
      * for up to 3 randomly sampled objects, ``gsutil_hash`` returns the
        same non-empty value for the source and the destination object.

    Args:
        src_prefix: Prefix (inside GCS_BUCKET) the objects were copied from.
        dst_prefix: Prefix (inside GCS_BUCKET) the objects were copied to.

    Returns:
        True when every check passes, False otherwise (a reason is printed).
    """
    src_root = f"gs://{GCS_BUCKET}/{src_prefix}/"
    dst_root = f"gs://{GCS_BUCKET}/{dst_prefix}/"

    src_objects = gsutil_list(src_prefix)
    dst_objects = gsutil_list(dst_prefix)

    # gsutil_list returns [] when the listing command fails, so two failed
    # listings would compare equal (0 == 0) with nothing to sample. The caller
    # deletes the source after a successful verification, so an empty source
    # listing must never count as verified.
    if not src_objects:
        print(f" ERROR: No source objects listed under {src_prefix}; cannot verify copy")
        return False

    if len(src_objects) != len(dst_objects):
        print(f" ERROR: Object count mismatch: src={len(src_objects)}, dst={len(dst_objects)}")
        return False

    # Equal counts do not prove the same objects exist on both sides (the
    # destination may hold leftovers from an earlier attempt), so compare the
    # relative names too.
    src_names = {uri.removeprefix(src_root) for uri in src_objects}
    dst_names = {uri.removeprefix(dst_root) for uri in dst_objects}
    if src_names != dst_names:
        missing = sorted(src_names - dst_names)
        print(f" ERROR: Object name mismatch: {len(missing)} source object(s) missing at destination, e.g. {missing[:3]}")
        return False

    # Spot-check up to 3 random objects
    for src_uri in random.sample(src_objects, min(3, len(src_objects))):
        # Strip only the leading prefix; str.replace would also remove the
        # same text if it happened to recur later in the object name.
        rel = src_uri.removeprefix(src_root)
        dst_uri = f"{dst_root}{rel}"
        src_hash = gsutil_hash(src_uri)
        dst_hash = gsutil_hash(dst_uri)
        # An empty hash means gsutil_hash could not read one; treat as failure.
        if src_hash != dst_hash or not src_hash:
            print(f" ERROR: Hash mismatch for {rel}: src={src_hash} dst={dst_hash}")
            return False
    return True
def migrate_job_mongo(job: dict, new_prefix: str, dry_run: bool):
    """Update a MongoDB job document to reflect its new GCS prefix.

    Builds a ``$set`` update that:
      * sets ``gcs_prefix`` to ``new_prefix`` and refreshes ``updated_at``,
      * rewrites the first ``/{job_id}/`` in ``source.gcs_uri`` to
        ``/{new_prefix}/``,
      * rewrites ``source.filename`` when it starts with ``{job_id}/``,
      * rewrites the first ``/{job_id}/`` in every string field found under
        ``outputs.<lang>``.

    Args:
        job: Job document; ``_id`` is required, ``source`` and ``outputs``
            are optional and may be null.
        new_prefix: Prefix the job's objects now live under.
        dry_run: When true, only print the field names that would be set.
    """
    from motor.motor_asyncio import AsyncIOMotorClient
    import asyncio

    old_prefix = str(job["_id"])  # bare job_id
    old_segment = f"/{old_prefix}/"
    new_segment = f"/{new_prefix}/"

    updates: dict = {"gcs_prefix": new_prefix}

    # `source` can be missing or explicitly null; treat both as empty.
    source = job.get("source") or {}
    source_uri = source.get("gcs_uri") or ""
    if isinstance(source_uri, str) and old_segment in source_uri:
        updates["source.gcs_uri"] = source_uri.replace(old_segment, new_segment, 1)
    filename = source.get("filename") or ""
    if isinstance(filename, str) and filename.startswith(old_prefix + "/"):
        updates["source.filename"] = filename.replace(old_prefix + "/", new_prefix + "/", 1)

    # Rewrite every string field under outputs.<lang> that mentions the old prefix.
    for lang, lang_out in (job.get("outputs") or {}).items():
        if not isinstance(lang_out, dict):
            continue
        for field, uri in lang_out.items():
            if isinstance(uri, str) and old_segment in uri:
                updates[f"outputs.{lang}.{field}"] = uri.replace(old_segment, new_segment, 1)

    updates["updated_at"] = datetime.utcnow()

    if dry_run:
        print(f" DRY-RUN mongo update: {list(updates.keys())}")
        return

    # Use the database named in the connection URI, as main() does when it
    # loads the jobs, so the update hits the database the job was read from.
    # Fall back to "accessible_video" when the URI carries no database path.
    _, _, after_scheme = MONGO_URI.partition("://")
    _, _, uri_path = after_scheme.partition("/")
    db_name = uri_path.split("?", 1)[0] or "accessible_video"

    async def _update():
        client = AsyncIOMotorClient(MONGO_URI)
        try:
            await client[db_name].jobs.update_one({"_id": job["_id"]}, {"$set": updates})
        finally:
            # Release the connection even when the update raises.
            client.close()

    asyncio.run(_update())
def load_resume() -> set[str]:
    """Return the job ids stored under "completed" in RESUME_FILE.

    Yields an empty set when the file does not exist or has no "completed"
    entry.
    """
    if not RESUME_FILE.exists():
        return set()
    state = json.loads(RESUME_FILE.read_text())
    return set(state.get("completed", []))
def save_resume(completed: set[str]):
    """Persist the set of completed job ids to RESUME_FILE.

    Any other keys already present in the file are preserved; "completed" is
    replaced with the given ids (sorted, so the file content is deterministic)
    and "updated_at" is set to the current UTC time in ISO format.

    Args:
        completed: Job ids to record as done.
    """
    existing = {}
    if RESUME_FILE.exists():
        existing = json.loads(RESUME_FILE.read_text())
    existing["completed"] = sorted(completed)
    existing["updated_at"] = datetime.utcnow().isoformat()

    # Write to a sibling temp file and rename it over the target so an
    # interrupted run cannot leave a truncated resume file behind, which
    # would make the next json.loads of it fail.
    tmp_path = RESUME_FILE.with_name(RESUME_FILE.name + ".tmp")
    tmp_path.write_text(json.dumps(existing, indent=2))
    tmp_path.replace(RESUME_FILE)
def main():
    """CLI entry point: migrate (or move back) GCS objects for eligible jobs.

    Steps:
      1. Parse arguments and check required environment variables.
      2. Pre-flight: require a configured gcloud project; ask for an explicit
         "yes" unless --dry-run is given.
      3. Load jobs from MongoDB that have an organization, no gcs_prefix and
         are not in an active pipeline status.
      4. Process jobs on 4 worker threads: copy, verify, update Mongo
         (forward mode only), delete the source prefix.
      5. Print a summary; exit with status 1 if any job failed.

    The resume file is only written for real forward runs. Dry-run and
    rollback successes are not recorded, so a later ``--resume`` run does not
    skip jobs that were never actually migrated.
    """
    parser = argparse.ArgumentParser(description="Migrate GCS org prefix for all jobs")
    parser.add_argument("--dry-run", action="store_true", help="Print actions without executing")
    parser.add_argument("--rollback", action="store_true", help="Roll back migrated jobs (moves back)")
    parser.add_argument("--limit", type=int, default=0, help="Limit number of jobs to migrate (0=all)")
    parser.add_argument("--resume", action="store_true", help="Skip already-completed jobs from resume file")
    args = parser.parse_args()
    require_env()

    # Pre-flight: check gcloud auth
    r = run("gcloud config get-value project")
    if r.returncode != 0 or not r.stdout.strip():
        print("ERROR: gcloud not authenticated or project not set. Run 'gcloud auth login' and 'gcloud config set project PROJECT_ID'.")
        sys.exit(1)
    print(f"GCP project: {r.stdout.strip()}")

    if not args.dry_run:
        confirm = input("⚠️ This will move production GCS objects and update MongoDB. Type 'yes' to continue: ")
        if confirm.lower() != "yes":
            print("Aborted.")
            sys.exit(0)

    # Load jobs from MongoDB
    from motor.motor_asyncio import AsyncIOMotorClient
    import asyncio

    async def fetch_jobs() -> list:
        client = AsyncIOMotorClient(MONGO_URI)
        try:
            # Database name is the path component of the URI. Splitting on the
            # last "/" alone would return the host part for URIs without a
            # path, so cut after the scheme and host explicitly.
            _, _, after_scheme = MONGO_URI.partition("://")
            _, _, uri_path = after_scheme.partition("/")
            db_name = uri_path.split("?", 1)[0] or "accessible_video"
            db = client[db_name]
            query = {
                "gcs_prefix": None,
                "organization_id": {"$ne": None},
                "status": {"$nin": list(ACTIVE_PIPELINE_STATUSES)},
            }
            cursor = db.jobs.find(query, {"_id": 1, "organization_id": 1, "source": 1, "outputs": 1, "status": 1})
            # length=None fetches every match; a fixed cap would silently
            # leave jobs beyond it unmigrated.
            return await cursor.to_list(length=None)
        finally:
            client.close()

    all_jobs = asyncio.run(fetch_jobs())
    print(f"Found {len(all_jobs)} jobs to migrate (gcs_prefix=null, has org_id, not in-flight).")

    already_done = load_resume() if args.resume else set()
    jobs_to_process = [j for j in all_jobs if str(j["_id"]) not in already_done]
    # Only a positive limit truncates; a negative value would otherwise
    # slice from the end of the list.
    if args.limit > 0:
        jobs_to_process = jobs_to_process[:args.limit]
    print(f"Processing {len(jobs_to_process)} jobs (skipping {len(already_done)} already done).")

    completed: set[str] = set(already_done)
    errors: list[dict] = []
    # Progress is persisted only when objects were really migrated forward.
    persist_progress = not (args.dry_run or args.rollback)

    def process_job(job: dict) -> tuple[str, bool, str]:
        job_id = str(job["_id"])
        org_id = str(job["organization_id"])
        new_prefix = f"orgs/{org_id}/jobs/{job_id}"
        src_prefix = job_id
        if args.rollback:
            # Rollback: copy from new_prefix back to src_prefix
            # NOTE(review): the job query above still selects only jobs whose
            # gcs_prefix is null, and Mongo is not touched in rollback mode.
            # So --rollback only moves back objects for jobs whose Mongo
            # record was never updated (e.g. after a failed verification);
            # jobs with gcs_prefix already set are not selected. Confirm this
            # is the intended rollback scope.
            src_prefix, new_prefix = new_prefix, src_prefix

        print(f"\n[{job_id}] {src_prefix} → {new_prefix}")
        src_objects = gsutil_list(src_prefix)
        if not src_objects:
            print(f" SKIP: No objects found at {src_prefix}")
            return job_id, True, ""
        print(f" Found {len(src_objects)} objects to copy")

        if not copy_prefix(src_prefix, new_prefix, args.dry_run):
            return job_id, False, "gsutil cp failed"
        if not args.dry_run:
            if not verify_copy(src_prefix, new_prefix):
                return job_id, False, "verification failed"
        if not args.rollback:
            migrate_job_mongo(job, new_prefix, args.dry_run)
        # The source is removed only after copy, verify and Mongo update succeeded.
        if not delete_prefix(src_prefix, args.dry_run):
            return job_id, False, "gsutil rm failed"
        return job_id, True, ""

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(process_job, job): job for job in jobs_to_process}
        for future in as_completed(futures):
            try:
                job_id, ok, err = future.result()
            except Exception as exc:
                # Top-level boundary: one job blowing up (e.g. a Mongo error)
                # must not abort the remaining jobs or lose the summary.
                job_id = str(futures[future]["_id"])
                ok, err = False, f"unexpected error: {exc!r}"
            if ok:
                completed.add(job_id)
                if persist_progress:
                    save_resume(completed)
                print(f"{job_id}")
            else:
                errors.append({"job_id": job_id, "error": err})
                print(f"{job_id}: {err}")

    print(f"\n--- Summary ---")
    print(f"Completed: {len(completed) - len(already_done)}")
    print(f"Errors: {len(errors)}")
    if errors:
        print("Failed jobs:")
        for e in errors:
            print(f" {e['job_id']}: {e['error']}")
        sys.exit(1)
# Script entry point: run the migration only when executed directly, not on import.
if __name__ == "__main__":
    main()