Migrate CPU-intensive workloads to Cloud Run for autoscaling: - Add Whisper HTTP service (FastAPI) with /transcribe endpoint - Add FFmpeg HTTP service (FastAPI) with /encode, /probe, /extract-frame, etc. - Add Dockerfiles for both services (8 vCPU, 32GB RAM, Gen2) - Add Cloud Build config for CI/CD deployment - Add Cloud Run service YAML configs with scale-to-zero - Update whisper_transcribe.py to call Cloud Run when WHISPER_SERVICE_URL set - Update video_renderer.py to call Cloud Run when FFMPEG_SERVICE_URL set - Update whisper_service.py for Cloud Run compatibility (no settings dependency) - Add ffmpeg_service_url and whisper_service_url to config.py Services scale 0→N based on request load, falling back to local execution when service URLs are not configured (hybrid mode). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
539 lines
18 KiB
Python
539 lines
18 KiB
Python
"""
|
|
FFmpeg HTTP Service - FastAPI application for Cloud Run deployment.
|
|
|
|
This service exposes FFmpeg operations as HTTP endpoints, allowing
|
|
the main application to offload CPU-intensive video encoding to Cloud Run
|
|
with autoscaling.
|
|
|
|
This module uses minimal configuration to avoid importing the full app Settings.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import uuid
|
|
from typing import Any, Optional
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from google.cloud import storage
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Minimal configuration for Cloud Run deployment: values come straight from
# the process environment so this module never imports the app's Settings.
# NOTE(review): GCS_BUCKET is not referenced elsewhere in this module; every
# endpoint takes full gs:// URIs. Confirm it is still needed.
GCS_BUCKET = os.getenv("GCS_BUCKET", "")

# Plain stdlib logging; deliberately independent of app.core.logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastAPI application object served by the Cloud Run container.
app = FastAPI(
    title="FFmpeg Processing Service",
    description="Cloud Run service for FFmpeg video processing operations",
    version="1.0.0",
)
|
|
|
|
|
|
# Request/Response Models
|
|
class RunFFmpegRequest(BaseModel):
    """Request body for ``POST /run-ffmpeg``.

    Each URI in ``input_gcs_uris`` is downloaded into a scratch directory.
    Whole-argument ``{input_N}`` / ``{output}`` placeholders in
    ``command_template`` are replaced with the local paths. The file written
    to ``{output}`` is uploaded to ``output_gcs_uri``.
    """

    input_gcs_uris: list[str] = Field(
        ...,
        description="GCS URIs of input files (will be downloaded and mapped to local paths)"
    )
    output_gcs_uri: str = Field(
        ...,
        description="GCS URI where the output file should be uploaded"
    )
    command_template: list[str] = Field(
        ...,
        description="FFmpeg command template with {input_0}, {input_1}, etc. and {output} placeholders"
    )
    # Zero or negative timeouts make subprocess.run expire immediately, and only
    # after every input has been downloaded. Reject them at validation time (422).
    timeout: int = Field(
        default=3600,
        gt=0,
        description="Command timeout in seconds"
    )
|
|
|
|
|
|
class RunFFmpegResponse(BaseModel):
    """Result returned by the processing endpoints (all except /probe and /health)."""

    # Always True in a 200 response; failures are raised as HTTP errors instead.
    success: bool
    # Echo of the destination the output was uploaded to.
    output_gcs_uri: str
    # Tail of FFmpeg's stderr (only /run-ffmpeg fills this in).
    stderr: str = Field(default="")
    # Wall-clock time spent handling the request, in seconds.
    duration_seconds: float = Field(default=0.0)
|
|
|
|
|
|
class ProbeRequest(BaseModel):
    """Request body for ``POST /probe``: which object to inspect with ffprobe."""

    gcs_uri: str = Field(
        default=...,
        description="GCS URI of the video file to probe",
    )
|
|
|
|
|
|
class ProbeResponse(BaseModel):
    """Media properties reported by ``POST /probe``.

    Values ffprobe does not report are filled with the fallbacks chosen in
    ``probe_video``.
    """

    # Video geometry and timing.
    width: int
    height: int
    fps: float
    duration: float  # seconds
    # Audio layout.
    sample_rate: int  # Hz
    channels: int
    # Codec and pixel-format names as printed by ffprobe.
    codec_name: str
    pix_fmt: str
|
|
|
|
|
|
class EncodeSegmentRequest(BaseModel):
    """Request body for ``POST /encode-segment``.

    Cuts ``duration`` seconds starting at ``start_time`` out of the source and
    re-encodes them with the given output parameters.
    """

    source_gcs_uri: str
    output_gcs_uri: str
    start_time: float  # seconds into the source (ffmpeg -ss)
    duration: float  # segment length in seconds (ffmpeg -t)
    fps: float = Field(default=30.0)
    pix_fmt: str = Field(default="yuv420p")
    sample_rate: int = Field(default=44100)
    channels: int = Field(default=2)
|
|
|
|
|
|
class ExtractFrameRequest(BaseModel):
    """Request body for ``POST /extract-frame``: grab one frame as a PNG image."""

    source_gcs_uri: str  # video to sample from
    output_gcs_uri: str  # destination for the extracted image
    time_point: float  # seconds into the source (ffmpeg -ss)
|
|
|
|
|
|
class CreateFreezeSegmentRequest(BaseModel):
    """Request body for ``POST /create-freeze-segment``.

    A still image is looped for as long as the audio track lasts. It is
    scaled down to fit and padded to ``width`` x ``height``.
    """

    frame_gcs_uri: str
    audio_gcs_uri: str
    output_gcs_uri: str
    # Output canvas size in pixels.
    width: int
    height: int
    fps: float = Field(default=30.0)
    pix_fmt: str = Field(default="yuv420p")
    sample_rate: int = Field(default=44100)
    channels: int = Field(default=2)
|
|
|
|
|
|
class ConcatenateRequest(BaseModel):
    """Request body for ``POST /concatenate``."""

    # Segments in playback order. They are stream-copied (-c copy), so they
    # should share codecs and encoding parameters.
    segment_gcs_uris: list[str]
    output_gcs_uri: str
|
|
|
|
|
|
class HealthResponse(BaseModel):
    """Payload returned by ``GET /health``."""

    status: str  # "healthy" whenever the endpoint answers
    ffmpeg_version: str  # first line of `ffmpeg -version` output (see _get_ffmpeg_version)
|
|
|
|
|
|
def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]:
    """Split a ``gs://bucket/path/to/object`` URI into ``(bucket, object_path)``.

    Raises:
        ValueError: if the URI does not start with ``gs://``, or if either the
            bucket name or the object path is missing (``gs://bucket``,
            ``gs://bucket/``, ``gs:///object``).
    """
    if not gcs_uri.startswith("gs://"):
        raise ValueError(f"Invalid GCS URI: {gcs_uri}")
    bucket_name, sep, blob_path = gcs_uri[len("gs://"):].partition("/")
    # Empty components would otherwise reach the storage client and fail later
    # with a far less helpful error (or address the bucket itself).
    if not sep or not bucket_name or not blob_path:
        raise ValueError(f"Invalid GCS URI format: {gcs_uri}")
    return bucket_name, blob_path
|
|
|
|
|
|
def _download_from_gcs(gcs_uri: str, local_path: str) -> None:
    """Fetch the object named by ``gcs_uri`` and write it to ``local_path``.

    Raises:
        HTTPException: 404 when the object does not exist.
        ValueError: propagated from ``_parse_gcs_uri`` for a malformed URI.
    """
    bucket_name, blob_path = _parse_gcs_uri(gcs_uri)
    # storage.Client() resolves project and credentials from the environment.
    source_blob = storage.Client().bucket(bucket_name).blob(blob_path)
    if source_blob.exists():
        source_blob.download_to_filename(local_path)
        logger.debug(f"Downloaded {gcs_uri} to {local_path}")
        return
    raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}")
|
|
|
|
|
|
def _upload_to_gcs(local_path: str, gcs_uri: str) -> None:
    """Copy the file at ``local_path`` to the object named by ``gcs_uri``.

    Raises:
        ValueError: propagated from ``_parse_gcs_uri`` for a malformed URI.
    """
    bucket_name, blob_path = _parse_gcs_uri(gcs_uri)
    # Project and credentials come from the runtime environment.
    target_blob = storage.Client().bucket(bucket_name).blob(blob_path)
    target_blob.upload_from_filename(local_path)
    logger.debug(f"Uploaded {local_path} to {gcs_uri}")
|
|
|
|
|
|
def _run_command(cmd: list[str], timeout: int = 3600) -> tuple[bool, str, str]:
    """Run ``cmd`` without a shell and return ``(success, stdout, stderr)``.

    ``success`` is True only when the process exits with status 0. A timeout,
    or a failure to start the process, is not raised. It is reported as
    ``(False, "", message)``.

    Args:
        cmd: Argument vector; ``cmd[0]`` is the executable.
        timeout: Seconds to wait before the child is killed.
    """
    cmd_preview = " ".join(cmd[:8]) + ("..." if len(cmd) > 8 else "")
    logger.info(f"Executing: {cmd_preview}")

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # FFmpeg echoes container metadata (titles, tags, filenames) to
            # stderr, and it is not guaranteed to be valid in the locale
            # encoding. Strict decoding would raise UnicodeDecodeError and report
            # a successful encode as a failure, so bad bytes are replaced.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and reaped the child at this point.
        return False, "", f"Command timed out after {timeout}s"
    except Exception as e:  # e.g. FileNotFoundError when the binary is missing
        return False, "", str(e)
    return result.returncode == 0, result.stdout, result.stderr
|
|
|
|
|
|
def _get_ffmpeg_version() -> str:
    """Return the first line of ``ffmpeg -version``, or ``"unknown"``.

    ``"unknown"`` is returned if the binary cannot be started, does not
    finish within 5 seconds, exits with a non-zero status, or prints nothing.
    """
    try:
        result = subprocess.run(
            ["ffmpeg", "-version"],
            capture_output=True,
            text=True,
            errors="replace",
            timeout=5
        )
    except Exception:
        return "unknown"
    first_line = result.stdout.split("\n")[0]
    # A failing or silent ffmpeg would otherwise produce "", which looks like a
    # valid (but empty) version string in the health response.
    if result.returncode != 0 or not first_line.strip():
        return "unknown"
    return first_line
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse)
def health_check():
    """Health check endpoint reporting the FFmpeg build in the container.

    This is a plain ``def`` so FastAPI runs it in its worker threadpool.
    ``_get_ffmpeg_version`` spawns a subprocess that can block for up to 5 s,
    and inside ``async def`` that would stall the event loop for every other
    request on this instance.
    """
    return HealthResponse(
        status="healthy",
        ffmpeg_version=_get_ffmpeg_version(),
    )
|
|
|
|
|
|
# Programs /run-ffmpeg may launch. Without this allow-list the endpoint would
# execute any argv the caller sends (e.g. ["sh", "-c", ...]).
_ALLOWED_EXECUTABLES = frozenset({"ffmpeg", "ffprobe"})


def _build_ffmpeg_command(
    template: list[str], input_paths: list[str], output_path: str
) -> list[str]:
    """Validate ``template`` and substitute its placeholders with local paths.

    Only arguments that are exactly ``{output}`` or ``{input_N}`` are
    substituted. A placeholder embedded in a longer argument is passed
    through unchanged.

    Raises:
        HTTPException: 400 if the template is empty, does not start with an
            allowed executable, has no ``{output}`` argument, or references a
            malformed or out-of-range input index.
    """
    if not template or template[0] not in _ALLOWED_EXECUTABLES:
        raise HTTPException(
            status_code=400,
            detail="command_template must start with one of: "
            + ", ".join(sorted(_ALLOWED_EXECUTABLES)),
        )
    # NOTE(review): the allow-list restricts the binary only; ffmpeg options
    # can still reference other local files or network URLs.
    if "{output}" not in template:
        # Without it nothing is written where the upload step looks for it.
        raise HTTPException(
            status_code=400,
            detail="command_template must contain an {output} argument",
        )

    cmd = []
    for part in template:
        if part == "{output}":
            cmd.append(output_path)
        elif part.startswith("{input_") and part.endswith("}"):
            index_text = part[len("{input_"):-1]
            # isdecimal() rejects "{input_}", "{input_x}" and "{input_-1}".
            # int() on those would raise (-> 500) or index from the end.
            if not index_text.isdecimal() or int(index_text) >= len(input_paths):
                raise HTTPException(
                    status_code=400, detail=f"Invalid input index: {index_text}"
                )
            cmd.append(input_paths[int(index_text)])
        else:
            cmd.append(part)
    return cmd


@app.post("/run-ffmpeg", response_model=RunFFmpegResponse)
def run_ffmpeg(request: RunFFmpegRequest):
    """
    Run a generic FFmpeg command with GCS input/output.

    The command_template should use placeholders:
    - {input_0}, {input_1}, etc. for input files
    - {output} for the output file

    The first element must be ``ffmpeg`` or ``ffprobe``. The template is
    validated before any input is downloaded, so malformed requests fail
    fast with HTTP 400.

    This is a plain ``def`` so FastAPI runs it in a worker thread. The GCS
    transfers and the FFmpeg subprocess are blocking calls, and inside
    ``async def`` they would stall every other request on this instance.

    Example:
    {
        "input_gcs_uris": ["gs://bucket/video.mp4"],
        "output_gcs_uri": "gs://bucket/output.mp4",
        "command_template": ["ffmpeg", "-y", "-i", "{input_0}", "-c:v", "libx264", "{output}"]
    }
    """
    import time

    started = time.monotonic()

    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            # Local paths are deterministic, so the command can be validated
            # before paying for any download.
            input_paths = []
            for i, gcs_uri in enumerate(request.input_gcs_uris):
                ext = os.path.splitext(gcs_uri)[1] or ".bin"
                input_paths.append(os.path.join(tmpdir, f"input_{i}{ext}"))

            output_ext = os.path.splitext(request.output_gcs_uri)[1] or ".mp4"
            output_path = os.path.join(tmpdir, f"output{output_ext}")

            cmd = _build_ffmpeg_command(request.command_template, input_paths, output_path)

            for gcs_uri, local_path in zip(request.input_gcs_uris, input_paths):
                _download_from_gcs(gcs_uri, local_path)

            success, _stdout, stderr = _run_command(cmd, request.timeout)
            if not success:
                logger.error(f"FFmpeg failed: {stderr[-500:]}")
                raise HTTPException(status_code=500, detail=f"FFmpeg failed: {stderr[-500:]}")

            # ffmpeg may exit 0 without writing a file (e.g. when nothing was
            # encoded). Report that clearly rather than as a bare
            # FileNotFoundError from the upload.
            if not os.path.isfile(output_path):
                raise HTTPException(
                    status_code=500, detail="FFmpeg finished but produced no output file"
                )

            _upload_to_gcs(output_path, request.output_gcs_uri)

            duration = time.monotonic() - started
            logger.info(f"FFmpeg completed in {duration:.2f}s")

            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                stderr=stderr[-200:] if stderr else "",
                duration_seconds=duration,
            )

        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"FFmpeg operation failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
def _parse_frame_rate(rate: Optional[str], default: float = 30.0) -> float:
    """Convert an ffprobe rate such as ``"30000/1001"`` (or ``"25"``) to fps.

    Returns ``default`` for a missing or malformed value, a zero denominator
    (e.g. ``"0/0"``), or a non-positive result.
    """
    if not rate:
        return default
    numerator, slash, denominator = str(rate).partition("/")
    try:
        fps = int(numerator) / int(denominator) if slash else float(numerator)
    except (ValueError, ZeroDivisionError):
        return default
    return fps if fps > 0 else default


def _first_positive_float(*values: object) -> float:
    """Return the first value that parses as a positive float, else ``0.0``.

    ffprobe reports durations as strings that may be absent or non-numeric.
    Such entries are skipped rather than raising.
    """
    for value in values:
        try:
            number = float(value)  # type: ignore[arg-type]
        except (TypeError, ValueError):
            continue
        if number > 0:
            return number
    return 0.0


@app.post("/probe", response_model=ProbeResponse)
def probe_video(request: ProbeRequest):
    """
    Get video properties using FFprobe.

    Missing properties fall back to 1920x1080, 30 fps, 44100 Hz, 2 channels,
    h264 and yuv420p. Duration is taken from the video stream, then the audio
    stream, then the container (``-show_format``). The container value
    matters for formats that do not record a per-stream duration (presumably
    Matroska/WebM; confirm against real inputs).

    This is a plain ``def`` so the blocking download and ffprobe call run in
    FastAPI's threadpool instead of on the event loop.

    Raises:
        HTTPException: 404 if the object is missing, 500 if ffprobe fails or
            its output cannot be parsed.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            ext = os.path.splitext(request.gcs_uri)[1] or ".mp4"
            local_path = os.path.join(tmpdir, f"video{ext}")
            _download_from_gcs(request.gcs_uri, local_path)

            # "-v error" instead of "quiet": stdout still carries only the JSON,
            # but stderr now explains why probing failed.
            cmd = [
                "ffprobe", "-v", "error",
                "-show_streams", "-show_format", "-of", "json",
                local_path,
            ]
            success, stdout, stderr = _run_command(cmd, timeout=60)
            if not success:
                raise HTTPException(status_code=500, detail=f"FFprobe failed: {stderr[-500:]}")

            data = json.loads(stdout)
            streams = data.get("streams", [])
            container = data.get("format", {})

            video_stream = next((s for s in streams if s.get("codec_type") == "video"), {})
            audio_stream = next((s for s in streams if s.get("codec_type") == "audio"), {})

            return ProbeResponse(
                width=int(video_stream.get("width", 1920)),
                height=int(video_stream.get("height", 1080)),
                fps=_parse_frame_rate(video_stream.get("r_frame_rate"), default=30.0),
                duration=_first_positive_float(
                    video_stream.get("duration"),
                    audio_stream.get("duration"),
                    container.get("duration"),
                ),
                sample_rate=int(audio_stream.get("sample_rate", 44100)),
                channels=int(audio_stream.get("channels", 2)),
                codec_name=video_stream.get("codec_name", "h264"),
                pix_fmt=video_stream.get("pix_fmt", "yuv420p"),
            )

        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Probe failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@app.post("/encode-segment", response_model=RunFFmpegResponse)
def encode_segment(request: EncodeSegmentRequest):
    """
    Extract and encode a video segment with re-encoding for frame accuracy.

    Cuts ``duration`` seconds starting at ``start_time`` from the source and
    re-encodes them to H.264 video and AAC audio in an MP4. The requested
    frame rate, pixel format, sample rate and channel count are applied.

    This is a plain ``def`` so FastAPI runs the blocking download, encode and
    upload in a worker thread instead of on the event loop.

    Raises:
        HTTPException: 400 for a negative start_time or non-positive duration,
            404 if the source is missing, 500 if encoding fails.
    """
    import time

    started = time.monotonic()

    # Written as "not (... ok ...)" so NaN is rejected as well.
    if not (request.start_time >= 0 and request.duration > 0):
        raise HTTPException(
            status_code=400,
            detail="start_time must be >= 0 and duration must be > 0",
        )

    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4"
            source_path = os.path.join(tmpdir, f"source{ext}")
            _download_from_gcs(request.source_gcs_uri, source_path)

            output_path = os.path.join(tmpdir, "segment.mp4")

            # Fixed-point formatting: str(float) switches to exponent notation
            # for small values (e.g. 1e-05), which ffmpeg's time-duration
            # syntax does not accept.
            cmd = [
                "ffmpeg", "-y",
                "-ss", f"{request.start_time:.6f}",
                "-i", source_path,
                "-t", f"{request.duration:.6f}",
                "-c:v", "libx264", "-preset", "fast",
                "-pix_fmt", request.pix_fmt,
                "-r", str(request.fps),
                "-c:a", "aac", "-ar", str(request.sample_rate),
                "-ac", str(request.channels), "-b:a", "192k",
                # Common timescale across segments, presumably so they can be
                # stream-copied together by /concatenate.
                "-video_track_timescale", "90000",
                output_path,
            ]

            success, _stdout, stderr = _run_command(cmd)
            if not success:
                raise HTTPException(status_code=500, detail=f"Encode failed: {stderr[-500:]}")

            _upload_to_gcs(output_path, request.output_gcs_uri)

            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                duration_seconds=time.monotonic() - started,
            )

        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Encode segment failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@app.post("/extract-frame", response_model=RunFFmpegResponse)
def extract_frame(request: ExtractFrameRequest):
    """
    Extract a single frame from a video as PNG.

    This is a plain ``def`` so FastAPI runs the blocking download, ffmpeg
    call and upload in a worker thread instead of on the event loop.

    Raises:
        HTTPException: 400 for a negative time_point or when no frame could
            be extracted there, 404 if the source is missing, 500 on other
            ffmpeg failures.
    """
    import time

    started = time.monotonic()

    # Written as "not (... ok ...)" so NaN is rejected as well.
    if not request.time_point >= 0:
        raise HTTPException(status_code=400, detail="time_point must be >= 0")

    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4"
            source_path = os.path.join(tmpdir, f"source{ext}")
            _download_from_gcs(request.source_gcs_uri, source_path)

            output_path = os.path.join(tmpdir, "frame.png")

            # Fixed-point formatting avoids exponent notation (e.g. 1e-05),
            # which ffmpeg's time-duration syntax does not accept.
            cmd = [
                "ffmpeg", "-y",
                "-ss", f"{request.time_point:.6f}",
                "-i", source_path,
                "-frames:v", "1",
                # NOTE(review): -q:v mainly affects lossy encoders; it likely
                # has no effect on PNG output. Kept as-is.
                "-q:v", "2",
                output_path,
            ]

            success, _stdout, stderr = _run_command(cmd, timeout=60)
            if not success:
                raise HTTPException(status_code=500, detail=f"Frame extraction failed: {stderr[-500:]}")

            # ffmpeg can exit 0 without writing an image, e.g. when time_point
            # lies past the end of the video.
            if not os.path.isfile(output_path):
                raise HTTPException(
                    status_code=400,
                    detail=f"No frame could be extracted at time_point={request.time_point}",
                )

            _upload_to_gcs(output_path, request.output_gcs_uri)

            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                duration_seconds=time.monotonic() - started,
            )

        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Extract frame failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@app.post("/create-freeze-segment", response_model=RunFFmpegResponse)
def create_freeze_segment(request: CreateFreezeSegmentRequest):
    """
    Create a freeze-frame video segment with audio overlay.

    The still image is looped (``-loop 1``) and the output stops when the
    audio ends (``-shortest``). The image is scaled down to fit and padded to
    ``width`` x ``height``.

    This is a plain ``def`` so FastAPI runs the blocking downloads, encode
    and upload in a worker thread instead of on the event loop.

    Raises:
        HTTPException: 404 if an input is missing, 500 if encoding fails.
    """
    import time

    started = time.monotonic()

    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            # Keep each source's real extension, as the other endpoints do.
            # A JPEG or WAV saved as ".png" / ".mp3" may be demuxed with the
            # wrong format.
            frame_ext = os.path.splitext(request.frame_gcs_uri)[1] or ".png"
            audio_ext = os.path.splitext(request.audio_gcs_uri)[1] or ".mp3"
            frame_path = os.path.join(tmpdir, f"frame{frame_ext}")
            audio_path = os.path.join(tmpdir, f"audio{audio_ext}")
            _download_from_gcs(request.frame_gcs_uri, frame_path)
            _download_from_gcs(request.audio_gcs_uri, audio_path)

            output_path = os.path.join(tmpdir, "freeze.mp4")

            cmd = [
                "ffmpeg", "-y",
                "-loop", "1", "-i", frame_path,
                "-i", audio_path,
                "-c:v", "libx264", "-preset", "fast",
                "-tune", "stillimage",
                "-pix_fmt", request.pix_fmt,
                "-r", str(request.fps),
                "-vf", f"scale={request.width}:{request.height}:force_original_aspect_ratio=decrease,"
                       f"pad={request.width}:{request.height}:(ow-iw)/2:(oh-ih)/2",
                "-c:a", "aac", "-ar", str(request.sample_rate),
                "-ac", str(request.channels), "-b:a", "192k",
                "-video_track_timescale", "90000",
                "-shortest",
                output_path,
            ]

            success, _stdout, stderr = _run_command(cmd)
            if not success:
                raise HTTPException(status_code=500, detail=f"Freeze segment failed: {stderr[-500:]}")

            _upload_to_gcs(output_path, request.output_gcs_uri)

            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                duration_seconds=time.monotonic() - started,
            )

        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Create freeze segment failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@app.post("/concatenate", response_model=RunFFmpegResponse)
def concatenate_segments(request: ConcatenateRequest):
    """
    Concatenate multiple video segments using concat demuxer.

    Segments are joined in list order with ``-c copy`` (no re-encode), so
    they should share codecs and encoding parameters.

    This is a plain ``def`` so FastAPI runs the blocking downloads, ffmpeg
    call and upload in a worker thread instead of on the event loop.

    Raises:
        HTTPException: 400 if no segments are given, 404 if a segment is
            missing, 500 if ffmpeg fails.
    """
    import time

    started = time.monotonic()

    if not request.segment_gcs_uris:
        raise HTTPException(status_code=400, detail="segment_gcs_uris must not be empty")

    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            segment_paths = []
            for i, gcs_uri in enumerate(request.segment_gcs_uris):
                ext = os.path.splitext(gcs_uri)[1] or ".mp4"
                local_path = os.path.join(tmpdir, f"segment_{i}{ext}")
                _download_from_gcs(gcs_uri, local_path)
                segment_paths.append(local_path)

            # The extension comes from the caller's URI, so a path may contain
            # a single quote. Escape it the concat-demuxer way ('\'') so it
            # cannot terminate the quoted filename early.
            concat_file = os.path.join(tmpdir, "concat.txt")
            with open(concat_file, "w", encoding="utf-8") as f:
                f.writelines(
                    "file '{}'\n".format(path.replace("'", "'\\''"))
                    for path in segment_paths
                )

            output_path = os.path.join(tmpdir, "output.mp4")

            cmd = [
                "ffmpeg", "-y",
                "-f", "concat",
                # Entries are absolute temp-dir paths, which the "safe" mode rejects.
                "-safe", "0",
                "-i", concat_file,
                "-c", "copy",
                output_path,
            ]

            success, _stdout, stderr = _run_command(cmd)
            if not success:
                raise HTTPException(status_code=500, detail=f"Concatenation failed: {stderr[-500:]}")

            _upload_to_gcs(output_path, request.output_gcs_uri)

            return RunFFmpegResponse(
                success=True,
                output_gcs_uri=request.output_gcs_uri,
                duration_seconds=time.monotonic() - started,
            )

        except HTTPException:
            raise
        except Exception as e:
            logger.exception(f"Concatenate failed: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e
|