"""Google Cloud Secret Manager integration service.""" import os import asyncio from typing import Dict, List, Optional, Any from functools import lru_cache from google.cloud import secretmanager from google.api_core import exceptions as gcp_exceptions from app.core.config import get_settings from app.core.logging import get_logger from app.telemetry.tracing import trace_async_operation logger = get_logger(__name__) class SecretManagerError(Exception): """Custom exception for Secret Manager operations.""" pass class SecretsManager: """Service for managing secrets via Google Cloud Secret Manager.""" def __init__(self): self.settings = get_settings() self.client: Optional[secretmanager.SecretManagerServiceClient] = None self.project_id = self.settings.google_cloud_project self._cache: Dict[str, str] = {} self._cache_ttl = 300 # 5 minutes cache def _get_client(self) -> secretmanager.SecretManagerServiceClient: """Get or create Secret Manager client.""" if not self.client: try: self.client = secretmanager.SecretManagerServiceClient() logger.info("Secret Manager client initialized") except Exception as e: logger.error(f"Failed to initialize Secret Manager client: {e}") raise SecretManagerError(f"Failed to initialize Secret Manager: {e}") return self.client @trace_async_operation("secrets_manager.get_secret") async def get_secret(self, secret_name: str, version: str = "latest") -> str: """ Retrieve a secret from Google Cloud Secret Manager. 
Args: secret_name: Name of the secret version: Version of the secret (default: "latest") Returns: The secret value as a string Raises: SecretManagerError: If secret cannot be retrieved """ cache_key = f"{secret_name}:{version}" # Check cache first if cache_key in self._cache: logger.debug(f"Secret {secret_name} retrieved from cache") return self._cache[cache_key] try: # Build the secret name name = f"projects/{self.project_id}/secrets/{secret_name}/versions/{version}" # Get the secret client = self._get_client() # Run in thread pool since Secret Manager client is synchronous loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, client.access_secret_version, {"name": name} ) secret_value = response.payload.data.decode("UTF-8") # Cache the secret (with TTL handled by application restart) self._cache[cache_key] = secret_value logger.info(f"Successfully retrieved secret: {secret_name}") return secret_value except gcp_exceptions.NotFound: error_msg = f"Secret not found: {secret_name}" logger.error(error_msg) raise SecretManagerError(error_msg) except gcp_exceptions.PermissionDenied: error_msg = f"Permission denied accessing secret: {secret_name}" logger.error(error_msg) raise SecretManagerError(error_msg) except Exception as e: error_msg = f"Failed to retrieve secret {secret_name}: {e}" logger.error(error_msg) raise SecretManagerError(error_msg) @trace_async_operation("secrets_manager.get_secrets_batch") async def get_secrets_batch(self, secret_names: List[str]) -> Dict[str, str]: """ Retrieve multiple secrets efficiently. 
Args: secret_names: List of secret names to retrieve Returns: Dictionary mapping secret names to their values """ secrets = {} tasks = [] for secret_name in secret_names: task = asyncio.create_task( self.get_secret(secret_name), name=f"get_secret_{secret_name}" ) tasks.append((secret_name, task)) # Wait for all tasks to complete for secret_name, task in tasks: try: secrets[secret_name] = await task except SecretManagerError as e: logger.warning(f"Failed to retrieve secret {secret_name}: {e}") # Continue with other secrets continue return secrets async def create_secret(self, secret_name: str, secret_value: str, labels: Optional[Dict[str, str]] = None) -> str: """ Create a new secret in Secret Manager. Args: secret_name: Name of the secret secret_value: Value to store labels: Optional labels for the secret Returns: The full secret resource name """ try: client = self._get_client() parent = f"projects/{self.project_id}" # Create the secret secret = { "labels": labels or {}, "replication": {"automatic": {}} } loop = asyncio.get_event_loop() # Create secret resource create_response = await loop.run_in_executor( None, client.create_secret, { "parent": parent, "secret_id": secret_name, "secret": secret } ) # Add secret version with the actual value version_response = await loop.run_in_executor( None, client.add_secret_version, { "parent": create_response.name, "payload": {"data": secret_value.encode("UTF-8")} } ) logger.info(f"Successfully created secret: {secret_name}") return version_response.name except gcp_exceptions.AlreadyExists: error_msg = f"Secret already exists: {secret_name}" logger.error(error_msg) raise SecretManagerError(error_msg) except Exception as e: error_msg = f"Failed to create secret {secret_name}: {e}" logger.error(error_msg) raise SecretManagerError(error_msg) def clear_cache(self) -> None: """Clear the secrets cache.""" self._cache.clear() logger.info("Secrets cache cleared") # Global secrets manager instance secrets_manager = SecretsManager() # 
# Convenience functions for common operations


async def get_secret(secret_name: str, version: str = "latest") -> str:
    """Get a secret value.

    Args:
        secret_name: Name of the secret
        version: Version of the secret (default: "latest")

    Returns:
        The secret value as a string

    Raises:
        SecretManagerError: If secret cannot be retrieved
    """
    return await secrets_manager.get_secret(secret_name, version)


async def _get_secret_or_env(secret_name: str, env_var: str, description: str) -> str:
    """Return a secret, falling back to an environment variable.

    Args:
        secret_name: Name of the secret in Secret Manager
        env_var: Environment variable consulted when the secret lookup fails
        description: Human-readable label used in the error message

    Returns:
        The secret value, or the environment variable's value

    Raises:
        SecretManagerError: If neither source provides a non-empty value
    """
    try:
        return await secrets_manager.get_secret(secret_name)
    except SecretManagerError:
        # Fallback to environment variable
        url = os.getenv(env_var)
        if not url:
            raise SecretManagerError(
                f"{description} not available in secrets or environment"
            )
        return url


async def get_database_url() -> str:
    """Get MongoDB connection URL from Secret Manager.

    Falls back to the ``MONGODB_URL`` environment variable.

    Raises:
        SecretManagerError: If the URL is available from neither source
    """
    return await _get_secret_or_env("mongodb-url", "MONGODB_URL", "MongoDB URL")


async def get_redis_url() -> str:
    """Get Redis connection URL from Secret Manager.

    Falls back to the ``REDIS_URL`` environment variable.

    Raises:
        SecretManagerError: If the URL is available from neither source
    """
    return await _get_secret_or_env("redis-url", "REDIS_URL", "Redis URL")


async def get_jwt_secrets() -> Dict[str, str]:
    """Get JWT secrets from Secret Manager.

    Each secret missing from Secret Manager is filled from its environment
    variable, and from a development default when that variable is unset.

    Returns:
        Dictionary with the keys ``"jwt-secret"`` and ``"jwt-refresh-secret"``
    """
    # secret name -> (environment variable, development default)
    fallbacks = {
        "jwt-secret": (
            "JWT_SECRET_KEY",
            "dev-secret-change-in-production",
        ),
        "jwt-refresh-secret": (
            "JWT_REFRESH_SECRET_KEY",
            "dev-refresh-secret-change-in-production",
        ),
    }

    try:
        jwt_secrets = await secrets_manager.get_secrets_batch(list(fallbacks))
    except SecretManagerError:
        jwt_secrets = {}

    # The batch call omits secrets it could not fetch instead of raising, so
    # the fallback has to be applied per key.
    for secret_name, (env_var, dev_default) in fallbacks.items():
        if secret_name in jwt_secrets:
            continue
        env_value = os.getenv(env_var)
        if env_value is None:
            logger.warning(
                f"JWT secret {secret_name} not available in secrets or environment, "
                "using development default"
            )
            env_value = dev_default
        jwt_secrets[secret_name] = env_value

    return jwt_secrets


async def get_api_keys() -> Dict[str, str]:
    """Get all API keys from Secret Manager.

    Keys missing from Secret Manager are filled from environment variables
    when those are set to a non-empty value; otherwise they are left out of
    the result and a warning is logged.

    Returns:
        Dictionary mapping secret names to the values that were found
    """
    # secret name -> environment variable used as fallback
    env_mapping = {
        "gemini-api-key": "GEMINI_API_KEY",
        "sendgrid-api-key": "SENDGRID_API_KEY",
        "elevenlabs-api-key": "ELEVENLABS_API_KEY",
        "sentry-dsn": "SENTRY_DSN",
    }

    try:
        api_keys = await secrets_manager.get_secrets_batch(list(env_mapping))
    except SecretManagerError:
        logger.warning(
            "Failed to retrieve some API keys from Secret Manager, using environment fallback"
        )
        api_keys = {}

    # Fallback to environment variables for missing keys
    for secret_name, env_var in env_mapping.items():
        if secret_name in api_keys:
            continue
        env_value = os.getenv(env_var)
        if env_value:
            api_keys[secret_name] = env_value
        else:
            logger.warning(
                f"API key {secret_name} not available in secrets or environment"
            )

    return api_keys