From 5342ab1a28381c87bfb367da82f2251a78515927 Mon Sep 17 00:00:00 2001 From: michael Date: Sat, 3 Jan 2026 08:44:27 -0600 Subject: [PATCH] fix: prevent event loop closed error in video renderer Cloud Run calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use context manager for AsyncClient instead of caching on singleton. Each asyncio.run() creates a new event loop, so cached clients bound to previous event loops fail when reused across jobs. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/app/services/video_renderer.py | 54 +++++++++++--------------- 1 file changed, 23 insertions(+), 31 deletions(-) diff --git a/backend/app/services/video_renderer.py b/backend/app/services/video_renderer.py index 48dc3d2..741ae15 100644 --- a/backend/app/services/video_renderer.py +++ b/backend/app/services/video_renderer.py @@ -55,7 +55,6 @@ class VideoRendererService: self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3) self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200) # Cloud Run support - self._http_client: httpx.AsyncClient | None = None self._gcs_client: storage.Client | None = None # Source video caching for Cloud Run (uploaded once, reused across operations) self._cached_source_gcs_uri: str | None = None @@ -65,13 +64,6 @@ class VideoRendererService: """Check if Cloud Run FFmpeg service is configured.""" return bool(settings.ffmpeg_service_url) - def _get_http_client(self) -> httpx.AsyncClient: - """Get or create async HTTP client for Cloud Run calls.""" - if self._http_client is None: - # 10-minute timeout for long FFmpeg operations - self._http_client = httpx.AsyncClient(timeout=600.0) - return self._http_client - def _get_gcs_client(self) -> storage.Client: """Get or create GCS client for file transfers.""" if self._gcs_client is None: @@ -168,17 +160,17 @@ class VideoRendererService: Returns: Probe result with duration and stream info """ - client = self._get_http_client() service_url = settings.ffmpeg_service_url.rstrip("/") auth_token = _get_cloud_run_id_token(service_url) - response = await client.post( - f"{service_url}/probe", - json={"gcs_uri": gcs_uri}, # Matches ProbeRequest model - headers={"Authorization": f"Bearer {auth_token}"} - ) - response.raise_for_status() - return response.json() + async with httpx.AsyncClient(timeout=600.0) as client: + response = await client.post( + f"{service_url}/probe", + json={"gcs_uri": gcs_uri}, # Matches ProbeRequest model + headers={"Authorization": f"Bearer {auth_token}"} + ) + response.raise_for_status() + return response.json() async def _call_cloud_run_endpoint( self, @@ -197,25 +189,25 @@ class VideoRendererService: Returns: Response JSON """ - client = self._get_http_client() service_url = settings.ffmpeg_service_url.rstrip("/") auth_token = _get_cloud_run_id_token(service_url) - try: - response = await client.post( - f"{service_url}{endpoint}", - json=payload, - headers={"Authorization": f"Bearer {auth_token}"} - ) - response.raise_for_status() - return response.json() - except httpx.HTTPStatusError as e: - # Try to get error details from response + async with httpx.AsyncClient(timeout=600.0) as client: try: - error_detail = e.response.json().get("detail", str(e)) - except Exception: - error_detail = str(e) - raise FFmpegExecutionError(f"Cloud Run {endpoint} failed: {error_detail}") + response = await client.post( + f"{service_url}{endpoint}", + json=payload, + headers={"Authorization": f"Bearer {auth_token}"} + ) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + # Try to get error details from response + try: + error_detail = e.response.json().get("detail", str(e)) + except Exception: + error_detail = str(e) + raise FFmpegExecutionError(f"Cloud Run {endpoint} failed: {error_detail}") async def _dispatch_ffmpeg(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]: """