video-accessibility/backend/app/services/zip_download.py
Vadym Samoilenko 6f963ff7c4 feat: DCMP compliance, descriptive transcript, new languages, QA bug fixes
- Rewrote VTT translation to two-step (text-only → Gemini → apply to original timestamps) preventing caption timing desync
- Added polling fallback for all processing states and Safari visibilitychange WebSocket reconnect
- Added 11 new TTS languages (cs, da, fi, hu, no, sk, sv, es-419, pt-BR, fr-CA)
- Updated caption/AD prompts to DCMP Captioning Key & Description Key standards (line splitting, ♪ music notation, italic tags, caption positioning, ethics guidelines)
- Added descriptive transcript generation (WCAG 2.1 §1.2.1) combining captions + AD into plain text
- Fixed amix normalize=0 to prevent audio loss in rendered videos
- Fixed AD re-timing double-count when source_ms is None
- Fixed cue block numbering to be 1-based in VttEditor and Timeline Preview

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 11:50:43 +00:00

168 lines
5.9 KiB
Python

"""
Zip download service for bulk job downloads.
Generates streaming zip files containing job assets organized by job title and language.
"""
import asyncio
import re
import zipfile
from collections.abc import AsyncGenerator
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from io import BytesIO
from ..core.config import settings
from ..core.logging import get_logger
from .gcs import gcs_service
logger = get_logger(__name__)
def sanitize_filename(name: str, max_length: int = 50) -> str:
"""
Sanitize job title for safe filesystem path.
- Replace invalid characters with underscores
- Collapse multiple underscores
- Truncate to max_length
- Strip leading/trailing underscores
"""
# Replace any character that's not alphanumeric, hyphen, underscore, or space
safe_name = re.sub(r"[^\w\s-]", "_", name)
# Replace spaces with underscores
safe_name = safe_name.replace(" ", "_")
# Collapse multiple underscores
safe_name = re.sub(r"_+", "_", safe_name)
# Strip leading/trailing underscores
safe_name = safe_name.strip("_")
# Truncate
return safe_name[:max_length] if safe_name else "untitled"
# Mapping from LangOutput field names to output filenames
FILE_TYPE_MAPPING = {
"captions_vtt_gcs": "captions.vtt",
"sdh_captions_vtt_gcs": "sdh_captions.vtt",
"ad_vtt_gcs": "ad.vtt",
"ad_mp3_gcs": "ad.mp3",
"accessible_video_gcs": "accessible_video.mp4",
"retimed_captions_vtt_gcs": "accessible_captions.vtt",
"descriptive_transcript_gcs": "descriptive_transcript.txt",
}
async def _download_blob(blob_path: str, executor: ThreadPoolExecutor) -> bytes | None:
"""Download blob content from GCS."""
def _download():
blob = gcs_service.bucket.blob(blob_path)
if blob.exists():
return blob.download_as_bytes()
return None
loop = asyncio.get_event_loop()
try:
return await loop.run_in_executor(executor, _download)
except Exception as e:
logger.warning(f"Failed to download blob {blob_path}: {e}")
return None
async def generate_zip_stream(jobs: list[dict]) -> AsyncGenerator[bytes, None]:
"""
Generate a streaming zip file containing all downloadable assets from the provided jobs.
Directory structure:
{job_title}/
+-- source.mp4
+-- en/
| +-- captions.vtt
| +-- ad.vtt
| +-- ad.mp3
| +-- accessible_video.mp4
| +-- accessible_captions.vtt
+-- es/
+-- ...
Yields bytes chunks as files are added to enable streaming response.
"""
# Track job titles to handle duplicates
used_titles: dict[str, int] = {}
executor = ThreadPoolExecutor(max_workers=4)
try:
# We'll build the zip in memory and yield at the end
# Using ZIP_STORED (no compression) since videos/audio are already compressed
buffer = BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_STORED) as zf:
for job_doc in jobs:
raw_title = job_doc.get("title", "untitled")
job_title = sanitize_filename(raw_title)
job_id = job_doc["_id"]
# Handle duplicate titles by appending a counter
if job_title in used_titles:
used_titles[job_title] += 1
job_title = f"{job_title}_{used_titles[job_title]}"
else:
used_titles[job_title] = 0
logger.info(f"Adding job {job_id} ({job_title}) to zip")
# Add source video
source = job_doc.get("source", {})
if source.get("gcs_uri"):
gcs_uri = source["gcs_uri"]
blob_path = gcs_uri.replace(f"gs://{settings.gcs_bucket}/", "")
try:
data = await _download_blob(blob_path, executor)
if data:
arc_name = f"{job_title}/source.mp4"
zf.writestr(arc_name, data)
logger.debug(f"Added {arc_name} ({len(data)} bytes)")
except Exception as e:
logger.warning(f"Failed to add source for job {job_id}: {e}")
# Add per-language outputs
outputs = job_doc.get("outputs", {})
for lang, lang_output in outputs.items():
if not isinstance(lang_output, dict):
continue
for gcs_key, filename in FILE_TYPE_MAPPING.items():
gcs_uri = lang_output.get(gcs_key)
if not gcs_uri:
continue
blob_path = gcs_uri.replace(f"gs://{settings.gcs_bucket}/", "")
try:
data = await _download_blob(blob_path, executor)
if data:
arc_name = f"{job_title}/{lang}/{filename}"
zf.writestr(arc_name, data)
logger.debug(f"Added {arc_name} ({len(data)} bytes)")
except Exception as e:
logger.warning(
f"Failed to add {filename} for job {job_id}/{lang}: {e}"
)
# Get the complete zip content and yield it
zip_content = buffer.getvalue()
logger.info(f"Generated zip file: {len(zip_content)} bytes total")
# Yield in chunks to avoid holding everything in a single response chunk
chunk_size = 1024 * 1024 # 1MB chunks
for i in range(0, len(zip_content), chunk_size):
yield zip_content[i : i + chunk_size]
finally:
executor.shutdown(wait=False)
def generate_zip_filename() -> str:
"""Generate zip filename with current datetime."""
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
return f"accessible_video_{timestamp}.zip"