video-accessibility/backend/app/services/gcs.py
Vadym Samoilenko ca312d48fa chore(lint): fix all ruff errors — 0 warnings remaining
- B904 (55): add `from err` / `from None` to raise-in-except across 13 files
- F821 (1): add missing HTTPException import in routes_language_qc.py
- F841 (7): remove unused variable assignments (current_user, job_title, tts_provider, etc.)
- W293 (13): strip trailing whitespace from blank lines
- C416 (4): rewrite unnecessary dict comprehensions as dict()
- C401 (1): rewrite unnecessary generator as set comprehension
- E701 (4): split multi-statement lines in cost_tracker.py
- E741 (1): rename ambiguous `l` to `lang` in cloud_run_dispatch.py
- B007 (4): prefix unused loop variables with _ in tts.py, video_renderer.py
- I001 (1): sort imports in tasks/__init__.py (move stdlib to top)
- E402 (3): move threading/time/signals imports to top of tasks/__init__.py
- UP042 (9): replace (str, Enum) with StrEnum in all model/schema enums

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 17:13:08 +01:00

213 lines
7.5 KiB
Python

import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from fastapi import HTTPException, UploadFile
from google.cloud import storage
from google.cloud.exceptions import NotFound
from ..core.config import settings
from ..core.logging import get_logger
# Logger obtained for this module's name; used below to record GCS failures.
logger = get_logger(__name__)
class GCSService:
    """Async-friendly wrapper around one Google Cloud Storage bucket.

    Each blocking storage call is wrapped in a small closure and run on a
    private thread pool through ``loop.run_in_executor`` so that the calling
    coroutine does not block the event loop. The storage client and bucket
    handle are created lazily on first access to :attr:`bucket`; constructing
    the service only creates the thread pool.
    """

    def __init__(self) -> None:
        """Create the worker pool; the client and bucket are resolved lazily."""
        self._client: storage.Client | None = None
        self._bucket = None
        # Pool (4 workers) on which all blocking storage calls are executed.
        self.executor = ThreadPoolExecutor(max_workers=4)

    @property
    def bucket(self):
        """Return the bucket handle, creating the client on first access.

        The client is built for ``settings.gcp_project_id`` and the bucket is
        looked up by ``settings.gcs_bucket``.
        """
        if self._bucket is None:
            self._client = storage.Client(project=settings.gcp_project_id)
            self._bucket = self._client.bucket(settings.gcs_bucket)
        return self._bucket

    @bucket.setter
    def bucket(self, value) -> None:
        """Replace the bucket handle (bypasses the lazy client creation)."""
        self._bucket = value

    async def upload_file_to_gcs(
        self,
        file: UploadFile,
        destination_path: str,
        content_type: str | None = None,
    ) -> str:
        """Upload file to GCS and return the GCS URI.

        Args:
            file: Upload whose underlying ``file.file`` stream is rewound to
                the start and sent to the bucket.
            destination_path: Object name inside the bucket.
            content_type: Explicit content type. When omitted,
                ``file.content_type`` is used if set; otherwise the blob's
                content type is left untouched.

        Returns:
            ``gs://{settings.gcs_bucket}/{destination_path}``.

        Raises:
            HTTPException: 500 if anything in the upload fails. The underlying
                error is logged and not chained onto the HTTP error.
        """
        def _upload():
            blob = self.bucket.blob(destination_path)
            # Explicit argument wins over the type reported by the upload.
            if content_type:
                blob.content_type = content_type
            elif file.content_type:
                blob.content_type = file.content_type
            # The stream may already have been read; rewind before sending.
            file.file.seek(0)
            blob.upload_from_file(file.file)
            return f"gs://{settings.gcs_bucket}/{destination_path}"

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _upload)
        except Exception as e:
            logger.error(f"Failed to upload file to GCS: {e}")
            raise HTTPException(status_code=500, detail="File upload failed") from None

    async def upload_text_to_gcs(
        self,
        content: str,
        destination_path: str,
        content_type: str = "text/plain",
    ) -> str:
        """Upload text content to GCS and return the GCS URI.

        Args:
            content: Text to store as the object body.
            destination_path: Object name inside the bucket.
            content_type: Content type set on the blob and passed to the
                upload call.

        Returns:
            ``gs://{settings.gcs_bucket}/{destination_path}``.

        Raises:
            HTTPException: 500 if anything in the upload fails. The underlying
                error is logged and not chained onto the HTTP error.
        """
        def _upload():
            blob = self.bucket.blob(destination_path)
            blob.content_type = content_type
            blob.upload_from_string(content, content_type=content_type)
            return f"gs://{settings.gcs_bucket}/{destination_path}"

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _upload)
        except Exception as e:
            logger.error(f"Failed to upload text to GCS: {e}")
            raise HTTPException(status_code=500, detail="Text upload failed") from None

    async def get_signed_url(
        self,
        blob_path: str,
        expiration_hours: int = 24,
        method: str = "GET",
    ) -> str:
        """Generate a V4 signed URL for an existing object.

        Args:
            blob_path: Object name inside the bucket.
            expiration_hours: Lifetime of the URL, in hours from now.
            method: HTTP method the URL is signed for (``"GET"`` by default,
                i.e. a download URL).

        Returns:
            The signed URL string.

        Raises:
            HTTPException: 404 if the object does not exist; 500 for any other
                failure (the underlying error is logged).
        """
        def _get_signed_url():
            # Function-scope import keeps this change self-contained; the
            # module-level import only brings in ``datetime`` and ``timedelta``.
            from datetime import timezone

            blob = self.bucket.blob(blob_path)
            # Signing does not verify existence, so check explicitly to be able
            # to answer with a 404 instead of handing out a dead link.
            if not blob.exists():
                raise NotFound(f"File not found: {blob_path}")
            # Timezone-aware "now": datetime.utcnow() is naive and deprecated
            # since Python 3.12.
            expiration = datetime.now(timezone.utc) + timedelta(hours=expiration_hours)
            return blob.generate_signed_url(
                expiration=expiration,
                method=method,
                version="v4",
            )

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _get_signed_url)
        except NotFound:
            raise HTTPException(status_code=404, detail="File not found") from None
        except Exception as e:
            logger.error(f"Failed to generate signed URL: {e}")
            raise HTTPException(status_code=500, detail="Failed to generate download URL") from None

    async def create_resumable_upload_session(self, blob_path: str, content_type: str) -> str:
        """Create a GCS resumable upload session and return the session URI.

        Args:
            blob_path: Object name the session will upload to.
            content_type: Content type declared for the upload.

        Returns:
            The session URI returned by the storage client.

        Raises:
            HTTPException: 500 if the session cannot be created (the
                underlying error is logged).
        """
        def _create():
            blob = self.bucket.blob(blob_path)
            return blob.create_resumable_upload_session(
                content_type=content_type,
                timeout=60,
            )

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _create)
        except Exception as e:
            logger.error(f"Failed to create resumable upload session: {e}")
            raise HTTPException(status_code=500, detail="Failed to initiate upload session") from None

    async def delete_file(self, blob_path: str) -> bool:
        """Delete a file from GCS.

        Args:
            blob_path: Object name inside the bucket.

        Returns:
            ``True`` if the delete call succeeded, ``False`` if the storage
            client reported the object as not found.

        Raises:
            HTTPException: 500 for any failure other than not-found (the
                underlying error is logged).
        """
        def _delete():
            blob = self.bucket.blob(blob_path)
            blob.delete()
            return True

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _delete)
        except NotFound:
            return False
        except Exception as e:
            logger.error(f"Failed to delete file from GCS: {e}")
            raise HTTPException(status_code=500, detail="File deletion failed") from None

    async def file_exists(self, blob_path: str) -> bool:
        """Check if a file exists in GCS.

        Unlike the other methods, errors from the storage client are not
        translated into ``HTTPException``; they propagate to the caller as-is.
        """
        def _exists():
            blob = self.bucket.blob(blob_path)
            return blob.exists()

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, _exists)
def gcs_path(job: "dict | object", *parts: str) -> str:
    """Return a GCS object path rooted at the job's prefix.

    Jobs created before MT-14 have no gcs_prefix and use bare job_id/ as the
    prefix. New jobs get prefix=orgs/{org_id}/jobs/{job_id}/.

    Args:
        job: Either a dict with an ``_id`` key and an optional ``gcs_prefix``
            key, or an object with optional ``gcs_prefix`` / ``id``
            attributes. An object that has neither falls back to ``str(job)``.
        *parts: Path segments appended after the prefix, joined with ``/``.

    Returns:
        The prefix with trailing slashes removed, followed by ``parts``;
        just the prefix when no parts are given.

    Raises:
        KeyError: If ``job`` is a dict with no usable ``gcs_prefix`` and no
            ``_id`` key.
        ValueError: If neither a prefix nor an id could be resolved.
    """
    if isinstance(job, dict):
        # A missing or empty gcs_prefix means a legacy job: fall back to its id.
        prefix = job.get("gcs_prefix") or job["_id"]
    else:
        prefix = getattr(job, "gcs_prefix", None) or getattr(job, "id", str(job))

    if prefix is None:
        # Fail loudly instead of producing a "None/..." object path.
        raise ValueError("job has neither a gcs_prefix nor an id")

    # Ids are not guaranteed to be strings (e.g. integer or database id
    # objects), so coerce before using string methods.
    prefix = str(prefix).rstrip("/")
    return "/".join([prefix, *parts])
# Global GCS service instance, created at import time and shared by the
# module-level convenience functions below. Construction only sets up the
# thread pool; the storage client is created on first use of `.bucket`.
gcs_service = GCSService()
# Convenience functions
async def upload_file_to_gcs(file: UploadFile, destination_path: str) -> str:
    """Upload ``file`` via the shared ``gcs_service`` and return its GCS URI.

    No explicit content type is passed, so the service falls back to the
    content type carried by the upload itself.
    """
    gcs_uri = await gcs_service.upload_file_to_gcs(file, destination_path)
    return gcs_uri
async def upload_vtt_to_gcs(content: str, destination_path: str) -> str:
    """Upload WebVTT text via the shared ``gcs_service`` and return its GCS URI.

    The object is stored with content type ``text/vtt; charset=utf-8``.
    """
    gcs_uri = await gcs_service.upload_text_to_gcs(
        content,
        destination_path,
        "text/vtt; charset=utf-8",
    )
    return gcs_uri
async def upload_json_to_gcs(content: str, destination_path: str) -> str:
    """Upload a JSON string via the shared ``gcs_service`` and return its GCS URI.

    ``content`` is sent as given (it is not serialized here) with content
    type ``application/json``.
    """
    gcs_uri = await gcs_service.upload_text_to_gcs(
        content,
        destination_path,
        "application/json",
    )
    return gcs_uri
async def get_signed_download_url(blob_path: str, expiration_hours: int = 24) -> str:
    """Return a signed URL for ``blob_path`` from the shared ``gcs_service``.

    The HTTP method is left at the service default; ``expiration_hours`` is
    forwarded unchanged.
    """
    signed_url = await gcs_service.get_signed_url(blob_path, expiration_hours)
    return signed_url
async def create_resumable_upload_session(blob_path: str, content_type: str) -> str:
    """Open a resumable upload session through the shared ``gcs_service``.

    Returns the session URI produced by the service for ``blob_path``.
    """
    session_uri = await gcs_service.create_resumable_upload_session(
        blob_path,
        content_type,
    )
    return session_uri
async def generate_signed_upload_url(
    blob_path: str,
    content_type: str,
    max_size: int = 1024 * 1024 * 1024,  # 1GB
) -> dict:
    """Generate a signed URL for direct browser-to-GCS upload.

    Builds a V4 signed POST policy for ``blob_path`` on the shared
    ``gcs_service`` bucket, valid for one hour.

    Args:
        blob_path: Object name the upload will be stored under.
        content_type: Content type sent as the ``Content-Type`` form field.
            Its major type (the part before ``/``) is also used in a
            ``starts-with`` policy condition.
        max_size: Upper bound, in bytes, of the ``content-length-range``
            condition; the lower bound is 1 byte.

    Returns:
        A dict with ``"url"`` (the POST target) and ``"fields"`` (the form
        fields to submit with the upload).

    Note:
        Unlike the ``GCSService`` methods, failures here are not converted to
        ``HTTPException``; errors from the storage client propagate as-is.
    """
    def _generate():
        blob = gcs_service.bucket.blob(blob_path)
        # Generate signed POST URL
        url, fields = blob.generate_signed_post_policy_v4(
            expiration=timedelta(hours=1),
            conditions=[
                ["content-length-range", 1, max_size],
                ["starts-with", "$Content-Type", content_type.split("/")[0]],
            ],
            fields={
                "Content-Type": content_type,
            },
        )
        return {"url": url, "fields": fields}

    # get_running_loop() matches the rest of this module; inside a coroutine
    # it returns the same loop without get_event_loop()'s legacy fallbacks.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(gcs_service.executor, _generate)