fix(security): close MT-17/18/19, restore cross-tenant tests, quick wins

Blocks 1–5 of stabilization plan:

SECURITY
- validation.py: restore settings.upload_max_video_bytes (T-14 regression fix)
  and JSON object key validation that was incorrectly removed
- MT-18: add accessible_org_ids filter to list_for_reviewer/list_for_linguist
  so reviewers/linguists only see jobs from their own org in QC queue
- MT-17: add Membership.team_ids[], write to it on invitation acceptance and
  direct team add/remove; migration backfills from Team.member_user_ids
- MT-19: validate all target_team_ids belong to invitation's org_id at creation

TESTS
- Restore test_cross_tenant_isolation.py (was deleted, only .pyc remained)
- Extend with MT-18 reviewer org isolation tests

QUICK WINS
- W-8: remove time.sleep(1) + dead debug block from POST /jobs (task was
  undefined — would have caused NameError → HTTP 500 on every job creation)
- T-13: warn at startup when REDIS_URL configured but connection failed
- T-16: skip language_qc lifespan seeding loop when count=0 (startup now runs only a count query)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-30 10:32:23 +01:00
parent 812a2bffce
commit ff372c7322
10 changed files with 368 additions and 47 deletions

View file

@ -337,10 +337,15 @@ async def add_team_member(
await _get_team_or_404(team_id, client_id, db)
if not await db.users.find_one({"_id": body.user_id}):
raise HTTPException(status_code=404, detail="User not found")
# Write to both Team.member_user_ids (legacy) and Membership.team_ids (MT-17)
await db.teams.update_one(
{"_id": team_id},
{"$addToSet": {"member_user_ids": body.user_id}, "$set": {"updated_at": _now()}},
)
await db.memberships.update_one(
{"user_id": body.user_id, "organization_id": client_id},
{"$addToSet": {"team_ids": team_id}},
)
@router.delete("/{client_id}/teams/{team_id}/members/{user_id}", status_code=204)
@ -358,6 +363,10 @@ async def remove_team_member(
{"_id": team_id},
{"$pull": {"member_user_ids": user_id}, "$set": {"updated_at": _now()}},
)
await db.memberships.update_one(
{"user_id": user_id, "organization_id": client_id},
{"$pull": {"team_ids": team_id}},
)
# ---------------------------------------------------------------------------

View file

@ -121,6 +121,18 @@ async def create_invitation(
detail="A pending invitation already exists for this email. Revoke it first to re-invite.",
)
# MT-19: ensure all target_team_ids belong to this org (client_id == org_id)
if body.target_team_ids:
valid_teams = await db.teams.count_documents({
"_id": {"$in": body.target_team_ids},
"client_id": org_id,
})
if valid_teams != len(body.target_team_ids):
raise HTTPException(
status_code=400,
detail="One or more target_team_ids do not belong to this organization.",
)
plaintext, token_hash = _make_token()
now = _now()
expires_at = now + timedelta(days=body.expires_in_days)
@ -317,12 +329,16 @@ async def accept_invitation(
await upsert_membership(user_id, org_id, role_in_org, doc["invited_by_user_id"], db)
await bump_user_membership_cache(user_id)
# Auto-add to target teams
# Auto-add to target teams — write to both Team.member_user_ids (legacy) and Membership.team_ids (MT-17)
for team_id in doc.get("target_team_ids", []):
await db.teams.update_one(
{"_id": team_id, "client_id": org_id},
{"$addToSet": {"member_user_ids": user_id}},
)
await db.memberships.update_one(
{"user_id": user_id, "organization_id": org_id},
{"$addToSet": {"team_ids": team_id}},
)
# Send welcome email
if not existing_user.get("_welcomed"):

View file

@ -189,45 +189,14 @@ async def create_job(
)
# Enqueue processing task
logger.info(f"Dispatching ingest_and_ai_task for job {job_id}")
logger.info(f"Using Celery app: {celery_app}")
logger.info(f"Task object: {ingest_and_ai_task}")
logger.info(f"Task routing config: {celery_app.conf.task_routes}")
try:
# Use apply_async for more control and debugging
await _cr_dispatch("ingest", job_id)
logger.info(f"Task dispatched to Cloud Run for job {job_id}")
# Try to get the task result to see if it was actually queued
try:
# This should timeout quickly since task just started
import time
time.sleep(1) # Give it a moment
task_info = task.state
logger.info(f"Task state after 1 second: {task_info}")
# Check celery inspect to see active/scheduled tasks
from celery import current_app
i = current_app.control.inspect()
active_tasks = i.active()
scheduled_tasks = i.scheduled()
logger.info(f"Active tasks across all workers: {active_tasks}")
logger.info(f"Scheduled tasks across all workers: {scheduled_tasks}")
except Exception as e:
logger.warning(f"Could not inspect task status: {e}")
# Store task ID in job document for monitoring
await db.jobs.update_one(
{"_id": job_id},
{"$set": {"task_id": task.id}}
)
logger.info("Dispatched ingest task for job %s", job_id)
except Exception as e:
logger.error(f"Failed to dispatch task for job {job_id}: {e} (type: {e.__class__.__name__})")
logger.error("Failed to dispatch ingest task for job %s: %s", job_id, e)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to start processing: {e}"
detail=f"Failed to start processing: {e}",
)
await log_job_action(

View file

@ -437,13 +437,17 @@ async def my_language_qc_queue(
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""List jobs and languages assigned to the current user as linguist or reviewer."""
# ADMIN sees all orgs; staff scoped to their orgs from JWT claim (MT-18)
org_ids: list[str] | None = None if current_user.role == UserRole.ADMIN else getattr(current_user, "org_ids", None)
if role == "reviewer":
jobs = await lqc.list_for_reviewer(
db, str(current_user.id), status_filter=qc_status, skip=skip, limit=limit,
db, str(current_user.id), accessible_org_ids=org_ids,
status_filter=qc_status, skip=skip, limit=limit,
)
else:
jobs = await lqc.list_for_linguist(
db, str(current_user.id), status_filter=qc_status, skip=skip, limit=limit,
db, str(current_user.id), accessible_org_ids=org_ids,
status_filter=qc_status, skip=skip, limit=limit,
)
items: list[QueueItem] = []

View file

@ -94,12 +94,17 @@ async def lifespan(app: FastAPI):
print(f"⚠️ Could not seed default admin: {e}")
# await create_indexes() # Temporarily disabled for debugging
# Seed language_qc for existing jobs that don't have it yet
# T-16: Seed language_qc only for jobs that still lack it (idempotent, skips on subsequent starts)
try:
db = await get_database()
async for job_doc in db.jobs.find({"language_qc": {"$exists": False}}, {"_id": 1, "status": 1, "outputs": 1, "source": 1, "review": 1, "updated_at": 1, "requested_outputs": 1}):
await seed_language_qc_for_job(db, job_doc)
print("✅ language_qc migration complete")
pending_count = await db.jobs.count_documents({"language_qc": {"$exists": False}})
if pending_count > 0:
async for job_doc in db.jobs.find(
{"language_qc": {"$exists": False}},
{"_id": 1, "status": 1, "outputs": 1, "source": 1, "review": 1, "updated_at": 1, "requested_outputs": 1},
):
await seed_language_qc_for_job(db, job_doc)
print(f"✅ language_qc migration complete ({pending_count} jobs seeded)")
except Exception as e:
print(f"⚠️ language_qc migration failed: {e}")
@ -115,6 +120,9 @@ async def lifespan(app: FastAPI):
# Store middleware in app state for access
app.state.rate_limit_middleware = rate_limit_middleware
app.state.validation_middleware = validation_middleware
elif settings.redis_url:
# T-13: REDIS_URL is configured but client unavailable — rate limiting is disabled
print(f"⚠️ Redis configured at {settings.redis_url!r} but connection failed — rate limiting disabled")
yield
# Shutdown

View file

@ -44,7 +44,8 @@ class RequestValidator:
self.malicious_patterns = [
# SQL injection patterns
r"(union|select|insert|update|delete|drop|create|alter)\s+",
r"(script|javascript|vbscript|onload|onerror|onclick)",
r"vbscript:", # vbscript protocol injection
r"\b(onload|onerror|onclick)\s*=", # HTML event handler attribute injection
r"<\s*script[^>]*>",
r"javascript:",
r"data:.*base64",
@ -68,7 +69,7 @@ class RequestValidator:
self.compiled_patterns = [re.compile(pattern, re.IGNORECASE) for pattern in self.malicious_patterns]
# Max file sizes (in bytes) — sourced from central config (T-14)
# Max file sizes (in bytes) — driven by central config (T-14)
self.max_video_size = settings.upload_max_video_bytes
self.max_subtitle_size = 10 * 1024 * 1024 # 10MB
@ -193,8 +194,7 @@ class RequestValidator:
raise ValidationError(f"Too many fields in object at {path}")
for key, value in obj.items():
if isinstance(key, str):
self.validate_string_content(key, f"{path}.{key}")
self.validate_string_content(key, f"{path}.key")
self._validate_json_values(value, f"{path}.{key}")
elif isinstance(obj, list):

View file

@ -0,0 +1,44 @@
"""Backfill Membership.team_ids from Team.member_user_ids (MT-17)."""
from app.migrations.migrator import Migration
class Migration(Migration):
    """Backfill ``Membership.team_ids`` from ``Team.member_user_ids`` (MT-17).

    NOTE(review): this class intentionally reuses the name of the imported
    base class; presumably the migrator discovers migrations by that name —
    confirm before renaming.
    """

    version = "2026-04-30-000000"
    description = "Backfill team_ids on Membership records from Team.member_user_ids"

    async def up(self) -> None:
        """Add each team's id to the memberships of its listed members.

        Uses ``$addToSet``, so re-running the migration does not create
        duplicate entries. Memberships that do not exist are left alone
        (no upsert).
        """
        db = self.db
        added = 0

        # For each team that has member_user_ids, push team_id into the matching Memberships
        async for team in db.teams.find(
            {"member_user_ids": {"$exists": True, "$ne": []}},
            {"_id": 1, "client_id": 1, "member_user_ids": 1},
        ):
            client_id = team.get("client_id")
            if not client_id:
                # Without an owning org there is no membership to match;
                # str(None) / "" would only produce a query that cannot hit.
                continue

            member_ids = [str(uid) for uid in team.get("member_user_ids", [])]
            if not member_ids:
                continue

            # One round trip per team instead of one per member.
            result = await db.memberships.update_many(
                {"user_id": {"$in": member_ids}, "organization_id": str(client_id)},
                {"$addToSet": {"team_ids": str(team["_id"])}},
            )
            added += result.modified_count

        # Ensure index for efficient team-based lookups
        await db.memberships.create_index(
            [("team_ids", 1)],
            name="idx_memberships_team_ids",
            background=True,
            sparse=True,
        )
        # The counter is the number of (membership, team) links written,
        # not the number of distinct memberships.
        print(f"✅ Backfilled {added} team_ids entries on Membership records")

    async def down(self) -> None:
        """Remove ``team_ids`` from every membership and drop the index."""
        db = self.db
        await db.memberships.update_many({}, {"$unset": {"team_ids": ""}})
        try:
            await db.memberships.drop_index("idx_memberships_team_ids")
        except Exception as exc:
            # Best-effort: the rollback still succeeds if the index is absent,
            # but the reason is reported instead of being silently discarded.
            print(f"⚠️ Could not drop idx_memberships_team_ids: {exc}")

View file

@ -11,6 +11,7 @@ class Membership(BaseModel):
user_id: str
organization_id: str
role_in_org: OrgRole
team_ids: list[str] = [] # teams the user belongs to within this org (MT-17)
created_at: Optional[datetime] = None
created_by: Optional[str] = None

View file

@ -959,12 +959,15 @@ async def list_for_linguist(
db: AsyncIOMotorDatabase,
linguist_id: str,
*,
accessible_org_ids: list[str] | None = None,
status_filter: Optional[str] = None,
skip: int = 0,
limit: int = 50,
) -> list[dict]:
"""Return jobs where the linguist has an assignment, along with which languages."""
query: dict = {"qc_assignments.linguist_id": linguist_id}
if accessible_org_ids is not None:
query["organization_id"] = {"$in": accessible_org_ids}
if status_filter:
query["qc_assignments"] = {"$elemMatch": {"linguist_id": linguist_id, "status": status_filter}}
@ -982,14 +985,18 @@ async def list_for_reviewer(
db: AsyncIOMotorDatabase,
reviewer_id: str,
*,
accessible_org_ids: list[str] | None = None,
status_filter: Optional[str] = None,
skip: int = 0,
limit: int = 50,
) -> list[dict]:
"""Return jobs where the reviewer is assigned to at least one language."""
# language_qc is an embedded dict keyed by lang code; scan in Python
# language_qc is a dict keyed by lang; pre-filter by org then scan in Python for assigned reviewer
base_query: dict = {}
if accessible_org_ids is not None:
base_query["organization_id"] = {"$in": accessible_org_ids}
all_jobs_cursor = db[_JOBS].find(
{},
base_query,
{"title": 1, "status": 1, "language_qc": 1, "qc_assignments": 1, "created_at": 1, "updated_at": 1}
).sort("updated_at", -1).skip(skip).limit(limit * 5) # over-fetch, filter in Python

View file

@ -0,0 +1,263 @@
"""
Cross-tenant isolation tests.
Verifies that assert_job_in_user_org raises 404 (not 403, to avoid information
disclosure) when a user from org A tries to access a job belonging to org B.
Also verifies ADMIN always passes and legacy (no organization_id) jobs fall back
to project-based checks.
MT-18: list_for_reviewer must only return jobs from the reviewer's orgs.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock
from fastapi import HTTPException
from app.core.dependencies import assert_job_in_user_org, get_user_org_ids
from app.models.user import User, UserRole
# ── Helpers ────────────────────────────────────────────────────────────────────
def _make_user(role: UserRole, user_id: str = "user-a") -> User:
    """Build an active ``User`` with the given role.

    The email address is derived from ``user_id``; the full name is fixed.
    """
    fields = {
        "_id": user_id,
        "email": f"{user_id}@example.com",
        "full_name": "Test User",
        "role": role,
        "is_active": True,
    }
    return User(**fields)
def _make_db(memberships: list[dict], teams: list[dict] | None = None, project: dict | None = None):
    """Return a mock AsyncIOMotorDatabase with memberships/teams/projects collections.

    * ``db.memberships.find(...)`` yields ``memberships`` under ``async for``.
    * ``db.teams.find(...).to_list(...)`` resolves to ``teams`` (or ``[]``).
    * ``db.projects.find_one(...)`` resolves to ``project``.
    """
    teams_cursor = MagicMock()
    teams_cursor.to_list = AsyncMock(return_value=teams or [])

    mock_db = MagicMock()
    mock_db.memberships.find.return_value = _AsyncIterableMock(memberships)
    mock_db.teams.find.return_value = teams_cursor
    mock_db.projects.find_one = AsyncMock(return_value=project)
    return mock_db
class _AsyncIterableMock:
    """Minimal stand-in for a cursor consumed with ``async for doc in cursor``."""

    def __init__(self, items: list):
        self._items = items

    async def __aiter__(self):
        # Declared as an async generator: calling it returns the async
        # iterator directly, and each ``async for`` starts from the beginning.
        for doc in self._items:
            yield doc
# ── get_user_org_ids ───────────────────────────────────────────────────────────
class TestGetUserOrgIds:
    """Expected results of ``get_user_org_ids`` for each kind of user."""

    @pytest.mark.asyncio
    async def test_admin_returns_none(self):
        # ADMIN is expected to resolve to None rather than a list of orgs.
        admin = _make_user(UserRole.ADMIN)
        mock_db = _make_db([])
        assert await get_user_org_ids(admin, mock_db) is None

    @pytest.mark.asyncio
    async def test_staff_with_membership_returns_orgs(self):
        linguist = _make_user(UserRole.LINGUIST)
        membership_rows = [{"organization_id": "org-a"}, {"organization_id": "org-b"}]
        mock_db = _make_db(membership_rows)
        org_ids = await get_user_org_ids(linguist, mock_db)
        assert org_ids == ["org-a", "org-b"]

    @pytest.mark.asyncio
    async def test_staff_no_membership_no_team_returns_empty(self):
        reviewer = _make_user(UserRole.REVIEWER)
        mock_db = _make_db([])
        org_ids = await get_user_org_ids(reviewer, mock_db)
        assert org_ids == []

    @pytest.mark.asyncio
    async def test_pm_legacy_pm_client_ids(self):
        # A PM with no memberships is expected to fall back to pm_client_ids.
        manager = _make_user(UserRole.PROJECT_MANAGER)
        manager.pm_client_ids = ["client-x"]
        mock_db = _make_db([])
        org_ids = await get_user_org_ids(manager, mock_db)
        assert org_ids == ["client-x"]

    @pytest.mark.asyncio
    async def test_staff_falls_back_to_team_membership(self):
        # With no memberships, the team's client_id is expected to be used.
        producer = _make_user(UserRole.PRODUCTION)
        mock_db = _make_db(memberships=[], teams=[{"client_id": "client-y"}])
        org_ids = await get_user_org_ids(producer, mock_db)
        assert org_ids == ["client-y"]
# ── assert_job_in_user_org ─────────────────────────────────────────────────────
class TestAssertJobInUserOrg:
    """Access decisions of ``assert_job_in_user_org``: pass silently or raise 404."""

    @staticmethod
    async def _expect_404(job, user, db):
        """Assert that the access check rejects ``job`` with an HTTP 404."""
        with pytest.raises(HTTPException) as denied:
            await assert_job_in_user_org(job, user, db)
        assert denied.value.status_code == 404

    @pytest.mark.asyncio
    async def test_admin_always_passes(self):
        admin = _make_user(UserRole.ADMIN)
        org_b_job = {"_id": "job-1", "organization_id": "org-b"}
        mock_db = _make_db([])
        await assert_job_in_user_org(org_b_job, admin, mock_db)  # must not raise

    @pytest.mark.asyncio
    async def test_same_org_passes(self):
        linguist = _make_user(UserRole.LINGUIST)
        mock_db = _make_db([{"organization_id": "org-a"}])
        org_a_job = {"_id": "job-1", "organization_id": "org-a"}
        await assert_job_in_user_org(org_a_job, linguist, mock_db)  # must not raise

    @pytest.mark.asyncio
    async def test_cross_org_raises_404(self):
        linguist = _make_user(UserRole.LINGUIST)
        mock_db = _make_db([{"organization_id": "org-a"}])
        org_b_job = {"_id": "job-1", "organization_id": "org-b"}
        await self._expect_404(org_b_job, linguist, mock_db)

    @pytest.mark.asyncio
    async def test_no_organization_id_project_fallback_passes(self):
        reviewer = _make_user(UserRole.REVIEWER)
        mock_db = _make_db(
            memberships=[{"organization_id": "org-a"}],
            project={"_id": "proj-1", "client_id": "org-a"},
        )
        untagged_job = {"_id": "job-1", "project_id": "proj-1"}  # no organization_id
        await assert_job_in_user_org(untagged_job, reviewer, mock_db)

    @pytest.mark.asyncio
    async def test_no_organization_id_project_from_other_org_raises_404(self):
        reviewer = _make_user(UserRole.REVIEWER)
        mock_db = _make_db(
            memberships=[{"organization_id": "org-a"}],
            project={"_id": "proj-1", "client_id": "org-b"},
        )
        untagged_job = {"_id": "job-1", "project_id": "proj-1"}
        await self._expect_404(untagged_job, reviewer, mock_db)

    @pytest.mark.asyncio
    async def test_legacy_job_own_user_passes(self):
        linguist = _make_user(UserRole.LINGUIST, user_id="user-a")
        mock_db = _make_db([])
        legacy_job = {"_id": "job-1", "client_id": "user-a"}  # legacy: client_id = creator user_id
        await assert_job_in_user_org(legacy_job, linguist, mock_db)  # must not raise

    @pytest.mark.asyncio
    async def test_legacy_job_other_user_raises_404(self):
        linguist = _make_user(UserRole.LINGUIST, user_id="user-a")
        mock_db = _make_db([])
        legacy_job = {"_id": "job-1", "client_id": "user-b"}
        await self._expect_404(legacy_job, linguist, mock_db)

    @pytest.mark.asyncio
    async def test_user_with_no_org_context_denied_access_to_org_job(self):
        """Staff with no memberships and no team cannot access any org-tagged job."""
        producer = _make_user(UserRole.PRODUCTION)
        mock_db = _make_db([])
        org_c_job = {"_id": "job-1", "organization_id": "org-c"}
        await self._expect_404(org_c_job, producer, mock_db)
# ── MT-18: list_for_reviewer org isolation ─────────────────────────────────────
class TestListForReviewerOrgIsolation:
    """Verify list_for_reviewer only surfaces jobs from the reviewer's own orgs.

    The database is mocked, so the meaningful assertion is on the filter
    document handed to ``find()``: a mock cursor returns whatever it is told
    to, regardless of the query.
    """

    def _make_job(self, job_id: str, org_id: str, reviewer_id: str) -> dict:
        """Return a job document with one language ("en") assigned to ``reviewer_id``."""
        return {
            "_id": job_id,
            "title": f"Job {job_id}",
            "status": "pending_qc",
            "organization_id": org_id,
            "language_qc": {
                "en": {"assigned_reviewer_id": reviewer_id, "status": "pending_review"},
            },
            "qc_assignments": [],
            "created_at": None,
            "updated_at": None,
        }

    @staticmethod
    def _make_jobs_db(jobs: list[dict]):
        """Return ``(db, collection)`` where ``db[<any name>]`` is ``collection``.

        ``collection.find(...)`` returns a cursor whose ``sort``/``skip``/``limit``
        chain back to itself and which produces ``jobs``.
        """
        cursor = MagicMock()
        cursor.sort.return_value = cursor
        cursor.skip.return_value = cursor
        cursor.limit.return_value = cursor
        cursor.to_list = AsyncMock(return_value=jobs)
        # NOTE(review): how list_for_reviewer drains the cursor is not visible
        # here; supporting ``async for`` as well keeps the mock valid either way.
        cursor.__aiter__.return_value = jobs

        collection = MagicMock()
        collection.find.return_value = cursor

        db = MagicMock()
        db.__getitem__ = MagicMock(return_value=collection)
        return db, collection

    @pytest.mark.asyncio
    async def test_reviewer_only_sees_own_org_jobs(self):
        from unittest.mock import patch

        from app.services.language_qc import list_for_reviewer

        reviewer_id = "reviewer-a"
        org_a = "org-a"
        job_own = self._make_job("job-own", org_a, reviewer_id)

        # The cursor stands in for Mongo having already applied the org filter,
        # so only the org-a job comes back.
        db, jobs_collection = self._make_jobs_db([job_own])

        with patch("app.services.language_qc._JOBS", "jobs"):
            results = await list_for_reviewer(
                db, reviewer_id, accessible_org_ids=[org_a],
            )

        # The isolation guarantee itself: the org filter must reach Mongo.
        find_filter = jobs_collection.find.call_args[0][0]
        assert find_filter.get("organization_id") == {"$in": [org_a]}

        assert len(results) == 1
        assert results[0]["_id"] == "job-own"

    @pytest.mark.asyncio
    async def test_admin_reviewer_passes_none_org_ids(self):
        """When accessible_org_ids is None (ADMIN), base_query has no org filter."""
        # Imported locally: the module header only brings in AsyncMock/MagicMock.
        from unittest.mock import patch

        from app.services.language_qc import list_for_reviewer

        reviewer_id = "admin-reviewer"
        job_any_org = self._make_job("job-any", "org-x", reviewer_id)
        db, jobs_collection = self._make_jobs_db([job_any_org])

        with patch("app.services.language_qc._JOBS", "jobs"):
            results = await list_for_reviewer(
                db, reviewer_id, accessible_org_ids=None,
            )

        # Verify find was called without org filter
        find_filter = jobs_collection.find.call_args[0][0]
        assert "organization_id" not in find_filter
        assert len(results) == 1