175 lines
No EOL
6.4 KiB
Python
175 lines
No EOL
6.4 KiB
Python
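#!/usr/bin/env python3
"""
Test MP3 serving to understand why frontend players show zero duration.

Checks the first MP3 found in the GCS bucket twice: once through a signed
download URL and once through a direct blob download, decoding each copy
with pydub to measure its duration.
"""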
|
||
|
||
import asyncio
|
||
import aiohttp
|
||
import tempfile
|
||
import os
|
||
from app.services.gcs import get_signed_download_url, gcs_service
|
||
from app.core.config import settings
|
||
|
||
|
||
async def test_mp3_serving():
    """Test the complete MP3 serving pipeline.

    Picks the first ``.mp3`` blob listed in ``gcs_service.bucket`` and walks
    it through four steps, printing progress for each one:

    1. generate a signed download URL for the blob;
    2. fetch that URL with aiohttp and compare the byte count with the size
       reported by GCS;
    3. inspect the leading bytes for an ID3 tag or an MPEG frame sync;
    4. write the bytes to a temporary file and decode it with pydub to
       measure the duration.

    Returns:
        bool: ``True`` when the file downloads with HTTP 200, is non-empty
        and decodes to a non-zero duration. ``False`` when no MP3 blob is
        found or any step fails. A size mismatch or an unrecognised header
        only prints a warning and does not fail the run.
    """
    print("🔍 Testing MP3 Serving Pipeline")
    print("=" * 50)

    # Find available MP3 files
    blobs = list(gcs_service.bucket.list_blobs())
    mp3_blobs = [b for b in blobs if b.name.endswith('.mp3')]

    if not mp3_blobs:
        print("❌ No MP3 files found in bucket")
        return False

    print(f"Found {len(mp3_blobs)} MP3 files")

    # Test the first MP3 file
    test_blob = mp3_blobs[0]
    print(f"\n🎵 Testing: {test_blob.name}")
    print(f" Size in GCS: {test_blob.size / 1024:.1f} KB")

    # Step 1: Generate signed URL (like the API does)
    print("\n1️⃣ Generating signed download URL...")
    try:
        # NOTE(review): the second argument is presumably the URL lifetime;
        # confirm its unit against app.services.gcs.
        signed_url = await get_signed_download_url(test_blob.name, 24)
        print("✅ Signed URL generated")
        print(f" URL: {signed_url[:100]}...")
    except Exception as e:
        print(f"❌ Failed to generate signed URL: {e}")
        return False

    # Step 2: Download via signed URL (like frontend does)
    print("\n2️⃣ Downloading via signed URL...")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(signed_url) as response:
                print(f" Status: {response.status}")
                print(f" Content-Type: {response.headers.get('content-type')}")
                print(f" Content-Length: {response.headers.get('content-length')}")

                if response.status != 200:
                    print(f"❌ HTTP error: {response.status}")
                    return False

                # Download the content
                content = await response.read()

        # The body is fully in memory now, so the connection is released
        # before the slower validation and decode steps below.
        print(f" Downloaded: {len(content)} bytes")

        # Check if content size matches expectations
        if len(content) == 0:
            print("❌ Downloaded content is empty!")
            return False
        elif len(content) != test_blob.size:
            print(f"⚠️ Size mismatch: downloaded {len(content)} vs GCS {test_blob.size}")
        else:
            print("✅ Content size matches GCS")

        # Step 3: Check if it's valid MP3 content
        print("\n3️⃣ Validating MP3 content...")
        print(f" First 20 bytes: {content[:20].hex()}")

        # Check MP3 headers: either an ID3 tag, or 0xFF followed by a byte
        # whose top three bits are set (the 11-bit MPEG frame sync).
        if content[:3] == b'ID3':
            print("✅ Valid MP3 with ID3 header")
        elif len(content) >= 2 and content[0] == 0xFF and (content[1] & 0xE0) == 0xE0:
            print("✅ Valid MP3 with MPEG sync header")
        else:
            print("⚠️ May not be valid MP3 format")

        # Step 4: Save to temp file and test duration
        print("\n4️⃣ Testing audio duration...")
        try:
            # delete=False keeps the file after the handle is closed so that
            # pydub can reopen it by path; removal is therefore manual.
            temp_filename = None
            try:
                with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
                    # Record the path before writing so a failed write is
                    # still cleaned up by the finally clause.
                    temp_filename = temp_file.name
                    temp_file.write(content)

                # Try to get duration using pydub (same library used for TTS)
                try:
                    from pydub import AudioSegment
                    audio = AudioSegment.from_mp3(temp_filename)
                    # len() of the decoded audio is divided by 1000 to get seconds.
                    duration_seconds = len(audio) / 1000.0
                    print(f"✅ MP3 duration: {duration_seconds:.2f} seconds")

                    if duration_seconds == 0:
                        print("❌ MP3 has zero duration!")
                        return False

                except Exception as e:
                    print(f"❌ Could not parse MP3 with pydub: {e}")
                    return False

            finally:
                # Clean up temp file
                if temp_filename is not None:
                    os.unlink(temp_filename)

        except Exception as e:
            print(f"❌ Error testing duration: {e}")
            return False

        return True

    except Exception as e:
        print(f"❌ Download failed: {e}")
        return False
|
||
|
||
|
||
async def test_direct_gcs_download():
    """Test downloading directly from GCS (bypass signed URL).

    Downloads the first ``.mp3`` blob listed in ``gcs_service.bucket`` with
    ``download_as_bytes()``, writes it to a temporary file and decodes it
    with pydub to measure the duration.

    Returns:
        bool: ``True`` when the blob decodes to a duration greater than
        zero. ``False`` when no MP3 blob is found, the download fails, or
        the temporary file cannot be written or decoded.
    """
    print("\n🔄 Testing Direct GCS Download")
    print("-" * 40)

    # Get the first MP3 blob
    blobs = list(gcs_service.bucket.list_blobs())
    mp3_blobs = [b for b in blobs if b.name.endswith('.mp3')]

    if not mp3_blobs:
        return False

    test_blob = mp3_blobs[0]
    print(f"Testing: {test_blob.name}")

    try:
        # Download directly from GCS
        content = test_blob.download_as_bytes()
        print(f"✅ Downloaded {len(content)} bytes directly from GCS")

        # Test with pydub
        # delete=False keeps the file after the handle is closed so that
        # pydub can reopen it by path; removal is therefore manual.
        temp_filename = None
        try:
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
                # Record the path before writing so a failed write is still
                # cleaned up by the finally clause.
                temp_filename = temp_file.name
                temp_file.write(content)

            from pydub import AudioSegment
            audio = AudioSegment.from_mp3(temp_filename)
            # len() of the decoded audio is divided by 1000 to get seconds.
            duration_seconds = len(audio) / 1000.0
            print(f"✅ Direct download MP3 duration: {duration_seconds:.2f} seconds")

            return duration_seconds > 0

        except Exception as e:
            print(f"❌ Could not parse directly downloaded MP3: {e}")
            return False

        finally:
            # Remove the temp file on every path, including decode failures.
            if temp_filename is not None:
                os.unlink(temp_filename)

    except Exception as e:
        print(f"❌ Direct download failed: {e}")
        return False
|
||
|
||
|
||
if __name__ == "__main__":
    async def main():
        """Run both MP3 checks in sequence and print an overall verdict."""
        # Both checks are always awaited; the direct-download check still
        # runs when the signed-URL check has already failed.
        signed_url_ok = await test_mp3_serving()
        direct_ok = await test_direct_gcs_download()

        if not (signed_url_ok and direct_ok):
            print("\n❌ Found issues with MP3 serving")
            return

        print("\n🎉 MP3 serving works correctly!")
        print("The issue may be in the frontend MP3 player implementation.")

    asyncio.run(main())