oliver-sales-ops-platform/backend/tests/test_notifications.py
DJP b8baf1a37e Tests: approval workflow + notifications (13 new, 46 total)
Extends the integration suite with coverage for the approval and
notification endpoints shipped in 3f7531a / f631dad.

test_approvals.py (9 tests):
- /api/users/me returns the dev-bypass user; /api/users includes them.
- Requesting approvals on a non-gated stage returns 400 naming the
  gated stages.
- Unknown approver_user_id returns 400.
- Happy path: request -> list -> /approvals/me -> blocked stage 3
  complete -> approve -> stage 3 completes, opportunity advances.
- Re-deciding the same approval returns 400.
- Reject path still blocks stage 3 complete with the same error pattern.
- Token deeplink path: GET /approvals/by-token/{token} returns the
  context; bogus token -> 404.
- Invalid decision string ('maybe') -> 400.

test_notifications.py (4 active + 1 skipped):
- APPROVAL_REQUESTED notification surfaced after request_approval.
- unread-count delta after mark-one-read.
- POST /me/mark-all-read zeros the count.
- Owner-notification path is skipped with a clear marker — the create
  endpoint doesn't yet accept owner_user_id and the service correctly
  skips self-notification when approver == owner.

Notes:
- Test harness reuses the existing `client` and `opportunity` fixtures.
- Cascade-delete (verified via 0001_initial.py / 0004_notifications.py)
  means deleting the opportunity in the fixture teardown also cleans
  approvals + notifications, so no extra teardown is needed.
- Email tokens aren't exposed in the API; the test reads them via
  psycopg2 against localhost:5435 (DSN overridable via OSOP_TEST_DSN).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:32:25 -04:00

123 lines
4.5 KiB
Python

"""In-app notification coverage.
Verifies that requesting an approval creates an APPROVAL_REQUESTED notification,
and that read/mark-all-read flows behave correctly.
Notifications cascade-delete with Opportunity (alembic 0004 — `ondelete=CASCADE`),
so the existing `opportunity` fixture's teardown cleans up any rows we create.
"""
from __future__ import annotations
import httpx
import pytest
async def _walk_to_stage_3(client: httpx.AsyncClient, opp_id: int) -> None:
r = await client.post(f"/opportunities/{opp_id}/stages/1/complete")
assert r.status_code == 200, r.text
r = await client.post(f"/opportunities/{opp_id}/stages/2/complete")
assert r.status_code == 200, r.text
async def _request_approval(
client: httpx.AsyncClient, opp_id: int, approver_id: int
) -> int:
r = await client.post(
f"/opportunities/{opp_id}/stages/3/approvals",
json={"requests": [{"role_required": "Director", "approver_user_id": approver_id}]},
)
assert r.status_code == 200, r.text
return r.json()[0]["id"]
async def test_approval_request_creates_notification(
client: httpx.AsyncClient, opportunity
):
opp_id = opportunity["id"]
me_id = (await client.get("/users/me")).json()["id"]
await _walk_to_stage_3(client, opp_id)
approval_id = await _request_approval(client, opp_id, me_id)
r = await client.get("/notifications/me")
assert r.status_code == 200, r.text
notifs = r.json()
matching = [
n for n in notifs
if n["related_approval_id"] == approval_id
and n["type"] == "approval_requested"
]
assert matching, (
f"expected an approval_requested notification for approval {approval_id}; "
f"got {[(n['type'], n['related_approval_id']) for n in notifs]}"
)
n = matching[0]
assert n["read"] is False
assert n["related_opportunity_id"] == opp_id
async def test_unread_count_reflects_new_notification(
client: httpx.AsyncClient, opportunity
):
opp_id = opportunity["id"]
me_id = (await client.get("/users/me")).json()["id"]
before = (await client.get("/notifications/me/unread-count")).json()["count"]
await _walk_to_stage_3(client, opp_id)
await _request_approval(client, opp_id, me_id)
after = (await client.get("/notifications/me/unread-count")).json()["count"]
assert after >= before + 1, f"unread count should grow by at least 1: {before} -> {after}"
async def test_mark_one_read(client: httpx.AsyncClient, opportunity):
opp_id = opportunity["id"]
me_id = (await client.get("/users/me")).json()["id"]
await _walk_to_stage_3(client, opp_id)
approval_id = await _request_approval(client, opp_id, me_id)
notifs = (await client.get("/notifications/me")).json()
target = next(n for n in notifs if n["related_approval_id"] == approval_id)
assert target["read"] is False
before = (await client.get("/notifications/me/unread-count")).json()["count"]
r = await client.put(f"/notifications/{target['id']}/read")
assert r.status_code == 200, r.text
body = r.json()
assert body["read"] is True
assert body["read_at"] is not None
after = (await client.get("/notifications/me/unread-count")).json()["count"]
assert after == before - 1, f"count should drop by 1: {before} -> {after}"
async def test_mark_all_read_zeroes_count(client: httpx.AsyncClient, opportunity):
opp_id = opportunity["id"]
me_id = (await client.get("/users/me")).json()["id"]
await _walk_to_stage_3(client, opp_id)
await _request_approval(client, opp_id, me_id)
# Fire mark-all-read
r = await client.post("/notifications/me/mark-all-read")
assert r.status_code == 200, r.text
assert "marked_read" in r.json()
# Count should now be zero
count = (await client.get("/notifications/me/unread-count")).json()["count"]
assert count == 0, f"after mark-all-read, count should be 0, got {count}"
@pytest.mark.skip(
reason="owner notification path not testable until OpportunityCreate accepts owner_user_id "
"(dev bypass user is currently both the approver AND would-be owner; service deliberately skips "
"self-notification)"
)
async def test_owner_notified_on_approval_decision():
"""When a non-owner approves, the opportunity owner should receive an
APPROVAL_APPROVED notification. Currently un-testable in dev because:
1. The create-opportunity endpoint doesn't accept owner_user_id.
2. There's only one AppUser (dev@localhost), and the service skips
self-notification (`opp.owner_user_id != approval.approver_user_id`).
"""