- gcs_path(job, *parts) helper in gcs.py: uses job.gcs_prefix if set,
falls back to job._id (legacy) — backward-compatible for all old jobs
- create_job: sets gcs_prefix=orgs/{org_id}/jobs/{job_id} when
organization_id is known; legacy jobs without org get null prefix
- Rewrote hardcoded f"{job_id}/{lang}/..." paths in:
- ingest_and_ai.py (4 upload sites)
- translate_and_synthesize.py (9 sites via bulk regex)
- render_accessible_video.py (3 sites: segments, video, captions)
- rerender_accessible_video.py (3 sites)
- tools/migrate_gcs_org_prefix.py: idempotent operator script —
preflight checks, copy→verify(count+md5)→mongo update→delete,
ThreadPoolExecutor(4), resume file, dry-run + rollback modes
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
283 lines
9.9 KiB
Python
283 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
Operator script: migrate GCS objects from the flat job-id prefix to the org-scoped prefix.

Usage:

    python tools/migrate_gcs_org_prefix.py [--dry-run] [--rollback] [--limit N] [--resume]

Prerequisites (must hold BEFORE running):

    1. mongodump taken and verified
    2. Object inventory saved:
       gcloud storage ls --recursive 'gs://{BUCKET}/**' > pre_migration_inventory.txt
       (a bare `gcloud storage ls` lists buckets, not objects)
    3. Celery workers stopped / paused on ingest/translate/render/tts queues
    4. `celery inspect active` → empty

The script copies:

    gs://{BUCKET}/{job_id}/**

to:

    gs://{BUCKET}/orgs/{org_id}/jobs/{job_id}/**

It then verifies the copy: object counts, plus hash spot-checks of 3 random objects.
Only if verification passes does it update Mongo and then delete the old prefix.

Writes gcs_migration_resume.json after each job so the run can be resumed.
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import random
|
|
import subprocess
|
|
import sys
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Job statuses treated as in-flight: jobs in any of these states are excluded
# from the migration query so no running Celery task sees its files move.
# NOTE(review): confirm this covers every in-flight status the pipeline sets.
ACTIVE_PIPELINE_STATUSES = set(
    (
        "ingesting",
        "ai_processing",
        "translating",
        "tts_generating",
        "rendering_video",
        "rendering_qc",
    )
)

# Progress file; the relative path resolves against the current working directory.
RESUME_FILE = Path("gcs_migration_resume.json")

# Each setting accepts two environment variable names; the first non-empty wins.
GCS_BUCKET = os.getenv("GCS_BUCKET") or os.getenv("BUCKET")
MONGO_URI = os.getenv("MONGODB_URI") or os.getenv("MONGO_URI")
|
|
|
|
|
|
def require_env():
    """Exit with status 1 unless both the bucket and the Mongo URI are configured.

    A single error line naming every missing setting is printed before exiting;
    when everything is present the function simply returns.
    """
    checks = (
        (GCS_BUCKET, "GCS_BUCKET or BUCKET"),
        (MONGO_URI, "MONGODB_URI or MONGO_URI"),
    )
    absent = [label for value, label in checks if not value]
    if not absent:
        return
    print(f"ERROR: Missing environment variables: {', '.join(absent)}")
    sys.exit(1)
|
|
|
|
|
|
def run(cmd: str, capture: bool = True) -> subprocess.CompletedProcess:
    """Execute *cmd* through the shell and return the finished process.

    With ``capture`` (the default) stdout/stderr are collected as text on the
    result; otherwise they stream straight to the terminal. The exit status is
    not checked here: callers inspect ``returncode`` themselves.

    NOTE: ``cmd`` is interpreted by the shell, so callers must quote any
    interpolated values.
    """
    options = {"shell": True, "text": True, "capture_output": capture}
    return subprocess.run(cmd, **options)
|
|
|
|
|
|
def gsutil_list(prefix: str) -> list[str]:
    """Return every object URI under ``gs://{GCS_BUCKET}/{prefix}/``.

    Returns an empty list when nothing matches. Any other ``gsutil`` failure
    (auth, network, wrong bucket) also yields an empty list, because callers
    treat the result as "objects found". In that case a warning carrying gsutil's
    stderr is printed, so the operator can tell a genuinely empty prefix from a
    failed listing.
    """
    r = run(f"gsutil ls -r 'gs://{GCS_BUCKET}/{prefix}/**'")
    if r.returncode != 0:
        stderr = (r.stderr or "").strip()
        # gsutil exits non-zero with "... matched no objects." for an empty
        # prefix; anything else is a real error worth surfacing.
        if "matched no objects" not in stderr:
            print(
                f"  WARNING: listing gs://{GCS_BUCKET}/{prefix}/ failed: {stderr}",
                file=sys.stderr,
            )
        return []
    lines = (line.strip() for line in r.stdout.splitlines())
    # `ls -r` can emit "<prefix>:" header lines for pseudo-directories; drop them.
    return [uri for uri in lines if uri and not uri.endswith(":")]
|
|
|
|
|
|
def parse_gsutil_hashes(text: str) -> str:
    """Extract an object hash from ``gsutil ls -L`` output.

    Returns the ``Hash (md5)`` value if present, otherwise ``Hash (crc32c)``,
    otherwise "". Composite objects carry no MD5, hence the fallback.
    """
    found: dict[str, str] = {}
    for line in text.splitlines():
        key, sep, value = line.strip().partition(":")
        if sep and key in ("Hash (md5)", "Hash (crc32c)"):
            found[key] = value.strip()
    return found.get("Hash (md5)") or found.get("Hash (crc32c)", "")


def gsutil_hash(uri: str) -> str:
    """Return the stored hash of the GCS object *uri*, or "" on any failure.

    Reads object metadata with ``gsutil ls -L``. The previous ``gsutil hash``
    only hashes local files, so it failed for every gs:// URI; the empty
    result then made verify_copy reject every copy.
    """
    r = run(f"gsutil ls -L '{uri}'")
    if r.returncode != 0:
        return ""
    return parse_gsutil_hashes(r.stdout)
|
|
|
|
|
|
def copy_prefix(src_prefix: str, dst_prefix: str, dry_run: bool) -> bool:
    """Recursively copy everything under *src_prefix* into *dst_prefix*.

    The gsutil command is always echoed. Under *dry_run* it is not executed
    and True is returned; otherwise the result is True iff gsutil exited 0.
    """
    source = f"gs://{GCS_BUCKET}/{src_prefix}/*"
    target = f"gs://{GCS_BUCKET}/{dst_prefix}/"
    cmd = f"gsutil -m cp -r '{source}' '{target}'"
    print(f" COPY: {cmd}")
    return True if dry_run else (run(cmd, capture=False).returncode == 0)
|
|
|
|
|
|
def delete_prefix(prefix: str, dry_run: bool) -> bool:
    """Recursively delete every object under ``gs://{GCS_BUCKET}/{prefix}/``.

    The gsutil command is echoed; under *dry_run* it is not executed and True
    is returned. Otherwise returns True iff gsutil exited 0.

    Refuses, returning False without running anything, when *prefix* is empty
    or only slashes. Such a prefix would aim ``rm -r`` at the bucket root.
    """
    if not prefix.strip("/"):
        print(f" ERROR: refusing to delete empty prefix {prefix!r}")
        return False
    cmd = f"gsutil -m rm -r 'gs://{GCS_BUCKET}/{prefix}/'"
    print(f" DELETE: {cmd}")
    if dry_run:
        return True
    return run(cmd, capture=False).returncode == 0
|
|
|
|
|
|
def relative_object_names(uris: list[str], bucket: str, prefix: str) -> set[str]:
    """Strip the leading ``gs://{bucket}/{prefix}/`` from each URI.

    Only the leading occurrence is removed. A URI without that prefix is kept
    whole, so it can never be mistaken for a matching object.
    """
    base = f"gs://{bucket}/{prefix}/"
    return {uri.removeprefix(base) for uri in uris}


def verify_copy(src_prefix: str, dst_prefix: str) -> bool:
    """Check that *dst_prefix* holds exactly the objects of *src_prefix*.

    Requires equal object counts and identical relative object names. It then
    compares stored hashes for up to 3 randomly chosen objects. Prints the
    reason and returns False on the first failure.
    """
    src_objects = gsutil_list(src_prefix)
    dst_objects = gsutil_list(dst_prefix)
    if len(src_objects) != len(dst_objects):
        print(f" ERROR: Object count mismatch: src={len(src_objects)}, dst={len(dst_objects)}")
        return False

    # Equal counts alone can hide a wrong copy (e.g. stale objects already at
    # the destination), so compare the actual names as well.
    src_names = relative_object_names(src_objects, GCS_BUCKET, src_prefix)
    dst_names = relative_object_names(dst_objects, GCS_BUCKET, dst_prefix)
    missing = src_names - dst_names
    if missing:
        print(f" ERROR: {len(missing)} object(s) missing at destination, e.g. {sorted(missing)[:3]}")
        return False

    # Spot-check 3 random objects
    for rel in random.sample(sorted(src_names), min(3, len(src_names))):
        src_hash = gsutil_hash(f"gs://{GCS_BUCKET}/{src_prefix}/{rel}")
        dst_hash = gsutil_hash(f"gs://{GCS_BUCKET}/{dst_prefix}/{rel}")
        if not src_hash or src_hash != dst_hash:
            print(f" ERROR: Hash mismatch for {rel}: src={src_hash} dst={dst_hash}")
            return False
    return True
|
|
|
|
|
|
def job_prefix_updates(job: dict, old_prefix: str, new_prefix: str) -> dict:
    """Return ``$set`` fields that rewrite *job*'s GCS references to *new_prefix*.

    The first ``/<old_prefix>/`` occurrence is replaced in ``source.gcs_uri``
    and in every string value under ``outputs.<lang>.<field>``.
    ``source.filename`` is rewritten when it starts with ``<old_prefix>/``.
    Missing or None ``source``/``outputs`` and non-dict per-language entries are
    skipped. ``gcs_prefix`` itself is not included.
    """
    old_seg, new_seg = f"/{old_prefix}/", f"/{new_prefix}/"
    updates: dict = {}

    source = job.get("source") or {}
    gcs_uri = source.get("gcs_uri")
    if isinstance(gcs_uri, str) and old_seg in gcs_uri:
        updates["source.gcs_uri"] = gcs_uri.replace(old_seg, new_seg, 1)
    filename = source.get("filename")
    if isinstance(filename, str) and filename.startswith(old_prefix + "/"):
        updates["source.filename"] = new_prefix + filename[len(old_prefix):]

    for lang, lang_out in (job.get("outputs") or {}).items():
        if not isinstance(lang_out, dict):
            continue
        for field, uri in lang_out.items():
            if isinstance(uri, str) and old_seg in uri:
                updates[f"outputs.{lang}.{field}"] = uri.replace(old_seg, new_seg, 1)
    return updates


def migrate_job_mongo(job: dict, new_prefix: str, dry_run: bool):
    """Record *job*'s new org-scoped GCS prefix in MongoDB.

    Sets ``gcs_prefix`` to *new_prefix* and rewrites references to the legacy
    ``<job_id>/`` prefix (see job_prefix_updates). It also bumps ``updated_at``.
    Under *dry_run* only the field names are printed.

    Raises:
        RuntimeError: the update matched no job document. The caller must then
            not delete the legacy objects, since nothing points at the copy.
    """
    job_id = str(job["_id"])
    updates: dict = {"gcs_prefix": new_prefix}
    # Legacy jobs keep their objects directly under "<job_id>/".
    updates.update(job_prefix_updates(job, job_id, new_prefix))
    updates["updated_at"] = datetime.utcnow()

    if dry_run:
        print(f" DRY-RUN mongo update: {list(updates.keys())}")
        return

    from motor.motor_asyncio import AsyncIOMotorClient
    import asyncio

    async def _update():
        client = AsyncIOMotorClient(MONGO_URI)
        try:
            # Use the database named in MONGO_URI (default "accessible_video")
            # instead of a hardcoded name. That way the update hits the database
            # the jobs were read from, rather than silently matching nothing.
            db = client.get_default_database(default="accessible_video")
            return await db.jobs.update_one({"_id": job["_id"]}, {"$set": updates})
        finally:
            client.close()

    result = asyncio.run(_update())
    if result.matched_count != 1:
        raise RuntimeError(f"job {job_id} matched no document; Mongo not updated")
|
|
|
|
|
|
def load_resume() -> set[str]:
    """Return the job ids listed under "completed" in RESUME_FILE.

    Yields an empty set when the file does not exist or has no "completed" key.
    """
    if not RESUME_FILE.exists():
        return set()
    payload = json.loads(RESUME_FILE.read_text())
    return set(payload.get("completed", []))
|
|
|
|
|
|
def save_resume(completed: set[str]):
    """Persist *completed* job ids to RESUME_FILE, keeping any other keys.

    The ids are written sorted, so the file is deterministic, and stamped with
    ``updated_at``. The write goes to a sibling temp file that is then
    ``os.replace``-d over RESUME_FILE. An interrupted write therefore cannot
    leave a truncated file that load_resume would fail to parse.
    """
    state = json.loads(RESUME_FILE.read_text()) if RESUME_FILE.exists() else {}
    state["completed"] = sorted(completed)
    state["updated_at"] = datetime.utcnow().isoformat()
    tmp_path = RESUME_FILE.with_name(RESUME_FILE.name + ".tmp")
    tmp_path.write_text(json.dumps(state, indent=2))
    os.replace(tmp_path, RESUME_FILE)
|
|
|
|
|
|
def _fetch_jobs(rollback: bool) -> list:
    """Load the job documents this run should process (no result cap).

    Forward mode selects jobs without a ``gcs_prefix`` that have an
    ``organization_id``. Rollback mode selects jobs whose ``gcs_prefix`` is a
    non-empty string. Both modes exclude jobs whose status is in
    ACTIVE_PIPELINE_STATUSES.
    """
    from motor.motor_asyncio import AsyncIOMotorClient
    import asyncio

    if rollback:
        query: dict = {"gcs_prefix": {"$type": "string", "$ne": ""}}
    else:
        query = {"gcs_prefix": None, "organization_id": {"$ne": None}}
    query["status"] = {"$nin": list(ACTIVE_PIPELINE_STATUSES)}
    projection = {
        "_id": 1,
        "organization_id": 1,
        "gcs_prefix": 1,
        "source": 1,
        "outputs": 1,
        "status": 1,
    }

    async def _fetch() -> list:
        client = AsyncIOMotorClient(MONGO_URI)
        try:
            # Database named in the URI path, else "accessible_video". Splitting
            # the URI on "/" picks up "host:port" when the URI has no path.
            db = client.get_default_database(default="accessible_video")
            return await db.jobs.find(query, projection).to_list(length=None)
        finally:
            client.close()

    return asyncio.run(_fetch())


def rollback_prefix_updates(job: dict, current_prefix: str) -> dict:
    """Return ``$set`` fields that point *job* back at its legacy ``<job_id>/`` prefix.

    Clears ``gcs_prefix`` and replaces the first ``/<current_prefix>/`` in
    ``source.gcs_uri`` and in string values under ``outputs.<lang>.<field>``.
    ``source.filename`` is rewritten when it starts with ``<current_prefix>/``.
    """
    job_id = str(job["_id"])
    cur_seg, legacy_seg = f"/{current_prefix}/", f"/{job_id}/"
    updates: dict = {"gcs_prefix": None}

    source = job.get("source") or {}
    gcs_uri = source.get("gcs_uri")
    if isinstance(gcs_uri, str) and cur_seg in gcs_uri:
        updates["source.gcs_uri"] = gcs_uri.replace(cur_seg, legacy_seg, 1)
    filename = source.get("filename")
    if isinstance(filename, str) and filename.startswith(current_prefix + "/"):
        updates["source.filename"] = job_id + filename[len(current_prefix):]

    for lang, lang_out in (job.get("outputs") or {}).items():
        if not isinstance(lang_out, dict):
            continue
        for field, uri in lang_out.items():
            if isinstance(uri, str) and cur_seg in uri:
                updates[f"outputs.{lang}.{field}"] = uri.replace(cur_seg, legacy_seg, 1)
    return updates


def _revert_job_mongo(job: dict, current_prefix: str, dry_run: bool) -> None:
    """Apply rollback_prefix_updates (plus ``updated_at``) to the job document.

    Under *dry_run* only the field names are printed. Raises RuntimeError when
    the update matches no document. Because process_job reverts Mongo before
    deleting, the org-prefixed copy is then left in place.
    """
    updates = rollback_prefix_updates(job, current_prefix)
    updates["updated_at"] = datetime.utcnow()
    if dry_run:
        print(f" DRY-RUN mongo revert: {list(updates.keys())}")
        return

    from motor.motor_asyncio import AsyncIOMotorClient
    import asyncio

    async def _update():
        client = AsyncIOMotorClient(MONGO_URI)
        try:
            db = client.get_default_database(default="accessible_video")
            return await db.jobs.update_one({"_id": job["_id"]}, {"$set": updates})
        finally:
            client.close()

    result = asyncio.run(_update())
    if result.matched_count != 1:
        raise RuntimeError(f"job {job['_id']} matched no document; Mongo not reverted")


def main():
    """Migrate each eligible job's GCS objects, or move them back with --rollback.

    Per job: list the source prefix, copy it to the destination, verify the
    copy (skipped under --dry-run), update the job document in MongoDB, and
    only then delete the source prefix. Jobs run on 4 worker threads. Exits
    with status 1 if any job failed.
    """
    parser = argparse.ArgumentParser(description="Migrate GCS org prefix for all jobs")
    parser.add_argument("--dry-run", action="store_true", help="Print actions without executing")
    parser.add_argument("--rollback", action="store_true", help="Roll back migrated jobs (moves back)")
    parser.add_argument("--limit", type=int, default=0, help="Limit number of jobs to migrate (0=all)")
    parser.add_argument("--resume", action="store_true", help="Skip already-completed jobs from resume file")
    args = parser.parse_args()

    require_env()

    # Pre-flight: check gcloud auth
    r = run("gcloud config get-value project")
    if r.returncode != 0 or not r.stdout.strip():
        print("ERROR: gcloud not authenticated or project not set. Run 'gcloud auth login' and 'gcloud config set project PROJECT_ID'.")
        sys.exit(1)
    print(f"GCP project: {r.stdout.strip()}")

    if not args.dry_run:
        confirm = input("⚠️ This will move production GCS objects and update MongoDB. Type 'yes' to continue: ")
        if confirm.strip().lower() != "yes":
            print("Aborted.")
            sys.exit(0)

    all_jobs = _fetch_jobs(args.rollback)
    if args.rollback:
        print(f"Found {len(all_jobs)} jobs to roll back (gcs_prefix set, not in-flight).")
    else:
        print(f"Found {len(all_jobs)} jobs to migrate (gcs_prefix=null, has org_id, not in-flight).")

    # The resume file lists forward-migrated job ids. It is only used for
    # skipping in forward mode. A successful rollback removes the id, so a later
    # forward --resume run migrates that job again. Dry runs never write the
    # file; otherwise a dry run would make a real --resume run skip every job.
    resume_ids = load_resume() if (args.resume or not args.dry_run) else set()
    skip_ids = resume_ids if (args.resume and not args.rollback) else set()
    if args.resume and args.rollback:
        print("NOTE: --resume has no effect with --rollback; the job query already excludes rolled-back jobs.")

    jobs_to_process = [j for j in all_jobs if str(j["_id"]) not in skip_ids]
    skipped = len(all_jobs) - len(jobs_to_process)
    if args.limit:
        jobs_to_process = jobs_to_process[:args.limit]

    print(f"Processing {len(jobs_to_process)} jobs (skipping {skipped} already done).")

    completed_now: set[str] = set()
    errors: list[dict] = []

    def process_job(job: dict) -> tuple[str, bool, str]:
        job_id = str(job["_id"])
        if args.rollback:
            src_prefix = job["gcs_prefix"]
            dst_prefix = job_id
        else:
            src_prefix = job_id
            dst_prefix = f"orgs/{job['organization_id']}/jobs/{job_id}"
        if src_prefix.strip("/") == dst_prefix.strip("/"):
            # Copying onto itself and then deleting the source would lose data.
            return job_id, False, "source and destination prefixes are identical"

        print(f"\n[{job_id}] {src_prefix} → {dst_prefix}")
        src_objects = gsutil_list(src_prefix)
        if not src_objects:
            print(f" SKIP: No objects found at {src_prefix}")
            return job_id, True, ""

        print(f" Found {len(src_objects)} objects to copy")
        if not copy_prefix(src_prefix, dst_prefix, args.dry_run):
            return job_id, False, "gsutil cp failed"

        if not args.dry_run and not verify_copy(src_prefix, dst_prefix):
            return job_id, False, "verification failed"

        # Repoint Mongo before deleting anything: if this raises, the source
        # objects are still in place and still referenced.
        if args.rollback:
            _revert_job_mongo(job, src_prefix, args.dry_run)
        else:
            migrate_job_mongo(job, dst_prefix, args.dry_run)

        if not delete_prefix(src_prefix, args.dry_run):
            return job_id, False, "gsutil rm failed"

        return job_id, True, ""

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(process_job, job): job for job in jobs_to_process}
        for future in as_completed(futures):
            job_id = str(futures[future]["_id"])
            try:
                _, ok, err = future.result()
            except Exception as exc:  # one failing job must not stop result handling
                ok, err = False, f"{type(exc).__name__}: {exc}"
            if ok:
                completed_now.add(job_id)
                if not args.dry_run:
                    if not args.rollback:
                        resume_ids.add(job_id)
                        save_resume(resume_ids)
                    elif job_id in resume_ids:
                        resume_ids.discard(job_id)
                        save_resume(resume_ids)
                print(f" ✓ {job_id}")
            else:
                errors.append({"job_id": job_id, "error": err})
                print(f" ✗ {job_id}: {err}")

    print("\n--- Summary ---")
    print(f"Completed: {len(completed_now)}")
    print(f"Errors: {len(errors)}")
    if errors:
        print("Failed jobs:")
        for e in errors:
            print(f" {e['job_id']}: {e['error']}")
        sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the migration only when executed as a script, never on import.
    main()
|