import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, UploadFile
from google.cloud import storage
from google.cloud.exceptions import NotFound

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger(__name__)


class GCSService:
    """Async wrapper around the blocking google-cloud-storage client.

    Every public coroutine builds a small synchronous closure and runs it on
    ``self.executor`` so the blocking GCS call does not stall the event loop.
    All operations target the single bucket named by ``settings.gcs_bucket``.
    """

    def __init__(self):
        self.client = storage.Client(project=settings.gcp_project_id)
        self.bucket = self.client.bucket(settings.gcs_bucket)
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def _run_blocking(self, func):
        """Run the zero-argument callable *func* on the service's thread pool.

        Uses ``get_running_loop()`` rather than ``get_event_loop()``: this is
        only ever awaited from a coroutine, so a running loop always exists.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, func)

    async def upload_file_to_gcs(
        self,
        file: UploadFile,
        destination_path: str,
        content_type: Optional[str] = None,
    ) -> str:
        """Upload file to GCS and return the GCS URI.

        Args:
            file: The uploaded file; its underlying ``file.file`` object is
                rewound and streamed to the bucket.
            destination_path: Object name inside the bucket.
            content_type: Explicit content type. When omitted (or empty),
                ``file.content_type`` is used if it is set.

        Returns:
            The ``gs://<bucket>/<destination_path>`` URI of the object.

        Raises:
            HTTPException: 500 if the upload fails for any reason.
        """

        def _upload():
            blob = self.bucket.blob(destination_path)

            # Explicit content type wins over the one reported by the upload.
            if content_type:
                blob.content_type = content_type
            elif file.content_type:
                blob.content_type = file.content_type

            file.file.seek(0)  # Reset file pointer
            blob.upload_from_file(file.file)

            return f"gs://{settings.gcs_bucket}/{destination_path}"

        try:
            return await self._run_blocking(_upload)
        except Exception as e:
            logger.error(f"Failed to upload file to GCS: {e}")
            raise HTTPException(status_code=500, detail="File upload failed") from e

    async def upload_text_to_gcs(
        self,
        content: str,
        destination_path: str,
        content_type: str = "text/plain",
    ) -> str:
        """Upload text content to GCS and return the GCS URI.

        Args:
            content: Text to store as the object's body.
            destination_path: Object name inside the bucket.
            content_type: Content type recorded on the object.

        Returns:
            The ``gs://<bucket>/<destination_path>`` URI of the object.

        Raises:
            HTTPException: 500 if the upload fails for any reason.
        """

        def _upload():
            blob = self.bucket.blob(destination_path)
            blob.content_type = content_type
            blob.upload_from_string(content, content_type=content_type)
            return f"gs://{settings.gcs_bucket}/{destination_path}"

        try:
            return await self._run_blocking(_upload)
        except Exception as e:
            logger.error(f"Failed to upload text to GCS: {e}")
            raise HTTPException(status_code=500, detail="Text upload failed") from e

    async def get_signed_url(
        self,
        blob_path: str,
        expiration_hours: int = 24,
        method: str = "GET",
    ) -> str:
        """Generate a signed URL for downloading a file.

        Args:
            blob_path: Object name inside the bucket.
            expiration_hours: Lifetime of the URL, in hours from now.
            method: HTTP method the URL is signed for.

        Returns:
            A V4 signed URL for the object.

        Raises:
            HTTPException: 404 if the object does not exist; 500 if signing
                fails for any other reason.
        """

        def _get_signed_url():
            blob = self.bucket.blob(blob_path)

            # Signing itself never contacts GCS, so check existence up front
            # to avoid handing out URLs for objects that are not there.
            if not blob.exists():
                raise NotFound(f"File not found: {blob_path}")

            # Timezone-aware UTC instead of the deprecated naive utcnow().
            expiration = datetime.now(timezone.utc) + timedelta(hours=expiration_hours)

            return blob.generate_signed_url(
                expiration=expiration,
                method=method,
                version="v4",
            )

        try:
            return await self._run_blocking(_get_signed_url)
        except NotFound:
            raise HTTPException(status_code=404, detail="File not found")
        except Exception as e:
            logger.error(f"Failed to generate signed URL: {e}")
            raise HTTPException(
                status_code=500, detail="Failed to generate download URL"
            ) from e

    async def delete_file(self, blob_path: str) -> bool:
        """Delete a file from GCS.

        Returns:
            True if the object was deleted, False if it did not exist.

        Raises:
            HTTPException: 500 if deletion fails for any other reason.
        """

        def _delete():
            blob = self.bucket.blob(blob_path)
            blob.delete()
            return True

        try:
            return await self._run_blocking(_delete)
        except NotFound:
            return False
        except Exception as e:
            logger.error(f"Failed to delete file from GCS: {e}")
            raise HTTPException(status_code=500, detail="File deletion failed") from e

    async def file_exists(self, blob_path: str) -> bool:
        """Check if a file exists in GCS.

        Errors from the underlying client are not translated here; they
        propagate to the caller unchanged.
        """

        def _exists():
            blob = self.bucket.blob(blob_path)
            return blob.exists()

        return await self._run_blocking(_exists)


# Global GCS service instance
gcs_service = GCSService()


# Convenience functions
async def upload_file_to_gcs(
    file: UploadFile,
    destination_path: str,
    content_type: Optional[str] = None,
) -> str:
    """Upload *file* through the global service and return its GCS URI.

    ``content_type`` is optional and forwarded as-is; omitting it keeps the
    previous behavior of falling back to ``file.content_type``.
    """
    return await gcs_service.upload_file_to_gcs(file, destination_path, content_type)


async def upload_vtt_to_gcs(content: str, destination_path: str) -> str:
    """Upload WebVTT text through the global service and return its GCS URI."""
    return await gcs_service.upload_text_to_gcs(
        content, destination_path, "text/vtt; charset=utf-8"
    )


async def upload_json_to_gcs(content: str, destination_path: str) -> str:
    """Upload a JSON string through the global service and return its GCS URI."""
    return await gcs_service.upload_text_to_gcs(
        content, destination_path, "application/json"
    )


async def get_signed_download_url(blob_path: str, expiration_hours: int = 24) -> str:
    """Return a signed GET URL for *blob_path* from the global service."""
    return await gcs_service.get_signed_url(blob_path, expiration_hours)


async def generate_signed_upload_url(
    blob_path: str,
    content_type: str,
    max_size: int = 1024 * 1024 * 1024,  # 1 GiB
) -> dict:
    """Generate a signed URL for direct browser-to-GCS upload.

    Args:
        blob_path: Object name the upload will be stored under.
        content_type: Content type the browser will send; it is pinned in the
            policy's form fields.
        max_size: Largest accepted upload, in bytes.

    Returns:
        A dict with ``"url"`` (the POST target) and ``"fields"`` (the form
        fields that must accompany the upload).

    Raises:
        HTTPException: 500 if the policy cannot be generated.
    """

    def _generate():
        # generate_signed_post_policy_v4 lives on storage.Client (not on Blob)
        # and returns a dict with "url" and "fields" keys.
        policy = gcs_service.client.generate_signed_post_policy_v4(
            gcs_service.bucket.name,
            blob_path,
            expiration=timedelta(hours=1),
            conditions=[
                ["content-length-range", 1, max_size],
                ["starts-with", "$Content-Type", content_type.split("/")[0]],
            ],
            fields={
                "Content-Type": content_type,
            },
        )
        return {"url": policy["url"], "fields": policy["fields"]}

    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(gcs_service.executor, _generate)
    except Exception as e:
        logger.error(f"Failed to generate signed upload URL: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to generate upload URL"
        ) from e