video-accessibility/backend/app/services/embedding_service.py
Vadym Samoilenko 6bf88474ee feat(embed): switch embeddings to Vertex AI text-multilingual-embedding-002
Replace AI Studio gemini-embedding-001 with Vertex AI text-multilingual-embedding-002
via google-genai SDK (vertexai=True). Vertex AI uses ADC (already configured) and
has significantly higher per-project quotas than AI Studio per-user limits.
Same 768-dim output; multilingual model better suited for 50+ language glossaries.
Add gcp_location config field (default us-central1).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 18:41:32 +01:00

92 lines
3.3 KiB
Python

"""
Embedding service backed by Vertex AI text-multilingual-embedding-002.
Uses the google-genai SDK in Vertex AI mode (Application Default Credentials)
instead of AI Studio so we get higher per-project quotas and no per-user limits.
Batch size: 100 texts per API call.
"""
from __future__ import annotations
import asyncio
import re
from collections.abc import Sequence
from google import genai
from google.genai import types as genai_types
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger(__name__)
# Vertex AI multilingual model — 768-dim, 50+ languages, higher quota than AI Studio
_EMBED_MODEL = "text-multilingual-embedding-002"
# Maximum number of texts sent in a single embed_content call.
_BATCH_SIZE = 100
# Total attempts per batch (the first call plus up to four retries).
_MAX_RETRIES = 5
# Baseline wait in seconds after the first failed attempt; doubled after every failure.
_INITIAL_BACKOFF = 4.0
# Matches the 'retryDelay': '7s' field in Gemini/Vertex 429 error bodies
_RETRY_DELAY_RE = re.compile(r"'retryDelay':\s*'(\d+)s'")
def _parse_retry_delay(exc: Exception) -> float | None:
"""Extract the server-suggested retry delay from a 429 error."""
m = _RETRY_DELAY_RE.search(str(exc))
return float(m.group(1)) if m else None
class EmbeddingService:
    """
    Async wrapper around the Vertex AI text-embedding model.

    Texts are sent in batches of ``_BATCH_SIZE``; each batch is retried with
    exponential backoff on transient failures. The blocking SDK call runs in a
    worker thread so the event loop is not blocked while waiting on the API.
    """

    def __init__(self) -> None:
        """Create the google-genai client in Vertex AI mode from project settings."""
        self._client = genai.Client(
            vertexai=True,
            project=settings.gcp_project_id,
            location=settings.gcp_location,
        )

    async def embed_texts(
        self,
        texts: Sequence[str],
        *,
        task_type: str = "RETRIEVAL_DOCUMENT",
    ) -> list[list[float]]:
        """
        Embed a list of texts and return a list of 768-dim float vectors.

        Processes in batches; retries with exponential backoff on transient errors.
        Order is preserved.

        Args:
            texts: Texts to embed. An empty sequence yields an empty list
                without calling the API.
            task_type: Embedding task type forwarded to the model. Defaults to
                ``"RETRIEVAL_DOCUMENT"``; pass e.g. ``"RETRIEVAL_QUERY"`` when
                embedding search queries.

        Returns:
            One vector per input text, in input order.

        Raises:
            Exception: Whatever the SDK raised, once a batch fails permanently.
            RuntimeError: If the API returns a different number of embeddings
                than texts were sent.
        """
        results: list[list[float]] = []
        for start in range(0, len(texts), _BATCH_SIZE):
            batch = list(texts[start : start + _BATCH_SIZE])
            results.extend(await self._embed_batch_with_retry(batch, task_type))
        return results

    async def embed_text(
        self,
        text: str,
        *,
        task_type: str = "RETRIEVAL_DOCUMENT",
    ) -> list[float]:
        """
        Embed a single text and return its vector.

        Args:
            text: The text to embed.
            task_type: Embedding task type, see :meth:`embed_texts`.

        Returns:
            The embedding vector for ``text``.
        """
        vectors = await self.embed_texts([text], task_type=task_type)
        return vectors[0]

    @staticmethod
    def _is_retryable(exc: Exception) -> bool:
        """
        Decide whether a failed call is worth retrying.

        SDK API errors expose the HTTP status as an integer ``code`` attribute.
        4xx responses other than 408 (timeout) and 429 (quota) describe a
        request that will fail again unchanged, so retrying only adds latency.
        Anything without an integer ``code`` (network errors, timeouts) is
        treated as transient.
        """
        code = getattr(exc, "code", None)
        if isinstance(code, int) and 400 <= code < 500:
            return code in (408, 429)
        return True

    async def _embed_batch_with_retry(
        self,
        texts: list[str],
        task_type: str = "RETRIEVAL_DOCUMENT",
    ) -> list[list[float]]:
        """
        Embed one batch, retrying transient failures with exponential backoff.

        Args:
            texts: At most one batch worth of texts.
            task_type: Embedding task type forwarded to the model.

        Returns:
            One vector per text, in input order.

        Raises:
            Exception: The last SDK error once ``_MAX_RETRIES`` attempts are
                exhausted, or immediately for non-retryable client errors.
            RuntimeError: If the response does not contain exactly one
                embedding per input text.
        """
        if not texts:
            # Nothing to embed; avoid a pointless (and rejected) empty request.
            return []

        config = genai_types.EmbedContentConfig(task_type=task_type)
        backoff = _INITIAL_BACKOFF
        for attempt in range(1, _MAX_RETRIES + 1):
            try:
                # The SDK call is synchronous; run it off the event loop.
                response = await asyncio.to_thread(
                    self._client.models.embed_content,
                    model=_EMBED_MODEL,
                    contents=texts,
                    config=config,
                )
            except Exception as exc:
                if not self._is_retryable(exc):
                    logger.error(f"Embedding batch failed with non-retryable error: {exc}")
                    raise
                if attempt == _MAX_RETRIES:
                    logger.error(f"Embedding batch failed after {_MAX_RETRIES} attempts: {exc}")
                    raise
                # Honour the server-suggested retryDelay when present (e.g. 429 RESOURCE_EXHAUSTED).
                # Fall back to our own exponential backoff otherwise.
                server_delay = _parse_retry_delay(exc)
                if server_delay is not None:
                    delay = max(server_delay + 1.0, backoff)
                else:
                    delay = backoff
                logger.warning(f"Embedding attempt {attempt} failed, retrying in {delay}s: {exc}")
                await asyncio.sleep(delay)
                backoff *= 2
            else:
                embeddings = response.embeddings or []
                # Callers zip the result with their inputs, so a short or empty
                # response must fail loudly instead of silently misaligning.
                if len(embeddings) != len(texts):
                    raise RuntimeError(
                        f"Embedding API returned {len(embeddings)} vectors for {len(texts)} texts"
                    )
                return [list(emb.values) for emb in embeddings]
        raise RuntimeError("unreachable")  # makes type-checker happy
# Shared module-level instance; the underlying client is constructed when this module is imported.
embedding_service = EmbeddingService()