284 lines
No EOL
9.5 KiB
Python
284 lines
No EOL
9.5 KiB
Python
"""Google Cloud Secret Manager integration service."""
|
|
|
|
import os
|
|
import asyncio
|
|
from typing import Dict, List, Optional, Any
|
|
from functools import lru_cache
|
|
from google.cloud import secretmanager
|
|
from google.api_core import exceptions as gcp_exceptions
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.logging import get_logger
|
|
from app.telemetry.tracing import trace_async_operation
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class SecretManagerError(Exception):
|
|
"""Custom exception for Secret Manager operations."""
|
|
pass
|
|
|
|
|
|
class SecretsManager:
|
|
"""Service for managing secrets via Google Cloud Secret Manager."""
|
|
|
|
def __init__(self):
|
|
self.settings = get_settings()
|
|
self.client: Optional[secretmanager.SecretManagerServiceClient] = None
|
|
self.project_id = self.settings.google_cloud_project
|
|
self._cache: Dict[str, str] = {}
|
|
self._cache_ttl = 300 # 5 minutes cache
|
|
|
|
def _get_client(self) -> secretmanager.SecretManagerServiceClient:
|
|
"""Get or create Secret Manager client."""
|
|
if not self.client:
|
|
try:
|
|
self.client = secretmanager.SecretManagerServiceClient()
|
|
logger.info("Secret Manager client initialized")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize Secret Manager client: {e}")
|
|
raise SecretManagerError(f"Failed to initialize Secret Manager: {e}")
|
|
|
|
return self.client
|
|
|
|
@trace_async_operation("secrets_manager.get_secret")
|
|
async def get_secret(self, secret_name: str, version: str = "latest") -> str:
|
|
"""
|
|
Retrieve a secret from Google Cloud Secret Manager.
|
|
|
|
Args:
|
|
secret_name: Name of the secret
|
|
version: Version of the secret (default: "latest")
|
|
|
|
Returns:
|
|
The secret value as a string
|
|
|
|
Raises:
|
|
SecretManagerError: If secret cannot be retrieved
|
|
"""
|
|
|
|
cache_key = f"{secret_name}:{version}"
|
|
|
|
# Check cache first
|
|
if cache_key in self._cache:
|
|
logger.debug(f"Secret {secret_name} retrieved from cache")
|
|
return self._cache[cache_key]
|
|
|
|
try:
|
|
# Build the secret name
|
|
name = f"projects/{self.project_id}/secrets/{secret_name}/versions/{version}"
|
|
|
|
# Get the secret
|
|
client = self._get_client()
|
|
|
|
# Run in thread pool since Secret Manager client is synchronous
|
|
loop = asyncio.get_event_loop()
|
|
response = await loop.run_in_executor(
|
|
None,
|
|
client.access_secret_version,
|
|
{"name": name}
|
|
)
|
|
|
|
secret_value = response.payload.data.decode("UTF-8")
|
|
|
|
# Cache the secret (with TTL handled by application restart)
|
|
self._cache[cache_key] = secret_value
|
|
|
|
logger.info(f"Successfully retrieved secret: {secret_name}")
|
|
return secret_value
|
|
|
|
except gcp_exceptions.NotFound:
|
|
error_msg = f"Secret not found: {secret_name}"
|
|
logger.error(error_msg)
|
|
raise SecretManagerError(error_msg)
|
|
|
|
except gcp_exceptions.PermissionDenied:
|
|
error_msg = f"Permission denied accessing secret: {secret_name}"
|
|
logger.error(error_msg)
|
|
raise SecretManagerError(error_msg)
|
|
|
|
except Exception as e:
|
|
error_msg = f"Failed to retrieve secret {secret_name}: {e}"
|
|
logger.error(error_msg)
|
|
raise SecretManagerError(error_msg)
|
|
|
|
@trace_async_operation("secrets_manager.get_secrets_batch")
|
|
async def get_secrets_batch(self, secret_names: List[str]) -> Dict[str, str]:
|
|
"""
|
|
Retrieve multiple secrets efficiently.
|
|
|
|
Args:
|
|
secret_names: List of secret names to retrieve
|
|
|
|
Returns:
|
|
Dictionary mapping secret names to their values
|
|
"""
|
|
|
|
secrets = {}
|
|
tasks = []
|
|
|
|
for secret_name in secret_names:
|
|
task = asyncio.create_task(
|
|
self.get_secret(secret_name),
|
|
name=f"get_secret_{secret_name}"
|
|
)
|
|
tasks.append((secret_name, task))
|
|
|
|
# Wait for all tasks to complete
|
|
for secret_name, task in tasks:
|
|
try:
|
|
secrets[secret_name] = await task
|
|
except SecretManagerError as e:
|
|
logger.warning(f"Failed to retrieve secret {secret_name}: {e}")
|
|
# Continue with other secrets
|
|
continue
|
|
|
|
return secrets
|
|
|
|
async def create_secret(self, secret_name: str, secret_value: str, labels: Optional[Dict[str, str]] = None) -> str:
|
|
"""
|
|
Create a new secret in Secret Manager.
|
|
|
|
Args:
|
|
secret_name: Name of the secret
|
|
secret_value: Value to store
|
|
labels: Optional labels for the secret
|
|
|
|
Returns:
|
|
The full secret resource name
|
|
"""
|
|
|
|
try:
|
|
client = self._get_client()
|
|
parent = f"projects/{self.project_id}"
|
|
|
|
# Create the secret
|
|
secret = {
|
|
"labels": labels or {},
|
|
"replication": {"automatic": {}}
|
|
}
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
# Create secret resource
|
|
create_response = await loop.run_in_executor(
|
|
None,
|
|
client.create_secret,
|
|
{
|
|
"parent": parent,
|
|
"secret_id": secret_name,
|
|
"secret": secret
|
|
}
|
|
)
|
|
|
|
# Add secret version with the actual value
|
|
version_response = await loop.run_in_executor(
|
|
None,
|
|
client.add_secret_version,
|
|
{
|
|
"parent": create_response.name,
|
|
"payload": {"data": secret_value.encode("UTF-8")}
|
|
}
|
|
)
|
|
|
|
logger.info(f"Successfully created secret: {secret_name}")
|
|
return version_response.name
|
|
|
|
except gcp_exceptions.AlreadyExists:
|
|
error_msg = f"Secret already exists: {secret_name}"
|
|
logger.error(error_msg)
|
|
raise SecretManagerError(error_msg)
|
|
|
|
except Exception as e:
|
|
error_msg = f"Failed to create secret {secret_name}: {e}"
|
|
logger.error(error_msg)
|
|
raise SecretManagerError(error_msg)
|
|
|
|
def clear_cache(self) -> None:
|
|
"""Clear the secrets cache."""
|
|
self._cache.clear()
|
|
logger.info("Secrets cache cleared")
|
|
|
|
|
|
# Global secrets manager instance
|
|
secrets_manager = SecretsManager()
|
|
|
|
|
|
# Convenience functions for common operations
|
|
async def get_secret(secret_name: str, version: str = "latest") -> str:
|
|
"""Get a secret value."""
|
|
return await secrets_manager.get_secret(secret_name, version)
|
|
|
|
|
|
async def get_database_url() -> str:
|
|
"""Get MongoDB connection URL from Secret Manager."""
|
|
try:
|
|
return await secrets_manager.get_secret("mongodb-url")
|
|
except SecretManagerError:
|
|
# Fallback to environment variable
|
|
url = os.getenv("MONGODB_URL")
|
|
if not url:
|
|
raise SecretManagerError("MongoDB URL not available in secrets or environment")
|
|
return url
|
|
|
|
|
|
async def get_redis_url() -> str:
|
|
"""Get Redis connection URL from Secret Manager."""
|
|
try:
|
|
return await secrets_manager.get_secret("redis-url")
|
|
except SecretManagerError:
|
|
# Fallback to environment variable
|
|
url = os.getenv("REDIS_URL")
|
|
if not url:
|
|
raise SecretManagerError("Redis URL not available in secrets or environment")
|
|
return url
|
|
|
|
|
|
async def get_jwt_secrets() -> Dict[str, str]:
|
|
"""Get JWT secrets from Secret Manager."""
|
|
try:
|
|
return await secrets_manager.get_secrets_batch([
|
|
"jwt-secret",
|
|
"jwt-refresh-secret"
|
|
])
|
|
except SecretManagerError:
|
|
# Fallback to environment variables
|
|
return {
|
|
"jwt-secret": os.getenv("JWT_SECRET_KEY", "dev-secret-change-in-production"),
|
|
"jwt-refresh-secret": os.getenv("JWT_REFRESH_SECRET_KEY", "dev-refresh-secret-change-in-production")
|
|
}
|
|
|
|
|
|
async def get_api_keys() -> Dict[str, str]:
|
|
"""Get all API keys from Secret Manager."""
|
|
api_keys = {}
|
|
|
|
secret_names = [
|
|
"gemini-api-key",
|
|
"sendgrid-api-key",
|
|
"elevenlabs-api-key",
|
|
"sentry-dsn"
|
|
]
|
|
|
|
try:
|
|
api_keys = await secrets_manager.get_secrets_batch(secret_names)
|
|
except SecretManagerError:
|
|
logger.warning("Failed to retrieve some API keys from Secret Manager, using environment fallback")
|
|
|
|
# Fallback to environment variables for missing keys
|
|
env_mapping = {
|
|
"gemini-api-key": "GEMINI_API_KEY",
|
|
"sendgrid-api-key": "SENDGRID_API_KEY",
|
|
"elevenlabs-api-key": "ELEVENLABS_API_KEY",
|
|
"sentry-dsn": "SENTRY_DSN"
|
|
}
|
|
|
|
for secret_name, env_var in env_mapping.items():
|
|
if secret_name not in api_keys:
|
|
env_value = os.getenv(env_var)
|
|
if env_value:
|
|
api_keys[secret_name] = env_value
|
|
else:
|
|
logger.warning(f"API key {secret_name} not available in secrets or environment")
|
|
|
|
return api_keys |