- Rewrote VTT translation to two-step (text-only → Gemini → apply to original timestamps) preventing caption timing desync - Added polling fallback for all processing states and Safari visibilitychange WebSocket reconnect - Added 11 new TTS languages (cs, da, fi, hu, no, sk, sv, es-419, pt-BR, fr-CA) - Updated caption/AD prompts to DCMP Captioning Key & Description Key standards (line splitting, ♪ music notation, italic tags, caption positioning, ethics guidelines) - Added descriptive transcript generation (WCAG 2.1 §1.2.1) combining captions + AD into plain text - Fixed amix normalize=0 to prevent audio loss in rendered videos - Fixed AD re-timing double-count when source_ms is None - Fixed cue block numbering to be 1-based in VttEditor and Timeline Preview Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
168 lines
5.9 KiB
Python
168 lines
5.9 KiB
Python
"""
|
|
Zip download service for bulk job downloads.
|
|
|
|
Generates streaming zip files containing job assets organized by job title and language.
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
import zipfile
|
|
from collections.abc import AsyncGenerator
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime
|
|
from io import BytesIO
|
|
|
|
from ..core.config import settings
|
|
from ..core.logging import get_logger
|
|
from .gcs import gcs_service
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
def sanitize_filename(name: str, max_length: int = 50) -> str:
    """
    Sanitize a job title so it is safe to use as a path component.

    - Replace every run of characters that are not word characters or
      hyphens (including spaces, tabs and newlines) with one underscore
    - Collapse multiple underscores
    - Strip leading/trailing underscores
    - Truncate to max_length, then drop any underscore left dangling at the cut

    Args:
        name: Raw title. ``None`` is tolerated and treated as empty.
        max_length: Maximum number of characters kept from the sanitized title.

    Returns:
        The sanitized name, or ``"untitled"`` when nothing usable remains.
    """
    if name is None:
        return "untitled"

    # All whitespace is replaced here, not only " ": tabs and newlines must
    # never end up inside an archive path.
    safe_name = re.sub(r"[^\w-]+", "_", str(name))
    # "_" is itself a word character, so underscores already present in the
    # title can sit next to the ones just inserted; collapse the runs.
    safe_name = re.sub(r"_+", "_", safe_name).strip("_")
    # Truncating can cut right after a separator; strip it again.
    safe_name = safe_name[:max_length].rstrip("_")
    return safe_name or "untitled"
|
|
|
|
|
|
# Maps each LangOutput field holding a GCS URI to the filename that asset
# gets inside the per-language folder of the zip. Iteration order is the
# order in which files are added to the archive.
FILE_TYPE_MAPPING: dict[str, str] = {
    # Caption tracks
    "captions_vtt_gcs": "captions.vtt",
    "sdh_captions_vtt_gcs": "sdh_captions.vtt",
    # Audio description (text + audio)
    "ad_vtt_gcs": "ad.vtt",
    "ad_mp3_gcs": "ad.mp3",
    # Rendered accessible video and its matching captions
    "accessible_video_gcs": "accessible_video.mp4",
    "retimed_captions_vtt_gcs": "accessible_captions.vtt",
    # Plain-text transcript
    "descriptive_transcript_gcs": "descriptive_transcript.txt",
}
|
|
|
|
|
|
async def _download_blob(blob_path: str, executor: ThreadPoolExecutor) -> bytes | None:
    """
    Download blob content from GCS on a worker thread.

    The existence check and the download both run inside ``executor`` so the
    (presumably blocking) GCS client calls do not stall the event loop.

    Args:
        blob_path: Object path inside ``gcs_service.bucket``.
        executor: Thread pool the GCS calls are run on.

    Returns:
        The blob's bytes, or ``None`` when the blob does not exist or the
        download raised (the failure is logged as a warning, not re-raised).
    """

    def _download() -> bytes | None:
        blob = gcs_service.bucket.blob(blob_path)
        if blob.exists():
            return blob.download_as_bytes()
        return None

    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (get_event_loop() is discouraged here).
    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(executor, _download)
    except Exception as e:
        # Best effort: a single missing/broken asset must not abort the caller.
        logger.warning(f"Failed to download blob {blob_path}: {e}")
        return None
|
|
|
|
|
|
async def generate_zip_stream(jobs: list[dict]) -> AsyncGenerator[bytes, None]:
    """
    Generate a zip file containing all downloadable assets from the provided jobs.

    Directory structure:
        {job_title}/
        +-- source.mp4
        +-- en/
        |   +-- captions.vtt
        |   +-- ad.vtt
        |   +-- ad.mp3
        |   +-- accessible_video.mp4
        |   +-- accessible_captions.vtt
        +-- es/
            +-- ...

    Per-language files are whatever FILE_TYPE_MAPPING lists and the job
    actually has a GCS URI for. Assets that are missing, empty or fail to
    download are skipped with a warning; they never abort the archive.

    NOTE: the archive is assembled completely in memory and only then yielded
    in 1MB chunks, so peak memory grows with the total size of the assets.

    Args:
        jobs: Job documents. Each needs an ``_id``; ``title``, ``source`` and
            ``outputs`` are optional and may be missing or ``None``.

    Yields:
        Consecutive chunks (up to 1MB each) of the finished zip file.
    """
    bucket_prefix = f"gs://{settings.gcs_bucket}/"
    chunk_size = 1024 * 1024  # 1MB chunks

    # Folder names already taken -> last numeric suffix handed out for them
    used_titles: dict[str, int] = {}

    executor = ThreadPoolExecutor(max_workers=4)

    try:
        buffer = BytesIO()

        # ZIP_STORED (no compression) since videos/audio are already compressed
        with zipfile.ZipFile(buffer, "w", zipfile.ZIP_STORED) as zf:
            for job_doc in jobs:
                job_id = job_doc["_id"]
                # `or` also covers a title stored explicitly as None/""
                base_title = sanitize_filename(job_doc.get("title") or "untitled")

                # Handle duplicate titles by appending a counter. Looping (and
                # recording the suffixed name too) guarantees the folder is
                # unique even if another job is literally titled "<title>_1".
                job_title = base_title
                while job_title in used_titles:
                    used_titles[base_title] += 1
                    job_title = f"{base_title}_{used_titles[base_title]}"
                used_titles[job_title] = 0

                logger.info(f"Adding job {job_id} ({job_title}) to zip")

                # Collect everything this job contributes, in archive order:
                # (archive name, blob path, label used in failure logs)
                entries: list[tuple[str, str, str]] = []

                # Source video
                source = job_doc.get("source") or {}
                source_uri = source.get("gcs_uri")
                if source_uri:
                    entries.append(
                        (
                            f"{job_title}/source.mp4",
                            source_uri.removeprefix(bucket_prefix),
                            f"source for job {job_id}",
                        )
                    )

                # Per-language outputs
                outputs = job_doc.get("outputs") or {}
                for lang, lang_output in outputs.items():
                    if not isinstance(lang_output, dict):
                        continue

                    for gcs_key, filename in FILE_TYPE_MAPPING.items():
                        gcs_uri = lang_output.get(gcs_key)
                        if not gcs_uri:
                            continue
                        entries.append(
                            (
                                f"{job_title}/{lang}/{filename}",
                                gcs_uri.removeprefix(bucket_prefix),
                                f"{filename} for job {job_id}/{lang}",
                            )
                        )

                # Download the job's assets concurrently so the thread pool is
                # actually used; gather() returns results in submission order,
                # so the archive layout stays deterministic.
                blobs = await asyncio.gather(
                    *(_download_blob(blob_path, executor) for _, blob_path, _ in entries)
                )

                for (arc_name, _, label), data in zip(entries, blobs):
                    if not data:
                        # Missing, empty, or failed download (already logged)
                        continue
                    try:
                        zf.writestr(arc_name, data)
                        logger.debug(f"Added {arc_name} ({len(data)} bytes)")
                    except Exception as e:
                        logger.warning(f"Failed to add {label}: {e}")

        # Closing the ZipFile wrote the central directory; the buffer is final.
        total_size = buffer.seek(0, 2)  # whence=2: jump to the end, returns size
        logger.info(f"Generated zip file: {total_size} bytes total")

        # Read the buffer back chunk by chunk instead of copying the whole
        # archive with getvalue() first.
        buffer.seek(0)
        while chunk := buffer.read(chunk_size):
            yield chunk

    finally:
        executor.shutdown(wait=False)
|
|
|
|
|
|
def generate_zip_filename() -> str:
    """
    Generate the zip download filename, stamped with the current UTC time.

    Returns:
        A name of the form ``accessible_video_YYYYMMDD_HHMMSS.zip``.
    """
    # Imported locally because the module only imports `datetime` itself.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in UTC
    # formats to exactly the same timestamp.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return f"accessible_video_{timestamp}.zip"
|