video-accessibility/backend/test_mp3_serving.py
2025-08-24 16:28:33 -05:00

175 lines
No EOL
6.4 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Test MP3 serving to understand why frontend players show zero duration.
"""
import asyncio
import aiohttp
import tempfile
import os
from app.services.gcs import get_signed_download_url, gcs_service
from app.core.config import settings
def _mp3_header_message(content: bytes) -> str:
    """Return the status line describing how *content* begins.

    Recognises an ``ID3`` tag prefix or an MPEG frame sync (first byte 0xFF
    and the top three bits of the second byte set); anything else gets a
    warning line rather than a failure.
    """
    if content[:3] == b'ID3':
        return "✅ Valid MP3 with ID3 header"
    if len(content) >= 2 and content[0] == 0xFF and (content[1] & 0xE0) == 0xE0:
        return "✅ Valid MP3 with MPEG sync header"
    return "⚠️ May not be valid MP3 format"


def _check_mp3_duration(content: bytes) -> bool:
    """Decode *content* with pydub through a temp file and report its duration.

    Prints the outcome and returns True only when pydub decodes the data and
    the duration is non-zero. The temporary file is removed on every path,
    including a failed write.
    """
    try:
        temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
        # Capture the name before writing so cleanup works even if write fails.
        temp_filename = temp_file.name
        try:
            # Close the handle before decoding so pydub opens a flushed file.
            with temp_file:
                temp_file.write(content)

            # Try to get duration using pydub (same library used for TTS)
            try:
                # Imported lazily so a missing pydub is reported as a parse
                # failure instead of breaking the whole script at import time.
                from pydub import AudioSegment

                audio = AudioSegment.from_mp3(temp_filename)
                duration_seconds = len(audio) / 1000.0  # len() is in ms
                print(f"✅ MP3 duration: {duration_seconds:.2f} seconds")
                if duration_seconds == 0:
                    print("❌ MP3 has zero duration!")
                    return False
            except Exception as e:
                print(f"❌ Could not parse MP3 with pydub: {e}")
                return False
        finally:
            # Clean up temp file
            os.unlink(temp_filename)
    except Exception as e:
        print(f"❌ Error testing duration: {e}")
        return False
    return True


async def test_mp3_serving() -> bool:
    """Test the complete MP3 serving pipeline.

    Picks the first ``.mp3`` blob listed in the bucket, generates a signed
    download URL for it, downloads it over HTTP, inspects the leading bytes
    and finally decodes it with pydub to measure the duration.

    Returns:
        True when the file downloads with HTTP 200, is non-empty and decodes
        to a non-zero duration; False otherwise. A size mismatch or an
        unrecognised header only prints a warning.
    """
    print("🔍 Testing MP3 Serving Pipeline")
    print("=" * 50)

    # Find available MP3 files
    mp3_blobs = [
        b for b in gcs_service.bucket.list_blobs() if b.name.endswith('.mp3')
    ]
    if not mp3_blobs:
        print("❌ No MP3 files found in bucket")
        return False
    print(f"Found {len(mp3_blobs)} MP3 files")

    # Test the first MP3 file
    test_blob = mp3_blobs[0]
    print(f"\n🎵 Testing: {test_blob.name}")
    print(f" Size in GCS: {test_blob.size / 1024:.1f} KB")

    # Step 1: Generate signed URL (like the API does)
    print("\n1⃣ Generating signed download URL...")
    try:
        # NOTE(review): 24 is presumably the URL lifetime in hours — confirm
        # against app.services.gcs.get_signed_download_url.
        signed_url = await get_signed_download_url(test_blob.name, 24)
        print("✅ Signed URL generated")
        print(f" URL: {signed_url[:100]}...")
    except Exception as e:
        print(f"❌ Failed to generate signed URL: {e}")
        return False

    # Step 2: Download via signed URL (like frontend does)
    print("\n2⃣ Downloading via signed URL...")
    try:
        async with aiohttp.ClientSession() as session, \
                session.get(signed_url) as response:
            print(f" Status: {response.status}")
            print(f" Content-Type: {response.headers.get('content-type')}")
            print(f" Content-Length: {response.headers.get('content-length')}")
            if response.status != 200:
                print(f"❌ HTTP error: {response.status}")
                return False

            # Download the content
            content = await response.read()
            print(f" Downloaded: {len(content)} bytes")

            # Check if content size matches expectations
            if not content:
                print("❌ Downloaded content is empty!")
                return False
            if len(content) != test_blob.size:
                print(f"⚠️ Size mismatch: downloaded {len(content)} vs GCS {test_blob.size}")
            else:
                print("✅ Content size matches GCS")

            # Step 3: Check if it's valid MP3 content
            print("\n3⃣ Validating MP3 content...")
            print(f" First 20 bytes: {content[:20].hex()}")
            print(_mp3_header_message(content))

            # Step 4: Save to temp file and test duration
            print("\n4⃣ Testing audio duration...")
            return _check_mp3_duration(content)
    except Exception as e:
        print(f"❌ Download failed: {e}")
        return False
async def test_direct_gcs_download() -> bool:
    """Test downloading directly from GCS (bypass signed URL).

    Fetches the first ``.mp3`` blob in the bucket listing with
    ``download_as_bytes`` and decodes it with pydub.

    Returns:
        True when the blob downloads and pydub reports a duration greater
        than zero; False when no MP3 blob exists, the download fails or the
        data cannot be decoded.
    """
    print("\n🔄 Testing Direct GCS Download")
    print("-" * 40)

    # Get the first MP3 blob; stop listing as soon as one is found instead of
    # materialising every object in the bucket.
    test_blob = next(
        (b for b in gcs_service.bucket.list_blobs() if b.name.endswith('.mp3')),
        None,
    )
    if test_blob is None:
        return False
    print(f"Testing: {test_blob.name}")

    try:
        # Download directly from GCS
        content = test_blob.download_as_bytes()
    except Exception as e:
        print(f"❌ Direct download failed: {e}")
        return False
    print(f"✅ Downloaded {len(content)} bytes directly from GCS")

    # Test with pydub
    try:
        temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
        # Capture the name up front so the finally clause can always clean up.
        temp_filename = temp_file.name
        try:
            # Close the handle before decoding so pydub opens a flushed file.
            with temp_file:
                temp_file.write(content)

            # Imported lazily so a missing pydub is reported as a parse
            # failure rather than breaking the script at import time.
            from pydub import AudioSegment

            audio = AudioSegment.from_mp3(temp_filename)
            duration_seconds = len(audio) / 1000.0  # len() is in ms
            print(f"✅ Direct download MP3 duration: {duration_seconds:.2f} seconds")
            return duration_seconds > 0
        finally:
            # Remove the temp file on success AND on decode failure; the
            # previous version leaked it whenever pydub raised.
            os.unlink(temp_filename)
    except Exception as e:
        print(f"❌ Could not parse directly downloaded MP3: {e}")
        return False
if __name__ == "__main__":
    async def main():
        """Run both diagnostics in order and print the combined verdict."""
        signed_url_ok = await test_mp3_serving()
        direct_ok = await test_direct_gcs_download()

        # Both checks always run; only the final message depends on them.
        if not (signed_url_ok and direct_ok):
            print(f"\n❌ Found issues with MP3 serving")
            return

        print(f"\n🎉 MP3 serving works correctly!")
        print("The issue may be in the frontend MP3 player implementation.")

    asyncio.run(main())