video-accessibility/backend/app/services/secrets_manager.py
2025-08-24 16:28:33 -05:00

284 lines
No EOL
9.5 KiB
Python

"""Google Cloud Secret Manager integration service."""
import os
import asyncio
from typing import Dict, List, Optional, Any
from functools import lru_cache
from google.cloud import secretmanager
from google.api_core import exceptions as gcp_exceptions
from app.core.config import get_settings
from app.core.logging import get_logger
from app.telemetry.tracing import trace_async_operation
logger = get_logger(__name__)
class SecretManagerError(Exception):
"""Custom exception for Secret Manager operations."""
pass
class SecretsManager:
"""Service for managing secrets via Google Cloud Secret Manager."""
def __init__(self):
self.settings = get_settings()
self.client: Optional[secretmanager.SecretManagerServiceClient] = None
self.project_id = self.settings.google_cloud_project
self._cache: Dict[str, str] = {}
self._cache_ttl = 300 # 5 minutes cache
def _get_client(self) -> secretmanager.SecretManagerServiceClient:
"""Get or create Secret Manager client."""
if not self.client:
try:
self.client = secretmanager.SecretManagerServiceClient()
logger.info("Secret Manager client initialized")
except Exception as e:
logger.error(f"Failed to initialize Secret Manager client: {e}")
raise SecretManagerError(f"Failed to initialize Secret Manager: {e}")
return self.client
@trace_async_operation("secrets_manager.get_secret")
async def get_secret(self, secret_name: str, version: str = "latest") -> str:
"""
Retrieve a secret from Google Cloud Secret Manager.
Args:
secret_name: Name of the secret
version: Version of the secret (default: "latest")
Returns:
The secret value as a string
Raises:
SecretManagerError: If secret cannot be retrieved
"""
cache_key = f"{secret_name}:{version}"
# Check cache first
if cache_key in self._cache:
logger.debug(f"Secret {secret_name} retrieved from cache")
return self._cache[cache_key]
try:
# Build the secret name
name = f"projects/{self.project_id}/secrets/{secret_name}/versions/{version}"
# Get the secret
client = self._get_client()
# Run in thread pool since Secret Manager client is synchronous
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
client.access_secret_version,
{"name": name}
)
secret_value = response.payload.data.decode("UTF-8")
# Cache the secret (with TTL handled by application restart)
self._cache[cache_key] = secret_value
logger.info(f"Successfully retrieved secret: {secret_name}")
return secret_value
except gcp_exceptions.NotFound:
error_msg = f"Secret not found: {secret_name}"
logger.error(error_msg)
raise SecretManagerError(error_msg)
except gcp_exceptions.PermissionDenied:
error_msg = f"Permission denied accessing secret: {secret_name}"
logger.error(error_msg)
raise SecretManagerError(error_msg)
except Exception as e:
error_msg = f"Failed to retrieve secret {secret_name}: {e}"
logger.error(error_msg)
raise SecretManagerError(error_msg)
@trace_async_operation("secrets_manager.get_secrets_batch")
async def get_secrets_batch(self, secret_names: List[str]) -> Dict[str, str]:
"""
Retrieve multiple secrets efficiently.
Args:
secret_names: List of secret names to retrieve
Returns:
Dictionary mapping secret names to their values
"""
secrets = {}
tasks = []
for secret_name in secret_names:
task = asyncio.create_task(
self.get_secret(secret_name),
name=f"get_secret_{secret_name}"
)
tasks.append((secret_name, task))
# Wait for all tasks to complete
for secret_name, task in tasks:
try:
secrets[secret_name] = await task
except SecretManagerError as e:
logger.warning(f"Failed to retrieve secret {secret_name}: {e}")
# Continue with other secrets
continue
return secrets
async def create_secret(self, secret_name: str, secret_value: str, labels: Optional[Dict[str, str]] = None) -> str:
"""
Create a new secret in Secret Manager.
Args:
secret_name: Name of the secret
secret_value: Value to store
labels: Optional labels for the secret
Returns:
The full secret resource name
"""
try:
client = self._get_client()
parent = f"projects/{self.project_id}"
# Create the secret
secret = {
"labels": labels or {},
"replication": {"automatic": {}}
}
loop = asyncio.get_event_loop()
# Create secret resource
create_response = await loop.run_in_executor(
None,
client.create_secret,
{
"parent": parent,
"secret_id": secret_name,
"secret": secret
}
)
# Add secret version with the actual value
version_response = await loop.run_in_executor(
None,
client.add_secret_version,
{
"parent": create_response.name,
"payload": {"data": secret_value.encode("UTF-8")}
}
)
logger.info(f"Successfully created secret: {secret_name}")
return version_response.name
except gcp_exceptions.AlreadyExists:
error_msg = f"Secret already exists: {secret_name}"
logger.error(error_msg)
raise SecretManagerError(error_msg)
except Exception as e:
error_msg = f"Failed to create secret {secret_name}: {e}"
logger.error(error_msg)
raise SecretManagerError(error_msg)
def clear_cache(self) -> None:
"""Clear the secrets cache."""
self._cache.clear()
logger.info("Secrets cache cleared")
# Global secrets manager instance
secrets_manager = SecretsManager()
# Convenience functions for common operations
async def get_secret(secret_name: str, version: str = "latest") -> str:
"""Get a secret value."""
return await secrets_manager.get_secret(secret_name, version)
async def get_database_url() -> str:
"""Get MongoDB connection URL from Secret Manager."""
try:
return await secrets_manager.get_secret("mongodb-url")
except SecretManagerError:
# Fallback to environment variable
url = os.getenv("MONGODB_URL")
if not url:
raise SecretManagerError("MongoDB URL not available in secrets or environment")
return url
async def get_redis_url() -> str:
"""Get Redis connection URL from Secret Manager."""
try:
return await secrets_manager.get_secret("redis-url")
except SecretManagerError:
# Fallback to environment variable
url = os.getenv("REDIS_URL")
if not url:
raise SecretManagerError("Redis URL not available in secrets or environment")
return url
async def get_jwt_secrets() -> Dict[str, str]:
"""Get JWT secrets from Secret Manager."""
try:
return await secrets_manager.get_secrets_batch([
"jwt-secret",
"jwt-refresh-secret"
])
except SecretManagerError:
# Fallback to environment variables
return {
"jwt-secret": os.getenv("JWT_SECRET_KEY", "dev-secret-change-in-production"),
"jwt-refresh-secret": os.getenv("JWT_REFRESH_SECRET_KEY", "dev-refresh-secret-change-in-production")
}
async def get_api_keys() -> Dict[str, str]:
"""Get all API keys from Secret Manager."""
api_keys = {}
secret_names = [
"gemini-api-key",
"sendgrid-api-key",
"elevenlabs-api-key",
"sentry-dsn"
]
try:
api_keys = await secrets_manager.get_secrets_batch(secret_names)
except SecretManagerError:
logger.warning("Failed to retrieve some API keys from Secret Manager, using environment fallback")
# Fallback to environment variables for missing keys
env_mapping = {
"gemini-api-key": "GEMINI_API_KEY",
"sendgrid-api-key": "SENDGRID_API_KEY",
"elevenlabs-api-key": "ELEVENLABS_API_KEY",
"sentry-dsn": "SENTRY_DSN"
}
for secret_name, env_var in env_mapping.items():
if secret_name not in api_keys:
env_value = os.getenv(env_var)
if env_value:
api_keys[secret_name] = env_value
else:
logger.warning(f"API key {secret_name} not available in secrets or environment")
return api_keys