fix: prevent event loop closed error in video renderer Cloud Run calls

Use context manager for AsyncClient instead of caching on singleton.
Each asyncio.run() creates a new event loop, so cached clients bound
to previous event loops fail when reused across jobs.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
michael 2026-01-03 08:44:27 -06:00
parent 3e2099515a
commit 5342ab1a28

View file

@ -55,7 +55,6 @@ class VideoRendererService:
self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3)
self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200)
# Cloud Run support
self._http_client: httpx.AsyncClient | None = None
self._gcs_client: storage.Client | None = None
# Source video caching for Cloud Run (uploaded once, reused across operations)
self._cached_source_gcs_uri: str | None = None
@ -65,13 +64,6 @@ class VideoRendererService:
"""Check if Cloud Run FFmpeg service is configured."""
return bool(settings.ffmpeg_service_url)
def _get_http_client(self) -> httpx.AsyncClient:
"""Get or create async HTTP client for Cloud Run calls."""
if self._http_client is None:
# 10-minute timeout for long FFmpeg operations
self._http_client = httpx.AsyncClient(timeout=600.0)
return self._http_client
def _get_gcs_client(self) -> storage.Client:
"""Get or create GCS client for file transfers."""
if self._gcs_client is None:
@ -168,17 +160,17 @@ class VideoRendererService:
Returns:
Probe result with duration and stream info
"""
client = self._get_http_client()
service_url = settings.ffmpeg_service_url.rstrip("/")
auth_token = _get_cloud_run_id_token(service_url)
response = await client.post(
f"{service_url}/probe",
json={"gcs_uri": gcs_uri}, # Matches ProbeRequest model
headers={"Authorization": f"Bearer {auth_token}"}
)
response.raise_for_status()
return response.json()
async with httpx.AsyncClient(timeout=600.0) as client:
response = await client.post(
f"{service_url}/probe",
json={"gcs_uri": gcs_uri}, # Matches ProbeRequest model
headers={"Authorization": f"Bearer {auth_token}"}
)
response.raise_for_status()
return response.json()
async def _call_cloud_run_endpoint(
self,
@ -197,25 +189,25 @@ class VideoRendererService:
Returns:
Response JSON
"""
client = self._get_http_client()
service_url = settings.ffmpeg_service_url.rstrip("/")
auth_token = _get_cloud_run_id_token(service_url)
try:
response = await client.post(
f"{service_url}{endpoint}",
json=payload,
headers={"Authorization": f"Bearer {auth_token}"}
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
# Try to get error details from response
async with httpx.AsyncClient(timeout=600.0) as client:
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise FFmpegExecutionError(f"Cloud Run {endpoint} failed: {error_detail}")
response = await client.post(
f"{service_url}{endpoint}",
json=payload,
headers={"Authorization": f"Bearer {auth_token}"}
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
# Try to get error details from response
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise FFmpegExecutionError(f"Cloud Run {endpoint} failed: {error_detail}")
async def _dispatch_ffmpeg(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]:
"""