pahvalentines/backend/app/services/storage.py
michael 2ebe470a83 feat(download): use signed URLs for video download
Download button now redirects to backend endpoint that generates
GCS signed URLs with Content-Disposition: attachment header,
forcing browser download instead of opening in new tab.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 14:48:06 -06:00

233 lines
6.6 KiB
Python

"""Google Cloud Storage utility module for file operations."""
import logging
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Generator
from google.cloud import storage
from google.cloud.storage import Bucket, Client
from app.config import settings
# Module logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Module-level cached instances. Both start as None and are filled in on
# first use by get_client() / get_bucket(), then reused by later calls.
_client: Client | None = None
_bucket: Bucket | None = None
def get_client() -> Client:
    """Get lazy-loaded GCS client instance.

    On the first call the client is built from the service-account key file
    named by ``settings.GCS_CREDENTIALS_PATH`` when that path points at a
    regular file; otherwise it falls back to application default
    credentials. The instance is cached in the module-level ``_client`` and
    returned unchanged by every later call.

    Returns:
        The cached GCS client.
    """
    global _client
    if _client is None:
        credentials_path = settings.GCS_CREDENTIALS_PATH
        # An unset/empty path must never reach os.path (None raises
        # TypeError), and a directory sitting at that path is not a usable
        # key file, so both cases take the default-credentials route.
        if credentials_path and os.path.isfile(credentials_path):
            _client = storage.Client.from_service_account_json(
                credentials_path,
                project=settings.GCS_PROJECT_ID,
            )
        else:
            # Fall back to application default credentials (e.g., in Cloud Run)
            _client = storage.Client(project=settings.GCS_PROJECT_ID)
        logger.info(f"GCS client initialized for project {settings.GCS_PROJECT_ID}")
    return _client
def get_bucket() -> Bucket:
    """Return the shared bucket handle, creating it on first use.

    The handle is obtained from the shared client for
    ``settings.GCS_BUCKET_NAME`` and cached in the module-level ``_bucket``.

    Returns:
        The cached bucket instance.
    """
    global _bucket
    # Fast path: already created by an earlier call.
    if _bucket is not None:
        return _bucket
    client = get_client()
    _bucket = client.bucket(settings.GCS_BUCKET_NAME)
    logger.info(f"GCS bucket initialized: {settings.GCS_BUCKET_NAME}")
    return _bucket
def upload_bytes(data: bytes, blob_path: str, content_type: str) -> str:
    """Store an in-memory payload as a blob in the configured bucket.

    Args:
        data: Bytes to upload
        blob_path: Destination path within the bucket
            (e.g., "uploads/session123.jpg")
        content_type: MIME type recorded for the blob (e.g., "image/jpeg")

    Returns:
        ``blob_path``, unchanged, once the upload call has returned
    """
    target = get_bucket().blob(blob_path)
    target.upload_from_string(data, content_type=content_type)
    logger.info(f"Uploaded {len(data)} bytes to gs://{settings.GCS_BUCKET_NAME}/{blob_path}")
    return blob_path
def upload_file(local_path: str | Path, blob_path: str, content_type: str) -> str:
    """Store a file from local disk as a blob in the configured bucket.

    Args:
        local_path: Path to the local file (string or ``Path``)
        blob_path: Destination path within the bucket
        content_type: MIME type recorded for the blob

    Returns:
        ``blob_path``, unchanged, once the upload call has returned
    """
    target = get_bucket().blob(blob_path)
    # The upload call is handed a plain string, so Path inputs are converted.
    source_name = str(local_path)
    target.upload_from_filename(source_name, content_type=content_type)
    logger.info(f"Uploaded {local_path} to gs://{settings.GCS_BUCKET_NAME}/{blob_path}")
    return blob_path
def download_to_file(blob_path: str, local_path: str | Path) -> None:
    """Fetch a blob from the configured bucket and write it to local disk.

    Args:
        blob_path: Path of the blob within the bucket
        local_path: Destination local path (string or ``Path``)
    """
    source = get_bucket().blob(blob_path)
    # The download call is handed a plain string, so Path inputs are converted.
    destination_name = str(local_path)
    source.download_to_filename(destination_name)
    logger.info(f"Downloaded gs://{settings.GCS_BUCKET_NAME}/{blob_path} to {local_path}")
def delete_blob(blob_path: str) -> bool:
    """Remove a blob from the configured bucket if it is present.

    Args:
        blob_path: Path of the blob within the bucket

    Returns:
        True if the blob was found and deleted, False if it did not exist
    """
    target = get_bucket().blob(blob_path)
    # Guard clause: nothing to delete, report it and bail out.
    if not target.exists():
        logger.warning(f"Blob not found for deletion: {blob_path}")
        return False
    target.delete()
    logger.info(f"Deleted gs://{settings.GCS_BUCKET_NAME}/{blob_path}")
    return True
def get_public_url(blob_path: str) -> str:
    """Get public URL for a blob.

    The blob path is percent-encoded so that object names containing spaces,
    ``#``, ``?``, ``%`` or non-ASCII characters still produce a valid URL
    that addresses the intended object. ``/`` is left as-is so folder-style
    paths keep their shape; paths made only of URL-safe characters are
    returned exactly as before.

    Args:
        blob_path: Path within the bucket

    Returns:
        Public URL (e.g., "https://storage.googleapis.com/vday2026/uploads/session123.jpg")
    """
    from urllib.parse import quote

    encoded_path = quote(blob_path, safe="/")
    return f"https://storage.googleapis.com/{settings.GCS_BUCKET_NAME}/{encoded_path}"
def generate_signed_download_url(
    blob_path: str,
    filename: str,
    expiration_minutes: int = 60
) -> str:
    """Generate a signed URL that forces download with custom filename.

    The filename is sanitized before it is placed in the Content-Disposition
    value: ASCII control characters are removed, ``"`` and ``\\`` are
    backslash-escaped inside the quoted ``filename`` parameter, and
    non-ASCII characters are replaced by ``_`` there. Whenever that quoted
    fallback differs from the cleaned name, an RFC 5987 ``filename*``
    parameter carrying the exact UTF-8 name is appended. Plain ASCII names
    without quotes or backslashes yield the same header value as before.

    Args:
        blob_path: Path within the bucket
        filename: Filename for Content-Disposition header
        expiration_minutes: URL expiration time in minutes

    Returns:
        Signed URL with Content-Disposition: attachment header
    """
    from datetime import timedelta
    from urllib.parse import quote

    # Drop ASCII control characters: a CR/LF in a caller-supplied filename
    # would otherwise allow extra header content to be smuggled in.
    cleaned = "".join(ch for ch in filename if ch >= " " and ch != "\x7f")
    # Quoted-string fallback: keep it ASCII-only and escape the two
    # characters that are special inside double quotes.
    fallback = "".join(ch if ord(ch) < 128 else "_" for ch in cleaned)
    fallback = fallback.replace("\\", "\\\\").replace('"', '\\"')
    disposition = f'attachment; filename="{fallback}"'
    if fallback != cleaned:
        # The fallback lost information, so also send the exact name in the
        # extended (percent-encoded UTF-8) form.
        encoded_name = quote(cleaned, safe="")
        disposition += f"; filename*=UTF-8''{encoded_name}"

    bucket = get_bucket()
    blob = bucket.blob(blob_path)
    url = blob.generate_signed_url(
        version="v4",
        expiration=timedelta(minutes=expiration_minutes),
        method="GET",
        response_disposition=disposition,
    )
    logger.info(f"Generated signed download URL for {blob_path}")
    return url
@contextmanager
def temp_download(blob_path: str, suffix: str = "") -> Generator[str, None, None]:
    """Download a blob into a scratch file for the duration of a ``with`` block.

    A temp file is created, the blob is downloaded into it, and its path is
    yielded. On exit (normal or via exception) the file is removed if it is
    still present.

    Args:
        blob_path: Path of the blob within the bucket
        suffix: File suffix for the temp file (e.g., ".mp3")

    Yields:
        Path to the temporary file holding the downloaded blob
    """
    descriptor, scratch_path = tempfile.mkstemp(suffix=suffix)
    # Only the path is used from here on; release the open descriptor.
    os.close(descriptor)
    try:
        download_to_file(blob_path, scratch_path)
        yield scratch_path
    finally:
        still_present = os.path.exists(scratch_path)
        if still_present:
            os.unlink(scratch_path)
            logger.debug(f"Cleaned up temp file: {scratch_path}")
@contextmanager
def temp_file_for_upload(suffix: str = "") -> Generator[str, None, None]:
    """Provide a scratch file path for output that will be uploaded.

    A temp file is created and its path yielded. On exit (normal or via
    exception) the file is removed if it is still present.

    Args:
        suffix: File suffix for the temp file (e.g., ".mp4")

    Yields:
        Path to the temporary file
    """
    descriptor, scratch_path = tempfile.mkstemp(suffix=suffix)
    # Only the path is used from here on; release the open descriptor.
    os.close(descriptor)
    try:
        yield scratch_path
    finally:
        still_present = os.path.exists(scratch_path)
        if still_present:
            os.unlink(scratch_path)
            logger.debug(f"Cleaned up temp file: {scratch_path}")
# Helper functions for blob path construction
def get_upload_blob_path(session_id: str) -> str:
    """Build the blob path of a session's uploaded pet photo.

    Args:
        session_id: Session identifier used as the file stem

    Returns:
        "<GCS_UPLOADS_FOLDER>/<session_id>.jpg"
    """
    folder = settings.GCS_UPLOADS_FOLDER
    return f"{folder}/{session_id}.jpg"
def get_audio_blob_path(session_id: str) -> str:
    """Build the blob path of a session's generated audio.

    Args:
        session_id: Session identifier used as the file stem

    Returns:
        "<GCS_AUDIO_FOLDER>/<session_id>.mp3"
    """
    folder = settings.GCS_AUDIO_FOLDER
    return f"{folder}/{session_id}.mp3"
def get_video_blob_path(session_id: str) -> str:
    """Build the blob path of a session's generated video.

    Args:
        session_id: Session identifier used as the file stem

    Returns:
        "<GCS_VIDEO_FOLDER>/<session_id>.mp4"
    """
    folder = settings.GCS_VIDEO_FOLDER
    return f"{folder}/{session_id}.mp4"
def get_composite_blob_path(session_id: str) -> str:
    """Build the blob path of a session's composite record image.

    Args:
        session_id: Session identifier used as the file stem

    Returns:
        "<GCS_IMAGES_FOLDER>/<session_id>.png"
    """
    folder = settings.GCS_IMAGES_FOLDER
    return f"{folder}/{session_id}.png"
def check_connectivity() -> bool:
    """Check if GCS is accessible.

    Returns:
        True if bucket is accessible, raises exception otherwise

    Raises:
        RuntimeError: If the existence check reports that the configured
            bucket is not there. Errors raised by the storage client itself
            propagate unchanged.
    """
    bucket = get_bucket()
    # The existence check answers with a boolean, so its result must be
    # inspected; ignoring it would report success for a missing bucket.
    if not bucket.exists():
        raise RuntimeError(f"GCS bucket not found: {settings.GCS_BUCKET_NAME}")
    logger.info(f"GCS connectivity check passed for bucket: {settings.GCS_BUCKET_NAME}")
    return True