video-accessibility/backend/app/services/gcs.py
Vadym Samoilenko 5fd370c093 test: fix all unit tests — 168 passing, 0 failures
- conftest.py: set required env vars before app import to prevent Settings() crash
- gcs.py: lazy bucket init checks _bucket instead of _client; add @bucket.setter
- vtt.py: fix float precision in _format_timestamp; include empty-text cues in parser
- security.py: guard verify_password against empty hash (passlib UnknownHashError)
- tts.py: _parse_timestamp raises ValueError("Invalid timestamp format: …")
- emailer.py: HTML-escape job_title in _render_completion_template (XSS fix)
- test_emailer.py: rewrite for Mailgun-based service (replaced SendGrid)
- test_gcs.py: fix UploadFile constructor, MIME type, remove executor.submit mock
- test_gemini.py: patch module-level client instead of non-existent genai.upload_file;
  translate_vtt tests use numbered-list mock responses matching new implementation
- test_tts.py: fix aiohttp async CM mock pattern; fix error message match
- test_models.py: update JobCreate to use source_is_english instead of language
- test_security.py: set jwt_access_ttl_min in token test
- test_cross_tenant_isolation.py: add patch to imports

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 14:02:04 +01:00

213 lines
7.5 KiB
Python

import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from fastapi import HTTPException, UploadFile
from google.cloud import storage
from google.cloud.exceptions import NotFound
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger(__name__)
class GCSService:
    """Async-friendly wrapper around a single Google Cloud Storage bucket.

    The google-cloud-storage calls used here are blocking, so every public
    coroutine runs its work on a private ``ThreadPoolExecutor`` instead of
    on the event loop. The storage client and bucket handle are created
    lazily on first access to :attr:`bucket`; ``__init__`` itself only
    creates the thread pool.
    """

    def __init__(self) -> None:
        self._client: storage.Client | None = None
        self._bucket = None
        # Worker pool on which the blocking SDK calls are executed.
        self.executor = ThreadPoolExecutor(max_workers=4)

    @property
    def bucket(self):
        """Return the bucket handle, creating the client on first use.

        Initialisation is keyed on ``_bucket`` rather than ``_client`` so a
        bucket assigned through the setter is never replaced.
        """
        if self._bucket is None:
            self._client = storage.Client(project=settings.gcp_project_id)
            self._bucket = self._client.bucket(settings.gcs_bucket)
        return self._bucket

    @bucket.setter
    def bucket(self, value) -> None:
        """Replace the bucket handle (presumably used to inject a fake in tests)."""
        self._bucket = value

    async def _run_blocking(self, func):
        """Run the zero-argument callable ``func`` on the service's thread pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, func)

    async def upload_file_to_gcs(
        self,
        file: UploadFile,
        destination_path: str,
        content_type: str | None = None,
    ) -> str:
        """Upload file to GCS and return the GCS URI.

        Args:
            file: The uploaded file; its stream is rewound before reading.
            destination_path: Object name inside the configured bucket.
            content_type: Overrides ``file.content_type`` when given.

        Returns:
            ``gs://<bucket>/<destination_path>``.

        Raises:
            HTTPException: 500 if the upload fails for any reason.
        """

        def _upload() -> str:
            blob = self.bucket.blob(destination_path)
            # An explicit content type wins over the one sent with the file.
            resolved_type = content_type or file.content_type
            if resolved_type:
                blob.content_type = resolved_type
            file.file.seek(0)  # Reset file pointer
            blob.upload_from_file(file.file)
            return f"gs://{settings.gcs_bucket}/{destination_path}"

        try:
            return await self._run_blocking(_upload)
        except Exception as e:
            logger.error(f"Failed to upload file to GCS: {e}")
            raise HTTPException(status_code=500, detail="File upload failed") from e

    async def upload_text_to_gcs(
        self,
        content: str,
        destination_path: str,
        content_type: str = "text/plain",
    ) -> str:
        """Upload text content to GCS and return the GCS URI.

        Raises:
            HTTPException: 500 if the upload fails for any reason.
        """

        def _upload() -> str:
            blob = self.bucket.blob(destination_path)
            blob.content_type = content_type
            blob.upload_from_string(content, content_type=content_type)
            return f"gs://{settings.gcs_bucket}/{destination_path}"

        try:
            return await self._run_blocking(_upload)
        except Exception as e:
            logger.error(f"Failed to upload text to GCS: {e}")
            raise HTTPException(status_code=500, detail="Text upload failed") from e

    async def get_signed_url(
        self,
        blob_path: str,
        expiration_hours: int = 24,
        method: str = "GET",
    ) -> str:
        """Generate a V4 signed URL for an existing object.

        Args:
            blob_path: Object name inside the configured bucket.
            expiration_hours: Lifetime of the URL, in hours.
            method: HTTP method the URL is signed for.

        Raises:
            HTTPException: 404 if the object does not exist, 500 if signing
                fails for any other reason.
        """

        def _get_signed_url() -> str:
            blob = self.bucket.blob(blob_path)
            # Fail with a 404 rather than hand out a URL for a missing object.
            if not blob.exists():
                raise NotFound(f"File not found: {blob_path}")
            # A relative timedelta avoids the deprecated, naive
            # datetime.utcnow(); the SDK accepts either form.
            return blob.generate_signed_url(
                expiration=timedelta(hours=expiration_hours),
                method=method,
                version="v4",
            )

        try:
            return await self._run_blocking(_get_signed_url)
        except NotFound as e:
            raise HTTPException(status_code=404, detail="File not found") from e
        except Exception as e:
            logger.error(f"Failed to generate signed URL: {e}")
            raise HTTPException(
                status_code=500, detail="Failed to generate download URL"
            ) from e

    async def create_resumable_upload_session(self, blob_path: str, content_type: str) -> str:
        """Create a GCS resumable upload session and return the session URI.

        Raises:
            HTTPException: 500 if the session cannot be created.
        """

        def _create() -> str:
            blob = self.bucket.blob(blob_path)
            return blob.create_resumable_upload_session(
                content_type=content_type,
                timeout=60,
            )

        try:
            return await self._run_blocking(_create)
        except Exception as e:
            logger.error(f"Failed to create resumable upload session: {e}")
            raise HTTPException(
                status_code=500, detail="Failed to initiate upload session"
            ) from e

    async def delete_file(self, blob_path: str) -> bool:
        """Delete a file from GCS.

        Returns:
            ``True`` if the object was deleted, ``False`` if it did not exist.

        Raises:
            HTTPException: 500 if deletion fails for any other reason.
        """

        def _delete() -> bool:
            blob = self.bucket.blob(blob_path)
            blob.delete()
            return True

        try:
            return await self._run_blocking(_delete)
        except NotFound:
            return False
        except Exception as e:
            logger.error(f"Failed to delete file from GCS: {e}")
            raise HTTPException(status_code=500, detail="File deletion failed") from e

    async def file_exists(self, blob_path: str) -> bool:
        """Check if a file exists in GCS.

        Unlike the other methods, errors from the SDK are not translated
        into ``HTTPException``; they propagate to the caller unchanged.
        """

        def _exists() -> bool:
            blob = self.bucket.blob(blob_path)
            return blob.exists()

        return await self._run_blocking(_exists)
def gcs_path(job: "dict | object", *parts: str) -> str:
    """Return a GCS object path rooted at the job's prefix.

    Jobs created before MT-14 have no gcs_prefix and use bare job_id/ as the
    prefix. New jobs get prefix=orgs/{org_id}/jobs/{job_id}/.

    Args:
        job: A job mapping (reads ``gcs_prefix``, falling back to ``_id``),
            a job object (reads ``gcs_prefix``, falling back to ``id``), or
            any other value, whose ``str()`` is used as the prefix.
        *parts: Path segments appended below the prefix.

    Returns:
        The prefix with trailing slashes removed, joined to ``parts`` by
        ``/``; just the prefix when no parts are given.

    Raises:
        KeyError: If ``job`` is a dict with no usable ``gcs_prefix`` and no
            ``_id`` key.
        ValueError: If neither a prefix nor an id can be determined.
    """
    if isinstance(job, dict):
        prefix = job.get("gcs_prefix") or job["_id"]
    else:
        prefix = getattr(job, "gcs_prefix", None) or getattr(job, "id", str(job))
    if prefix is None:
        # Fail loudly instead of building a path under the literal "None".
        raise ValueError("job has neither a gcs_prefix nor an id")
    # Ids are not guaranteed to be strings (e.g. database id objects), so
    # coerce before using str methods.
    prefix = str(prefix).rstrip("/")
    return "/".join([prefix, *parts])
# Global GCS service instance, shared by the module-level helper functions.
# Constructing it only sets up the thread pool; the storage client and bucket
# are created on first access to ``gcs_service.bucket``.
gcs_service = GCSService()
# Convenience functions
async def upload_file_to_gcs(file: UploadFile, destination_path: str) -> str:
    """Upload ``file`` through the shared service and return its GCS URI."""
    gcs_uri = await gcs_service.upload_file_to_gcs(file, destination_path)
    return gcs_uri
async def upload_vtt_to_gcs(content: str, destination_path: str) -> str:
    """Upload WebVTT text through the shared service and return its GCS URI."""
    vtt_content_type = "text/vtt; charset=utf-8"
    gcs_uri = await gcs_service.upload_text_to_gcs(
        content, destination_path, vtt_content_type
    )
    return gcs_uri
async def upload_json_to_gcs(content: str, destination_path: str) -> str:
    """Upload JSON text through the shared service and return its GCS URI."""
    json_content_type = "application/json"
    gcs_uri = await gcs_service.upload_text_to_gcs(
        content, destination_path, json_content_type
    )
    return gcs_uri
async def get_signed_download_url(blob_path: str, expiration_hours: int = 24) -> str:
    """Return a signed URL for ``blob_path`` using the service's default method."""
    signed_url = await gcs_service.get_signed_url(blob_path, expiration_hours)
    return signed_url
async def create_resumable_upload_session(blob_path: str, content_type: str) -> str:
    """Open a resumable upload session via the shared service and return its URI."""
    session_uri = await gcs_service.create_resumable_upload_session(
        blob_path, content_type
    )
    return session_uri
async def generate_signed_upload_url(
    blob_path: str,
    content_type: str,
    max_size: int = 1024 * 1024 * 1024,  # 1GB
) -> dict:
    """Generate a signed POST policy for direct browser-to-GCS upload.

    Args:
        blob_path: Object name the upload will be stored under.
        content_type: MIME type the client will upload; it is sent as the
            ``Content-Type`` form field and its major type (the part before
            ``/``) is enforced with a ``starts-with`` condition.
        max_size: Largest accepted upload in bytes (at least 1 byte is
            always required).

    Returns:
        ``{"url": <form action URL>, "fields": <form fields to submit>}``.
        The policy is valid for one hour.
    """

    def _generate() -> dict:
        bucket = gcs_service.bucket
        # V4 POST policies are signed by the storage *client*, addressed by
        # bucket and object name; Blob has no such method. The call returns
        # a dict with "url" and "fields" keys rather than a tuple.
        policy = bucket.client.generate_signed_post_policy_v4(
            bucket.name,
            blob_path,
            expiration=timedelta(hours=1),
            conditions=[
                ["content-length-range", 1, max_size],
                ["starts-with", "$Content-Type", content_type.split("/")[0]],
            ],
            fields={
                "Content-Type": content_type,
            },
        )
        return {"url": policy["url"], "fields": policy["fields"]}

    # get_running_loop() is the form used by the rest of this module and is
    # the recommended call from inside a coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(gcs_service.executor, _generate)