from typing import Dict, List, Any

from ..core.logging import get_logger
from ..lib.vtt import VTTEditor
from ..services.gcs import gcs_service

logger = get_logger(__name__)

_BYTES_PER_MB = 1024 * 1024

# Size bounds (in MB) used by the size-based validators below.
_MP3_MIN_MB = 0.01  # roughly 10KB
_MP3_MAX_MB = 500
_VIDEO_MIN_MB = 0.1  # roughly 100KB
_VIDEO_MAX_MB = 5000  # roughly 5GB


class AssetValidationService:
    """Service for validating job assets before completion.

    NOTE(review): the blob calls used here (``exists``, ``reload``,
    ``download_as_text``) are invoked without ``await``; if they are
    synchronous network calls they will block the event loop while they run.
    Consider offloading them (e.g. ``asyncio.to_thread``) if that matters for
    the caller.
    """

    @staticmethod
    async def validate_job_assets(job_doc: Dict[str, Any]) -> tuple[bool, List[str]]:
        """Validate all assets for a job before allowing completion.

        For every language in ``requested_outputs["languages"]`` (``["en"]``
        when the key is missing or ``None``) the matching entry in
        ``job_doc["outputs"]`` is checked:

        * captions VTT, audio description VTT and audio description MP3 are
          validated only when the corresponding flag in ``requested_outputs``
          is truthy;
        * the accessible video and the retimed captions VTT are validated
          whenever their URI is present in the language's outputs.

        Args:
            job_doc: Job document containing ``outputs`` and
                ``requested_outputs`` mappings.

        Returns:
            ``(is_valid, errors)`` where ``errors`` holds one message per
            failed check and ``is_valid`` is ``True`` only when it is empty.
        """
        errors: List[str] = []
        outputs = job_doc.get("outputs") or {}
        # ``or {}`` also covers a key that is present but explicitly None.
        requested_outputs = job_doc.get("requested_outputs") or {}

        if not outputs:
            errors.append("No outputs generated for this job")
            return False, errors

        languages = requested_outputs.get("languages")
        if languages is None:
            languages = ["en"]

        service = AssetValidationService

        for language in languages:
            lang_output = outputs.get(language)
            if not lang_output:
                errors.append(f"Missing outputs for language: {language}")
                continue

            video_uri = lang_output.get("accessible_video_gcs")
            retimed_uri = lang_output.get("retimed_captions_vtt_gcs")

            # (should_validate, gcs_uri, label, validator) — the order here is
            # the order in which errors are reported.
            checks = (
                (
                    requested_outputs.get("captions_vtt"),
                    lang_output.get("captions_vtt_gcs"),
                    "captions VTT",
                    service._validate_vtt_asset,
                ),
                (
                    requested_outputs.get("audio_description_vtt"),
                    lang_output.get("ad_vtt_gcs"),
                    "audio description VTT",
                    service._validate_vtt_asset,
                ),
                (
                    requested_outputs.get("audio_description_mp3"),
                    lang_output.get("ad_mp3_gcs"),
                    "audio description MP3",
                    service._validate_mp3_asset,
                ),
                # Optional assets: only checked when a URI was recorded.
                (video_uri, video_uri, "accessible video", service._validate_video_asset),
                (retimed_uri, retimed_uri, "retimed captions VTT", service._validate_vtt_asset),
            )

            for should_validate, gcs_uri, label, validator in checks:
                if not should_validate:
                    continue
                error = await validator(gcs_uri, f"{language} {label}")
                if error:
                    errors.append(error)

        return not errors, errors

    @staticmethod
    def _blob_for_uri(gcs_uri: str):
        """Return the blob handle in the service bucket for ``gcs_uri``.

        Only a leading ``gs://<bucket-name>/`` is stripped; a URI that does not
        start with that prefix is used as the blob path unchanged.
        """
        bucket = gcs_service.bucket
        blob_path = gcs_uri.removeprefix(f"gs://{bucket.name}/")
        return bucket.blob(blob_path)

    @staticmethod
    def _validate_blob_size(
        gcs_uri: str | None,
        asset_name: str,
        *,
        min_mb: float,
        max_mb: float,
        too_small_reason: str,
    ) -> str | None:
        """Check that a blob exists and that its size lies within bounds.

        Args:
            gcs_uri: URI of the asset, or a falsy value when none was recorded.
            asset_name: Human-readable name used in error messages.
            min_mb: Sizes strictly below this (in MB) are rejected.
            max_mb: Sizes strictly above this (in MB) are rejected.
            too_small_reason: Text placed in parentheses in the
                "file too small" message.

        Returns:
            An error message, or ``None`` when the asset passes.
        """
        if not gcs_uri:
            return f"Missing {asset_name}"

        try:
            blob = AssetValidationService._blob_for_uri(gcs_uri)
            if not blob.exists():
                return f"{asset_name} file not found in storage"

            # Reload blob to get metadata (including size)
            blob.reload()

            size_mb = blob.size / _BYTES_PER_MB if blob.size else 0
            if size_mb < min_mb:
                return f"{asset_name} file too small ({too_small_reason})"
            if size_mb > max_mb:
                return f"{asset_name} file too large ({size_mb:.1f}MB)"
        except Exception as e:
            # Any storage failure is reported as a validation error rather
            # than propagated, so one bad asset does not abort the whole run.
            logger.error(f"Failed to validate {asset_name}: {e}")
            return f"{asset_name} validation error: {e}"

        return None

    @staticmethod
    async def _validate_vtt_asset(gcs_uri: str | None, asset_name: str) -> str | None:
        """Validate a VTT asset exists and is properly formatted.

        The file is downloaded, passed through ``VTTEditor.validate_vtt`` and
        must contain at least one cue.

        Returns:
            An error message (at most the first three VTT errors are included),
            or ``None`` when the asset passes.
        """
        if not gcs_uri:
            return f"Missing {asset_name}"

        try:
            blob = AssetValidationService._blob_for_uri(gcs_uri)
            if not blob.exists():
                return f"{asset_name} file not found in storage"

            vtt_content = blob.download_as_text()

            is_valid, vtt_errors = VTTEditor.validate_vtt(vtt_content)
            if not is_valid:
                return f"{asset_name} validation failed: {'; '.join(vtt_errors[:3])}"

            # Check minimum content requirements
            if VTTEditor.get_cue_count(vtt_content) == 0:
                return f"{asset_name} contains no cues"
        except Exception as e:
            logger.error(f"Failed to validate {asset_name}: {e}")
            return f"{asset_name} validation error: {e}"

        return None

    @staticmethod
    async def _validate_mp3_asset(gcs_uri: str | None, asset_name: str) -> str | None:
        """Validate an MP3 asset exists and has a plausible file size.

        Returns:
            An error message, or ``None`` when the asset passes.
        """
        return AssetValidationService._validate_blob_size(
            gcs_uri,
            asset_name,
            min_mb=_MP3_MIN_MB,
            max_mb=_MP3_MAX_MB,
            too_small_reason="likely empty",
        )

    @staticmethod
    async def _validate_video_asset(gcs_uri: str | None, asset_name: str) -> str | None:
        """Validate a video asset exists and has a plausible file size.

        Returns:
            An error message, or ``None`` when the asset passes.
        """
        return AssetValidationService._validate_blob_size(
            gcs_uri,
            asset_name,
            min_mb=_VIDEO_MIN_MB,
            max_mb=_VIDEO_MAX_MB,
            too_small_reason="likely empty or corrupted",
        )


# Global service instance
asset_validation_service = AssetValidationService()