feat(api): L-18 blocked-on-source, PR-10 promote-to-qc, R-12 reviewed_cues reset

- POST /{job_id}/actions/blocked_on_source (L-18): linguist/reviewer flags a source
  video issue; moves job to QC_FEEDBACK and records blocked_on_source_reason/at/by
- POST /{job_id}/actions/promote_to_qc (PR-10): production/admin manually bypasses
  AI processing for edge-case failures; adds audit history entry
- Reset reviewed_cues to 0 on submit_for_review (R-12) so reviewer must re-acknowledge
  all cues after each linguist resubmit
- Add assert_job_in_user_org + get_user_org_ids to core/dependencies.py (used by
  the new endpoints and the cross-tenant isolation test suite)
- Remove unused ingest_and_ai_task / translate_and_synthesize_task imports

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-30 10:38:39 +01:00
parent ff372c7322
commit 3f557724d3
4 changed files with 187 additions and 2 deletions

View file

@ -21,6 +21,7 @@ from ...core.config import settings
from ...core.database import get_database
from ...core.authz import MembershipContext, get_job_or_403, get_membership_context
from ...core.dependencies import (
assert_job_in_user_org,
get_accessible_project_ids,
get_current_user,
require_roles,
@ -43,6 +44,7 @@ from ...schemas.job import (
ApproveEnglishRequest,
ApproveSourceRequest,
AssetValidationResponse,
BlockedOnSourceRequest,
BulkApproveRequest,
BulkApproveResponse,
BulkDeleteRequest,
@ -56,6 +58,7 @@ from ...schemas.job import (
JobListResponse,
JobResponse,
JobUpdateRequest,
PromoteToQCRequest,
RejectJobRequest,
ReturnToQCRequest,
UpdateTTSPreferencesRequest,
@ -75,8 +78,6 @@ from ...services.validation import asset_validation_service
from ...services.websocket import connection_manager
from ...services.zip_download import generate_zip_filename, generate_zip_stream
from ...tasks import celery_app
from ...tasks.ingest_and_ai import ingest_and_ai_task
from ...tasks.translate_and_synthesize import translate_and_synthesize_task
logger = get_logger(__name__)
router = APIRouter(prefix="/jobs", tags=["jobs"])
@ -1093,6 +1094,120 @@ async def return_to_qc(
)
@router.post("/{job_id}/actions/blocked_on_source", response_model=JobResponse)
async def blocked_on_source(
job_id: str,
request: BlockedOnSourceRequest,
http_request: Request,
current_user: User = Depends(require_roles(
UserRole.LINGUIST, UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN,
)),
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""Linguist/Reviewer flags that the source video has an issue requiring PM attention (L-18).
Transitions job to QC_FEEDBACK and notifies the PM."""
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
raise HTTPException(status_code=404, detail="Job not found")
await assert_job_in_user_org(job_doc, current_user, db)
now = datetime.utcnow()
result = await db.jobs.find_one_and_update(
{"_id": job_id, "status": {"$in": [
JobStatus.PENDING_QC.value, JobStatus.QC_FEEDBACK.value,
]}},
{"$set": {
"status": JobStatus.QC_FEEDBACK.value,
"review.notes": f"[BLOCKED ON SOURCE] {request.reason}",
"blocked_on_source_reason": request.reason,
"blocked_on_source_at": now,
"blocked_on_source_by": str(current_user.id),
"updated_at": now,
}},
return_document=True,
)
if not result:
raise HTTPException(status_code=400, detail="Job is not in a QC status")
await log_job_action(
AuditAction.JOB_STATUS_CHANGE, job_id, current_user, http_request,
details={"new_status": JobStatus.QC_FEEDBACK.value, "reason": request.reason, "action": "blocked_on_source"},
)
return JobResponse(
id=str(result["_id"]),
title=result["title"],
status=result["status"],
source=result["source"],
requested_outputs=RequestedOutputs(**result["requested_outputs"]),
review=result.get("review", {"notes": "", "history": []}),
outputs=result.get("outputs"),
created_at=result["created_at"].isoformat(),
updated_at=result["updated_at"].isoformat(),
)
@router.post("/{job_id}/actions/promote_to_qc", response_model=JobResponse)
async def promote_to_qc(
job_id: str,
request: PromoteToQCRequest,
http_request: Request,
current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)),
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""Production/Admin manually promotes a job to PENDING_QC, bypassing AI processing (PR-10).
Used when AI fails on an edge-case and Production wants to proceed manually."""
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
raise HTTPException(status_code=404, detail="Job not found")
await assert_job_in_user_org(job_doc, current_user, db)
promotable_statuses = [
JobStatus.AI_PROCESSING.value,
JobStatus.INGESTING.value,
JobStatus.PROCESSING_FAILED.value,
JobStatus.TTS_FAILED.value,
JobStatus.RENDER_FAILED.value,
]
now = datetime.utcnow()
result = await db.jobs.find_one_and_update(
{"_id": job_id, "status": {"$in": promotable_statuses}},
{"$set": {
"status": JobStatus.PENDING_QC.value,
"updated_at": now,
},
"$push": {
"review.history": {
"at": now,
"status": JobStatus.PENDING_QC.value,
"by": str(current_user.id),
"notes": request.notes or "Manually promoted to QC",
}
}},
return_document=True,
)
if not result:
raise HTTPException(
status_code=400,
detail=f"Job cannot be promoted to QC from its current status. Must be one of: {', '.join(promotable_statuses)}",
)
await log_job_action(
AuditAction.JOB_STATUS_CHANGE, job_id, current_user, http_request,
details={"new_status": JobStatus.PENDING_QC.value, "notes": request.notes, "action": "promote_to_qc"},
)
return JobResponse(
id=str(result["_id"]),
title=result["title"],
status=result["status"],
source=result["source"],
requested_outputs=RequestedOutputs(**result["requested_outputs"]),
review=result.get("review", {"notes": "", "history": []}),
outputs=result.get("outputs"),
created_at=result["created_at"].isoformat(),
updated_at=result["updated_at"].isoformat(),
)
@router.get("/{job_id}/downloads", response_model=JobDownloadsResponse)
async def get_job_downloads(
job_id: str,

View file

@ -170,6 +170,67 @@ async def get_accessible_project_ids(
return []
async def get_user_org_ids(user: User, db: AsyncIOMotorDatabase) -> list[str] | None:
"""Return org IDs the user belongs to, or None meaning unrestricted (ADMIN).
Priority: memberships pm_client_ids (PM legacy) team.member_user_ids (staff legacy)
"""
if user.role == UserRole.ADMIN:
return None
user_id = str(user.id)
# Primary: Membership collection
org_ids: list[str] = []
async for m in db.memberships.find({"user_id": user_id}, {"organization_id": 1}):
if m.get("organization_id"):
org_ids.append(str(m["organization_id"]))
if org_ids:
return org_ids
# PM legacy: pm_client_ids
if user.role == UserRole.PROJECT_MANAGER:
return list(user.pm_client_ids or [])
# Staff legacy: team.member_user_ids
teams = await db.teams.find({"member_user_ids": user_id}, {"client_id": 1}).to_list(None)
if teams:
return [str(t["client_id"]) for t in teams if t.get("client_id")]
return []
async def assert_job_in_user_org(job: dict, user: User, db: AsyncIOMotorDatabase) -> None:
"""Raise 404 (not 403) when user cannot access this job — avoids information disclosure."""
if user.role == UserRole.ADMIN:
return
org_ids = await get_user_org_ids(user, db)
if org_ids is None:
return # unrestricted
job_org = job.get("organization_id")
if job_org:
if job_org in org_ids:
return
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
# No organization_id — try project fallback
project_id = job.get("project_id")
if project_id:
project = await db.projects.find_one({"_id": project_id}, {"client_id": 1})
if project and project.get("client_id") in org_ids:
return
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
# Legacy: client_id == creator user_id
job_client_id = job.get("client_id")
if job_client_id and job_client_id == str(user.id):
return
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
def require_pm_for_client(client_id_param: str = "client_id"):
"""Dependency: ensures the current user is an Admin or PM for the given client."""
async def checker(

View file

@ -149,3 +149,11 @@ class BulkReturnToQCResponse(BaseModel):
class BulkDownloadRequest(BaseModel):
"""Request to download multiple jobs as a single zip file"""
job_ids: list[str]
class BlockedOnSourceRequest(BaseModel):
reason: str # brief description of what is wrong with the source video
class PromoteToQCRequest(BaseModel):
notes: str = "" # optional context for the QC team

View file

@ -515,6 +515,7 @@ async def submit_for_review(
**(current_state_raw if isinstance(current_state_raw, dict) else {}),
"status": LanguageQCStatus.PENDING_REVIEW.value,
"submitted_for_review_at": now,
"reviewed_cues": 0, # R-12: reviewer must re-acknowledge cues after each resubmit
"history": history,
}