Replace AI Studio gemini-embedding-001 with Vertex AI text-multilingual-embedding-002 via google-genai SDK (vertexai=True). Vertex AI uses ADC (already configured) and has significantly higher per-project quotas than AI Studio per-user limits. Same 768-dim output; multilingual model better suited for 50+ language glossaries. Add gcp_location config field (default us-central1). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
92 lines
3.3 KiB
Python
92 lines
3.3 KiB
Python
"""
|
|
Embedding service backed by Vertex AI text-multilingual-embedding-002.
|
|
|
|
Uses the google-genai SDK in Vertex AI mode (Application Default Credentials)
|
|
instead of AI Studio so we get higher per-project quotas and no per-user limits.
|
|
|
|
Batch size: 100 texts per API call.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import re
|
|
from collections.abc import Sequence
|
|
|
|
from google import genai
|
|
from google.genai import types as genai_types
|
|
|
|
from ..core.config import settings
|
|
from ..core.logging import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# Vertex AI multilingual model — 768-dim, 50+ languages, higher quota than AI Studio
|
|
_EMBED_MODEL = "text-multilingual-embedding-002"
|
|
_BATCH_SIZE = 100
|
|
_MAX_RETRIES = 5
|
|
_INITIAL_BACKOFF = 4.0
|
|
|
|
# Matches the 'retryDelay': '7s' field in Gemini/Vertex 429 error bodies
|
|
_RETRY_DELAY_RE = re.compile(r"'retryDelay':\s*'(\d+)s'")
|
|
|
|
|
|
def _parse_retry_delay(exc: Exception) -> float | None:
|
|
"""Extract the server-suggested retry delay from a 429 error."""
|
|
m = _RETRY_DELAY_RE.search(str(exc))
|
|
return float(m.group(1)) if m else None
|
|
|
|
|
|
class EmbeddingService:
|
|
def __init__(self) -> None:
|
|
self._client = genai.Client(
|
|
vertexai=True,
|
|
project=settings.gcp_project_id,
|
|
location=settings.gcp_location,
|
|
)
|
|
|
|
async def embed_texts(self, texts: Sequence[str]) -> list[list[float]]:
|
|
"""
|
|
Embed a list of texts and return a list of 768-dim float vectors.
|
|
Processes in batches; retries with exponential backoff on transient errors.
|
|
Order is preserved.
|
|
"""
|
|
results: list[list[float]] = []
|
|
for i in range(0, len(texts), _BATCH_SIZE):
|
|
batch = list(texts[i: i + _BATCH_SIZE])
|
|
vectors = await self._embed_batch_with_retry(batch)
|
|
results.extend(vectors)
|
|
return results
|
|
|
|
async def embed_text(self, text: str) -> list[float]:
|
|
vectors = await self.embed_texts([text])
|
|
return vectors[0]
|
|
|
|
async def _embed_batch_with_retry(self, texts: list[str]) -> list[list[float]]:
|
|
backoff = _INITIAL_BACKOFF
|
|
for attempt in range(1, _MAX_RETRIES + 1):
|
|
try:
|
|
response = await asyncio.to_thread(
|
|
self._client.models.embed_content,
|
|
model=_EMBED_MODEL,
|
|
contents=texts,
|
|
config=genai_types.EmbedContentConfig(
|
|
task_type="RETRIEVAL_DOCUMENT",
|
|
),
|
|
)
|
|
return [list(emb.values) for emb in response.embeddings]
|
|
except Exception as exc:
|
|
if attempt == _MAX_RETRIES:
|
|
logger.error(f"Embedding batch failed after {_MAX_RETRIES} attempts: {exc}")
|
|
raise
|
|
# Honour the server-suggested retryDelay when present (e.g. 429 RESOURCE_EXHAUSTED).
|
|
# Fall back to our own exponential backoff otherwise.
|
|
server_delay = _parse_retry_delay(exc)
|
|
delay = max(server_delay + 1.0, backoff) if server_delay else backoff
|
|
logger.warning(f"Embedding attempt {attempt} failed, retrying in {delay}s: {exc}")
|
|
await asyncio.sleep(delay)
|
|
backoff *= 2
|
|
|
|
raise RuntimeError("unreachable") # makes type-checker happy
|
|
|
|
|
|
embedding_service = EmbeddingService()
|