feat(mt-14): gcs_prefix on Job, gcs_path helper, rewrite path sites

- gcs_path(job, *parts) helper in gcs.py: uses job.gcs_prefix if set,
  falls back to job._id (legacy) — backward-compatible for all old jobs
- create_job: sets gcs_prefix=orgs/{org_id}/jobs/{job_id} when
  organization_id is known; legacy jobs without org get null prefix
- Rewrote hardcoded f"{job_id}/{lang}/..." paths in:
  - ingest_and_ai.py (4 upload sites)
  - translate_and_synthesize.py (9 sites via bulk regex)
  - render_accessible_video.py (3 sites: segments, video, captions)
  - rerender_accessible_video.py (3 sites)
- tools/migrate_gcs_org_prefix.py: idempotent operator script —
  preflight checks, copy→verify(count+md5)→mongo update→delete,
  ThreadPoolExecutor(4), resume file, dry-run + rollback modes

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-29 20:45:12 +01:00
parent fe608401be
commit 54fcf47887
7 changed files with 323 additions and 23 deletions

View file

@ -149,6 +149,7 @@ async def create_job(
"client_id": str(current_user.id),
"organization_id": organization_id,
"brief_id": brief_id or None,
"gcs_prefix": f"orgs/{organization_id}/jobs/{job_id}" if organization_id else None,
"title": title,
"source": {
"filename": f"{job_id}/source.mp4",

View file

@ -125,6 +125,22 @@ class GCSService:
return await loop.run_in_executor(self.executor, _exists)
def gcs_path(job: "dict | object", *parts: str) -> str:
"""Return a GCS object path rooted at the job's prefix.
Jobs created before MT-14 have no gcs_prefix and use bare job_id/ as the
prefix. New jobs get prefix=orgs/{org_id}/jobs/{job_id}/.
"""
if isinstance(job, dict):
prefix = job.get("gcs_prefix") or job["_id"]
if not job.get("gcs_prefix"):
prefix = job["_id"]
else:
prefix = getattr(job, "gcs_prefix", None) or getattr(job, "id", str(job))
prefix = prefix.rstrip("/")
return "/".join([prefix, *parts]) if parts else prefix
# Global GCS service instance
gcs_service = GCSService()

View file

@ -12,7 +12,7 @@ from ..core.logging import get_logger
from ..models.job import JobStatus
from ..services import cost_tracker
from ..services.cost_tracker import BudgetExceeded
from ..services.gcs import gcs_service, upload_vtt_to_gcs
from ..services.gcs import gcs_service, gcs_path, upload_vtt_to_gcs
from ..services.gemini import gemini_service
from ..services.websocket import connection_manager
from . import celery_app
@ -207,12 +207,12 @@ async def ingest_and_ai_task_impl(job_id: str):
# Upload VTT files to GCS using detected language
captions_gcs_uri = await upload_vtt_to_gcs(
ai_result["captions_vtt"],
f"{job_id}/{source_language}/captions.vtt"
gcs_path(job_doc, source_language, "captions.vtt")
)
ad_gcs_uri = await upload_vtt_to_gcs(
ai_result["audio_description_vtt"],
f"{job_id}/{source_language}/ad.vtt"
gcs_path(job_doc, source_language, "ad.vtt")
)
# Upload SDH VTT if generated
@ -220,7 +220,7 @@ async def ingest_and_ai_task_impl(job_id: str):
if sdh_requested and ai_result.get("sdh_captions_vtt"):
sdh_gcs_uri = await upload_vtt_to_gcs(
ai_result["sdh_captions_vtt"],
f"{job_id}/{source_language}/sdh_captions.vtt"
gcs_path(job_doc, source_language, "sdh_captions.vtt")
)
# Generate descriptive transcript (WCAG 2.1 1.2.1)
@ -234,7 +234,7 @@ async def ingest_and_ai_task_impl(job_id: str):
if transcript_text:
transcript_gcs_uri = await upload_vtt_to_gcs(
transcript_text,
f"{job_id}/{source_language}/descriptive_transcript.txt"
gcs_path(job_doc, source_language, "descriptive_transcript.txt")
)
logger.info(f"Generated descriptive transcript for job {job_id}, language {source_language}")
except Exception as e:

View file

@ -13,7 +13,7 @@ from ..core.logging import get_logger
from ..lib.vtt import VTTParser
from ..models.job import AccessibleVideoEditState, JobStatus, PausePointData, VideoSegmentMetadata
from ..schemas.whisper import CachedWhisperTranscript, CachedWordTimestamp
from ..services.gcs import gcs_service
from ..services.gcs import gcs_service, gcs_path
from ..services.video_renderer import video_renderer_service
from ..services.vtt_retimer import vtt_retimer_service
from ..services.whisper_service import WordTimestamp, whisper_service
@ -213,7 +213,7 @@ async def _async_render_accessible_video(job_id: str, language: str):
# 6. Render accessible video with segment persistence for QC editing
output_video_path = os.path.join(temp_dir, "accessible_video.mp4")
gcs_segment_prefix = f"{job_id}/{language}/segments/"
gcs_segment_prefix = gcs_path(job_doc, language, "segments") + "/"
logger.info(f"Rendering accessible video using {method} method with segment persistence...")
rendered_path, updated_placements, segment_metadata, pause_points = await video_renderer_service.render_accessible_video(
@ -243,7 +243,7 @@ async def _async_render_accessible_video(job_id: str, language: str):
logger.info(f"Built edit state with {len(segment_metadata)} segments and {len(pause_points)} pause points")
# 7. Upload rendered video to GCS
video_blob_path = f"{job_id}/{language}/accessible_video.mp4"
video_blob_path = gcs_path(job_doc, language, "accessible_video.mp4")
video_blob = gcs_service.bucket.blob(video_blob_path)
video_blob.content_type = "video/mp4"
video_blob.upload_from_filename(output_video_path)
@ -268,7 +268,7 @@ async def _async_render_accessible_video(job_id: str, language: str):
)
# Upload re-timed captions
retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt"
retimed_blob_path = gcs_path(job_doc, language, "accessible_captions.vtt")
retimed_blob = gcs_service.bucket.blob(retimed_blob_path)
retimed_blob.content_type = "text/vtt"
retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt")

View file

@ -14,7 +14,7 @@ from ..core.config import settings
from ..core.logging import get_logger
from ..lib.vtt import VTTParser
from ..models.job import AccessibleVideoEditState, JobStatus, PausePointData, VideoSegmentMetadata
from ..services.gcs import gcs_service
from ..services.gcs import gcs_service, gcs_path
from ..services.video_renderer import video_renderer_service
from ..services.vtt_retimer import vtt_retimer_service
from ..services.whisper_service import WordTimestamp, whisper_service
@ -286,7 +286,7 @@ async def _async_rerender_accessible_video(
# 6. Render accessible video (persist segments again for future edits)
output_video_path = os.path.join(temp_dir, "accessible_video.mp4")
gcs_segment_prefix = f"{job_id}/{language}/segments/"
gcs_segment_prefix = gcs_path(job_doc, language, "segments") + "/"
logger.info(f"Re-rendering accessible video using {method} method...")
rendered_path, updated_placements, segment_metadata, new_pause_points = await video_renderer_service.render_accessible_video(
@ -302,7 +302,7 @@ async def _async_rerender_accessible_video(
analysis["placements"] = updated_placements
# 7. Upload rendered video
video_blob_path = f"{job_id}/{language}/accessible_video.mp4"
video_blob_path = gcs_path(job_doc, language, "accessible_video.mp4")
video_blob = gcs_service.bucket.blob(video_blob_path)
video_blob.content_type = "video/mp4"
video_blob.upload_from_filename(output_video_path)
@ -323,7 +323,7 @@ async def _async_rerender_accessible_video(
original_captions_vtt, analysis
)
retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt"
retimed_blob_path = gcs_path(job_doc, language, "accessible_captions.vtt")
retimed_blob = gcs_service.bucket.blob(retimed_blob_path)
retimed_blob.content_type = "text/vtt"
retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt")

View file

@ -14,7 +14,7 @@ from ..core.config import settings
from ..core.logging import get_logger
from ..models.job import JobStatus
from ..services import cost_tracker
from ..services.gcs import gcs_service, upload_vtt_to_gcs
from ..services.gcs import gcs_service, gcs_path, upload_vtt_to_gcs
from ..services.gemini import gemini_service
from ..services.gemini_tts import TTSSynthesisError
from ..services.websocket import connection_manager
@ -266,12 +266,12 @@ async def _async_translate_and_synthesize(job_id: str):
# Upload translated VTT files
captions_gcs_uri = await upload_vtt_to_gcs(
translated_captions,
f"{job_id}/{lang}/captions.vtt"
gcs_path(job_doc, lang, "captions.vtt")
)
ad_gcs_uri = await upload_vtt_to_gcs(
translated_ad,
f"{job_id}/{lang}/ad.vtt"
gcs_path(job_doc, lang, "ad.vtt")
)
# Upload SDH VTT if generated
@ -279,7 +279,7 @@ async def _async_translate_and_synthesize(job_id: str):
if sdh_requested and result.get("sdh_captions_vtt"):
sdh_gcs_uri = await upload_vtt_to_gcs(
result["sdh_captions_vtt"],
f"{job_id}/{lang}/sdh_captions.vtt"
gcs_path(job_doc, lang, "sdh_captions.vtt")
)
# Generate descriptive transcript (WCAG 2.1 1.2.1)
@ -290,7 +290,7 @@ async def _async_translate_and_synthesize(job_id: str):
if transcript_text:
transcript_gcs_uri = await upload_vtt_to_gcs(
transcript_text,
f"{job_id}/{lang}/descriptive_transcript.txt"
gcs_path(job_doc, lang, "descriptive_transcript.txt")
)
except Exception as transcript_err:
logger.warning(f"Failed to generate descriptive transcript for {lang}: {transcript_err}")
@ -397,12 +397,12 @@ async def _async_translate_and_synthesize(job_id: str):
# Upload translated VTT files
captions_gcs_uri = await upload_vtt_to_gcs(
translated_captions,
f"{job_id}/{language}/captions.vtt"
gcs_path(job_doc, language, "captions.vtt")
)
ad_gcs_uri = await upload_vtt_to_gcs(
translated_ad,
f"{job_id}/{language}/ad.vtt"
gcs_path(job_doc, language, "ad.vtt")
)
# Translate and upload SDH VTT if requested
@ -421,7 +421,7 @@ async def _async_translate_and_synthesize(job_id: str):
translated_sdh = await retry_with_backoff(translate_sdh, max_retries=3)
sdh_gcs_uri = await upload_vtt_to_gcs(
translated_sdh,
f"{job_id}/{language}/sdh_captions.vtt"
gcs_path(job_doc, language, "sdh_captions.vtt")
)
lang_out["sdh_captions_vtt_gcs"] = sdh_gcs_uri
@ -432,7 +432,7 @@ async def _async_translate_and_synthesize(job_id: str):
if transcript_text:
transcript_gcs_uri = await upload_vtt_to_gcs(
transcript_text,
f"{job_id}/{language}/descriptive_transcript.txt"
gcs_path(job_doc, language, "descriptive_transcript.txt")
)
lang_out["descriptive_transcript_gcs"] = transcript_gcs_uri
except Exception as transcript_err:
@ -934,7 +934,7 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict,
mp3_data = output_buffer.getvalue()
# Upload combined MP3 to GCS
mp3_blob_path = f"{job_id}/{language}/ad.mp3"
mp3_blob_path = gcs_path(job_doc, language, "ad.mp3")
mp3_blob = gcs_service.bucket.blob(mp3_blob_path)
mp3_blob.content_type = "audio/mpeg"
mp3_blob.upload_from_string(mp3_data, content_type="audio/mpeg")

View file

@ -0,0 +1,283 @@
#!/usr/bin/env python3
"""
Operator script: migrate GCS objects from flat job-id prefix to org-scoped prefix.
Usage:
python tools/migrate_gcs_org_prefix.py [--dry-run] [--rollback] [--limit N] [--resume]
Prerequisites (must hold BEFORE running):
1. mongodump taken and verified
2. gcloud storage ls > pre_migration_inventory.txt saved
3. Celery workers stopped / paused on ingest/translate/render/tts queues
4. celery inspect active empty
The script copies:
gs://{BUCKET}/{job_id}/**
to:
gs://{BUCKET}/orgs/{org_id}/jobs/{job_id}/**
Then verifies object counts, spot-checks 3 random MD5 hashes, updates Mongo,
then (only if verify passes) deletes the old prefix.
Writes gcs_migration_resume.json after each job so it can be resumed.
"""
import argparse
import hashlib
import json
import os
import random
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
ACTIVE_PIPELINE_STATUSES = {
"ingesting",
"ai_processing",
"translating",
"tts_generating",
"rendering_video",
"rendering_qc",
}
RESUME_FILE = Path("gcs_migration_resume.json")
GCS_BUCKET = os.environ.get("GCS_BUCKET") or os.environ.get("BUCKET")
MONGO_URI = os.environ.get("MONGODB_URI") or os.environ.get("MONGO_URI")
def require_env():
missing = []
if not GCS_BUCKET:
missing.append("GCS_BUCKET or BUCKET")
if not MONGO_URI:
missing.append("MONGODB_URI or MONGO_URI")
if missing:
print(f"ERROR: Missing environment variables: {', '.join(missing)}")
sys.exit(1)
def run(cmd: str, capture: bool = True) -> subprocess.CompletedProcess:
return subprocess.run(cmd, shell=True, capture_output=capture, text=True)
def gsutil_list(prefix: str) -> list[str]:
r = run(f"gsutil ls -r 'gs://{GCS_BUCKET}/{prefix}/**'")
if r.returncode != 0:
return []
return [line.strip() for line in r.stdout.splitlines() if line.strip() and not line.strip().endswith(":")]
def gsutil_hash(uri: str) -> str:
r = run(f"gsutil hash -h '{uri}'")
if r.returncode != 0:
return ""
for line in r.stdout.splitlines():
if "Hash (md5)" in line or "Hash (crc32c)" in line:
return line.split(":")[-1].strip()
return ""
def copy_prefix(src_prefix: str, dst_prefix: str, dry_run: bool) -> bool:
cmd = f"gsutil -m cp -r 'gs://{GCS_BUCKET}/{src_prefix}/*' 'gs://{GCS_BUCKET}/{dst_prefix}/'"
print(f" COPY: {cmd}")
if dry_run:
return True
r = run(cmd, capture=False)
return r.returncode == 0
def delete_prefix(prefix: str, dry_run: bool) -> bool:
cmd = f"gsutil -m rm -r 'gs://{GCS_BUCKET}/{prefix}/'"
print(f" DELETE: {cmd}")
if dry_run:
return True
r = run(cmd, capture=False)
return r.returncode == 0
def verify_copy(src_prefix: str, dst_prefix: str) -> bool:
src_objects = gsutil_list(src_prefix)
dst_objects = gsutil_list(dst_prefix)
if len(src_objects) != len(dst_objects):
print(f" ERROR: Object count mismatch: src={len(src_objects)}, dst={len(dst_objects)}")
return False
# Spot-check 3 random objects
sample_src = random.sample(src_objects, min(3, len(src_objects)))
for src_uri in sample_src:
rel = src_uri.replace(f"gs://{GCS_BUCKET}/{src_prefix}/", "")
dst_uri = f"gs://{GCS_BUCKET}/{dst_prefix}/{rel}"
src_hash = gsutil_hash(src_uri)
dst_hash = gsutil_hash(dst_uri)
if src_hash != dst_hash or not src_hash:
print(f" ERROR: Hash mismatch for {rel}: src={src_hash} dst={dst_hash}")
return False
return True
def migrate_job_mongo(job: dict, new_prefix: str, dry_run: bool):
"""Update MongoDB job document to reflect new GCS prefix."""
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio
async def _update():
client = AsyncIOMotorClient(MONGO_URI)
db = client["accessible_video"] # infer from URI or hardcode
job_id = str(job["_id"])
old_prefix = job_id # bare job_id
updates: dict = {"gcs_prefix": new_prefix}
# Update source.gcs_uri
source_uri = job.get("source", {}).get("gcs_uri", "")
if source_uri and f"/{old_prefix}/" in source_uri:
updates["source.gcs_uri"] = source_uri.replace(f"/{old_prefix}/", f"/{new_prefix}/", 1)
if job.get("source", {}).get("filename", "").startswith(old_prefix + "/"):
updates["source.filename"] = job["source"]["filename"].replace(old_prefix + "/", new_prefix + "/", 1)
# Update outputs
outputs = job.get("outputs") or {}
output_updates = {}
for lang, lang_out in outputs.items():
for field, uri in (lang_out or {}).items():
if isinstance(uri, str) and f"/{old_prefix}/" in uri:
output_updates[f"outputs.{lang}.{field}"] = uri.replace(f"/{old_prefix}/", f"/{new_prefix}/", 1)
updates.update(output_updates)
updates["updated_at"] = datetime.utcnow()
if not dry_run:
await db.jobs.update_one({"_id": job["_id"]}, {"$set": updates})
else:
print(f" DRY-RUN mongo update: {list(updates.keys())}")
client.close()
asyncio.run(_update())
def load_resume() -> set[str]:
if RESUME_FILE.exists():
data = json.loads(RESUME_FILE.read_text())
return set(data.get("completed", []))
return set()
def save_resume(completed: set[str]):
existing = {}
if RESUME_FILE.exists():
existing = json.loads(RESUME_FILE.read_text())
existing["completed"] = list(completed)
existing["updated_at"] = datetime.utcnow().isoformat()
RESUME_FILE.write_text(json.dumps(existing, indent=2))
def main():
parser = argparse.ArgumentParser(description="Migrate GCS org prefix for all jobs")
parser.add_argument("--dry-run", action="store_true", help="Print actions without executing")
parser.add_argument("--rollback", action="store_true", help="Roll back migrated jobs (moves back)")
parser.add_argument("--limit", type=int, default=0, help="Limit number of jobs to migrate (0=all)")
parser.add_argument("--resume", action="store_true", help="Skip already-completed jobs from resume file")
args = parser.parse_args()
require_env()
# Pre-flight: check gcloud auth
r = run("gcloud config get-value project")
if r.returncode != 0 or not r.stdout.strip():
print("ERROR: gcloud not authenticated or project not set. Run 'gcloud auth login' and 'gcloud config set project PROJECT_ID'.")
sys.exit(1)
print(f"GCP project: {r.stdout.strip()}")
if not args.dry_run:
confirm = input("⚠️ This will move production GCS objects and update MongoDB. Type 'yes' to continue: ")
if confirm.lower() != "yes":
print("Aborted.")
sys.exit(0)
# Load jobs from MongoDB
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio
async def fetch_jobs() -> list:
client = AsyncIOMotorClient(MONGO_URI)
db_name = MONGO_URI.split("/")[-1].split("?")[0] or "accessible_video"
db = client[db_name]
query = {
"gcs_prefix": None,
"organization_id": {"$ne": None},
"status": {"$nin": list(ACTIVE_PIPELINE_STATUSES)},
}
cursor = db.jobs.find(query, {"_id": 1, "organization_id": 1, "source": 1, "outputs": 1, "status": 1})
jobs = await cursor.to_list(length=10000)
client.close()
return jobs
all_jobs = asyncio.run(fetch_jobs())
print(f"Found {len(all_jobs)} jobs to migrate (gcs_prefix=null, has org_id, not in-flight).")
already_done = load_resume() if args.resume else set()
jobs_to_process = [j for j in all_jobs if str(j["_id"]) not in already_done]
if args.limit:
jobs_to_process = jobs_to_process[:args.limit]
print(f"Processing {len(jobs_to_process)} jobs (skipping {len(already_done)} already done).")
completed: set[str] = set(already_done)
errors: list[dict] = []
def process_job(job: dict) -> tuple[str, bool, str]:
job_id = str(job["_id"])
org_id = str(job["organization_id"])
new_prefix = f"orgs/{org_id}/jobs/{job_id}"
src_prefix = job_id
if args.rollback:
# Rollback: copy from new_prefix back to src_prefix
src_prefix, new_prefix = new_prefix, src_prefix
print(f"\n[{job_id}] {src_prefix}{new_prefix}")
src_objects = gsutil_list(src_prefix)
if not src_objects:
print(f" SKIP: No objects found at {src_prefix}")
return job_id, True, ""
print(f" Found {len(src_objects)} objects to copy")
if not copy_prefix(src_prefix, new_prefix, args.dry_run):
return job_id, False, "gsutil cp failed"
if not args.dry_run:
if not verify_copy(src_prefix, new_prefix):
return job_id, False, "verification failed"
if not args.rollback:
migrate_job_mongo(job, new_prefix, args.dry_run)
if not delete_prefix(src_prefix, args.dry_run):
return job_id, False, "gsutil rm failed"
return job_id, True, ""
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(process_job, job): job for job in jobs_to_process}
for future in as_completed(futures):
job_id, ok, err = future.result()
if ok:
completed.add(job_id)
save_resume(completed)
print(f"{job_id}")
else:
errors.append({"job_id": job_id, "error": err})
print(f"{job_id}: {err}")
print(f"\n--- Summary ---")
print(f"Completed: {len(completed) - len(already_done)}")
print(f"Errors: {len(errors)}")
if errors:
print("Failed jobs:")
for e in errors:
print(f" {e['job_id']}: {e['error']}")
sys.exit(1)
if __name__ == "__main__":
main()