video-accessibility/backend/app/services/language_qc.py
Vadym Samoilenko 05f25a1141 feat: per-language QC workflow with linguist assignment
- Job.language_qc dict tracks per-language status (pending/in_review/approved/rejected)
  with full event history; qc_assignments denormalized array enables efficient queue queries
- language_qc service handles assign/reassign/approve/reject/reopen with atomic DB updates,
  audit logging, and auto-advancement to pending_final_review when all languages approved
- Linguists can only edit VTT and trigger re-renders for their assigned language (403 guard)
- return_to_qc resets all language statuses while preserving assignments
- routes_language_qc.py: 7 new endpoints; /me/language-qc-queue for linguist queue
- Startup migration idempotently seeds language_qc for all existing jobs
- Frontend: LanguageQCState types, API methods, LinguistQueue page, QCDetail redesigned
  with per-language status badges, assignment dropdown, inline approve/reject buttons,
  progress bar, and reject modal; My QC Queue sidebar link

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 12:09:40 +01:00

558 lines
22 KiB
Python

"""Per-language QC service — assignment, approval, rejection, and auto-advancement."""
from datetime import datetime
from typing import Any, Optional
from bson import ObjectId
from fastapi import HTTPException, status
from motor.motor_asyncio import AsyncIOMotorDatabase
from ..core.logging import get_logger
from ..models.audit_log import AuditAction, AuditLogSeverity
from ..models.job import JobStatus, LanguageQCEvent, LanguageQCState, LanguageQCStatus, QCAssignment
from ..models.user import User
from ..services.audit_logger import audit_logger
from ..services.websocket import connection_manager
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Name of the collection holding job documents; job reads and writes in this
# module go through db[_JOBS].
_JOBS = "jobs"
# ── Helpers ───────────────────────────────────────────────────────────────────
def _job_languages(job_doc: dict) -> list[str]:
    """Return the language codes that must be QC-approved before final review.

    The result is the source language (``source.language``, falling back to
    ``"en"`` when missing or null) followed by every entry of
    ``requested_outputs.languages``, de-duplicated and in a stable order so
    callers iterate languages deterministically.

    Args:
        job_doc: Raw job document. ``source``, ``requested_outputs`` and
            ``requested_outputs.languages`` may each be absent or ``None``.

    Returns:
        A list of unique language codes, source language first.
    """
    source = job_doc.get("source") or {}
    requested_outputs = job_doc.get("requested_outputs") or {}
    source_lang = source.get("language") or "en"
    requested = requested_outputs.get("languages") or []
    # dict.fromkeys de-duplicates while keeping first-seen order, unlike a set
    # union whose iteration order can differ between runs.
    return list(dict.fromkeys([source_lang, *requested]))
def _all_approved(job_doc: dict) -> bool:
    """Return True when every required language of the job is approved.

    A language with no entry in ``language_qc``, or with a malformed
    (non-dict) entry, counts as not approved.

    Args:
        job_doc: Raw job document; ``language_qc`` may be absent or ``None``.

    Returns:
        True only if each language returned by ``_job_languages`` has status
        ``LanguageQCStatus.APPROVED``.
    """
    # `or {}` also covers a stored null, which a plain .get(..., {}) default
    # would pass through and then crash on.
    lang_qc = job_doc.get("language_qc") or {}
    approved = LanguageQCStatus.APPROVED.value
    states = (lang_qc.get(lang) for lang in _job_languages(job_doc))
    return all(isinstance(s, dict) and s.get("status") == approved for s in states)
def _any_rejected(job_doc: dict) -> bool:
    """Return True when at least one required language of the job is rejected.

    Languages with no entry in ``language_qc``, or with a malformed (non-dict)
    entry, are ignored.

    Args:
        job_doc: Raw job document; ``language_qc`` may be absent or ``None``.

    Returns:
        True if any language returned by ``_job_languages`` has status
        ``LanguageQCStatus.REJECTED``.
    """
    # `or {}` also covers a stored null, which a plain .get(..., {}) default
    # would pass through and then crash on.
    lang_qc = job_doc.get("language_qc") or {}
    rejected = LanguageQCStatus.REJECTED.value
    states = (lang_qc.get(lang) for lang in _job_languages(job_doc))
    return any(isinstance(s, dict) and s.get("status") == rejected for s in states)
def _rebuild_qc_assignments(language_qc: dict) -> list[dict]:
    """Derive the flat ``qc_assignments`` list from the ``language_qc`` mapping.

    Each value may be a plain dict or an object exposing
    ``assigned_linguist_id`` and ``status`` attributes. Languages without a
    truthy linguist id are left out.

    Returns:
        One ``{"lang", "linguist_id", "status"}`` dict per assigned language,
        in the iteration order of ``language_qc``.
    """
    def _owner_and_status(entry):
        # Dict entries are read by key, anything else by attribute.
        if isinstance(entry, dict):
            return entry.get("assigned_linguist_id"), entry.get("status")
        return entry.assigned_linguist_id, entry.status

    rows = []
    for code, entry in language_qc.items():
        owner, current = _owner_and_status(entry)
        if not owner:
            continue
        rows.append({"lang": code, "linguist_id": owner, "status": current})
    return rows
# ── Core mutations ────────────────────────────────────────────────────────────
async def get_state(db: AsyncIOMotorDatabase, job_id: str, lang: str) -> Optional[LanguageQCState]:
    """Fetch the QC state of a single language on a job.

    Args:
        db: Database handle; the job is read from the jobs collection.
        job_id: ``_id`` of the job document.
        lang: Language code whose state is requested.

    Returns:
        The parsed ``LanguageQCState``, or ``None`` when the job does not
        exist or has no state stored for ``lang``.
    """
    # Project only the requested language so the other entries are not loaded.
    job_doc = await db[_JOBS].find_one({"_id": job_id}, {f"language_qc.{lang}": 1})
    if not job_doc:
        return None
    # `or {}` tolerates a stored null language_qc instead of raising
    # AttributeError on None.get.
    raw = (job_doc.get("language_qc") or {}).get(lang)
    if raw is None:
        return None
    return LanguageQCState(**raw)
async def get_all_states(db: AsyncIOMotorDatabase, job_id: str) -> dict[str, LanguageQCState]:
    """Load every per-language QC state stored on a job.

    Returns:
        A mapping of language code to ``LanguageQCState``; empty when the job
        is not found or carries no ``language_qc`` data.
    """
    doc = await db[_JOBS].find_one({"_id": job_id}, {"language_qc": 1})
    if not doc:
        return {}
    stored = doc.get("language_qc") or {}
    return {code: LanguageQCState(**payload) for code, payload in stored.items()}
async def assign_linguist(
    db: AsyncIOMotorDatabase,
    job_id: str,
    lang: str,
    linguist_user_id: str,
    actor: User,
    *,
    http_request=None,
    notes: Optional[str] = None,
) -> LanguageQCState:
    """Assign (or reassign) a linguist to one language of a job.

    Intended for PM/PROD/ADMIN callers; this function itself performs no role
    check. The per-language state is created if missing. An existing state
    keeps its status, review fields and notes. An ``assign`` or ``reassign``
    event is appended to the history, ``qc_assignments`` is rebuilt, an audit
    entry is written and the linguist is notified over websocket (best effort).

    Args:
        db: Database handle.
        job_id: ``_id`` of the job document.
        lang: Language code being assigned.
        linguist_user_id: ``_id`` of the user receiving the assignment.
        actor: User performing the assignment.
        http_request: Optional request object forwarded to the audit logger.
        notes: Optional free-text note stored on the history event.

    Returns:
        The updated ``LanguageQCState`` for ``lang``.

    Raises:
        HTTPException: 404 when the job or the linguist user does not exist.
    """
    job_doc = await db[_JOBS].find_one({"_id": job_id})
    if not job_doc:
        raise HTTPException(status_code=404, detail="Job not found")
    linguist_doc = await db.users.find_one({"_id": linguist_user_id})
    if not linguist_doc:
        raise HTTPException(status_code=404, detail="Linguist not found")

    now = datetime.utcnow()
    language_qc = job_doc.get("language_qc") or {}
    stored = language_qc.get(lang, {})
    # Normalize once: anything that is not a dict is treated as "no state yet".
    current = stored if isinstance(stored, dict) else {}
    linguist_email = linguist_doc["email"]

    prev_assignee = current.get("assigned_linguist_id")
    # Assigning the same linguist again is recorded as a plain "assign".
    is_reassignment = prev_assignee is not None and prev_assignee != linguist_user_id
    event = LanguageQCEvent(
        at=now,
        actor_user_id=str(actor.id),
        actor_email=actor.email,
        action="reassign" if is_reassignment else "assign",
        notes=notes,
        previous_assignee_id=prev_assignee if is_reassignment else None,
    )
    updated_state = {
        "status": current.get("status", LanguageQCStatus.PENDING.value),
        "assigned_linguist_id": linguist_user_id,
        "assigned_linguist_email": linguist_email,
        "assigned_at": now,
        "assigned_by_user_id": str(actor.id),
        "reviewed_at": current.get("reviewed_at"),
        "reviewed_by_user_id": current.get("reviewed_by_user_id"),
        "reviewed_by_email": current.get("reviewed_by_email"),
        "notes": current.get("notes"),
        "history": current.get("history", []) + [event.model_dump()],
    }

    # Rebuild the denormalized array from the full mapping with this language replaced.
    qc_assignments = _rebuild_qc_assignments({**language_qc, lang: updated_state})
    await db[_JOBS].update_one(
        {"_id": job_id},
        {"$set": {
            f"language_qc.{lang}": updated_state,
            "qc_assignments": qc_assignments,
            "updated_at": now,
        }},
    )

    audit_action = AuditAction.LANGUAGE_QC_REASSIGN if is_reassignment else AuditAction.LANGUAGE_QC_ASSIGN
    verb = "reassigned" if is_reassignment else "assigned"
    await audit_logger.log_action(
        audit_action,
        # Separator added: the job id and the email used to be concatenated directly.
        f"Language QC {verb}: {lang} on job {job_id} → {linguist_email}",
        user=actor,
        request=http_request,
        resource_type="job_language",
        resource_id=f"{job_id}:{lang}",
        details={"lang": lang, "linguist_id": linguist_user_id, "linguist_email": linguist_email},
    )

    # Notify the linguist via websocket. The assignment is already persisted,
    # so a delivery failure is logged instead of failing the request.
    try:
        await connection_manager.broadcast_to_user(
            linguist_user_id,
            {"type": "language_qc_assigned", "job_id": job_id, "lang": lang, "job_title": job_doc.get("title")},
        )
    except Exception as exc:
        logger.warning(
            f"Could not notify linguist {linguist_user_id} about assignment {job_id}:{lang}: {exc}"
        )
    return LanguageQCState(**updated_state)
async def reassign_linguist(
    db: AsyncIOMotorDatabase,
    job_id: str,
    lang: str,
    new_linguist_user_id: str,
    actor: User,
    *,
    http_request=None,
    notes: Optional[str] = None,
) -> LanguageQCState:
    """Hand a language over to another linguist.

    Allowed for the linguist currently assigned to ``lang`` and for
    PRODUCTION, ADMIN or PROJECT_MANAGER users. The actual write is delegated
    to ``assign_linguist``.

    Raises:
        HTTPException: 404 if the job is missing, 400 if ``lang`` has no QC
            state yet, 403 if the actor is neither the current assignee nor
            in one of the privileged roles.
    """
    job = await db[_JOBS].find_one({"_id": job_id})
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    lang_state = (job.get("language_qc") or {}).get(lang)
    if not lang_state:
        raise HTTPException(status_code=400, detail=f"No QC state found for language '{lang}'")

    assignee = None
    if isinstance(lang_state, dict):
        assignee = lang_state.get("assigned_linguist_id")

    acting_as_assignee = assignee == str(actor.id)
    if not acting_as_assignee:
        # Role is only consulted when the actor is not the current assignee.
        from ..models.user import UserRole
        privileged = (UserRole.PRODUCTION, UserRole.ADMIN, UserRole.PROJECT_MANAGER)
        if actor.role not in privileged:
            raise HTTPException(status_code=403, detail="Not authorized to reassign this language")

    return await assign_linguist(
        db,
        job_id,
        lang,
        new_linguist_user_id,
        actor,
        http_request=http_request,
        notes=notes,
    )
async def start_review(
    db: AsyncIOMotorDatabase,
    job_id: str,
    lang: str,
    actor: User,
) -> LanguageQCState:
    """Transition a language from pending to in_review.

    When the language is in any status other than pending, nothing is written
    and the stored state is returned as-is. Otherwise a ``start_review`` event
    is appended to the history and the status becomes ``in_review`` in both
    ``language_qc`` and the denormalized ``qc_assignments`` array.

    NOTE(review): this function does not verify that ``actor`` is the assigned
    linguist; presumably the caller enforces that — confirm.

    Raises:
        HTTPException: 404 when the job does not exist.
    """
    job_doc = await db[_JOBS].find_one({"_id": job_id})
    if not job_doc:
        raise HTTPException(status_code=404, detail="Job not found")

    language_qc = job_doc.get("language_qc") or {}
    stored = language_qc.get(lang, {})
    # Anything that is not a dict is treated as an empty state.
    current = stored if isinstance(stored, dict) else {}
    if current.get("status", LanguageQCStatus.PENDING.value) != LanguageQCStatus.PENDING.value:
        return LanguageQCState(**current)

    now = datetime.utcnow()
    event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="start_review")
    updated_status = LanguageQCStatus.IN_REVIEW.value
    history = current.get("history", []) + [event.model_dump()]
    updated = {**current, "status": updated_status, "history": history}

    # Keep qc_assignments in sync: queue queries filter on the status stored
    # there, so leaving it at "pending" would hide in-review work.
    qc_assignments = _rebuild_qc_assignments({**language_qc, lang: updated})
    await db[_JOBS].update_one(
        {"_id": job_id},
        {"$set": {
            f"language_qc.{lang}.status": updated_status,
            f"language_qc.{lang}.history": history,
            "qc_assignments": qc_assignments,
            "updated_at": now,
        }},
    )
    return LanguageQCState(**updated)
async def approve_language(
    db: AsyncIOMotorDatabase,
    job_id: str,
    lang: str,
    actor: User,
    *,
    http_request=None,
    notes: Optional[str] = None,
) -> LanguageQCState:
    """Mark one language of a job as approved.

    Records the reviewer and an ``approve`` history event, rebuilds
    ``qc_assignments``, writes an audit entry, then re-reads the job and lets
    ``_maybe_advance_job`` move it forward if every language is now approved.

    Args:
        db: Database handle.
        job_id: ``_id`` of the job document.
        lang: Language code being approved.
        actor: User performing the approval.
        http_request: Optional request object forwarded to the audit logger.
        notes: Optional note stored on the state and the history event.

    Returns:
        The updated ``LanguageQCState`` for ``lang``.

    Raises:
        HTTPException: 404 if the job is missing, 400 if the job is not in
            ``pending_qc``/``qc_feedback``, 403 (from ``_assert_can_act``) if
            the actor may not act on this language.
    """
    job_doc = await db[_JOBS].find_one({"_id": job_id})
    if not job_doc:
        raise HTTPException(status_code=404, detail="Job not found")
    if job_doc["status"] not in (JobStatus.PENDING_QC.value, JobStatus.QC_FEEDBACK.value):
        raise HTTPException(status_code=400, detail="Job is not in QC status")
    _assert_can_act(job_doc, lang, actor)

    now = datetime.utcnow()
    event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="approve", notes=notes)
    language_qc = job_doc.get("language_qc") or {}
    stored = language_qc.get(lang, {})
    # Anything that is not a dict is treated as an empty state.
    current = stored if isinstance(stored, dict) else {}
    updated_state = {
        **current,
        "status": LanguageQCStatus.APPROVED.value,
        "reviewed_at": now,
        "reviewed_by_user_id": str(actor.id),
        "reviewed_by_email": actor.email,
        "notes": notes,
        "history": current.get("history", []) + [event.model_dump()],
    }
    qc_assignments = _rebuild_qc_assignments({**language_qc, lang: updated_state})
    await db[_JOBS].update_one(
        {"_id": job_id},
        {"$set": {
            f"language_qc.{lang}": updated_state,
            "qc_assignments": qc_assignments,
            "updated_at": now,
        }},
    )
    await audit_logger.log_action(
        AuditAction.LANGUAGE_QC_APPROVE,
        f"Language QC approved: {lang} on job {job_id}",
        user=actor,
        request=http_request,
        resource_type="job_language",
        resource_id=f"{job_id}:{lang}",
        details={"lang": lang, "notes": notes},
    )

    # Re-fetch so the advancement check sees the state as stored. The job may
    # have been removed in the meantime, in which case there is nothing to advance.
    refreshed = await db[_JOBS].find_one({"_id": job_id})
    if refreshed:
        await _maybe_advance_job(db, refreshed)
    return LanguageQCState(**updated_state)
async def reject_language(
    db: AsyncIOMotorDatabase,
    job_id: str,
    lang: str,
    actor: User,
    notes: str,
    *,
    http_request=None,
) -> LanguageQCState:
    """Mark one language of a job as rejected and move the job to qc_feedback.

    The language state, the rebuilt ``qc_assignments``, the job status and the
    ``review.history`` entry are written in a single update so the job is never
    left with a rejected language while still outside ``qc_feedback``.

    Args:
        db: Database handle.
        job_id: ``_id`` of the job document.
        lang: Language code being rejected.
        actor: User performing the rejection.
        notes: Mandatory, non-blank explanation of the rejection.
        http_request: Optional request object forwarded to the audit logger.

    Returns:
        The updated ``LanguageQCState`` for ``lang``.

    Raises:
        HTTPException: 422 if ``notes`` is empty or whitespace, 404 if the job
            is missing, 400 if the job is not in ``pending_qc``/``qc_feedback``,
            403 (from ``_assert_can_act``) if the actor may not act on this
            language.
    """
    if not notes or not notes.strip():
        raise HTTPException(status_code=422, detail="Rejection notes are required")
    job_doc = await db[_JOBS].find_one({"_id": job_id})
    if not job_doc:
        raise HTTPException(status_code=404, detail="Job not found")
    if job_doc["status"] not in (JobStatus.PENDING_QC.value, JobStatus.QC_FEEDBACK.value):
        raise HTTPException(status_code=400, detail="Job is not in QC status")
    _assert_can_act(job_doc, lang, actor)

    now = datetime.utcnow()
    event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="reject", notes=notes)
    language_qc = job_doc.get("language_qc") or {}
    stored = language_qc.get(lang, {})
    # Anything that is not a dict is treated as an empty state.
    current = stored if isinstance(stored, dict) else {}
    updated_state = {
        **current,
        "status": LanguageQCStatus.REJECTED.value,
        "reviewed_at": now,
        "reviewed_by_user_id": str(actor.id),
        "reviewed_by_email": actor.email,
        "notes": notes,
        "history": current.get("history", []) + [event.model_dump()],
    }
    qc_assignments = _rebuild_qc_assignments({**language_qc, lang: updated_state})

    # One write for both the language state and the job-level transition.
    await db[_JOBS].update_one(
        {"_id": job_id},
        {
            "$set": {
                f"language_qc.{lang}": updated_state,
                "qc_assignments": qc_assignments,
                "status": JobStatus.QC_FEEDBACK.value,
                "updated_at": now,
            },
            "$push": {
                "review.history": {
                    "at": now,
                    "status": JobStatus.QC_FEEDBACK.value,
                    "by": str(actor.id),
                    "notes": notes,
                },
            },
        },
    )
    await audit_logger.log_action(
        AuditAction.LANGUAGE_QC_REJECT,
        f"Language QC rejected: {lang} on job {job_id}",
        user=actor,
        request=http_request,
        severity=AuditLogSeverity.WARNING,
        resource_type="job_language",
        resource_id=f"{job_id}:{lang}",
        details={"lang": lang, "notes": notes},
    )
    return LanguageQCState(**updated_state)
async def reopen_language(
    db: AsyncIOMotorDatabase,
    job_id: str,
    lang: str,
    actor: User,
    *,
    http_request=None,
    notes: Optional[str] = None,
) -> LanguageQCState:
    """Reset a language back to pending for re-review (PRODUCTION/ADMIN only).

    Clears the review fields, appends a ``reopen`` history event and rebuilds
    ``qc_assignments``. If the job was read in ``pending_final_review`` it is
    pulled back to ``pending_qc`` in the same update, with a matching
    ``review.history`` entry. The language's current status is not checked, so
    any status is reset.

    Args:
        db: Database handle.
        job_id: ``_id`` of the job document.
        lang: Language code being reopened.
        actor: User performing the reopen.
        http_request: Optional request object forwarded to the audit logger.
        notes: Optional note stored on the state and the history event.

    Returns:
        The updated ``LanguageQCState`` for ``lang``.

    Raises:
        HTTPException: 403 if the actor is not PRODUCTION or ADMIN, 404 if the
            job is missing.
    """
    from ..models.user import UserRole

    if actor.role not in (UserRole.PRODUCTION, UserRole.ADMIN):
        raise HTTPException(status_code=403, detail="Only PRODUCTION or ADMIN can reopen a language")
    job_doc = await db[_JOBS].find_one({"_id": job_id})
    if not job_doc:
        raise HTTPException(status_code=404, detail="Job not found")

    language_qc = job_doc.get("language_qc") or {}
    stored = language_qc.get(lang, {})
    # Anything that is not a dict is treated as an empty state.
    current = stored if isinstance(stored, dict) else {}
    now = datetime.utcnow()
    event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="reopen", notes=notes)
    updated_state = {
        **current,
        "status": LanguageQCStatus.PENDING.value,
        "reviewed_at": None,
        "reviewed_by_user_id": None,
        "reviewed_by_email": None,
        "notes": notes,
        "history": current.get("history", []) + [event.model_dump()],
    }
    qc_assignments = _rebuild_qc_assignments({**language_qc, lang: updated_state})

    set_fields: dict[str, Any] = {
        f"language_qc.{lang}": updated_state,
        "qc_assignments": qc_assignments,
        "updated_at": now,
    }
    update_ops: dict[str, Any] = {"$set": set_fields}
    # If the job had advanced to pending_final_review, pull it back to
    # pending_qc in the same write so both changes land together.
    if job_doc.get("status") == JobStatus.PENDING_FINAL_REVIEW.value:
        set_fields["status"] = JobStatus.PENDING_QC.value
        update_ops["$push"] = {
            "review.history": {
                "at": now,
                "status": JobStatus.PENDING_QC.value,
                "by": str(actor.id),
                "notes": f"Language {lang} reopened: {notes or ''}",
            },
        }
    await db[_JOBS].update_one({"_id": job_id}, update_ops)

    await audit_logger.log_action(
        AuditAction.LANGUAGE_QC_REOPEN,
        f"Language QC reopened: {lang} on job {job_id}",
        user=actor,
        request=http_request,
        resource_type="job_language",
        resource_id=f"{job_id}:{lang}",
        details={"lang": lang, "notes": notes},
    )
    return LanguageQCState(**updated_state)
async def reset_all_for_return_to_qc(db: AsyncIOMotorDatabase, job_id: str) -> None:
    """Put every language of a job back to pending, for the return_to_qc flow.

    Only the status and the three review fields are overwritten; assignment
    fields and history are not touched. Does nothing when the job is missing
    or has no dict-shaped language state.
    """
    doc = await db[_JOBS].find_one({"_id": job_id}, {"language_qc": 1})
    if not doc:
        return

    stored = doc.get("language_qc") or {}
    pending = LanguageQCStatus.PENDING.value
    cleared_fields = {
        "status": pending,
        "reviewed_at": None,
        "reviewed_by_user_id": None,
        "reviewed_by_email": None,
    }
    changes: dict[str, Any] = {
        f"language_qc.{code}.{field}": value
        for code, entry in stored.items()
        if isinstance(entry, dict)
        for field, value in cleared_fields.items()
    }
    if not changes:
        return

    # Recompute qc_assignments as it will look once every status is pending.
    reset_view = {
        code: {**(entry if isinstance(entry, dict) else {}), "status": pending}
        for code, entry in stored.items()
    }
    changes["qc_assignments"] = _rebuild_qc_assignments(reset_view)
    await db[_JOBS].update_one({"_id": job_id}, {"$set": changes})
async def list_for_linguist(
    db: AsyncIOMotorDatabase,
    linguist_id: str,
    *,
    status_filter: Optional[str] = None,
    skip: int = 0,
    limit: int = 50,
) -> list[dict]:
    """List jobs on which the linguist holds at least one language assignment.

    With ``status_filter`` set, a job matches only if one of the linguist's
    assignments has that status. Results are sorted by ``updated_at``
    descending and paged with ``skip``/``limit``.

    Returns:
        Projected job documents, each extended with ``_my_assignments``: the
        ``qc_assignments`` entries that belong to this linguist.
    """
    criteria: dict = {"qc_assignments.linguist_id": linguist_id}
    if status_filter:
        criteria["qc_assignments"] = {
            "$elemMatch": {"linguist_id": linguist_id, "status": status_filter},
        }

    projection = {
        "title": 1,
        "status": 1,
        "language_qc": 1,
        "qc_assignments": 1,
        "created_at": 1,
        "updated_at": 1,
    }
    cursor = db[_JOBS].find(criteria, projection)
    cursor = cursor.skip(skip).limit(limit).sort("updated_at", -1)
    docs = await cursor.to_list(length=limit)

    # Attach only this linguist's entries; other linguists' rows stay in qc_assignments.
    return [
        {
            **doc,
            "_my_assignments": [
                row
                for row in (doc.get("qc_assignments") or [])
                if row.get("linguist_id") == linguist_id
            ],
        }
        for doc in docs
    ]
async def seed_language_qc_for_job(db: AsyncIOMotorDatabase, job_doc: dict) -> None:
    """Idempotently seed ``language_qc`` entries for a job.

    Languages are the source language plus every key of ``outputs``. Languages
    that already have an entry are left untouched. New entries start as
    approved when the job status is one of approved_english, approved_source,
    pending_final_review or completed, and as pending otherwise. Nothing is
    written when no entry is missing.

    NOTE(review): languages come from ``outputs`` keys here, whereas
    ``_job_languages`` uses ``requested_outputs.languages`` — confirm the two
    are meant to differ.

    Args:
        db: Database handle.
        job_doc: Raw job document; ``source``, ``outputs``, ``review`` and
            ``language_qc`` may each be absent or ``None``.
    """
    job_id = str(job_doc["_id"])
    outputs = job_doc.get("outputs") or {}
    # `or` fallbacks cover stored nulls, which would otherwise raise on .get
    # or seed an entry under a None key.
    source_lang = (job_doc.get("source") or {}).get("language") or "en"
    review = job_doc.get("review") or {}
    all_langs = list({source_lang} | set(outputs))

    is_approved = job_doc.get("status", "") in (
        JobStatus.APPROVED_ENGLISH.value,
        JobStatus.APPROVED_SOURCE.value,
        JobStatus.PENDING_FINAL_REVIEW.value,
        JobStatus.COMPLETED.value,
    )
    seed_status = LanguageQCStatus.APPROVED.value if is_approved else LanguageQCStatus.PENDING.value

    existing_qc = job_doc.get("language_qc") or {}
    updates: dict[str, Any] = {}
    updated_lang_qc = dict(existing_qc)
    for lang in all_langs:
        if lang in existing_qc:
            continue  # already seeded
        state: dict[str, Any] = {"status": seed_status, "history": []}
        if is_approved:
            # Carry over the job-level review details onto the seeded entry.
            state["reviewed_by_user_id"] = review.get("reviewer_id")
            state["reviewed_at"] = job_doc.get("updated_at")
            state["notes"] = review.get("notes")
        updates[f"language_qc.{lang}"] = state
        updated_lang_qc[lang] = state

    if updates:
        updates["qc_assignments"] = _rebuild_qc_assignments(updated_lang_qc)
        updates["updated_at"] = datetime.utcnow()
        await db[_JOBS].update_one({"_id": job_id}, {"$set": updates})
# ── Internal ──────────────────────────────────────────────────────────────────
def _assert_can_act(job_doc: dict, lang: str, actor: User) -> None:
    """Ensure the actor may approve or reject ``lang`` on this job.

    PRODUCTION and ADMIN users always pass. Anyone else must be the linguist
    currently assigned to the language.

    Raises:
        HTTPException: 403 when the language has no assigned linguist, or the
            assigned linguist is someone other than the actor.
    """
    from ..models.user import UserRole

    bypass_roles = (UserRole.PRODUCTION, UserRole.ADMIN)
    if actor.role not in bypass_roles:
        entry = (job_doc.get("language_qc") or {}).get(lang, {})
        owner = entry.get("assigned_linguist_id") if isinstance(entry, dict) else None
        if owner is None:
            raise HTTPException(status_code=403, detail=f"Language '{lang}' has no assigned linguist")
        if owner != str(actor.id):
            raise HTTPException(status_code=403, detail=f"You are not assigned to language '{lang}'")
async def _maybe_advance_job(db: AsyncIOMotorDatabase, job_doc: Optional[dict]) -> None:
    """Advance the job to pending_final_review if all languages are approved.

    Does nothing when ``job_doc`` is empty/``None``, when the job is not in
    ``pending_qc``/``qc_feedback``, or when some language is not approved.
    The status change is conditional on the job still being in one of those
    two statuses at write time. On success a ``review.history`` entry is
    pushed and a ``job_status_change`` message is broadcast (best effort).

    Args:
        db: Database handle.
        job_doc: Freshly read job document, or ``None`` if the job vanished.
    """
    if not job_doc:
        return
    job_id = str(job_doc["_id"])
    qc_statuses = (JobStatus.PENDING_QC.value, JobStatus.QC_FEEDBACK.value)
    if job_doc.get("status") not in qc_statuses:
        return
    if not _all_approved(job_doc):
        return

    now = datetime.utcnow()
    result = await db[_JOBS].find_one_and_update(
        # Status is re-checked in the filter so a job that moved on in the
        # meantime is not overwritten.
        {"_id": job_id, "status": {"$in": list(qc_statuses)}},
        {
            "$set": {"status": JobStatus.PENDING_FINAL_REVIEW.value, "updated_at": now},
            "$push": {"review.history": {"at": now, "status": JobStatus.PENDING_FINAL_REVIEW.value, "by": "system", "notes": "All languages approved"}},
        },
        return_document=True,
    )
    if not result:
        return

    logger.info(f"Job {job_id} auto-advanced to pending_final_review — all languages approved")
    # The status change is already persisted, so a broadcast failure is
    # logged rather than raised.
    try:
        await connection_manager.broadcast_to_job(
            job_id,
            {"type": "job_status_change", "job_id": job_id, "status": JobStatus.PENDING_FINAL_REVIEW.value},
        )
    except Exception as exc:
        logger.warning(f"Could not broadcast status change for job {job_id}: {exc}")