diff --git a/backend/app/api/v1/routes_jobs.py b/backend/app/api/v1/routes_jobs.py index 17b6278..5f838f4 100644 --- a/backend/app/api/v1/routes_jobs.py +++ b/backend/app/api/v1/routes_jobs.py @@ -149,6 +149,7 @@ async def create_job( "client_id": str(current_user.id), "organization_id": organization_id, "brief_id": brief_id or None, + "gcs_prefix": f"orgs/{organization_id}/jobs/{job_id}" if organization_id else None, "title": title, "source": { "filename": f"{job_id}/source.mp4", diff --git a/backend/app/services/gcs.py b/backend/app/services/gcs.py index 7c0fdb2..e69fca2 100644 --- a/backend/app/services/gcs.py +++ b/backend/app/services/gcs.py @@ -125,6 +125,22 @@ class GCSService: return await loop.run_in_executor(self.executor, _exists) +def gcs_path(job: "dict | object", *parts: str) -> str: + """Return a GCS object path rooted at the job's prefix. + + Jobs created before MT-14 have no gcs_prefix and use bare job_id/ as the + prefix. New jobs get prefix=orgs/{org_id}/jobs/{job_id}/. + """ + if isinstance(job, dict): + prefix = job.get("gcs_prefix") or job["_id"] + if not job.get("gcs_prefix"): + prefix = job["_id"] + else: + prefix = getattr(job, "gcs_prefix", None) or getattr(job, "id", str(job)) + prefix = prefix.rstrip("/") + return "/".join([prefix, *parts]) if parts else prefix + + # Global GCS service instance gcs_service = GCSService() diff --git a/backend/app/tasks/ingest_and_ai.py b/backend/app/tasks/ingest_and_ai.py index 11e1b05..3d10660 100644 --- a/backend/app/tasks/ingest_and_ai.py +++ b/backend/app/tasks/ingest_and_ai.py @@ -12,7 +12,7 @@ from ..core.logging import get_logger from ..models.job import JobStatus from ..services import cost_tracker from ..services.cost_tracker import BudgetExceeded -from ..services.gcs import gcs_service, upload_vtt_to_gcs +from ..services.gcs import gcs_service, gcs_path, upload_vtt_to_gcs from ..services.gemini import gemini_service from ..services.websocket import connection_manager from . import celery_app @@ -207,12 +207,12 @@ async def ingest_and_ai_task_impl(job_id: str): # Upload VTT files to GCS using detected language captions_gcs_uri = await upload_vtt_to_gcs( ai_result["captions_vtt"], - f"{job_id}/{source_language}/captions.vtt" + gcs_path(job_doc, source_language, "captions.vtt") ) ad_gcs_uri = await upload_vtt_to_gcs( ai_result["audio_description_vtt"], - f"{job_id}/{source_language}/ad.vtt" + gcs_path(job_doc, source_language, "ad.vtt") ) # Upload SDH VTT if generated @@ -220,7 +220,7 @@ async def ingest_and_ai_task_impl(job_id: str): if sdh_requested and ai_result.get("sdh_captions_vtt"): sdh_gcs_uri = await upload_vtt_to_gcs( ai_result["sdh_captions_vtt"], - f"{job_id}/{source_language}/sdh_captions.vtt" + gcs_path(job_doc, source_language, "sdh_captions.vtt") ) # Generate descriptive transcript (WCAG 2.1 1.2.1) @@ -234,7 +234,7 @@ async def ingest_and_ai_task_impl(job_id: str): if transcript_text: transcript_gcs_uri = await upload_vtt_to_gcs( transcript_text, - f"{job_id}/{source_language}/descriptive_transcript.txt" + gcs_path(job_doc, source_language, "descriptive_transcript.txt") ) logger.info(f"Generated descriptive transcript for job {job_id}, language {source_language}") except Exception as e: diff --git a/backend/app/tasks/render_accessible_video.py b/backend/app/tasks/render_accessible_video.py index 31df165..f825d4c 100644 --- a/backend/app/tasks/render_accessible_video.py +++ b/backend/app/tasks/render_accessible_video.py @@ -13,7 +13,7 @@ from ..core.logging import get_logger from ..lib.vtt import VTTParser from ..models.job import AccessibleVideoEditState, JobStatus, PausePointData, VideoSegmentMetadata from ..schemas.whisper import CachedWhisperTranscript, CachedWordTimestamp -from ..services.gcs import gcs_service +from ..services.gcs import gcs_service, gcs_path from ..services.video_renderer import video_renderer_service from ..services.vtt_retimer import vtt_retimer_service from ..services.whisper_service import WordTimestamp, whisper_service @@ -213,7 +213,7 @@ async def _async_render_accessible_video(job_id: str, language: str): # 6. Render accessible video with segment persistence for QC editing output_video_path = os.path.join(temp_dir, "accessible_video.mp4") - gcs_segment_prefix = f"{job_id}/{language}/segments/" + gcs_segment_prefix = gcs_path(job_doc, language, "segments") + "/" logger.info(f"Rendering accessible video using {method} method with segment persistence...") rendered_path, updated_placements, segment_metadata, pause_points = await video_renderer_service.render_accessible_video( @@ -243,7 +243,7 @@ async def _async_render_accessible_video(job_id: str, language: str): logger.info(f"Built edit state with {len(segment_metadata)} segments and {len(pause_points)} pause points") # 7. Upload rendered video to GCS - video_blob_path = f"{job_id}/{language}/accessible_video.mp4" + video_blob_path = gcs_path(job_doc, language, "accessible_video.mp4") video_blob = gcs_service.bucket.blob(video_blob_path) video_blob.content_type = "video/mp4" video_blob.upload_from_filename(output_video_path) @@ -268,7 +268,7 @@ async def _async_render_accessible_video(job_id: str, language: str): ) # Upload re-timed captions - retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt" + retimed_blob_path = gcs_path(job_doc, language, "accessible_captions.vtt") retimed_blob = gcs_service.bucket.blob(retimed_blob_path) retimed_blob.content_type = "text/vtt" retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt") diff --git a/backend/app/tasks/rerender_accessible_video.py b/backend/app/tasks/rerender_accessible_video.py index 3ea4cad..fe81f19 100644 --- a/backend/app/tasks/rerender_accessible_video.py +++ b/backend/app/tasks/rerender_accessible_video.py @@ -14,7 +14,7 @@ from ..core.config import settings from ..core.logging import get_logger from ..lib.vtt import VTTParser from ..models.job import AccessibleVideoEditState, JobStatus, PausePointData, VideoSegmentMetadata -from ..services.gcs import gcs_service +from ..services.gcs import gcs_service, gcs_path from ..services.video_renderer import video_renderer_service from ..services.vtt_retimer import vtt_retimer_service from ..services.whisper_service import WordTimestamp, whisper_service @@ -286,7 +286,7 @@ async def _async_rerender_accessible_video( # 6. Render accessible video (persist segments again for future edits) output_video_path = os.path.join(temp_dir, "accessible_video.mp4") - gcs_segment_prefix = f"{job_id}/{language}/segments/" + gcs_segment_prefix = gcs_path(job_doc, language, "segments") + "/" logger.info(f"Re-rendering accessible video using {method} method...") rendered_path, updated_placements, segment_metadata, new_pause_points = await video_renderer_service.render_accessible_video( @@ -302,7 +302,7 @@ async def _async_rerender_accessible_video( analysis["placements"] = updated_placements # 7. Upload rendered video - video_blob_path = f"{job_id}/{language}/accessible_video.mp4" + video_blob_path = gcs_path(job_doc, language, "accessible_video.mp4") video_blob = gcs_service.bucket.blob(video_blob_path) video_blob.content_type = "video/mp4" video_blob.upload_from_filename(output_video_path) @@ -323,7 +323,7 @@ async def _async_rerender_accessible_video( original_captions_vtt, analysis ) - retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt" + retimed_blob_path = gcs_path(job_doc, language, "accessible_captions.vtt") retimed_blob = gcs_service.bucket.blob(retimed_blob_path) retimed_blob.content_type = "text/vtt" retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt") diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index c61c992..498e843 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -14,7 +14,7 @@ from ..core.config import settings from ..core.logging import get_logger from ..models.job import JobStatus from ..services import cost_tracker -from ..services.gcs import gcs_service, upload_vtt_to_gcs +from ..services.gcs import gcs_service, gcs_path, upload_vtt_to_gcs from ..services.gemini import gemini_service from ..services.gemini_tts import TTSSynthesisError from ..services.websocket import connection_manager @@ -266,12 +266,12 @@ async def _async_translate_and_synthesize(job_id: str): # Upload translated VTT files captions_gcs_uri = await upload_vtt_to_gcs( translated_captions, - f"{job_id}/{lang}/captions.vtt" + gcs_path(job_doc, lang, "captions.vtt") ) ad_gcs_uri = await upload_vtt_to_gcs( translated_ad, - f"{job_id}/{lang}/ad.vtt" + gcs_path(job_doc, lang, "ad.vtt") ) # Upload SDH VTT if generated @@ -279,7 +279,7 @@ async def _async_translate_and_synthesize(job_id: str): if sdh_requested and result.get("sdh_captions_vtt"): sdh_gcs_uri = await upload_vtt_to_gcs( result["sdh_captions_vtt"], - f"{job_id}/{lang}/sdh_captions.vtt" + gcs_path(job_doc, lang, "sdh_captions.vtt") ) # Generate descriptive transcript (WCAG 2.1 1.2.1) @@ -290,7 +290,7 @@ async def _async_translate_and_synthesize(job_id: str): if transcript_text: transcript_gcs_uri = await upload_vtt_to_gcs( transcript_text, - f"{job_id}/{lang}/descriptive_transcript.txt" + gcs_path(job_doc, lang, "descriptive_transcript.txt") ) except Exception as transcript_err: logger.warning(f"Failed to generate descriptive transcript for {lang}: {transcript_err}") @@ -397,12 +397,12 @@ async def _async_translate_and_synthesize(job_id: str): # Upload translated VTT files captions_gcs_uri = await upload_vtt_to_gcs( translated_captions, - f"{job_id}/{language}/captions.vtt" + gcs_path(job_doc, language, "captions.vtt") ) ad_gcs_uri = await upload_vtt_to_gcs( translated_ad, - f"{job_id}/{language}/ad.vtt" + gcs_path(job_doc, language, "ad.vtt") ) # Translate and upload SDH VTT if requested @@ -421,7 +421,7 @@ async def _async_translate_and_synthesize(job_id: str): translated_sdh = await retry_with_backoff(translate_sdh, max_retries=3) sdh_gcs_uri = await upload_vtt_to_gcs( translated_sdh, - f"{job_id}/{language}/sdh_captions.vtt" + gcs_path(job_doc, language, "sdh_captions.vtt") ) lang_out["sdh_captions_vtt_gcs"] = sdh_gcs_uri @@ -432,7 +432,7 @@ async def _async_translate_and_synthesize(job_id: str): if transcript_text: transcript_gcs_uri = await upload_vtt_to_gcs( transcript_text, - f"{job_id}/{language}/descriptive_transcript.txt" + gcs_path(job_doc, language, "descriptive_transcript.txt") ) lang_out["descriptive_transcript_gcs"] = transcript_gcs_uri except Exception as transcript_err: @@ -934,7 +934,7 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict, mp3_data = output_buffer.getvalue() # Upload combined MP3 to GCS - mp3_blob_path = f"{job_id}/{language}/ad.mp3" + mp3_blob_path = gcs_path(job_doc, language, "ad.mp3") mp3_blob = gcs_service.bucket.blob(mp3_blob_path) mp3_blob.content_type = "audio/mpeg" mp3_blob.upload_from_string(mp3_data, content_type="audio/mpeg") diff --git a/tools/migrate_gcs_org_prefix.py b/tools/migrate_gcs_org_prefix.py new file mode 100644 index 0000000..9b05658 --- /dev/null +++ b/tools/migrate_gcs_org_prefix.py @@ -0,0 +1,283 @@ +#!/usr/bin/env python3 +""" +Operator script: migrate GCS objects from flat job-id prefix to org-scoped prefix. + +Usage: + python tools/migrate_gcs_org_prefix.py [--dry-run] [--rollback] [--limit N] [--resume] + +Prerequisites (must hold BEFORE running): + 1. mongodump taken and verified + 2. gcloud storage ls > pre_migration_inventory.txt saved + 3. Celery workers stopped / paused on ingest/translate/render/tts queues + 4. celery inspect active → empty + +The script copies: + gs://{BUCKET}/{job_id}/** + to: + gs://{BUCKET}/orgs/{org_id}/jobs/{job_id}/** + +Then verifies object counts, spot-checks 3 random MD5 hashes, updates Mongo, +then (only if verify passes) deletes the old prefix. + +Writes gcs_migration_resume.json after each job so it can be resumed. +""" + +import argparse +import hashlib +import json +import os +import random +import subprocess +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from pathlib import Path + +ACTIVE_PIPELINE_STATUSES = { + "ingesting", + "ai_processing", + "translating", + "tts_generating", + "rendering_video", + "rendering_qc", +} + +RESUME_FILE = Path("gcs_migration_resume.json") +GCS_BUCKET = os.environ.get("GCS_BUCKET") or os.environ.get("BUCKET") +MONGO_URI = os.environ.get("MONGODB_URI") or os.environ.get("MONGO_URI") + + +def require_env(): + missing = [] + if not GCS_BUCKET: + missing.append("GCS_BUCKET or BUCKET") + if not MONGO_URI: + missing.append("MONGODB_URI or MONGO_URI") + if missing: + print(f"ERROR: Missing environment variables: {', '.join(missing)}") + sys.exit(1) + + +def run(cmd: str, capture: bool = True) -> subprocess.CompletedProcess: + return subprocess.run(cmd, shell=True, capture_output=capture, text=True) + + +def gsutil_list(prefix: str) -> list[str]: + r = run(f"gsutil ls -r 'gs://{GCS_BUCKET}/{prefix}/**'") + if r.returncode != 0: + return [] + return [line.strip() for line in r.stdout.splitlines() if line.strip() and not line.strip().endswith(":")] + + +def gsutil_hash(uri: str) -> str: + r = run(f"gsutil hash -h '{uri}'") + if r.returncode != 0: + return "" + for line in r.stdout.splitlines(): + if "Hash (md5)" in line or "Hash (crc32c)" in line: + return line.split(":")[-1].strip() + return "" + + +def copy_prefix(src_prefix: str, dst_prefix: str, dry_run: bool) -> bool: + cmd = f"gsutil -m cp -r 'gs://{GCS_BUCKET}/{src_prefix}/*' 'gs://{GCS_BUCKET}/{dst_prefix}/'" + print(f" COPY: {cmd}") + if dry_run: + return True + r = run(cmd, capture=False) + return r.returncode == 0 + + +def delete_prefix(prefix: str, dry_run: bool) -> bool: + cmd = f"gsutil -m rm -r 'gs://{GCS_BUCKET}/{prefix}/'" + print(f" DELETE: {cmd}") + if dry_run: + return True + r = run(cmd, capture=False) + return r.returncode == 0 + + +def verify_copy(src_prefix: str, dst_prefix: str) -> bool: + src_objects = gsutil_list(src_prefix) + dst_objects = gsutil_list(dst_prefix) + if len(src_objects) != len(dst_objects): + print(f" ERROR: Object count mismatch: src={len(src_objects)}, dst={len(dst_objects)}") + return False + + # Spot-check 3 random objects + sample_src = random.sample(src_objects, min(3, len(src_objects))) + for src_uri in sample_src: + rel = src_uri.replace(f"gs://{GCS_BUCKET}/{src_prefix}/", "") + dst_uri = f"gs://{GCS_BUCKET}/{dst_prefix}/{rel}" + src_hash = gsutil_hash(src_uri) + dst_hash = gsutil_hash(dst_uri) + if src_hash != dst_hash or not src_hash: + print(f" ERROR: Hash mismatch for {rel}: src={src_hash} dst={dst_hash}") + return False + return True + + +def migrate_job_mongo(job: dict, new_prefix: str, dry_run: bool): + """Update MongoDB job document to reflect new GCS prefix.""" + from motor.motor_asyncio import AsyncIOMotorClient + import asyncio + + async def _update(): + client = AsyncIOMotorClient(MONGO_URI) + db = client["accessible_video"] # infer from URI or hardcode + job_id = str(job["_id"]) + old_prefix = job_id # bare job_id + + updates: dict = {"gcs_prefix": new_prefix} + + # Update source.gcs_uri + source_uri = job.get("source", {}).get("gcs_uri", "") + if source_uri and f"/{old_prefix}/" in source_uri: + updates["source.gcs_uri"] = source_uri.replace(f"/{old_prefix}/", f"/{new_prefix}/", 1) + if job.get("source", {}).get("filename", "").startswith(old_prefix + "/"): + updates["source.filename"] = job["source"]["filename"].replace(old_prefix + "/", new_prefix + "/", 1) + + # Update outputs + outputs = job.get("outputs") or {} + output_updates = {} + for lang, lang_out in outputs.items(): + for field, uri in (lang_out or {}).items(): + if isinstance(uri, str) and f"/{old_prefix}/" in uri: + output_updates[f"outputs.{lang}.{field}"] = uri.replace(f"/{old_prefix}/", f"/{new_prefix}/", 1) + updates.update(output_updates) + updates["updated_at"] = datetime.utcnow() + + if not dry_run: + await db.jobs.update_one({"_id": job["_id"]}, {"$set": updates}) + else: + print(f" DRY-RUN mongo update: {list(updates.keys())}") + client.close() + + asyncio.run(_update()) + + +def load_resume() -> set[str]: + if RESUME_FILE.exists(): + data = json.loads(RESUME_FILE.read_text()) + return set(data.get("completed", [])) + return set() + + +def save_resume(completed: set[str]): + existing = {} + if RESUME_FILE.exists(): + existing = json.loads(RESUME_FILE.read_text()) + existing["completed"] = list(completed) + existing["updated_at"] = datetime.utcnow().isoformat() + RESUME_FILE.write_text(json.dumps(existing, indent=2)) + + +def main(): + parser = argparse.ArgumentParser(description="Migrate GCS org prefix for all jobs") + parser.add_argument("--dry-run", action="store_true", help="Print actions without executing") + parser.add_argument("--rollback", action="store_true", help="Roll back migrated jobs (moves back)") + parser.add_argument("--limit", type=int, default=0, help="Limit number of jobs to migrate (0=all)") + parser.add_argument("--resume", action="store_true", help="Skip already-completed jobs from resume file") + args = parser.parse_args() + + require_env() + + # Pre-flight: check gcloud auth + r = run("gcloud config get-value project") + if r.returncode != 0 or not r.stdout.strip(): + print("ERROR: gcloud not authenticated or project not set. Run 'gcloud auth login' and 'gcloud config set project PROJECT_ID'.") + sys.exit(1) + print(f"GCP project: {r.stdout.strip()}") + + if not args.dry_run: + confirm = input("⚠️ This will move production GCS objects and update MongoDB. Type 'yes' to continue: ") + if confirm.lower() != "yes": + print("Aborted.") + sys.exit(0) + + # Load jobs from MongoDB + from motor.motor_asyncio import AsyncIOMotorClient + import asyncio + + async def fetch_jobs() -> list: + client = AsyncIOMotorClient(MONGO_URI) + db_name = MONGO_URI.split("/")[-1].split("?")[0] or "accessible_video" + db = client[db_name] + query = { + "gcs_prefix": None, + "organization_id": {"$ne": None}, + "status": {"$nin": list(ACTIVE_PIPELINE_STATUSES)}, + } + cursor = db.jobs.find(query, {"_id": 1, "organization_id": 1, "source": 1, "outputs": 1, "status": 1}) + jobs = await cursor.to_list(length=10000) + client.close() + return jobs + + all_jobs = asyncio.run(fetch_jobs()) + print(f"Found {len(all_jobs)} jobs to migrate (gcs_prefix=null, has org_id, not in-flight).") + + already_done = load_resume() if args.resume else set() + jobs_to_process = [j for j in all_jobs if str(j["_id"]) not in already_done] + if args.limit: + jobs_to_process = jobs_to_process[:args.limit] + + print(f"Processing {len(jobs_to_process)} jobs (skipping {len(already_done)} already done).") + + completed: set[str] = set(already_done) + errors: list[dict] = [] + + def process_job(job: dict) -> tuple[str, bool, str]: + job_id = str(job["_id"]) + org_id = str(job["organization_id"]) + new_prefix = f"orgs/{org_id}/jobs/{job_id}" + src_prefix = job_id + + if args.rollback: + # Rollback: copy from new_prefix back to src_prefix + src_prefix, new_prefix = new_prefix, src_prefix + + print(f"\n[{job_id}] {src_prefix} → {new_prefix}") + src_objects = gsutil_list(src_prefix) + if not src_objects: + print(f" SKIP: No objects found at {src_prefix}") + return job_id, True, "" + + print(f" Found {len(src_objects)} objects to copy") + if not copy_prefix(src_prefix, new_prefix, args.dry_run): + return job_id, False, "gsutil cp failed" + + if not args.dry_run: + if not verify_copy(src_prefix, new_prefix): + return job_id, False, "verification failed" + + if not args.rollback: + migrate_job_mongo(job, new_prefix, args.dry_run) + if not delete_prefix(src_prefix, args.dry_run): + return job_id, False, "gsutil rm failed" + + return job_id, True, "" + + with ThreadPoolExecutor(max_workers=4) as executor: + futures = {executor.submit(process_job, job): job for job in jobs_to_process} + for future in as_completed(futures): + job_id, ok, err = future.result() + if ok: + completed.add(job_id) + save_resume(completed) + print(f" ✓ {job_id}") + else: + errors.append({"job_id": job_id, "error": err}) + print(f" ✗ {job_id}: {err}") + + print(f"\n--- Summary ---") + print(f"Completed: {len(completed) - len(already_done)}") + print(f"Errors: {len(errors)}") + if errors: + print("Failed jobs:") + for e in errors: + print(f" {e['job_id']}: {e['error']}") + sys.exit(1) + + +if __name__ == "__main__": + main()