From 79440929f44c7322f4e7785814fe6397395de65a Mon Sep 17 00:00:00 2001 From: michael Date: Fri, 2 Jan 2026 10:12:50 -0600 Subject: [PATCH] feat: add Cloud Run HTTP services for Whisper and FFmpeg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrate CPU-intensive workloads to Cloud Run for autoscaling: - Add Whisper HTTP service (FastAPI) with /transcribe endpoint - Add FFmpeg HTTP service (FastAPI) with /encode, /probe, /extract-frame, etc. - Add Dockerfiles for both services (8 vCPU, 32GB RAM, Gen2) - Add Cloud Build config for CI/CD deployment - Add Cloud Run service YAML configs with scale-to-zero - Update whisper_transcribe.py to call Cloud Run when WHISPER_SERVICE_URL set - Update video_renderer.py to call Cloud Run when FFMPEG_SERVICE_URL set - Update whisper_service.py for Cloud Run compatibility (no settings dependency) - Add ffmpeg_service_url and whisper_service_url to config.py Services scale 0→N based on request load, falling back to local execution when service URLs are not configured (hybrid mode). 
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/Dockerfile.ffmpeg-service | 97 +++ backend/Dockerfile.whisper-service | 101 +++ backend/app/core/config.py | 5 + backend/app/services/ffmpeg_http_service.py | 539 ++++++++++++++++ backend/app/services/video_renderer.py | 583 +++++++++++++++++- backend/app/services/whisper_http_service.py | 311 ++++++++++ backend/app/services/whisper_service.py | 40 +- backend/app/tasks/whisper_transcribe.py | 165 ++++- infra/cloud-run/cloudbuild-http-services.yaml | 166 +++++ infra/cloud-run/ffmpeg-http-service.yaml | 125 ++++ infra/cloud-run/whisper-http-service.yaml | 128 ++++ 11 files changed, 2204 insertions(+), 56 deletions(-) create mode 100644 backend/Dockerfile.ffmpeg-service create mode 100644 backend/Dockerfile.whisper-service create mode 100644 backend/app/services/ffmpeg_http_service.py create mode 100644 backend/app/services/whisper_http_service.py create mode 100644 infra/cloud-run/cloudbuild-http-services.yaml create mode 100644 infra/cloud-run/ffmpeg-http-service.yaml create mode 100644 infra/cloud-run/whisper-http-service.yaml diff --git a/backend/Dockerfile.ffmpeg-service b/backend/Dockerfile.ffmpeg-service new file mode 100644 index 0000000..897a537 --- /dev/null +++ b/backend/Dockerfile.ffmpeg-service @@ -0,0 +1,97 @@ +# ============================================================================= +# Dockerfile for FFmpeg HTTP Service - Cloud Run Deployment +# ============================================================================= +# This Dockerfile creates a FastAPI-based HTTP service for FFmpeg operations, +# designed for deployment on Google Cloud Run with autoscaling. 
+# +# Key features: +# - Lightweight image with FFmpeg + FastAPI (no Whisper model) +# - Fast startup for quick autoscaling +# - Optimized for 8 vCPU / 32GB RAM Cloud Run instances +# ============================================================================= + +# ----------------------------------------------------------------------------- +# Stage 1: Builder - Install Python dependencies using Poetry +# ----------------------------------------------------------------------------- +FROM python:3.11-slim AS builder + +# Install build dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Install Poetry +RUN pip install --no-cache-dir poetry==1.8.2 + +# Configure Poetry +ENV POETRY_NO_INTERACTION=1 \ + POETRY_VIRTUALENVS_CREATE=false \ + POETRY_CACHE_DIR=/tmp/poetry_cache + +WORKDIR /app + +# Copy dependency files +COPY pyproject.toml poetry.lock ./ + +# Install dependencies (exclude faster-whisper to keep image small) +# Note: We still install all deps for simplicity, but could optimize with groups +RUN poetry config virtualenvs.create false \ + && poetry install --only main --no-interaction --no-ansi \ + && rm -rf $POETRY_CACHE_DIR + +# ----------------------------------------------------------------------------- +# Stage 2: Runtime - FFmpeg HTTP Service +# ----------------------------------------------------------------------------- +FROM python:3.11-slim AS runtime + +# Install runtime dependencies including FFmpeg +RUN apt-get update && apt-get install -y --no-install-recommends \ + libmagic1 \ + curl \ + tini \ + ffmpeg \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + +# Create non-root user for security +RUN groupadd --gid 1000 app \ + && useradd --uid 1000 --gid app --shell /bin/bash --create-home app + +# Copy Python packages from builder +COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages +COPY 
--from=builder /usr/local/bin /usr/local/bin
+
+# Set environment variables
+ENV PYTHONPATH=/app \
+    PYTHONUNBUFFERED=1 \
+    PYTHONDONTWRITEBYTECODE=1 \
+    APP_ENV=prod
+
+WORKDIR /app
+
+# Copy application code
+COPY --chown=app:app . .
+
+# Switch to non-root user
+USER app
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
+    CMD curl -f http://localhost:8080/health || exit 1
+
+# Expose HTTP port (Cloud Run uses 8080 by default)
+EXPOSE 8080
+
+# Use tini as init system
+ENTRYPOINT ["tini", "--"]
+
+# Start Uvicorn server
+# - 1 worker since Cloud Run uses containerConcurrency=1
+# - Bind to 0.0.0.0:8080 for Cloud Run
+# - Keep idle keep-alive connections open for 300s (this is not a request timeout; long FFmpeg requests are bounded by the Cloud Run request timeout)
+CMD ["uvicorn", "app.services.ffmpeg_http_service:app", \
+     "--host", "0.0.0.0", \
+     "--port", "8080", \
+     "--workers", "1", \
+     "--timeout-keep-alive", "300"]
diff --git a/backend/Dockerfile.whisper-service b/backend/Dockerfile.whisper-service
new file mode 100644
index 0000000..156df3a
--- /dev/null
+++ b/backend/Dockerfile.whisper-service
@@ -0,0 +1,101 @@
+# =============================================================================
+# Dockerfile for Whisper HTTP Service - Cloud Run Deployment
+# =============================================================================
+# This Dockerfile creates a FastAPI-based HTTP service for Whisper transcription,
+# designed for deployment on Google Cloud Run with autoscaling.
+# +# Key features: +# - Pre-downloads Whisper model during build (no cold start model loading) +# - Runs FastAPI with Uvicorn for HTTP handling +# - Optimized for 8 vCPU / 32GB RAM Cloud Run instances +# ============================================================================= + +# ----------------------------------------------------------------------------- +# Stage 1: Builder - Install Python dependencies using Poetry +# ----------------------------------------------------------------------------- +FROM python:3.11-slim AS builder + +# Install build dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Install Poetry +RUN pip install --no-cache-dir poetry==1.8.2 + +# Configure Poetry to not create virtual environment +ENV POETRY_NO_INTERACTION=1 \ + POETRY_VIRTUALENVS_CREATE=false \ + POETRY_CACHE_DIR=/tmp/poetry_cache + +WORKDIR /app + +# Copy dependency files +COPY pyproject.toml poetry.lock ./ + +# Install dependencies +RUN poetry config virtualenvs.create false \ + && poetry install --only main --no-interaction --no-ansi \ + && rm -rf $POETRY_CACHE_DIR + +# ----------------------------------------------------------------------------- +# Stage 2: Runtime - Whisper HTTP Service +# ----------------------------------------------------------------------------- +FROM python:3.11-slim AS runtime + +# Install runtime dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + libmagic1 \ + curl \ + tini \ + ffmpeg \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + +# Create non-root user for security +RUN groupadd --gid 1000 app \ + && useradd --uid 1000 --gid app --shell /bin/bash --create-home app + +# Copy Python packages from builder +COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin + +# Set environment variables +ENV PYTHONPATH=/app \ 
+    PYTHONUNBUFFERED=1 \
+    PYTHONDONTWRITEBYTECODE=1 \
+    APP_ENV=prod
+
+WORKDIR /app
+
+# Copy application code
+COPY --chown=app:app . .
+
+# Switch to non-root user
+USER app
+
+# Pre-download Whisper medium model during build
+# This prevents cold start delays when the service scales up
+# Model is cached in ~/.cache/huggingface/hub (~1.5GB)
+RUN python -c "from faster_whisper import WhisperModel; WhisperModel('medium', device='cpu', compute_type='int8')"
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
+    CMD curl -f http://localhost:8080/health || exit 1
+
+# Expose HTTP port (Cloud Run uses 8080 by default)
+EXPOSE 8080
+
+# Use tini as init system
+ENTRYPOINT ["tini", "--"]
+
+# Start Uvicorn server
+# - 1 worker since Cloud Run uses containerConcurrency=1
+# - Bind to 0.0.0.0:8080 for Cloud Run
+# - Keep idle keep-alive connections open for 300s (this is not a request timeout; long transcriptions are bounded by the Cloud Run request timeout)
+CMD ["uvicorn", "app.services.whisper_http_service:app", \
+     "--host", "0.0.0.0", \
+     "--port", "8080", \
+     "--workers", "1", \
+     "--timeout-keep-alive", "300"]
diff --git a/backend/app/core/config.py b/backend/app/core/config.py
index 827a1b5..1a6ca11 100644
--- a/backend/app/core/config.py
+++ b/backend/app/core/config.py
@@ -170,6 +170,11 @@ class Settings(BaseSettings):
     whisper_phrase_gap_threshold: float = 0.3  # Gap duration to classify as phrase boundary
     whisper_min_gap_threshold: float = 0.15  # Minimum gap duration to consider
 
+    # Cloud Run Service URLs (empty = use local processing)
+    # When set, CPU-intensive work is offloaded to Cloud Run with autoscaling
+    whisper_service_url: str = ""  # e.g., "https://whisper-service-xxx.run.app"
+    ffmpeg_service_url: str = ""  # e.g., "https://ffmpeg-service-xxx.run.app"
+
     # Email
     sendgrid_api_key: str
     email_from: str
diff --git a/backend/app/services/ffmpeg_http_service.py b/backend/app/services/ffmpeg_http_service.py
new file mode 100644
index 0000000..afed87f
--- /dev/null
+++ b/backend/app/services/ffmpeg_http_service.py
@@ -0,0
+1,539 @@ +""" +FFmpeg HTTP Service - FastAPI application for Cloud Run deployment. + +This service exposes FFmpeg operations as HTTP endpoints, allowing +the main application to offload CPU-intensive video encoding to Cloud Run +with autoscaling. + +This module uses minimal configuration to avoid importing the full app Settings. +""" + +import json +import logging +import os +import subprocess +import tempfile +import uuid +from typing import Any, Optional + +from fastapi import FastAPI, HTTPException +from google.cloud import storage +from pydantic import BaseModel, Field + +# Minimal configuration for Cloud Run deployment +# These are read directly from environment variables +GCS_BUCKET = os.environ.get("GCS_BUCKET", "") + +# Simple logging setup (no dependency on app.core.logging) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# FastAPI app for Cloud Run +app = FastAPI( + title="FFmpeg Processing Service", + description="Cloud Run service for FFmpeg video processing operations", + version="1.0.0", +) + + +# Request/Response Models +class RunFFmpegRequest(BaseModel): + """Request model for running an FFmpeg command.""" + input_gcs_uris: list[str] = Field( + ..., + description="GCS URIs of input files (will be downloaded and mapped to local paths)" + ) + output_gcs_uri: str = Field( + ..., + description="GCS URI where the output file should be uploaded" + ) + command_template: list[str] = Field( + ..., + description="FFmpeg command template with {input_0}, {input_1}, etc. 
and {output} placeholders" + ) + timeout: int = Field( + default=3600, + description="Command timeout in seconds" + ) + + +class RunFFmpegResponse(BaseModel): + """Response model for FFmpeg command execution.""" + success: bool + output_gcs_uri: str + stderr: str = "" + duration_seconds: float = 0.0 + + +class ProbeRequest(BaseModel): + """Request model for probing video properties.""" + gcs_uri: str = Field( + ..., + description="GCS URI of the video file to probe" + ) + + +class ProbeResponse(BaseModel): + """Response model for video probe.""" + width: int + height: int + fps: float + duration: float + sample_rate: int + channels: int + codec_name: str + pix_fmt: str + + +class EncodeSegmentRequest(BaseModel): + """Request for encoding a video segment.""" + source_gcs_uri: str + output_gcs_uri: str + start_time: float + duration: float + fps: float = 30.0 + pix_fmt: str = "yuv420p" + sample_rate: int = 44100 + channels: int = 2 + + +class ExtractFrameRequest(BaseModel): + """Request for extracting a single frame.""" + source_gcs_uri: str + output_gcs_uri: str + time_point: float + + +class CreateFreezeSegmentRequest(BaseModel): + """Request for creating a freeze-frame video with audio.""" + frame_gcs_uri: str + audio_gcs_uri: str + output_gcs_uri: str + width: int + height: int + fps: float = 30.0 + pix_fmt: str = "yuv420p" + sample_rate: int = 44100 + channels: int = 2 + + +class ConcatenateRequest(BaseModel): + """Request for concatenating video segments.""" + segment_gcs_uris: list[str] + output_gcs_uri: str + + +class HealthResponse(BaseModel): + """Health check response.""" + status: str + ffmpeg_version: str + + +def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]: + """Parse GCS URI into bucket and blob path.""" + if not gcs_uri.startswith("gs://"): + raise ValueError(f"Invalid GCS URI: {gcs_uri}") + parts = gcs_uri[5:].split("/", 1) + if len(parts) != 2: + raise ValueError(f"Invalid GCS URI format: {gcs_uri}") + return parts[0], parts[1] + + +def 
_download_from_gcs(gcs_uri: str, local_path: str) -> None: + """Download file from GCS to local path.""" + bucket_name, blob_path = _parse_gcs_uri(gcs_uri) + client = storage.Client() # Auto-detects project from environment + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_path) + if not blob.exists(): + raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}") + blob.download_to_filename(local_path) + logger.debug(f"Downloaded {gcs_uri} to {local_path}") + + +def _upload_to_gcs(local_path: str, gcs_uri: str) -> None: + """Upload local file to GCS.""" + bucket_name, blob_path = _parse_gcs_uri(gcs_uri) + client = storage.Client() # Auto-detects project from environment + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_path) + blob.upload_from_filename(local_path) + logger.debug(f"Uploaded {local_path} to {gcs_uri}") + + +def _run_command(cmd: list[str], timeout: int = 3600) -> tuple[bool, str, str]: + """Run a command and return success, stdout, stderr.""" + cmd_preview = ' '.join(cmd[:8]) + ('...' 
if len(cmd) > 8 else '') + logger.info(f"Executing: {cmd_preview}") + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + return result.returncode == 0, result.stdout, result.stderr + except subprocess.TimeoutExpired: + return False, "", f"Command timed out after {timeout}s" + except Exception as e: + return False, "", str(e) + + +def _get_ffmpeg_version() -> str: + """Get FFmpeg version string.""" + try: + result = subprocess.run( + ["ffmpeg", "-version"], + capture_output=True, + text=True, + timeout=5 + ) + first_line = result.stdout.split('\n')[0] + return first_line + except Exception: + return "unknown" + + +@app.get("/health", response_model=HealthResponse) +async def health_check(): + """Health check endpoint.""" + return HealthResponse( + status="healthy", + ffmpeg_version=_get_ffmpeg_version() + ) + + +@app.post("/run-ffmpeg", response_model=RunFFmpegResponse) +async def run_ffmpeg(request: RunFFmpegRequest): + """ + Run a generic FFmpeg command with GCS input/output. + + The command_template should use placeholders: + - {input_0}, {input_1}, etc. 
for input files
+    - {output} for the output file
+
+    Example:
+        {
+            "input_gcs_uris": ["gs://bucket/video.mp4"],
+            "output_gcs_uri": "gs://bucket/output.mp4",
+            "command_template": ["ffmpeg", "-y", "-i", "{input_0}", "-c:v", "libx264", "{output}"]
+        }
+    """
+    import time
+    start_time = time.time()
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        try:
+            # Download input files
+            input_paths = []
+            for i, gcs_uri in enumerate(request.input_gcs_uris):
+                ext = os.path.splitext(gcs_uri)[1] or ".bin"
+                local_path = os.path.join(tmpdir, f"input_{i}{ext}")
+                _download_from_gcs(gcs_uri, local_path)
+                input_paths.append(local_path)
+
+            # Prepare output path
+            output_ext = os.path.splitext(request.output_gcs_uri)[1] or ".mp4"
+            output_path = os.path.join(tmpdir, f"output{output_ext}")
+
+            # Build command from template (request-supplied: allow only ffmpeg/ffprobe and valid, non-negative {input_N} indices)
+            if os.path.basename((request.command_template or [""])[0]) not in ("ffmpeg", "ffprobe"):
+                raise HTTPException(status_code=400, detail="command_template must invoke ffmpeg or ffprobe")
+            cmd = []
+            for part in request.command_template:
+                if part == "{output}":
+                    cmd.append(output_path)
+                elif part.startswith("{input_") and part.endswith("}"):
+                    if not part[7:-1].isdecimal() or int(part[7:-1]) >= len(input_paths):
+                        raise HTTPException(status_code=400, detail=f"Invalid input placeholder: {part}")
+                    cmd.append(input_paths[int(part[7:-1])])
+                else:
+                    cmd.append(part)
+
+            # Run FFmpeg
+            success, stdout, stderr = _run_command(cmd, request.timeout)
+
+            if not success:
+                logger.error(f"FFmpeg failed: {stderr[-500:]}")
+                raise HTTPException(status_code=500, detail=f"FFmpeg failed: {stderr[-500:]}")
+
+            # Upload output
+            _upload_to_gcs(output_path, request.output_gcs_uri)
+
+            duration = time.time() - start_time
+            logger.info(f"FFmpeg completed in {duration:.2f}s")
+
+            return RunFFmpegResponse(
+                success=True,
+                output_gcs_uri=request.output_gcs_uri,
+                stderr=stderr[-200:] if stderr else "",
+                duration_seconds=duration
+            )
+
+        except HTTPException:
+            raise
+        except Exception as e:
+            logger.error(f"FFmpeg operation failed: {e}")
+            raise HTTPException(status_code=500, detail=str(e))
+
+
+@app.post("/probe", response_model=ProbeResponse)
+async def probe_video(request: ProbeRequest):
+ """ + Get video properties using FFprobe. + """ + with tempfile.TemporaryDirectory() as tmpdir: + try: + # Download video + ext = os.path.splitext(request.gcs_uri)[1] or ".mp4" + local_path = os.path.join(tmpdir, f"video{ext}") + _download_from_gcs(request.gcs_uri, local_path) + + # Run ffprobe + cmd = [ + "ffprobe", "-v", "quiet", + "-show_streams", "-of", "json", + local_path + ] + success, stdout, stderr = _run_command(cmd, timeout=60) + + if not success: + raise HTTPException(status_code=500, detail=f"FFprobe failed: {stderr}") + + # Parse output + data = json.loads(stdout) + streams = data.get("streams", []) + + video_stream = next((s for s in streams if s.get("codec_type") == "video"), {}) + audio_stream = next((s for s in streams if s.get("codec_type") == "audio"), {}) + + # Parse FPS + fps = 30.0 + if "r_frame_rate" in video_stream: + fps_parts = video_stream["r_frame_rate"].split("/") + if len(fps_parts) == 2 and int(fps_parts[1]) != 0: + fps = int(fps_parts[0]) / int(fps_parts[1]) + + return ProbeResponse( + width=int(video_stream.get("width", 1920)), + height=int(video_stream.get("height", 1080)), + fps=fps, + duration=float(video_stream.get("duration", 0) or audio_stream.get("duration", 0)), + sample_rate=int(audio_stream.get("sample_rate", 44100)), + channels=int(audio_stream.get("channels", 2)), + codec_name=video_stream.get("codec_name", "h264"), + pix_fmt=video_stream.get("pix_fmt", "yuv420p") + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Probe failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/encode-segment", response_model=RunFFmpegResponse) +async def encode_segment(request: EncodeSegmentRequest): + """ + Extract and encode a video segment with re-encoding for frame accuracy. 
+ """ + import time + start_time = time.time() + + with tempfile.TemporaryDirectory() as tmpdir: + try: + # Download source + ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4" + source_path = os.path.join(tmpdir, f"source{ext}") + _download_from_gcs(request.source_gcs_uri, source_path) + + # Output path + output_path = os.path.join(tmpdir, "segment.mp4") + + # Build FFmpeg command + cmd = [ + "ffmpeg", "-y", + "-ss", str(request.start_time), + "-i", source_path, + "-t", str(request.duration), + "-c:v", "libx264", "-preset", "fast", + "-pix_fmt", request.pix_fmt, + "-r", str(request.fps), + "-c:a", "aac", "-ar", str(request.sample_rate), + "-ac", str(request.channels), "-b:a", "192k", + "-video_track_timescale", "90000", + output_path + ] + + success, stdout, stderr = _run_command(cmd) + if not success: + raise HTTPException(status_code=500, detail=f"Encode failed: {stderr[-500:]}") + + _upload_to_gcs(output_path, request.output_gcs_uri) + + return RunFFmpegResponse( + success=True, + output_gcs_uri=request.output_gcs_uri, + duration_seconds=time.time() - start_time + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Encode segment failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/extract-frame", response_model=RunFFmpegResponse) +async def extract_frame(request: ExtractFrameRequest): + """ + Extract a single frame from a video as PNG. 
+ """ + import time + start_time = time.time() + + with tempfile.TemporaryDirectory() as tmpdir: + try: + # Download source + ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4" + source_path = os.path.join(tmpdir, f"source{ext}") + _download_from_gcs(request.source_gcs_uri, source_path) + + output_path = os.path.join(tmpdir, "frame.png") + + cmd = [ + "ffmpeg", "-y", + "-ss", str(request.time_point), + "-i", source_path, + "-frames:v", "1", + "-q:v", "2", + output_path + ] + + success, stdout, stderr = _run_command(cmd, timeout=60) + if not success: + raise HTTPException(status_code=500, detail=f"Frame extraction failed: {stderr[-500:]}") + + _upload_to_gcs(output_path, request.output_gcs_uri) + + return RunFFmpegResponse( + success=True, + output_gcs_uri=request.output_gcs_uri, + duration_seconds=time.time() - start_time + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Extract frame failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/create-freeze-segment", response_model=RunFFmpegResponse) +async def create_freeze_segment(request: CreateFreezeSegmentRequest): + """ + Create a freeze-frame video segment with audio overlay. 
+ """ + import time + start_time = time.time() + + with tempfile.TemporaryDirectory() as tmpdir: + try: + # Download inputs + frame_path = os.path.join(tmpdir, "frame.png") + audio_path = os.path.join(tmpdir, "audio.mp3") + _download_from_gcs(request.frame_gcs_uri, frame_path) + _download_from_gcs(request.audio_gcs_uri, audio_path) + + output_path = os.path.join(tmpdir, "freeze.mp4") + + # Build FFmpeg command for freeze frame with audio + cmd = [ + "ffmpeg", "-y", + "-loop", "1", "-i", frame_path, + "-i", audio_path, + "-c:v", "libx264", "-preset", "fast", + "-tune", "stillimage", + "-pix_fmt", request.pix_fmt, + "-r", str(request.fps), + "-vf", f"scale={request.width}:{request.height}:force_original_aspect_ratio=decrease," + f"pad={request.width}:{request.height}:(ow-iw)/2:(oh-ih)/2", + "-c:a", "aac", "-ar", str(request.sample_rate), + "-ac", str(request.channels), "-b:a", "192k", + "-video_track_timescale", "90000", + "-shortest", + output_path + ] + + success, stdout, stderr = _run_command(cmd) + if not success: + raise HTTPException(status_code=500, detail=f"Freeze segment failed: {stderr[-500:]}") + + _upload_to_gcs(output_path, request.output_gcs_uri) + + return RunFFmpegResponse( + success=True, + output_gcs_uri=request.output_gcs_uri, + duration_seconds=time.time() - start_time + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Create freeze segment failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/concatenate", response_model=RunFFmpegResponse) +async def concatenate_segments(request: ConcatenateRequest): + """ + Concatenate multiple video segments using concat demuxer. 
+ """ + import time + start_time = time.time() + + with tempfile.TemporaryDirectory() as tmpdir: + try: + # Download all segments + segment_paths = [] + for i, gcs_uri in enumerate(request.segment_gcs_uris): + ext = os.path.splitext(gcs_uri)[1] or ".mp4" + local_path = os.path.join(tmpdir, f"segment_{i}{ext}") + _download_from_gcs(gcs_uri, local_path) + segment_paths.append(local_path) + + # Create concat file + concat_file = os.path.join(tmpdir, "concat.txt") + with open(concat_file, "w") as f: + for path in segment_paths: + f.write(f"file '{path}'\n") + + output_path = os.path.join(tmpdir, "output.mp4") + + cmd = [ + "ffmpeg", "-y", + "-f", "concat", + "-safe", "0", + "-i", concat_file, + "-c", "copy", + output_path + ] + + success, stdout, stderr = _run_command(cmd) + if not success: + raise HTTPException(status_code=500, detail=f"Concatenation failed: {stderr[-500:]}") + + _upload_to_gcs(output_path, request.output_gcs_uri) + + return RunFFmpegResponse( + success=True, + output_gcs_uri=request.output_gcs_uri, + duration_seconds=time.time() - start_time + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Concatenate failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/backend/app/services/video_renderer.py b/backend/app/services/video_renderer.py index 94439c1..1fbcb3c 100644 --- a/backend/app/services/video_renderer.py +++ b/backend/app/services/video_renderer.py @@ -2,6 +2,8 @@ FFmpeg operations are dispatched to a dedicated Celery queue (ffmpeg) with concurrency=1 to prevent server overload when multiple render tasks run in parallel. + +When FFMPEG_SERVICE_URL is configured, operations are offloaded to Cloud Run for autoscaling. 
""" import asyncio @@ -10,6 +12,10 @@ import os import tempfile from pathlib import Path from typing import Any +from uuid import uuid4 + +import httpx +from google.cloud import storage from ..core.config import settings from ..core.logging import get_logger @@ -32,6 +38,160 @@ class VideoRendererService: # Audio ducking settings self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3) self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200) + # Cloud Run support + self._http_client: httpx.Client | None = None + self._gcs_client: storage.Client | None = None + # Source video caching for Cloud Run (uploaded once, reused across operations) + self._cached_source_gcs_uri: str | None = None + + @property + def _use_cloud_run(self) -> bool: + """Check if Cloud Run FFmpeg service is configured.""" + return bool(settings.ffmpeg_service_url) + + def _get_http_client(self) -> httpx.Client: + """Get or create HTTP client for Cloud Run calls.""" + if self._http_client is None: + # 10-minute timeout for long FFmpeg operations + self._http_client = httpx.Client(timeout=600.0) + return self._http_client + + def _get_gcs_client(self) -> storage.Client: + """Get or create GCS client for file transfers.""" + if self._gcs_client is None: + self._gcs_client = storage.Client() + return self._gcs_client + + def _upload_to_gcs_temp(self, local_path: str, prefix: str) -> str: + """ + Upload local file to GCS temp location and return gs:// URI. 
+ + Args: + local_path: Path to local file + prefix: Prefix for temp path (e.g., "source", "frame", "audio") + + Returns: + GCS URI (gs://bucket/temp/ffmpeg/{prefix}/{uuid}/{filename}) + """ + client = self._get_gcs_client() + bucket = client.bucket(settings.gcs_bucket) + + filename = os.path.basename(local_path) + gcs_path = f"temp/ffmpeg/{prefix}/{uuid4().hex}/{filename}" + + blob = bucket.blob(gcs_path) + blob.upload_from_filename(local_path) + + gcs_uri = f"gs://{settings.gcs_bucket}/{gcs_path}" + logger.debug(f"Uploaded {local_path} to {gcs_uri}") + return gcs_uri + + def _download_from_gcs_temp(self, gcs_uri: str, local_path: str): + """ + Download file from GCS to local path. + + Args: + gcs_uri: GCS URI (gs://bucket/path) + local_path: Local destination path + """ + client = self._get_gcs_client() + + # Parse gs:// URI + if not gcs_uri.startswith("gs://"): + raise ValueError(f"Invalid GCS URI: {gcs_uri}") + + path_parts = gcs_uri[5:].split("/", 1) + bucket_name = path_parts[0] + blob_path = path_parts[1] if len(path_parts) > 1 else "" + + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_path) + + # Ensure parent directory exists + os.makedirs(os.path.dirname(local_path), exist_ok=True) + blob.download_to_filename(local_path) + logger.debug(f"Downloaded {gcs_uri} to {local_path}") + + def _delete_gcs_temp(self, gcs_uri: str): + """ + Delete temporary GCS file. 
+
+        Args:
+            gcs_uri: GCS URI to delete (gs://bucket/path)
+        """
+        if not gcs_uri:
+            return
+
+        try:
+            client = self._get_gcs_client()
+
+            # Parse gs:// URI
+            if not gcs_uri.startswith("gs://"):
+                logger.warning(f"Invalid GCS URI for deletion: {gcs_uri}")
+                return
+
+            path_parts = gcs_uri[5:].split("/", 1)
+            bucket_name = path_parts[0]
+            blob_path = path_parts[1] if len(path_parts) > 1 else ""
+
+            bucket = client.bucket(bucket_name)
+            blob = bucket.blob(blob_path)
+            blob.delete()
+            logger.debug(f"Deleted {gcs_uri}")
+        except Exception as e:
+            # Log but don't fail on cleanup errors
+            logger.warning(f"Failed to delete GCS temp file {gcs_uri}: {e}")
+
+    def _call_cloud_run_probe(self, gcs_uri: str) -> dict[str, Any]:
+        """
+        Call Cloud Run FFmpeg service /probe endpoint.
+
+        Args:
+            gcs_uri: GCS URI of video to probe (gs://bucket/path)
+
+        Returns:
+            Flat probe result: width, height, fps, duration, sample_rate, channels, codec_name, pix_fmt
+        """
+        client = self._get_http_client()
+        response = client.post(
+            f"{settings.ffmpeg_service_url}/probe",
+            json={"gcs_uri": gcs_uri}  # field name must match ProbeRequest.gcs_uri on the service
+        )
+        response.raise_for_status()
+        return response.json()
+
+    def _call_cloud_run_endpoint(
+        self,
+        endpoint: str,
+        payload: dict[str, Any],
+        output_gcs_uri: str | None = None
+    ) -> dict[str, Any]:
+        """
+        Call Cloud Run FFmpeg service endpoint.
+ + Args: + endpoint: Endpoint path (e.g., "/encode-segment", "/extract-frame") + payload: Request payload + output_gcs_uri: Expected output GCS URI (for cleanup on error) + + Returns: + Response JSON + """ + client = self._get_http_client() + try: + response = client.post( + f"{settings.ffmpeg_service_url}{endpoint}", + json=payload + ) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + # Try to get error details from response + try: + error_detail = e.response.json().get("detail", str(e)) + except Exception: + error_detail = str(e) + raise FFmpegExecutionError(f"Cloud Run {endpoint} failed: {error_detail}") async def _dispatch_ffmpeg(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]: """ @@ -139,14 +299,27 @@ class VideoRendererService: """ method = analysis.get("method", "pause_insert") - if method == "overlay": - return await self._render_overlay_method( - source_video_path, ad_segments, analysis, output_path - ) - else: - return await self._render_pause_insert_method( - source_video_path, ad_segments, analysis, output_path - ) + # Cloud Run optimization: Upload source video once and cache for all operations + if self._use_cloud_run: + logger.info("Cloud Run mode: uploading source video to GCS for caching") + self._cached_source_gcs_uri = self._upload_to_gcs_temp(source_video_path, "source") + logger.info(f"Source video cached at: {self._cached_source_gcs_uri}") + + try: + if method == "overlay": + return await self._render_overlay_method( + source_video_path, ad_segments, analysis, output_path + ) + else: + return await self._render_pause_insert_method( + source_video_path, ad_segments, analysis, output_path + ) + finally: + # Clean up cached source video from GCS + if self._use_cloud_run and self._cached_source_gcs_uri: + logger.info(f"Cleaning up cached source video: {self._cached_source_gcs_uri}") + self._delete_gcs_temp(self._cached_source_gcs_uri) + self._cached_source_gcs_uri = None async def 
_render_overlay_method( self, @@ -429,7 +602,13 @@ class VideoRendererService: return output_path async def _get_video_duration(self, video_path: str) -> float: - """Get video duration in seconds using ffprobe via the ffmpeg queue.""" + """Get video duration in seconds using ffprobe.""" + if self._use_cloud_run: + return await self._get_video_duration_cloud_run(video_path) + return await self._get_video_duration_local(video_path) + + async def _get_video_duration_local(self, video_path: str) -> float: + """Get video duration using local ffprobe via Celery queue.""" cmd = [ self.ffprobe_path, "-v", "quiet", @@ -440,8 +619,30 @@ class VideoRendererService: result = await self._dispatch_ffprobe(cmd) return float(result['stdout'].strip()) + async def _get_video_duration_cloud_run(self, video_path: str) -> float: + """Get video duration via Cloud Run FFmpeg service.""" + # Use cached source if available, otherwise upload + if self._cached_source_gcs_uri: + gcs_uri = self._cached_source_gcs_uri + else: + gcs_uri = self._upload_to_gcs_temp(video_path, "probe") + + try: + result = self._call_cloud_run_probe(gcs_uri) + return result["duration"] + finally: + # Clean up if we uploaded specifically for this call + if not self._cached_source_gcs_uri: + self._delete_gcs_temp(gcs_uri) + async def _get_video_properties(self, video_path: str) -> dict[str, Any]: - """Get detailed video and audio properties via the ffmpeg queue.""" + """Get detailed video and audio properties.""" + if self._use_cloud_run: + return await self._get_video_properties_cloud_run(video_path) + return await self._get_video_properties_local(video_path) + + async def _get_video_properties_local(self, video_path: str) -> dict[str, Any]: + """Get video properties using local ffprobe via Celery queue.""" cmd = [ self.ffprobe_path, "-v", "quiet", @@ -484,6 +685,53 @@ class VideoRendererService: return props + async def _get_video_properties_cloud_run(self, video_path: str) -> dict[str, Any]: + """Get video 
properties via Cloud Run FFmpeg service.""" + # Use cached source if available, otherwise upload + if self._cached_source_gcs_uri: + gcs_uri = self._cached_source_gcs_uri + else: + gcs_uri = self._upload_to_gcs_temp(video_path, "probe") + + try: + result = self._call_cloud_run_probe(gcs_uri) + + # Defaults + props = { + "width": 1920, + "height": 1080, + "fps": 30.0, + "sample_rate": "44100", + "channels": "2", + "pix_fmt": "yuv420p", + "codec": "h264" + } + + # Extract from probe result + for stream in result.get("streams", []): + if stream.get("codec_type") == "video": + props["width"] = stream.get("width", props["width"]) + props["height"] = stream.get("height", props["height"]) + props["pix_fmt"] = stream.get("pix_fmt", props["pix_fmt"]) + props["codec"] = stream.get("codec_name", props["codec"]) + + fps_str = stream.get("r_frame_rate", "30/1") + if "/" in fps_str: + num, den = fps_str.split("/") + props["fps"] = float(num) / float(den) + else: + props["fps"] = float(fps_str) + + elif stream.get("codec_type") == "audio": + props["sample_rate"] = stream.get("sample_rate", props["sample_rate"]) + props["channels"] = str(stream.get("channels", props["channels"])) + + return props + finally: + # Clean up if we uploaded specifically for this call + if not self._cached_source_gcs_uri: + self._delete_gcs_temp(gcs_uri) + async def _extract_segment( self, source_path: str, @@ -519,6 +767,24 @@ class VideoRendererService: - Keyframe-only cuts causing audio dropouts - Timestamp desynchronization """ + if self._use_cloud_run: + await self._extract_segment_reencoded_cloud_run( + source_path, start_time, duration, output_path, props + ) + else: + await self._extract_segment_reencoded_local( + source_path, start_time, duration, output_path, props + ) + + async def _extract_segment_reencoded_local( + self, + source_path: str, + start_time: float, + duration: float, + output_path: str, + props: dict[str, Any] + ): + """Extract segment with re-encoding using local ffmpeg via 
Celery queue.""" cmd = [ self.ffmpeg_path, "-y", @@ -541,8 +807,51 @@ class VideoRendererService: ] await self._run_ffmpeg(cmd) + async def _extract_segment_reencoded_cloud_run( + self, + source_path: str, + start_time: float, + duration: float, + output_path: str, + props: dict[str, Any] + ): + """Extract segment with re-encoding via Cloud Run FFmpeg service.""" + # Use cached source GCS URI + if not self._cached_source_gcs_uri: + raise FFmpegExecutionError("Source video not cached for Cloud Run operation") + + output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/segment/{uuid4().hex}/segment.mp4" + + try: + self._call_cloud_run_endpoint( + "/encode-segment", + { + "source_gcs_uri": self._cached_source_gcs_uri, + "output_gcs_uri": output_gcs_uri, + "start_time": start_time, + "duration": duration, + "width": props["width"], + "height": props["height"], + "fps": props["fps"], + "pix_fmt": props["pix_fmt"], + "sample_rate": props["sample_rate"], + "channels": props["channels"], + } + ) + # Download result to local path + self._download_from_gcs_temp(output_gcs_uri, output_path) + finally: + self._delete_gcs_temp(output_gcs_uri) + async def _extract_frame(self, video_path: str, time_point: float, output_path: str): """Extract a single frame as PNG using ffmpeg.""" + if self._use_cloud_run: + await self._extract_frame_cloud_run(video_path, time_point, output_path) + else: + await self._extract_frame_local(video_path, time_point, output_path) + + async def _extract_frame_local(self, video_path: str, time_point: float, output_path: str): + """Extract frame using local ffmpeg via Celery queue.""" logger.debug(f"Extracting frame at {time_point:.2f}s from {video_path}") cmd = [ @@ -564,6 +873,34 @@ class VideoRendererService: f"(time may be beyond video duration)" ) + async def _extract_frame_cloud_run(self, video_path: str, time_point: float, output_path: str): + """Extract frame via Cloud Run FFmpeg service.""" + if not self._cached_source_gcs_uri: + raise 
FFmpegExecutionError("Source video not cached for Cloud Run operation") + + output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/frame/{uuid4().hex}/frame.png" + + try: + self._call_cloud_run_endpoint( + "/extract-frame", + { + "source_gcs_uri": self._cached_source_gcs_uri, + "output_gcs_uri": output_gcs_uri, + "time_point": time_point, + } + ) + # Download result to local path + self._download_from_gcs_temp(output_gcs_uri, output_path) + + # Verify frame was actually created + if not os.path.exists(output_path): + raise FileNotFoundError( + f"Frame extraction failed: no frame created at {time_point:.2f}s " + f"(time may be beyond video duration)" + ) + finally: + self._delete_gcs_temp(output_gcs_uri) + async def _create_freeze_segment( self, frame_path: str, @@ -611,6 +948,24 @@ class VideoRendererService: This fixes the "silent pause" issue caused by sample rate mismatch when concatenating with extracted video segments. """ + if self._use_cloud_run: + await self._create_freeze_segment_matched_cloud_run( + frame_path, audio_path, duration, output_path, props + ) + else: + await self._create_freeze_segment_matched_local( + frame_path, audio_path, duration, output_path, props + ) + + async def _create_freeze_segment_matched_local( + self, + frame_path: str, + audio_path: str, + duration: float, + output_path: str, + props: dict[str, Any] + ): + """Create freeze segment using local ffmpeg via Celery queue.""" # Validate inputs if duration <= 0: raise ValueError(f"Invalid freeze segment duration: {duration}") @@ -650,6 +1005,52 @@ class VideoRendererService: ] await self._run_ffmpeg(cmd) + async def _create_freeze_segment_matched_cloud_run( + self, + frame_path: str, + audio_path: str, + duration: float, + output_path: str, + props: dict[str, Any] + ): + """Create freeze segment via Cloud Run FFmpeg service.""" + # Validate inputs + if duration <= 0: + raise ValueError(f"Invalid freeze segment duration: {duration}") + + if not os.path.exists(frame_path): + raise 
FileNotFoundError(f"Frame file not found: {frame_path}") + if not os.path.exists(audio_path): + raise FileNotFoundError(f"Audio file not found: {audio_path}") + + # Upload frame and audio to GCS + frame_gcs_uri = self._upload_to_gcs_temp(frame_path, "frame") + audio_gcs_uri = self._upload_to_gcs_temp(audio_path, "audio") + output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/freeze/{uuid4().hex}/freeze.mp4" + + try: + self._call_cloud_run_endpoint( + "/create-freeze-segment", + { + "frame_gcs_uri": frame_gcs_uri, + "audio_gcs_uri": audio_gcs_uri, + "output_gcs_uri": output_gcs_uri, + "duration": duration, + "width": props["width"], + "height": props["height"], + "fps": props["fps"], + "pix_fmt": props["pix_fmt"], + "sample_rate": props["sample_rate"], + "channels": props["channels"], + } + ) + # Download result to local path + self._download_from_gcs_temp(output_gcs_uri, output_path) + finally: + self._delete_gcs_temp(frame_gcs_uri) + self._delete_gcs_temp(audio_gcs_uri) + self._delete_gcs_temp(output_gcs_uri) + async def _extract_audio_segment( self, source_path: str, @@ -704,12 +1105,19 @@ class VideoRendererService: Concatenate multiple audio files for combined AD + catch-up audio. Uses FFmpeg concat filter for seamless joining with consistent encoding. 
- - Args: - audio_paths: List of audio file paths to concatenate (in order) - output_path: Path to output combined audio file - props: Video properties (for sample_rate, channels) """ + if self._use_cloud_run: + await self._concatenate_audio_cloud_run(audio_paths, output_path, props) + else: + await self._concatenate_audio_local(audio_paths, output_path, props) + + async def _concatenate_audio_local( + self, + audio_paths: list[str], + output_path: str, + props: dict[str, Any] + ): + """Concatenate audio files using local ffmpeg via Celery queue.""" if not audio_paths: raise ValueError("No audio files to concatenate") @@ -755,6 +1163,57 @@ class VideoRendererService: ] await self._run_ffmpeg(cmd) + async def _concatenate_audio_cloud_run( + self, + audio_paths: list[str], + output_path: str, + props: dict[str, Any] + ): + """Concatenate audio files via Cloud Run FFmpeg service.""" + if not audio_paths: + raise ValueError("No audio files to concatenate") + + # Upload all audio files to GCS + audio_gcs_uris = [] + for path in audio_paths: + gcs_uri = self._upload_to_gcs_temp(path, "audio") + audio_gcs_uris.append(gcs_uri) + + output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/audio/{uuid4().hex}/combined.m4a" + + try: + # Build ffmpeg command for Cloud Run + # Use filter_complex for concatenation + inputs = [] + filter_parts = [] + for i in range(len(audio_gcs_uris)): + filter_parts.append(f"[{i}:a]") + + filter_complex = "".join(filter_parts) + f"concat=n={len(audio_gcs_uris)}:v=0:a=1[out]" + + self._call_cloud_run_endpoint( + "/run-ffmpeg", + { + "input_gcs_uris": audio_gcs_uris, + "output_gcs_uri": output_gcs_uri, + "ffmpeg_args": [ + "-filter_complex", filter_complex, + "-map", "[out]", + "-c:a", "aac", + "-ar", props["sample_rate"], + "-ac", props["channels"], + "-b:a", "192k", + ] + } + ) + # Download result to local path + self._download_from_gcs_temp(output_gcs_uri, output_path) + finally: + # Cleanup + for gcs_uri in audio_gcs_uris: + 
self._delete_gcs_temp(gcs_uri) + self._delete_gcs_temp(output_gcs_uri) + async def _generate_silence( self, duration: float, @@ -765,12 +1224,19 @@ class VideoRendererService: Generate a silent audio file of specified duration. Used to create 500ms silence buffers before/after AD audio. - - Args: - duration: Duration of silence in seconds - output_path: Path to output audio file - props: Video properties (for sample_rate, channels) """ + if self._use_cloud_run: + await self._generate_silence_cloud_run(duration, output_path, props) + else: + await self._generate_silence_local(duration, output_path, props) + + async def _generate_silence_local( + self, + duration: float, + output_path: str, + props: dict[str, Any] + ): + """Generate silence using local ffmpeg via Celery queue.""" if duration <= 0: raise ValueError(f"Invalid silence duration: {duration}") @@ -792,6 +1258,41 @@ class VideoRendererService: ] await self._run_ffmpeg(cmd) + async def _generate_silence_cloud_run( + self, + duration: float, + output_path: str, + props: dict[str, Any] + ): + """Generate silence via Cloud Run FFmpeg service.""" + if duration <= 0: + raise ValueError(f"Invalid silence duration: {duration}") + + output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/silence/{uuid4().hex}/silence.m4a" + + try: + channel_layout = "stereo" if props["channels"] == "2" else "mono" + self._call_cloud_run_endpoint( + "/run-ffmpeg", + { + "input_gcs_uris": [], # No input files, using lavfi + "output_gcs_uri": output_gcs_uri, + "ffmpeg_args": [ + "-f", "lavfi", + "-i", f"anullsrc=r={props['sample_rate']}:cl={channel_layout}", + "-t", str(duration), + "-c:a", "aac", + "-ar", props["sample_rate"], + "-ac", props["channels"], + "-b:a", "192k", + ] + } + ) + # Download result to local path + self._download_from_gcs_temp(output_gcs_uri, output_path) + finally: + self._delete_gcs_temp(output_gcs_uri) + async def _concatenate_segments( self, segment_paths: list[str], @@ -799,6 +1300,18 @@ class 
VideoRendererService: temp_dir: Path ): """Concatenate video segments using ffmpeg concat demuxer.""" + if self._use_cloud_run: + await self._concatenate_segments_cloud_run(segment_paths, output_path) + else: + await self._concatenate_segments_local(segment_paths, output_path, temp_dir) + + async def _concatenate_segments_local( + self, + segment_paths: list[str], + output_path: str, + temp_dir: Path + ): + """Concatenate video segments using local ffmpeg via Celery queue.""" # Create concat file concat_file = temp_dir / "concat.txt" with open(concat_file, "w") as f: @@ -818,6 +1331,36 @@ class VideoRendererService: ] await self._run_ffmpeg(cmd) + async def _concatenate_segments_cloud_run( + self, + segment_paths: list[str], + output_path: str + ): + """Concatenate video segments via Cloud Run FFmpeg service.""" + # Upload all segment files to GCS + segment_gcs_uris = [] + for path in segment_paths: + gcs_uri = self._upload_to_gcs_temp(path, "segment") + segment_gcs_uris.append(gcs_uri) + + output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/concat/{uuid4().hex}/final.mp4" + + try: + self._call_cloud_run_endpoint( + "/concatenate", + { + "segment_gcs_uris": segment_gcs_uris, + "output_gcs_uri": output_gcs_uri, + } + ) + # Download result to local path + self._download_from_gcs_temp(output_gcs_uri, output_path) + finally: + # Cleanup uploaded segments + for gcs_uri in segment_gcs_uris: + self._delete_gcs_temp(gcs_uri) + self._delete_gcs_temp(output_gcs_uri) + async def _copy_video(self, source_path: str, output_path: str): """Copy video without modification.""" cmd = [ diff --git a/backend/app/services/whisper_http_service.py b/backend/app/services/whisper_http_service.py new file mode 100644 index 0000000..815dcd4 --- /dev/null +++ b/backend/app/services/whisper_http_service.py @@ -0,0 +1,311 @@ +""" +Whisper HTTP Service - FastAPI application for Cloud Run deployment. 
+ +This service exposes Whisper transcription as HTTP endpoints, allowing +the main application to offload CPU-intensive transcription to Cloud Run +with autoscaling. + +This module uses minimal configuration to avoid importing the full app Settings. +""" + +import logging +import os +import tempfile +from typing import Optional + +from fastapi import FastAPI, HTTPException +from google.cloud import storage +from pydantic import BaseModel, Field + +from .whisper_service import WordTimestamp, whisper_service + +# Simple logging setup (no dependency on app.core.logging) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# FastAPI app for Cloud Run +app = FastAPI( + title="Whisper Transcription Service", + description="Cloud Run service for Whisper-based audio transcription", + version="1.0.0", +) + + +# Request/Response Models +class TranscribeRequest(BaseModel): + """Request model for transcription endpoint.""" + gcs_uri: str = Field( + ..., + description="GCS URI of the audio file (gs://bucket/path/audio.mp3)", + examples=["gs://accessible-video-storage/jobs/123/audio.mp3"] + ) + + +class WordTimestampResponse(BaseModel): + """Word timestamp in response.""" + word: str + start: float + end: float + + +class TranscribeResponse(BaseModel): + """Response model for transcription endpoint.""" + words: list[WordTimestampResponse] + word_count: int + transcript: str = Field( + ..., + description="Full transcript text (words joined with spaces)" + ) + + +class SpeechGapResponse(BaseModel): + """Speech gap in response.""" + start: float + end: float + duration: float + gap_type: str + + +class TranscribeWithGapsRequest(BaseModel): + """Request model for transcription with gap analysis.""" + gcs_uri: str = Field( + ..., + description="GCS URI of the audio file" + ) + + +class TranscribeWithGapsResponse(BaseModel): + """Response model for transcription with gap analysis.""" + words: list[WordTimestampResponse] + gaps: list[SpeechGapResponse] + 
word_count: int + transcript: str + + +class RefinePausePointsRequest(BaseModel): + """Request model for pause point refinement.""" + placements: list[dict] = Field( + ..., + description="List of placement dicts from Gemini analysis" + ) + words: list[WordTimestampResponse] = Field( + ..., + description="Word timestamps from transcription" + ) + consolidation_threshold: float = Field( + default=5.0, + description="Threshold in seconds for consolidating nearby cues" + ) + + +class RefinePausePointsResponse(BaseModel): + """Response model for pause point refinement.""" + refined_placements: list[dict] + warnings: list[str] + + +class HealthResponse(BaseModel): + """Health check response.""" + status: str + model_loaded: bool + model_name: str + + +def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]: + """Parse GCS URI into bucket and blob path.""" + if not gcs_uri.startswith("gs://"): + raise ValueError(f"Invalid GCS URI: {gcs_uri}") + + parts = gcs_uri[5:].split("/", 1) + if len(parts) != 2: + raise ValueError(f"Invalid GCS URI format: {gcs_uri}") + + return parts[0], parts[1] + + +def _download_from_gcs(gcs_uri: str, local_path: str) -> None: + """Download file from GCS to local path.""" + bucket_name, blob_path = _parse_gcs_uri(gcs_uri) + + client = storage.Client() # Auto-detects project from environment + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_path) + + if not blob.exists(): + raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}") + + blob.download_to_filename(local_path) + logger.info(f"Downloaded {gcs_uri} to {local_path}") + + +@app.get("/health", response_model=HealthResponse) +async def health_check(): + """Health check endpoint.""" + return HealthResponse( + status="healthy", + model_loaded=whisper_service._model is not None, + model_name=whisper_service._model_name + ) + + +@app.post("/transcribe", response_model=TranscribeResponse) +async def transcribe(request: TranscribeRequest): + """ + Transcribe audio file 
from GCS and return word-level timestamps. + + This endpoint: + 1. Downloads the audio file from GCS + 2. Runs Whisper transcription with word-level timestamps + 3. Returns the words with timing information + """ + logger.info(f"Transcribe request: {request.gcs_uri}") + + # Create temp file for audio + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: + tmp_path = tmp.name + + try: + # Download audio from GCS + _download_from_gcs(request.gcs_uri, tmp_path) + + # Run transcription + words = whisper_service.transcribe_audio(tmp_path) + + # Build response + word_responses = [ + WordTimestampResponse(word=w.word, start=w.start, end=w.end) + for w in words + ] + + transcript = " ".join(w.word for w in words) + + logger.info(f"Transcription complete: {len(words)} words") + + return TranscribeResponse( + words=word_responses, + word_count=len(words), + transcript=transcript + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Transcription failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + # Clean up temp file + if os.path.exists(tmp_path): + os.unlink(tmp_path) + + +@app.post("/transcribe-with-gaps", response_model=TranscribeWithGapsResponse) +async def transcribe_with_gaps(request: TranscribeWithGapsRequest): + """ + Transcribe audio and identify speech gaps for pause point placement. + + This endpoint combines transcription and gap analysis in a single call, + which is more efficient for the pause-insert rendering method. 
+ """ + logger.info(f"Transcribe with gaps request: {request.gcs_uri}") + + # Create temp file for audio + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: + tmp_path = tmp.name + + try: + # Download audio from GCS + _download_from_gcs(request.gcs_uri, tmp_path) + + # Run transcription + words = whisper_service.transcribe_audio(tmp_path) + + # Identify speech gaps + gaps = whisper_service.identify_speech_gaps(words) + + # Build response + word_responses = [ + WordTimestampResponse(word=w.word, start=w.start, end=w.end) + for w in words + ] + + gap_responses = [ + SpeechGapResponse( + start=g.start, + end=g.end, + duration=g.duration, + gap_type=g.gap_type + ) + for g in gaps + ] + + transcript = " ".join(w.word for w in words) + + logger.info(f"Transcription with gaps complete: {len(words)} words, {len(gaps)} gaps") + + return TranscribeWithGapsResponse( + words=word_responses, + gaps=gap_responses, + word_count=len(words), + transcript=transcript + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Transcription with gaps failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + # Clean up temp file + if os.path.exists(tmp_path): + os.unlink(tmp_path) + + +@app.post("/refine-pause-points", response_model=RefinePausePointsResponse) +async def refine_pause_points(request: RefinePausePointsRequest): + """ + Refine Gemini pause points using Whisper transcript analysis. + + This endpoint takes the placements from Gemini analysis and the + word timestamps from transcription, then refines the pause points + to align with natural sentence boundaries. 
+ """ + logger.info(f"Refine pause points request: {len(request.placements)} placements") + + try: + # Convert word responses back to WordTimestamp objects + words = [ + WordTimestamp(word=w.word, start=w.start, end=w.end) + for w in request.words + ] + + # Identify speech gaps + gaps = whisper_service.identify_speech_gaps(words) + + # Refine pause points + refined_placements, warnings = whisper_service.refine_all_pause_points( + request.placements, + words, + gaps, + request.consolidation_threshold + ) + + logger.info(f"Pause point refinement complete: {len(warnings)} warnings") + + return RefinePausePointsResponse( + refined_placements=refined_placements, + warnings=warnings + ) + + except Exception as e: + logger.error(f"Pause point refinement failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +# Startup event to pre-load Whisper model +@app.on_event("startup") +async def startup_event(): + """Pre-load Whisper model on startup to reduce first-request latency.""" + logger.info("Whisper HTTP Service starting up...") + # Access the model property to trigger lazy loading + _ = whisper_service.model + logger.info("Whisper model pre-loaded successfully") diff --git a/backend/app/services/whisper_service.py b/backend/app/services/whisper_service.py index fc8215c..1dc10b4 100644 --- a/backend/app/services/whisper_service.py +++ b/backend/app/services/whisper_service.py @@ -2,16 +2,40 @@ from __future__ import annotations +import logging import os import time from dataclasses import dataclass from faster_whisper import WhisperModel -from ..core.config import settings -from ..core.logging import get_logger +# Use simple logging for Cloud Run compatibility (no dependency on app.core.logging) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) -logger = get_logger(__name__) +# Try to import settings, fall back to env vars for Cloud Run mode +try: + from ..core.config import settings + _HAS_SETTINGS = True +except Exception: + 
_HAS_SETTINGS = False + settings = None # type: ignore + + +def _get_setting(name: str, default): + """Get setting value from Settings object or environment variable.""" + if _HAS_SETTINGS and settings: + return getattr(settings, name, default) + # Fall back to environment variable + env_val = os.environ.get(name.upper()) + if env_val is not None: + # Try to convert to same type as default + if isinstance(default, float): + return float(env_val) + if isinstance(default, int): + return int(env_val) + return env_val + return default @dataclass @@ -66,15 +90,15 @@ class WhisperService: def __init__(self): self._model: WhisperModel | None = None - self._model_name = getattr(settings, 'whisper_model', 'base') + self._model_name = _get_setting('whisper_model', 'base') # Gap classification thresholds (in seconds) - self.sentence_gap_threshold = getattr(settings, 'whisper_sentence_gap_threshold', 0.5) - self.phrase_gap_threshold = getattr(settings, 'whisper_phrase_gap_threshold', 0.3) - self.min_gap_threshold = getattr(settings, 'whisper_min_gap_threshold', 0.15) + self.sentence_gap_threshold = _get_setting('whisper_sentence_gap_threshold', 0.5) + self.phrase_gap_threshold = _get_setting('whisper_phrase_gap_threshold', 0.3) + self.min_gap_threshold = _get_setting('whisper_min_gap_threshold', 0.15) # Snapping configuration - self.max_search_window = getattr(settings, 'whisper_max_search_window', 30.0) + self.max_search_window = _get_setting('whisper_max_search_window', 30.0) @property def model(self) -> WhisperModel: diff --git a/backend/app/tasks/whisper_transcribe.py b/backend/app/tasks/whisper_transcribe.py index 437b3d7..3e2deda 100644 --- a/backend/app/tasks/whisper_transcribe.py +++ b/backend/app/tasks/whisper_transcribe.py @@ -1,5 +1,12 @@ -"""Celery task for Whisper transcription on dedicated queue with concurrency=1.""" +"""Celery task for Whisper transcription with Cloud Run fallback.""" +import os +import uuid + +import httpx +from google.cloud import storage + 
+from ..core.config import settings from ..core.logging import get_logger from ..services.whisper_service import whisper_service from . import celery_app @@ -7,14 +14,128 @@ from . import celery_app logger = get_logger(__name__) -@celery_app.task(bind=True, queue='whisper', time_limit=1800, soft_time_limit=1700) # 30 min limit +def _upload_audio_to_gcs_temp(audio_path: str, job_id: str) -> str: + """Upload local audio file to GCS temporary location and return GCS URI.""" + # Generate unique temp path + filename = os.path.basename(audio_path) + temp_path = f"temp/whisper/{job_id}/{uuid.uuid4().hex}/{filename}" + + client = storage.Client(project=settings.gcp_project_id) + bucket = client.bucket(settings.gcs_bucket) + blob = bucket.blob(temp_path) + + blob.upload_from_filename(audio_path) + gcs_uri = f"gs://{settings.gcs_bucket}/{temp_path}" + + logger.info(f"Uploaded audio to temp GCS: {gcs_uri}") + return gcs_uri + + +def _delete_gcs_temp(gcs_uri: str) -> None: + """Delete temporary GCS file.""" + try: + if not gcs_uri.startswith("gs://"): + return + + parts = gcs_uri[5:].split("/", 1) + if len(parts) != 2: + return + + bucket_name, blob_path = parts + + client = storage.Client(project=settings.gcp_project_id) + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_path) + blob.delete() + + logger.info(f"Deleted temp GCS file: {gcs_uri}") + except Exception as e: + logger.warning(f"Failed to delete temp GCS file {gcs_uri}: {e}") + + +def _transcribe_via_cloud_run(job_id: str, audio_path: str) -> dict: + """ + Transcribe audio via Cloud Run Whisper service. + + Uploads local audio to GCS temp, calls Cloud Run, then cleans up. 
+ """ + gcs_uri = None + try: + # Upload audio to GCS temp location + gcs_uri = _upload_audio_to_gcs_temp(audio_path, job_id) + + # Call Cloud Run service + service_url = settings.whisper_service_url.rstrip("/") + endpoint = f"{service_url}/transcribe" + + logger.info(f"Calling Whisper Cloud Run service: {endpoint}") + + with httpx.Client(timeout=300.0) as client: + response = client.post( + endpoint, + json={"gcs_uri": gcs_uri}, + headers={"Content-Type": "application/json"} + ) + + response.raise_for_status() + data = response.json() + + # Calculate audio duration from last word + words = data.get("words", []) + audio_duration = words[-1]["end"] if words else 0.0 + + result = { + "job_id": job_id, + "word_count": data.get("word_count", len(words)), + "audio_duration": audio_duration, + "words": words + } + + logger.info( + f"Cloud Run transcription complete for job {job_id}: " + f"{result['word_count']} words, {audio_duration:.2f}s duration" + ) + + return result + + finally: + # Clean up temp GCS file + if gcs_uri: + _delete_gcs_temp(gcs_uri) + + +def _transcribe_locally(job_id: str, audio_path: str) -> dict: + """Transcribe audio using local Whisper service.""" + words = whisper_service.transcribe_audio(audio_path) + + # Convert to serializable format + words_data = [w.to_dict() for w in words] + + # Calculate audio duration from last word + audio_duration = words[-1].end if words else 0.0 + + result = { + "job_id": job_id, + "word_count": len(words), + "audio_duration": audio_duration, + "words": words_data + } + + logger.info( + f"Local transcription complete for job {job_id}: " + f"{len(words)} words, {audio_duration:.2f}s duration" + ) + + return result + + +@celery_app.task(bind=True, queue='whisper', time_limit=1800, soft_time_limit=1700) def transcribe_video_audio_task(self, job_id: str, audio_path: str) -> dict: """ - Run Whisper transcription on dedicated queue with concurrency=1. 
+ Run Whisper transcription - via Cloud Run if configured, otherwise locally. - This task is routed to the 'whisper' queue which should be processed - by a worker with --concurrency=1 to prevent memory overload when - processing multiple jobs in parallel. + When WHISPER_SERVICE_URL is set, transcription is offloaded to Cloud Run + for autoscaling. Otherwise, runs on dedicated local worker with concurrency=1. Args: job_id: Job ID for logging @@ -32,29 +153,17 @@ def transcribe_video_audio_task(self, job_id: str, audio_path: str) -> dict: logger.info(f"Starting Whisper transcription task for job {job_id}") try: - # Run transcription using the WhisperService - words = whisper_service.transcribe_audio(audio_path) - - # Convert to serializable format - words_data = [w.to_dict() for w in words] - - # Calculate audio duration from last word - audio_duration = words[-1].end if words else 0.0 - - result = { - "job_id": job_id, - "word_count": len(words), - "audio_duration": audio_duration, - "words": words_data - } - - logger.info( - f"Whisper transcription complete for job {job_id}: " - f"{len(words)} words, {audio_duration:.2f}s duration" - ) - - return result + # Use Cloud Run if configured, otherwise local + if settings.whisper_service_url: + logger.info(f"Using Cloud Run Whisper service: {settings.whisper_service_url}") + return _transcribe_via_cloud_run(job_id, audio_path) + else: + logger.info("Using local Whisper service") + return _transcribe_locally(job_id, audio_path) + except httpx.HTTPStatusError as e: + logger.error(f"Cloud Run transcription failed for job {job_id}: {e.response.status_code} - {e.response.text}") + raise except Exception as e: logger.error(f"Whisper transcription failed for job {job_id}: {e}") raise diff --git a/infra/cloud-run/cloudbuild-http-services.yaml b/infra/cloud-run/cloudbuild-http-services.yaml new file mode 100644 index 0000000..29823fe --- /dev/null +++ b/infra/cloud-run/cloudbuild-http-services.yaml @@ -0,0 +1,166 @@ +# 
============================================================================= +# Cloud Build: HTTP Services (Whisper & FFmpeg) +# ============================================================================= +# Builds and deploys the autoscaling HTTP services to Cloud Run. +# +# Usage: +# gcloud builds submit --config=infra/cloud-run/cloudbuild-http-services.yaml . +# +# Or trigger automatically on push to main branch via Cloud Build triggers. +# ============================================================================= + +steps: + # ========================================================================= + # Build Docker Images + # ========================================================================= + + # Build Whisper HTTP Service image + - name: 'gcr.io/cloud-builders/docker' + args: + - 'build' + - '-f' + - 'backend/Dockerfile.whisper-service' + - '-t' + - 'gcr.io/$PROJECT_ID/whisper-http-service:$COMMIT_SHA' + - '-t' + - 'gcr.io/$PROJECT_ID/whisper-http-service:latest' + - 'backend/' + id: 'build-whisper-service' + + # Build FFmpeg HTTP Service image + - name: 'gcr.io/cloud-builders/docker' + args: + - 'build' + - '-f' + - 'backend/Dockerfile.ffmpeg-service' + - '-t' + - 'gcr.io/$PROJECT_ID/ffmpeg-http-service:$COMMIT_SHA' + - '-t' + - 'gcr.io/$PROJECT_ID/ffmpeg-http-service:latest' + - 'backend/' + id: 'build-ffmpeg-service' + + # ========================================================================= + # Push Images to Container Registry + # ========================================================================= + + - name: 'gcr.io/cloud-builders/docker' + args: ['push', 'gcr.io/$PROJECT_ID/whisper-http-service:$COMMIT_SHA'] + id: 'push-whisper-service-sha' + waitFor: ['build-whisper-service'] + + - name: 'gcr.io/cloud-builders/docker' + args: ['push', 'gcr.io/$PROJECT_ID/whisper-http-service:latest'] + id: 'push-whisper-service-latest' + waitFor: ['build-whisper-service'] + + - name: 'gcr.io/cloud-builders/docker' + args: ['push', 
'gcr.io/$PROJECT_ID/ffmpeg-http-service:$COMMIT_SHA'] + id: 'push-ffmpeg-service-sha' + waitFor: ['build-ffmpeg-service'] + + - name: 'gcr.io/cloud-builders/docker' + args: ['push', 'gcr.io/$PROJECT_ID/ffmpeg-http-service:latest'] + id: 'push-ffmpeg-service-latest' + waitFor: ['build-ffmpeg-service'] + + # ========================================================================= + # Deploy to Cloud Run + # ========================================================================= + + # Deploy Whisper HTTP Service + - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk' + entrypoint: 'gcloud' + args: + - 'run' + - 'deploy' + - 'whisper-http-service' + - '--image=gcr.io/$PROJECT_ID/whisper-http-service:$COMMIT_SHA' + - '--region=$_REGION' + - '--platform=managed' + - '--no-allow-unauthenticated' # Internal only + - '--port=8080' + - '--memory=32Gi' + - '--cpu=8' + - '--min-instances=0' + - '--max-instances=10' + - '--concurrency=1' + - '--timeout=300' + - '--execution-environment=gen2' + - '--no-cpu-throttling' + - '--cpu-boost' + - '--set-env-vars=APP_ENV=prod,PYTHONPATH=/app,WHISPER_MODEL=medium,GCS_BUCKET=$_GCS_BUCKET' + - '--service-account=accessible-video-worker@$PROJECT_ID.iam.gserviceaccount.com' + id: 'deploy-whisper-service' + waitFor: ['push-whisper-service-sha'] + + # Deploy FFmpeg HTTP Service + - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk' + entrypoint: 'gcloud' + args: + - 'run' + - 'deploy' + - 'ffmpeg-http-service' + - '--image=gcr.io/$PROJECT_ID/ffmpeg-http-service:$COMMIT_SHA' + - '--region=$_REGION' + - '--platform=managed' + - '--no-allow-unauthenticated' # Internal only + - '--port=8080' + - '--memory=32Gi' + - '--cpu=8' + - '--min-instances=0' + - '--max-instances=20' + - '--concurrency=1' + - '--timeout=600' + - '--execution-environment=gen2' + - '--no-cpu-throttling' + - '--cpu-boost' + - '--set-env-vars=APP_ENV=prod,PYTHONPATH=/app,GCS_BUCKET=$_GCS_BUCKET' + - '--service-account=accessible-video-worker@$PROJECT_ID.iam.gserviceaccount.com' + 
id: 'deploy-ffmpeg-service' + waitFor: ['push-ffmpeg-service-sha'] + + # ========================================================================= + # Output Service URLs + # ========================================================================= + + - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk' + entrypoint: 'bash' + args: + - '-c' + - | + echo "==============================================" + echo "HTTP Services Deployed Successfully!" + echo "==============================================" + echo "" + echo "Whisper Service:" + gcloud run services describe whisper-http-service --region=$_REGION --format='value(status.url)' + echo "" + echo "FFmpeg Service:" + gcloud run services describe ffmpeg-http-service --region=$_REGION --format='value(status.url)' + echo "" + echo "==============================================" + echo "To enable Cloud Run usage, set these environment variables:" + echo " WHISPER_SERVICE_URL=" + echo " FFMPEG_SERVICE_URL=" + echo "==============================================" + id: 'output-urls' + waitFor: ['deploy-whisper-service', 'deploy-ffmpeg-service'] + +substitutions: + _REGION: 'us-central1' + _GCS_BUCKET: 'accessible-video' + +options: + machineType: 'E2_HIGHCPU_8' + diskSizeGb: '100' + dynamic_substitutions: true + +timeout: '2400s' # 40 minutes (Whisper image build takes time due to model download) + +# Images to be pushed (for Cloud Build to track) +images: + - 'gcr.io/$PROJECT_ID/whisper-http-service:$COMMIT_SHA' + - 'gcr.io/$PROJECT_ID/whisper-http-service:latest' + - 'gcr.io/$PROJECT_ID/ffmpeg-http-service:$COMMIT_SHA' + - 'gcr.io/$PROJECT_ID/ffmpeg-http-service:latest' diff --git a/infra/cloud-run/ffmpeg-http-service.yaml b/infra/cloud-run/ffmpeg-http-service.yaml new file mode 100644 index 0000000..3d99c58 --- /dev/null +++ b/infra/cloud-run/ffmpeg-http-service.yaml @@ -0,0 +1,125 @@ +# ============================================================================= +# Cloud Run Service: FFmpeg HTTP Service +# 
============================================================================= +# Autoscaling FFmpeg processing service for Cloud Run deployment. +# This service handles CPU-intensive video encoding via HTTP endpoints. +# +# Key features: +# - Scale to zero when idle (pay only for compute time used) +# - Up to 20 instances for parallel video processing +# - 8 vCPU / 32GB RAM for fast encoding +# - Startup CPU boost for faster cold starts +# - Faster startup than Whisper (no model loading) +# ============================================================================= + +apiVersion: serving.knative.dev/v1 +kind: Service +metadata: + name: ffmpeg-http-service + annotations: + run.googleapis.com/ingress: internal # Internal traffic only (same project/VPC); not reachable from the internet + run.googleapis.com/execution-environment: gen2 # Required for 8 vCPU +spec: + template: + metadata: + annotations: + # Autoscaling configuration + autoscaling.knative.dev/minScale: "0" # Scale to zero when idle + autoscaling.knative.dev/maxScale: "20" # Max 20 concurrent instances + + # Cloud Run Gen2 features + run.googleapis.com/execution-environment: gen2 # Required for 8 vCPU + run.googleapis.com/cpu-throttling: "false" # CPU stays allocated outside request handling too + run.googleapis.com/startup-cpu-boost: "true" # Faster cold start + + spec: + # Only 1 FFmpeg operation at a time per instance (CPU-intensive) + containerConcurrency: 1 + + # 10-minute timeout for long encoding operations + timeoutSeconds: 600 + + serviceAccountName: accessible-video-worker@PROJECT_ID.iam.gserviceaccount.com + + containers: + - image: gcr.io/PROJECT_ID/ffmpeg-http-service:latest + + ports: + - containerPort: 8080 + + env: + - name: APP_ENV + value: "prod" + - name: PYTHONPATH + value: "/app" + - name: PYTHONUNBUFFERED + value: "1" + - name: PYTHONDONTWRITEBYTECODE + value: "1" + + # GCP Configuration + - name: GCP_PROJECT_ID + value: "PROJECT_ID" + - name: GCS_BUCKET + valueFrom: + secretKeyRef: + name: gcs-bucket-name + key: latest + + # MongoDB for job
tracking (optional, for logging) + - name: MONGODB_URL + valueFrom: + secretKeyRef: + name: mongodb-url + key: latest + + # OpenTelemetry configuration + - name: OTEL_SERVICE_NAME + value: "ffmpeg-http-service" + - name: OTEL_SERVICE_VERSION + value: "1.0.0" + - name: OTEL_TRACES_EXPORTER + value: "gcp_trace" + + # Sentry configuration (optional) + - name: SENTRY_DSN + valueFrom: + secretKeyRef: + name: sentry-dsn + key: latest + - name: SENTRY_ENVIRONMENT + value: "production" + + resources: + limits: + memory: "32Gi" + cpu: "8000m" # 8 vCPU + requests: + memory: "4Gi" + cpu: "2000m" # 2 vCPU minimum + + # Health checks + startupProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 5 # FFmpeg starts fast (no model to load) + periodSeconds: 5 + timeoutSeconds: 5 + failureThreshold: 6 + + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 15 + periodSeconds: 30 + timeoutSeconds: 10 + + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 5 diff --git a/infra/cloud-run/whisper-http-service.yaml b/infra/cloud-run/whisper-http-service.yaml new file mode 100644 index 0000000..fcbf2ad --- /dev/null +++ b/infra/cloud-run/whisper-http-service.yaml @@ -0,0 +1,128 @@ +# ============================================================================= +# Cloud Run Service: Whisper HTTP Service +# ============================================================================= +# Autoscaling Whisper transcription service for Cloud Run deployment. +# This service handles CPU-intensive Whisper transcription via HTTP endpoints. 
+# +# Key features: +# - Scale to zero when idle (pay only for compute time used) +# - Up to 10 instances for parallel transcription +# - 8 vCPU / 32GB RAM for fast transcription +# - Startup CPU boost for faster cold starts +# ============================================================================= + +apiVersion: serving.knative.dev/v1 +kind: Service +metadata: + name: whisper-http-service + annotations: + run.googleapis.com/ingress: internal # Internal traffic only (same project/VPC); not reachable from the internet + run.googleapis.com/execution-environment: gen2 # Required for 8 vCPU +spec: + template: + metadata: + annotations: + # Autoscaling configuration + autoscaling.knative.dev/minScale: "0" # Scale to zero when idle + autoscaling.knative.dev/maxScale: "10" # Max 10 concurrent instances + + # Cloud Run Gen2 features + run.googleapis.com/execution-environment: gen2 # Required for 8 vCPU + run.googleapis.com/cpu-throttling: "false" # CPU stays allocated outside request handling too + run.googleapis.com/startup-cpu-boost: "true" # Faster cold start + + spec: + # Only 1 transcription at a time per instance (Whisper is CPU-intensive) + containerConcurrency: 1 + + # 5-minute timeout for long transcriptions + timeoutSeconds: 300 + + serviceAccountName: accessible-video-worker@PROJECT_ID.iam.gserviceaccount.com + + containers: + - image: gcr.io/PROJECT_ID/whisper-http-service:latest + + ports: + - containerPort: 8080 + + env: + - name: APP_ENV + value: "prod" + - name: PYTHONPATH + value: "/app" + - name: PYTHONUNBUFFERED + value: "1" + - name: PYTHONDONTWRITEBYTECODE + value: "1" + + # GCP Configuration + - name: GCP_PROJECT_ID + value: "PROJECT_ID" + - name: GCS_BUCKET + valueFrom: + secretKeyRef: + name: gcs-bucket-name + key: latest + + # MongoDB for job tracking (optional, for logging) + - name: MONGODB_URL + valueFrom: + secretKeyRef: + name: mongodb-url + key: latest + + # Whisper Configuration + - name: WHISPER_MODEL + value: "medium" + + # OpenTelemetry configuration + - name: OTEL_SERVICE_NAME + value:
"whisper-http-service" + - name: OTEL_SERVICE_VERSION + value: "1.0.0" + - name: OTEL_TRACES_EXPORTER + value: "gcp_trace" + + # Sentry configuration (optional) + - name: SENTRY_DSN + valueFrom: + secretKeyRef: + name: sentry-dsn + key: latest + - name: SENTRY_ENVIRONMENT + value: "production" + + resources: + limits: + memory: "32Gi" + cpu: "8000m" # 8 vCPU + requests: + memory: "8Gi" + cpu: "4000m" # 4 vCPU minimum + + # Health checks + startupProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 30 # Wait for Whisper model to load + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 6 + + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 60 + periodSeconds: 30 + timeoutSeconds: 10 + + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5