fix: use async httpx client for true parallel Cloud Run calls

Changed from httpx.Client (sync) to httpx.AsyncClient so that
asyncio.gather() actually runs the HTTP calls concurrently instead
of each blocking call stalling the event loop and serializing them.

Before: ~5 min for 18 segments (serial HTTP calls despite gather)
After: ~30 sec for 18 segments (truly parallel HTTP calls)

Changes:
- _http_client: httpx.Client -> httpx.AsyncClient
- _call_cloud_run_probe: sync -> async
- _call_cloud_run_endpoint: sync -> async
- Added await to all Cloud Run HTTP calls

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
michael 2026-01-03 08:11:46 -06:00
parent 95852f1357
commit 3e2099515a

View file

@ -55,7 +55,7 @@ class VideoRendererService:
self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3)
self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200)
# Cloud Run support
self._http_client: httpx.Client | None = None
self._http_client: httpx.AsyncClient | None = None
self._gcs_client: storage.Client | None = None
# Source video caching for Cloud Run (uploaded once, reused across operations)
self._cached_source_gcs_uri: str | None = None
@ -65,11 +65,11 @@ class VideoRendererService:
"""Check if Cloud Run FFmpeg service is configured."""
return bool(settings.ffmpeg_service_url)
def _get_http_client(self) -> httpx.Client:
"""Get or create HTTP client for Cloud Run calls."""
def _get_http_client(self) -> httpx.AsyncClient:
"""Get or create async HTTP client for Cloud Run calls."""
if self._http_client is None:
# 10-minute timeout for long FFmpeg operations
self._http_client = httpx.Client(timeout=600.0)
self._http_client = httpx.AsyncClient(timeout=600.0)
return self._http_client
def _get_gcs_client(self) -> storage.Client:
@ -158,7 +158,7 @@ class VideoRendererService:
# Log but don't fail on cleanup errors
logger.warning(f"Failed to delete GCS temp file {gcs_uri}: {e}")
def _call_cloud_run_probe(self, gcs_uri: str) -> dict[str, Any]:
async def _call_cloud_run_probe(self, gcs_uri: str) -> dict[str, Any]:
"""
Call Cloud Run FFmpeg service /probe endpoint.
@ -172,7 +172,7 @@ class VideoRendererService:
service_url = settings.ffmpeg_service_url.rstrip("/")
auth_token = _get_cloud_run_id_token(service_url)
response = client.post(
response = await client.post(
f"{service_url}/probe",
json={"gcs_uri": gcs_uri}, # Matches ProbeRequest model
headers={"Authorization": f"Bearer {auth_token}"}
@ -180,7 +180,7 @@ class VideoRendererService:
response.raise_for_status()
return response.json()
def _call_cloud_run_endpoint(
async def _call_cloud_run_endpoint(
self,
endpoint: str,
payload: dict[str, Any],
@ -202,7 +202,7 @@ class VideoRendererService:
auth_token = _get_cloud_run_id_token(service_url)
try:
response = client.post(
response = await client.post(
f"{service_url}{endpoint}",
json=payload,
headers={"Authorization": f"Bearer {auth_token}"}
@ -715,7 +715,7 @@ class VideoRendererService:
gcs_uri = self._upload_to_gcs_temp(video_path, "probe")
try:
result = self._call_cloud_run_probe(gcs_uri)
result = await self._call_cloud_run_probe(gcs_uri)
return result["duration"]
finally:
# Clean up if we uploaded specifically for this call
@ -781,7 +781,7 @@ class VideoRendererService:
gcs_uri = self._upload_to_gcs_temp(video_path, "probe")
try:
result = self._call_cloud_run_probe(gcs_uri)
result = await self._call_cloud_run_probe(gcs_uri)
# Defaults
props = {
@ -910,7 +910,7 @@ class VideoRendererService:
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/segment/{uuid4().hex}/segment.mp4"
try:
self._call_cloud_run_endpoint(
await self._call_cloud_run_endpoint(
"/encode-segment",
{
"source_gcs_uri": self._cached_source_gcs_uri,
@ -968,7 +968,7 @@ class VideoRendererService:
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/frame/{uuid4().hex}/frame.png"
try:
self._call_cloud_run_endpoint(
await self._call_cloud_run_endpoint(
"/extract-frame",
{
"source_gcs_uri": self._cached_source_gcs_uri,
@ -1116,7 +1116,7 @@ class VideoRendererService:
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/freeze/{uuid4().hex}/freeze.mp4"
try:
self._call_cloud_run_endpoint(
await self._call_cloud_run_endpoint(
"/create-freeze-segment",
{
"frame_gcs_uri": frame_gcs_uri,
@ -1292,7 +1292,7 @@ class VideoRendererService:
"{output}"
])
self._call_cloud_run_endpoint(
await self._call_cloud_run_endpoint(
"/run-ffmpeg",
{
"input_gcs_uris": audio_gcs_uris,
@ -1366,7 +1366,7 @@ class VideoRendererService:
try:
channel_layout = "stereo" if props["channels"] == "2" else "mono"
self._call_cloud_run_endpoint(
await self._call_cloud_run_endpoint(
"/run-ffmpeg",
{
"input_gcs_uris": [], # No input files, using lavfi
@ -1442,7 +1442,7 @@ class VideoRendererService:
output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/concat/{uuid4().hex}/final.mp4"
try:
self._call_cloud_run_endpoint(
await self._call_cloud_run_endpoint(
"/concatenate",
{
"segment_gcs_uris": segment_gcs_uris,