Phase 7: Testing Suite — backend unit tests + Cypress E2E framework

Backend:
- conftest with async SQLite DB, factory fixtures for all models
- pytest-asyncio config in pyproject.toml
- Tests: auth (JWT, dev login), RBAC (access service), audit (query, export),
  brand enforcement (colors, fonts, logos, contrast), retention (cleanup, purge),
  content intelligence (regex classifiers), slide mapping, review workflow,
  analytics data queries

Frontend:
- Cypress E2E config with baseUrl and viewport settings
- Custom commands (devLogin, createPresentation)
- E2E specs: login flow, wizard navigation, admin panel, review workflow
- Test scripts in package.json

Infrastructure:
- Makefile: test-e2e and test-all targets

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-02-26 16:49:23 +00:00
parent c97841f6d1
commit 76a4e41e3b
19 changed files with 1203 additions and 3 deletions

View file

@ -1,4 +1,4 @@
.PHONY: dev build up down migrate seed test test-frontend logs shell-api shell-db
.PHONY: dev build up down migrate seed test test-e2e test-all logs shell-api shell-db
dev:
docker compose up --build
@ -21,9 +21,11 @@ seed:
test:
docker compose exec api pytest tests/ -v
test-frontend:
test-e2e:
docker compose exec web npx cypress run
test-all: test test-e2e
logs:
docker compose logs -f

View file

@ -29,8 +29,15 @@ dependencies = [
"openpyxl>=3.1",
"trafilatura>=2.0",
"arq>=0.26",
"pytest-asyncio>=0.25",
"httpx>=0.28",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
pythonpath = ["."]
[[tool.uv.index]]
url = "https://download.pytorch.org/whl/cpu"

215
backend/tests/conftest.py Normal file
View file

@ -0,0 +1,215 @@
"""Test fixtures: async SQLite database, factory functions for models."""
import os
import uuid
from datetime import datetime, timedelta, timezone
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlmodel import SQLModel
# Force SQLite for tests (must be set before any app imports)
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///:memory:")
os.environ.setdefault("JWT_SECRET_KEY", "test-secret-key")
os.environ.setdefault("DEV_AUTH_PASSWORD", "testpass")
# Import all SQLModel table classes so metadata is populated
from models.sql.user import UserModel # noqa: E402
from models.sql.client import ClientModel # noqa: E402
from models.sql.team import TeamModel # noqa: E402
from models.sql.team_membership import TeamMembershipModel # noqa: E402
from models.sql.brand_config import BrandConfigModel # noqa: E402
from models.sql.audit_log import AuditLogModel # noqa: E402
from models.sql.presentation import PresentationModel # noqa: E402
from models.sql.job import JobModel # noqa: E402
from models.sql.master_deck import MasterDeckModel # noqa: E402
_test_engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
_test_session_maker = async_sessionmaker(_test_engine, expire_on_commit=False)
@pytest_asyncio.fixture(autouse=True)
async def setup_database():
"""Create all tables before each test, drop after."""
async with _test_engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
yield
async with _test_engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.drop_all)
@pytest_asyncio.fixture
async def session() -> AsyncSession:
async with _test_session_maker() as s:
yield s
# --------------- Factory Fixtures ---------------
@pytest_asyncio.fixture
async def make_user(session: AsyncSession):
async def _factory(
email: str = "test@oliver.com",
role: str = "user",
display_name: str = "Test User",
is_active: bool = True,
azure_oid: str | None = None,
) -> UserModel:
user = UserModel(
email=email,
display_name=display_name,
role=role,
is_active=is_active,
azure_oid=azure_oid,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(user)
await session.commit()
await session.refresh(user)
return user
return _factory
@pytest_asyncio.fixture
async def make_client(session: AsyncSession):
async def _factory(
name: str = "Acme Corp",
slug: str | None = None,
retention_days: int | None = None,
is_active: bool = True,
) -> ClientModel:
client = ClientModel(
name=name,
slug=slug or name.lower().replace(" ", "-"),
retention_days=retention_days,
is_active=is_active,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(client)
await session.commit()
await session.refresh(client)
return client
return _factory
@pytest_asyncio.fixture
async def make_team(session: AsyncSession):
async def _factory(
name: str = "Dev Team",
client_id: uuid.UUID | None = None,
is_default: bool = False,
) -> TeamModel:
team = TeamModel(
name=name,
client_id=client_id,
is_default=is_default,
created_at=datetime.now(timezone.utc),
)
session.add(team)
await session.commit()
await session.refresh(team)
return team
return _factory
@pytest_asyncio.fixture
async def make_membership(session: AsyncSession):
async def _factory(user_id: uuid.UUID, team_id: uuid.UUID) -> TeamMembershipModel:
membership = TeamMembershipModel(
user_id=user_id,
team_id=team_id,
assigned_at=datetime.now(timezone.utc),
)
session.add(membership)
await session.commit()
await session.refresh(membership)
return membership
return _factory
@pytest_asyncio.fixture
async def make_brand_config(session: AsyncSession):
async def _factory(
client_id: uuid.UUID,
primary_colors: list | None = None,
secondary_colors: list | None = None,
fonts: dict | None = None,
logo_paths: list | None = None,
voice_rules: str | None = None,
voice_examples: list | None = None,
) -> BrandConfigModel:
brand = BrandConfigModel(
client_id=client_id,
primary_colors=primary_colors or ["#5146E5", "#3D35B0"],
secondary_colors=secondary_colors or ["#E9E8F8"],
fonts=fonts or {"heading": "Inter", "body": "Roboto"},
logo_paths=logo_paths,
voice_rules=voice_rules,
voice_examples=voice_examples,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(brand)
await session.commit()
await session.refresh(brand)
return brand
return _factory
@pytest_asyncio.fixture
async def make_presentation(session: AsyncSession):
async def _factory(
owner_id: uuid.UUID | None = None,
client_id: uuid.UUID | None = None,
title: str = "Test Deck",
status: str = "draft",
created_at: datetime | None = None,
deleted_at: datetime | None = None,
) -> PresentationModel:
pres = PresentationModel(
content="Test content",
n_slides=5,
language="English",
title=title,
owner_id=owner_id,
client_id=client_id,
status=status,
created_at=created_at or datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
deleted_at=deleted_at,
)
session.add(pres)
await session.commit()
await session.refresh(pres)
return pres
return _factory
@pytest_asyncio.fixture
async def make_job(session: AsyncSession):
async def _factory(
user_id: uuid.UUID,
client_id: uuid.UUID,
job_type: str = "generate_presentation",
status: str = "queued",
progress: int = 0,
presentation_id: uuid.UUID | None = None,
) -> JobModel:
job = JobModel(
user_id=user_id,
client_id=client_id,
job_type=job_type,
status=status,
progress=progress,
presentation_id=presentation_id,
created_at=datetime.now(timezone.utc),
)
session.add(job)
await session.commit()
await session.refresh(job)
return job
return _factory

View file

@ -0,0 +1,73 @@
"""Tests for RBAC access_service: client access by role and team membership."""
import uuid
import pytest
from services.access_service import get_accessible_client_ids, get_accessible_clients
class TestSuperAdminAccess:
async def test_super_admin_sees_all_active_clients(
self, session, make_user, make_client
):
admin = await make_user(email="admin@test.com", role="super_admin")
c1 = await make_client(name="Client A", slug="client-a")
c2 = await make_client(name="Client B", slug="client-b")
# Inactive client should be excluded
await make_client(name="Inactive", slug="inactive", is_active=False)
ids = await get_accessible_client_ids(admin, session)
assert set(ids) == {c1.id, c2.id}
async def test_super_admin_gets_full_client_objects(
self, session, make_user, make_client
):
admin = await make_user(email="admin@test.com", role="super_admin")
await make_client(name="X Corp", slug="x-corp")
clients = await get_accessible_clients(admin, session)
assert len(clients) == 1
assert clients[0].name == "X Corp"
class TestRegularUserAccess:
async def test_user_sees_only_team_linked_clients(
self, session, make_user, make_client, make_team, make_membership
):
user = await make_user(email="user@test.com", role="user")
c1 = await make_client(name="My Client", slug="my-client")
c2 = await make_client(name="Other Client", slug="other-client")
team = await make_team(name="Team A", client_id=c1.id)
await make_membership(user_id=user.id, team_id=team.id)
ids = await get_accessible_client_ids(user, session)
assert c1.id in ids
assert c2.id not in ids
async def test_user_with_no_teams_sees_nothing(
self, session, make_user, make_client
):
user = await make_user(email="lonely@test.com", role="user")
await make_client(name="Some Corp", slug="some-corp")
ids = await get_accessible_client_ids(user, session)
assert ids == []
clients = await get_accessible_clients(user, session)
assert clients == []
async def test_user_sees_multiple_clients_via_multiple_teams(
self, session, make_user, make_client, make_team, make_membership
):
user = await make_user(email="multi@test.com", role="user")
c1 = await make_client(name="Client 1", slug="client-1")
c2 = await make_client(name="Client 2", slug="client-2")
t1 = await make_team(name="Team 1", client_id=c1.id)
t2 = await make_team(name="Team 2", client_id=c2.id)
await make_membership(user_id=user.id, team_id=t1.id)
await make_membership(user_id=user.id, team_id=t2.id)
ids = await get_accessible_client_ids(user, session)
assert set(ids) == {c1.id, c2.id}

View file

@ -0,0 +1,99 @@
"""Tests for analytics endpoints: verify query logic returns correct aggregations."""
import uuid
from datetime import datetime, timedelta, timezone
import pytest
from models.sql.job import JobModel
from models.sql.presentation import PresentationModel
class TestAnalyticsData:
"""Test that analytics data can be correctly queried from our models."""
async def test_presentation_count(self, session, make_client, make_user, make_presentation):
user = await make_user(email="ana@test.com")
client = await make_client(name="Ana Corp", slug="ana-corp")
await make_presentation(owner_id=user.id, client_id=client.id, title="Deck 1")
await make_presentation(owner_id=user.id, client_id=client.id, title="Deck 2")
await make_presentation(owner_id=user.id, client_id=client.id, title="Deck 3")
from sqlmodel import select, func
result = await session.execute(
select(func.count()).select_from(PresentationModel).where(
PresentationModel.client_id == client.id,
PresentationModel.deleted_at.is_(None),
)
)
count = result.scalar()
assert count == 3
async def test_status_distribution(self, session, make_client, make_user, make_presentation):
user = await make_user(email="dist@test.com")
client = await make_client(name="Dist Corp", slug="dist-corp")
await make_presentation(owner_id=user.id, client_id=client.id, status="draft")
await make_presentation(owner_id=user.id, client_id=client.id, status="draft")
await make_presentation(owner_id=user.id, client_id=client.id, status="in_review")
await make_presentation(owner_id=user.id, client_id=client.id, status="approved")
from sqlmodel import select, func
result = await session.execute(
select(
PresentationModel.status,
func.count().label("cnt")
).where(
PresentationModel.client_id == client.id,
PresentationModel.deleted_at.is_(None),
).group_by(PresentationModel.status)
)
dist = {row[0]: row[1] for row in result.all()}
assert dist["draft"] == 2
assert dist["in_review"] == 1
assert dist["approved"] == 1
async def test_job_status_distribution(
self, session, make_client, make_user, make_job
):
user = await make_user(email="job@test.com")
client = await make_client(name="Job Corp", slug="job-corp")
await make_job(user_id=user.id, client_id=client.id, status="completed")
await make_job(user_id=user.id, client_id=client.id, status="completed")
await make_job(user_id=user.id, client_id=client.id, status="failed")
from sqlmodel import select, func
result = await session.execute(
select(
JobModel.status,
func.count().label("cnt")
).where(
JobModel.client_id == client.id
).group_by(JobModel.status)
)
dist = {row[0]: row[1] for row in result.all()}
assert dist["completed"] == 2
assert dist["failed"] == 1
async def test_deleted_presentations_excluded(
self, session, make_client, make_user, make_presentation
):
user = await make_user(email="del@test.com")
client = await make_client(name="Del Corp", slug="del-corp")
await make_presentation(owner_id=user.id, client_id=client.id, title="Active")
await make_presentation(
owner_id=user.id, client_id=client.id, title="Deleted",
deleted_at=datetime.now(timezone.utc),
)
from sqlmodel import select, func
result = await session.execute(
select(func.count()).select_from(PresentationModel).where(
PresentationModel.client_id == client.id,
PresentationModel.deleted_at.is_(None),
)
)
count = result.scalar()
assert count == 1

View file

@ -0,0 +1,108 @@
"""Tests for audit_service: query, export_csv, export_json."""
import uuid
from datetime import datetime, timezone
import pytest
from models.sql.audit_log import AuditLogModel
from services import audit_service
class TestAuditQuery:
async def test_query_returns_entries(self, session):
entry = AuditLogModel(
user_id=uuid.uuid4(),
action="create",
resource_type="presentation",
resource_id=uuid.uuid4(),
created_at=datetime.now(timezone.utc),
)
session.add(entry)
await session.commit()
logs = await audit_service.query(session)
assert len(logs) == 1
assert logs[0].action == "create"
async def test_query_filter_by_action(self, session):
uid = uuid.uuid4()
for action in ["create", "update", "delete"]:
session.add(AuditLogModel(
user_id=uid,
action=action,
resource_type="presentation",
created_at=datetime.now(timezone.utc),
))
await session.commit()
logs = await audit_service.query(session, action="delete")
assert len(logs) == 1
assert logs[0].action == "delete"
async def test_query_filter_by_user_id(self, session):
uid1 = uuid.uuid4()
uid2 = uuid.uuid4()
session.add(AuditLogModel(
user_id=uid1, action="create", resource_type="deck",
created_at=datetime.now(timezone.utc),
))
session.add(AuditLogModel(
user_id=uid2, action="create", resource_type="deck",
created_at=datetime.now(timezone.utc),
))
await session.commit()
logs = await audit_service.query(session, user_id=uid1)
assert len(logs) == 1
async def test_query_pagination(self, session):
uid = uuid.uuid4()
for i in range(5):
session.add(AuditLogModel(
user_id=uid, action=f"action_{i}", resource_type="test",
created_at=datetime.now(timezone.utc),
))
await session.commit()
page1 = await audit_service.query(session, limit=2, offset=0)
page2 = await audit_service.query(session, limit=2, offset=2)
assert len(page1) == 2
assert len(page2) == 2
class TestAuditExport:
def test_export_csv(self):
entries = [
AuditLogModel(
id=uuid.uuid4(),
user_id=uuid.uuid4(),
action="export",
resource_type="presentation",
created_at=datetime.now(timezone.utc),
)
]
csv_str = audit_service.export_csv(entries)
assert "export" in csv_str
assert "presentation" in csv_str
# Header row
assert "id,user_id,action" in csv_str
def test_export_json(self):
entries = [
AuditLogModel(
id=uuid.uuid4(),
user_id=uuid.uuid4(),
action="login",
resource_type="session",
details={"method": "dev"},
created_at=datetime.now(timezone.utc),
)
]
json_str = audit_service.export_json(entries)
assert '"action": "login"' in json_str
assert '"method": "dev"' in json_str
def test_export_csv_empty(self):
csv_str = audit_service.export_csv([])
lines = csv_str.strip().split("\n")
assert len(lines) == 1 # Only header

View file

@ -0,0 +1,78 @@
"""Tests for auth_service: JWT creation/validation, dev login, user creation."""
import uuid
import pytest
from services.auth_service import AuthService
@pytest.fixture
def auth_service():
return AuthService()
class TestJWT:
def test_create_and_validate_token(self, auth_service, make_user):
"""Token round-trip: create → validate → same payload."""
from models.sql.user import UserModel
user = UserModel(
id=uuid.uuid4(),
email="jwt@test.com",
display_name="JWT User",
role="user",
is_active=True,
)
token = auth_service.create_session_jwt(user)
payload = auth_service.validate_token(token)
assert payload is not None
assert payload["sub"] == str(user.id)
assert payload["email"] == "jwt@test.com"
assert payload["role"] == "user"
def test_validate_invalid_token(self, auth_service):
assert auth_service.validate_token("garbage.token.here") is None
def test_validate_tampered_token(self, auth_service):
from models.sql.user import UserModel
user = UserModel(
id=uuid.uuid4(),
email="tamper@test.com",
display_name="Tamper",
role="user",
is_active=True,
)
token = auth_service.create_session_jwt(user)
# Tamper with the token
tampered = token[:-5] + "XXXXX"
assert auth_service.validate_token(tampered) is None
class TestDevLogin:
async def test_dev_login_success(self, auth_service, session):
"""Dev mode: correct password creates user."""
user = await auth_service.dev_login("dev@test.com", "testpass", session)
assert user is not None
assert user.email == "dev@test.com"
async def test_dev_login_wrong_password(self, auth_service, session):
user = await auth_service.dev_login("dev@test.com", "wrongpass", session)
assert user is None
async def test_dev_login_idempotent(self, auth_service, session):
"""Logging in twice with same email returns same user."""
user1 = await auth_service.dev_login("dev@test.com", "testpass", session)
user2 = await auth_service.dev_login("dev@test.com", "testpass", session)
assert user1.id == user2.id
class TestDevMode:
def test_is_dev_mode_when_no_azure(self, auth_service):
"""Without AZURE_AD_TENANT_ID, service is in dev mode."""
assert auth_service.is_dev_mode is True
def test_get_authorization_url_raises_in_dev_mode(self, auth_service):
with pytest.raises(ValueError, match="dev login"):
auth_service.get_authorization_url()

View file

@ -0,0 +1,168 @@
"""Tests for BrandEnforcementService: color utilities, font/color/logo enforcement."""
import uuid
import pytest
from models.pptx_models import (
PptxAutoShapeBoxModel,
PptxChartBoxModel,
PptxFillModel,
PptxFontModel,
PptxParagraphModel,
PptxPositionModel,
PptxPresentationModel,
PptxSlideModel,
PptxTextBoxModel,
PptxTextRunModel,
)
from models.sql.brand_config import BrandConfigModel
from pptx.enum.shapes import MSO_AUTO_SHAPE_TYPE
from services.brand_enforcement_service import (
BrandEnforcementService,
_contrast_ratio,
_hex_to_rgb,
_relative_luminance,
)
@pytest.fixture
def svc():
return BrandEnforcementService()
@pytest.fixture
def brand() -> BrandConfigModel:
return BrandConfigModel(
client_id=uuid.uuid4(),
primary_colors=["#FF0000", "#00FF00"],
secondary_colors=["#0000FF"],
fonts={"heading": "Montserrat", "body": "Open Sans"},
logo_paths=["/logos/acme.png"],
)
class TestColorUtilities:
def test_hex_to_rgb_six_chars(self):
assert _hex_to_rgb("FF0000") == (255, 0, 0)
assert _hex_to_rgb("00FF00") == (0, 255, 0)
def test_hex_to_rgb_with_hash(self):
assert _hex_to_rgb("#0000FF") == (0, 0, 255)
def test_hex_to_rgb_three_chars(self):
assert _hex_to_rgb("F00") == (255, 0, 0)
def test_white_has_high_luminance(self):
assert _relative_luminance("FFFFFF") > 0.9
def test_black_has_low_luminance(self):
assert _relative_luminance("000000") < 0.01
def test_contrast_ratio_black_vs_white(self):
white_lum = _relative_luminance("FFFFFF")
black_lum = _relative_luminance("000000")
ratio = _contrast_ratio(white_lum, black_lum)
assert ratio > 20 # Should be ~21:1
class TestBrandColors:
def test_get_brand_colors_list(self, svc, brand):
colors = svc.get_brand_colors_list(brand)
assert "FF0000" in colors
assert "00FF00" in colors
assert "0000FF" in colors
def test_fallback_when_no_colors(self, svc):
brand = BrandConfigModel(
client_id=uuid.uuid4(),
primary_colors=None,
secondary_colors=None,
)
colors = svc.get_brand_colors_list(brand)
assert len(colors) > 0 # Defaults
assert "4472C4" in colors
class TestEnforceOnPptxModel:
def _make_text_slide(self, font_name="Arial", font_size=14):
return PptxSlideModel(shapes=[
PptxTextBoxModel(
position=PptxPositionModel(left=10, top=10, width=200, height=50),
paragraphs=[
PptxParagraphModel(
text="Hello World",
font=PptxFontModel(name=font_name, size=font_size, color="000000"),
)
],
)
])
def test_fonts_replaced_with_brand_fonts(self, svc, brand):
model = PptxPresentationModel(slides=[
self._make_text_slide("Arial", 32), # Title slide (idx=0) → heading font
self._make_text_slide("Times", 14), # Content slide → body font
])
result = svc.enforce_on_pptx_model(model, brand)
# Slide 0 (title): heading font
assert result.slides[0].shapes[0].paragraphs[0].font.name == "Montserrat"
# Slide 1 (content): body font
assert result.slides[1].shapes[0].paragraphs[0].font.name == "Open Sans"
def test_logo_added_to_content_slides_only(self, svc, brand):
model = PptxPresentationModel(slides=[
self._make_text_slide(), # Title slide — no logo
self._make_text_slide(), # Content slide — should get logo
self._make_text_slide(), # Another content slide
])
result = svc.enforce_on_pptx_model(model, brand)
# Slide 0: no logo shape added (still 1 shape)
assert len(result.slides[0].shapes) == 1
# Slides 1-2: logo added (2 shapes each)
assert len(result.slides[1].shapes) == 2
assert len(result.slides[2].shapes) == 2
def test_no_logo_when_no_logo_paths(self, svc):
brand_no_logo = BrandConfigModel(
client_id=uuid.uuid4(),
fonts={"heading": "Inter", "body": "Inter"},
logo_paths=None,
)
model = PptxPresentationModel(slides=[
self._make_text_slide(),
self._make_text_slide(),
])
result = svc.enforce_on_pptx_model(model, brand_no_logo)
assert len(result.slides[1].shapes) == 1
def test_chart_gets_brand_colors(self, svc, brand):
chart = PptxChartBoxModel(
position=PptxPositionModel(left=0, top=0, width=400, height=300),
chart_type="bar",
chart_data={"categories": ["A"], "series": [{"name": "S", "values": [1]}]},
)
model = PptxPresentationModel(slides=[
PptxSlideModel(shapes=[chart]),
])
result = svc.enforce_on_pptx_model(model, brand)
assert result.slides[0].shapes[0].brand_colors is not None
assert "FF0000" in result.slides[0].shapes[0].brand_colors
def test_contrast_fix_white_on_white(self, svc, brand):
"""White text on white bg should be auto-corrected to black."""
slide = PptxSlideModel(shapes=[
PptxTextBoxModel(
position=PptxPositionModel(left=0, top=0, width=100, height=50),
paragraphs=[
PptxParagraphModel(
text="Invisible",
font=PptxFontModel(name="Arial", size=14, color="FFFFFF"),
)
],
)
])
# Default bg is white (FFFFFF)
model = PptxPresentationModel(slides=[slide])
result = svc.enforce_on_pptx_model(model, brand)
# After contrast fix, text should be dark
assert result.slides[0].shapes[0].paragraphs[0].font.color == "000000"

View file

@ -0,0 +1,103 @@
"""Tests for ContentIntelligenceService: rule-based content classification."""
import re
import pytest
from services.content_intelligence_service import (
_COMPARISON_RE,
_IMAGE_REF_RE,
_LIST_RE,
_METRIC_RE,
_QUOTE_RE,
_TABLE_RE,
_TIMELINE_RE,
)
class TestMetricRegex:
@pytest.mark.parametrize("text", [
"$2.3M revenue",
"45% growth",
"1,200 units",
"revenue grew 45%",
"profit increased by $2M",
"ROI of 340%",
"CAGR 12%",
])
def test_detects_metrics(self, text):
assert _METRIC_RE.search(text), f"Failed to detect metric: {text}"
@pytest.mark.parametrize("text", [
"The cat sat on the mat",
"We had a meeting yesterday",
])
def test_rejects_non_metrics(self, text):
assert not _METRIC_RE.search(text)
class TestQuoteRegex:
def test_detects_quoted_text(self):
text = '"Innovation is the ability to see change as an opportunity" — John Doe'
assert _QUOTE_RE.search(text)
def test_detects_smart_quotes(self):
text = '\u201cThis is a quoted statement\u201d'
assert _QUOTE_RE.search(text)
def test_rejects_short_quotes(self):
text = '"Hi"'
assert not _QUOTE_RE.search(text)
class TestTableRegex:
def test_detects_markdown_table(self):
text = "| Name | Value |\n| --- | --- |\n| A | 1 |"
assert _TABLE_RE.search(text)
def test_rejects_non_table(self):
text = "This is just normal text"
assert not _TABLE_RE.search(text)
class TestTimelineRegex:
@pytest.mark.parametrize("text", [
"In 2023, we launched the product",
"Q1 results were strong",
"January 2024 earnings",
])
def test_detects_timeline(self, text):
assert _TIMELINE_RE.search(text)
class TestComparisonRegex:
@pytest.mark.parametrize("text", [
"Plan A vs. Plan B",
"compared to last year",
"in contrast to competitors",
"on the other hand, they chose",
])
def test_detects_comparison(self, text):
assert _COMPARISON_RE.search(text)
class TestListRegex:
def test_detects_bullet_list(self):
text = "- Item one\n- Item two\n- Item three"
matches = _LIST_RE.findall(text)
assert len(matches) == 3
def test_detects_asterisk_list(self):
text = "* First\n* Second"
assert _LIST_RE.search(text)
class TestImageRefRegex:
@pytest.mark.parametrize("text", [
"See figure 1 below",
"see diagram for details",
"image.png",
"![alt text](photo.jpg)",
"attached image shows",
])
def test_detects_image_references(self, text):
assert _IMAGE_REF_RE.search(text)

View file

@ -0,0 +1,127 @@
"""Tests for RetentionService: cleanup and purge logic.
Uses mocked async_session_maker since RetentionService creates its own sessions.
"""
import uuid
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from models.sql.client import ClientModel
from models.sql.presentation import PresentationModel
from services.retention_service import RetentionService
@pytest.fixture
def svc():
return RetentionService()
class TestRunCleanup:
async def test_soft_deletes_expired_presentations(
self, session, make_client, make_presentation, make_user
):
"""Presentations older than client.retention_days get soft-deleted."""
client = await make_client(name="RetCorp", slug="retcorp", retention_days=30)
user = await make_user(email="ret@test.com")
# Old presentation (60 days ago) — should be deleted
old = await make_presentation(
owner_id=user.id,
client_id=client.id,
title="Old Deck",
created_at=datetime.now(timezone.utc) - timedelta(days=60),
)
# Recent presentation — should survive
recent = await make_presentation(
owner_id=user.id,
client_id=client.id,
title="New Deck",
created_at=datetime.now(timezone.utc) - timedelta(days=5),
)
# Mock async_session_maker to use our test session
with patch("services.retention_service.async_session_maker") as mock_sm:
mock_sm.return_value.__aenter__ = AsyncMock(return_value=session)
mock_sm.return_value.__aexit__ = AsyncMock(return_value=False)
result = await svc.run_cleanup()
assert result["deleted"] == 1
await session.refresh(old)
await session.refresh(recent)
assert old.deleted_at is not None
assert recent.deleted_at is None
async def test_no_clients_with_retention_policy(self, session, make_client):
"""Clients without retention_days are skipped."""
await make_client(name="NoPol", slug="nopol", retention_days=None)
with patch("services.retention_service.async_session_maker") as mock_sm:
mock_sm.return_value.__aenter__ = AsyncMock(return_value=session)
mock_sm.return_value.__aexit__ = AsyncMock(return_value=False)
result = await RetentionService().run_cleanup()
assert result["deleted"] == 0
class TestPurgeSoftDeleted:
async def test_purges_old_soft_deleted(
self, session, make_client, make_presentation, make_user
):
"""Soft-deleted presentations older than threshold get permanently deleted."""
client = await make_client(name="Purge Corp", slug="purge-corp")
user = await make_user(email="purge@test.com")
# Soft-deleted 45 days ago — should be purged
pres = await make_presentation(
owner_id=user.id,
client_id=client.id,
title="Dead Deck",
deleted_at=datetime.now(timezone.utc) - timedelta(days=45),
)
with patch("services.retention_service.async_session_maker") as mock_sm:
mock_sm.return_value.__aenter__ = AsyncMock(return_value=session)
mock_sm.return_value.__aexit__ = AsyncMock(return_value=False)
result = await RetentionService().purge_soft_deleted(days_after_soft_delete=30)
assert result["purged"] == 1
# Should be gone from DB
deleted = await session.get(PresentationModel, pres.id)
assert deleted is None
class TestCleanupFiles:
def test_cleanup_files_removes_existing_files(self, svc, tmp_path):
"""_cleanup_files removes files listed in presentation.file_paths."""
f = tmp_path / "test.pptx"
f.write_text("dummy")
pres = MagicMock()
pres.id = uuid.uuid4()
pres.file_paths = [str(f)]
svc._cleanup_files(pres)
assert not f.exists()
def test_cleanup_files_handles_missing_files(self, svc):
"""Missing files don't raise errors."""
pres = MagicMock()
pres.id = uuid.uuid4()
pres.file_paths = ["/nonexistent/file.pptx"]
# Should not raise
svc._cleanup_files(pres)
def test_cleanup_files_handles_none(self, svc):
pres = MagicMock()
pres.id = uuid.uuid4()
pres.file_paths = None
svc._cleanup_files(pres)

View file

@ -0,0 +1,35 @@
"""Tests for review workflow: status transitions and validation."""
import pytest
from api.v1.ppt.endpoints.review import VALID_TRANSITIONS, VALID_STATUSES
class TestStatusTransitions:
def test_valid_statuses(self):
assert VALID_STATUSES == {"draft", "in_review", "approved"}
def test_draft_can_go_to_in_review(self):
assert "in_review" in VALID_TRANSITIONS["draft"]
def test_draft_cannot_go_to_approved_directly(self):
assert "approved" not in VALID_TRANSITIONS["draft"]
def test_in_review_can_go_to_approved(self):
assert "approved" in VALID_TRANSITIONS["in_review"]
def test_in_review_can_go_back_to_draft(self):
assert "draft" in VALID_TRANSITIONS["in_review"]
def test_approved_can_go_back_to_draft(self):
assert "draft" in VALID_TRANSITIONS["approved"]
def test_approved_cannot_go_to_in_review(self):
assert "in_review" not in VALID_TRANSITIONS["approved"]
def test_all_statuses_have_transition_rules(self):
for status in VALID_STATUSES:
assert status in VALID_TRANSITIONS
def test_no_self_transitions(self):
for status, targets in VALID_TRANSITIONS.items():
assert status not in targets, f"{status} should not transition to itself"

View file

@ -0,0 +1,34 @@
"""Tests for SlideMappingEngine: block-to-layout type mapping."""
import pytest
from models.content_models import ContentBlock, ContentBlockType
from services.slide_mapping_engine import _BLOCK_TO_LAYOUT_TYPE
class TestBlockToLayoutMapping:
"""Verify every content block type has layout mappings defined."""
def test_all_block_types_have_mappings(self):
for bt in ContentBlockType:
assert bt in _BLOCK_TO_LAYOUT_TYPE, (
f"ContentBlockType.{bt.value} missing from _BLOCK_TO_LAYOUT_TYPE"
)
def test_metric_prefers_metrics_layout(self):
assert _BLOCK_TO_LAYOUT_TYPE[ContentBlockType.metric][0] == "metrics"
def test_quote_prefers_quote_layout(self):
assert _BLOCK_TO_LAYOUT_TYPE[ContentBlockType.quote][0] == "quote"
def test_table_prefers_table_layout(self):
assert _BLOCK_TO_LAYOUT_TYPE[ContentBlockType.table][0] == "table"
def test_timeline_prefers_timeline_layout(self):
assert _BLOCK_TO_LAYOUT_TYPE[ContentBlockType.timeline][0] == "timeline"
def test_every_mapping_has_content_fallback(self):
"""Each block type should have 'content' as a fallback layout."""
for bt, layouts in _BLOCK_TO_LAYOUT_TYPE.items():
assert "content" in layouts, (
f"ContentBlockType.{bt.value} has no 'content' fallback in layout mapping"
)

View file

@ -7,4 +7,12 @@ export default defineConfig({
bundler: "webpack",
},
},
e2e: {
baseUrl: "http://localhost:3000",
supportFile: "cypress/support/e2e.ts",
specPattern: "cypress/e2e/**/*.cy.{ts,tsx}",
viewportWidth: 1280,
viewportHeight: 720,
defaultCommandTimeout: 10000,
},
});

View file

@ -0,0 +1,30 @@
describe("Admin Panel", () => {
beforeEach(() => {
cy.devLogin("admin@oliver.com");
});
it("loads admin dashboard", () => {
cy.visit("/admin");
cy.contains("Admin").should("be.visible");
});
it("navigates to users page", () => {
cy.visit("/admin/users");
cy.contains("Users").should("be.visible");
});
it("navigates to clients page", () => {
cy.visit("/admin/clients");
cy.contains("Clients").should("be.visible");
});
it("navigates to analytics page", () => {
cy.visit("/admin/analytics");
cy.contains("Analytics").should("be.visible");
});
it("navigates to audit page", () => {
cy.visit("/admin/audit");
cy.contains("Audit").should("be.visible");
});
});

View file

@ -0,0 +1,25 @@
describe("Login Flow", () => {
it("shows login page when not authenticated", () => {
cy.visit("/dashboard");
// Should redirect to login
cy.url().should("include", "/login");
});
it("dev login succeeds and redirects to dashboard", () => {
cy.visit("/login");
cy.get('input[type="email"]').type("admin@oliver.com");
cy.get('input[type="password"]').type("devpass123");
cy.get('button[type="submit"]').click();
// Should redirect to dashboard after successful login
cy.url().should("include", "/dashboard");
});
it("dev login fails with wrong password", () => {
cy.visit("/login");
cy.get('input[type="email"]').type("admin@oliver.com");
cy.get('input[type="password"]').type("wrongpassword");
cy.get('button[type="submit"]').click();
// Should stay on login page
cy.url().should("include", "/login");
});
});

View file

@ -0,0 +1,15 @@
describe("Review Workflow", () => {
beforeEach(() => {
cy.devLogin("admin@oliver.com");
});
it("shows review status badge on presentation page", () => {
// Assume a presentation exists (created by seed or setup)
cy.createPresentation("Review Test Deck");
cy.visit("/dashboard");
// Click on the first presentation
cy.get("[class*='cursor-pointer']").first().click();
// Review badge should be visible
cy.contains(/draft|in review|approved/i).should("be.visible");
});
});

View file

@ -0,0 +1,32 @@
describe("Generation Wizard", () => {
beforeEach(() => {
cy.devLogin("admin@oliver.com");
});
it("navigates through wizard steps", () => {
cy.visit("/generate/upload");
// Step 1: Upload page loads
cy.contains("Upload Your Content").should("be.visible");
// Add brief text
cy.get("textarea").type("This is a test presentation about AI in healthcare.");
// Click Next
cy.contains("button", "Next").click();
// Step 2: Configure page
cy.url().should("include", "/generate/configure");
cy.contains("Slide Count").should("be.visible");
});
it("preserves wizard state via localStorage", () => {
cy.visit("/generate/upload");
cy.get("textarea").type("State persistence test");
// Reload page
cy.reload();
// Brief text should be restored from localStorage
cy.get("textarea").should("have.value", "State persistence test");
});
});

View file

@ -0,0 +1,39 @@
/// <reference types="cypress" />
// Custom commands for DeckForge E2E tests
Cypress.Commands.add("devLogin", (email = "admin@oliver.com") => {
cy.request("POST", "/api/v1/auth/dev-login", {
email,
password: Cypress.env("DEV_AUTH_PASSWORD") || "devpass123",
}).then((response) => {
const { token } = response.body;
window.localStorage.setItem("deckforge_token", token);
});
});
Cypress.Commands.add("createPresentation", (title = "E2E Test Deck") => {
const token = window.localStorage.getItem("deckforge_token");
cy.request({
method: "POST",
url: "/api/v1/ppt/presentation/create",
headers: { Authorization: `Bearer ${token}` },
body: {
content: "Test brief content for E2E testing.",
n_slides: 5,
language: "English",
title,
},
});
});
declare global {
namespace Cypress {
interface Chainable {
devLogin(email?: string): Chainable<void>;
createPresentation(title?: string): Chainable<void>;
}
}
}
export {};

View file

@ -7,7 +7,9 @@
"dev": "next dev ",
"build": "next build",
"start": "next start",
"lint": "next lint"
"lint": "next lint",
"test:e2e": "cypress run",
"test:e2e:open": "cypress open"
},
"dependencies": {
"@babel/standalone": "^7.28.2",