- Parse retryDelay from 429 error body and sleep for server_delay+1s instead of our own 2s/4s backoff (which was shorter than API requires) - Reduce embed concurrency 5→2 to halve burst when multiple glossary versions embed simultaneously - Increase max_retries 3→5 and initial backoff 2s→8s for headroom Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
86 lines
3.1 KiB
Python
86 lines
3.1 KiB
Python
"""
|
|
Embedding service backed by the Gemini embedding model named in _EMBED_MODEL (gemini-embedding-001).
|
|
|
|
Provides batch embedding with retry/backoff for use in glossary ingestion.
|
|
Batch size: 100 texts per API call (API limit is 2048 but we keep it conservative
|
|
for memory and retry ergonomics with large glossaries).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import re
|
|
from collections.abc import Sequence
|
|
|
|
from google import genai
|
|
from google.genai import types as genai_types
|
|
|
|
from ..core.config import settings
|
|
from ..core.logging import get_logger
|
|
|
|
# Logger for this module, obtained from the project's get_logger helper.
logger = get_logger(__name__)
|
|
|
|
# Model name passed as `model=` to every embed_content call.
_EMBED_MODEL = "gemini-embedding-001"
|
|
# Number of texts sent per API call; embed_texts slices its input by this.
_BATCH_SIZE = 100
|
|
# Total attempts per batch (first try included); the error is re-raised
# after the last one.
_MAX_RETRIES = 5
|
|
# Seconds to sleep after the first failed attempt; doubled after each failure.
_INITIAL_BACKOFF = 8.0
|
|
|
|
# Matches the 'retryDelay': '7s' field in Gemini 429 error bodies
|
|
_RETRY_DELAY_RE = re.compile(r"'retryDelay':\s*'(\d+)s'")
|
|
|
|
|
|
def _parse_retry_delay(exc: Exception) -> float | None:
|
|
"""Extract the server-suggested retry delay from a Gemini 429 error."""
|
|
m = _RETRY_DELAY_RE.search(str(exc))
|
|
return float(m.group(1)) if m else None
|
|
|
|
|
|
class EmbeddingService:
    """Batch text-embedding client for the Gemini embeddings API.

    Wraps a ``genai.Client`` and exposes async helpers that embed texts in
    batches of ``_BATCH_SIZE``. A failed batch is retried up to
    ``_MAX_RETRIES`` attempts with exponential backoff, or with the
    server-suggested ``retryDelay`` when that is longer.
    """

    def __init__(self) -> None:
        """Create the underlying client from ``settings.gemini_api_key``."""
        self._client = genai.Client(api_key=settings.gemini_api_key)

    async def embed_texts(self, texts: Sequence[str]) -> list[list[float]]:
        """Embed ``texts`` and return one float vector per input, in order.

        Inputs are processed sequentially in slices of ``_BATCH_SIZE``; each
        slice goes through ``_embed_batch_with_retry``. An empty input returns
        an empty list without calling the API.

        NOTE(review): this docstring previously promised 768-dim vectors, but
        no output dimensionality is requested in the call config, so the
        vector size is whatever ``_EMBED_MODEL`` returns by default. Confirm
        it matches what the vector store expects.

        Args:
            texts: The texts to embed.

        Returns:
            A list of embedding vectors, aligned index-for-index with
            ``texts``.

        Raises:
            Exception: Whatever the last failed attempt raised, once a batch
                has exhausted its retries or hit a non-retryable error.
        """
        vectors: list[list[float]] = []
        for start in range(0, len(texts), _BATCH_SIZE):
            batch = list(texts[start:start + _BATCH_SIZE])
            vectors.extend(await self._embed_batch_with_retry(batch))
        return vectors

    async def embed_text(self, text: str) -> list[float]:
        """Embed a single text and return its vector.

        Thin wrapper over ``embed_texts``, so it shares the same retry
        behaviour and the same ``RETRIEVAL_DOCUMENT`` task type.
        NOTE(review): confirm that task type is intended if this method is
        used to embed search queries.
        """
        (vector,) = await self.embed_texts([text])
        return vector

    @staticmethod
    def _is_retryable(exc: Exception) -> bool:
        """Return False for errors that cannot succeed on a retry.

        An exception exposing an integer HTTP status in ``code`` (as the
        google-genai ``APIError`` family does) is treated as permanent when
        the status is a 4xx other than 408 (timeout) or 429 (rate limit).
        Anything else, including exceptions with no ``code`` such as network
        failures, stays retryable.
        """
        code = getattr(exc, "code", None)
        if isinstance(code, int) and not isinstance(code, bool) and 400 <= code < 500:
            return code in (408, 429)
        return True

    async def _embed_batch_with_retry(self, texts: list[str]) -> list[list[float]]:
        """Embed one batch, retrying transient failures.

        The blocking SDK call runs in a worker thread via
        ``asyncio.to_thread`` so the event loop is not blocked.

        Args:
            texts: One batch of texts (at most ``_BATCH_SIZE`` when called
                from ``embed_texts``).

        Returns:
            One vector per input text, in the order the API returned them.

        Raises:
            Exception: The original exception, re-raised immediately for a
                non-retryable error or after ``_MAX_RETRIES`` attempts.
        """
        backoff = _INITIAL_BACKOFF
        for attempt in range(1, _MAX_RETRIES + 1):
            try:
                response = await asyncio.to_thread(
                    self._client.models.embed_content,
                    model=_EMBED_MODEL,
                    contents=texts,
                    config=genai_types.EmbedContentConfig(
                        task_type="RETRIEVAL_DOCUMENT",
                    ),
                )
                return [list(emb.values) for emb in response.embeddings]
            except Exception as exc:
                # Permanent client errors (bad key, invalid request, ...) will
                # fail identically every time; surface them without sleeping.
                if not self._is_retryable(exc):
                    logger.error(f"Embedding batch failed with non-retryable error: {exc}")
                    raise
                if attempt == _MAX_RETRIES:
                    logger.error(f"Embedding batch failed after {_MAX_RETRIES} attempts: {exc}")
                    raise
                # Honour the server-suggested retryDelay when present (e.g. 429
                # RESOURCE_EXHAUSTED), padded by 1s, but never wait less than
                # our own exponential backoff.
                server_delay = _parse_retry_delay(exc)
                if server_delay is not None:
                    delay = max(server_delay + 1.0, backoff)
                else:
                    delay = backoff
                logger.warning(f"Embedding attempt {attempt} failed, retrying in {delay}s: {exc}")
                await asyncio.sleep(delay)
                backoff *= 2

        raise RuntimeError("unreachable")  # makes type-checker happy
|
|
|
|
|
|
# Shared module-level instance. Instantiating here means the Gemini client is
# built (from settings.gemini_api_key) as soon as this module is imported.
embedding_service = EmbeddingService()
|