diff --git a/backend/app/api/v1/routes_admin.py b/backend/app/api/v1/routes_admin.py index 6736f06..5e29aa5 100644 --- a/backend/app/api/v1/routes_admin.py +++ b/backend/app/api/v1/routes_admin.py @@ -661,21 +661,24 @@ async def get_user_audit_logs( ): """Get audit logs for a specific user — accepts user ID or email (production/admin only)""" - # Accept email address: look up the user to get their ID + import re as _re + + # Accept email address: look up user by case-insensitive email match resolved_id = user_id if "@" in user_id: - user_doc = await db.users.find_one({"email": user_id}, {"_id": 1}) + user_doc = await db.users.find_one( + {"email": _re.compile(f"^{_re.escape(user_id)}$", _re.IGNORECASE)}, + {"_id": 1}, + ) if user_doc: resolved_id = str(user_doc["_id"]) - # If not found, query by user_email field in audit logs directly below logs = await audit_logger.get_user_activity(resolved_id, days) - # If resolved by ObjectId returned nothing, try querying by email field + # Fallback: query by email field in audit logs (case-insensitive via audit_logger) if not logs and "@" in user_id: from ...models.audit_log import AuditLogQuery as ALQ from ...services.audit_logger import audit_logger as al - from datetime import timedelta q = ALQ(user_email=user_id, limit=1000, sort_by="timestamp", sort_order=-1) result = await al.query_logs(q) logs = result.logs diff --git a/backend/app/api/v1/routes_clients.py b/backend/app/api/v1/routes_clients.py index 614d86b..be53c2b 100644 --- a/backend/app/api/v1/routes_clients.py +++ b/backend/app/api/v1/routes_clients.py @@ -17,7 +17,7 @@ from motor.motor_asyncio import AsyncIOMotorDatabase from pydantic import BaseModel from ...core.database import get_database -from ...core.dependencies import get_current_user, require_pm_for_client, require_roles +from ...core.dependencies import get_current_user, require_roles from ...models.client import ( Client, ClientCreate, @@ -91,6 +91,9 @@ def _project_from_doc(doc: dict) -> Project: name=doc["name"], client_id=doc["client_id"], is_active=doc.get("is_active", True), + default_languages=doc.get("default_languages", []), + default_linguist_id=doc.get("default_linguist_id"), + default_reviewer_id=doc.get("default_reviewer_id"), created_at=doc.get("created_at"), updated_at=doc.get("updated_at"), ) @@ -381,7 +384,7 @@ async def create_project( db: AsyncIOMotorDatabase = Depends(get_database), ): await _get_client_or_404(client_id, db) - await _assert_pm_or_admin(current_user, client_id, db) + await _assert_pm_or_client_member(current_user, client_id, db) now = _now() project_id = str(ObjectId()) await db.projects.insert_one({ @@ -389,6 +392,9 @@ async def create_project( "name": body.name, "client_id": client_id, "is_active": True, + "default_languages": body.default_languages, + "default_linguist_id": body.default_linguist_id, + "default_reviewer_id": body.default_reviewer_id, "created_at": now, "updated_at": now, }) @@ -449,6 +455,24 @@ async def _assert_pm_or_admin(user: User, client_id: str, db: AsyncIOMotorDataba raise HTTPException(status_code=403, detail="Not a manager for this client") +async def _assert_pm_or_client_member(user: User, client_id: str, db: AsyncIOMotorDatabase) -> None: + """Allow PM/ADMIN/PROD or any org member (CLIENT role) with membership in this client's org.""" + if user.role in (UserRole.ADMIN, UserRole.PRODUCTION): + return + if user.role == UserRole.PROJECT_MANAGER: + if client_id in (user.pm_client_ids or []): + return + mem = await db.memberships.find_one({"user_id": str(user.id), "organization_id": client_id}) + if mem and mem.get("role_in_org") in ("owner", "admin", "manager"): + return + # Allow CLIENT users who are members of the org + if user.role == UserRole.CLIENT: + mem = await db.memberships.find_one({"user_id": str(user.id), "organization_id": client_id}) + if mem: + return + raise HTTPException(status_code=403, detail="Not authorized to create projects for this client") + + async def _assert_client_access(user: User, client_id: str, db: AsyncIOMotorDatabase) -> None: """Allow platform staff, org members (any role), or PM of the client.""" if user.role in (UserRole.ADMIN, UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.LINGUIST): diff --git a/backend/app/api/v1/routes_glossaries.py b/backend/app/api/v1/routes_glossaries.py index 2f53979..815bb49 100644 --- a/backend/app/api/v1/routes_glossaries.py +++ b/backend/app/api/v1/routes_glossaries.py @@ -212,6 +212,46 @@ async def activate_version( return {"status": "ok", "active_version_id": version_id} +# ── Re-queue embedding ──────────────────────────────────────────────────────── + +@router.post("/{glossary_id}/versions/{version_id}/reembed", status_code=202) +async def reembed_version( + client_id: str, + glossary_id: str, + version_id: str, + current_user: User = Depends(require_roles(UserRole.ADMIN, UserRole.PROJECT_MANAGER)), +): + """Re-queue the embedding task for a glossary version (resets failed/pending/stuck embeds).""" + glossary = await svc.get_glossary(glossary_id) + if not glossary or glossary.client_id != client_id: + raise HTTPException(status_code=404, detail="Glossary not found") + + versions = await svc.get_versions(glossary_id) + version = next((v for v in versions if str(v.id) == version_id), None) + if not version: + raise HTTPException(status_code=404, detail="Version not found") + + try: + from ...tasks.embed_glossary import embed_glossary_version_task + from bson import ObjectId + import motor.motor_asyncio + from ...core.config import settings + + client_db = motor.motor_asyncio.AsyncIOMotorClient(settings.mongodb_uri) + db = client_db[settings.mongodb_db] + await db.glossary_versions.update_one( + {"_id": ObjectId(version_id)}, + {"$set": {"embedding_status": "pending", "embedded_count": 0}}, + ) + client_db.close() + + embed_glossary_version_task.delay(version_id) + except Exception as exc: + raise HTTPException(status_code=500, detail=f"Failed to queue embedding: {exc}") from exc + + return {"status": "queued", "version_id": version_id} + + # ── Archive (soft-delete) ───────────────────────────────────────────────────── @router.delete("/{glossary_id}", status_code=204) diff --git a/backend/app/api/v1/routes_language_qc.py b/backend/app/api/v1/routes_language_qc.py index 0ab5279..a9a66ad 100644 --- a/backend/app/api/v1/routes_language_qc.py +++ b/backend/app/api/v1/routes_language_qc.py @@ -1,14 +1,15 @@ -"""Per-language QC endpoints — assignment, approval, rejection, queue.""" +"""Per-language QC endpoints — two-stage (linguist + reviewer) assignment, workflow, comments.""" +from datetime import datetime from typing import Optional -from fastapi import APIRouter, Depends, HTTPException, Query, Request, status +from fastapi import APIRouter, Depends, Query, Request from motor.motor_asyncio import AsyncIOMotorDatabase -from pydantic import BaseModel +from pydantic import BaseModel, Field from ...core.database import get_database -from ...core.dependencies import get_current_user, require_roles -from ...models.job import LanguageQCState, LanguageQCStatus +from ...core.dependencies import require_roles +from ...models.job import LanguageQCComment, LanguageQCState from ...models.user import User, UserRole from ...services import language_qc as lqc @@ -20,11 +21,25 @@ router = APIRouter(tags=["language-qc"]) class AssignRequest(BaseModel): linguist_user_id: str notes: Optional[str] = None + deadline: Optional[datetime] = None class ReassignRequest(BaseModel): linguist_user_id: str notes: Optional[str] = None + deadline: Optional[datetime] = None + + +class AssignReviewerRequest(BaseModel): + reviewer_user_id: str + notes: Optional[str] = None + deadline: Optional[datetime] = None + + +class ReassignReviewerRequest(BaseModel): + reviewer_user_id: str + notes: Optional[str] = None + deadline: Optional[datetime] = None class ApproveLanguageRequest(BaseModel): @@ -39,6 +54,10 @@ class ReopenLanguageRequest(BaseModel): notes: Optional[str] = None +class AddCommentRequest(BaseModel): + body: str = Field(..., min_length=1, max_length=4000) + + class LanguageQCStateResponse(BaseModel): lang: str state: LanguageQCState @@ -75,11 +94,12 @@ async def get_language_qc( )), db: AsyncIOMotorDatabase = Depends(get_database), ): - """Return per-language QC state map for a job.""" states = await lqc.get_all_states(db, job_id) return LanguageQCMapResponse(job_id=job_id, language_qc=states) +# ── Linguist assignment ─────────────────────────────────────────────────────── + @router.post("/jobs/{job_id}/languages/{lang}/assign", response_model=LanguageQCStateResponse) async def assign_language( job_id: str, @@ -91,10 +111,9 @@ async def assign_language( )), db: AsyncIOMotorDatabase = Depends(get_database), ): - """Assign a linguist to a language on this job (PM / PROD / ADMIN only).""" state = await lqc.assign_linguist( db, job_id, lang, request.linguist_user_id, current_user, - http_request=http_request, notes=request.notes, + http_request=http_request, notes=request.notes, deadline=request.deadline, ) return LanguageQCStateResponse(lang=lang, state=state) @@ -110,14 +129,100 @@ async def reassign_language( )), db: AsyncIOMotorDatabase = Depends(get_database), ): - """Hand off a language to another linguist (assigned linguist or PM/PROD/ADMIN).""" state = await lqc.reassign_linguist( db, job_id, lang, request.linguist_user_id, current_user, - http_request=http_request, notes=request.notes, + http_request=http_request, notes=request.notes, deadline=request.deadline, ) return LanguageQCStateResponse(lang=lang, state=state) +# ── Reviewer assignment ─────────────────────────────────────────────────────── + +@router.post("/jobs/{job_id}/languages/{lang}/assign-reviewer", response_model=LanguageQCStateResponse) +async def assign_reviewer( + job_id: str, + lang: str, + request: AssignReviewerRequest, + http_request: Request, + current_user: User = Depends(require_roles( + UserRole.PROJECT_MANAGER, UserRole.PRODUCTION, UserRole.ADMIN, + )), + db: AsyncIOMotorDatabase = Depends(get_database), +): + state = await lqc.assign_reviewer( + db, job_id, lang, request.reviewer_user_id, current_user, + http_request=http_request, notes=request.notes, deadline=request.deadline, + ) + return LanguageQCStateResponse(lang=lang, state=state) + + +@router.post("/jobs/{job_id}/languages/{lang}/reassign-reviewer", response_model=LanguageQCStateResponse) +async def reassign_reviewer( + job_id: str, + lang: str, + request: ReassignReviewerRequest, + http_request: Request, + current_user: User = Depends(require_roles( + UserRole.PROJECT_MANAGER, UserRole.PRODUCTION, UserRole.ADMIN, + )), + db: AsyncIOMotorDatabase = Depends(get_database), +): + state = await lqc.reassign_reviewer( + db, job_id, lang, request.reviewer_user_id, current_user, + http_request=http_request, notes=request.notes, deadline=request.deadline, + ) + return LanguageQCStateResponse(lang=lang, state=state) + + +# ── Workflow transitions ────────────────────────────────────────────────────── + +@router.post("/jobs/{job_id}/languages/{lang}/start-work", response_model=LanguageQCStateResponse) +async def start_linguist_work( + job_id: str, + lang: str, + http_request: Request, + current_user: User = Depends(require_roles( + UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN, + )), + db: AsyncIOMotorDatabase = Depends(get_database), +): + """Linguist opens the language — pending → in_progress.""" + state = await lqc.start_linguist_work(db, job_id, lang, current_user) + return LanguageQCStateResponse(lang=lang, state=state) + + +@router.post("/jobs/{job_id}/languages/{lang}/submit", response_model=LanguageQCStateResponse) +async def submit_for_review( + job_id: str, + lang: str, + http_request: Request, + current_user: User = Depends(require_roles( + UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN, + )), + db: AsyncIOMotorDatabase = Depends(get_database), +): + """Linguist submits — in_progress → pending_review. Notifies reviewer by email.""" + state = await lqc.submit_for_review(db, job_id, lang, current_user, http_request=http_request) + return LanguageQCStateResponse(lang=lang, state=state) + + +@router.post("/jobs/{job_id}/languages/{lang}/open-review", response_model=LanguageQCStateResponse) +async def open_review( + job_id: str, + lang: str, + http_request: Request, + current_user: User = Depends(require_roles( + UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN, + )), + db: AsyncIOMotorDatabase = Depends(get_database), +): + """Reviewer opens the review — pending_review → in_review.""" + state = await lqc.open_review(db, job_id, lang, current_user, http_request=http_request) + return LanguageQCStateResponse(lang=lang, state=state) + + +# ── Approve / Reject / Reopen ───────────────────────────────────────────────── + @router.post("/jobs/{job_id}/languages/{lang}/approve", response_model=LanguageQCStateResponse) async def approve_language( job_id: str, @@ -125,11 +230,10 @@ async def approve_language( request: ApproveLanguageRequest, http_request: Request, current_user: User = Depends(require_roles( - UserRole.LINGUIST, UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN, + UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN, )), db: AsyncIOMotorDatabase = Depends(get_database), ): - """Approve a language (assigned linguist, PROD, or ADMIN).""" state = await lqc.approve_language( db, job_id, lang, current_user, http_request=http_request, notes=request.notes, ) @@ -143,11 +247,10 @@ async def reject_language( request: RejectLanguageRequest, http_request: Request, current_user: User = Depends(require_roles( - UserRole.LINGUIST, UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN, + UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN, )), db: AsyncIOMotorDatabase = Depends(get_database), ): - """Reject a language with required notes (assigned linguist, PROD, or ADMIN).""" state = await lqc.reject_language( db, job_id, lang, current_user, request.notes, http_request=http_request, ) @@ -163,16 +266,54 @@ async def reopen_language( current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): - """Re-open an approved language for re-review (PROD / ADMIN only).""" state = await lqc.reopen_language( db, job_id, lang, current_user, http_request=http_request, notes=request.notes, ) return LanguageQCStateResponse(lang=lang, state=state) +# ── Comments ────────────────────────────────────────────────────────────────── + +@router.post("/jobs/{job_id}/languages/{lang}/comments", response_model=LanguageQCComment, status_code=201) +async def add_comment( + job_id: str, + lang: str, + request: AddCommentRequest, + http_request: Request, + current_user: User = Depends(require_roles( + UserRole.LINGUIST, UserRole.REVIEWER, UserRole.PROJECT_MANAGER, + UserRole.PRODUCTION, UserRole.ADMIN, + )), + db: AsyncIOMotorDatabase = Depends(get_database), +): + comment = await lqc.add_comment( + db, job_id, lang, current_user, request.body, http_request=http_request, + ) + return comment + + +@router.get("/jobs/{job_id}/languages/{lang}/comments", response_model=list[LanguageQCComment]) +async def list_comments( + job_id: str, + lang: str, + current_user: User = Depends(require_roles( + UserRole.LINGUIST, UserRole.REVIEWER, UserRole.PROJECT_MANAGER, + UserRole.PRODUCTION, UserRole.ADMIN, + )), + db: AsyncIOMotorDatabase = Depends(get_database), +): + state = await lqc.get_state(db, job_id, lang) + if state is None: + return [] + return state.comments + + +# ── Queues ───────────────────────────────────────────────────────────────────── + @router.get("/me/language-qc-queue", response_model=QueueResponse) async def my_language_qc_queue( - qc_status: Optional[str] = Query(None, description="Filter by status: pending, in_review, approved, rejected"), + role: str = Query("linguist", description="'linguist' or 'reviewer'"), + qc_status: Optional[str] = Query(None, description="Filter by status"), skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=200), current_user: User = Depends(require_roles( @@ -180,10 +321,15 @@ async def my_language_qc_queue( )), db: AsyncIOMotorDatabase = Depends(get_database), ): - """List jobs and languages assigned to the current user.""" - jobs = await lqc.list_for_linguist( - db, str(current_user.id), status_filter=qc_status, skip=skip, limit=limit, - ) + """List jobs and languages assigned to the current user as linguist or reviewer.""" + if role == "reviewer": + jobs = await lqc.list_for_reviewer( + db, str(current_user.id), status_filter=qc_status, skip=skip, limit=limit, + ) + else: + jobs = await lqc.list_for_linguist( + db, str(current_user.id), status_filter=qc_status, skip=skip, limit=limit, + ) items: list[QueueItem] = [] for job in jobs: diff --git a/backend/app/core/dependencies.py b/backend/app/core/dependencies.py index 0b09b9d..8184d80 100644 --- a/backend/app/core/dependencies.py +++ b/backend/app/core/dependencies.py @@ -11,8 +11,8 @@ from .security import decode_token security = HTTPBearer() -# Roles that see all jobs (no tenant isolation) -STAFF_ROLES = {UserRole.ADMIN, UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION} +# Only admins bypass tenant isolation; other staff are scoped by team membership +STAFF_ROLES = {UserRole.ADMIN} async def get_current_user( @@ -108,15 +108,19 @@ async def get_accessible_project_ids( """ Returns project IDs the user may access, or None meaning "see everything". - - Staff / Admin → None (unrestricted) - - Otherwise → projects in orgs where the user holds any membership - (falls back to legacy pm_client_ids/team lookups if no memberships found) + - Admin → None (unrestricted) + - Staff (REVIEWER/LINGUIST/PRODUCTION) → scoped by team membership; + if not yet assigned to any team, falls back to None (see all) + so existing staff aren't locked out before teams are configured + - PM → projects in accessible orgs/clients (pm_client_ids legacy) + - CLIENT → projects in orgs where the user holds any membership """ if user.role in STAFF_ROLES: return None - # Primary path: use memberships collection (Phase 3 SaaS) user_id = str(user.id) + + # Primary path: use memberships collection (Phase 3 SaaS) membership_cursor = db.memberships.find({"user_id": user_id}, {"organization_id": 1}) org_ids = [doc["organization_id"] async for doc in membership_cursor] @@ -127,29 +131,37 @@ async def get_accessible_project_ids( ).to_list(None) return [str(p["_id"]) for p in projects] - # Legacy fallback (pre-backfill) — keeps the app working before migration runs - if user.role == UserRole.PROJECT_MANAGER: - client_ids = user.pm_client_ids or [] - if not client_ids: - return [] + # Legacy fallback: team membership (used by REVIEWER/LINGUIST/PRODUCTION and legacy CLIENT) + teams = await db.teams.find( + {"member_user_ids": user_id}, + {"client_id": 1}, + ).to_list(None) + client_ids = list({t["client_id"] for t in teams}) + + if client_ids: projects = await db.projects.find( {"client_id": {"$in": client_ids}, "is_active": True}, {"_id": 1}, ).to_list(None) return [str(p["_id"]) for p in projects] - teams = await db.teams.find( - {"member_user_ids": user_id}, - {"client_id": 1}, - ).to_list(None) - client_ids = list({t["client_id"] for t in teams}) - if not client_ids: - return [] - projects = await db.projects.find( - {"client_id": {"$in": client_ids}, "is_active": True}, - {"_id": 1}, - ).to_list(None) - return [str(p["_id"]) for p in projects] + # PM legacy: scoped via pm_client_ids + if user.role == UserRole.PROJECT_MANAGER: + pm_client_ids = user.pm_client_ids or [] + if not pm_client_ids: + return [] + projects = await db.projects.find( + {"client_id": {"$in": pm_client_ids}, "is_active": True}, + {"_id": 1}, + ).to_list(None) + return [str(p["_id"]) for p in projects] + + # Staff with no team assignments → unrestricted until teams are configured + if user.role in {UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION}: + return None + + # CLIENT with no memberships and no teams → show nothing + return [] def require_pm_for_client(client_id_param: str = "client_id"): diff --git a/backend/app/models/audit_log.py b/backend/app/models/audit_log.py index ef86bb5..7f4fa08 100644 --- a/backend/app/models/audit_log.py +++ b/backend/app/models/audit_log.py @@ -51,9 +51,14 @@ class AuditAction(str, Enum): # Per-language QC actions LANGUAGE_QC_ASSIGN = "language_qc.assign" LANGUAGE_QC_REASSIGN = "language_qc.reassign" + LANGUAGE_QC_REVIEWER_ASSIGN = "language_qc.reviewer_assign" + LANGUAGE_QC_REVIEWER_REASSIGN = "language_qc.reviewer_reassign" + LANGUAGE_QC_SUBMIT = "language_qc.submit" + LANGUAGE_QC_OPEN_REVIEW = "language_qc.open_review" LANGUAGE_QC_APPROVE = "language_qc.approve" LANGUAGE_QC_REJECT = "language_qc.reject" LANGUAGE_QC_REOPEN = "language_qc.reopen" + LANGUAGE_QC_COMMENT = "language_qc.comment" # Admin actions ADMIN_CONFIG_CHANGE = "admin.config.change" diff --git a/backend/app/models/client.py b/backend/app/models/client.py index f4a7659..1fcfd59 100644 --- a/backend/app/models/client.py +++ b/backend/app/models/client.py @@ -58,14 +58,23 @@ class Project(BaseModel): name: str client_id: str is_active: bool = True + default_languages: list[str] = [] + default_linguist_id: Optional[str] = None + default_reviewer_id: Optional[str] = None created_at: Optional[datetime] = None updated_at: Optional[datetime] = None class ProjectCreate(BaseModel): name: str + default_languages: list[str] = [] + default_linguist_id: Optional[str] = None + default_reviewer_id: Optional[str] = None class ProjectUpdate(BaseModel): name: Optional[str] = None is_active: Optional[bool] = None + default_languages: Optional[list[str]] = None + default_linguist_id: Optional[str] = None + default_reviewer_id: Optional[str] = None diff --git a/backend/app/models/job.py b/backend/app/models/job.py index f8b79dd..6a1effc 100644 --- a/backend/app/models/job.py +++ b/backend/app/models/job.py @@ -144,7 +144,9 @@ class Review(BaseModel): class LanguageQCStatus(str, Enum): PENDING = "pending" - IN_REVIEW = "in_review" + IN_PROGRESS = "in_progress" # linguist is working + PENDING_REVIEW = "pending_review" # linguist submitted, awaiting reviewer + IN_REVIEW = "in_review" # reviewer has opened it APPROVED = "approved" REJECTED = "rejected" @@ -153,22 +155,50 @@ class LanguageQCEvent(BaseModel): at: datetime actor_user_id: str actor_email: str - action: Literal["assign", "reassign", "start_review", "approve", "reject", "reopen"] + action: Literal[ + "assign", "reassign", + "reviewer_assigned", "reviewer_reassigned", + "start_work", "submit_for_review", "open_review", + "approve", "reject", "reopen", + "comment_added", + ] notes: Optional[str] = None previous_assignee_id: Optional[str] = None +class LanguageQCComment(BaseModel): + id: str + author_id: str + author_name: str + author_email: str + body: str + created_at: datetime + + class LanguageQCState(BaseModel): status: LanguageQCStatus = LanguageQCStatus.PENDING + # Linguist slot assigned_linguist_id: Optional[str] = None assigned_linguist_email: Optional[str] = None + assigned_linguist_name: Optional[str] = None assigned_at: Optional[datetime] = None assigned_by_user_id: Optional[str] = None + submitted_for_review_at: Optional[datetime] = None + linguist_deadline: Optional[datetime] = None # when linguist must submit + # Reviewer slot + assigned_reviewer_id: Optional[str] = None + assigned_reviewer_email: Optional[str] = None + assigned_reviewer_name: Optional[str] = None + assigned_reviewer_at: Optional[datetime] = None + review_started_at: Optional[datetime] = None + reviewer_deadline: Optional[datetime] = None # when reviewer must decide + # Final outcome reviewed_at: Optional[datetime] = None reviewed_by_user_id: Optional[str] = None reviewed_by_email: Optional[str] = None notes: Optional[str] = None history: list[LanguageQCEvent] = [] + comments: list[LanguageQCComment] = [] class QCAssignment(BaseModel): diff --git a/backend/app/services/audit_logger.py b/backend/app/services/audit_logger.py index 3077e28..39261b4 100644 --- a/backend/app/services/audit_logger.py +++ b/backend/app/services/audit_logger.py @@ -126,7 +126,10 @@ class AuditLogger: if query.user_id: mongo_query["user_id"] = query.user_id if query.user_email: - mongo_query["user_email"] = query.user_email + import re as _re + mongo_query["user_email"] = _re.compile( + f"^{_re.escape(query.user_email)}$", _re.IGNORECASE + ) if query.resource_type: mongo_query["resource_type"] = query.resource_type if query.resource_id: diff --git a/backend/app/services/emailer.py b/backend/app/services/emailer.py index a824f99..ce3e2c8 100644 --- a/backend/app/services/emailer.py +++ b/backend/app/services/emailer.py @@ -144,6 +144,178 @@ class EmailService: """).render(full_name=full_name, reset_url=reset_url) return await self._send(to_email, "Reset your password", html) + async def send_language_assignment_email( + self, + to_email: str, + full_name: str, + job_title: str, + lang: str, + role: str, + deep_link: str, + ) -> bool: + html = Template(""" + + + +
+

New QC assignment

+
+

Hi {{ full_name or 'there' }},

+

You have been assigned as {{ role }} for language {{ lang }} on job {{ job_title }}.

+

Open QC Review

+
+ +
+ +""").render(full_name=full_name, role=role, lang=lang, job_title=job_title, deep_link=deep_link) + subject = f"[{job_title}] You've been assigned as {role} for {lang}" + return await self._send(to_email, subject, html) + + async def send_language_submitted_email( + self, + to_email: str, + full_name: str, + job_title: str, + lang: str, + linguist_name: str, + deep_link: str, + ) -> bool: + html = Template(""" + + + +
+

Ready for your review

+
+

Hi {{ full_name or 'there' }},

+

{{ linguist_name or 'The linguist' }} has submitted language {{ lang }} on job {{ job_title }} for your review.

+

Open Review

+
+ +
+ +""").render(full_name=full_name, linguist_name=linguist_name, lang=lang, job_title=job_title, deep_link=deep_link) + return await self._send(to_email, f"[{job_title}] {lang} ready for review", html) + + async def send_qc_comment_email( + self, + to_email: str, + full_name: str, + job_title: str, + lang: str, + author_name: str, + comment_body: str, + deep_link: str, + ) -> bool: + html = Template(""" + + + +
+

New comment

+
+

Hi {{ full_name or 'there' }},

+

{{ author_name }} commented on {{ lang }} · {{ job_title }}:

+
{{ comment_body }}
+

View Comment

+
+ +
+ +""").render(full_name=full_name, author_name=author_name, lang=lang, job_title=job_title, comment_body=comment_body, deep_link=deep_link) + return await self._send(to_email, f"[{job_title}] New comment on {lang}", html) + + async def send_qc_approved_email( + self, + to_email: str, + full_name: str, + job_title: str, + lang: str, + approver_name: str, + deep_link: str, + ) -> bool: + html = Template(""" + + + +
+

Language approved ✓

+
+

Hi {{ full_name or 'there' }},

+

{{ lang }} has been approved by {{ approver_name }} on job {{ job_title }}.

+

View Details

+
+ +
+ +""").render(full_name=full_name, lang=lang, approver_name=approver_name, job_title=job_title, deep_link=deep_link) + return await self._send(to_email, f"[{job_title}] {lang} approved", html) + + async def send_qc_rejected_email( + self, + to_email: str, + full_name: str, + job_title: str, + lang: str, + reviewer_name: str, + reason: str, + deep_link: str, + ) -> bool: + html = Template(""" + + + +
+

Changes requested

+
+

Hi {{ full_name or 'there' }},

+

{{ lang }} on job {{ job_title }} has been sent back for changes by {{ reviewer_name }}.

+
Feedback:
{{ reason }}
+

Open and Revise

+
+ +
+ +""").render(full_name=full_name, lang=lang, reviewer_name=reviewer_name, reason=reason, job_title=job_title, deep_link=deep_link) + return await self._send(to_email, f"[{job_title}] {lang} rejected — needs changes", html) + async def send_completion_email( self, recipient_email: str, diff --git a/backend/app/services/language_qc.py b/backend/app/services/language_qc.py index 642fc4f..098c742 100644 --- a/backend/app/services/language_qc.py +++ b/backend/app/services/language_qc.py @@ -1,16 +1,23 @@ -"""Per-language QC service — assignment, approval, rejection, and auto-advancement.""" +"""Per-language QC service — two-stage (linguist → reviewer) assignment, approval, rejection, comments.""" +import asyncio from datetime import datetime from typing import Any, Optional +from uuid import uuid4 -from bson import ObjectId -from fastapi import HTTPException, status +from fastapi import HTTPException from motor.motor_asyncio import AsyncIOMotorDatabase from ..core.logging import get_logger from ..models.audit_log import AuditAction, AuditLogSeverity -from ..models.job import JobStatus, LanguageQCEvent, LanguageQCState, LanguageQCStatus, QCAssignment -from ..models.user import User +from ..models.job import ( + JobStatus, + LanguageQCComment, + LanguageQCEvent, + LanguageQCState, + LanguageQCStatus, +) +from ..models.user import User, UserRole from ..services.audit_logger import audit_logger from ..services.websocket import connection_manager @@ -62,6 +69,31 @@ def _rebuild_qc_assignments(language_qc: dict) -> list[dict]: return assignments +def _qc_recipients( + job_doc: dict, + lang_state: dict, + exclude_user_id: Optional[str], +) -> list[tuple[str, str]]: + """Return [(email, full_name)] for linguist + reviewer assigned to a language, minus the actor.""" + seen: set[str] = set() + result: list[tuple[str, str]] = [] + + def _add(email: Optional[str], name: Optional[str]) -> None: + if email and email not in seen and email != exclude_user_id: + seen.add(email) + result.append((email, name or email.split("@")[0])) + + _add(lang_state.get("assigned_linguist_email"), lang_state.get("assigned_linguist_name")) + _add(lang_state.get("assigned_reviewer_email"), lang_state.get("assigned_reviewer_name")) + return result + + +def _deep_link(job_id: str, lang: str) -> str: + from ..core.config import settings + base = getattr(settings, "app_url", "https://ai-sandbox.oliver.solutions/video-accessibility") + return f"{base}/admin/qc/{job_id}#lang-{lang}" + + # ── Core mutations ──────────────────────────────────────────────────────────── async def get_state(db: AsyncIOMotorDatabase, job_id: str, lang: str) -> Optional[LanguageQCState]: @@ -84,6 +116,8 @@ async def get_all_states(db: AsyncIOMotorDatabase, job_id: str) -> dict[str, Lan return result +# ── Linguist assignment ──────────────────────────────────────────────────────── + async def assign_linguist( db: AsyncIOMotorDatabase, job_id: str, @@ -93,6 +127,7 @@ async def assign_linguist( *, http_request=None, notes: Optional[str] = None, + deadline: Optional[datetime] = None, ) -> LanguageQCState: """PM/PROD/ADMIN assigns a linguist to a language. Creates per-lang state if missing.""" job_doc = await db[_JOBS].find_one({"_id": job_id}) @@ -119,19 +154,17 @@ async def assign_linguist( ) updated_state = { + **(current_state_raw if isinstance(current_state_raw, dict) else {}), "status": current_state_raw.get("status", LanguageQCStatus.PENDING.value) if isinstance(current_state_raw, dict) else LanguageQCStatus.PENDING.value, "assigned_linguist_id": linguist_user_id, "assigned_linguist_email": linguist_doc["email"], + "assigned_linguist_name": linguist_doc.get("full_name", ""), "assigned_at": now, "assigned_by_user_id": str(actor.id), - "reviewed_at": current_state_raw.get("reviewed_at") if isinstance(current_state_raw, dict) else None, - "reviewed_by_user_id": current_state_raw.get("reviewed_by_user_id") if isinstance(current_state_raw, dict) else None, - "reviewed_by_email": current_state_raw.get("reviewed_by_email") if isinstance(current_state_raw, dict) else None, - "notes": current_state_raw.get("notes") if isinstance(current_state_raw, dict) else None, + "linguist_deadline": deadline, "history": (current_state_raw.get("history", []) if isinstance(current_state_raw, dict) else []) + [event.model_dump()], } - # Rebuild full language_qc for denormalization full_language_qc = {**(job_doc.get("language_qc") or {}), lang: updated_state} qc_assignments = _rebuild_qc_assignments(full_language_qc) @@ -147,7 +180,7 @@ async def assign_linguist( audit_action = AuditAction.LANGUAGE_QC_REASSIGN if is_reassignment else AuditAction.LANGUAGE_QC_ASSIGN await audit_logger.log_action( audit_action, - f"Language QC {'reassigned' if is_reassignment else 'assigned'}: {lang} on job {job_id} → {linguist_doc['email']}", + f"Language QC linguist {'reassigned' if is_reassignment else 'assigned'}: {lang} on job {job_id} → {linguist_doc['email']}", user=actor, request=http_request, resource_type="job_language", @@ -155,7 +188,20 @@ async def assign_linguist( details={"lang": lang, "linguist_id": linguist_user_id, "linguist_email": linguist_doc["email"]}, ) - # Notify linguist via websocket + # Email the new linguist + try: + from ..services.emailer import email_service + await email_service.send_language_assignment_email( + to_email=linguist_doc["email"], + full_name=linguist_doc.get("full_name", ""), + job_title=job_doc.get("title", job_id), + lang=lang, + role="linguist", + deep_link=_deep_link(job_id, lang), + ) + except Exception: + logger.exception("Failed to send linguist assignment email") + try: await connection_manager.broadcast_to_user( linguist_user_id, @@ -176,6 +222,7 @@ async def reassign_linguist( *, http_request=None, notes: Optional[str] = None, + deadline: Optional[datetime] = None, ) -> LanguageQCState: """Currently-assigned linguist OR PM/PROD/ADMIN hands off to a colleague.""" job_doc = await db[_JOBS].find_one({"_id": job_id}) @@ -188,20 +235,123 @@ async def reassign_linguist( current_assignee = current_state_raw.get("assigned_linguist_id") if isinstance(current_state_raw, dict) else None if current_assignee != str(actor.id): - from ..models.user import UserRole if actor.role not in (UserRole.PRODUCTION, UserRole.ADMIN, UserRole.PROJECT_MANAGER): raise HTTPException(status_code=403, detail="Not authorized to reassign this language") - return await assign_linguist(db, job_id, lang, new_linguist_user_id, actor, http_request=http_request, notes=notes) + return await assign_linguist(db, job_id, lang, new_linguist_user_id, actor, http_request=http_request, notes=notes, deadline=deadline) -async def start_review( +# ── Reviewer assignment ──────────────────────────────────────────────────────── + +async def assign_reviewer( + db: AsyncIOMotorDatabase, + job_id: str, + lang: str, + reviewer_user_id: str, + actor: User, + *, + http_request=None, + notes: Optional[str] = None, + deadline: Optional[datetime] = None, +) -> LanguageQCState: + """PM/PROD/ADMIN assigns a reviewer to a language.""" + job_doc = await db[_JOBS].find_one({"_id": job_id}) + if not job_doc: + raise HTTPException(status_code=404, detail="Job not found") + + reviewer_doc = await db.users.find_one({"_id": reviewer_user_id}) + if not reviewer_doc: + raise HTTPException(status_code=404, detail="Reviewer not found") + + now = datetime.utcnow() + current_state_raw = (job_doc.get("language_qc") or {}).get(lang, {}) + prev_reviewer = current_state_raw.get("assigned_reviewer_id") if isinstance(current_state_raw, dict) else None + is_reassignment = prev_reviewer is not None and prev_reviewer != reviewer_user_id + action_label = "reviewer_reassigned" if is_reassignment else "reviewer_assigned" + + event = LanguageQCEvent( + at=now, + actor_user_id=str(actor.id), + actor_email=actor.email, + action=action_label, + notes=notes, + previous_assignee_id=prev_reviewer if is_reassignment else None, + ) + + updated_state = { + **(current_state_raw if isinstance(current_state_raw, dict) else {}), + "assigned_reviewer_id": reviewer_user_id, + "assigned_reviewer_email": reviewer_doc["email"], + "assigned_reviewer_name": reviewer_doc.get("full_name", ""), + "assigned_reviewer_at": now, + "reviewer_deadline": deadline, + "history": (current_state_raw.get("history", []) if isinstance(current_state_raw, dict) else []) + [event.model_dump()], + } + + full_language_qc = {**(job_doc.get("language_qc") or {}), lang: updated_state} + qc_assignments = _rebuild_qc_assignments(full_language_qc) + + await db[_JOBS].update_one( + {"_id": job_id}, + {"$set": { + f"language_qc.{lang}": updated_state, + "qc_assignments": qc_assignments, + "updated_at": now, + }} + ) + + audit_action = AuditAction.LANGUAGE_QC_REVIEWER_REASSIGN if is_reassignment else AuditAction.LANGUAGE_QC_REVIEWER_ASSIGN + await audit_logger.log_action( + audit_action, + f"Language QC reviewer {'reassigned' if is_reassignment else 'assigned'}: {lang} on job {job_id} → {reviewer_doc['email']}", + user=actor, + request=http_request, + resource_type="job_language", + resource_id=f"{job_id}:{lang}", + details={"lang": lang, "reviewer_id": reviewer_user_id, "reviewer_email": reviewer_doc["email"]}, + ) + + try: + from ..services.emailer import email_service + await email_service.send_language_assignment_email( + to_email=reviewer_doc["email"], + full_name=reviewer_doc.get("full_name", ""), + job_title=job_doc.get("title", job_id), + lang=lang, + role="reviewer", + deep_link=_deep_link(job_id, lang), + ) + except Exception: + logger.exception("Failed to send reviewer assignment email") + + return LanguageQCState(**updated_state) + + +async def reassign_reviewer( + db: AsyncIOMotorDatabase, + job_id: str, + lang: str, + new_reviewer_user_id: str, + actor: User, + *, + http_request=None, + notes: Optional[str] = None, + deadline: Optional[datetime] = None, +) -> LanguageQCState: + if actor.role not in (UserRole.PRODUCTION, UserRole.ADMIN, UserRole.PROJECT_MANAGER): + raise HTTPException(status_code=403, detail="Only PM/PROD/ADMIN can reassign reviewer") + return await assign_reviewer(db, job_id, lang, new_reviewer_user_id, actor, http_request=http_request, notes=notes, deadline=deadline) + + +# ── Workflow transitions ────────────────────────────────────────────────────── + +async def start_linguist_work( db: AsyncIOMotorDatabase, job_id: str, lang: str, actor: User, ) -> LanguageQCState: - """Transition pending → in_review when the assigned linguist opens the language for review.""" + """Linguist opens the language — transitions pending → in_progress.""" job_doc = await db[_JOBS].find_one({"_id": job_id}) if not job_doc: raise HTTPException(status_code=404, detail="Job not found") @@ -209,26 +359,175 @@ async def start_review( current_state_raw = (job_doc.get("language_qc") or {}).get(lang, {}) current_status = current_state_raw.get("status", LanguageQCStatus.PENDING.value) if isinstance(current_state_raw, dict) else LanguageQCStatus.PENDING.value - if current_status != LanguageQCStatus.PENDING.value: + if current_status not in (LanguageQCStatus.PENDING.value, LanguageQCStatus.REJECTED.value): return LanguageQCState(**(current_state_raw if isinstance(current_state_raw, dict) else {})) + assigned = current_state_raw.get("assigned_linguist_id") if isinstance(current_state_raw, dict) else None + if assigned != str(actor.id) and actor.role not in (UserRole.PRODUCTION, UserRole.ADMIN): + raise HTTPException(status_code=403, detail="Not the assigned linguist") + now = datetime.utcnow() - event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="start_review") - updated_status = LanguageQCStatus.IN_REVIEW.value + event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="start_work") history = (current_state_raw.get("history", []) if isinstance(current_state_raw, dict) else []) + [event.model_dump()] + updated_state = { + **(current_state_raw if isinstance(current_state_raw, dict) else {}), + "status": LanguageQCStatus.IN_PROGRESS.value, + "submitted_for_review_at": None, + "history": history, + } + await db[_JOBS].update_one( {"_id": job_id}, {"$set": { - f"language_qc.{lang}.status": updated_status, + f"language_qc.{lang}.status": LanguageQCStatus.IN_PROGRESS.value, + f"language_qc.{lang}.submitted_for_review_at": None, f"language_qc.{lang}.history": history, "updated_at": now, }} ) - updated = {**(current_state_raw if isinstance(current_state_raw, dict) else {}), "status": updated_status, "history": history} - return LanguageQCState(**updated) + return LanguageQCState(**updated_state) +# Keep old name as alias so any existing callers don't break immediately +start_review = start_linguist_work + + +async def submit_for_review( + db: AsyncIOMotorDatabase, + job_id: str, + lang: str, + actor: User, + *, + http_request=None, +) -> LanguageQCState: + """Linguist submits work — transitions in_progress → pending_review.""" + job_doc = await db[_JOBS].find_one({"_id": job_id}) + if not job_doc: + raise HTTPException(status_code=404, detail="Job not found") + + current_state_raw = (job_doc.get("language_qc") or {}).get(lang, {}) + current_status = current_state_raw.get("status", LanguageQCStatus.PENDING.value) if isinstance(current_state_raw, dict) else LanguageQCStatus.PENDING.value + assigned_linguist = current_state_raw.get("assigned_linguist_id") if isinstance(current_state_raw, dict) else None + + if actor.role not in (UserRole.PRODUCTION, UserRole.ADMIN) and assigned_linguist != str(actor.id): + raise HTTPException(status_code=403, detail="Not the assigned linguist") + + if current_status not in (LanguageQCStatus.IN_PROGRESS.value, LanguageQCStatus.PENDING.value): + raise HTTPException(status_code=400, detail=f"Cannot submit from status '{current_status}'") + + now = datetime.utcnow() + event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="submit_for_review") + history = (current_state_raw.get("history", []) if isinstance(current_state_raw, dict) else []) + [event.model_dump()] + + updated_state = { + **(current_state_raw if isinstance(current_state_raw, dict) else {}), + "status": LanguageQCStatus.PENDING_REVIEW.value, + "submitted_for_review_at": now, + "history": history, + } + + full_language_qc = {**(job_doc.get("language_qc") or {}), lang: updated_state} + qc_assignments = _rebuild_qc_assignments(full_language_qc) + + await db[_JOBS].update_one( + {"_id": job_id}, + {"$set": { + f"language_qc.{lang}": updated_state, + "qc_assignments": qc_assignments, + "updated_at": now, + }} + ) + + await audit_logger.log_action( + AuditAction.LANGUAGE_QC_SUBMIT, + f"Language QC submitted for review: {lang} on job {job_id}", + user=actor, + request=http_request, + resource_type="job_language", + resource_id=f"{job_id}:{lang}", + details={"lang": lang}, + ) + + # Notify reviewer + reviewer_email = updated_state.get("assigned_reviewer_email") + reviewer_name = updated_state.get("assigned_reviewer_name", "") + if reviewer_email: + try: + from ..services.emailer import email_service + await email_service.send_language_submitted_email( + to_email=reviewer_email, + full_name=reviewer_name, + job_title=job_doc.get("title", job_id), + lang=lang, + linguist_name=updated_state.get("assigned_linguist_name", ""), + deep_link=_deep_link(job_id, lang), + ) + except Exception: + logger.exception("Failed to send submission notification email") + + return LanguageQCState(**updated_state) + + +async def open_review( + db: AsyncIOMotorDatabase, + job_id: str, + lang: str, + actor: User, + *, + http_request=None, +) -> LanguageQCState: + """Reviewer opens the language — transitions pending_review → in_review.""" + job_doc = await db[_JOBS].find_one({"_id": job_id}) + if not job_doc: + raise HTTPException(status_code=404, detail="Job not found") + + current_state_raw = (job_doc.get("language_qc") or {}).get(lang, {}) + current_status = current_state_raw.get("status", LanguageQCStatus.PENDING.value) if isinstance(current_state_raw, dict) else LanguageQCStatus.PENDING.value + assigned_reviewer = current_state_raw.get("assigned_reviewer_id") if isinstance(current_state_raw, dict) else None + + if actor.role not in (UserRole.PRODUCTION, UserRole.ADMIN) and assigned_reviewer != str(actor.id): + raise HTTPException(status_code=403, detail="Not the assigned reviewer") + + if current_status != LanguageQCStatus.PENDING_REVIEW.value: + return LanguageQCState(**(current_state_raw if isinstance(current_state_raw, dict) else {})) + + now = datetime.utcnow() + event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="open_review") + history = (current_state_raw.get("history", []) if isinstance(current_state_raw, dict) else []) + [event.model_dump()] + + updated_state = { + **(current_state_raw if isinstance(current_state_raw, dict) else {}), + "status": LanguageQCStatus.IN_REVIEW.value, + "review_started_at": now, + "history": history, + } + + await db[_JOBS].update_one( + {"_id": job_id}, + {"$set": { + f"language_qc.{lang}.status": LanguageQCStatus.IN_REVIEW.value, + f"language_qc.{lang}.review_started_at": now, + f"language_qc.{lang}.history": history, + "updated_at": now, + }} + ) + + await audit_logger.log_action( + AuditAction.LANGUAGE_QC_OPEN_REVIEW, + f"Language QC review opened: {lang} on job {job_id}", + user=actor, + request=http_request, + resource_type="job_language", + resource_id=f"{job_id}:{lang}", + details={"lang": lang}, + ) + + return LanguageQCState(**updated_state) + + +# ── Approve / Reject ────────────────────────────────────────────────────────── + async def approve_language( db: AsyncIOMotorDatabase, job_id: str, @@ -245,7 +544,7 @@ async def approve_language( if job_doc["status"] not in (JobStatus.PENDING_QC.value, JobStatus.QC_FEEDBACK.value): raise HTTPException(status_code=400, detail="Job is not in QC status") - _assert_can_act(job_doc, lang, actor) + _assert_can_approve(job_doc, lang, actor) now = datetime.utcnow() event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="approve", notes=notes) @@ -284,7 +583,23 @@ async def approve_language( details={"lang": lang, "notes": notes}, ) - # Re-fetch to check if we should advance the job + # Notify linguist + any other recipients + recipients = _qc_recipients(job_doc, current_state_raw if isinstance(current_state_raw, dict) else {}, exclude_user_id=actor.email) + if recipients: + try: + from ..services.emailer import email_service + await asyncio.gather(*[ + email_service.send_qc_approved_email( + to_email=email, full_name=name, + job_title=job_doc.get("title", job_id), lang=lang, + approver_name=actor.full_name or actor.email, + deep_link=_deep_link(job_id, lang), + ) + for email, name in recipients + ], return_exceptions=True) + except Exception: + logger.exception("Failed to send approval emails") + refreshed = await db[_JOBS].find_one({"_id": job_id}) await _maybe_advance_job(db, refreshed) @@ -310,7 +625,7 @@ async def reject_language( if job_doc["status"] not in (JobStatus.PENDING_QC.value, JobStatus.QC_FEEDBACK.value): raise HTTPException(status_code=400, detail="Job is not in QC status") - _assert_can_act(job_doc, lang, actor) + _assert_can_approve(job_doc, lang, actor) now = datetime.utcnow() event = LanguageQCEvent(at=now, actor_user_id=str(actor.id), actor_email=actor.email, action="reject", notes=notes) @@ -319,11 +634,12 @@ async def reject_language( updated_state = { **(current_state_raw if isinstance(current_state_raw, dict) else {}), - "status": LanguageQCStatus.REJECTED.value, + "status": LanguageQCStatus.IN_PROGRESS.value, # send back to linguist "reviewed_at": now, "reviewed_by_user_id": str(actor.id), "reviewed_by_email": actor.email, "notes": notes, + "submitted_for_review_at": None, "history": history, } @@ -339,7 +655,6 @@ async def reject_language( }} ) - # Move job to qc_feedback await db[_JOBS].update_one( {"_id": job_id}, { @@ -359,6 +674,23 @@ async def reject_language( details={"lang": lang, "notes": notes}, ) + recipients = _qc_recipients(job_doc, current_state_raw if isinstance(current_state_raw, dict) else {}, exclude_user_id=actor.email) + if recipients: + try: + from ..services.emailer import email_service + await asyncio.gather(*[ + email_service.send_qc_rejected_email( + to_email=email, full_name=name, + job_title=job_doc.get("title", job_id), lang=lang, + reviewer_name=actor.full_name or actor.email, + reason=notes, + deep_link=_deep_link(job_id, lang), + ) + for email, name in recipients + ], return_exceptions=True) + except Exception: + logger.exception("Failed to send rejection emails") + return LanguageQCState(**updated_state) @@ -372,7 +704,6 @@ async def reopen_language( notes: Optional[str] = None, ) -> LanguageQCState: """PROD/ADMIN only — resets an approved language back to pending for re-review.""" - from ..models.user import UserRole if actor.role not in (UserRole.PRODUCTION, UserRole.ADMIN): raise HTTPException(status_code=403, detail="Only PRODUCTION or ADMIN can reopen a language") @@ -391,6 +722,8 @@ async def reopen_language( "reviewed_at": None, "reviewed_by_user_id": None, "reviewed_by_email": None, + "submitted_for_review_at": None, + "review_started_at": None, "notes": notes, "history": history, } @@ -407,7 +740,6 @@ async def reopen_language( }} ) - # If the job had advanced to pending_final_review, pull it back to pending_qc if job_doc["status"] == JobStatus.PENDING_FINAL_REVIEW.value: await db[_JOBS].update_one( {"_id": job_id}, @@ -430,30 +762,84 @@ async def reopen_language( return LanguageQCState(**updated_state) -async def reset_all_for_return_to_qc(db: AsyncIOMotorDatabase, job_id: str) -> None: - """Called by return_to_qc — resets statuses to pending while preserving assignments and history.""" - job_doc = await db[_JOBS].find_one({"_id": job_id}, {"language_qc": 1}) +# ── Comments ────────────────────────────────────────────────────────────────── + +async def add_comment( + db: AsyncIOMotorDatabase, + job_id: str, + lang: str, + actor: User, + body: str, + *, + http_request=None, +) -> LanguageQCComment: + if not body or not body.strip(): + raise HTTPException(status_code=422, detail="Comment body cannot be empty") + if len(body) > 4000: + raise HTTPException(status_code=422, detail="Comment too long (max 4000 chars)") + + job_doc = await db[_JOBS].find_one({"_id": job_id}) if not job_doc: - return + raise HTTPException(status_code=404, detail="Job not found") - lang_qc = job_doc.get("language_qc") or {} - updates: dict[str, Any] = {} - for lang, state in lang_qc.items(): - if isinstance(state, dict): - updates[f"language_qc.{lang}.status"] = LanguageQCStatus.PENDING.value - updates[f"language_qc.{lang}.reviewed_at"] = None - updates[f"language_qc.{lang}.reviewed_by_user_id"] = None - updates[f"language_qc.{lang}.reviewed_by_email"] = None + # Gate: only assigned linguist, assigned reviewer, or PM/PROD/ADMIN + current_state_raw = (job_doc.get("language_qc") or {}).get(lang, {}) + assigned_linguist = current_state_raw.get("assigned_linguist_id") if isinstance(current_state_raw, dict) else None + assigned_reviewer = current_state_raw.get("assigned_reviewer_id") if isinstance(current_state_raw, dict) else None + if actor.role not in (UserRole.PRODUCTION, UserRole.ADMIN, UserRole.PROJECT_MANAGER): + if str(actor.id) not in (assigned_linguist, assigned_reviewer): + raise HTTPException(status_code=403, detail="Not authorized to comment on this language") - if updates: - # Rebuild qc_assignments with reset statuses - updated_lang_qc = {} - for lang, state in lang_qc.items(): - updated_lang_qc[lang] = {**(state if isinstance(state, dict) else {}), "status": LanguageQCStatus.PENDING.value} + now = datetime.utcnow() + comment = LanguageQCComment( + id=str(uuid4()), + author_id=str(actor.id), + author_name=actor.full_name or "", + author_email=actor.email, + body=body.strip(), + created_at=now, + ) - updates["qc_assignments"] = _rebuild_qc_assignments(updated_lang_qc) - await db[_JOBS].update_one({"_id": job_id}, {"$set": updates}) + await db[_JOBS].update_one( + {"_id": job_id}, + { + "$push": {f"language_qc.{lang}.comments": comment.model_dump()}, + "$set": {"updated_at": now}, + } + ) + await audit_logger.log_action( + AuditAction.LANGUAGE_QC_COMMENT, + f"Comment added to language {lang} on job {job_id}", + user=actor, + request=http_request, + resource_type="job_language", + resource_id=f"{job_id}:{lang}", + details={"lang": lang}, + ) + + # Fan-out to all other assignees + recipients = _qc_recipients(job_doc, current_state_raw if isinstance(current_state_raw, dict) else {}, exclude_user_id=actor.email) + if recipients: + try: + from ..services.emailer import email_service + await asyncio.gather(*[ + email_service.send_qc_comment_email( + to_email=email, full_name=name, + job_title=job_doc.get("title", job_id), lang=lang, + author_name=actor.full_name or actor.email, + comment_body=body.strip(), + deep_link=_deep_link(job_id, lang), + ) + for email, name in recipients + ], return_exceptions=True) + except Exception: + logger.exception("Failed to send comment notification emails") + + return comment + + +# ── Queue / list ────────────────────────────────────────────────────────────── async def list_for_linguist( db: AsyncIOMotorDatabase, @@ -471,7 +857,6 @@ async def list_for_linguist( cursor = db[_JOBS].find(query, {"title": 1, "status": 1, "language_qc": 1, "qc_assignments": 1, "created_at": 1, "updated_at": 1}).skip(skip).limit(limit).sort("updated_at", -1) jobs = await cursor.to_list(length=limit) - # Filter qc_assignments to only include this linguist's languages result = [] for job in jobs: my_langs = [a for a in (job.get("qc_assignments") or []) if a.get("linguist_id") == linguist_id] @@ -479,6 +864,38 @@ async def list_for_linguist( return result +async def list_for_reviewer( + db: AsyncIOMotorDatabase, + reviewer_id: str, + *, + status_filter: Optional[str] = None, + skip: int = 0, + limit: int = 50, +) -> list[dict]: + """Return jobs where the reviewer is assigned to at least one language.""" + # language_qc is an embedded dict keyed by lang code; scan in Python + all_jobs_cursor = db[_JOBS].find( + {}, + {"title": 1, "status": 1, "language_qc": 1, "qc_assignments": 1, "created_at": 1, "updated_at": 1} + ).sort("updated_at", -1).skip(skip).limit(limit * 5) # over-fetch, filter in Python + + all_jobs = await all_jobs_cursor.to_list(length=limit * 5) + + result = [] + for job in all_jobs: + my_langs = [] + for lang, state in (job.get("language_qc") or {}).items(): + if isinstance(state, dict) and state.get("assigned_reviewer_id") == reviewer_id: + if not status_filter or state.get("status") == status_filter: + my_langs.append({"lang": lang, "status": state.get("status", "pending")}) + if my_langs: + result.append({**job, "_my_assignments": my_langs}) + if len(result) >= limit: + break + + return result + + async def seed_language_qc_for_job(db: AsyncIOMotorDatabase, job_doc: dict) -> None: """Idempotently seed language_qc entries for all languages in a job's outputs.""" job_id = str(job_doc["_id"]) @@ -498,8 +915,12 @@ async def seed_language_qc_for_job(db: AsyncIOMotorDatabase, job_doc: dict) -> N for lang in all_langs: if lang in existing_qc: - continue # already seeded - state: dict[str, Any] = {"status": LanguageQCStatus.APPROVED.value if is_approved else LanguageQCStatus.PENDING.value, "history": []} + continue + state: dict[str, Any] = { + "status": LanguageQCStatus.APPROVED.value if is_approved else LanguageQCStatus.PENDING.value, + "history": [], + "comments": [], + } if is_approved: state["reviewed_by_user_id"] = job_doc.get("review", {}).get("reviewer_id") state["reviewed_at"] = job_doc.get("updated_at") @@ -513,20 +934,55 @@ async def seed_language_qc_for_job(db: AsyncIOMotorDatabase, job_doc: dict) -> N await db[_JOBS].update_one({"_id": job_id}, {"$set": updates}) +async def reset_all_for_return_to_qc(db: AsyncIOMotorDatabase, job_id: str) -> None: + """Called by return_to_qc — resets statuses to pending while preserving assignments and history.""" + job_doc = await db[_JOBS].find_one({"_id": job_id}, {"language_qc": 1}) + if not job_doc: + return + + lang_qc = job_doc.get("language_qc") or {} + updates: dict[str, Any] = {} + for lang, state in lang_qc.items(): + if isinstance(state, dict): + updates[f"language_qc.{lang}.status"] = LanguageQCStatus.PENDING.value + updates[f"language_qc.{lang}.reviewed_at"] = None + updates[f"language_qc.{lang}.reviewed_by_user_id"] = None + updates[f"language_qc.{lang}.reviewed_by_email"] = None + updates[f"language_qc.{lang}.submitted_for_review_at"] = None + updates[f"language_qc.{lang}.review_started_at"] = None + + if updates: + updated_lang_qc = {} + for lang, state in lang_qc.items(): + updated_lang_qc[lang] = {**(state if isinstance(state, dict) else {}), "status": LanguageQCStatus.PENDING.value} + + updates["qc_assignments"] = _rebuild_qc_assignments(updated_lang_qc) + await db[_JOBS].update_one({"_id": job_id}, {"$set": updates}) + + # ── Internal ────────────────────────────────────────────────────────────────── -def _assert_can_act(job_doc: dict, lang: str, actor: User) -> None: - """Raise 403 if actor is not the assigned linguist and not PROD/ADMIN.""" - from ..models.user import UserRole +def _assert_can_approve(job_doc: dict, lang: str, actor: User) -> None: + """Raise 403 if actor is not the assigned reviewer (or PROD/ADMIN).""" if actor.role in (UserRole.PRODUCTION, UserRole.ADMIN): return state = (job_doc.get("language_qc") or {}).get(lang, {}) - assigned = state.get("assigned_linguist_id") if isinstance(state, dict) else None - if assigned is None: - raise HTTPException(status_code=403, detail=f"Language '{lang}' has no assigned linguist") - if assigned != str(actor.id): - raise HTTPException(status_code=403, detail=f"You are not assigned to language '{lang}'") + assigned_reviewer = state.get("assigned_reviewer_id") if isinstance(state, dict) else None + + if assigned_reviewer is None: + # Fallback: allow assigned linguist to approve if no reviewer assigned (backward compat) + assigned_linguist = state.get("assigned_linguist_id") if isinstance(state, dict) else None + if assigned_linguist == str(actor.id): + return + raise HTTPException(status_code=403, detail=f"Language '{lang}' has no assigned reviewer") + + if assigned_reviewer != str(actor.id): + raise HTTPException(status_code=403, detail=f"You are not the assigned reviewer for language '{lang}'") + + +# Keep old name for any remaining callers +_assert_can_act = _assert_can_approve async def _maybe_advance_job(db: AsyncIOMotorDatabase, job_doc: dict) -> None: diff --git a/backend/app/tasks/embed_glossary.py b/backend/app/tasks/embed_glossary.py index da90f20..c6b64c2 100644 --- a/backend/app/tasks/embed_glossary.py +++ b/backend/app/tasks/embed_glossary.py @@ -2,14 +2,15 @@ Celery task: compute and store Gemini embeddings for all terms in a glossary version. Runs as a background job after glossary ingestion so the API response is fast. -Processes terms in batches of 100 and updates embedded_count incrementally. +Processes terms in concurrent batches of 250 (5 batches in parallel). """ from __future__ import annotations import asyncio +from typing import Any from bson import ObjectId -from motor.motor_asyncio import AsyncIOMotorClient +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase from ..core.config import settings from ..core.logging import get_logger @@ -18,15 +19,12 @@ from . import celery_app logger = get_logger(__name__) -_BATCH_SIZE = 100 +_BATCH_SIZE = 250 +_CONCURRENCY = 5 @celery_app.task(name="embed_glossary_version", bind=True, max_retries=3) def embed_glossary_version_task(self, version_id: str) -> dict: - """ - Compute embeddings for all GlossaryTerms of `version_id`. - Updates embedded_count and embedding_status on the GlossaryVersion doc. - """ try: result = asyncio.run(_async_embed_version(version_id)) return result @@ -35,53 +33,64 @@ def embed_glossary_version_task(self, version_id: str) -> dict: raise self.retry(exc=exc, countdown=60) from None -async def _async_embed_version(version_id: str) -> dict: +async def _embed_batch( + db: AsyncIOMotorDatabase, + version_id: str, + batch: list[dict[str, Any]], + sem: asyncio.Semaphore, + counter: list[int], + total: int, +) -> None: + from pymongo import UpdateOne from ..services.embedding_service import embedding_service + async with sem: + texts = [t["source_term"] for t in batch] + ids = [t["_id"] for t in batch] + embeddings = await embedding_service.embed_texts(texts) + + ops = [ + UpdateOne({"_id": tid}, {"$set": {"embedding": emb}}) + for tid, emb in zip(ids, embeddings, strict=False) + ] + if ops: + await db.glossary_terms.bulk_write(ops, ordered=False) + + counter[0] += len(batch) + await db.glossary_versions.update_one( + {"_id": ObjectId(version_id)}, + {"$set": {"embedded_count": counter[0]}}, + ) + logger.info(f"Version {version_id}: embedded {counter[0]}/{total}") + + +async def _async_embed_version(version_id: str) -> dict: mongo_client = AsyncIOMotorClient(settings.mongodb_uri) db = mongo_client[settings.mongodb_db] try: - # Mark in-progress await db.glossary_versions.update_one( {"_id": ObjectId(version_id)}, {"$set": {"embedding_status": EmbeddingStatus.IN_PROGRESS.value}}, ) - # Fetch all terms without embeddings cursor = db.glossary_terms.find( {"version_id": version_id, "embedding": None}, {"_id": 1, "source_term": 1}, ) terms = await cursor.to_list(length=None) total = len(terms) - logger.info(f"Embedding {total} terms for version {version_id}") + logger.info(f"Embedding {total} terms for version {version_id} (batch={_BATCH_SIZE}, concurrency={_CONCURRENCY})") - embedded_count = 0 - for i in range(0, total, _BATCH_SIZE): - batch = terms[i: i + _BATCH_SIZE] - texts = [t["source_term"] for t in batch] - ids = [t["_id"] for t in batch] + batches = [terms[i: i + _BATCH_SIZE] for i in range(0, total, _BATCH_SIZE)] + sem = asyncio.Semaphore(_CONCURRENCY) + counter = [0] - embeddings = await embedding_service.embed_texts(texts) + await asyncio.gather(*[ + _embed_batch(db, version_id, batch, sem, counter, total) + for batch in batches + ]) - # Bulk update - ops = [] - from pymongo import UpdateOne - for term_id, embedding in zip(ids, embeddings, strict=False): - ops.append(UpdateOne({"_id": term_id}, {"$set": {"embedding": embedding}})) - - if ops: - await db.glossary_terms.bulk_write(ops, ordered=False) - - embedded_count += len(batch) - await db.glossary_versions.update_one( - {"_id": ObjectId(version_id)}, - {"$set": {"embedded_count": embedded_count}}, - ) - logger.info(f"Version {version_id}: embedded {embedded_count}/{total}") - - # Mark done await db.glossary_versions.update_one( {"_id": ObjectId(version_id)}, {"$set": { diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index e014c09..486a60d 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -730,13 +730,38 @@ class ApiClient { return r.data; } - async assignLanguageQC(jobId: string, lang: string, linguistUserId: string, notes?: string): Promise { - const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/assign`, { linguist_user_id: linguistUserId, notes }); + async assignLanguageQC(jobId: string, lang: string, linguistUserId: string, notes?: string, deadline?: string): Promise { + const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/assign`, { linguist_user_id: linguistUserId, notes, deadline }); return r.data; } - async reassignLanguageQC(jobId: string, lang: string, linguistUserId: string, notes?: string): Promise { - const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/reassign`, { linguist_user_id: linguistUserId, notes }); + async reassignLanguageQC(jobId: string, lang: string, linguistUserId: string, notes?: string, deadline?: string): Promise { + const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/reassign`, { linguist_user_id: linguistUserId, notes, deadline }); + return r.data; + } + + async assignReviewerQC(jobId: string, lang: string, reviewerUserId: string, notes?: string, deadline?: string): Promise { + const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/assign-reviewer`, { reviewer_user_id: reviewerUserId, notes, deadline }); + return r.data; + } + + async reassignReviewerQC(jobId: string, lang: string, reviewerUserId: string, notes?: string, deadline?: string): Promise { + const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/reassign-reviewer`, { reviewer_user_id: reviewerUserId, notes, deadline }); + return r.data; + } + + async startLinguistWork(jobId: string, lang: string): Promise { + const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/start-work`); + return r.data; + } + + async submitForReview(jobId: string, lang: string): Promise { + const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/submit`); + return r.data; + } + + async openReview(jobId: string, lang: string): Promise { + const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/open-review`); return r.data; } @@ -755,8 +780,18 @@ class ApiClient { return r.data; } - async getMyLanguageQCQueue(statusFilter?: string, skip = 0, limit = 50): Promise { - const params = new URLSearchParams(); + async addQCComment(jobId: string, lang: string, body: string): Promise { + const r = await this.client.post(`/jobs/${jobId}/languages/${lang}/comments`, { body }); + return r.data; + } + + async listQCComments(jobId: string, lang: string): Promise { + const r = await this.client.get(`/jobs/${jobId}/languages/${lang}/comments`); + return r.data; + } + + async getMyLanguageQCQueue(role: 'linguist' | 'reviewer' = 'linguist', statusFilter?: string, skip = 0, limit = 50): Promise { + const params = new URLSearchParams({ role }); if (statusFilter) params.append('qc_status', statusFilter); params.append('skip', String(skip)); params.append('limit', String(limit)); @@ -824,6 +859,11 @@ class ApiClient { return r.data; } + async reembedGlossaryVersion(clientId: string, glossaryId: string, versionId: string): Promise<{ status: string; version_id: string }> { + const r = await this.client.post(`/clients/${clientId}/glossaries/${glossaryId}/versions/${versionId}/reembed`); + return r.data; + } + async getGlossaryTerms( clientId: string, glossaryId: string, diff --git a/frontend/src/routes/admin/AuditLog.tsx b/frontend/src/routes/admin/AuditLog.tsx index 876a7c4..9886ee0 100644 --- a/frontend/src/routes/admin/AuditLog.tsx +++ b/frontend/src/routes/admin/AuditLog.tsx @@ -1,7 +1,7 @@ import { useState, useCallback } from 'react'; import { useQuery } from '@tanstack/react-query'; import { api } from '../../lib/api'; -import type { AuditLogEntry, AuditLogQuery, AuditSeverity } from '../../types/api'; +import type { AuditLogEntry, AuditLogQuery, AuditSeverity, User } from '../../types/api'; const PAGE_SIZE = 50; @@ -137,9 +137,15 @@ export function AuditLog() { const [severityFilter, setSeverityFilter] = useState(''); const [successFilter, setSuccessFilter] = useState(''); const [securityHours, setSecurityHours] = useState(24); - const [userIdInput, setUserIdInput] = useState(''); const [activeUserId, setActiveUserId] = useState(''); + const usersQuery = useQuery({ + queryKey: ['admin-users-all'], + queryFn: () => api.listUsers({ size: 500, active_only: false }), + enabled: tab === 'user', + staleTime: 60_000, + }); + const buildQuery = useCallback((): AuditLogQuery => ({ ...filters, skip: page * PAGE_SIZE, @@ -282,24 +288,23 @@ export function AuditLog() { {/* User activity tab controls */} {tab === 'user' && ( -
{ e.preventDefault(); setActiveUserId(userIdInput.trim()); }} - > - setUserIdInput(e.target.value)} - className="px-3 py-1.5 text-sm border border-gray-300 rounded-md focus:ring-2 focus:ring-blue-500 focus:outline-none w-72" - /> - - )} {/* Progress bar */} {totalLangs > 1 && (
-
0 ? (approvedLangs / totalLangs) * 100 : 0}%` }} - /> +
0 ? (approvedLangs / totalLangs) * 100 : 0}%` }} />
)} -
+ {/* Language cards */} +
{availableLanguages.map((lang) => { const qcState = langQcMap[lang]; const qcStatus = (qcState?.status ?? 'pending') as LanguageQCStatus; const isActive = selectedLanguage === lang; - const isMyLang = authUser?.role === 'linguist' && qcState?.assigned_linguist_id === authUser?.id; - const canActOnThis = isMyLang || canApproveAll; + const myId = authUser?.id; + + const isAssignedLinguist = qcState?.assigned_linguist_id === myId; + const isAssignedReviewer = qcState?.assigned_reviewer_id === myId; + + // Linguist sees: start-work, submit + const canStartWork = isAssignedLinguist && (qcStatus === 'pending' || qcStatus === 'rejected'); + const canSubmit = isAssignedLinguist && qcStatus === 'in_progress'; + // Reviewer sees: open-review, approve, reject/request-changes + const canOpenReview = (isAssignedReviewer || canApproveAll) && qcStatus === 'pending_review'; + const canApproveThis = (isAssignedReviewer || canApproveAll) && (qcStatus === 'in_review' || (canApproveAll && qcStatus !== 'approved')); + const canRejectThis = (isAssignedReviewer || canApproveAll) && (qcStatus === 'in_review' || (canApproveAll && qcStatus !== 'rejected')); + + const isCommentsOpen = openCommentLang === lang; + + // Comments for this language + const commentsQuery = isCommentsOpen + ? { data: qcState?.comments ?? [] } + : null; + + // Deadline formatting + const linguistDeadline = qcState?.linguist_deadline ? new Date(qcState.linguist_deadline).toLocaleDateString() : null; + const reviewerDeadline = qcState?.reviewer_deadline ? new Date(qcState.reviewer_deadline).toLocaleDateString() : null; return ( -
+
+ {/* Card header — language selector + status */} - {qcState?.assigned_linguist_email && ( - - {qcState.assigned_linguist_email.split('@')[0]} - - )} - {/* Per-language action buttons, shown inline under the active language */} - {isActive && canActOnThis && ( -
- {qcStatus !== 'approved' && ( + + {/* Card body — only shown when active */} + {isActive && ( +
+ {/* Two-slot assignment row */} +
+ {/* Linguist slot */} +
+
Linguist
+ {qcState?.assigned_linguist_name || qcState?.assigned_linguist_email ? ( +
+ {qcState.assigned_linguist_name || qcState.assigned_linguist_email} + {linguistDeadline && due {linguistDeadline}} +
+ ) : ( +
Unassigned
+ )} + {canAssign && ( + + )} +
+ + {/* Reviewer slot */} +
+
Reviewer
+ {qcState?.assigned_reviewer_name || qcState?.assigned_reviewer_email ? ( +
+ {qcState.assigned_reviewer_name || qcState.assigned_reviewer_email} + {reviewerDeadline && due {reviewerDeadline}} +
+ ) : ( +
Unassigned
+ )} + {canAssign && ( + + )} +
+
+ + {/* Workflow action buttons */} +
+ {canStartWork && ( + + )} + {canSubmit && ( + + )} + {canOpenReview && ( + + )} + {canApproveThis && ( + + )} + {canRejectThis && ( + + )} + {canApproveAll && qcStatus === 'approved' && ( + + )} +
+ + {/* Comments toggle */} +
- )} - {qcStatus !== 'rejected' && ( - - )} - {canApproveAll && qcStatus === 'approved' && ( - - )} + + {isCommentsOpen && ( +
+ {(qcState?.comments ?? []).length === 0 && ( +

No comments yet.

+ )} + {(qcState?.comments ?? []).map((c) => ( +
+
+ {c.author_name || c.author_email} + {new Date(c.created_at).toLocaleString()} +
+

{c.body}

+
+ ))} +
+