feat: add Cloud Run HTTP services for Whisper and FFmpeg
Migrate CPU-intensive workloads to Cloud Run for autoscaling: - Add Whisper HTTP service (FastAPI) with /transcribe endpoint - Add FFmpeg HTTP service (FastAPI) with /encode, /probe, /extract-frame, etc. - Add Dockerfiles for both services (8 vCPU, 32GB RAM, Gen2) - Add Cloud Build config for CI/CD deployment - Add Cloud Run service YAML configs with scale-to-zero - Update whisper_transcribe.py to call Cloud Run when WHISPER_SERVICE_URL set - Update video_renderer.py to call Cloud Run when FFMPEG_SERVICE_URL set - Update whisper_service.py for Cloud Run compatibility (no settings dependency) - Add ffmpeg_service_url and whisper_service_url to config.py Services scale 0→N based on request load, falling back to local execution when service URLs are not configured (hybrid mode). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
a8cc0b65a4
commit
79440929f4
11 changed files with 2204 additions and 56 deletions
97
backend/Dockerfile.ffmpeg-service
Normal file
97
backend/Dockerfile.ffmpeg-service
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
# =============================================================================
|
||||
# Dockerfile for FFmpeg HTTP Service - Cloud Run Deployment
|
||||
# =============================================================================
|
||||
# This Dockerfile creates a FastAPI-based HTTP service for FFmpeg operations,
|
||||
# designed for deployment on Google Cloud Run with autoscaling.
|
||||
#
|
||||
# Key features:
|
||||
# - Lightweight image with FFmpeg + FastAPI (no Whisper model)
|
||||
# - Fast startup for quick autoscaling
|
||||
# - Optimized for 8 vCPU / 32GB RAM Cloud Run instances
|
||||
# =============================================================================
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Stage 1: Builder - Install Python dependencies using Poetry
|
||||
# -----------------------------------------------------------------------------
|
||||
FROM python:3.11-slim AS builder
|
||||
|
||||
# Install build dependencies
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
build-essential \
|
||||
curl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install Poetry
|
||||
RUN pip install --no-cache-dir poetry==1.8.2
|
||||
|
||||
# Configure Poetry
|
||||
ENV POETRY_NO_INTERACTION=1 \
|
||||
POETRY_VIRTUALENVS_CREATE=false \
|
||||
POETRY_CACHE_DIR=/tmp/poetry_cache
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy dependency files
|
||||
COPY pyproject.toml poetry.lock ./
|
||||
|
||||
# Install dependencies (exclude faster-whisper to keep image small)
|
||||
# Note: We still install all deps for simplicity, but could optimize with groups
|
||||
RUN poetry config virtualenvs.create false \
|
||||
&& poetry install --only main --no-interaction --no-ansi \
|
||||
&& rm -rf $POETRY_CACHE_DIR
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Stage 2: Runtime - FFmpeg HTTP Service
|
||||
# -----------------------------------------------------------------------------
|
||||
FROM python:3.11-slim AS runtime
|
||||
|
||||
# Install runtime dependencies including FFmpeg
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
libmagic1 \
|
||||
curl \
|
||||
tini \
|
||||
ffmpeg \
|
||||
&& rm -rf /var/lib/apt/lists/* \
|
||||
&& apt-get clean
|
||||
|
||||
# Create non-root user for security
|
||||
RUN groupadd --gid 1000 app \
|
||||
&& useradd --uid 1000 --gid app --shell /bin/bash --create-home app
|
||||
|
||||
# Copy Python packages from builder
|
||||
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
|
||||
COPY --from=builder /usr/local/bin /usr/local/bin
|
||||
|
||||
# Set environment variables
|
||||
ENV PYTHONPATH=/app \
|
||||
PYTHONUNBUFFERED=1 \
|
||||
PYTHONDONTWRITEBYTECODE=1 \
|
||||
APP_ENV=prod
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy application code
|
||||
COPY --chown=app:app . .
|
||||
|
||||
# Switch to non-root user
|
||||
USER app
|
||||
|
||||
# Health check
|
||||
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
|
||||
CMD curl -f http://localhost:8080/health || exit 1
|
||||
|
||||
# Expose HTTP port (Cloud Run uses 8080 by default)
|
||||
EXPOSE 8080
|
||||
|
||||
# Use tini as init system
|
||||
ENTRYPOINT ["tini", "--"]
|
||||
|
||||
# Start Uvicorn server
|
||||
# - 1 worker since Cloud Run uses containerConcurrency=1
|
||||
# - Bind to 0.0.0.0:8080 for Cloud Run
|
||||
# - Timeout of 300s for long FFmpeg operations
|
||||
CMD ["uvicorn", "app.services.ffmpeg_http_service:app", \
|
||||
"--host", "0.0.0.0", \
|
||||
"--port", "8080", \
|
||||
"--workers", "1", \
|
||||
"--timeout-keep-alive", "300"]
|
||||
101
backend/Dockerfile.whisper-service
Normal file
101
backend/Dockerfile.whisper-service
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
# =============================================================================
|
||||
# Dockerfile for Whisper HTTP Service - Cloud Run Deployment
|
||||
# =============================================================================
|
||||
# This Dockerfile creates a FastAPI-based HTTP service for Whisper transcription,
|
||||
# designed for deployment on Google Cloud Run with autoscaling.
|
||||
#
|
||||
# Key features:
|
||||
# - Pre-downloads Whisper model during build (no cold start model loading)
|
||||
# - Runs FastAPI with Uvicorn for HTTP handling
|
||||
# - Optimized for 8 vCPU / 32GB RAM Cloud Run instances
|
||||
# =============================================================================
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Stage 1: Builder - Install Python dependencies using Poetry
|
||||
# -----------------------------------------------------------------------------
|
||||
FROM python:3.11-slim AS builder
|
||||
|
||||
# Install build dependencies
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
build-essential \
|
||||
curl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install Poetry
|
||||
RUN pip install --no-cache-dir poetry==1.8.2
|
||||
|
||||
# Configure Poetry to not create virtual environment
|
||||
ENV POETRY_NO_INTERACTION=1 \
|
||||
POETRY_VIRTUALENVS_CREATE=false \
|
||||
POETRY_CACHE_DIR=/tmp/poetry_cache
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy dependency files
|
||||
COPY pyproject.toml poetry.lock ./
|
||||
|
||||
# Install dependencies
|
||||
RUN poetry config virtualenvs.create false \
|
||||
&& poetry install --only main --no-interaction --no-ansi \
|
||||
&& rm -rf $POETRY_CACHE_DIR
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Stage 2: Runtime - Whisper HTTP Service
|
||||
# -----------------------------------------------------------------------------
|
||||
FROM python:3.11-slim AS runtime
|
||||
|
||||
# Install runtime dependencies
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
libmagic1 \
|
||||
curl \
|
||||
tini \
|
||||
ffmpeg \
|
||||
&& rm -rf /var/lib/apt/lists/* \
|
||||
&& apt-get clean
|
||||
|
||||
# Create non-root user for security
|
||||
RUN groupadd --gid 1000 app \
|
||||
&& useradd --uid 1000 --gid app --shell /bin/bash --create-home app
|
||||
|
||||
# Copy Python packages from builder
|
||||
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
|
||||
COPY --from=builder /usr/local/bin /usr/local/bin
|
||||
|
||||
# Set environment variables
|
||||
ENV PYTHONPATH=/app \
|
||||
PYTHONUNBUFFERED=1 \
|
||||
PYTHONDONTWRITEBYTECODE=1 \
|
||||
APP_ENV=prod
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy application code
|
||||
COPY --chown=app:app . .
|
||||
|
||||
# Switch to non-root user
|
||||
USER app
|
||||
|
||||
# Pre-download Whisper medium model during build
|
||||
# This prevents cold start delays when the service scales up
|
||||
# Model is cached in ~/.cache/huggingface/hub (~1.5GB)
|
||||
RUN python -c "from faster_whisper import WhisperModel; WhisperModel('medium', device='cpu', compute_type='int8')"
|
||||
|
||||
# Health check
|
||||
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
|
||||
CMD curl -f http://localhost:8080/health || exit 1
|
||||
|
||||
# Expose HTTP port (Cloud Run uses 8080 by default)
|
||||
EXPOSE 8080
|
||||
|
||||
# Use tini as init system
|
||||
ENTRYPOINT ["tini", "--"]
|
||||
|
||||
# Start Uvicorn server
|
||||
# - 1 worker since Cloud Run uses containerConcurrency=1
|
||||
# - Bind to 0.0.0.0:8080 for Cloud Run
|
||||
# - Timeout of 300s for long transcriptions
|
||||
CMD ["uvicorn", "app.services.whisper_http_service:app", \
|
||||
"--host", "0.0.0.0", \
|
||||
"--port", "8080", \
|
||||
"--workers", "1", \
|
||||
"--timeout-keep-alive", "300"]
|
||||
|
|
@ -170,6 +170,11 @@ class Settings(BaseSettings):
|
|||
whisper_phrase_gap_threshold: float = 0.3 # Gap duration to classify as phrase boundary
|
||||
whisper_min_gap_threshold: float = 0.15 # Minimum gap duration to consider
|
||||
|
||||
# Cloud Run Service URLs (empty = use local processing)
|
||||
# When set, CPU-intensive work is offloaded to Cloud Run with autoscaling
|
||||
whisper_service_url: str = "" # e.g., "https://whisper-service-xxx.run.app"
|
||||
ffmpeg_service_url: str = "" # e.g., "https://ffmpeg-service-xxx.run.app"
|
||||
|
||||
# Email
|
||||
sendgrid_api_key: str
|
||||
email_from: str
|
||||
|
|
|
|||
539
backend/app/services/ffmpeg_http_service.py
Normal file
539
backend/app/services/ffmpeg_http_service.py
Normal file
|
|
@ -0,0 +1,539 @@
|
|||
"""
|
||||
FFmpeg HTTP Service - FastAPI application for Cloud Run deployment.
|
||||
|
||||
This service exposes FFmpeg operations as HTTP endpoints, allowing
|
||||
the main application to offload CPU-intensive video encoding to Cloud Run
|
||||
with autoscaling.
|
||||
|
||||
This module uses minimal configuration to avoid importing the full app Settings.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import uuid
|
||||
from typing import Any, Optional
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from google.cloud import storage
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# Minimal configuration for Cloud Run deployment
|
||||
# These are read directly from environment variables
|
||||
GCS_BUCKET = os.environ.get("GCS_BUCKET", "")
|
||||
|
||||
# Simple logging setup (no dependency on app.core.logging)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# FastAPI app for Cloud Run
|
||||
app = FastAPI(
|
||||
title="FFmpeg Processing Service",
|
||||
description="Cloud Run service for FFmpeg video processing operations",
|
||||
version="1.0.0",
|
||||
)
|
||||
|
||||
|
||||
# Request/Response Models
|
||||
class RunFFmpegRequest(BaseModel):
|
||||
"""Request model for running an FFmpeg command."""
|
||||
input_gcs_uris: list[str] = Field(
|
||||
...,
|
||||
description="GCS URIs of input files (will be downloaded and mapped to local paths)"
|
||||
)
|
||||
output_gcs_uri: str = Field(
|
||||
...,
|
||||
description="GCS URI where the output file should be uploaded"
|
||||
)
|
||||
command_template: list[str] = Field(
|
||||
...,
|
||||
description="FFmpeg command template with {input_0}, {input_1}, etc. and {output} placeholders"
|
||||
)
|
||||
timeout: int = Field(
|
||||
default=3600,
|
||||
description="Command timeout in seconds"
|
||||
)
|
||||
|
||||
|
||||
class RunFFmpegResponse(BaseModel):
|
||||
"""Response model for FFmpeg command execution."""
|
||||
success: bool
|
||||
output_gcs_uri: str
|
||||
stderr: str = ""
|
||||
duration_seconds: float = 0.0
|
||||
|
||||
|
||||
class ProbeRequest(BaseModel):
|
||||
"""Request model for probing video properties."""
|
||||
gcs_uri: str = Field(
|
||||
...,
|
||||
description="GCS URI of the video file to probe"
|
||||
)
|
||||
|
||||
|
||||
class ProbeResponse(BaseModel):
|
||||
"""Response model for video probe."""
|
||||
width: int
|
||||
height: int
|
||||
fps: float
|
||||
duration: float
|
||||
sample_rate: int
|
||||
channels: int
|
||||
codec_name: str
|
||||
pix_fmt: str
|
||||
|
||||
|
||||
class EncodeSegmentRequest(BaseModel):
|
||||
"""Request for encoding a video segment."""
|
||||
source_gcs_uri: str
|
||||
output_gcs_uri: str
|
||||
start_time: float
|
||||
duration: float
|
||||
fps: float = 30.0
|
||||
pix_fmt: str = "yuv420p"
|
||||
sample_rate: int = 44100
|
||||
channels: int = 2
|
||||
|
||||
|
||||
class ExtractFrameRequest(BaseModel):
|
||||
"""Request for extracting a single frame."""
|
||||
source_gcs_uri: str
|
||||
output_gcs_uri: str
|
||||
time_point: float
|
||||
|
||||
|
||||
class CreateFreezeSegmentRequest(BaseModel):
|
||||
"""Request for creating a freeze-frame video with audio."""
|
||||
frame_gcs_uri: str
|
||||
audio_gcs_uri: str
|
||||
output_gcs_uri: str
|
||||
width: int
|
||||
height: int
|
||||
fps: float = 30.0
|
||||
pix_fmt: str = "yuv420p"
|
||||
sample_rate: int = 44100
|
||||
channels: int = 2
|
||||
|
||||
|
||||
class ConcatenateRequest(BaseModel):
|
||||
"""Request for concatenating video segments."""
|
||||
segment_gcs_uris: list[str]
|
||||
output_gcs_uri: str
|
||||
|
||||
|
||||
class HealthResponse(BaseModel):
|
||||
"""Health check response."""
|
||||
status: str
|
||||
ffmpeg_version: str
|
||||
|
||||
|
||||
def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]:
|
||||
"""Parse GCS URI into bucket and blob path."""
|
||||
if not gcs_uri.startswith("gs://"):
|
||||
raise ValueError(f"Invalid GCS URI: {gcs_uri}")
|
||||
parts = gcs_uri[5:].split("/", 1)
|
||||
if len(parts) != 2:
|
||||
raise ValueError(f"Invalid GCS URI format: {gcs_uri}")
|
||||
return parts[0], parts[1]
|
||||
|
||||
|
||||
def _download_from_gcs(gcs_uri: str, local_path: str) -> None:
|
||||
"""Download file from GCS to local path."""
|
||||
bucket_name, blob_path = _parse_gcs_uri(gcs_uri)
|
||||
client = storage.Client() # Auto-detects project from environment
|
||||
bucket = client.bucket(bucket_name)
|
||||
blob = bucket.blob(blob_path)
|
||||
if not blob.exists():
|
||||
raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}")
|
||||
blob.download_to_filename(local_path)
|
||||
logger.debug(f"Downloaded {gcs_uri} to {local_path}")
|
||||
|
||||
|
||||
def _upload_to_gcs(local_path: str, gcs_uri: str) -> None:
|
||||
"""Upload local file to GCS."""
|
||||
bucket_name, blob_path = _parse_gcs_uri(gcs_uri)
|
||||
client = storage.Client() # Auto-detects project from environment
|
||||
bucket = client.bucket(bucket_name)
|
||||
blob = bucket.blob(blob_path)
|
||||
blob.upload_from_filename(local_path)
|
||||
logger.debug(f"Uploaded {local_path} to {gcs_uri}")
|
||||
|
||||
|
||||
def _run_command(cmd: list[str], timeout: int = 3600) -> tuple[bool, str, str]:
|
||||
"""Run a command and return success, stdout, stderr."""
|
||||
cmd_preview = ' '.join(cmd[:8]) + ('...' if len(cmd) > 8 else '')
|
||||
logger.info(f"Executing: {cmd_preview}")
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout
|
||||
)
|
||||
return result.returncode == 0, result.stdout, result.stderr
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "", f"Command timed out after {timeout}s"
|
||||
except Exception as e:
|
||||
return False, "", str(e)
|
||||
|
||||
|
||||
def _get_ffmpeg_version() -> str:
|
||||
"""Get FFmpeg version string."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ffmpeg", "-version"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
first_line = result.stdout.split('\n')[0]
|
||||
return first_line
|
||||
except Exception:
|
||||
return "unknown"
|
||||
|
||||
|
||||
@app.get("/health", response_model=HealthResponse)
|
||||
async def health_check():
|
||||
"""Health check endpoint."""
|
||||
return HealthResponse(
|
||||
status="healthy",
|
||||
ffmpeg_version=_get_ffmpeg_version()
|
||||
)
|
||||
|
||||
|
||||
@app.post("/run-ffmpeg", response_model=RunFFmpegResponse)
|
||||
async def run_ffmpeg(request: RunFFmpegRequest):
|
||||
"""
|
||||
Run a generic FFmpeg command with GCS input/output.
|
||||
|
||||
The command_template should use placeholders:
|
||||
- {input_0}, {input_1}, etc. for input files
|
||||
- {output} for the output file
|
||||
|
||||
Example:
|
||||
{
|
||||
"input_gcs_uris": ["gs://bucket/video.mp4"],
|
||||
"output_gcs_uri": "gs://bucket/output.mp4",
|
||||
"command_template": ["ffmpeg", "-y", "-i", "{input_0}", "-c:v", "libx264", "{output}"]
|
||||
}
|
||||
"""
|
||||
import time
|
||||
start_time = time.time()
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
try:
|
||||
# Download input files
|
||||
input_paths = []
|
||||
for i, gcs_uri in enumerate(request.input_gcs_uris):
|
||||
ext = os.path.splitext(gcs_uri)[1] or ".bin"
|
||||
local_path = os.path.join(tmpdir, f"input_{i}{ext}")
|
||||
_download_from_gcs(gcs_uri, local_path)
|
||||
input_paths.append(local_path)
|
||||
|
||||
# Prepare output path
|
||||
output_ext = os.path.splitext(request.output_gcs_uri)[1] or ".mp4"
|
||||
output_path = os.path.join(tmpdir, f"output{output_ext}")
|
||||
|
||||
# Build command from template
|
||||
cmd = []
|
||||
for part in request.command_template:
|
||||
if part == "{output}":
|
||||
cmd.append(output_path)
|
||||
elif part.startswith("{input_") and part.endswith("}"):
|
||||
idx = int(part[7:-1])
|
||||
if idx < len(input_paths):
|
||||
cmd.append(input_paths[idx])
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid input index: {idx}")
|
||||
else:
|
||||
cmd.append(part)
|
||||
|
||||
# Run FFmpeg
|
||||
success, stdout, stderr = _run_command(cmd, request.timeout)
|
||||
|
||||
if not success:
|
||||
logger.error(f"FFmpeg failed: {stderr[-500:]}")
|
||||
raise HTTPException(status_code=500, detail=f"FFmpeg failed: {stderr[-500:]}")
|
||||
|
||||
# Upload output
|
||||
_upload_to_gcs(output_path, request.output_gcs_uri)
|
||||
|
||||
duration = time.time() - start_time
|
||||
logger.info(f"FFmpeg completed in {duration:.2f}s")
|
||||
|
||||
return RunFFmpegResponse(
|
||||
success=True,
|
||||
output_gcs_uri=request.output_gcs_uri,
|
||||
stderr=stderr[-200:] if stderr else "",
|
||||
duration_seconds=duration
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"FFmpeg operation failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/probe", response_model=ProbeResponse)
|
||||
async def probe_video(request: ProbeRequest):
|
||||
"""
|
||||
Get video properties using FFprobe.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
try:
|
||||
# Download video
|
||||
ext = os.path.splitext(request.gcs_uri)[1] or ".mp4"
|
||||
local_path = os.path.join(tmpdir, f"video{ext}")
|
||||
_download_from_gcs(request.gcs_uri, local_path)
|
||||
|
||||
# Run ffprobe
|
||||
cmd = [
|
||||
"ffprobe", "-v", "quiet",
|
||||
"-show_streams", "-of", "json",
|
||||
local_path
|
||||
]
|
||||
success, stdout, stderr = _run_command(cmd, timeout=60)
|
||||
|
||||
if not success:
|
||||
raise HTTPException(status_code=500, detail=f"FFprobe failed: {stderr}")
|
||||
|
||||
# Parse output
|
||||
data = json.loads(stdout)
|
||||
streams = data.get("streams", [])
|
||||
|
||||
video_stream = next((s for s in streams if s.get("codec_type") == "video"), {})
|
||||
audio_stream = next((s for s in streams if s.get("codec_type") == "audio"), {})
|
||||
|
||||
# Parse FPS
|
||||
fps = 30.0
|
||||
if "r_frame_rate" in video_stream:
|
||||
fps_parts = video_stream["r_frame_rate"].split("/")
|
||||
if len(fps_parts) == 2 and int(fps_parts[1]) != 0:
|
||||
fps = int(fps_parts[0]) / int(fps_parts[1])
|
||||
|
||||
return ProbeResponse(
|
||||
width=int(video_stream.get("width", 1920)),
|
||||
height=int(video_stream.get("height", 1080)),
|
||||
fps=fps,
|
||||
duration=float(video_stream.get("duration", 0) or audio_stream.get("duration", 0)),
|
||||
sample_rate=int(audio_stream.get("sample_rate", 44100)),
|
||||
channels=int(audio_stream.get("channels", 2)),
|
||||
codec_name=video_stream.get("codec_name", "h264"),
|
||||
pix_fmt=video_stream.get("pix_fmt", "yuv420p")
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Probe failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/encode-segment", response_model=RunFFmpegResponse)
|
||||
async def encode_segment(request: EncodeSegmentRequest):
|
||||
"""
|
||||
Extract and encode a video segment with re-encoding for frame accuracy.
|
||||
"""
|
||||
import time
|
||||
start_time = time.time()
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
try:
|
||||
# Download source
|
||||
ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4"
|
||||
source_path = os.path.join(tmpdir, f"source{ext}")
|
||||
_download_from_gcs(request.source_gcs_uri, source_path)
|
||||
|
||||
# Output path
|
||||
output_path = os.path.join(tmpdir, "segment.mp4")
|
||||
|
||||
# Build FFmpeg command
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-ss", str(request.start_time),
|
||||
"-i", source_path,
|
||||
"-t", str(request.duration),
|
||||
"-c:v", "libx264", "-preset", "fast",
|
||||
"-pix_fmt", request.pix_fmt,
|
||||
"-r", str(request.fps),
|
||||
"-c:a", "aac", "-ar", str(request.sample_rate),
|
||||
"-ac", str(request.channels), "-b:a", "192k",
|
||||
"-video_track_timescale", "90000",
|
||||
output_path
|
||||
]
|
||||
|
||||
success, stdout, stderr = _run_command(cmd)
|
||||
if not success:
|
||||
raise HTTPException(status_code=500, detail=f"Encode failed: {stderr[-500:]}")
|
||||
|
||||
_upload_to_gcs(output_path, request.output_gcs_uri)
|
||||
|
||||
return RunFFmpegResponse(
|
||||
success=True,
|
||||
output_gcs_uri=request.output_gcs_uri,
|
||||
duration_seconds=time.time() - start_time
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Encode segment failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/extract-frame", response_model=RunFFmpegResponse)
|
||||
async def extract_frame(request: ExtractFrameRequest):
|
||||
"""
|
||||
Extract a single frame from a video as PNG.
|
||||
"""
|
||||
import time
|
||||
start_time = time.time()
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
try:
|
||||
# Download source
|
||||
ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4"
|
||||
source_path = os.path.join(tmpdir, f"source{ext}")
|
||||
_download_from_gcs(request.source_gcs_uri, source_path)
|
||||
|
||||
output_path = os.path.join(tmpdir, "frame.png")
|
||||
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-ss", str(request.time_point),
|
||||
"-i", source_path,
|
||||
"-frames:v", "1",
|
||||
"-q:v", "2",
|
||||
output_path
|
||||
]
|
||||
|
||||
success, stdout, stderr = _run_command(cmd, timeout=60)
|
||||
if not success:
|
||||
raise HTTPException(status_code=500, detail=f"Frame extraction failed: {stderr[-500:]}")
|
||||
|
||||
_upload_to_gcs(output_path, request.output_gcs_uri)
|
||||
|
||||
return RunFFmpegResponse(
|
||||
success=True,
|
||||
output_gcs_uri=request.output_gcs_uri,
|
||||
duration_seconds=time.time() - start_time
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Extract frame failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/create-freeze-segment", response_model=RunFFmpegResponse)
|
||||
async def create_freeze_segment(request: CreateFreezeSegmentRequest):
|
||||
"""
|
||||
Create a freeze-frame video segment with audio overlay.
|
||||
"""
|
||||
import time
|
||||
start_time = time.time()
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
try:
|
||||
# Download inputs
|
||||
frame_path = os.path.join(tmpdir, "frame.png")
|
||||
audio_path = os.path.join(tmpdir, "audio.mp3")
|
||||
_download_from_gcs(request.frame_gcs_uri, frame_path)
|
||||
_download_from_gcs(request.audio_gcs_uri, audio_path)
|
||||
|
||||
output_path = os.path.join(tmpdir, "freeze.mp4")
|
||||
|
||||
# Build FFmpeg command for freeze frame with audio
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-loop", "1", "-i", frame_path,
|
||||
"-i", audio_path,
|
||||
"-c:v", "libx264", "-preset", "fast",
|
||||
"-tune", "stillimage",
|
||||
"-pix_fmt", request.pix_fmt,
|
||||
"-r", str(request.fps),
|
||||
"-vf", f"scale={request.width}:{request.height}:force_original_aspect_ratio=decrease,"
|
||||
f"pad={request.width}:{request.height}:(ow-iw)/2:(oh-ih)/2",
|
||||
"-c:a", "aac", "-ar", str(request.sample_rate),
|
||||
"-ac", str(request.channels), "-b:a", "192k",
|
||||
"-video_track_timescale", "90000",
|
||||
"-shortest",
|
||||
output_path
|
||||
]
|
||||
|
||||
success, stdout, stderr = _run_command(cmd)
|
||||
if not success:
|
||||
raise HTTPException(status_code=500, detail=f"Freeze segment failed: {stderr[-500:]}")
|
||||
|
||||
_upload_to_gcs(output_path, request.output_gcs_uri)
|
||||
|
||||
return RunFFmpegResponse(
|
||||
success=True,
|
||||
output_gcs_uri=request.output_gcs_uri,
|
||||
duration_seconds=time.time() - start_time
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Create freeze segment failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/concatenate", response_model=RunFFmpegResponse)
|
||||
async def concatenate_segments(request: ConcatenateRequest):
|
||||
"""
|
||||
Concatenate multiple video segments using concat demuxer.
|
||||
"""
|
||||
import time
|
||||
start_time = time.time()
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
try:
|
||||
# Download all segments
|
||||
segment_paths = []
|
||||
for i, gcs_uri in enumerate(request.segment_gcs_uris):
|
||||
ext = os.path.splitext(gcs_uri)[1] or ".mp4"
|
||||
local_path = os.path.join(tmpdir, f"segment_{i}{ext}")
|
||||
_download_from_gcs(gcs_uri, local_path)
|
||||
segment_paths.append(local_path)
|
||||
|
||||
# Create concat file
|
||||
concat_file = os.path.join(tmpdir, "concat.txt")
|
||||
with open(concat_file, "w") as f:
|
||||
for path in segment_paths:
|
||||
f.write(f"file '{path}'\n")
|
||||
|
||||
output_path = os.path.join(tmpdir, "output.mp4")
|
||||
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-f", "concat",
|
||||
"-safe", "0",
|
||||
"-i", concat_file,
|
||||
"-c", "copy",
|
||||
output_path
|
||||
]
|
||||
|
||||
success, stdout, stderr = _run_command(cmd)
|
||||
if not success:
|
||||
raise HTTPException(status_code=500, detail=f"Concatenation failed: {stderr[-500:]}")
|
||||
|
||||
_upload_to_gcs(output_path, request.output_gcs_uri)
|
||||
|
||||
return RunFFmpegResponse(
|
||||
success=True,
|
||||
output_gcs_uri=request.output_gcs_uri,
|
||||
duration_seconds=time.time() - start_time
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Concatenate failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
FFmpeg operations are dispatched to a dedicated Celery queue (ffmpeg) with concurrency=1
|
||||
to prevent server overload when multiple render tasks run in parallel.
|
||||
|
||||
When FFMPEG_SERVICE_URL is configured, operations are offloaded to Cloud Run for autoscaling.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
|
@ -10,6 +12,10 @@ import os
|
|||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
import httpx
|
||||
from google.cloud import storage
|
||||
|
||||
from ..core.config import settings
|
||||
from ..core.logging import get_logger
|
||||
|
|
@ -32,6 +38,160 @@ class VideoRendererService:
|
|||
# Audio ducking settings
|
||||
self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3)
|
||||
self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200)
|
||||
# Cloud Run support
|
||||
self._http_client: httpx.Client | None = None
|
||||
self._gcs_client: storage.Client | None = None
|
||||
# Source video caching for Cloud Run (uploaded once, reused across operations)
|
||||
self._cached_source_gcs_uri: str | None = None
|
||||
|
||||
@property
|
||||
def _use_cloud_run(self) -> bool:
|
||||
"""Check if Cloud Run FFmpeg service is configured."""
|
||||
return bool(settings.ffmpeg_service_url)
|
||||
|
||||
def _get_http_client(self) -> httpx.Client:
|
||||
"""Get or create HTTP client for Cloud Run calls."""
|
||||
if self._http_client is None:
|
||||
# 10-minute timeout for long FFmpeg operations
|
||||
self._http_client = httpx.Client(timeout=600.0)
|
||||
return self._http_client
|
||||
|
||||
def _get_gcs_client(self) -> storage.Client:
|
||||
"""Get or create GCS client for file transfers."""
|
||||
if self._gcs_client is None:
|
||||
self._gcs_client = storage.Client()
|
||||
return self._gcs_client
|
||||
|
||||
def _upload_to_gcs_temp(self, local_path: str, prefix: str) -> str:
|
||||
"""
|
||||
Upload local file to GCS temp location and return gs:// URI.
|
||||
|
||||
Args:
|
||||
local_path: Path to local file
|
||||
prefix: Prefix for temp path (e.g., "source", "frame", "audio")
|
||||
|
||||
Returns:
|
||||
GCS URI (gs://bucket/temp/ffmpeg/{prefix}/{uuid}/{filename})
|
||||
"""
|
||||
client = self._get_gcs_client()
|
||||
bucket = client.bucket(settings.gcs_bucket)
|
||||
|
||||
filename = os.path.basename(local_path)
|
||||
gcs_path = f"temp/ffmpeg/{prefix}/{uuid4().hex}/{filename}"
|
||||
|
||||
blob = bucket.blob(gcs_path)
|
||||
blob.upload_from_filename(local_path)
|
||||
|
||||
gcs_uri = f"gs://{settings.gcs_bucket}/{gcs_path}"
|
||||
logger.debug(f"Uploaded {local_path} to {gcs_uri}")
|
||||
return gcs_uri
|
||||
|
||||
def _download_from_gcs_temp(self, gcs_uri: str, local_path: str):
|
||||
"""
|
||||
Download file from GCS to local path.
|
||||
|
||||
Args:
|
||||
gcs_uri: GCS URI (gs://bucket/path)
|
||||
local_path: Local destination path
|
||||
"""
|
||||
client = self._get_gcs_client()
|
||||
|
||||
# Parse gs:// URI
|
||||
if not gcs_uri.startswith("gs://"):
|
||||
raise ValueError(f"Invalid GCS URI: {gcs_uri}")
|
||||
|
||||
path_parts = gcs_uri[5:].split("/", 1)
|
||||
bucket_name = path_parts[0]
|
||||
blob_path = path_parts[1] if len(path_parts) > 1 else ""
|
||||
|
||||
bucket = client.bucket(bucket_name)
|
||||
blob = bucket.blob(blob_path)
|
||||
|
||||
# Ensure parent directory exists
|
||||
os.makedirs(os.path.dirname(local_path), exist_ok=True)
|
||||
blob.download_to_filename(local_path)
|
||||
logger.debug(f"Downloaded {gcs_uri} to {local_path}")
|
||||
|
||||
def _delete_gcs_temp(self, gcs_uri: str):
|
||||
"""
|
||||
Delete temporary GCS file.
|
||||
|
||||
Args:
|
||||
gcs_uri: GCS URI to delete (gs://bucket/path)
|
||||
"""
|
||||
if not gcs_uri:
|
||||
return
|
||||
|
||||
try:
|
||||
client = self._get_gcs_client()
|
||||
|
||||
# Parse gs:// URI
|
||||
if not gcs_uri.startswith("gs://"):
|
||||
logger.warning(f"Invalid GCS URI for deletion: {gcs_uri}")
|
||||
return
|
||||
|
||||
path_parts = gcs_uri[5:].split("/", 1)
|
||||
bucket_name = path_parts[0]
|
||||
blob_path = path_parts[1] if len(path_parts) > 1 else ""
|
||||
|
||||
bucket = client.bucket(bucket_name)
|
||||
blob = bucket.blob(blob_path)
|
||||
blob.delete()
|
||||
logger.debug(f"Deleted {gcs_uri}")
|
||||
except Exception as e:
|
||||
# Log but don't fail on cleanup errors
|
||||
logger.warning(f"Failed to delete GCS temp file {gcs_uri}: {e}")
|
||||
|
||||
def _call_cloud_run_probe(self, gcs_uri: str) -> dict[str, Any]:
|
||||
"""
|
||||
Call Cloud Run FFmpeg service /probe endpoint.
|
||||
|
||||
Args:
|
||||
gcs_uri: GCS URI of video to probe (gs://bucket/path)
|
||||
|
||||
Returns:
|
||||
Probe result with duration and stream info
|
||||
"""
|
||||
client = self._get_http_client()
|
||||
response = client.post(
|
||||
f"{settings.ffmpeg_service_url}/probe",
|
||||
json={"source_gcs_uri": gcs_uri}
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def _call_cloud_run_endpoint(
|
||||
self,
|
||||
endpoint: str,
|
||||
payload: dict[str, Any],
|
||||
output_gcs_uri: str | None = None
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Call Cloud Run FFmpeg service endpoint.
|
||||
|
||||
Args:
|
||||
endpoint: Endpoint path (e.g., "/encode-segment", "/extract-frame")
|
||||
payload: Request payload
|
||||
output_gcs_uri: Expected output GCS URI (for cleanup on error)
|
||||
|
||||
Returns:
|
||||
Response JSON
|
||||
"""
|
||||
client = self._get_http_client()
|
||||
try:
|
||||
response = client.post(
|
||||
f"{settings.ffmpeg_service_url}{endpoint}",
|
||||
json=payload
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except httpx.HTTPStatusError as e:
|
||||
# Try to get error details from response
|
||||
try:
|
||||
error_detail = e.response.json().get("detail", str(e))
|
||||
except Exception:
|
||||
error_detail = str(e)
|
||||
raise FFmpegExecutionError(f"Cloud Run {endpoint} failed: {error_detail}")
|
||||
|
||||
async def _dispatch_ffmpeg(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]:
|
||||
"""
|
||||
|
|
@ -139,14 +299,27 @@ class VideoRendererService:
|
|||
"""
|
||||
method = analysis.get("method", "pause_insert")
|
||||
|
||||
if method == "overlay":
|
||||
return await self._render_overlay_method(
|
||||
source_video_path, ad_segments, analysis, output_path
|
||||
)
|
||||
else:
|
||||
return await self._render_pause_insert_method(
|
||||
source_video_path, ad_segments, analysis, output_path
|
||||
)
|
||||
# Cloud Run optimization: Upload source video once and cache for all operations
|
||||
if self._use_cloud_run:
|
||||
logger.info("Cloud Run mode: uploading source video to GCS for caching")
|
||||
self._cached_source_gcs_uri = self._upload_to_gcs_temp(source_video_path, "source")
|
||||
logger.info(f"Source video cached at: {self._cached_source_gcs_uri}")
|
||||
|
||||
try:
|
||||
if method == "overlay":
|
||||
return await self._render_overlay_method(
|
||||
source_video_path, ad_segments, analysis, output_path
|
||||
)
|
||||
else:
|
||||
return await self._render_pause_insert_method(
|
||||
source_video_path, ad_segments, analysis, output_path
|
||||
)
|
||||
finally:
|
||||
# Clean up cached source video from GCS
|
||||
if self._use_cloud_run and self._cached_source_gcs_uri:
|
||||
logger.info(f"Cleaning up cached source video: {self._cached_source_gcs_uri}")
|
||||
self._delete_gcs_temp(self._cached_source_gcs_uri)
|
||||
self._cached_source_gcs_uri = None
|
||||
|
||||
async def _render_overlay_method(
|
||||
self,
|
||||
|
|
@ -429,7 +602,13 @@ class VideoRendererService:
|
|||
return output_path
|
||||
|
||||
async def _get_video_duration(self, video_path: str) -> float:
|
||||
"""Get video duration in seconds using ffprobe via the ffmpeg queue."""
|
||||
"""Get video duration in seconds using ffprobe."""
|
||||
if self._use_cloud_run:
|
||||
return await self._get_video_duration_cloud_run(video_path)
|
||||
return await self._get_video_duration_local(video_path)
|
||||
|
||||
async def _get_video_duration_local(self, video_path: str) -> float:
|
||||
"""Get video duration using local ffprobe via Celery queue."""
|
||||
cmd = [
|
||||
self.ffprobe_path,
|
||||
"-v", "quiet",
|
||||
|
|
@ -440,8 +619,30 @@ class VideoRendererService:
|
|||
result = await self._dispatch_ffprobe(cmd)
|
||||
return float(result['stdout'].strip())
|
||||
|
||||
async def _get_video_duration_cloud_run(self, video_path: str) -> float:
|
||||
"""Get video duration via Cloud Run FFmpeg service."""
|
||||
# Use cached source if available, otherwise upload
|
||||
if self._cached_source_gcs_uri:
|
||||
gcs_uri = self._cached_source_gcs_uri
|
||||
else:
|
||||
gcs_uri = self._upload_to_gcs_temp(video_path, "probe")
|
||||
|
||||
try:
|
||||
result = self._call_cloud_run_probe(gcs_uri)
|
||||
return result["duration"]
|
||||
finally:
|
||||
# Clean up if we uploaded specifically for this call
|
||||
if not self._cached_source_gcs_uri:
|
||||
self._delete_gcs_temp(gcs_uri)
|
||||
|
||||
async def _get_video_properties(self, video_path: str) -> dict[str, Any]:
|
||||
"""Get detailed video and audio properties via the ffmpeg queue."""
|
||||
"""Get detailed video and audio properties."""
|
||||
if self._use_cloud_run:
|
||||
return await self._get_video_properties_cloud_run(video_path)
|
||||
return await self._get_video_properties_local(video_path)
|
||||
|
||||
async def _get_video_properties_local(self, video_path: str) -> dict[str, Any]:
|
||||
"""Get video properties using local ffprobe via Celery queue."""
|
||||
cmd = [
|
||||
self.ffprobe_path,
|
||||
"-v", "quiet",
|
||||
|
|
@ -484,6 +685,53 @@ class VideoRendererService:
|
|||
|
||||
return props
|
||||
|
||||
async def _get_video_properties_cloud_run(self, video_path: str) -> dict[str, Any]:
|
||||
"""Get video properties via Cloud Run FFmpeg service."""
|
||||
# Use cached source if available, otherwise upload
|
||||
if self._cached_source_gcs_uri:
|
||||
gcs_uri = self._cached_source_gcs_uri
|
||||
else:
|
||||
gcs_uri = self._upload_to_gcs_temp(video_path, "probe")
|
||||
|
||||
try:
|
||||
result = self._call_cloud_run_probe(gcs_uri)
|
||||
|
||||
# Defaults
|
||||
props = {
|
||||
"width": 1920,
|
||||
"height": 1080,
|
||||
"fps": 30.0,
|
||||
"sample_rate": "44100",
|
||||
"channels": "2",
|
||||
"pix_fmt": "yuv420p",
|
||||
"codec": "h264"
|
||||
}
|
||||
|
||||
# Extract from probe result
|
||||
for stream in result.get("streams", []):
|
||||
if stream.get("codec_type") == "video":
|
||||
props["width"] = stream.get("width", props["width"])
|
||||
props["height"] = stream.get("height", props["height"])
|
||||
props["pix_fmt"] = stream.get("pix_fmt", props["pix_fmt"])
|
||||
props["codec"] = stream.get("codec_name", props["codec"])
|
||||
|
||||
fps_str = stream.get("r_frame_rate", "30/1")
|
||||
if "/" in fps_str:
|
||||
num, den = fps_str.split("/")
|
||||
props["fps"] = float(num) / float(den)
|
||||
else:
|
||||
props["fps"] = float(fps_str)
|
||||
|
||||
elif stream.get("codec_type") == "audio":
|
||||
props["sample_rate"] = stream.get("sample_rate", props["sample_rate"])
|
||||
props["channels"] = str(stream.get("channels", props["channels"]))
|
||||
|
||||
return props
|
||||
finally:
|
||||
# Clean up if we uploaded specifically for this call
|
||||
if not self._cached_source_gcs_uri:
|
||||
self._delete_gcs_temp(gcs_uri)
|
||||
|
||||
async def _extract_segment(
|
||||
self,
|
||||
source_path: str,
|
||||
|
|
@ -519,6 +767,24 @@ class VideoRendererService:
|
|||
- Keyframe-only cuts causing audio dropouts
|
||||
- Timestamp desynchronization
|
||||
"""
|
||||
if self._use_cloud_run:
|
||||
await self._extract_segment_reencoded_cloud_run(
|
||||
source_path, start_time, duration, output_path, props
|
||||
)
|
||||
else:
|
||||
await self._extract_segment_reencoded_local(
|
||||
source_path, start_time, duration, output_path, props
|
||||
)
|
||||
|
||||
async def _extract_segment_reencoded_local(
|
||||
self,
|
||||
source_path: str,
|
||||
start_time: float,
|
||||
duration: float,
|
||||
output_path: str,
|
||||
props: dict[str, Any]
|
||||
):
|
||||
"""Extract segment with re-encoding using local ffmpeg via Celery queue."""
|
||||
cmd = [
|
||||
self.ffmpeg_path,
|
||||
"-y",
|
||||
|
|
@ -541,8 +807,51 @@ class VideoRendererService:
|
|||
]
|
||||
await self._run_ffmpeg(cmd)
|
||||
|
||||
async def _extract_segment_reencoded_cloud_run(
|
||||
self,
|
||||
source_path: str,
|
||||
start_time: float,
|
||||
duration: float,
|
||||
output_path: str,
|
||||
props: dict[str, Any]
|
||||
):
|
||||
"""Extract segment with re-encoding via Cloud Run FFmpeg service."""
|
||||
# Use cached source GCS URI
|
||||
if not self._cached_source_gcs_uri:
|
||||
raise FFmpegExecutionError("Source video not cached for Cloud Run operation")
|
||||
|
||||
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/segment/{uuid4().hex}/segment.mp4"
|
||||
|
||||
try:
|
||||
self._call_cloud_run_endpoint(
|
||||
"/encode-segment",
|
||||
{
|
||||
"source_gcs_uri": self._cached_source_gcs_uri,
|
||||
"output_gcs_uri": output_gcs_uri,
|
||||
"start_time": start_time,
|
||||
"duration": duration,
|
||||
"width": props["width"],
|
||||
"height": props["height"],
|
||||
"fps": props["fps"],
|
||||
"pix_fmt": props["pix_fmt"],
|
||||
"sample_rate": props["sample_rate"],
|
||||
"channels": props["channels"],
|
||||
}
|
||||
)
|
||||
# Download result to local path
|
||||
self._download_from_gcs_temp(output_gcs_uri, output_path)
|
||||
finally:
|
||||
self._delete_gcs_temp(output_gcs_uri)
|
||||
|
||||
async def _extract_frame(self, video_path: str, time_point: float, output_path: str):
|
||||
"""Extract a single frame as PNG using ffmpeg."""
|
||||
if self._use_cloud_run:
|
||||
await self._extract_frame_cloud_run(video_path, time_point, output_path)
|
||||
else:
|
||||
await self._extract_frame_local(video_path, time_point, output_path)
|
||||
|
||||
async def _extract_frame_local(self, video_path: str, time_point: float, output_path: str):
|
||||
"""Extract frame using local ffmpeg via Celery queue."""
|
||||
logger.debug(f"Extracting frame at {time_point:.2f}s from {video_path}")
|
||||
|
||||
cmd = [
|
||||
|
|
@ -564,6 +873,34 @@ class VideoRendererService:
|
|||
f"(time may be beyond video duration)"
|
||||
)
|
||||
|
||||
async def _extract_frame_cloud_run(self, video_path: str, time_point: float, output_path: str):
|
||||
"""Extract frame via Cloud Run FFmpeg service."""
|
||||
if not self._cached_source_gcs_uri:
|
||||
raise FFmpegExecutionError("Source video not cached for Cloud Run operation")
|
||||
|
||||
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/frame/{uuid4().hex}/frame.png"
|
||||
|
||||
try:
|
||||
self._call_cloud_run_endpoint(
|
||||
"/extract-frame",
|
||||
{
|
||||
"source_gcs_uri": self._cached_source_gcs_uri,
|
||||
"output_gcs_uri": output_gcs_uri,
|
||||
"time_point": time_point,
|
||||
}
|
||||
)
|
||||
# Download result to local path
|
||||
self._download_from_gcs_temp(output_gcs_uri, output_path)
|
||||
|
||||
# Verify frame was actually created
|
||||
if not os.path.exists(output_path):
|
||||
raise FileNotFoundError(
|
||||
f"Frame extraction failed: no frame created at {time_point:.2f}s "
|
||||
f"(time may be beyond video duration)"
|
||||
)
|
||||
finally:
|
||||
self._delete_gcs_temp(output_gcs_uri)
|
||||
|
||||
async def _create_freeze_segment(
|
||||
self,
|
||||
frame_path: str,
|
||||
|
|
@ -611,6 +948,24 @@ class VideoRendererService:
|
|||
This fixes the "silent pause" issue caused by sample rate mismatch
|
||||
when concatenating with extracted video segments.
|
||||
"""
|
||||
if self._use_cloud_run:
|
||||
await self._create_freeze_segment_matched_cloud_run(
|
||||
frame_path, audio_path, duration, output_path, props
|
||||
)
|
||||
else:
|
||||
await self._create_freeze_segment_matched_local(
|
||||
frame_path, audio_path, duration, output_path, props
|
||||
)
|
||||
|
||||
async def _create_freeze_segment_matched_local(
|
||||
self,
|
||||
frame_path: str,
|
||||
audio_path: str,
|
||||
duration: float,
|
||||
output_path: str,
|
||||
props: dict[str, Any]
|
||||
):
|
||||
"""Create freeze segment using local ffmpeg via Celery queue."""
|
||||
# Validate inputs
|
||||
if duration <= 0:
|
||||
raise ValueError(f"Invalid freeze segment duration: {duration}")
|
||||
|
|
@ -650,6 +1005,52 @@ class VideoRendererService:
|
|||
]
|
||||
await self._run_ffmpeg(cmd)
|
||||
|
||||
async def _create_freeze_segment_matched_cloud_run(
|
||||
self,
|
||||
frame_path: str,
|
||||
audio_path: str,
|
||||
duration: float,
|
||||
output_path: str,
|
||||
props: dict[str, Any]
|
||||
):
|
||||
"""Create freeze segment via Cloud Run FFmpeg service."""
|
||||
# Validate inputs
|
||||
if duration <= 0:
|
||||
raise ValueError(f"Invalid freeze segment duration: {duration}")
|
||||
|
||||
if not os.path.exists(frame_path):
|
||||
raise FileNotFoundError(f"Frame file not found: {frame_path}")
|
||||
if not os.path.exists(audio_path):
|
||||
raise FileNotFoundError(f"Audio file not found: {audio_path}")
|
||||
|
||||
# Upload frame and audio to GCS
|
||||
frame_gcs_uri = self._upload_to_gcs_temp(frame_path, "frame")
|
||||
audio_gcs_uri = self._upload_to_gcs_temp(audio_path, "audio")
|
||||
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/freeze/{uuid4().hex}/freeze.mp4"
|
||||
|
||||
try:
|
||||
self._call_cloud_run_endpoint(
|
||||
"/create-freeze-segment",
|
||||
{
|
||||
"frame_gcs_uri": frame_gcs_uri,
|
||||
"audio_gcs_uri": audio_gcs_uri,
|
||||
"output_gcs_uri": output_gcs_uri,
|
||||
"duration": duration,
|
||||
"width": props["width"],
|
||||
"height": props["height"],
|
||||
"fps": props["fps"],
|
||||
"pix_fmt": props["pix_fmt"],
|
||||
"sample_rate": props["sample_rate"],
|
||||
"channels": props["channels"],
|
||||
}
|
||||
)
|
||||
# Download result to local path
|
||||
self._download_from_gcs_temp(output_gcs_uri, output_path)
|
||||
finally:
|
||||
self._delete_gcs_temp(frame_gcs_uri)
|
||||
self._delete_gcs_temp(audio_gcs_uri)
|
||||
self._delete_gcs_temp(output_gcs_uri)
|
||||
|
||||
async def _extract_audio_segment(
|
||||
self,
|
||||
source_path: str,
|
||||
|
|
@ -704,12 +1105,19 @@ class VideoRendererService:
|
|||
Concatenate multiple audio files for combined AD + catch-up audio.
|
||||
|
||||
Uses FFmpeg concat filter for seamless joining with consistent encoding.
|
||||
|
||||
Args:
|
||||
audio_paths: List of audio file paths to concatenate (in order)
|
||||
output_path: Path to output combined audio file
|
||||
props: Video properties (for sample_rate, channels)
|
||||
"""
|
||||
if self._use_cloud_run:
|
||||
await self._concatenate_audio_cloud_run(audio_paths, output_path, props)
|
||||
else:
|
||||
await self._concatenate_audio_local(audio_paths, output_path, props)
|
||||
|
||||
async def _concatenate_audio_local(
|
||||
self,
|
||||
audio_paths: list[str],
|
||||
output_path: str,
|
||||
props: dict[str, Any]
|
||||
):
|
||||
"""Concatenate audio files using local ffmpeg via Celery queue."""
|
||||
if not audio_paths:
|
||||
raise ValueError("No audio files to concatenate")
|
||||
|
||||
|
|
@ -755,6 +1163,57 @@ class VideoRendererService:
|
|||
]
|
||||
await self._run_ffmpeg(cmd)
|
||||
|
||||
async def _concatenate_audio_cloud_run(
|
||||
self,
|
||||
audio_paths: list[str],
|
||||
output_path: str,
|
||||
props: dict[str, Any]
|
||||
):
|
||||
"""Concatenate audio files via Cloud Run FFmpeg service."""
|
||||
if not audio_paths:
|
||||
raise ValueError("No audio files to concatenate")
|
||||
|
||||
# Upload all audio files to GCS
|
||||
audio_gcs_uris = []
|
||||
for path in audio_paths:
|
||||
gcs_uri = self._upload_to_gcs_temp(path, "audio")
|
||||
audio_gcs_uris.append(gcs_uri)
|
||||
|
||||
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/audio/{uuid4().hex}/combined.m4a"
|
||||
|
||||
try:
|
||||
# Build ffmpeg command for Cloud Run
|
||||
# Use filter_complex for concatenation
|
||||
inputs = []
|
||||
filter_parts = []
|
||||
for i in range(len(audio_gcs_uris)):
|
||||
filter_parts.append(f"[{i}:a]")
|
||||
|
||||
filter_complex = "".join(filter_parts) + f"concat=n={len(audio_gcs_uris)}:v=0:a=1[out]"
|
||||
|
||||
self._call_cloud_run_endpoint(
|
||||
"/run-ffmpeg",
|
||||
{
|
||||
"input_gcs_uris": audio_gcs_uris,
|
||||
"output_gcs_uri": output_gcs_uri,
|
||||
"ffmpeg_args": [
|
||||
"-filter_complex", filter_complex,
|
||||
"-map", "[out]",
|
||||
"-c:a", "aac",
|
||||
"-ar", props["sample_rate"],
|
||||
"-ac", props["channels"],
|
||||
"-b:a", "192k",
|
||||
]
|
||||
}
|
||||
)
|
||||
# Download result to local path
|
||||
self._download_from_gcs_temp(output_gcs_uri, output_path)
|
||||
finally:
|
||||
# Cleanup
|
||||
for gcs_uri in audio_gcs_uris:
|
||||
self._delete_gcs_temp(gcs_uri)
|
||||
self._delete_gcs_temp(output_gcs_uri)
|
||||
|
||||
async def _generate_silence(
|
||||
self,
|
||||
duration: float,
|
||||
|
|
@ -765,12 +1224,19 @@ class VideoRendererService:
|
|||
Generate a silent audio file of specified duration.
|
||||
|
||||
Used to create 500ms silence buffers before/after AD audio.
|
||||
|
||||
Args:
|
||||
duration: Duration of silence in seconds
|
||||
output_path: Path to output audio file
|
||||
props: Video properties (for sample_rate, channels)
|
||||
"""
|
||||
if self._use_cloud_run:
|
||||
await self._generate_silence_cloud_run(duration, output_path, props)
|
||||
else:
|
||||
await self._generate_silence_local(duration, output_path, props)
|
||||
|
||||
async def _generate_silence_local(
|
||||
self,
|
||||
duration: float,
|
||||
output_path: str,
|
||||
props: dict[str, Any]
|
||||
):
|
||||
"""Generate silence using local ffmpeg via Celery queue."""
|
||||
if duration <= 0:
|
||||
raise ValueError(f"Invalid silence duration: {duration}")
|
||||
|
||||
|
|
@ -792,6 +1258,41 @@ class VideoRendererService:
|
|||
]
|
||||
await self._run_ffmpeg(cmd)
|
||||
|
||||
async def _generate_silence_cloud_run(
|
||||
self,
|
||||
duration: float,
|
||||
output_path: str,
|
||||
props: dict[str, Any]
|
||||
):
|
||||
"""Generate silence via Cloud Run FFmpeg service."""
|
||||
if duration <= 0:
|
||||
raise ValueError(f"Invalid silence duration: {duration}")
|
||||
|
||||
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/silence/{uuid4().hex}/silence.m4a"
|
||||
|
||||
try:
|
||||
channel_layout = "stereo" if props["channels"] == "2" else "mono"
|
||||
self._call_cloud_run_endpoint(
|
||||
"/run-ffmpeg",
|
||||
{
|
||||
"input_gcs_uris": [], # No input files, using lavfi
|
||||
"output_gcs_uri": output_gcs_uri,
|
||||
"ffmpeg_args": [
|
||||
"-f", "lavfi",
|
||||
"-i", f"anullsrc=r={props['sample_rate']}:cl={channel_layout}",
|
||||
"-t", str(duration),
|
||||
"-c:a", "aac",
|
||||
"-ar", props["sample_rate"],
|
||||
"-ac", props["channels"],
|
||||
"-b:a", "192k",
|
||||
]
|
||||
}
|
||||
)
|
||||
# Download result to local path
|
||||
self._download_from_gcs_temp(output_gcs_uri, output_path)
|
||||
finally:
|
||||
self._delete_gcs_temp(output_gcs_uri)
|
||||
|
||||
async def _concatenate_segments(
|
||||
self,
|
||||
segment_paths: list[str],
|
||||
|
|
@ -799,6 +1300,18 @@ class VideoRendererService:
|
|||
temp_dir: Path
|
||||
):
|
||||
"""Concatenate video segments using ffmpeg concat demuxer."""
|
||||
if self._use_cloud_run:
|
||||
await self._concatenate_segments_cloud_run(segment_paths, output_path)
|
||||
else:
|
||||
await self._concatenate_segments_local(segment_paths, output_path, temp_dir)
|
||||
|
||||
async def _concatenate_segments_local(
|
||||
self,
|
||||
segment_paths: list[str],
|
||||
output_path: str,
|
||||
temp_dir: Path
|
||||
):
|
||||
"""Concatenate video segments using local ffmpeg via Celery queue."""
|
||||
# Create concat file
|
||||
concat_file = temp_dir / "concat.txt"
|
||||
with open(concat_file, "w") as f:
|
||||
|
|
@ -818,6 +1331,36 @@ class VideoRendererService:
|
|||
]
|
||||
await self._run_ffmpeg(cmd)
|
||||
|
||||
async def _concatenate_segments_cloud_run(
|
||||
self,
|
||||
segment_paths: list[str],
|
||||
output_path: str
|
||||
):
|
||||
"""Concatenate video segments via Cloud Run FFmpeg service."""
|
||||
# Upload all segment files to GCS
|
||||
segment_gcs_uris = []
|
||||
for path in segment_paths:
|
||||
gcs_uri = self._upload_to_gcs_temp(path, "segment")
|
||||
segment_gcs_uris.append(gcs_uri)
|
||||
|
||||
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/concat/{uuid4().hex}/final.mp4"
|
||||
|
||||
try:
|
||||
self._call_cloud_run_endpoint(
|
||||
"/concatenate",
|
||||
{
|
||||
"segment_gcs_uris": segment_gcs_uris,
|
||||
"output_gcs_uri": output_gcs_uri,
|
||||
}
|
||||
)
|
||||
# Download result to local path
|
||||
self._download_from_gcs_temp(output_gcs_uri, output_path)
|
||||
finally:
|
||||
# Cleanup uploaded segments
|
||||
for gcs_uri in segment_gcs_uris:
|
||||
self._delete_gcs_temp(gcs_uri)
|
||||
self._delete_gcs_temp(output_gcs_uri)
|
||||
|
||||
async def _copy_video(self, source_path: str, output_path: str):
|
||||
"""Copy video without modification."""
|
||||
cmd = [
|
||||
|
|
|
|||
311
backend/app/services/whisper_http_service.py
Normal file
311
backend/app/services/whisper_http_service.py
Normal file
|
|
@ -0,0 +1,311 @@
|
|||
"""
|
||||
Whisper HTTP Service - FastAPI application for Cloud Run deployment.
|
||||
|
||||
This service exposes Whisper transcription as HTTP endpoints, allowing
|
||||
the main application to offload CPU-intensive transcription to Cloud Run
|
||||
with autoscaling.
|
||||
|
||||
This module uses minimal configuration to avoid importing the full app Settings.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from google.cloud import storage
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from .whisper_service import WordTimestamp, whisper_service
|
||||
|
||||
# Simple logging setup (no dependency on app.core.logging)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# FastAPI app for Cloud Run
|
||||
app = FastAPI(
|
||||
title="Whisper Transcription Service",
|
||||
description="Cloud Run service for Whisper-based audio transcription",
|
||||
version="1.0.0",
|
||||
)
|
||||
|
||||
|
||||
# Request/Response Models
|
||||
class TranscribeRequest(BaseModel):
|
||||
"""Request model for transcription endpoint."""
|
||||
gcs_uri: str = Field(
|
||||
...,
|
||||
description="GCS URI of the audio file (gs://bucket/path/audio.mp3)",
|
||||
examples=["gs://accessible-video-storage/jobs/123/audio.mp3"]
|
||||
)
|
||||
|
||||
|
||||
class WordTimestampResponse(BaseModel):
|
||||
"""Word timestamp in response."""
|
||||
word: str
|
||||
start: float
|
||||
end: float
|
||||
|
||||
|
||||
class TranscribeResponse(BaseModel):
|
||||
"""Response model for transcription endpoint."""
|
||||
words: list[WordTimestampResponse]
|
||||
word_count: int
|
||||
transcript: str = Field(
|
||||
...,
|
||||
description="Full transcript text (words joined with spaces)"
|
||||
)
|
||||
|
||||
|
||||
class SpeechGapResponse(BaseModel):
|
||||
"""Speech gap in response."""
|
||||
start: float
|
||||
end: float
|
||||
duration: float
|
||||
gap_type: str
|
||||
|
||||
|
||||
class TranscribeWithGapsRequest(BaseModel):
|
||||
"""Request model for transcription with gap analysis."""
|
||||
gcs_uri: str = Field(
|
||||
...,
|
||||
description="GCS URI of the audio file"
|
||||
)
|
||||
|
||||
|
||||
class TranscribeWithGapsResponse(BaseModel):
|
||||
"""Response model for transcription with gap analysis."""
|
||||
words: list[WordTimestampResponse]
|
||||
gaps: list[SpeechGapResponse]
|
||||
word_count: int
|
||||
transcript: str
|
||||
|
||||
|
||||
class RefinePausePointsRequest(BaseModel):
|
||||
"""Request model for pause point refinement."""
|
||||
placements: list[dict] = Field(
|
||||
...,
|
||||
description="List of placement dicts from Gemini analysis"
|
||||
)
|
||||
words: list[WordTimestampResponse] = Field(
|
||||
...,
|
||||
description="Word timestamps from transcription"
|
||||
)
|
||||
consolidation_threshold: float = Field(
|
||||
default=5.0,
|
||||
description="Threshold in seconds for consolidating nearby cues"
|
||||
)
|
||||
|
||||
|
||||
class RefinePausePointsResponse(BaseModel):
|
||||
"""Response model for pause point refinement."""
|
||||
refined_placements: list[dict]
|
||||
warnings: list[str]
|
||||
|
||||
|
||||
class HealthResponse(BaseModel):
|
||||
"""Health check response."""
|
||||
status: str
|
||||
model_loaded: bool
|
||||
model_name: str
|
||||
|
||||
|
||||
def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]:
|
||||
"""Parse GCS URI into bucket and blob path."""
|
||||
if not gcs_uri.startswith("gs://"):
|
||||
raise ValueError(f"Invalid GCS URI: {gcs_uri}")
|
||||
|
||||
parts = gcs_uri[5:].split("/", 1)
|
||||
if len(parts) != 2:
|
||||
raise ValueError(f"Invalid GCS URI format: {gcs_uri}")
|
||||
|
||||
return parts[0], parts[1]
|
||||
|
||||
|
||||
def _download_from_gcs(gcs_uri: str, local_path: str) -> None:
|
||||
"""Download file from GCS to local path."""
|
||||
bucket_name, blob_path = _parse_gcs_uri(gcs_uri)
|
||||
|
||||
client = storage.Client() # Auto-detects project from environment
|
||||
bucket = client.bucket(bucket_name)
|
||||
blob = bucket.blob(blob_path)
|
||||
|
||||
if not blob.exists():
|
||||
raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}")
|
||||
|
||||
blob.download_to_filename(local_path)
|
||||
logger.info(f"Downloaded {gcs_uri} to {local_path}")
|
||||
|
||||
|
||||
@app.get("/health", response_model=HealthResponse)
|
||||
async def health_check():
|
||||
"""Health check endpoint."""
|
||||
return HealthResponse(
|
||||
status="healthy",
|
||||
model_loaded=whisper_service._model is not None,
|
||||
model_name=whisper_service._model_name
|
||||
)
|
||||
|
||||
|
||||
@app.post("/transcribe", response_model=TranscribeResponse)
|
||||
async def transcribe(request: TranscribeRequest):
|
||||
"""
|
||||
Transcribe audio file from GCS and return word-level timestamps.
|
||||
|
||||
This endpoint:
|
||||
1. Downloads the audio file from GCS
|
||||
2. Runs Whisper transcription with word-level timestamps
|
||||
3. Returns the words with timing information
|
||||
"""
|
||||
logger.info(f"Transcribe request: {request.gcs_uri}")
|
||||
|
||||
# Create temp file for audio
|
||||
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
|
||||
tmp_path = tmp.name
|
||||
|
||||
try:
|
||||
# Download audio from GCS
|
||||
_download_from_gcs(request.gcs_uri, tmp_path)
|
||||
|
||||
# Run transcription
|
||||
words = whisper_service.transcribe_audio(tmp_path)
|
||||
|
||||
# Build response
|
||||
word_responses = [
|
||||
WordTimestampResponse(word=w.word, start=w.start, end=w.end)
|
||||
for w in words
|
||||
]
|
||||
|
||||
transcript = " ".join(w.word for w in words)
|
||||
|
||||
logger.info(f"Transcription complete: {len(words)} words")
|
||||
|
||||
return TranscribeResponse(
|
||||
words=word_responses,
|
||||
word_count=len(words),
|
||||
transcript=transcript
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Transcription failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
finally:
|
||||
# Clean up temp file
|
||||
if os.path.exists(tmp_path):
|
||||
os.unlink(tmp_path)
|
||||
|
||||
|
||||
@app.post("/transcribe-with-gaps", response_model=TranscribeWithGapsResponse)
|
||||
async def transcribe_with_gaps(request: TranscribeWithGapsRequest):
|
||||
"""
|
||||
Transcribe audio and identify speech gaps for pause point placement.
|
||||
|
||||
This endpoint combines transcription and gap analysis in a single call,
|
||||
which is more efficient for the pause-insert rendering method.
|
||||
"""
|
||||
logger.info(f"Transcribe with gaps request: {request.gcs_uri}")
|
||||
|
||||
# Create temp file for audio
|
||||
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
|
||||
tmp_path = tmp.name
|
||||
|
||||
try:
|
||||
# Download audio from GCS
|
||||
_download_from_gcs(request.gcs_uri, tmp_path)
|
||||
|
||||
# Run transcription
|
||||
words = whisper_service.transcribe_audio(tmp_path)
|
||||
|
||||
# Identify speech gaps
|
||||
gaps = whisper_service.identify_speech_gaps(words)
|
||||
|
||||
# Build response
|
||||
word_responses = [
|
||||
WordTimestampResponse(word=w.word, start=w.start, end=w.end)
|
||||
for w in words
|
||||
]
|
||||
|
||||
gap_responses = [
|
||||
SpeechGapResponse(
|
||||
start=g.start,
|
||||
end=g.end,
|
||||
duration=g.duration,
|
||||
gap_type=g.gap_type
|
||||
)
|
||||
for g in gaps
|
||||
]
|
||||
|
||||
transcript = " ".join(w.word for w in words)
|
||||
|
||||
logger.info(f"Transcription with gaps complete: {len(words)} words, {len(gaps)} gaps")
|
||||
|
||||
return TranscribeWithGapsResponse(
|
||||
words=word_responses,
|
||||
gaps=gap_responses,
|
||||
word_count=len(words),
|
||||
transcript=transcript
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Transcription with gaps failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
finally:
|
||||
# Clean up temp file
|
||||
if os.path.exists(tmp_path):
|
||||
os.unlink(tmp_path)
|
||||
|
||||
|
||||
@app.post("/refine-pause-points", response_model=RefinePausePointsResponse)
|
||||
async def refine_pause_points(request: RefinePausePointsRequest):
|
||||
"""
|
||||
Refine Gemini pause points using Whisper transcript analysis.
|
||||
|
||||
This endpoint takes the placements from Gemini analysis and the
|
||||
word timestamps from transcription, then refines the pause points
|
||||
to align with natural sentence boundaries.
|
||||
"""
|
||||
logger.info(f"Refine pause points request: {len(request.placements)} placements")
|
||||
|
||||
try:
|
||||
# Convert word responses back to WordTimestamp objects
|
||||
words = [
|
||||
WordTimestamp(word=w.word, start=w.start, end=w.end)
|
||||
for w in request.words
|
||||
]
|
||||
|
||||
# Identify speech gaps
|
||||
gaps = whisper_service.identify_speech_gaps(words)
|
||||
|
||||
# Refine pause points
|
||||
refined_placements, warnings = whisper_service.refine_all_pause_points(
|
||||
request.placements,
|
||||
words,
|
||||
gaps,
|
||||
request.consolidation_threshold
|
||||
)
|
||||
|
||||
logger.info(f"Pause point refinement complete: {len(warnings)} warnings")
|
||||
|
||||
return RefinePausePointsResponse(
|
||||
refined_placements=refined_placements,
|
||||
warnings=warnings
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Pause point refinement failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
# Startup event to pre-load Whisper model
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
"""Pre-load Whisper model on startup to reduce first-request latency."""
|
||||
logger.info("Whisper HTTP Service starting up...")
|
||||
# Access the model property to trigger lazy loading
|
||||
_ = whisper_service.model
|
||||
logger.info("Whisper model pre-loaded successfully")
|
||||
|
|
@ -2,16 +2,40 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
from faster_whisper import WhisperModel
|
||||
|
||||
from ..core.config import settings
|
||||
from ..core.logging import get_logger
|
||||
# Use simple logging for Cloud Run compatibility (no dependency on app.core.logging)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
logger = get_logger(__name__)
|
||||
# Try to import settings, fall back to env vars for Cloud Run mode
|
||||
try:
|
||||
from ..core.config import settings
|
||||
_HAS_SETTINGS = True
|
||||
except Exception:
|
||||
_HAS_SETTINGS = False
|
||||
settings = None # type: ignore
|
||||
|
||||
|
||||
def _get_setting(name: str, default):
|
||||
"""Get setting value from Settings object or environment variable."""
|
||||
if _HAS_SETTINGS and settings:
|
||||
return getattr(settings, name, default)
|
||||
# Fall back to environment variable
|
||||
env_val = os.environ.get(name.upper())
|
||||
if env_val is not None:
|
||||
# Try to convert to same type as default
|
||||
if isinstance(default, float):
|
||||
return float(env_val)
|
||||
if isinstance(default, int):
|
||||
return int(env_val)
|
||||
return env_val
|
||||
return default
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -66,15 +90,15 @@ class WhisperService:
|
|||
|
||||
def __init__(self):
|
||||
self._model: WhisperModel | None = None
|
||||
self._model_name = getattr(settings, 'whisper_model', 'base')
|
||||
self._model_name = _get_setting('whisper_model', 'base')
|
||||
|
||||
# Gap classification thresholds (in seconds)
|
||||
self.sentence_gap_threshold = getattr(settings, 'whisper_sentence_gap_threshold', 0.5)
|
||||
self.phrase_gap_threshold = getattr(settings, 'whisper_phrase_gap_threshold', 0.3)
|
||||
self.min_gap_threshold = getattr(settings, 'whisper_min_gap_threshold', 0.15)
|
||||
self.sentence_gap_threshold = _get_setting('whisper_sentence_gap_threshold', 0.5)
|
||||
self.phrase_gap_threshold = _get_setting('whisper_phrase_gap_threshold', 0.3)
|
||||
self.min_gap_threshold = _get_setting('whisper_min_gap_threshold', 0.15)
|
||||
|
||||
# Snapping configuration
|
||||
self.max_search_window = getattr(settings, 'whisper_max_search_window', 30.0)
|
||||
self.max_search_window = _get_setting('whisper_max_search_window', 30.0)
|
||||
|
||||
@property
|
||||
def model(self) -> WhisperModel:
|
||||
|
|
|
|||
|
|
@ -1,5 +1,12 @@
|
|||
"""Celery task for Whisper transcription on dedicated queue with concurrency=1."""
|
||||
"""Celery task for Whisper transcription with Cloud Run fallback."""
|
||||
|
||||
import os
|
||||
import uuid
|
||||
|
||||
import httpx
|
||||
from google.cloud import storage
|
||||
|
||||
from ..core.config import settings
|
||||
from ..core.logging import get_logger
|
||||
from ..services.whisper_service import whisper_service
|
||||
from . import celery_app
|
||||
|
|
@ -7,14 +14,128 @@ from . import celery_app
|
|||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@celery_app.task(bind=True, queue='whisper', time_limit=1800, soft_time_limit=1700) # 30 min limit
|
||||
def _upload_audio_to_gcs_temp(audio_path: str, job_id: str) -> str:
|
||||
"""Upload local audio file to GCS temporary location and return GCS URI."""
|
||||
# Generate unique temp path
|
||||
filename = os.path.basename(audio_path)
|
||||
temp_path = f"temp/whisper/{job_id}/{uuid.uuid4().hex}/{filename}"
|
||||
|
||||
client = storage.Client(project=settings.gcp_project_id)
|
||||
bucket = client.bucket(settings.gcs_bucket)
|
||||
blob = bucket.blob(temp_path)
|
||||
|
||||
blob.upload_from_filename(audio_path)
|
||||
gcs_uri = f"gs://{settings.gcs_bucket}/{temp_path}"
|
||||
|
||||
logger.info(f"Uploaded audio to temp GCS: {gcs_uri}")
|
||||
return gcs_uri
|
||||
|
||||
|
||||
def _delete_gcs_temp(gcs_uri: str) -> None:
|
||||
"""Delete temporary GCS file."""
|
||||
try:
|
||||
if not gcs_uri.startswith("gs://"):
|
||||
return
|
||||
|
||||
parts = gcs_uri[5:].split("/", 1)
|
||||
if len(parts) != 2:
|
||||
return
|
||||
|
||||
bucket_name, blob_path = parts
|
||||
|
||||
client = storage.Client(project=settings.gcp_project_id)
|
||||
bucket = client.bucket(bucket_name)
|
||||
blob = bucket.blob(blob_path)
|
||||
blob.delete()
|
||||
|
||||
logger.info(f"Deleted temp GCS file: {gcs_uri}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to delete temp GCS file {gcs_uri}: {e}")
|
||||
|
||||
|
||||
def _transcribe_via_cloud_run(job_id: str, audio_path: str) -> dict:
|
||||
"""
|
||||
Transcribe audio via Cloud Run Whisper service.
|
||||
|
||||
Uploads local audio to GCS temp, calls Cloud Run, then cleans up.
|
||||
"""
|
||||
gcs_uri = None
|
||||
try:
|
||||
# Upload audio to GCS temp location
|
||||
gcs_uri = _upload_audio_to_gcs_temp(audio_path, job_id)
|
||||
|
||||
# Call Cloud Run service
|
||||
service_url = settings.whisper_service_url.rstrip("/")
|
||||
endpoint = f"{service_url}/transcribe"
|
||||
|
||||
logger.info(f"Calling Whisper Cloud Run service: {endpoint}")
|
||||
|
||||
with httpx.Client(timeout=300.0) as client:
|
||||
response = client.post(
|
||||
endpoint,
|
||||
json={"gcs_uri": gcs_uri},
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
# Calculate audio duration from last word
|
||||
words = data.get("words", [])
|
||||
audio_duration = words[-1]["end"] if words else 0.0
|
||||
|
||||
result = {
|
||||
"job_id": job_id,
|
||||
"word_count": data.get("word_count", len(words)),
|
||||
"audio_duration": audio_duration,
|
||||
"words": words
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Cloud Run transcription complete for job {job_id}: "
|
||||
f"{result['word_count']} words, {audio_duration:.2f}s duration"
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
finally:
|
||||
# Clean up temp GCS file
|
||||
if gcs_uri:
|
||||
_delete_gcs_temp(gcs_uri)
|
||||
|
||||
|
||||
def _transcribe_locally(job_id: str, audio_path: str) -> dict:
|
||||
"""Transcribe audio using local Whisper service."""
|
||||
words = whisper_service.transcribe_audio(audio_path)
|
||||
|
||||
# Convert to serializable format
|
||||
words_data = [w.to_dict() for w in words]
|
||||
|
||||
# Calculate audio duration from last word
|
||||
audio_duration = words[-1].end if words else 0.0
|
||||
|
||||
result = {
|
||||
"job_id": job_id,
|
||||
"word_count": len(words),
|
||||
"audio_duration": audio_duration,
|
||||
"words": words_data
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Local transcription complete for job {job_id}: "
|
||||
f"{len(words)} words, {audio_duration:.2f}s duration"
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@celery_app.task(bind=True, queue='whisper', time_limit=1800, soft_time_limit=1700)
|
||||
def transcribe_video_audio_task(self, job_id: str, audio_path: str) -> dict:
|
||||
"""
|
||||
Run Whisper transcription on dedicated queue with concurrency=1.
|
||||
Run Whisper transcription - via Cloud Run if configured, otherwise locally.
|
||||
|
||||
This task is routed to the 'whisper' queue which should be processed
|
||||
by a worker with --concurrency=1 to prevent memory overload when
|
||||
processing multiple jobs in parallel.
|
||||
When WHISPER_SERVICE_URL is set, transcription is offloaded to Cloud Run
|
||||
for autoscaling. Otherwise, runs on dedicated local worker with concurrency=1.
|
||||
|
||||
Args:
|
||||
job_id: Job ID for logging
|
||||
|
|
@ -32,29 +153,17 @@ def transcribe_video_audio_task(self, job_id: str, audio_path: str) -> dict:
|
|||
logger.info(f"Starting Whisper transcription task for job {job_id}")
|
||||
|
||||
try:
|
||||
# Run transcription using the WhisperService
|
||||
words = whisper_service.transcribe_audio(audio_path)
|
||||
|
||||
# Convert to serializable format
|
||||
words_data = [w.to_dict() for w in words]
|
||||
|
||||
# Calculate audio duration from last word
|
||||
audio_duration = words[-1].end if words else 0.0
|
||||
|
||||
result = {
|
||||
"job_id": job_id,
|
||||
"word_count": len(words),
|
||||
"audio_duration": audio_duration,
|
||||
"words": words_data
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Whisper transcription complete for job {job_id}: "
|
||||
f"{len(words)} words, {audio_duration:.2f}s duration"
|
||||
)
|
||||
|
||||
return result
|
||||
# Use Cloud Run if configured, otherwise local
|
||||
if settings.whisper_service_url:
|
||||
logger.info(f"Using Cloud Run Whisper service: {settings.whisper_service_url}")
|
||||
return _transcribe_via_cloud_run(job_id, audio_path)
|
||||
else:
|
||||
logger.info("Using local Whisper service")
|
||||
return _transcribe_locally(job_id, audio_path)
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
logger.error(f"Cloud Run transcription failed for job {job_id}: {e.response.status_code} - {e.response.text}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Whisper transcription failed for job {job_id}: {e}")
|
||||
raise
|
||||
|
|
|
|||
166
infra/cloud-run/cloudbuild-http-services.yaml
Normal file
166
infra/cloud-run/cloudbuild-http-services.yaml
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
# =============================================================================
|
||||
# Cloud Build: HTTP Services (Whisper & FFmpeg)
|
||||
# =============================================================================
|
||||
# Builds and deploys the autoscaling HTTP services to Cloud Run.
|
||||
#
|
||||
# Usage:
|
||||
# gcloud builds submit --config=infra/cloud-run/cloudbuild-http-services.yaml .
|
||||
#
|
||||
# Or trigger automatically on push to main branch via Cloud Build triggers.
|
||||
# =============================================================================
|
||||
|
||||
steps:
|
||||
# =========================================================================
|
||||
# Build Docker Images
|
||||
# =========================================================================
|
||||
|
||||
# Build Whisper HTTP Service image
|
||||
- name: 'gcr.io/cloud-builders/docker'
|
||||
args:
|
||||
- 'build'
|
||||
- '-f'
|
||||
- 'backend/Dockerfile.whisper-service'
|
||||
- '-t'
|
||||
- 'gcr.io/$PROJECT_ID/whisper-http-service:$COMMIT_SHA'
|
||||
- '-t'
|
||||
- 'gcr.io/$PROJECT_ID/whisper-http-service:latest'
|
||||
- 'backend/'
|
||||
id: 'build-whisper-service'
|
||||
|
||||
# Build FFmpeg HTTP Service image
|
||||
- name: 'gcr.io/cloud-builders/docker'
|
||||
args:
|
||||
- 'build'
|
||||
- '-f'
|
||||
- 'backend/Dockerfile.ffmpeg-service'
|
||||
- '-t'
|
||||
- 'gcr.io/$PROJECT_ID/ffmpeg-http-service:$COMMIT_SHA'
|
||||
- '-t'
|
||||
- 'gcr.io/$PROJECT_ID/ffmpeg-http-service:latest'
|
||||
- 'backend/'
|
||||
id: 'build-ffmpeg-service'
|
||||
|
||||
# =========================================================================
|
||||
# Push Images to Container Registry
|
||||
# =========================================================================
|
||||
|
||||
- name: 'gcr.io/cloud-builders/docker'
|
||||
args: ['push', 'gcr.io/$PROJECT_ID/whisper-http-service:$COMMIT_SHA']
|
||||
id: 'push-whisper-service-sha'
|
||||
waitFor: ['build-whisper-service']
|
||||
|
||||
- name: 'gcr.io/cloud-builders/docker'
|
||||
args: ['push', 'gcr.io/$PROJECT_ID/whisper-http-service:latest']
|
||||
id: 'push-whisper-service-latest'
|
||||
waitFor: ['build-whisper-service']
|
||||
|
||||
- name: 'gcr.io/cloud-builders/docker'
|
||||
args: ['push', 'gcr.io/$PROJECT_ID/ffmpeg-http-service:$COMMIT_SHA']
|
||||
id: 'push-ffmpeg-service-sha'
|
||||
waitFor: ['build-ffmpeg-service']
|
||||
|
||||
- name: 'gcr.io/cloud-builders/docker'
|
||||
args: ['push', 'gcr.io/$PROJECT_ID/ffmpeg-http-service:latest']
|
||||
id: 'push-ffmpeg-service-latest'
|
||||
waitFor: ['build-ffmpeg-service']
|
||||
|
||||
# =========================================================================
|
||||
# Deploy to Cloud Run
|
||||
# =========================================================================
|
||||
|
||||
# Deploy Whisper HTTP Service
|
||||
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
|
||||
entrypoint: 'gcloud'
|
||||
args:
|
||||
- 'run'
|
||||
- 'deploy'
|
||||
- 'whisper-http-service'
|
||||
- '--image=gcr.io/$PROJECT_ID/whisper-http-service:$COMMIT_SHA'
|
||||
- '--region=$_REGION'
|
||||
- '--platform=managed'
|
||||
- '--no-allow-unauthenticated' # Internal only
|
||||
- '--port=8080'
|
||||
- '--memory=32Gi'
|
||||
- '--cpu=8'
|
||||
- '--min-instances=0'
|
||||
- '--max-instances=10'
|
||||
- '--concurrency=1'
|
||||
- '--timeout=300'
|
||||
- '--execution-environment=gen2'
|
||||
- '--no-cpu-throttling'
|
||||
- '--cpu-boost'
|
||||
- '--set-env-vars=APP_ENV=prod,PYTHONPATH=/app,WHISPER_MODEL=medium,GCS_BUCKET=$_GCS_BUCKET'
|
||||
- '--service-account=accessible-video-worker@$PROJECT_ID.iam.gserviceaccount.com'
|
||||
id: 'deploy-whisper-service'
|
||||
waitFor: ['push-whisper-service-sha']
|
||||
|
||||
# Deploy FFmpeg HTTP Service
|
||||
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
|
||||
entrypoint: 'gcloud'
|
||||
args:
|
||||
- 'run'
|
||||
- 'deploy'
|
||||
- 'ffmpeg-http-service'
|
||||
- '--image=gcr.io/$PROJECT_ID/ffmpeg-http-service:$COMMIT_SHA'
|
||||
- '--region=$_REGION'
|
||||
- '--platform=managed'
|
||||
- '--no-allow-unauthenticated' # Internal only
|
||||
- '--port=8080'
|
||||
- '--memory=32Gi'
|
||||
- '--cpu=8'
|
||||
- '--min-instances=0'
|
||||
- '--max-instances=20'
|
||||
- '--concurrency=1'
|
||||
- '--timeout=600'
|
||||
- '--execution-environment=gen2'
|
||||
- '--no-cpu-throttling'
|
||||
- '--cpu-boost'
|
||||
- '--set-env-vars=APP_ENV=prod,PYTHONPATH=/app,GCS_BUCKET=$_GCS_BUCKET'
|
||||
- '--service-account=accessible-video-worker@$PROJECT_ID.iam.gserviceaccount.com'
|
||||
id: 'deploy-ffmpeg-service'
|
||||
waitFor: ['push-ffmpeg-service-sha']
|
||||
|
||||
# =========================================================================
|
||||
# Output Service URLs
|
||||
# =========================================================================
|
||||
|
||||
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
|
||||
entrypoint: 'bash'
|
||||
args:
|
||||
- '-c'
|
||||
- |
|
||||
echo "=============================================="
|
||||
echo "HTTP Services Deployed Successfully!"
|
||||
echo "=============================================="
|
||||
echo ""
|
||||
echo "Whisper Service:"
|
||||
gcloud run services describe whisper-http-service --region=$_REGION --format='value(status.url)'
|
||||
echo ""
|
||||
echo "FFmpeg Service:"
|
||||
gcloud run services describe ffmpeg-http-service --region=$_REGION --format='value(status.url)'
|
||||
echo ""
|
||||
echo "=============================================="
|
||||
echo "To enable Cloud Run usage, set these environment variables:"
|
||||
echo " WHISPER_SERVICE_URL=<whisper-service-url>"
|
||||
echo " FFMPEG_SERVICE_URL=<ffmpeg-service-url>"
|
||||
echo "=============================================="
|
||||
id: 'output-urls'
|
||||
waitFor: ['deploy-whisper-service', 'deploy-ffmpeg-service']
|
||||
|
||||
substitutions:
|
||||
_REGION: 'us-central1'
|
||||
_GCS_BUCKET: 'accessible-video'
|
||||
|
||||
options:
|
||||
machineType: 'E2_HIGHCPU_8'
|
||||
diskSizeGb: '100'
|
||||
dynamic_substitutions: true
|
||||
|
||||
timeout: '2400s' # 40 minutes (Whisper image build takes time due to model download)
|
||||
|
||||
# Images to be pushed (for Cloud Build to track)
|
||||
images:
|
||||
- 'gcr.io/$PROJECT_ID/whisper-http-service:$COMMIT_SHA'
|
||||
- 'gcr.io/$PROJECT_ID/whisper-http-service:latest'
|
||||
- 'gcr.io/$PROJECT_ID/ffmpeg-http-service:$COMMIT_SHA'
|
||||
- 'gcr.io/$PROJECT_ID/ffmpeg-http-service:latest'
|
||||
125
infra/cloud-run/ffmpeg-http-service.yaml
Normal file
125
infra/cloud-run/ffmpeg-http-service.yaml
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
# =============================================================================
|
||||
# Cloud Run Service: FFmpeg HTTP Service
|
||||
# =============================================================================
|
||||
# Autoscaling FFmpeg processing service for Cloud Run deployment.
|
||||
# This service handles CPU-intensive video encoding via HTTP endpoints.
|
||||
#
|
||||
# Key features:
|
||||
# - Scale to zero when idle (pay only for compute time used)
|
||||
# - Up to 20 instances for parallel video processing
|
||||
# - 8 vCPU / 32GB RAM for fast encoding
|
||||
# - Startup CPU boost for faster cold starts
|
||||
# - Faster startup than Whisper (no model loading)
|
||||
# =============================================================================
|
||||
|
||||
apiVersion: serving.knative.dev/v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: ffmpeg-http-service
|
||||
annotations:
|
||||
run.googleapis.com/ingress: internal # Only accessible from within GCP
|
||||
run.googleapis.com/execution-environment: gen2 # Required for 8 vCPU
|
||||
spec:
|
||||
template:
|
||||
metadata:
|
||||
annotations:
|
||||
# Autoscaling configuration
|
||||
autoscaling.knative.dev/minScale: "0" # Scale to zero when idle
|
||||
autoscaling.knative.dev/maxScale: "20" # Max 20 concurrent instances
|
||||
|
||||
# Cloud Run Gen2 features
|
||||
run.googleapis.com/execution-environment: gen2 # Required for 8 vCPU
|
||||
run.googleapis.com/cpu-throttling: "false" # Always-on CPU during requests
|
||||
run.googleapis.com/startup-cpu-boost: "true" # Faster cold start
|
||||
|
||||
spec:
|
||||
# Only 1 FFmpeg operation at a time per instance (CPU-intensive)
|
||||
containerConcurrency: 1
|
||||
|
||||
# 10-minute timeout for long encoding operations
|
||||
timeoutSeconds: 600
|
||||
|
||||
serviceAccountName: accessible-video-worker@PROJECT_ID.iam.gserviceaccount.com
|
||||
|
||||
containers:
|
||||
- image: gcr.io/PROJECT_ID/ffmpeg-http-service:latest
|
||||
|
||||
ports:
|
||||
- containerPort: 8080
|
||||
|
||||
env:
|
||||
- name: APP_ENV
|
||||
value: "prod"
|
||||
- name: PYTHONPATH
|
||||
value: "/app"
|
||||
- name: PYTHONUNBUFFERED
|
||||
value: "1"
|
||||
- name: PYTHONDONTWRITEBYTECODE
|
||||
value: "1"
|
||||
|
||||
# GCP Configuration
|
||||
- name: GCP_PROJECT_ID
|
||||
value: "PROJECT_ID"
|
||||
- name: GCS_BUCKET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: gcs-bucket-name
|
||||
key: latest
|
||||
|
||||
# MongoDB for job tracking (optional, for logging)
|
||||
- name: MONGODB_URL
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: mongodb-url
|
||||
key: latest
|
||||
|
||||
# OpenTelemetry configuration
|
||||
- name: OTEL_SERVICE_NAME
|
||||
value: "ffmpeg-http-service"
|
||||
- name: OTEL_SERVICE_VERSION
|
||||
value: "1.0.0"
|
||||
- name: OTEL_TRACES_EXPORTER
|
||||
value: "gcp_trace"
|
||||
|
||||
# Sentry configuration (optional)
|
||||
- name: SENTRY_DSN
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: sentry-dsn
|
||||
key: latest
|
||||
- name: SENTRY_ENVIRONMENT
|
||||
value: "production"
|
||||
|
||||
resources:
|
||||
limits:
|
||||
memory: "32Gi"
|
||||
cpu: "8000m" # 8 vCPU
|
||||
requests:
|
||||
memory: "4Gi"
|
||||
cpu: "2000m" # 2 vCPU minimum
|
||||
|
||||
# Health checks
|
||||
startupProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: 8080
|
||||
initialDelaySeconds: 5 # FFmpeg starts fast (no model to load)
|
||||
periodSeconds: 5
|
||||
timeoutSeconds: 5
|
||||
failureThreshold: 6
|
||||
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: 8080
|
||||
initialDelaySeconds: 15
|
||||
periodSeconds: 30
|
||||
timeoutSeconds: 10
|
||||
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: 8080
|
||||
initialDelaySeconds: 5
|
||||
periodSeconds: 10
|
||||
timeoutSeconds: 5
|
||||
128
infra/cloud-run/whisper-http-service.yaml
Normal file
128
infra/cloud-run/whisper-http-service.yaml
Normal file
|
|
@ -0,0 +1,128 @@
|
|||
# =============================================================================
|
||||
# Cloud Run Service: Whisper HTTP Service
|
||||
# =============================================================================
|
||||
# Autoscaling Whisper transcription service for Cloud Run deployment.
|
||||
# This service handles CPU-intensive Whisper transcription via HTTP endpoints.
|
||||
#
|
||||
# Key features:
|
||||
# - Scale to zero when idle (pay only for compute time used)
|
||||
# - Up to 10 instances for parallel transcription
|
||||
# - 8 vCPU / 32GB RAM for fast transcription
|
||||
# - Startup CPU boost for faster cold starts
|
||||
# =============================================================================
|
||||
|
||||
apiVersion: serving.knative.dev/v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: whisper-http-service
|
||||
annotations:
|
||||
run.googleapis.com/ingress: internal # Only accessible from within GCP
|
||||
run.googleapis.com/execution-environment: gen2 # Required for 8 vCPU
|
||||
spec:
|
||||
template:
|
||||
metadata:
|
||||
annotations:
|
||||
# Autoscaling configuration
|
||||
autoscaling.knative.dev/minScale: "0" # Scale to zero when idle
|
||||
autoscaling.knative.dev/maxScale: "10" # Max 10 concurrent instances
|
||||
|
||||
# Cloud Run Gen2 features
|
||||
run.googleapis.com/execution-environment: gen2 # Required for 8 vCPU
|
||||
run.googleapis.com/cpu-throttling: "false" # Always-on CPU during requests
|
||||
run.googleapis.com/startup-cpu-boost: "true" # Faster cold start
|
||||
|
||||
spec:
|
||||
# Only 1 transcription at a time per instance (Whisper is CPU-intensive)
|
||||
containerConcurrency: 1
|
||||
|
||||
# 5-minute timeout for long transcriptions
|
||||
timeoutSeconds: 300
|
||||
|
||||
serviceAccountName: accessible-video-worker@PROJECT_ID.iam.gserviceaccount.com
|
||||
|
||||
containers:
|
||||
- image: gcr.io/PROJECT_ID/whisper-http-service:latest
|
||||
|
||||
ports:
|
||||
- containerPort: 8080
|
||||
|
||||
env:
|
||||
- name: APP_ENV
|
||||
value: "prod"
|
||||
- name: PYTHONPATH
|
||||
value: "/app"
|
||||
- name: PYTHONUNBUFFERED
|
||||
value: "1"
|
||||
- name: PYTHONDONTWRITEBYTECODE
|
||||
value: "1"
|
||||
|
||||
# GCP Configuration
|
||||
- name: GCP_PROJECT_ID
|
||||
value: "PROJECT_ID"
|
||||
- name: GCS_BUCKET
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: gcs-bucket-name
|
||||
key: latest
|
||||
|
||||
# MongoDB for job tracking (optional, for logging)
|
||||
- name: MONGODB_URL
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: mongodb-url
|
||||
key: latest
|
||||
|
||||
# Whisper Configuration
|
||||
- name: WHISPER_MODEL
|
||||
value: "medium"
|
||||
|
||||
# OpenTelemetry configuration
|
||||
- name: OTEL_SERVICE_NAME
|
||||
value: "whisper-http-service"
|
||||
- name: OTEL_SERVICE_VERSION
|
||||
value: "1.0.0"
|
||||
- name: OTEL_TRACES_EXPORTER
|
||||
value: "gcp_trace"
|
||||
|
||||
# Sentry configuration (optional)
|
||||
- name: SENTRY_DSN
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: sentry-dsn
|
||||
key: latest
|
||||
- name: SENTRY_ENVIRONMENT
|
||||
value: "production"
|
||||
|
||||
resources:
|
||||
limits:
|
||||
memory: "32Gi"
|
||||
cpu: "8000m" # 8 vCPU
|
||||
requests:
|
||||
memory: "8Gi"
|
||||
cpu: "4000m" # 4 vCPU minimum
|
||||
|
||||
# Health checks
|
||||
startupProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: 8080
|
||||
initialDelaySeconds: 30 # Wait for Whisper model to load
|
||||
periodSeconds: 10
|
||||
timeoutSeconds: 5
|
||||
failureThreshold: 6
|
||||
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: 8080
|
||||
initialDelaySeconds: 60
|
||||
periodSeconds: 30
|
||||
timeoutSeconds: 10
|
||||
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: 8080
|
||||
initialDelaySeconds: 30
|
||||
periodSeconds: 10
|
||||
timeoutSeconds: 5
|
||||
Loading…
Add table
Reference in a new issue