- B904 (55): add `from err` / `from None` to raise-in-except across 13 files
- F821 (1): add missing HTTPException import in routes_language_qc.py
- F841 (7): remove unused variable assignments (current_user, job_title, tts_provider, etc.)
- W293 (13): strip trailing whitespace from blank lines
- C416 (4): rewrite unnecessary dict comprehensions as dict()
- C401 (1): rewrite unnecessary generator as set comprehension
- E701 (4): split multi-statement lines in cost_tracker.py
- E741 (1): rename ambiguous `l` to `lang` in cloud_run_dispatch.py
- B007 (4): prefix unused loop variables with _ in tts.py, video_renderer.py
- I001 (1): sort imports in tasks/__init__.py (move stdlib to top)
- E402 (3): move threading/time/signals imports to top of tasks/__init__.py
- UP042 (9): replace (str, Enum) with StrEnum in all model/schema enums

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
213 lines
7.5 KiB
Python
213 lines
7.5 KiB
Python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import UTC, datetime, timedelta

from fastapi import HTTPException, UploadFile
from google.cloud import storage
from google.cloud.exceptions import NotFound

from ..core.config import settings
from ..core.logging import get_logger
|
|
|
|
# Module-level logger, named after this module via the project's get_logger helper.
logger = get_logger(__name__)
|
|
|
|
class GCSService:
    """Asynchronous facade over a single Google Cloud Storage bucket.

    Blocking google-cloud-storage calls are run on a private thread pool
    (4 workers) through ``loop.run_in_executor`` so they do not block the
    event loop. Most failures are logged and re-raised as FastAPI
    ``HTTPException``s.
    """

    def __init__(self) -> None:
        # Client and bucket are created lazily by the ``bucket`` property, so
        # constructing the service does not build a storage client.
        self._client: storage.Client | None = None
        self._bucket = None
        self.executor = ThreadPoolExecutor(max_workers=4)

    @property
    def bucket(self):
        """Return the target bucket, creating the storage client on first use.

        The client is bound to ``settings.gcp_project_id`` and the bucket name
        comes from ``settings.gcs_bucket``.
        """
        # NOTE(review): this is evaluated inside executor threads without a
        # lock, so two concurrent first calls may each create a client (the
        # last assignment wins). Wasteful but not incorrect.
        if self._bucket is None:
            self._client = storage.Client(project=settings.gcp_project_id)
            self._bucket = self._client.bucket(settings.gcs_bucket)
        return self._bucket

    @bucket.setter
    def bucket(self, value) -> None:
        """Replace the bucket used by every operation (presumably for tests)."""
        self._bucket = value

    async def upload_file_to_gcs(
        self,
        file: UploadFile,
        destination_path: str,
        content_type: str | None = None,
    ) -> str:
        """Upload an ``UploadFile`` to GCS and return its ``gs://`` URI.

        Args:
            file: Uploaded file; its underlying stream is rewound before upload.
            destination_path: Object name inside the bucket.
            content_type: Explicit MIME type. When falsy, ``file.content_type``
                is used instead; when both are falsy no content type is set.

        Returns:
            ``gs://<settings.gcs_bucket>/<destination_path>``.

        Raises:
            HTTPException: 500 if the upload fails for any reason.
        """

        def _upload() -> str:
            blob = self.bucket.blob(destination_path)

            # An explicit argument takes precedence over the client-sent header.
            if content_type:
                blob.content_type = content_type
            elif file.content_type:
                blob.content_type = file.content_type

            # The stream position may not be at the start; upload from byte 0.
            file.file.seek(0)
            blob.upload_from_file(file.file)

            # NOTE(review): the URI is built from settings.gcs_bucket, so it will
            # not reflect a bucket injected through the ``bucket`` setter.
            return f"gs://{settings.gcs_bucket}/{destination_path}"

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _upload)
        except Exception as e:
            # logger.exception records the traceback, which `from None` below
            # deliberately keeps out of the HTTP error chain. Assumes get_logger
            # returns a stdlib-compatible logger.
            logger.exception(f"Failed to upload file to GCS: {e}")
            raise HTTPException(status_code=500, detail="File upload failed") from None

    async def upload_text_to_gcs(
        self,
        content: str,
        destination_path: str,
        content_type: str = "text/plain",
    ) -> str:
        """Upload a string to GCS and return its ``gs://`` URI.

        Args:
            content: Text to store.
            destination_path: Object name inside the bucket.
            content_type: MIME type for the object (default ``text/plain``).

        Returns:
            ``gs://<settings.gcs_bucket>/<destination_path>``.

        Raises:
            HTTPException: 500 if the upload fails for any reason.
        """

        def _upload() -> str:
            blob = self.bucket.blob(destination_path)
            blob.content_type = content_type
            blob.upload_from_string(content, content_type=content_type)
            return f"gs://{settings.gcs_bucket}/{destination_path}"

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _upload)
        except Exception as e:
            logger.exception(f"Failed to upload text to GCS: {e}")
            raise HTTPException(status_code=500, detail="Text upload failed") from None

    async def get_signed_url(
        self,
        blob_path: str,
        expiration_hours: int = 24,
        method: str = "GET",
    ) -> str:
        """Return a V4 signed URL for an existing object.

        Args:
            blob_path: Object name inside the bucket.
            expiration_hours: Lifetime of the URL in hours.
            method: HTTP method the URL is signed for (default ``GET``).

        Raises:
            HTTPException: 404 if the object does not exist, 500 for any other
                failure (including signing errors).
        """

        def _get_signed_url() -> str:
            blob = self.bucket.blob(blob_path)

            # Refuse to sign a URL for an object that is not there.
            if not blob.exists():
                raise NotFound(f"File not found: {blob_path}")

            # datetime.utcnow() is deprecated since Python 3.12; use an aware
            # UTC timestamp instead (the library accepts tz-aware datetimes).
            expiration = datetime.now(UTC) + timedelta(hours=expiration_hours)

            return blob.generate_signed_url(
                expiration=expiration,
                method=method,
                version="v4",
            )

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _get_signed_url)
        except NotFound:
            raise HTTPException(status_code=404, detail="File not found") from None
        except Exception as e:
            logger.exception(f"Failed to generate signed URL: {e}")
            raise HTTPException(
                status_code=500, detail="Failed to generate download URL"
            ) from None

    async def create_resumable_upload_session(self, blob_path: str, content_type: str) -> str:
        """Create a GCS resumable upload session and return the session URI.

        Raises:
            HTTPException: 500 if the session cannot be created.
        """

        def _create() -> str:
            blob = self.bucket.blob(blob_path)
            return blob.create_resumable_upload_session(
                content_type=content_type,
                timeout=60,
            )

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _create)
        except Exception as e:
            logger.exception(f"Failed to create resumable upload session: {e}")
            raise HTTPException(
                status_code=500, detail="Failed to initiate upload session"
            ) from None

    async def delete_file(self, blob_path: str) -> bool:
        """Delete an object from the bucket.

        Returns:
            ``True`` if the object was deleted, ``False`` if it did not exist.

        Raises:
            HTTPException: 500 for any other failure.
        """

        def _delete() -> bool:
            blob = self.bucket.blob(blob_path)
            blob.delete()
            return True

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self.executor, _delete)
        except NotFound:
            return False
        except Exception as e:
            logger.exception(f"Failed to delete file from GCS: {e}")
            raise HTTPException(status_code=500, detail="File deletion failed") from None

    async def file_exists(self, blob_path: str) -> bool:
        """Return whether an object exists in the bucket.

        Unlike the other methods, errors are not translated: any storage
        exception propagates to the caller unchanged.
        """

        def _exists() -> bool:
            blob = self.bucket.blob(blob_path)
            return blob.exists()

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, _exists)
|
|
|
|
|
|
def gcs_path(job: "dict | object", *parts: str) -> str:
    """Return a GCS object path rooted at the job's prefix.

    Jobs created before MT-14 have no ``gcs_prefix`` and use their bare job id
    as the prefix. New jobs get ``gcs_prefix = "orgs/{org_id}/jobs/{job_id}/"``.

    Args:
        job: Either a job document (``dict`` with ``"_id"`` and an optional
            ``"gcs_prefix"``) or an object with optional ``gcs_prefix`` / ``id``
            attributes (``str(job)`` is the last-resort fallback).
        *parts: Path segments appended under the prefix, joined with ``/``.

    Returns:
        The prefix without trailing slashes, followed by ``parts``; for example
        ``gcs_path({"_id": "j1"}, "audio", "a.wav") == "j1/audio/a.wav"``.
    """
    if isinstance(job, dict):
        # A missing or empty gcs_prefix marks a legacy job: use its id.
        prefix = job.get("gcs_prefix") or job["_id"]
    else:
        prefix = getattr(job, "gcs_prefix", None) or getattr(job, "id", str(job))

    # Ids are not guaranteed to be strings (the "_id" key suggests a database
    # document id), so coerce before stripping the trailing slash.
    base = str(prefix).rstrip("/")
    return "/".join([base, *parts])
|
|
|
|
|
|
# Global GCS service instance shared by the module-level convenience functions.
# Instantiating GCSService creates its thread pool; the storage client itself
# is only created on first access to ``bucket``.
gcs_service = GCSService()
|
|
|
|
# Convenience functions delegating to the shared ``gcs_service`` instance.
async def upload_file_to_gcs(
    file: UploadFile,
    destination_path: str,
    content_type: str | None = None,
) -> str:
    """Upload ``file`` to ``destination_path`` and return its ``gs://`` URI.

    ``content_type`` (optional, default ``None``) overrides the client-sent
    ``file.content_type``; omitting it keeps the previous behavior.
    Raises ``HTTPException`` (500) if the upload fails.
    """
    return await gcs_service.upload_file_to_gcs(file, destination_path, content_type)
|
|
|
|
async def upload_vtt_to_gcs(content: str, destination_path: str) -> str:
    """Store WebVTT subtitle text with a UTF-8 VTT content type; return its gs:// URI."""
    vtt_mime = "text/vtt; charset=utf-8"
    uri = await gcs_service.upload_text_to_gcs(
        content=content,
        destination_path=destination_path,
        content_type=vtt_mime,
    )
    return uri
|
|
|
|
async def upload_json_to_gcs(content: str, destination_path: str) -> str:
    """Store a JSON string as ``application/json``; return its gs:// URI."""
    json_mime = "application/json"
    uri = await gcs_service.upload_text_to_gcs(
        content=content,
        destination_path=destination_path,
        content_type=json_mime,
    )
    return uri
|
|
|
|
async def get_signed_download_url(blob_path: str, expiration_hours: int = 24) -> str:
    """Return a GET signed URL for ``blob_path`` valid for ``expiration_hours``.

    Raises ``HTTPException`` 404 when the object is missing, 500 otherwise.
    """
    signed_url = await gcs_service.get_signed_url(
        blob_path=blob_path,
        expiration_hours=expiration_hours,
    )
    return signed_url
|
|
|
|
async def create_resumable_upload_session(blob_path: str, content_type: str) -> str:
    """Start a resumable upload for ``blob_path`` and return the session URI."""
    session_uri = await gcs_service.create_resumable_upload_session(
        blob_path=blob_path,
        content_type=content_type,
    )
    return session_uri
|
|
|
|
async def generate_signed_upload_url(
    blob_path: str,
    content_type: str,
    max_size: int = 1024 * 1024 * 1024,  # 1GB
) -> dict:
    """Generate a V4 signed POST policy for a direct browser-to-GCS upload.

    Args:
        blob_path: Destination object name inside the bucket.
        content_type: MIME type the browser must send as ``Content-Type``.
        max_size: Maximum accepted upload size in bytes (default 1 GiB).

    Returns:
        ``{"url": <POST target URL>, "fields": <form fields to include>}``.
        The policy expires one hour after it is generated.
    """

    def _generate() -> dict:
        bucket = gcs_service.bucket

        # generate_signed_post_policy_v4 is a Client method (Blob has none) and
        # returns a dict with "url" and "fields" keys, not a tuple.
        policy = bucket.client.generate_signed_post_policy_v4(
            bucket.name,
            blob_path,
            expiration=timedelta(hours=1),
            conditions=[
                # Reject empty uploads and anything larger than max_size.
                ["content-length-range", 1, max_size],
                # Constrains only the top-level media type (e.g. "video").
                ["starts-with", "$Content-Type", content_type.split("/")[0]],
            ],
            fields={"Content-Type": content_type},
        )

        return {"url": policy["url"], "fields": policy["fields"]}

    # Consistent with GCSService: get_running_loop() is the supported call
    # inside a coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(gcs_service.executor, _generate)
|