video-accessibility/backend/app/services/ffmpeg_http_service.py
michael 79440929f4 feat: add Cloud Run HTTP services for Whisper and FFmpeg
Migrate CPU-intensive workloads to Cloud Run for autoscaling:

- Add Whisper HTTP service (FastAPI) with /transcribe endpoint
- Add FFmpeg HTTP service (FastAPI) with /encode, /probe, /extract-frame, etc.
- Add Dockerfiles for both services (8 vCPU, 32GB RAM, Gen2)
- Add Cloud Build config for CI/CD deployment
- Add Cloud Run service YAML configs with scale-to-zero
- Update whisper_transcribe.py to call Cloud Run when WHISPER_SERVICE_URL set
- Update video_renderer.py to call Cloud Run when FFMPEG_SERVICE_URL set
- Update whisper_service.py for Cloud Run compatibility (no settings dependency)
- Add ffmpeg_service_url and whisper_service_url to config.py

Services scale 0→N based on request load, falling back to local
execution when service URLs are not configured (hybrid mode).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 10:12:50 -06:00

539 lines
18 KiB
Python

"""
FFmpeg HTTP Service - FastAPI application for Cloud Run deployment.
This service exposes FFmpeg operations as HTTP endpoints, allowing
the main application to offload CPU-intensive video encoding to Cloud Run
with autoscaling.
This module uses minimal configuration to avoid importing the full app Settings.
"""
import json
import logging
import os
import subprocess
import tempfile
import uuid
from typing import Any, Optional
from fastapi import FastAPI, HTTPException
from google.cloud import storage
from pydantic import BaseModel, Field
# Minimal configuration for Cloud Run deployment.
# Values are read directly from environment variables so that this module
# does not have to import the main application's Settings.
# NOTE(review): GCS_BUCKET is not referenced anywhere else in the visible part
# of this file (every endpoint receives full gs:// URIs) - confirm it is still
# needed.
GCS_BUCKET = os.environ.get("GCS_BUCKET", "")
# Simple logging setup (no dependency on app.core.logging)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FastAPI app for Cloud Run; the endpoint decorators below register on it.
app = FastAPI(
    title="FFmpeg Processing Service",
    description="Cloud Run service for FFmpeg video processing operations",
    version="1.0.0",
)
# Request/Response Models
class RunFFmpegRequest(BaseModel):
    """Request model for running an FFmpeg command."""

    # The i-th URI is the file that the ``{input_i}`` placeholder refers to.
    input_gcs_uris: list[str] = Field(
        ...,
        description="GCS URIs of input files (will be downloaded and mapped to local paths)",
    )
    output_gcs_uri: str = Field(
        ...,
        description="GCS URI where the output file should be uploaded",
    )
    # One list element per command-line argument; no shell is involved.
    command_template: list[str] = Field(
        ...,
        description="FFmpeg command template with {input_0}, {input_1}, etc. and {output} placeholders",
    )
    # Must be positive: a zero or negative timeout would make the subprocess
    # call time out immediately and surface as a misleading 500 error.
    timeout: int = Field(
        default=3600,
        gt=0,
        description="Command timeout in seconds",
    )
class RunFFmpegResponse(BaseModel):
    """Response model for FFmpeg command execution."""

    # Outcome flag; the endpoints in this file only build a response on success.
    success: bool
    # Echo of the destination URI the result was uploaded to.
    output_gcs_uri: str
    # Tail of the command's stderr; only /run-ffmpeg fills this in.
    stderr: str = ""
    # Elapsed time for the whole request (download, processing, upload).
    duration_seconds: float = 0.0
class ProbeRequest(BaseModel):
    """Request model for probing video properties."""

    # Required; the file is downloaded locally before ffprobe is run on it.
    gcs_uri: str = Field(..., description="GCS URI of the video file to probe")
class ProbeResponse(BaseModel):
    """Response model for video probe."""

    # --- taken from the first video stream reported by ffprobe ---
    width: int
    height: int
    fps: float  # derived from the stream's r_frame_rate fraction
    duration: float
    # --- taken from the first audio stream reported by ffprobe ---
    sample_rate: int
    channels: int
    # --- video codec details ---
    codec_name: str
    pix_fmt: str
class EncodeSegmentRequest(BaseModel):
    """Request for encoding a video segment."""

    source_gcs_uri: str
    output_gcs_uri: str
    # Segment window: passed to ffmpeg as ``-ss`` and ``-t`` respectively.
    start_time: float
    duration: float
    # Output video parameters (``-r`` / ``-pix_fmt``).
    fps: float = 30.0
    pix_fmt: str = "yuv420p"
    # Output audio parameters (``-ar`` / ``-ac``).
    sample_rate: int = 44100
    channels: int = 2
class ExtractFrameRequest(BaseModel):
    """Request for extracting a single frame."""

    source_gcs_uri: str
    # Destination for the extracted frame (written locally as a PNG first).
    output_gcs_uri: str
    # Position in the source video, passed to ffmpeg as ``-ss``.
    time_point: float
class CreateFreezeSegmentRequest(BaseModel):
    """Request for creating a freeze-frame video with audio."""

    # Still image that is looped for the length of the audio track.
    frame_gcs_uri: str
    audio_gcs_uri: str
    output_gcs_uri: str
    # Target frame size; the image is scaled to fit and padded to this size.
    width: int
    height: int
    # Output video parameters (``-r`` / ``-pix_fmt``).
    fps: float = 30.0
    pix_fmt: str = "yuv420p"
    # Output audio parameters (``-ar`` / ``-ac``).
    sample_rate: int = 44100
    channels: int = 2
class ConcatenateRequest(BaseModel):
    """Request for concatenating video segments."""

    # Segments are joined in list order.
    segment_gcs_uris: list[str]
    output_gcs_uri: str
class HealthResponse(BaseModel):
    """Health check response."""

    status: str
    # First line of ``ffmpeg -version``, or "unknown" if it could not be read.
    ffmpeg_version: str
def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]:
"""Parse GCS URI into bucket and blob path."""
if not gcs_uri.startswith("gs://"):
raise ValueError(f"Invalid GCS URI: {gcs_uri}")
parts = gcs_uri[5:].split("/", 1)
if len(parts) != 2:
raise ValueError(f"Invalid GCS URI format: {gcs_uri}")
return parts[0], parts[1]
def _download_from_gcs(gcs_uri: str, local_path: str) -> None:
    """Fetch the object at ``gcs_uri`` and write it to ``local_path``.

    Raises:
        ValueError: Propagated from ``_parse_gcs_uri`` for a malformed URI.
        HTTPException: 404 when the object does not exist in the bucket.
    """
    bucket_name, blob_path = _parse_gcs_uri(gcs_uri)
    # A new client is created per call; it auto-detects the project from the
    # environment.
    blob = storage.Client().bucket(bucket_name).blob(blob_path)
    if not blob.exists():
        raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}")
    blob.download_to_filename(local_path)
    logger.debug(f"Downloaded {gcs_uri} to {local_path}")
def _upload_to_gcs(local_path: str, gcs_uri: str) -> None:
    """Copy the file at ``local_path`` to the object named by ``gcs_uri``.

    Raises:
        ValueError: Propagated from ``_parse_gcs_uri`` for a malformed URI.
    """
    bucket_name, blob_path = _parse_gcs_uri(gcs_uri)
    # A new client is created per call; it auto-detects the project from the
    # environment.
    destination = storage.Client().bucket(bucket_name).blob(blob_path)
    destination.upload_from_filename(local_path)
    logger.debug(f"Uploaded {local_path} to {gcs_uri}")
def _run_command(cmd: list[str], timeout: int = 3600) -> tuple[bool, str, str]:
"""Run a command and return success, stdout, stderr."""
cmd_preview = ' '.join(cmd[:8]) + ('...' if len(cmd) > 8 else '')
logger.info(f"Executing: {cmd_preview}")
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
return result.returncode == 0, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return False, "", f"Command timed out after {timeout}s"
except Exception as e:
return False, "", str(e)
def _get_ffmpeg_version() -> str:
"""Get FFmpeg version string."""
try:
result = subprocess.run(
["ffmpeg", "-version"],
capture_output=True,
text=True,
timeout=5
)
first_line = result.stdout.split('\n')[0]
return first_line
except Exception:
return "unknown"
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint."""
    # The status is unconditional; the FFmpeg version string is informational
    # and falls back to "unknown" when it cannot be determined.
    version = _get_ffmpeg_version()
    return HealthResponse(status="healthy", ffmpeg_version=version)
def _resolve_command_template(
    template: list[str], input_paths: list[str], output_path: str
) -> list[str]:
    """Replace ``{input_N}`` / ``{output}`` placeholders with local paths.

    Only arguments that consist entirely of a placeholder are substituted;
    every other argument is passed through unchanged.

    Raises:
        HTTPException: 400 if the template is empty, or if an ``{input_N}``
            index is not an integer in ``range(len(input_paths))``.
    """
    if not template:
        raise HTTPException(status_code=400, detail="command_template must not be empty")
    cmd = []
    for part in template:
        if part == "{output}":
            cmd.append(output_path)
        elif part.startswith("{input_") and part.endswith("}"):
            raw_index = part[len("{input_"):-1]
            try:
                idx = int(raw_index)
            except ValueError:
                raise HTTPException(
                    status_code=400, detail=f"Invalid input index: {raw_index}"
                ) from None
            # Negative values must be rejected explicitly; list indexing
            # would otherwise silently count from the end of the inputs.
            if not 0 <= idx < len(input_paths):
                raise HTTPException(status_code=400, detail=f"Invalid input index: {idx}")
            cmd.append(input_paths[idx])
        else:
            cmd.append(part)
    return cmd


def _run_ffmpeg_sync(request: RunFFmpegRequest) -> RunFFmpegResponse:
    """Blocking implementation of ``/run-ffmpeg``; meant for a worker thread."""
    import time

    started = time.monotonic()
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            # Local paths depend only on the request, so the template can be
            # validated before any (potentially large) download starts.
            input_paths = []
            for i, gcs_uri in enumerate(request.input_gcs_uris):
                ext = os.path.splitext(gcs_uri)[1] or ".bin"
                input_paths.append(os.path.join(tmpdir, f"input_{i}{ext}"))
            output_ext = os.path.splitext(request.output_gcs_uri)[1] or ".mp4"
            output_path = os.path.join(tmpdir, f"output{output_ext}")

            # NOTE(security): the executable (first template element) comes
            # straight from the request and is executed as-is. Restrict who
            # can call this endpoint, or whitelist the allowed binaries.
            cmd = _resolve_command_template(
                request.command_template, input_paths, output_path
            )

            for gcs_uri, local_path in zip(request.input_gcs_uris, input_paths):
                _download_from_gcs(gcs_uri, local_path)

            success, _stdout, stderr = _run_command(cmd, request.timeout)
            if not success:
                logger.error(f"FFmpeg failed: {stderr[-500:]}")
                raise HTTPException(status_code=500, detail=f"FFmpeg failed: {stderr[-500:]}")

            _upload_to_gcs(output_path, request.output_gcs_uri)
            duration = time.monotonic() - started
            logger.info(f"FFmpeg completed in {duration:.2f}s")
            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                stderr=stderr[-200:] if stderr else "",
                duration_seconds=duration,
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"FFmpeg operation failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/run-ffmpeg", response_model=RunFFmpegResponse)
async def run_ffmpeg(request: RunFFmpegRequest):
    """
    Run a generic FFmpeg command with GCS input/output.
    The command_template should use placeholders:
    - {input_0}, {input_1}, etc. for input files
    - {output} for the output file
    A placeholder is only substituted when it is the entire argument.
    Example:
    {
    "input_gcs_uris": ["gs://bucket/video.mp4"],
    "output_gcs_uri": "gs://bucket/output.mp4",
    "command_template": ["ffmpeg", "-y", "-i", "{input_0}", "-c:v", "libx264", "{output}"]
    }
    """
    import asyncio

    # Downloads, the FFmpeg subprocess and the upload are all blocking calls;
    # running them directly in this coroutine would stall the event loop (and
    # with it every other request, including /health) for the whole encode.
    return await asyncio.to_thread(_run_ffmpeg_sync, request)
def _probe_video_sync(request: ProbeRequest) -> ProbeResponse:
    """Blocking implementation of ``/probe``; meant for a worker thread."""
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            # Download video
            ext = os.path.splitext(request.gcs_uri)[1] or ".mp4"
            local_path = os.path.join(tmpdir, f"video{ext}")
            _download_from_gcs(request.gcs_uri, local_path)

            # -show_format is requested in addition to the streams because
            # some containers report the duration only at format level.
            cmd = [
                "ffprobe", "-v", "quiet",
                "-show_streams", "-show_format", "-of", "json",
                local_path,
            ]
            success, stdout, stderr = _run_command(cmd, timeout=60)
            if not success:
                raise HTTPException(status_code=500, detail=f"FFprobe failed: {stderr}")

            data = json.loads(stdout)
            streams = data.get("streams", [])
            video_stream = next((s for s in streams if s.get("codec_type") == "video"), {})
            audio_stream = next((s for s in streams if s.get("codec_type") == "audio"), {})

            # r_frame_rate is a fraction such as "30000/1001"; keep the 30.0
            # default when it is absent or has a zero denominator.
            fps = 30.0
            if "r_frame_rate" in video_stream:
                fps_parts = video_stream["r_frame_rate"].split("/")
                if len(fps_parts) == 2 and int(fps_parts[1]) != 0:
                    fps = int(fps_parts[0]) / int(fps_parts[1])

            # First positive duration wins: video stream, audio stream, then
            # the container. Missing or non-numeric values are skipped.
            duration = 0.0
            for source in (video_stream, audio_stream, data.get("format", {})):
                try:
                    candidate = float(source.get("duration", 0))
                except (TypeError, ValueError):
                    continue
                if candidate > 0:
                    duration = candidate
                    break

            return ProbeResponse(
                width=int(video_stream.get("width", 1920)),
                height=int(video_stream.get("height", 1080)),
                fps=fps,
                duration=duration,
                sample_rate=int(audio_stream.get("sample_rate", 44100)),
                channels=int(audio_stream.get("channels", 2)),
                codec_name=video_stream.get("codec_name", "h264"),
                pix_fmt=video_stream.get("pix_fmt", "yuv420p"),
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Probe failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/probe", response_model=ProbeResponse)
async def probe_video(request: ProbeRequest):
    """
    Get video properties using FFprobe.
    Values that FFprobe does not report fall back to defaults
    (1920x1080, 30 fps, 44100 Hz, 2 channels, h264, yuv420p, duration 0).
    """
    import asyncio

    # The download and the ffprobe subprocess block; keep them off the event
    # loop so other requests are still served in the meantime.
    return await asyncio.to_thread(_probe_video_sync, request)
def _encode_segment_sync(request: EncodeSegmentRequest) -> RunFFmpegResponse:
    """Blocking implementation of ``/encode-segment``; meant for a worker thread."""
    import time

    started = time.monotonic()
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            # Download source
            ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4"
            source_path = os.path.join(tmpdir, f"source{ext}")
            _download_from_gcs(request.source_gcs_uri, source_path)

            output_path = os.path.join(tmpdir, "segment.mp4")
            # Re-encode (rather than stream-copy) the requested window with
            # the caller's video/audio parameters.
            cmd = [
                "ffmpeg", "-y",
                "-ss", str(request.start_time),
                "-i", source_path,
                "-t", str(request.duration),
                "-c:v", "libx264", "-preset", "fast",
                "-pix_fmt", request.pix_fmt,
                "-r", str(request.fps),
                "-c:a", "aac", "-ar", str(request.sample_rate),
                "-ac", str(request.channels), "-b:a", "192k",
                "-video_track_timescale", "90000",
                output_path,
            ]
            success, _stdout, stderr = _run_command(cmd)
            if not success:
                raise HTTPException(status_code=500, detail=f"Encode failed: {stderr[-500:]}")

            _upload_to_gcs(output_path, request.output_gcs_uri)
            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                duration_seconds=time.monotonic() - started,
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Encode segment failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/encode-segment", response_model=RunFFmpegResponse)
async def encode_segment(request: EncodeSegmentRequest):
    """
    Extract and encode a video segment with re-encoding for frame accuracy.
    """
    import asyncio

    # Download, encode and upload are blocking; running them in a worker
    # thread keeps the event loop free to serve other requests meanwhile.
    return await asyncio.to_thread(_encode_segment_sync, request)
def _extract_frame_sync(request: ExtractFrameRequest) -> RunFFmpegResponse:
    """Blocking implementation of ``/extract-frame``; meant for a worker thread."""
    import time

    started = time.monotonic()
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            # Download source
            ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4"
            source_path = os.path.join(tmpdir, f"source{ext}")
            _download_from_gcs(request.source_gcs_uri, source_path)

            output_path = os.path.join(tmpdir, "frame.png")
            cmd = [
                "ffmpeg", "-y",
                "-ss", str(request.time_point),
                "-i", source_path,
                "-frames:v", "1",
                "-q:v", "2",
                output_path,
            ]
            success, _stdout, stderr = _run_command(cmd, timeout=60)
            if not success:
                raise HTTPException(status_code=500, detail=f"Frame extraction failed: {stderr[-500:]}")

            # Guard against a zero exit status without a frame on disk; the
            # upload would otherwise fail with an opaque "No such file" error
            # that exposes the temporary path.
            if not os.path.isfile(output_path):
                raise HTTPException(
                    status_code=500,
                    detail=f"Frame extraction produced no output at time_point={request.time_point}",
                )

            _upload_to_gcs(output_path, request.output_gcs_uri)
            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                duration_seconds=time.monotonic() - started,
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Extract frame failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/extract-frame", response_model=RunFFmpegResponse)
async def extract_frame(request: ExtractFrameRequest):
    """
    Extract a single frame from a video as PNG.
    """
    import asyncio

    # Download, FFmpeg and upload are blocking; run them in a worker thread
    # so the event loop stays responsive.
    return await asyncio.to_thread(_extract_frame_sync, request)
def _create_freeze_segment_sync(request: CreateFreezeSegmentRequest) -> RunFFmpegResponse:
    """Blocking implementation of ``/create-freeze-segment``; meant for a worker thread."""
    import time

    started = time.monotonic()
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            # Download inputs
            frame_path = os.path.join(tmpdir, "frame.png")
            audio_path = os.path.join(tmpdir, "audio.mp3")
            _download_from_gcs(request.frame_gcs_uri, frame_path)
            _download_from_gcs(request.audio_gcs_uri, audio_path)

            output_path = os.path.join(tmpdir, "freeze.mp4")
            # Fit the still inside width x height keeping its aspect ratio,
            # then pad (centred) up to exactly width x height.
            video_filter = (
                f"scale={request.width}:{request.height}:force_original_aspect_ratio=decrease,"
                f"pad={request.width}:{request.height}:(ow-iw)/2:(oh-ih)/2"
            )
            cmd = [
                "ffmpeg", "-y",
                "-loop", "1", "-i", frame_path,
                "-i", audio_path,
                "-c:v", "libx264", "-preset", "fast",
                "-tune", "stillimage",
                "-pix_fmt", request.pix_fmt,
                "-r", str(request.fps),
                "-vf", video_filter,
                "-c:a", "aac", "-ar", str(request.sample_rate),
                "-ac", str(request.channels), "-b:a", "192k",
                "-video_track_timescale", "90000",
                # The image input loops forever; stop when the audio ends.
                "-shortest",
                output_path,
            ]
            success, _stdout, stderr = _run_command(cmd)
            if not success:
                raise HTTPException(status_code=500, detail=f"Freeze segment failed: {stderr[-500:]}")

            _upload_to_gcs(output_path, request.output_gcs_uri)
            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                duration_seconds=time.monotonic() - started,
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Create freeze segment failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/create-freeze-segment", response_model=RunFFmpegResponse)
async def create_freeze_segment(request: CreateFreezeSegmentRequest):
    """
    Create a freeze-frame video segment with audio overlay.
    """
    import asyncio

    # Downloads, the encode and the upload are blocking; run them in a worker
    # thread so the event loop stays responsive.
    return await asyncio.to_thread(_create_freeze_segment_sync, request)
def _concatenate_segments_sync(request: ConcatenateRequest) -> RunFFmpegResponse:
    """Blocking implementation of ``/concatenate``; meant for a worker thread."""
    import time

    started = time.monotonic()
    # Without segments FFmpeg would only fail later with an opaque error.
    if not request.segment_gcs_uris:
        raise HTTPException(status_code=400, detail="segment_gcs_uris must not be empty")
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            # Download all segments
            segment_paths = []
            for i, gcs_uri in enumerate(request.segment_gcs_uris):
                ext = os.path.splitext(gcs_uri)[1] or ".mp4"
                local_path = os.path.join(tmpdir, f"segment_{i}{ext}")
                _download_from_gcs(gcs_uri, local_path)
                segment_paths.append(local_path)

            # Write the list file read by the concat demuxer. The extension
            # comes from the caller's URI, so a single quote in it has to be
            # escaped (close quote, escaped quote, reopen quote) or the list
            # file becomes unparsable.
            concat_file = os.path.join(tmpdir, "concat.txt")
            with open(concat_file, "w", encoding="utf-8") as f:
                for path in segment_paths:
                    escaped = path.replace("'", "'\\''")
                    f.write(f"file '{escaped}'\n")

            output_path = os.path.join(tmpdir, "output.mp4")
            cmd = [
                "ffmpeg", "-y",
                "-f", "concat",
                "-safe", "0",  # the list file contains absolute paths
                "-i", concat_file,
                "-c", "copy",
                output_path,
            ]
            success, _stdout, stderr = _run_command(cmd)
            if not success:
                raise HTTPException(status_code=500, detail=f"Concatenation failed: {stderr[-500:]}")

            _upload_to_gcs(output_path, request.output_gcs_uri)
            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                duration_seconds=time.monotonic() - started,
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Concatenate failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/concatenate", response_model=RunFFmpegResponse)
async def concatenate_segments(request: ConcatenateRequest):
    """
    Concatenate multiple video segments using concat demuxer.
    Segments are joined in list order without re-encoding.
    """
    import asyncio

    # Downloads, FFmpeg and the upload are blocking; run them in a worker
    # thread so the event loop stays responsive.
    return await asyncio.to_thread(_concatenate_segments_sync, request)