Migrate CPU-intensive workloads to Cloud Run for autoscaling:

- Add Whisper HTTP service (FastAPI) with /transcribe endpoint
- Add FFmpeg HTTP service (FastAPI) with /encode, /probe, /extract-frame, etc.
- Add Dockerfiles for both services (8 vCPU, 32GB RAM, Gen2)
- Add Cloud Build config for CI/CD deployment
- Add Cloud Run service YAML configs with scale-to-zero
- Update whisper_transcribe.py to call Cloud Run when WHISPER_SERVICE_URL is set
- Update video_renderer.py to call Cloud Run when FFMPEG_SERVICE_URL is set
- Update whisper_service.py for Cloud Run compatibility (no settings dependency)
- Add ffmpeg_service_url and whisper_service_url to config.py

Services scale 0→N based on request load, falling back to local execution when service URLs are not configured (hybrid mode).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
311 lines
9 KiB
Python
311 lines
9 KiB
Python
"""
Whisper HTTP Service - FastAPI application for Cloud Run deployment.

This service exposes Whisper transcription as HTTP endpoints, allowing
the main application to offload CPU-intensive transcription to Cloud Run
with autoscaling.

This module uses minimal configuration to avoid importing the full app Settings.
"""
|
|
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from typing import Optional
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from google.cloud import storage
|
|
from pydantic import BaseModel, Field
|
|
|
|
from .whisper_service import WordTimestamp, whisper_service
|
|
|
|
# Logging is configured straight from the standard library so this module
# carries no dependency on app.core.logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The FastAPI application object served on Cloud Run.
app = FastAPI(
    title="Whisper Transcription Service",
    description="Cloud Run service for Whisper-based audio transcription",
    version="1.0.0",
)
|
|
|
|
|
|
# Request/Response Models
|
|
class TranscribeRequest(BaseModel):
    """Body of a ``POST /transcribe`` call: identifies the audio to process."""

    # Required field (Ellipsis default); the file is fetched from Cloud
    # Storage using this URI.
    gcs_uri: str = Field(
        default=...,
        description="GCS URI of the audio file (gs://bucket/path/audio.mp3)",
        examples=["gs://accessible-video-storage/jobs/123/audio.mp3"],
    )
|
|
|
|
|
|
class WordTimestampResponse(BaseModel):
    """One transcribed word together with the interval it spans."""

    # The word text as produced by the transcription.
    word: str
    # Interval bounds; presumably seconds from the start of the audio —
    # TODO confirm against whisper_service.
    start: float
    end: float
|
|
|
|
|
|
class TranscribeResponse(BaseModel):
    """Payload returned by ``POST /transcribe``."""

    # Per-word timing entries, in the order the transcription produced them.
    words: list[WordTimestampResponse]
    # Number of entries in ``words``.
    word_count: int
    # Required field (Ellipsis default).
    transcript: str = Field(
        default=...,
        description="Full transcript text (words joined with spaces)",
    )
|
|
|
|
|
|
class SpeechGapResponse(BaseModel):
    """A gap between stretches of speech, as reported by gap analysis."""

    # Gap bounds and length; presumably in seconds — TODO confirm against
    # whisper_service.identify_speech_gaps.
    start: float
    end: float
    duration: float
    # Classification label copied from the analysed gap; the set of possible
    # values is defined by whisper_service, not here.
    gap_type: str
|
|
|
|
|
|
class TranscribeWithGapsRequest(BaseModel):
    """Body of a ``POST /transcribe-with-gaps`` call."""

    # Required field (Ellipsis default); the file is fetched from Cloud
    # Storage using this URI.
    gcs_uri: str = Field(
        default=...,
        description="GCS URI of the audio file",
    )
|
|
|
|
|
|
class TranscribeWithGapsResponse(BaseModel):
    """Payload returned by ``POST /transcribe-with-gaps``."""

    # Per-word timing entries from the transcription.
    words: list[WordTimestampResponse]
    # Speech gaps identified from those words.
    gaps: list[SpeechGapResponse]
    # Number of entries in ``words``.
    word_count: int
    # All words joined with single spaces.
    transcript: str
|
|
|
|
|
|
class RefinePausePointsRequest(BaseModel):
    """Body of a ``POST /refine-pause-points`` call."""

    # Required. The dict schema is not validated here; the dicts are passed
    # through to whisper_service as-is.
    placements: list[dict] = Field(
        default=...,
        description="List of placement dicts from Gemini analysis",
    )
    # Required. Typically the ``words`` list of an earlier transcription
    # response.
    words: list[WordTimestampResponse] = Field(
        default=...,
        description="Word timestamps from transcription",
    )
    # Optional; callers that omit it get 5.0.
    consolidation_threshold: float = Field(
        5.0,
        description="Threshold in seconds for consolidating nearby cues",
    )
|
|
|
|
|
|
class RefinePausePointsResponse(BaseModel):
    """Payload returned by ``POST /refine-pause-points``."""

    # Placement dicts after refinement by whisper_service.
    refined_placements: list[dict]
    # Warning messages produced during refinement.
    warnings: list[str]
|
|
|
|
|
|
class HealthResponse(BaseModel):
    """Payload returned by ``GET /health``."""

    # Status label; the health endpoint reports "healthy".
    status: str
    # True once the Whisper model object has been created.
    model_loaded: bool
    # Name of the Whisper model the service is configured with.
    model_name: str
|
|
|
|
|
|
def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]:
|
|
"""Parse GCS URI into bucket and blob path."""
|
|
if not gcs_uri.startswith("gs://"):
|
|
raise ValueError(f"Invalid GCS URI: {gcs_uri}")
|
|
|
|
parts = gcs_uri[5:].split("/", 1)
|
|
if len(parts) != 2:
|
|
raise ValueError(f"Invalid GCS URI format: {gcs_uri}")
|
|
|
|
return parts[0], parts[1]
|
|
|
|
|
|
def _download_from_gcs(gcs_uri: str, local_path: str) -> None:
    """Fetch the object named by ``gcs_uri`` and write it to ``local_path``.

    Args:
        gcs_uri: ``gs://bucket/path`` URI of the object to download.
        local_path: Filesystem path the object's contents are written to.

    Raises:
        ValueError: Propagated from ``_parse_gcs_uri`` for a malformed URI.
        HTTPException: With status 404 when the blob does not exist.
    """
    bucket_name, blob_path = _parse_gcs_uri(gcs_uri)

    # No project is passed: the client picks it up from the environment.
    gcs_client = storage.Client()
    target_blob = gcs_client.bucket(bucket_name).blob(blob_path)

    if blob_is_present := target_blob.exists():
        target_blob.download_to_filename(local_path)
        logger.info(f"Downloaded {gcs_uri} to {local_path}")
        return

    raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}")
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report that the service is up, plus the Whisper model's load state."""
    # The private attributes are read directly rather than going through the
    # ``model`` property, which would trigger a load as a side effect.
    is_loaded = whisper_service._model is not None
    configured_name = whisper_service._model_name

    return HealthResponse(
        status="healthy",
        model_loaded=is_loaded,
        model_name=configured_name,
    )
|
|
|
|
|
|
@app.post("/transcribe", response_model=TranscribeResponse)
async def transcribe(request: TranscribeRequest):
    """
    Transcribe audio file from GCS and return word-level timestamps.

    This endpoint:
    1. Validates the GCS URI
    2. Downloads the audio file from GCS to a temporary file
    3. Runs Whisper transcription with word-level timestamps
    4. Returns the words with timing information

    Args:
        request: Request body carrying the ``gcs_uri`` of the audio file.

    Returns:
        TranscribeResponse with the per-word timestamps, the word count and
        the transcript (all words joined with single spaces).

    Raises:
        HTTPException: 400 if ``gcs_uri`` is malformed, 404 if the object
            does not exist in GCS, 500 for any other failure during download
            or transcription.
    """
    logger.info(f"Transcribe request: {request.gcs_uri}")

    # Reject malformed URIs before doing any work: bad client input should
    # be reported as a 400 rather than falling into the generic 500 handler,
    # and there is no reason to create a temp file for it.
    try:
        _parse_gcs_uri(request.gcs_uri)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # delete=False: only the path is needed here; the file is written by the
    # download step and removed in the ``finally`` block below.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        # Download audio from GCS
        _download_from_gcs(request.gcs_uri, tmp_path)

        # Run transcription.
        # NOTE(review): this is a synchronous call inside an async handler,
        # so other requests to this process wait while it runs — confirm
        # that is acceptable for the configured Cloud Run concurrency.
        words = whisper_service.transcribe_audio(tmp_path)

        # Build response
        word_responses = [
            WordTimestampResponse(word=w.word, start=w.start, end=w.end)
            for w in words
        ]
        transcript = " ".join(w.word for w in words)

        logger.info(f"Transcription complete: {len(words)} words")

        return TranscribeResponse(
            words=word_responses,
            word_count=len(words),
            transcript=transcript,
        )

    except HTTPException:
        # Already carries the right status code (e.g. the 404 from download).
        raise
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception(f"Transcription failed: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Clean up temp file; it may already be gone, which is fine.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
|
|
|
|
|
|
@app.post("/transcribe-with-gaps", response_model=TranscribeWithGapsResponse)
async def transcribe_with_gaps(request: TranscribeWithGapsRequest):
    """
    Transcribe audio and identify speech gaps for pause point placement.

    This endpoint combines transcription and gap analysis in a single call,
    which is more efficient for the pause-insert rendering method.

    Args:
        request: Request body carrying the ``gcs_uri`` of the audio file.

    Returns:
        TranscribeWithGapsResponse with the per-word timestamps, the speech
        gaps identified from them, the word count and the transcript (all
        words joined with single spaces).

    Raises:
        HTTPException: 400 if ``gcs_uri`` is malformed, 404 if the object
            does not exist in GCS, 500 for any other failure during
            download, transcription or gap analysis.
    """
    logger.info(f"Transcribe with gaps request: {request.gcs_uri}")

    # Reject malformed URIs before doing any work: bad client input should
    # be reported as a 400 rather than falling into the generic 500 handler,
    # and there is no reason to create a temp file for it.
    try:
        _parse_gcs_uri(request.gcs_uri)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # delete=False: only the path is needed here; the file is written by the
    # download step and removed in the ``finally`` block below.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        # Download audio from GCS
        _download_from_gcs(request.gcs_uri, tmp_path)

        # Run transcription.
        # NOTE(review): this is a synchronous call inside an async handler,
        # so other requests to this process wait while it runs — confirm
        # that is acceptable for the configured Cloud Run concurrency.
        words = whisper_service.transcribe_audio(tmp_path)

        # Identify speech gaps
        gaps = whisper_service.identify_speech_gaps(words)

        # Build response
        word_responses = [
            WordTimestampResponse(word=w.word, start=w.start, end=w.end)
            for w in words
        ]
        gap_responses = [
            SpeechGapResponse(
                start=g.start,
                end=g.end,
                duration=g.duration,
                gap_type=g.gap_type,
            )
            for g in gaps
        ]
        transcript = " ".join(w.word for w in words)

        logger.info(f"Transcription with gaps complete: {len(words)} words, {len(gaps)} gaps")

        return TranscribeWithGapsResponse(
            words=word_responses,
            gaps=gap_responses,
            word_count=len(words),
            transcript=transcript,
        )

    except HTTPException:
        # Already carries the right status code (e.g. the 404 from download).
        raise
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception(f"Transcription with gaps failed: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Clean up temp file; it may already be gone, which is fine.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
|
|
|
|
|
|
@app.post("/refine-pause-points", response_model=RefinePausePointsResponse)
async def refine_pause_points(request: RefinePausePointsRequest):
    """
    Refine Gemini pause points using Whisper transcript analysis.

    This endpoint takes the placements from Gemini analysis and the
    word timestamps from transcription, then refines the pause points
    to align with natural sentence boundaries.

    Args:
        request: Request body with the placement dicts, the word timestamps
            and the consolidation threshold.

    Returns:
        RefinePausePointsResponse with the refined placements and any
        warnings produced during refinement.

    Raises:
        HTTPException: 500 if gap analysis or refinement fails.
    """
    logger.info(f"Refine pause points request: {len(request.placements)} placements")

    try:
        # Convert word responses back to WordTimestamp objects, the type the
        # whisper_service helpers operate on.
        words = [
            WordTimestamp(word=w.word, start=w.start, end=w.end)
            for w in request.words
        ]

        # Identify speech gaps
        gaps = whisper_service.identify_speech_gaps(words)

        # Refine pause points
        refined_placements, warnings = whisper_service.refine_all_pause_points(
            request.placements,
            words,
            gaps,
            request.consolidation_threshold,
        )

        logger.info(f"Pause point refinement complete: {len(warnings)} warnings")

        return RefinePausePointsResponse(
            refined_placements=refined_placements,
            warnings=warnings,
        )

    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped;
        # chaining keeps the original cause attached to the HTTP error.
        logger.exception(f"Pause point refinement failed: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
# Startup event to pre-load Whisper model
|
|
@app.on_event("startup")
async def startup_event():
    """Warm up the Whisper model at boot so the first request need not load it."""
    logger.info("Whisper HTTP Service starting up...")

    # Reading the ``model`` property is what triggers the lazy load; the
    # returned value is not needed here.
    whisper_service.model

    logger.info("Whisper model pre-loaded successfully")
|