""" FFmpeg HTTP Service - FastAPI application for Cloud Run deployment. This service exposes FFmpeg operations as HTTP endpoints, allowing the main application to offload CPU-intensive video encoding to Cloud Run with autoscaling. This module uses minimal configuration to avoid importing the full app Settings. """ import json import logging import os import subprocess import tempfile import uuid from typing import Any, Optional from fastapi import FastAPI, HTTPException from google.cloud import storage from pydantic import BaseModel, Field # Minimal configuration for Cloud Run deployment # These are read directly from environment variables GCS_BUCKET = os.environ.get("GCS_BUCKET", "") # Simple logging setup (no dependency on app.core.logging) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # FastAPI app for Cloud Run app = FastAPI( title="FFmpeg Processing Service", description="Cloud Run service for FFmpeg video processing operations", version="1.0.0", ) # Request/Response Models class RunFFmpegRequest(BaseModel): """Request model for running an FFmpeg command.""" input_gcs_uris: list[str] = Field( ..., description="GCS URIs of input files (will be downloaded and mapped to local paths)" ) output_gcs_uri: str = Field( ..., description="GCS URI where the output file should be uploaded" ) command_template: list[str] = Field( ..., description="FFmpeg command template with {input_0}, {input_1}, etc. and {output} placeholders" ) timeout: int = Field( default=3600, description="Command timeout in seconds" ) class RunFFmpegResponse(BaseModel): """Response model for FFmpeg command execution.""" success: bool output_gcs_uri: str stderr: str = "" duration_seconds: float = 0.0 class ProbeRequest(BaseModel): """Request model for probing video properties.""" gcs_uri: str = Field( ..., description="GCS URI of the video file to probe" ) class ProbeResponse(BaseModel): """Response model for video probe.""" width: int height: int fps: float duration: float sample_rate: int channels: int codec_name: str pix_fmt: str class EncodeSegmentRequest(BaseModel): """Request for encoding a video segment.""" source_gcs_uri: str output_gcs_uri: str start_time: float duration: float fps: float = 30.0 pix_fmt: str = "yuv420p" sample_rate: int = 44100 channels: int = 2 class ExtractFrameRequest(BaseModel): """Request for extracting a single frame.""" source_gcs_uri: str output_gcs_uri: str time_point: float class CreateFreezeSegmentRequest(BaseModel): """Request for creating a freeze-frame video with audio.""" frame_gcs_uri: str audio_gcs_uri: str output_gcs_uri: str width: int height: int fps: float = 30.0 pix_fmt: str = "yuv420p" sample_rate: int = 44100 channels: int = 2 class ConcatenateRequest(BaseModel): """Request for concatenating video segments.""" segment_gcs_uris: list[str] output_gcs_uri: str class HealthResponse(BaseModel): """Health check response.""" status: str ffmpeg_version: str def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]: """Parse GCS URI into bucket and blob path.""" if not gcs_uri.startswith("gs://"): raise ValueError(f"Invalid GCS URI: {gcs_uri}") parts = gcs_uri[5:].split("/", 1) if len(parts) != 2: raise ValueError(f"Invalid GCS URI format: {gcs_uri}") return parts[0], parts[1] def _download_from_gcs(gcs_uri: str, local_path: str) -> None: """Download file from GCS to local path.""" bucket_name, blob_path = _parse_gcs_uri(gcs_uri) client = storage.Client() # Auto-detects project from environment bucket = client.bucket(bucket_name) blob = bucket.blob(blob_path) if not blob.exists(): raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}") blob.download_to_filename(local_path) logger.debug(f"Downloaded {gcs_uri} to {local_path}") def _upload_to_gcs(local_path: str, gcs_uri: str) -> None: """Upload local file to GCS.""" bucket_name, blob_path = _parse_gcs_uri(gcs_uri) client = storage.Client() # Auto-detects project from environment bucket = client.bucket(bucket_name) blob = bucket.blob(blob_path) blob.upload_from_filename(local_path) logger.debug(f"Uploaded {local_path} to {gcs_uri}") def _run_command(cmd: list[str], timeout: int = 3600) -> tuple[bool, str, str]: """Run a command and return success, stdout, stderr.""" cmd_preview = ' '.join(cmd[:8]) + ('...' if len(cmd) > 8 else '') logger.info(f"Executing: {cmd_preview}") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) return result.returncode == 0, result.stdout, result.stderr except subprocess.TimeoutExpired: return False, "", f"Command timed out after {timeout}s" except Exception as e: return False, "", str(e) def _get_ffmpeg_version() -> str: """Get FFmpeg version string.""" try: result = subprocess.run( ["ffmpeg", "-version"], capture_output=True, text=True, timeout=5 ) first_line = result.stdout.split('\n')[0] return first_line except Exception: return "unknown" @app.get("/health", response_model=HealthResponse) async def health_check(): """Health check endpoint.""" return HealthResponse( status="healthy", ffmpeg_version=_get_ffmpeg_version() ) @app.post("/run-ffmpeg", response_model=RunFFmpegResponse) async def run_ffmpeg(request: RunFFmpegRequest): """ Run a generic FFmpeg command with GCS input/output. The command_template should use placeholders: - {input_0}, {input_1}, etc. for input files - {output} for the output file Example: { "input_gcs_uris": ["gs://bucket/video.mp4"], "output_gcs_uri": "gs://bucket/output.mp4", "command_template": ["ffmpeg", "-y", "-i", "{input_0}", "-c:v", "libx264", "{output}"] } """ import time start_time = time.time() with tempfile.TemporaryDirectory() as tmpdir: try: # Download input files input_paths = [] for i, gcs_uri in enumerate(request.input_gcs_uris): ext = os.path.splitext(gcs_uri)[1] or ".bin" local_path = os.path.join(tmpdir, f"input_{i}{ext}") _download_from_gcs(gcs_uri, local_path) input_paths.append(local_path) # Prepare output path output_ext = os.path.splitext(request.output_gcs_uri)[1] or ".mp4" output_path = os.path.join(tmpdir, f"output{output_ext}") # Build command from template cmd = [] for part in request.command_template: if part == "{output}": cmd.append(output_path) elif part.startswith("{input_") and part.endswith("}"): idx = int(part[7:-1]) if idx < len(input_paths): cmd.append(input_paths[idx]) else: raise HTTPException(status_code=400, detail=f"Invalid input index: {idx}") else: cmd.append(part) # Run FFmpeg success, stdout, stderr = _run_command(cmd, request.timeout) if not success: logger.error(f"FFmpeg failed: {stderr[-500:]}") raise HTTPException(status_code=500, detail=f"FFmpeg failed: {stderr[-500:]}") # Upload output _upload_to_gcs(output_path, request.output_gcs_uri) duration = time.time() - start_time logger.info(f"FFmpeg completed in {duration:.2f}s") return RunFFmpegResponse( success=True, output_gcs_uri=request.output_gcs_uri, stderr=stderr[-200:] if stderr else "", duration_seconds=duration ) except HTTPException: raise except Exception as e: logger.error(f"FFmpeg operation failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/probe", response_model=ProbeResponse) async def probe_video(request: ProbeRequest): """ Get video properties using FFprobe. """ with tempfile.TemporaryDirectory() as tmpdir: try: # Download video ext = os.path.splitext(request.gcs_uri)[1] or ".mp4" local_path = os.path.join(tmpdir, f"video{ext}") _download_from_gcs(request.gcs_uri, local_path) # Run ffprobe cmd = [ "ffprobe", "-v", "quiet", "-show_streams", "-of", "json", local_path ] success, stdout, stderr = _run_command(cmd, timeout=60) if not success: raise HTTPException(status_code=500, detail=f"FFprobe failed: {stderr}") # Parse output data = json.loads(stdout) streams = data.get("streams", []) video_stream = next((s for s in streams if s.get("codec_type") == "video"), {}) audio_stream = next((s for s in streams if s.get("codec_type") == "audio"), {}) # Parse FPS fps = 30.0 if "r_frame_rate" in video_stream: fps_parts = video_stream["r_frame_rate"].split("/") if len(fps_parts) == 2 and int(fps_parts[1]) != 0: fps = int(fps_parts[0]) / int(fps_parts[1]) return ProbeResponse( width=int(video_stream.get("width", 1920)), height=int(video_stream.get("height", 1080)), fps=fps, duration=float(video_stream.get("duration", 0) or audio_stream.get("duration", 0)), sample_rate=int(audio_stream.get("sample_rate", 44100)), channels=int(audio_stream.get("channels", 2)), codec_name=video_stream.get("codec_name", "h264"), pix_fmt=video_stream.get("pix_fmt", "yuv420p") ) except HTTPException: raise except Exception as e: logger.error(f"Probe failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/encode-segment", response_model=RunFFmpegResponse) async def encode_segment(request: EncodeSegmentRequest): """ Extract and encode a video segment with re-encoding for frame accuracy. """ import time start_time = time.time() with tempfile.TemporaryDirectory() as tmpdir: try: # Download source ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4" source_path = os.path.join(tmpdir, f"source{ext}") _download_from_gcs(request.source_gcs_uri, source_path) # Output path output_path = os.path.join(tmpdir, "segment.mp4") # Build FFmpeg command cmd = [ "ffmpeg", "-y", "-ss", str(request.start_time), "-i", source_path, "-t", str(request.duration), "-c:v", "libx264", "-preset", "fast", "-pix_fmt", request.pix_fmt, "-r", str(request.fps), "-c:a", "aac", "-ar", str(request.sample_rate), "-ac", str(request.channels), "-b:a", "192k", "-video_track_timescale", "90000", output_path ] success, stdout, stderr = _run_command(cmd) if not success: raise HTTPException(status_code=500, detail=f"Encode failed: {stderr[-500:]}") _upload_to_gcs(output_path, request.output_gcs_uri) return RunFFmpegResponse( success=True, output_gcs_uri=request.output_gcs_uri, duration_seconds=time.time() - start_time ) except HTTPException: raise except Exception as e: logger.error(f"Encode segment failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/extract-frame", response_model=RunFFmpegResponse) async def extract_frame(request: ExtractFrameRequest): """ Extract a single frame from a video as PNG. """ import time start_time = time.time() with tempfile.TemporaryDirectory() as tmpdir: try: # Download source ext = os.path.splitext(request.source_gcs_uri)[1] or ".mp4" source_path = os.path.join(tmpdir, f"source{ext}") _download_from_gcs(request.source_gcs_uri, source_path) output_path = os.path.join(tmpdir, "frame.png") cmd = [ "ffmpeg", "-y", "-ss", str(request.time_point), "-i", source_path, "-frames:v", "1", "-q:v", "2", output_path ] success, stdout, stderr = _run_command(cmd, timeout=60) if not success: raise HTTPException(status_code=500, detail=f"Frame extraction failed: {stderr[-500:]}") _upload_to_gcs(output_path, request.output_gcs_uri) return RunFFmpegResponse( success=True, output_gcs_uri=request.output_gcs_uri, duration_seconds=time.time() - start_time ) except HTTPException: raise except Exception as e: logger.error(f"Extract frame failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/create-freeze-segment", response_model=RunFFmpegResponse) async def create_freeze_segment(request: CreateFreezeSegmentRequest): """ Create a freeze-frame video segment with audio overlay. """ import time start_time = time.time() with tempfile.TemporaryDirectory() as tmpdir: try: # Download inputs frame_path = os.path.join(tmpdir, "frame.png") audio_path = os.path.join(tmpdir, "audio.mp3") _download_from_gcs(request.frame_gcs_uri, frame_path) _download_from_gcs(request.audio_gcs_uri, audio_path) output_path = os.path.join(tmpdir, "freeze.mp4") # Build FFmpeg command for freeze frame with audio cmd = [ "ffmpeg", "-y", "-loop", "1", "-i", frame_path, "-i", audio_path, "-c:v", "libx264", "-preset", "fast", "-tune", "stillimage", "-pix_fmt", request.pix_fmt, "-r", str(request.fps), "-vf", f"scale={request.width}:{request.height}:force_original_aspect_ratio=decrease," f"pad={request.width}:{request.height}:(ow-iw)/2:(oh-ih)/2", "-c:a", "aac", "-ar", str(request.sample_rate), "-ac", str(request.channels), "-b:a", "192k", "-video_track_timescale", "90000", "-shortest", output_path ] success, stdout, stderr = _run_command(cmd) if not success: raise HTTPException(status_code=500, detail=f"Freeze segment failed: {stderr[-500:]}") _upload_to_gcs(output_path, request.output_gcs_uri) return RunFFmpegResponse( success=True, output_gcs_uri=request.output_gcs_uri, duration_seconds=time.time() - start_time ) except HTTPException: raise except Exception as e: logger.error(f"Create freeze segment failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/concatenate", response_model=RunFFmpegResponse) async def concatenate_segments(request: ConcatenateRequest): """ Concatenate multiple video segments using concat demuxer. """ import time start_time = time.time() with tempfile.TemporaryDirectory() as tmpdir: try: # Download all segments segment_paths = [] for i, gcs_uri in enumerate(request.segment_gcs_uris): ext = os.path.splitext(gcs_uri)[1] or ".mp4" local_path = os.path.join(tmpdir, f"segment_{i}{ext}") _download_from_gcs(gcs_uri, local_path) segment_paths.append(local_path) # Create concat file concat_file = os.path.join(tmpdir, "concat.txt") with open(concat_file, "w") as f: for path in segment_paths: f.write(f"file '{path}'\n") output_path = os.path.join(tmpdir, "output.mp4") cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_file, "-c", "copy", output_path ] success, stdout, stderr = _run_command(cmd) if not success: raise HTTPException(status_code=500, detail=f"Concatenation failed: {stderr[-500:]}") _upload_to_gcs(output_path, request.output_gcs_uri) return RunFFmpegResponse( success=True, output_gcs_uri=request.output_gcs_uri, duration_seconds=time.time() - start_time ) except HTTPException: raise except Exception as e: logger.error(f"Concatenate failed: {e}") raise HTTPException(status_code=500, detail=str(e))