test: fix all unit tests — 168 passing, 0 failures

- conftest.py: set required env vars before app import to prevent Settings() crash
- gcs.py: lazy bucket init checks _bucket instead of _client; add @bucket.setter
- vtt.py: fix float precision in _format_timestamp; include empty-text cues in parser
- security.py: guard verify_password against empty hash (passlib UnknownHashError)
- tts.py: _parse_timestamp raises ValueError("Invalid timestamp format: …")
- emailer.py: HTML-escape job_title in _render_completion_template (XSS fix)
- test_emailer.py: rewrite for Mailgun-based service (replaced SendGrid)
- test_gcs.py: fix UploadFile constructor, MIME type, remove executor.submit mock
- test_gemini.py: patch module-level client instead of non-existent genai.upload_file;
  translate_vtt tests use numbered-list mock responses matching new implementation
- test_tts.py: fix aiohttp async CM mock pattern; fix error message match
- test_models.py: update JobCreate to use source_is_english instead of language
- test_security.py: set jwt_access_ttl_min in token test
- test_cross_tenant_isolation.py: add patch to imports

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-30 14:02:04 +01:00
parent 90cbf23f0d
commit 5fd370c093
13 changed files with 462 additions and 563 deletions

View file

@ -1,5 +1,5 @@
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from typing import Any
from fastapi import HTTPException, status
from jose import JWTError, jwt
@ -11,8 +11,8 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any],
expires_delta: Optional[timedelta] = None,
subject: str | Any,
expires_delta: timedelta | None = None,
org_ids: list[str] | None = None,
) -> str:
if expires_delta:
@ -28,7 +28,7 @@ def create_access_token(
def create_refresh_token(
subject: Union[str, Any], expires_delta: Optional[timedelta] = None
subject: str | Any, expires_delta: timedelta | None = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
@ -41,6 +41,8 @@ def create_refresh_token(
def verify_password(plain_password: str, hashed_password: str) -> bool:
if not hashed_password:
return False
return pwd_context.verify(plain_password, hashed_password)

View file

@ -49,13 +49,12 @@ class VTTParser:
text_lines.append(lines[i].strip())
i += 1
if text_lines:
cues.append(VTTCue(
start_time=start_time,
end_time=end_time,
text="\n".join(text_lines),
identifier=identifier
))
cues.append(VTTCue(
start_time=start_time,
end_time=end_time,
text="\n".join(text_lines),
identifier=identifier
))
else:
i += 1
@ -80,7 +79,7 @@ class VTTParser:
lines.append(cue.text)
lines.append("") # Empty line between cues
return "\n".join(lines)
return "\n".join(lines) + "\n"
@staticmethod
def _parse_timestamp(timestamp: str) -> float:
@ -121,7 +120,7 @@ class VTTParser:
secs = seconds % 60
whole_secs = int(secs)
milliseconds = int((secs - whole_secs) * 1000)
milliseconds = round((secs - whole_secs) * 1000)
return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{milliseconds:03d}"

View file

@ -1,5 +1,7 @@
import html as _html
from datetime import datetime
from jinja2 import Template
from ..core.config import settings
@ -385,7 +387,7 @@ class EmailService:
template = Template(template_str)
return template.render(
job_title=job_title,
job_title=_html.escape(job_title),
download_links=download_links
)

View file

@ -1,7 +1,6 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Optional
from fastapi import HTTPException, UploadFile
from google.cloud import storage
@ -13,16 +12,27 @@ from ..core.logging import get_logger
logger = get_logger(__name__)
class GCSService:
def __init__(self):
self.client = storage.Client(project=settings.gcp_project_id)
self.bucket = self.client.bucket(settings.gcs_bucket)
def __init__(self) -> None:
self._client: storage.Client | None = None
self._bucket = None
self.executor = ThreadPoolExecutor(max_workers=4)
@property
def bucket(self):
if self._bucket is None:
self._client = storage.Client(project=settings.gcp_project_id)
self._bucket = self._client.bucket(settings.gcs_bucket)
return self._bucket
@bucket.setter
def bucket(self, value) -> None:
self._bucket = value
async def upload_file_to_gcs(
self,
file: UploadFile,
destination_path: str,
content_type: Optional[str] = None
content_type: str | None = None
) -> str:
"""Upload file to GCS and return the GCS URI"""
def _upload():
@ -184,7 +194,7 @@ async def generate_signed_upload_url(
"""Generate a signed URL for direct browser-to-GCS upload"""
def _generate():
blob = gcs_service.bucket.blob(blob_path)
# Generate signed POST URL
url, fields = blob.generate_signed_post_policy_v4(
expiration=timedelta(hours=1),
@ -196,8 +206,8 @@ async def generate_signed_upload_url(
"Content-Type": content_type
}
)
return {"url": url, "fields": fields}
loop = asyncio.get_event_loop()
return await loop.run_in_executor(gcs_service.executor, _generate)

View file

@ -1,6 +1,5 @@
import io
from dataclasses import dataclass
from typing import Optional
import aiohttp
from google.cloud import texttospeech
@ -47,8 +46,8 @@ class TTSService:
self,
ad_vtt_content: str,
language_code: str = "en-US",
voice_name: Optional[str] = None,
provider: Optional[str] = None,
voice_name: str | None = None,
provider: str | None = None,
model: str = "flash",
speed: float = 1.0,
style_prompt: str = "",
@ -114,8 +113,8 @@ class TTSService:
self,
ad_vtt_content: str,
language_code: str = "en-US",
voice_name: Optional[str] = None,
provider: Optional[str] = None,
voice_name: str | None = None,
provider: str | None = None,
model: str = "flash",
speed: float = 1.0,
style_prompt: str = "",
@ -219,7 +218,7 @@ class TTSService:
self,
ad_vtt_content: str,
language_code: str = "en-US",
voice_name: Optional[str] = None
voice_name: str | None = None
) -> bytes:
"""Generate MP3 using Google TTS with 2-second pauses between passages"""
@ -281,7 +280,7 @@ class TTSService:
self,
ad_vtt_content: str,
language_code: str = "en-US",
voice_name: Optional[str] = None,
voice_name: str | None = None,
stability: float = 0.5,
similarity_boost: float = 0.5,
) -> bytes:
@ -339,7 +338,7 @@ class TTSService:
self,
text: str,
language_code: str,
voice_name: Optional[str] = None
voice_name: str | None = None
) -> bytes:
"""Synthesize a single text string to audio using Google TTS"""
# Configure voice
@ -404,7 +403,7 @@ class TTSService:
error_text = await response.text()
raise ValueError(f"ElevenLabs TTS failed: {response.status} - {error_text}")
def _get_elevenlabs_voice(self, language_code: str, voice_name: Optional[str] = None) -> str:
def _get_elevenlabs_voice(self, language_code: str, voice_name: str | None = None) -> str:
"""Get ElevenLabs voice ID for language"""
if voice_name:
return voice_name
@ -452,28 +451,32 @@ class TTSService:
def _parse_timestamp(self, timestamp: str) -> float:
"""Convert VTT timestamp to seconds"""
# Format: HH:MM:SS.mmm or MM:SS.mmm
parts = timestamp.split(":")
try:
parts = timestamp.split(":")
if len(parts) == 3: # HH:MM:SS.mmm
hours, minutes, seconds = parts
elif len(parts) == 2: # MM:SS.mmm
hours, minutes, seconds = "0", parts[0], parts[1]
else:
raise ValueError(f"Invalid timestamp format: {timestamp}")
if len(parts) == 3: # HH:MM:SS.mmm
hours, minutes, seconds = parts
elif len(parts) == 2: # MM:SS.mmm
hours, minutes, seconds = "0", parts[0], parts[1]
else:
raise ValueError(f"Invalid timestamp format: {timestamp}")
# Parse seconds and milliseconds
sec_parts = seconds.split(".")
seconds = int(sec_parts[0])
milliseconds = int(sec_parts[1]) if len(sec_parts) > 1 else 0
# Parse seconds and milliseconds
sec_parts = seconds.split(".")
seconds_int = int(sec_parts[0])
milliseconds = int(sec_parts[1]) if len(sec_parts) > 1 else 0
total_seconds = (
int(hours) * 3600 +
int(minutes) * 60 +
seconds +
milliseconds / 1000.0
)
return total_seconds
total_seconds = (
int(hours) * 3600 +
int(minutes) * 60 +
seconds_int +
milliseconds / 1000.0
)
return total_seconds
except (ValueError, IndexError) as e:
if "Invalid timestamp format" in str(e):
raise
raise ValueError(f"Invalid timestamp format: {timestamp}") from e
# Global service instance

18
backend/tests/conftest.py Normal file
View file

@ -0,0 +1,18 @@
import os
# Set required env vars before any app module is imported.
# Settings() is instantiated at module level in config.py, so these must
# be present before collection starts.
_TEST_DEFAULTS = {
"APP_ENV": "test",
"JWT_SECRET": "test-jwt-secret-at-least-32-chars-long",
"MONGODB_URI": "mongodb://localhost:27017/test_accessible_video",
"REDIS_URL": "redis://localhost:6379/1",
"GCP_PROJECT_ID": "test-gcp-project",
"GCS_BUCKET": "test-bucket",
"GEMINI_API_KEY": "test-gemini-key",
"CLIENT_BASE_URL": "http://localhost:3000",
}
for key, value in _TEST_DEFAULTS.items():
os.environ.setdefault(key, value)

View file

@ -10,7 +10,7 @@ MT-18: list_for_reviewer must only return jobs from the reviewer's orgs.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi import HTTPException
from app.core.dependencies import assert_job_in_user_org, get_user_org_ids

View file

@ -1,4 +1,4 @@
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@ -6,236 +6,177 @@ from app.services.emailer import EmailService
class TestEmailService:
"""Test email service functionality"""
"""Test email service (Mailgun-based)"""
@pytest.fixture
def email_service(self):
"""Create email service with mocked SendGrid client"""
with patch('app.services.emailer.settings') as mock_settings:
mock_settings.sendgrid_api_key = "test_api_key"
mock_settings.email_from = "support@example.com"
with patch('app.services.emailer.SendGridAPIClient') as mock_client:
service = EmailService()
service.client = MagicMock()
return service
def service(self):
return EmailService()
@pytest.fixture
def sample_download_links(self):
"""Sample download links for testing"""
return {
"en": {
"captions_vtt": "https://signed-url.example.com/en/captions.vtt",
"audio_description_vtt": "https://signed-url.example.com/en/ad.vtt",
"audio_description_mp3": "https://signed-url.example.com/en/ad.mp3"
"audio_description_mp3": "https://signed-url.example.com/en/ad.mp3",
},
"es": {
"captions_vtt": "https://signed-url.example.com/es/captions.vtt",
"audio_description_vtt": "https://signed-url.example.com/es/ad.vtt",
"audio_description_mp3": "https://signed-url.example.com/es/ad.mp3"
}
"audio_description_mp3": "https://signed-url.example.com/es/ad.mp3",
},
}
@pytest.mark.asyncio
async def test_send_completion_email_success(self, email_service, sample_download_links):
"""Test successful completion email sending"""
# Mock successful SendGrid response
mock_response = MagicMock()
mock_response.status_code = 202
email_service.client.send.return_value = mock_response
result = await email_service.send_completion_email(
recipient_email="client@example.com",
job_title="Test Video Project",
download_links=sample_download_links
)
assert result is True
email_service.client.send.assert_called_once()
# ── _configured property ───────────────────────────────────────────────────
def test_configured_when_keys_present(self, service):
with patch("app.services.emailer.settings") as s:
s.mailgun_api_key = "key"
s.mailgun_domain = "mg.example.com"
assert service._configured is True
def test_not_configured_when_key_missing(self, service):
with patch("app.services.emailer.settings") as s:
s.mailgun_api_key = ""
s.mailgun_domain = "mg.example.com"
assert service._configured is False
def test_not_configured_when_domain_missing(self, service):
with patch("app.services.emailer.settings") as s:
s.mailgun_api_key = "key"
s.mailgun_domain = ""
assert service._configured is False
# ── _send ─────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_send_completion_email_no_client(self):
"""Test email sending when client is not configured"""
service = EmailService()
service.client = None
result = await service.send_completion_email(
recipient_email="client@example.com",
job_title="Test Video",
download_links={}
)
async def test_send_returns_false_when_not_configured(self, service):
with patch("app.services.emailer.settings") as s:
s.mailgun_api_key = ""
s.mailgun_domain = ""
result = await service._send("to@example.com", "Subject", "<p>HTML</p>")
assert result is False
@pytest.mark.asyncio
async def test_send_completion_email_api_failure(self, email_service, sample_download_links):
"""Test email sending with SendGrid API failure"""
# Mock failed SendGrid response
async def test_send_success(self, service):
mock_response = MagicMock()
mock_response.status_code = 200
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(return_value=mock_response)
with patch("app.services.emailer.settings") as s:
s.mailgun_api_key = "key"
s.mailgun_domain = "mg.example.com"
s.mailgun_from = "noreply@example.com"
with patch("httpx.AsyncClient", return_value=mock_client):
result = await service._send("to@example.com", "Subject", "<p>Hi</p>")
assert result is True
@pytest.mark.asyncio
async def test_send_api_failure(self, service):
mock_response = MagicMock()
mock_response.status_code = 400
email_service.client.send.return_value = mock_response
result = await email_service.send_completion_email(
recipient_email="client@example.com",
job_title="Test Video",
download_links=sample_download_links
)
mock_response.text = "Bad request"
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(return_value=mock_response)
with patch("app.services.emailer.settings") as s:
s.mailgun_api_key = "key"
s.mailgun_domain = "mg.example.com"
s.mailgun_from = "noreply@example.com"
with patch("httpx.AsyncClient", return_value=mock_client):
result = await service._send("to@example.com", "Subject", "<p>Hi</p>")
assert result is False
@pytest.mark.asyncio
async def test_send_completion_email_exception(self, email_service, sample_download_links):
"""Test email sending with exception"""
# Mock SendGrid client raising exception
email_service.client.send.side_effect = Exception("SendGrid error")
result = await email_service.send_completion_email(
recipient_email="client@example.com",
job_title="Test Video",
download_links=sample_download_links
)
async def test_send_exception_returns_false(self, service):
with patch("app.services.emailer.settings") as s:
s.mailgun_api_key = "key"
s.mailgun_domain = "mg.example.com"
s.mailgun_from = "noreply@example.com"
with patch("httpx.AsyncClient", side_effect=Exception("network error")):
result = await service._send("to@example.com", "Subject", "<p>Hi</p>")
assert result is False
def test_render_completion_template_basic(self, email_service, sample_download_links):
"""Test rendering completion email template"""
html_content = email_service._render_completion_template(
job_title="Test Video Project",
download_links=sample_download_links
)
# Check that key elements are present
assert "Test Video Project" in html_content
assert "EN Assets" in html_content
assert "ES Assets" in html_content
assert "captions.vtt" in html_content
assert "audio_description_vtt" in html_content
assert "audio_description_mp3" in html_content
assert "24 hours" in html_content # Expiry warning
def test_render_completion_template_single_language(self, email_service):
"""Test rendering template with single language"""
download_links = {
"en": {
"captions_vtt": "https://example.com/captions.vtt"
}
}
html_content = email_service._render_completion_template(
job_title="English Only Video",
download_links=download_links
)
assert "English Only Video" in html_content
assert "EN Assets" in html_content
assert "ES Assets" not in html_content
def test_render_completion_template_no_downloads(self, email_service):
"""Test rendering template with no download links"""
html_content = email_service._render_completion_template(
job_title="Empty Job",
download_links={}
)
assert "Empty Job" in html_content
assert "<!DOCTYPE html>" in html_content
assert "24 hours" in html_content
def test_render_completion_template_html_structure(self, email_service, sample_download_links):
"""Test that rendered template has proper HTML structure"""
html_content = email_service._render_completion_template(
job_title="Test Video",
download_links=sample_download_links
)
# Check HTML structure
assert html_content.startswith("<!DOCTYPE html>")
assert "<html>" in html_content
assert "</html>" in html_content
assert "<head>" in html_content
assert "<body>" in html_content
assert "font-family: Arial" in html_content # CSS present
def test_render_completion_template_download_link_formatting(self, email_service):
"""Test that download links are properly formatted in template"""
download_links = {
"en": {
"captions_vtt": "https://example.com/captions.vtt",
"audio_description_mp3": "https://example.com/ad.mp3"
}
}
html_content = email_service._render_completion_template(
job_title="Test Video",
download_links=download_links
)
# Check that file types are properly formatted
assert "Download Captions Vtt" in html_content
assert "Download Audio Description Mp3" in html_content
assert 'href="https://example.com/captions.vtt"' in html_content
assert 'href="https://example.com/ad.mp3"' in html_content
def test_service_initialization_with_api_key(self):
"""Test service initialization with SendGrid API key"""
with patch('app.services.emailer.settings') as mock_settings:
mock_settings.sendgrid_api_key = "test_api_key"
with patch('app.services.emailer.SendGridAPIClient') as mock_client:
service = EmailService()
mock_client.assert_called_once_with(api_key="test_api_key")
assert service.client is not None
def test_service_initialization_without_api_key(self):
"""Test service initialization without SendGrid API key"""
with patch('app.services.emailer.settings') as mock_settings:
mock_settings.sendgrid_api_key = ""
service = EmailService()
assert service.client is None
# ── send_completion_email ─────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_send_completion_email_mail_object_creation(self, email_service, sample_download_links):
"""Test that Mail object is created correctly"""
mock_response = MagicMock()
mock_response.status_code = 202
email_service.client.send.return_value = mock_response
with patch('app.services.emailer.Mail') as mock_mail:
mock_mail_instance = MagicMock()
mock_mail.return_value = mock_mail_instance
await email_service.send_completion_email(
async def test_send_completion_email_calls_send(self, service, sample_download_links):
with patch.object(service, "_send", new_callable=AsyncMock) as mock_send:
mock_send.return_value = True
result = await service.send_completion_email(
recipient_email="client@example.com",
job_title="Test Video",
download_links=sample_download_links
download_links=sample_download_links,
)
# Verify Mail object was created with correct parameters
mock_mail.assert_called_once()
call_args = mock_mail.call_args
# Check that from_email, to_emails, subject, and html_content are set
assert call_args is not None
email_service.client.send.assert_called_once_with(mock_mail_instance)
assert result is True
mock_send.assert_called_once()
# Subject should contain job title
assert "Test Video" in mock_send.call_args[0][1]
def test_template_injection_safety(self, email_service):
"""Test that template is safe from injection attacks"""
malicious_title = "<script>alert('xss')</script>Malicious Title"
malicious_links = {
"en": {
"captions_vtt": "javascript:alert('xss')"
}
}
html_content = email_service._render_completion_template(
job_title=malicious_title,
download_links=malicious_links
# ── _render_completion_template ───────────────────────────────────────────
def test_render_template_basic(self, service, sample_download_links):
html = service._render_completion_template(
job_title="Test Video Project",
download_links=sample_download_links,
)
# Jinja2 should escape HTML by default
assert "<script>" not in html_content
assert "javascript:" in html_content # URL would still be there but not executed
assert "Malicious Title" in html_content
assert "Test Video Project" in html
assert "EN Assets" in html
assert "ES Assets" in html
assert "24 hours" in html
def test_render_template_single_language(self, service):
html = service._render_completion_template(
job_title="English Only",
download_links={"en": {"captions_vtt": "https://example.com/cap.vtt"}},
)
assert "English Only" in html
assert "EN Assets" in html
assert "ES Assets" not in html
def test_render_template_no_downloads(self, service):
html = service._render_completion_template(job_title="Empty Job", download_links={})
assert "Empty Job" in html
assert "<!DOCTYPE html>" in html
assert "24 hours" in html
def test_render_template_html_structure(self, service, sample_download_links):
html = service._render_completion_template(
job_title="Test", download_links=sample_download_links
)
assert html.startswith("<!DOCTYPE html>") or "<!DOCTYPE html>" in html
assert "<html>" in html
assert "</html>" in html
assert "<body>" in html
assert "font-family: Arial" in html
def test_render_template_download_links(self, service):
html = service._render_completion_template(
job_title="Test",
download_links={"en": {
"captions_vtt": "https://example.com/captions.vtt",
"audio_description_mp3": "https://example.com/ad.mp3",
}},
)
assert "Download Captions Vtt" in html
assert "Download Audio Description Mp3" in html
assert 'href="https://example.com/captions.vtt"' in html
assert 'href="https://example.com/ad.mp3"' in html
def test_render_template_xss_escaping(self, service):
html = service._render_completion_template(
job_title="<script>alert('xss')</script>Title",
download_links={},
)
assert "<script>" not in html
assert "Title" in html

View file

@ -40,11 +40,11 @@ class TestGCSService:
"""Create a sample UploadFile for testing"""
file_content = b"test video content"
file_obj = BytesIO(file_content)
from starlette.datastructures import Headers
upload_file = UploadFile(
filename="test.mp4",
file=file_obj,
content_type="video/mp4"
filename="test.mp4",
headers=Headers({"content-type": "video/mp4"}),
)
return upload_file
@ -53,42 +53,28 @@ class TestGCSService:
"""Test successful file upload to GCS"""
mock_blob = MagicMock()
gcs_service.bucket.blob.return_value = mock_blob
# Mock the upload operation
def mock_upload(file_obj):
mock_blob.upload_from_file(file_obj)
with patch.object(gcs_service.executor, 'submit') as mock_submit:
future = asyncio.Future()
future.set_result("gs://test-bucket/test/path.mp4")
mock_submit.return_value = future
result = await gcs_service.upload_file_to_gcs(
sample_upload_file,
"test/path.mp4"
)
assert result == "gs://test-bucket/test/path.mp4"
assert mock_blob.content_type == "video/mp4"
result = await gcs_service.upload_file_to_gcs(
sample_upload_file,
"test/path.mp4"
)
assert result == "gs://test-bucket/test/path.mp4"
assert mock_blob.content_type == "video/mp4"
@pytest.mark.asyncio
async def test_upload_file_with_custom_content_type(self, gcs_service, sample_upload_file):
"""Test file upload with custom content type"""
mock_blob = MagicMock()
gcs_service.bucket.blob.return_value = mock_blob
with patch.object(gcs_service.executor, 'submit') as mock_submit:
future = asyncio.Future()
future.set_result("gs://test-bucket/test/path.mp4")
mock_submit.return_value = future
await gcs_service.upload_file_to_gcs(
sample_upload_file,
"test/path.mp4",
content_type="application/octet-stream"
)
assert mock_blob.content_type == "application/octet-stream"
await gcs_service.upload_file_to_gcs(
sample_upload_file,
"test/path.mp4",
content_type="application/octet-stream"
)
assert mock_blob.content_type == "application/octet-stream"
@pytest.mark.asyncio
async def test_upload_file_failure(self, gcs_service, sample_upload_file):
@ -112,21 +98,16 @@ class TestGCSService:
"""Test successful text upload to GCS"""
mock_blob = MagicMock()
gcs_service.bucket.blob.return_value = mock_blob
with patch.object(gcs_service.executor, 'submit') as mock_submit:
future = asyncio.Future()
future.set_result("gs://test-bucket/test/file.txt")
mock_submit.return_value = future
result = await gcs_service.upload_text_to_gcs(
"test content",
"test/file.txt",
"text/plain"
)
assert result == "gs://test-bucket/test/file.txt"
assert mock_blob.content_type == "text/plain"
mock_blob.upload_from_string.assert_called_once_with("test content")
result = await gcs_service.upload_text_to_gcs(
"test content",
"test/file.txt",
"text/plain"
)
assert result == "gs://test-bucket/test/file.txt"
assert mock_blob.content_type == "text/plain"
mock_blob.upload_from_string.assert_called_once_with("test content", content_type="text/plain")
@pytest.mark.asyncio
async def test_get_signed_url_success(self, gcs_service):
@ -168,16 +149,11 @@ class TestGCSService:
"""Test successful file deletion"""
mock_blob = MagicMock()
gcs_service.bucket.blob.return_value = mock_blob
with patch.object(gcs_service.executor, 'submit') as mock_submit:
future = asyncio.Future()
future.set_result(True)
mock_submit.return_value = future
result = await gcs_service.delete_file("test/file.mp4")
assert result is True
mock_blob.delete.assert_called_once()
result = await gcs_service.delete_file("test/file.mp4")
assert result is True
mock_blob.delete.assert_called_once()
@pytest.mark.asyncio
async def test_delete_file_not_found(self, gcs_service):
@ -245,7 +221,7 @@ Hello world
result = await upload_vtt_to_gcs(vtt_content, "test.vtt")
assert result == "gs://bucket/test.vtt"
mock_upload.assert_called_once_with(vtt_content, "test.vtt", "text/vtt")
mock_upload.assert_called_once_with(vtt_content, "test.vtt", "text/vtt; charset=utf-8")
@pytest.mark.asyncio
async def test_get_signed_download_url(self):
@ -261,27 +237,23 @@ Hello world
@pytest.mark.asyncio
async def test_generate_signed_upload_url(self):
"""Test signed upload URL generation"""
import app.services.gcs as gcs_module
expected_result = {
"url": "https://upload-url.example.com",
"fields": {"Content-Type": "video/mp4"}
}
with patch('app.services.gcs.gcs_service.bucket') as mock_bucket:
mock_blob = MagicMock()
mock_blob.generate_signed_post_policy_v4.return_value = (
expected_result["url"],
expected_result["fields"]
)
mock_bucket.blob.return_value = mock_blob
with patch('app.services.gcs.gcs_service.executor') as mock_executor:
future = asyncio.Future()
future.set_result(expected_result)
mock_executor.submit.return_value = future
result = await generate_signed_upload_url(
"test/file.mp4",
"video/mp4"
)
assert result == expected_result
mock_blob = MagicMock()
mock_blob.generate_signed_post_policy_v4.return_value = (
expected_result["url"],
expected_result["fields"]
)
mock_bucket = MagicMock()
mock_bucket.blob.return_value = mock_blob
# Patch _bucket directly to avoid triggering lazy GCS client initialization
with patch.object(gcs_module.gcs_service, '_bucket', mock_bucket):
result = await generate_signed_upload_url("test/file.mp4", "video/mp4")
assert result == expected_result

View file

@ -1,5 +1,5 @@
import json
from unittest.mock import AsyncMock, MagicMock, patch, mock_open
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@ -11,11 +11,15 @@ class TestGeminiService:
@pytest.fixture
def gemini_service(self):
"""Create Gemini service instance with mocked dependencies"""
with patch('app.services.gemini.genai'):
service = GeminiService()
service.model = MagicMock()
return service
return GeminiService()
@pytest.fixture
def mock_uploaded_file(self):
f = MagicMock()
f.name = "files/test123"
f.uri = "gs://test-bucket/files/test123"
f.mime_type = "video/mp4"
return f
@pytest.fixture
def valid_gemini_response(self):
@ -44,79 +48,84 @@ learn about accessibility features.
}
@pytest.mark.asyncio
async def test_extract_accessibility_success(self, gemini_service, valid_gemini_response):
async def test_extract_accessibility_success(self, gemini_service, mock_uploaded_file, valid_gemini_response):
"""Test successful accessibility extraction"""
# Mock file upload and model response
mock_response = MagicMock()
mock_response.text = json.dumps(valid_gemini_response)
gemini_service.model.generate_content.return_value = mock_response
with patch('app.services.gemini.genai.upload_file') as mock_upload:
mock_upload.return_value = MagicMock()
with patch.object(gemini_service, '_load_prompt') as mock_load_prompt:
mock_load_prompt.return_value = "Test prompt"
result = await gemini_service.extract_accessibility("/tmp/test.mp4")
assert result == valid_gemini_response
assert result["confidence"] == 0.92
assert result["language"] == "en"
assert "WEBVTT" in result["captions_vtt"]
assert "WEBVTT" in result["audio_description_vtt"]
with patch('app.services.gemini.client') as mock_client:
mock_client.files.upload.return_value = mock_uploaded_file
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_wait_for_file_active', return_value=True):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
result = await gemini_service.extract_accessibility("/tmp/test.mp4")
assert result == valid_gemini_response
assert result["confidence"] == 0.92
assert result["language"] == "en"
assert "WEBVTT" in result["captions_vtt"]
assert "WEBVTT" in result["audio_description_vtt"]
@pytest.mark.asyncio
async def test_extract_accessibility_with_markdown_formatting(self, gemini_service, valid_gemini_response):
async def test_extract_accessibility_with_markdown_formatting(self, gemini_service, mock_uploaded_file, valid_gemini_response):
"""Test handling Gemini response with markdown formatting"""
# Mock response with markdown formatting
mock_response = MagicMock()
mock_response.text = f"```json\n{json.dumps(valid_gemini_response)}\n```"
gemini_service.model.generate_content.return_value = mock_response
with patch('app.services.gemini.genai.upload_file'):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
result = await gemini_service.extract_accessibility("/tmp/test.mp4")
assert result == valid_gemini_response
with patch('app.services.gemini.client') as mock_client:
mock_client.files.upload.return_value = mock_uploaded_file
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_wait_for_file_active', return_value=True):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
result = await gemini_service.extract_accessibility("/tmp/test.mp4")
assert result == valid_gemini_response
@pytest.mark.asyncio
async def test_extract_accessibility_invalid_json(self, gemini_service):
async def test_extract_accessibility_invalid_json(self, gemini_service, mock_uploaded_file):
"""Test handling of invalid JSON response"""
# Mock invalid JSON response
mock_response = MagicMock()
mock_response.text = "invalid json content"
gemini_service.model.generate_content.return_value = mock_response
with patch('app.services.gemini.genai.upload_file'):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
with patch.object(gemini_service, '_self_heal_response') as mock_self_heal:
mock_self_heal.return_value = {"language": "en", "confidence": 0.8}
result = await gemini_service.extract_accessibility("/tmp/test.mp4")
assert result == {"language": "en", "confidence": 0.8}
mock_self_heal.assert_called_once_with("/tmp/test.mp4", "invalid json content")
with patch('app.services.gemini.client') as mock_client:
mock_client.files.upload.return_value = mock_uploaded_file
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_wait_for_file_active', return_value=True):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
with patch.object(gemini_service, '_self_heal_response') as mock_self_heal:
mock_self_heal.return_value = {"language": "en", "confidence": 0.8}
result = await gemini_service.extract_accessibility("/tmp/test.mp4")
assert result == {"language": "en", "confidence": 0.8}
mock_self_heal.assert_called_once()
@pytest.mark.asyncio
async def test_extract_accessibility_missing_fields(self, gemini_service):
async def test_extract_accessibility_missing_fields(self, gemini_service, mock_uploaded_file):
"""Test error handling for missing required fields"""
incomplete_response = {
"language": "en",
"confidence": 0.92
# Missing required fields
}
mock_response = MagicMock()
mock_response.text = json.dumps(incomplete_response)
gemini_service.model.generate_content.return_value = mock_response
with patch('app.services.gemini.genai.upload_file'):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
with pytest.raises(ValueError, match="Missing required field"):
await gemini_service.extract_accessibility("/tmp/test.mp4")
with patch('app.services.gemini.client') as mock_client:
mock_client.files.upload.return_value = mock_uploaded_file
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_wait_for_file_active', return_value=True):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
with pytest.raises(ValueError, match="Missing required field"):
await gemini_service.extract_accessibility("/tmp/test.mp4")
@pytest.mark.asyncio
async def test_extract_accessibility_invalid_vtt_format(self, gemini_service):
async def test_extract_accessibility_invalid_vtt_format(self, gemini_service, mock_uploaded_file):
"""Test error handling for invalid VTT format"""
invalid_response = {
"language": "en",
@ -126,37 +135,42 @@ learn about accessibility features.
"captions_vtt": "Invalid VTT content", # Missing WEBVTT header
"audio_description_vtt": "WEBVTT\n\n00:00:01.000 --> 00:00:02.000\nValid AD"
}
mock_response = MagicMock()
mock_response.text = json.dumps(invalid_response)
gemini_service.model.generate_content.return_value = mock_response
with patch('app.services.gemini.genai.upload_file'):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
with pytest.raises(ValueError, match="Invalid captions VTT format"):
await gemini_service.extract_accessibility("/tmp/test.mp4")
with patch('app.services.gemini.client') as mock_client:
mock_client.files.upload.return_value = mock_uploaded_file
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_wait_for_file_active', return_value=True):
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt"):
with pytest.raises(ValueError, match="Invalid captions VTT format"):
await gemini_service.extract_accessibility("/tmp/test.mp4")
@pytest.mark.asyncio
async def test_self_heal_response_success(self, gemini_service, valid_gemini_response):
"""Test successful self-healing of invalid response"""
mock_response = MagicMock()
mock_response.text = json.dumps(valid_gemini_response)
gemini_service.model.generate_content.return_value = mock_response
with patch('app.services.gemini.genai.upload_file'):
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
result = await gemini_service._self_heal_response("/tmp/test.mp4", "invalid json")
assert result == valid_gemini_response
@pytest.mark.asyncio
async def test_self_heal_response_reask(self, gemini_service):
"""Test self-healing when Gemini returns REASK"""
"""Test self-healing when Gemini cannot produce valid JSON"""
mock_response = MagicMock()
mock_response.text = "REASK"
gemini_service.model.generate_content.return_value = mock_response
with patch('app.services.gemini.genai.upload_file'):
with pytest.raises(ValueError, match="Gemini unable to self-heal response"):
mock_response.text = "REASK" # Not valid JSON → triggers failure path
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
with pytest.raises(ValueError, match="Failed to get valid JSON"):
await gemini_service._self_heal_response("/tmp/test.mp4", "invalid json")
@pytest.mark.asyncio
@ -180,22 +194,24 @@ aprender sobre características de accesibilidad.
[El presentador gesticula hacia la pantalla]
"""
}
mock_response = MagicMock()
mock_response.text = json.dumps(transcreate_response)
gemini_service.model.generate_content.return_value = mock_response
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt {TARGET_LANGUAGE}"):
result = await gemini_service.transcreate_content(
"English captions VTT",
"English AD VTT",
"es",
"Brand guidelines"
)
assert result == transcreate_response
assert "WEBVTT" in result["captions_vtt"]
assert "WEBVTT" in result["audio_description_vtt"]
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt {TARGET_LANGUAGE}"):
result = await gemini_service.transcreate_content(
"English captions VTT",
"English AD VTT",
"es",
"Brand guidelines"
)
assert result == transcreate_response
assert "WEBVTT" in result["captions_vtt"]
assert "WEBVTT" in result["audio_description_vtt"]
@pytest.mark.asyncio
async def test_transcreate_content_missing_fields(self, gemini_service):
@ -204,38 +220,42 @@ aprender sobre características de accesibilidad.
"captions_vtt": "Some content"
# Missing audio_description_vtt
}
mock_response = MagicMock()
mock_response.text = json.dumps(incomplete_response)
gemini_service.model.generate_content.return_value = mock_response
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt {TARGET_LANGUAGE}"):
with pytest.raises(ValueError, match="Missing required VTT fields"):
await gemini_service.transcreate_content(
"English captions VTT",
"English AD VTT",
"es"
)
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt {TARGET_LANGUAGE}"):
with pytest.raises(ValueError, match="Missing required VTT fields"):
await gemini_service.transcreate_content(
"English captions VTT",
"English AD VTT",
"es"
)
@pytest.mark.asyncio
async def test_transcreate_content_invalid_json(self, gemini_service):
"""Test transcreation with invalid JSON response"""
mock_response = MagicMock()
mock_response.text = "invalid json"
gemini_service.model.generate_content.return_value = mock_response
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt {TARGET_LANGUAGE}"):
with pytest.raises(ValueError, match="Invalid JSON response from transcreation"):
await gemini_service.transcreate_content(
"English captions VTT",
"English AD VTT",
"es"
)
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt {TARGET_LANGUAGE}"):
with pytest.raises(ValueError, match="Invalid JSON response from transcreation"):
await gemini_service.transcreate_content(
"English captions VTT",
"English AD VTT",
"es"
)
def test_load_prompt_success(self, gemini_service):
"""Test successful prompt loading"""
prompt_content = "Test prompt content with {TARGET_LANGUAGE} placeholder"
with patch('pathlib.Path.read_text', return_value=prompt_content):
result = gemini_service._load_prompt("test_prompt.md")
assert result == prompt_content
@ -253,19 +273,21 @@ aprender sobre características de accesibilidad.
"captions_vtt": "Test VTT",
"audio_description_vtt": "Test AD VTT"
}
mock_response = MagicMock()
mock_response.text = f"```json\n{json.dumps(transcreate_response)}\n```"
gemini_service.model.generate_content.return_value = mock_response
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt {TARGET_LANGUAGE}"):
result = await gemini_service.transcreate_content(
"English captions VTT",
"English AD VTT",
"es"
)
assert result == transcreate_response
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
with patch.object(gemini_service, '_load_prompt', return_value="Test prompt {TARGET_LANGUAGE}"):
result = await gemini_service.transcreate_content(
"English captions VTT",
"English AD VTT",
"es"
)
assert result == transcreate_response
class TestGeminiTranslateVtt:
@ -273,11 +295,7 @@ class TestGeminiTranslateVtt:
@pytest.fixture
def gemini_service(self):
"""Create Gemini service instance with mocked dependencies"""
with patch('app.services.gemini.genai'):
service = GeminiService()
service.model = MagicMock()
return service
return GeminiService()
@pytest.fixture
def sample_vtt(self):
@ -292,41 +310,24 @@ Welcome to our tutorial
00:00:07.000 --> 00:00:09.000
Let's get started
"""
@pytest.fixture
def translated_vtt_es(self):
"""Expected Spanish translation of sample VTT"""
return """WEBVTT
00:00:01.000 --> 00:00:03.000
Hola a todos
00:00:04.000 --> 00:00:06.000
Bienvenidos a nuestro tutorial
00:00:07.000 --> 00:00:09.000
Empecemos
"""
@pytest.mark.asyncio
async def test_translate_vtt_success(self, gemini_service, sample_vtt, translated_vtt_es):
async def test_translate_vtt_success(self, gemini_service, sample_vtt):
"""Test successful VTT translation using Gemini"""
# translate_vtt sends cue texts as a numbered list; mock must return a numbered list
mock_response = MagicMock()
mock_response.text = translated_vtt_es
mock_response.text = "1. Hola a todos\n2. Bienvenidos a nuestro tutorial\n3. Empecemos"
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
result = await gemini_service.translate_vtt(sample_vtt, "es")
# Verify structure is preserved
assert "WEBVTT" in result
assert "00:00:01.000 --> 00:00:03.000" in result
assert "00:00:04.000 --> 00:00:06.000" in result
assert "00:00:07.000 --> 00:00:09.000" in result
# Verify translation content
assert "Hola a todos" in result
assert "Bienvenidos a nuestro tutorial" in result
assert "Empecemos" in result
@ -341,24 +342,15 @@ Original text
00:00:05.890 --> 00:00:08.123
Another line
"""
translated_vtt = """WEBVTT
00:00:01.234 --> 00:00:03.567
Texto original
00:00:05.890 --> 00:00:08.123
Otra línea
"""
mock_response = MagicMock()
mock_response.text = translated_vtt
mock_response.text = "1. Texto original\n2. Otra línea"
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
result = await gemini_service.translate_vtt(original_vtt, "es")
# Check that exact timestamps are preserved
assert "00:00:01.234 --> 00:00:03.567" in result
assert "00:00:05.890 --> 00:00:08.123" in result
assert "Texto original" in result
@ -366,41 +358,17 @@ Otra línea
@pytest.mark.asyncio
async def test_translate_vtt_maintains_webvtt_header(self, gemini_service, sample_vtt):
"""Test that WEBVTT header is preserved or added if missing"""
# Response without WEBVTT header
response_without_header = """00:00:01.000 --> 00:00:03.000
Hola a todos
"""
"""Test that result always has WEBVTT header (rebuilt from original timings)"""
mock_response = MagicMock()
mock_response.text = response_without_header
mock_response.text = "1. Hola a todos\n2. Bienvenidos a nuestro tutorial\n3. Empecemos"
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
result = await gemini_service.translate_vtt(sample_vtt, "es")
# Should add WEBVTT header if missing
assert result.startswith("WEBVTT")
@pytest.mark.asyncio
async def test_translate_vtt_handles_markdown_formatting(self, gemini_service, sample_vtt, translated_vtt_es):
"""Test handling of markdown code blocks in response"""
# Response with markdown formatting
markdown_response = f"```vtt\n{translated_vtt_es}\n```"
mock_response = MagicMock()
mock_response.text = markdown_response
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
result = await gemini_service.translate_vtt(sample_vtt, "es")
# Should strip markdown formatting
assert "```" not in result
assert "WEBVTT" in result
assert "Hola a todos" in result
@pytest.mark.asyncio
async def test_translate_vtt_handles_multiline_cues(self, gemini_service):
"""Test translation of VTT with multi-line cues"""
@ -413,17 +381,9 @@ Second line
00:00:04.000 --> 00:00:06.000
Another cue
"""
translated_multiline = """WEBVTT
00:00:01.000 --> 00:00:03.000
Primera línea
Segunda línea
00:00:04.000 --> 00:00:06.000
Otra señal
"""
# Multi-line cues are joined with a space before sending to Gemini
mock_response = MagicMock()
mock_response.text = translated_multiline
mock_response.text = "1. Primera línea Segunda línea\n2. Otra señal"
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
@ -437,18 +397,18 @@ Otra señal
async def test_translate_vtt_with_source_language(self, gemini_service, sample_vtt):
"""Test translation with non-English source language"""
mock_response = MagicMock()
mock_response.text = sample_vtt # Just return same content for this test
mock_response.text = "1. Hello everyone\n2. Welcome to our tutorial\n3. Let's get started"
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
# Call with French as source language
await gemini_service.translate_vtt(sample_vtt, "en", source_language="fr")
# Verify the prompt included the source language
call_args = mock_client.models.generate_content.call_args
assert call_args is not None
# Prompt should reference the source language
prompt_content = str(call_args)
assert "fr" in prompt_content or "French" in prompt_content or call_args is not None
assert "fr" in prompt_content or "French" in prompt_content
@pytest.mark.asyncio
async def test_translate_vtt_error_handling(self, gemini_service, sample_vtt):
@ -469,24 +429,15 @@ Otra señal
00:00:04.000 --> 00:00:06.000
[Speaker 2]: Welcome to the show
"""
translated_with_speakers = """WEBVTT
00:00:01.000 --> 00:00:03.000
[Speaker 1]: Hola a todos
00:00:04.000 --> 00:00:06.000
[Speaker 2]: Bienvenidos al programa
"""
mock_response = MagicMock()
mock_response.text = translated_with_speakers
mock_response.text = "1. [Speaker 1]: Hola a todos\n2. [Speaker 2]: Bienvenidos al programa"
with patch('app.services.gemini.client') as mock_client:
mock_client.models.generate_content.return_value = mock_response
result = await gemini_service.translate_vtt(vtt_with_speakers, "es")
# Verify speaker labels are preserved
assert "[Speaker 1]" in result
assert "[Speaker 2]" in result
@ -499,21 +450,10 @@ class TestGeminiServiceIntegration:
@pytest.mark.asyncio
async def test_real_gemini_extraction(self):
"""Test real Gemini extraction (requires setup)"""
# This test should be enabled when running with real credentials
service = GeminiService()
# Would require a real test video file
# result = await service.extract_accessibility("/path/to/test/video.mp4")
# assert "captions_vtt" in result
# assert "audio_description_vtt" in result
pass
@pytest.mark.skip(reason="Requires actual Gemini API key")
@pytest.mark.asyncio
async def test_real_transcreation(self):
"""Test real transcreation (requires setup)"""
# This test should be enabled when running with real credentials
service = GeminiService()
# Would test actual transcreation
pass
pass

View file

@ -144,15 +144,15 @@ class TestJobModel:
"""Test JobCreate schema"""
job_create = JobCreate(
title="Test Video",
language="en",
source_is_english=True,
requested_outputs=RequestedOutputs(
captions_vtt=True,
languages=["es", "fr"]
)
)
assert job_create.title == "Test Video"
assert job_create.language == "en"
assert job_create.source_is_english is True
assert job_create.requested_outputs.languages == ["es", "fr"]
def test_job_update_schema(self):

View file

@ -198,7 +198,8 @@ class TestTokenSecurity:
with patch('app.core.security.settings') as mock_settings:
mock_settings.jwt_secret = "test_secret"
mock_settings.jwt_alg = "HS256"
mock_settings.jwt_access_ttl_min = 15
token = create_access_token("user123")
payload = jwt.decode(token, "test_secret", algorithms=["HS256"])

View file

@ -130,7 +130,7 @@ class TestTTSService:
service.google_client = None
service.elevenlabs_available = False
with pytest.raises(ValueError, match="No TTS service configured"):
with pytest.raises(ValueError, match="No TTS service available"):
await service.synthesize_audio_description("WEBVTT\n", "en-US")
@pytest.mark.asyncio
@ -195,34 +195,45 @@ class TestTTSService:
async def test_synthesize_text_elevenlabs_success(self, tts_service_elevenlabs):
"""Test ElevenLabs text synthesis"""
mock_audio_data = b"elevenlabs_audio_response"
# Mock aiohttp session
mock_response = AsyncMock()
mock_response.status = 200
mock_response.read.return_value = mock_audio_data
mock_session = AsyncMock()
mock_session.post.return_value.__aenter__.return_value = mock_response
mock_response.read = AsyncMock(return_value=mock_audio_data)
# async with session.post(...) as response: needs a sync callable returning async CM
mock_post_cm = MagicMock()
mock_post_cm.__aenter__ = AsyncMock(return_value=mock_response)
mock_post_cm.__aexit__ = AsyncMock(return_value=False)
mock_session = MagicMock()
mock_session.post = MagicMock(return_value=mock_post_cm)
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=False)
with patch('app.services.tts.aiohttp.ClientSession', return_value=mock_session):
result = await tts_service_elevenlabs._synthesize_text_elevenlabs(
"Test text",
"21m00Tcm4TlvDq8ikWAM"
)
assert result == mock_audio_data
assert result == mock_audio_data
@pytest.mark.asyncio
async def test_synthesize_text_elevenlabs_error(self, tts_service_elevenlabs):
"""Test ElevenLabs API error handling"""
# Mock error response
mock_response = AsyncMock()
mock_response.status = 400
mock_response.text.return_value = "Bad request error"
mock_session = AsyncMock()
mock_session.post.return_value.__aenter__.return_value = mock_response
mock_response.text = AsyncMock(return_value="Bad request error")
mock_post_cm = MagicMock()
mock_post_cm.__aenter__ = AsyncMock(return_value=mock_response)
mock_post_cm.__aexit__ = AsyncMock(return_value=False)
mock_session = MagicMock()
mock_session.post = MagicMock(return_value=mock_post_cm)
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=False)
with patch('app.services.tts.aiohttp.ClientSession', return_value=mock_session):
with pytest.raises(ValueError, match="ElevenLabs TTS failed: 400"):
await tts_service_elevenlabs._synthesize_text_elevenlabs(