""" Whisper HTTP Service - FastAPI application for Cloud Run deployment. This service exposes Whisper transcription as HTTP endpoints, allowing the main application to offload CPU-intensive transcription to Cloud Run with autoscaling. This module uses minimal configuration to avoid importing the full app Settings. """ import logging import os import tempfile from typing import Optional from fastapi import FastAPI, HTTPException from google.cloud import storage from pydantic import BaseModel, Field from .whisper_service import WordTimestamp, whisper_service # Simple logging setup (no dependency on app.core.logging) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # FastAPI app for Cloud Run app = FastAPI( title="Whisper Transcription Service", description="Cloud Run service for Whisper-based audio transcription", version="1.0.0", ) # Request/Response Models class TranscribeRequest(BaseModel): """Request model for transcription endpoint.""" gcs_uri: str = Field( ..., description="GCS URI of the audio file (gs://bucket/path/audio.mp3)", examples=["gs://accessible-video-storage/jobs/123/audio.mp3"] ) class WordTimestampResponse(BaseModel): """Word timestamp in response.""" word: str start: float end: float class TranscribeResponse(BaseModel): """Response model for transcription endpoint.""" words: list[WordTimestampResponse] word_count: int transcript: str = Field( ..., description="Full transcript text (words joined with spaces)" ) class SpeechGapResponse(BaseModel): """Speech gap in response.""" start: float end: float duration: float gap_type: str class TranscribeWithGapsRequest(BaseModel): """Request model for transcription with gap analysis.""" gcs_uri: str = Field( ..., description="GCS URI of the audio file" ) class TranscribeWithGapsResponse(BaseModel): """Response model for transcription with gap analysis.""" words: list[WordTimestampResponse] gaps: list[SpeechGapResponse] word_count: int transcript: str class RefinePausePointsRequest(BaseModel): """Request model for pause point refinement.""" placements: list[dict] = Field( ..., description="List of placement dicts from Gemini analysis" ) words: list[WordTimestampResponse] = Field( ..., description="Word timestamps from transcription" ) consolidation_threshold: float = Field( default=5.0, description="Threshold in seconds for consolidating nearby cues" ) class RefinePausePointsResponse(BaseModel): """Response model for pause point refinement.""" refined_placements: list[dict] warnings: list[str] class HealthResponse(BaseModel): """Health check response.""" status: str model_loaded: bool model_name: str def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]: """Parse GCS URI into bucket and blob path.""" if not gcs_uri.startswith("gs://"): raise ValueError(f"Invalid GCS URI: {gcs_uri}") parts = gcs_uri[5:].split("/", 1) if len(parts) != 2: raise ValueError(f"Invalid GCS URI format: {gcs_uri}") return parts[0], parts[1] def _download_from_gcs(gcs_uri: str, local_path: str) -> None: """Download file from GCS to local path.""" bucket_name, blob_path = _parse_gcs_uri(gcs_uri) client = storage.Client() # Auto-detects project from environment bucket = client.bucket(bucket_name) blob = bucket.blob(blob_path) if not blob.exists(): raise HTTPException(status_code=404, detail=f"File not found: {gcs_uri}") blob.download_to_filename(local_path) logger.info(f"Downloaded {gcs_uri} to {local_path}") @app.get("/health", response_model=HealthResponse) async def health_check(): """Health check endpoint.""" return HealthResponse( status="healthy", model_loaded=whisper_service._model is not None, model_name=whisper_service._model_name ) @app.post("/transcribe", response_model=TranscribeResponse) async def transcribe(request: TranscribeRequest): """ Transcribe audio file from GCS and return word-level timestamps. This endpoint: 1. Downloads the audio file from GCS 2. Runs Whisper transcription with word-level timestamps 3. Returns the words with timing information """ logger.info(f"Transcribe request: {request.gcs_uri}") # Create temp file for audio with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: tmp_path = tmp.name try: # Download audio from GCS _download_from_gcs(request.gcs_uri, tmp_path) # Run transcription words = whisper_service.transcribe_audio(tmp_path) # Build response word_responses = [ WordTimestampResponse(word=w.word, start=w.start, end=w.end) for w in words ] transcript = " ".join(w.word for w in words) logger.info(f"Transcription complete: {len(words)} words") return TranscribeResponse( words=word_responses, word_count=len(words), transcript=transcript ) except HTTPException: raise except Exception as e: logger.error(f"Transcription failed: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: # Clean up temp file if os.path.exists(tmp_path): os.unlink(tmp_path) @app.post("/transcribe-with-gaps", response_model=TranscribeWithGapsResponse) async def transcribe_with_gaps(request: TranscribeWithGapsRequest): """ Transcribe audio and identify speech gaps for pause point placement. This endpoint combines transcription and gap analysis in a single call, which is more efficient for the pause-insert rendering method. """ logger.info(f"Transcribe with gaps request: {request.gcs_uri}") # Create temp file for audio with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: tmp_path = tmp.name try: # Download audio from GCS _download_from_gcs(request.gcs_uri, tmp_path) # Run transcription words = whisper_service.transcribe_audio(tmp_path) # Identify speech gaps gaps = whisper_service.identify_speech_gaps(words) # Build response word_responses = [ WordTimestampResponse(word=w.word, start=w.start, end=w.end) for w in words ] gap_responses = [ SpeechGapResponse( start=g.start, end=g.end, duration=g.duration, gap_type=g.gap_type ) for g in gaps ] transcript = " ".join(w.word for w in words) logger.info(f"Transcription with gaps complete: {len(words)} words, {len(gaps)} gaps") return TranscribeWithGapsResponse( words=word_responses, gaps=gap_responses, word_count=len(words), transcript=transcript ) except HTTPException: raise except Exception as e: logger.error(f"Transcription with gaps failed: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: # Clean up temp file if os.path.exists(tmp_path): os.unlink(tmp_path) @app.post("/refine-pause-points", response_model=RefinePausePointsResponse) async def refine_pause_points(request: RefinePausePointsRequest): """ Refine Gemini pause points using Whisper transcript analysis. This endpoint takes the placements from Gemini analysis and the word timestamps from transcription, then refines the pause points to align with natural sentence boundaries. """ logger.info(f"Refine pause points request: {len(request.placements)} placements") try: # Convert word responses back to WordTimestamp objects words = [ WordTimestamp(word=w.word, start=w.start, end=w.end) for w in request.words ] # Identify speech gaps gaps = whisper_service.identify_speech_gaps(words) # Refine pause points refined_placements, warnings = whisper_service.refine_all_pause_points( request.placements, words, gaps, request.consolidation_threshold ) logger.info(f"Pause point refinement complete: {len(warnings)} warnings") return RefinePausePointsResponse( refined_placements=refined_placements, warnings=warnings ) except Exception as e: logger.error(f"Pause point refinement failed: {e}") raise HTTPException(status_code=500, detail=str(e)) # Startup event to pre-load Whisper model @app.on_event("startup") async def startup_event(): """Pre-load Whisper model on startup to reduce first-request latency.""" logger.info("Whisper HTTP Service starting up...") # Access the model property to trigger lazy loading _ = whisper_service.model logger.info("Whisper model pre-loaded successfully")