Backend:
- conftest with async SQLite DB, factory fixtures for all models
- pytest-asyncio config in pyproject.toml
- Tests: auth (JWT, dev login), RBAC (access service), audit (query, export), brand enforcement (colors, fonts, logos, contrast), retention (cleanup, purge), content intelligence (regex classifiers), slide mapping, review workflow, analytics data queries

Frontend:
- Cypress E2E config with baseUrl and viewport settings
- Custom commands (devLogin, createPresentation)
- E2E specs: login flow, wizard navigation, admin panel, review workflow
- Test scripts in package.json

Infrastructure:
- Makefile: test-e2e and test-all targets

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
99 lines
4 KiB
Python
99 lines
4 KiB
Python
"""Tests for analytics endpoints: verify query logic returns correct aggregations."""
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
|
|
from models.sql.job import JobModel
|
|
from models.sql.presentation import PresentationModel
|
|
|
|
|
|
class TestAnalyticsData:
    """Exercise the aggregate queries used for analytics against the SQL models.

    Every test seeds rows through the factory fixtures it requests, runs a
    count or group-by statement on ``session``, and checks the totals.
    The fixtures are defined outside this file (presumably in conftest).
    """

    @staticmethod
    async def _count_active_presentations(session, client_id):
        """Return the number of presentations for ``client_id`` with no ``deleted_at``."""
        from sqlmodel import select, func

        stmt = (
            select(func.count())
            .select_from(PresentationModel)
            .where(
                PresentationModel.client_id == client_id,
                PresentationModel.deleted_at.is_(None),
            )
        )
        outcome = await session.execute(stmt)
        return outcome.scalar()

    async def test_presentation_count(
        self, session, make_client, make_user, make_presentation
    ):
        """Three presentations created for one client yield a count of three."""
        owner = await make_user(email="ana@test.com")
        org = await make_client(name="Ana Corp", slug="ana-corp")

        for deck_title in ("Deck 1", "Deck 2", "Deck 3"):
            await make_presentation(
                owner_id=owner.id, client_id=org.id, title=deck_title
            )

        total = await self._count_active_presentations(session, org.id)
        assert total == 3

    async def test_status_distribution(
        self, session, make_client, make_user, make_presentation
    ):
        """Grouping a client's presentations by status gives per-status counts."""
        owner = await make_user(email="dist@test.com")
        org = await make_client(name="Dist Corp", slug="dist-corp")

        # Two drafts, one in review, one approved.
        for state in ("draft", "draft", "in_review", "approved"):
            await make_presentation(
                owner_id=owner.id, client_id=org.id, status=state
            )

        from sqlmodel import select, func

        stmt = (
            select(PresentationModel.status, func.count().label("cnt"))
            .where(
                PresentationModel.client_id == org.id,
                PresentationModel.deleted_at.is_(None),
            )
            .group_by(PresentationModel.status)
        )
        rows = (await session.execute(stmt)).all()
        per_status = {status: cnt for status, cnt in rows}

        assert per_status["draft"] == 2
        assert per_status["in_review"] == 1
        assert per_status["approved"] == 1

    async def test_job_status_distribution(
        self, session, make_client, make_user, make_job
    ):
        """Grouping a client's jobs by status gives per-status counts."""
        owner = await make_user(email="job@test.com")
        org = await make_client(name="Job Corp", slug="job-corp")

        # Two completed jobs and one failed job.
        for state in ("completed", "completed", "failed"):
            await make_job(user_id=owner.id, client_id=org.id, status=state)

        from sqlmodel import select, func

        stmt = (
            select(JobModel.status, func.count().label("cnt"))
            .where(JobModel.client_id == org.id)
            .group_by(JobModel.status)
        )
        rows = (await session.execute(stmt)).all()
        per_status = {status: cnt for status, cnt in rows}

        assert per_status["completed"] == 2
        assert per_status["failed"] == 1

    async def test_deleted_presentations_excluded(
        self, session, make_client, make_user, make_presentation
    ):
        """A presentation with ``deleted_at`` set is left out of the count."""
        owner = await make_user(email="del@test.com")
        org = await make_client(name="Del Corp", slug="del-corp")

        await make_presentation(owner_id=owner.id, client_id=org.id, title="Active")
        await make_presentation(
            owner_id=owner.id,
            client_id=org.id,
            title="Deleted",
            deleted_at=datetime.now(timezone.utc),
        )

        # Only the row without a deletion timestamp should remain.
        remaining = await self._count_active_presentations(session, org.id)
        assert remaining == 1
|