video-accessibility/backend/app/services/whisper_http_service.py
michael 79440929f4 feat: add Cloud Run HTTP services for Whisper and FFmpeg
Migrate CPU-intensive workloads to Cloud Run for autoscaling:

- Add Whisper HTTP service (FastAPI) with /transcribe endpoint
- Add FFmpeg HTTP service (FastAPI) with /encode, /probe, /extract-frame, etc.
- Add Dockerfiles for both services (8 vCPU, 32GB RAM, Gen2)
- Add Cloud Build config for CI/CD deployment
- Add Cloud Run service YAML configs with scale-to-zero
- Update whisper_transcribe.py to call Cloud Run when WHISPER_SERVICE_URL set
- Update video_renderer.py to call Cloud Run when FFMPEG_SERVICE_URL set
- Update whisper_service.py for Cloud Run compatibility (no settings dependency)
- Add ffmpeg_service_url and whisper_service_url to config.py

Services scale 0→N based on request load, falling back to local
execution when service URLs are not configured (hybrid mode).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 10:12:50 -06:00

311 lines
9 KiB
Python

"""
Whisper HTTP Service - FastAPI application for Cloud Run deployment.
This service exposes Whisper transcription as HTTP endpoints, allowing
the main application to offload CPU-intensive transcription to Cloud Run
with autoscaling.
This module uses minimal configuration to avoid importing the full app Settings.
"""
import logging
import os
import tempfile
from typing import Optional
from fastapi import FastAPI, HTTPException
from google.cloud import storage
from pydantic import BaseModel, Field
from .whisper_service import WordTimestamp, whisper_service
# Logging is configured locally with the stdlib defaults so that this module
# does not need app.core.logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Application object for the Cloud Run deployment; the route and startup
# decorators further down in this module register handlers on it.
app = FastAPI(
    title="Whisper Transcription Service",
    description="Cloud Run service for Whisper-based audio transcription",
    version="1.0.0",
)
# Request/Response Models
class TranscribeRequest(BaseModel):
    """Request model for transcription endpoint."""

    # Location of the audio object to transcribe. ``...`` as the default
    # marks the field as required. The value is only declared as ``str``
    # here; the ``gs://`` shape is checked later by ``_parse_gcs_uri``.
    gcs_uri: str = Field(
        ...,
        description="GCS URI of the audio file (gs://bucket/path/audio.mp3)",
        examples=["gs://accessible-video-storage/jobs/123/audio.mp3"],
    )
class WordTimestampResponse(BaseModel):
    """Word timestamp in response."""

    # Text of the word, copied from the service's WordTimestamp.word.
    word: str
    # Start and end of the word within the audio.
    # NOTE(review): presumably seconds from the start of the file - confirm
    # against whisper_service.
    start: float
    end: float
class TranscribeResponse(BaseModel):
    """Response model for transcription endpoint."""

    # One entry per transcribed word.
    words: list[WordTimestampResponse]
    # Number of entries in ``words``; the /transcribe handler fills it with
    # ``len(words)``.
    word_count: int
    # Required field; the /transcribe handler builds it by joining every
    # word with a single space.
    transcript: str = Field(
        ...,
        description="Full transcript text (words joined with spaces)",
    )
class SpeechGapResponse(BaseModel):
    """Speech gap in response."""

    # Boundaries and length of the gap, copied field-for-field from the gap
    # objects returned by whisper_service.identify_speech_gaps.
    # NOTE(review): units are presumably seconds - confirm against
    # whisper_service.
    start: float
    end: float
    duration: float
    # Classification label assigned by whisper_service; the set of possible
    # values is not defined in this module.
    gap_type: str
class TranscribeWithGapsRequest(BaseModel):
    """Request model for transcription with gap analysis."""

    # Location of the audio object to transcribe. ``...`` as the default
    # marks the field as required.
    gcs_uri: str = Field(
        ...,
        description="GCS URI of the audio file",
    )
class TranscribeWithGapsResponse(BaseModel):
    """Response model for transcription with gap analysis."""

    # One entry per transcribed word.
    words: list[WordTimestampResponse]
    # Gaps that whisper_service.identify_speech_gaps reported for ``words``.
    gaps: list[SpeechGapResponse]
    # Number of entries in ``words``.
    word_count: int
    # The /transcribe-with-gaps handler builds this by joining every word
    # with a single space.
    transcript: str
class RefinePausePointsRequest(BaseModel):
    """Request model for pause point refinement."""

    # Required. Free-form dicts; their keys are not validated here and are
    # handed unchanged to whisper_service.refine_all_pause_points.
    placements: list[dict] = Field(
        ...,
        description="List of placement dicts from Gemini analysis",
    )
    # Required. The handler converts these back into WordTimestamp objects
    # before calling the service.
    words: list[WordTimestampResponse] = Field(
        ...,
        description="Word timestamps from transcription",
    )
    # Optional; falls back to 5.0 when the caller omits it.
    consolidation_threshold: float = Field(
        default=5.0,
        description="Threshold in seconds for consolidating nearby cues",
    )
class RefinePausePointsResponse(BaseModel):
    """Response model for pause point refinement."""

    # First element of the pair returned by
    # whisper_service.refine_all_pause_points.
    refined_placements: list[dict]
    # Second element of that pair: messages produced during refinement.
    warnings: list[str]
class HealthResponse(BaseModel):
    """Health check response."""

    # The /health handler always sends the literal "healthy".
    status: str
    # True once whisper_service holds a model object (its ``_model`` is set).
    model_loaded: bool
    # Name of the model configured on whisper_service.
    model_name: str
def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]:
"""Parse GCS URI into bucket and blob path."""
if not gcs_uri.startswith("gs://"):
raise ValueError(f"Invalid GCS URI: {gcs_uri}")
parts = gcs_uri[5:].split("/", 1)
if len(parts) != 2:
raise ValueError(f"Invalid GCS URI format: {gcs_uri}")
return parts[0], parts[1]
def _download_from_gcs(gcs_uri: str, local_path: str) -> None:
    """Copy the object named by *gcs_uri* to *local_path* on local disk.

    Args:
        gcs_uri: ``gs://bucket/path`` URI of the object to fetch.
        local_path: Filesystem path the object's contents are written to.

    Raises:
        ValueError: Propagated from ``_parse_gcs_uri`` for a malformed URI.
        HTTPException: With status 404 when the object does not exist.
    """
    bucket_name, object_path = _parse_gcs_uri(gcs_uri)

    # A Client built with no arguments takes its project and credentials from
    # the runtime environment.
    remote_blob = storage.Client().bucket(bucket_name).blob(object_path)

    # Check existence first so a missing object becomes a 404 for the caller.
    if not remote_blob.exists():
        raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}")

    remote_blob.download_to_filename(local_path)
    logger.info(f"Downloaded {gcs_uri} to {local_path}")
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint."""
    # Reads the service's private attributes directly. Going through
    # ``whisper_service.model`` instead would trigger the lazy load (see the
    # startup handler), which a health probe should not do.
    model_is_loaded = whisper_service._model is not None
    return HealthResponse(
        status="healthy",
        model_loaded=model_is_loaded,
        model_name=whisper_service._model_name,
    )
@app.post("/transcribe", response_model=TranscribeResponse)
async def transcribe(request: TranscribeRequest):
    """
    Transcribe audio file from GCS and return word-level timestamps.

    This endpoint:
    1. Downloads the audio file from GCS
    2. Runs Whisper transcription with word-level timestamps
    3. Returns the words with timing information

    Args:
        request: Body carrying the ``gcs_uri`` of the audio to transcribe.

    Returns:
        TranscribeResponse with the per-word timestamps, the word count and
        the transcript (words joined with single spaces).

    Raises:
        HTTPException: 400 if ``gcs_uri`` is not a well-formed ``gs://`` URI,
            404 if the object does not exist, 500 for any other failure.
    """
    logger.info(f"Transcribe request: {request.gcs_uri}")

    # A malformed URI is the caller's mistake: report it as 400 before doing
    # any work, instead of letting the generic handler below turn it into 500.
    try:
        _parse_gcs_uri(request.gcs_uri)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # Only a unique path is needed here. delete=False keeps the file on disk
    # after the handle closes so the download can write to it by name; the
    # ``finally`` clause below removes it.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        # NOTE(review): the download and transcription are plain synchronous
        # calls inside a coroutine; if they block, other requests (including
        # /health) wait until they return - confirm this is intended.
        _download_from_gcs(request.gcs_uri, tmp_path)
        words = whisper_service.transcribe_audio(tmp_path)

        word_responses = [
            WordTimestampResponse(word=w.word, start=w.start, end=w.end)
            for w in words
        ]
        transcript = " ".join(w.word for w in words)
        logger.info(f"Transcription complete: {len(words)} words")
        return TranscribeResponse(
            words=word_responses,
            word_count=len(words),
            transcript=transcript,
        )
    except HTTPException:
        # Already carries the intended status (e.g. 404 from the download).
        raise
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Transcription failed: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Remove the temp file whether or not transcription succeeded.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
@app.post("/transcribe-with-gaps", response_model=TranscribeWithGapsResponse)
async def transcribe_with_gaps(request: TranscribeWithGapsRequest):
    """
    Transcribe audio and identify speech gaps for pause point placement.

    This endpoint combines transcription and gap analysis in a single call,
    which is more efficient for the pause-insert rendering method.

    Args:
        request: Body carrying the ``gcs_uri`` of the audio to transcribe.

    Returns:
        TranscribeWithGapsResponse with the per-word timestamps, the speech
        gaps found between them, the word count and the transcript.

    Raises:
        HTTPException: 400 if ``gcs_uri`` is not a well-formed ``gs://`` URI,
            404 if the object does not exist, 500 for any other failure.
    """
    logger.info(f"Transcribe with gaps request: {request.gcs_uri}")

    # A malformed URI is the caller's mistake: report it as 400 before doing
    # any work, instead of letting the generic handler below turn it into 500.
    try:
        _parse_gcs_uri(request.gcs_uri)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # Only a unique path is needed here. delete=False keeps the file on disk
    # after the handle closes so the download can write to it by name; the
    # ``finally`` clause below removes it.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        # NOTE(review): the download and transcription are plain synchronous
        # calls inside a coroutine; if they block, other requests (including
        # /health) wait until they return - confirm this is intended.
        _download_from_gcs(request.gcs_uri, tmp_path)
        words = whisper_service.transcribe_audio(tmp_path)
        gaps = whisper_service.identify_speech_gaps(words)

        word_responses = [
            WordTimestampResponse(word=w.word, start=w.start, end=w.end)
            for w in words
        ]
        gap_responses = [
            SpeechGapResponse(
                start=g.start,
                end=g.end,
                duration=g.duration,
                gap_type=g.gap_type,
            )
            for g in gaps
        ]
        transcript = " ".join(w.word for w in words)
        logger.info(
            f"Transcription with gaps complete: {len(words)} words, {len(gaps)} gaps"
        )
        return TranscribeWithGapsResponse(
            words=word_responses,
            gaps=gap_responses,
            word_count=len(words),
            transcript=transcript,
        )
    except HTTPException:
        # Already carries the intended status (e.g. 404 from the download).
        raise
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Transcription with gaps failed: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Remove the temp file whether or not transcription succeeded.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
@app.post("/refine-pause-points", response_model=RefinePausePointsResponse)
async def refine_pause_points(request: RefinePausePointsRequest):
    """
    Refine Gemini pause points using Whisper transcript analysis.

    This endpoint takes the placements from Gemini analysis and the
    word timestamps from transcription, then refines the pause points
    to align with natural sentence boundaries.

    Args:
        request: Body carrying the placements, the word timestamps and the
            consolidation threshold.

    Returns:
        RefinePausePointsResponse with the refined placements and any
        warnings reported by the refinement.

    Raises:
        HTTPException: 500 if gap detection or refinement fails.
    """
    logger.info(f"Refine pause points request: {len(request.placements)} placements")
    try:
        # The service works on its own WordTimestamp type, so rebuild those
        # objects from the validated request models.
        words = [
            WordTimestamp(word=w.word, start=w.start, end=w.end)
            for w in request.words
        ]
        # Gaps are recomputed from the supplied words; they are not part of
        # the request body.
        gaps = whisper_service.identify_speech_gaps(words)
        refined_placements, warnings = whisper_service.refine_all_pause_points(
            request.placements,
            words,
            gaps,
            request.consolidation_threshold,
        )
        logger.info(f"Pause point refinement complete: {len(warnings)} warnings")
        return RefinePausePointsResponse(
            refined_placements=refined_placements,
            warnings=warnings,
        )
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Pause point refinement failed: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
# Startup event to pre-load Whisper model
@app.on_event("startup")
async def startup_event():
    """Load the Whisper model when the app starts rather than on first use.

    Doing the load here keeps the cost out of the first request. An
    exception raised by the load propagates out of this handler.
    """
    # NOTE(review): recent FastAPI releases deprecate ``on_event`` in favour
    # of lifespan handlers - consider migrating when the app setup is touched.
    logger.info("Whisper HTTP Service starting up...")

    # Reading the ``model`` property triggers the lazy load; the value
    # itself is not needed here.
    _ = whisper_service.model

    logger.info("Whisper model pre-loaded successfully")