diff --git a/backend/app/api/v1/routes_admin_production.py b/backend/app/api/v1/routes_admin_production.py index d18b262..8b186b9 100644 --- a/backend/app/api/v1/routes_admin_production.py +++ b/backend/app/api/v1/routes_admin_production.py @@ -1,22 +1,31 @@ -"""Admin production endpoints: failure dashboard and bulk retry.""" +"""Admin production endpoints: failure dashboard, bulk retry, queue stats, VTT override.""" from datetime import datetime -from ...services.cloud_run_dispatch import dispatch as _cr_dispatch - -from fastapi import APIRouter, Depends, HTTPException, Query, status +import redis.asyncio as aioredis +from fastapi import ( + APIRouter, + Depends, + File, + Form, + HTTPException, + Query, + UploadFile, + status, +) from motor.motor_asyncio import AsyncIOMotorDatabase from pydantic import BaseModel from ...core.database import get_database from ...core.dependencies import require_roles from ...core.logging import get_logger +from ...core.redis import get_redis from ...models.audit_log import AuditAction from ...models.job import JobStatus, RequestedOutputs from ...models.user import User, UserRole from ...schemas.job import JobResponse from ...services.audit_logger import audit_logger -from ...tasks.ingest_and_ai import ingest_and_ai_task -from ...tasks.translate_and_synthesize import translate_and_synthesize_task +from ...services.cloud_run_dispatch import dispatch as _cr_dispatch +from ...services.gcs import upload_vtt_to_gcs logger = get_logger(__name__) router = APIRouter(prefix="/admin/production", tags=["admin-production"]) @@ -147,7 +156,7 @@ async def bulk_retry( elif step in ("translation", "tts"): await _cr_dispatch("translate", job_id) elif step == "render": - lang = job.get("last_render_language", "en") + lang = job_doc.get("last_render_language", "en") await _cr_dispatch("rerender", job_id, language=lang) retried.append(job_id) @@ -169,3 +178,118 @@ async def bulk_retry( logger.warning(f"Failed to write bulk-retry audit log: {e}") return BulkRetryResponse(retried=retried, skipped=skipped, errors=errors) + + +# --------------------------------------------------------------------------- +# PR-7: Queue depth stats +# --------------------------------------------------------------------------- + +_CELERY_QUEUES = ["default", "ingest", "tts", "render", "ffmpeg", "whisper", "notify", "embed"] + + +class QueueStats(BaseModel): + queues: dict[str, int] # queue_name → pending task count + total_pending: int + + +@router.get("/queue-stats", response_model=QueueStats) +async def get_queue_stats( + current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), + redis: aioredis.Redis = Depends(get_redis), +): + """Return pending task counts per Celery queue (via Redis LLEN).""" + counts: dict[str, int] = {} + for q in _CELERY_QUEUES: + try: + n = await redis.llen(q) + counts[q] = n + except Exception: + counts[q] = 0 + return QueueStats(queues=counts, total_pending=sum(counts.values())) + + +# --------------------------------------------------------------------------- +# PR-8: Upload final VTT override — bypass AI, jump to PENDING_QC +# --------------------------------------------------------------------------- + +_BYPASSABLE_STATUSES = { + JobStatus.CREATED.value, + JobStatus.INGESTING.value, + JobStatus.AI_PROCESSING.value, + JobStatus.PROCESSING_FAILED.value, + JobStatus.TTS_FAILED.value, + JobStatus.RENDER_FAILED.value, +} + + +@router.post("/jobs/{job_id}/upload-final-vtt") +async def upload_final_vtt( + job_id: str, + language: str = Form(..., description="BCP-47 language code, e.g. 'en' or 'fr'"), + vtt_file: UploadFile = File(..., description="WebVTT (.vtt) file"), + vtt_type: str = Form("captions", description="'captions' or 'ad'"), + current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), + db: AsyncIOMotorDatabase = Depends(get_database), +): + """Upload a hand-crafted VTT to override AI output and advance job to PENDING_QC.""" + job_doc = await db.jobs.find_one({"_id": job_id}) + if not job_doc: + raise HTTPException(status_code=404, detail="Job not found") + + if job_doc["status"] not in _BYPASSABLE_STATUSES: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Cannot override VTT when job is in status '{job_doc['status']}'. " + f"Only allowed in: {sorted(_BYPASSABLE_STATUSES)}", + ) + + if not vtt_file.filename or not vtt_file.filename.endswith(".vtt"): + raise HTTPException(status_code=400, detail="File must be a .vtt file") + + vtt_content = (await vtt_file.read()).decode("utf-8") + if not vtt_content.strip().startswith("WEBVTT"): + raise HTTPException(status_code=400, detail="File does not start with WEBVTT header") + + if vtt_type not in ("captions", "ad"): + raise HTTPException(status_code=400, detail="vtt_type must be 'captions' or 'ad'") + + lang_key = language.replace("-", "_") + field = "captions_vtt_gcs" if vtt_type == "captions" else "ad_vtt_gcs" + gcs_path = f"{job_id}/{lang_key}/{vtt_type}.vtt" + + gcs_uri = await upload_vtt_to_gcs(vtt_content, gcs_path) + + now = datetime.utcnow() + await db.jobs.update_one( + {"_id": job_id}, + { + "$set": { + f"outputs.{lang_key}.{field}": gcs_uri, + "status": JobStatus.PENDING_QC.value, + "updated_at": now, + }, + "$push": { + "review.history": { + "at": now, + "status": "manual_vtt_upload", + "by": str(current_user.id), + "note": f"Manual {vtt_type} VTT upload for {language} by {current_user.email}", + } + }, + }, + ) + + try: + await audit_logger.log( + action=AuditAction.VTT_EDIT, + user_id=str(current_user.id), + user_email=current_user.email, + user_role=current_user.role.value if current_user.role else None, + resource_type="job", + resource_id=job_id, + description=f"Manual {vtt_type} VTT upload for {language} — job advanced to PENDING_QC", + ) + except Exception as e: + logger.warning(f"Failed to write upload-final-vtt audit log: {e}") + + return {"status": "ok", "gcs_uri": gcs_uri, "job_status": JobStatus.PENDING_QC.value} diff --git a/docker-compose.optical-dev.yml b/docker-compose.optical-dev.yml index c4a8bf0..008e244 100644 --- a/docker-compose.optical-dev.yml +++ b/docker-compose.optical-dev.yml @@ -1,8 +1,10 @@ # ============================================================================= # optical-dev overrides — 2 CPU / ~8 GB RAM server # -# Heavy pipeline workers (ingest, translate, render, rerender) run on -# Cloud Run Jobs. Only lightweight services run here. +# Cloud Run Jobs (va-worker) are NOT yet reachable from this server +# (VPC Connector pending). Until then USE_CELERY_FALLBACK=true routes all +# heavy tasks through local Celery workers constrained to WORKER_CONCURRENCY=2 +# so they fit in 2 CPU without OOM on large videos. # # Usage: # docker compose -f docker-compose.yml \ @@ -45,43 +47,53 @@ services: cpus: '0.5' environment: APP_ENV: prod - # Cloud Run dispatch config - CLOUD_RUN_WORKER_JOB: va-worker - GCP_REGION: europe-west1 - USE_CELERY_FALLBACK: "false" + # Fallback mode: bypass Cloud Run, dispatch heavy tasks to local workers + USE_CELERY_FALLBACK: "true" + WORKER_CONCURRENCY: "2" - # Lightweight worker: only notify + embed_glossary tasks - # Heavy tasks (ingest/translate/render) go to Cloud Run Jobs + # Full worker: handles ALL queues in fallback mode worker: deploy: + replicas: 1 + resources: + limits: + memory: 2G + cpus: '0.75' + reservations: + memory: 1G + cpus: '0.25' + environment: + APP_ENV: prod + WORKER_CONCURRENCY: "2" + command: > + celery -A app.tasks worker + --loglevel=info + --queues=default,ingest,tts,render,ffmpeg,whisper,notify,embed + --concurrency=2 + --hostname=full-worker@%h + + # ── Pipeline workers — enabled in fallback mode ──────────────────────────── + + ffmpeg-worker: + deploy: + replicas: 1 + resources: + limits: + memory: 1G + cpus: '0.5' + + tts-worker: + deploy: + replicas: 1 resources: limits: memory: 512M cpus: '0.25' - reservations: - memory: 256M - cpus: '0.1' - environment: - APP_ENV: prod - # Only consume lightweight queues; heavy queues handled by Cloud Run - CELERY_QUEUES: "notify,embed" - command: > - celery -A app.tasks worker - --loglevel=info - --queues=notify,embed - --concurrency=2 - --hostname=lite-worker@%h - - # ── Disabled on optical-dev — run on Cloud Run Jobs instead ─────────────── - - ffmpeg-worker: - deploy: - replicas: 0 - - tts-worker: - deploy: - replicas: 0 whisper-worker: deploy: - replicas: 0 + replicas: 1 + resources: + limits: + memory: 2G + cpus: '0.5' diff --git a/frontend/src/components/StatusBadge.tsx b/frontend/src/components/StatusBadge.tsx index b1d8663..c70a6a3 100644 --- a/frontend/src/components/StatusBadge.tsx +++ b/frontend/src/components/StatusBadge.tsx @@ -1,48 +1,14 @@ import type { JobStatus } from '../types/api'; -import { getJobStatusLabel } from '../utils/jobStatusMessages'; +import { getJobStatusLabel, getJobStatusColor } from '../utils/jobStatusMessages'; interface StatusBadgeProps { status: JobStatus; } export function StatusBadge({ status }: StatusBadgeProps) { - const getStatusStyles = (status: JobStatus) => { - switch (status) { - case 'created': - return 'bg-gray-100 text-gray-800'; - case 'ingesting': - return 'bg-blue-100 text-blue-800'; - case 'ai_processing': - return 'bg-purple-100 text-purple-800'; - case 'pending_qc': - return 'bg-yellow-100 text-yellow-800'; - case 'approved_english': - case 'approved_source': - return 'bg-green-100 text-green-800'; - case 'rejected': - case 'tts_failed': - case 'render_failed': - return 'bg-red-100 text-red-800'; - case 'translating': - return 'bg-blue-100 text-blue-800'; - case 'tts_generating': - return 'bg-indigo-100 text-indigo-800'; - case 'rendering_video': - return 'bg-violet-100 text-violet-800'; - case 'pending_final_review': - return 'bg-orange-100 text-orange-800'; - case 'completed': - return 'bg-green-100 text-green-800'; - default: - return 'bg-gray-100 text-gray-800'; - } - }; - - const getStatusLabel = getJobStatusLabel; - return ( - - {getStatusLabel(status)} + + {getJobStatusLabel(status)} ); } \ No newline at end of file diff --git a/frontend/src/components/TimelinePreview/TimelinePreview.tsx b/frontend/src/components/TimelinePreview/TimelinePreview.tsx index 959e1e7..1ca43f2 100644 --- a/frontend/src/components/TimelinePreview/TimelinePreview.tsx +++ b/frontend/src/components/TimelinePreview/TimelinePreview.tsx @@ -98,17 +98,10 @@ export function TimelinePreview({ const closeContextMenu = () => setContextMenu(null); const handleContextMenuPauseOpen = (pp: PausePointData) => { - const effectiveMs = pp.adjusted_ms ?? pp.original_ms; setEditorPosition({ x: contextMenu!.x, y: contextMenu!.y + 8 }); setSelectedPausePoint(pp); onPausePointClick(pp); setContextMenu(null); - if (timelineRef.current) { - // seek video - onPausePointClick(pp); - } - // Seek video - const _ = effectiveMs; // used by consumer via onPausePointClick }; const formatTime = (ms: number) => { diff --git a/frontend/src/hooks/useJob.ts b/frontend/src/hooks/useJob.ts index 179646a..41e9b6a 100644 --- a/frontend/src/hooks/useJob.ts +++ b/frontend/src/hooks/useJob.ts @@ -400,4 +400,26 @@ export function useBulkRetry() { queryClient.invalidateQueries({ queryKey: ['jobs'] }); }, }); +} + +export function useProductionQueueStats() { + return useQuery({ + queryKey: ['production-queue-stats'], + queryFn: () => apiClient.getProductionQueueStats(), + refetchInterval: 10_000, + }); +} + +export function useUploadFinalVtt() { + const queryClient = useQueryClient(); + return useMutation({ + mutationFn: ({ + jobId, language, vttFile, vttType, + }: { jobId: string; language: string; vttFile: File; vttType?: 'captions' | 'ad' }) => + apiClient.uploadFinalVtt(jobId, language, vttFile, vttType), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['failures'] }); + queryClient.invalidateQueries({ queryKey: ['jobs'] }); + }, + }); } \ No newline at end of file diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 5aff5d4..850b7ad 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -1007,6 +1007,27 @@ class ApiClient { const r = await this.client.get(`/public/share/${token}`); return r.data; } + + // ── Production admin ──────────────────────────────────────────────────────── + + async getProductionQueueStats(): Promise<{ queues: Record; total_pending: number }> { + const r = await this.client.get('/admin/production/queue-stats'); + return r.data; + } + + async uploadFinalVtt( + jobId: string, + language: string, + vttFile: File, + vttType: 'captions' | 'ad' = 'captions', + ): Promise<{ status: string; gcs_uri: string; job_status: string }> { + const form = new FormData(); + form.append('language', language); + form.append('vtt_type', vttType); + form.append('vtt_file', vttFile); + const r = await this.client.post(`/admin/production/jobs/${jobId}/upload-final-vtt`, form); + return r.data; + } } export const apiClient = new ApiClient(); diff --git a/frontend/src/routes/Dashboard.tsx b/frontend/src/routes/Dashboard.tsx index 789fbd4..e73e224 100644 --- a/frontend/src/routes/Dashboard.tsx +++ b/frontend/src/routes/Dashboard.tsx @@ -1,16 +1,17 @@ import { Link } from 'react-router-dom'; import { useAuthStore } from '../lib/auth'; -import { useJobs } from '../hooks/useJob'; +import { useJobs, useProductionQueueStats } from '../hooks/useJob'; import { StatusBadge } from '../components/StatusBadge'; import type { Job } from '../types/api'; export function Dashboard() { const { user, isAuthenticated } = useAuthStore(); - const { data: jobsData, isLoading, error } = useJobs({ - mine: user?.role === 'client' - }, { - enabled: isAuthenticated && !!user + const { data: jobsData, isLoading, error } = useJobs({ + mine: user?.role === 'client' + }, { + enabled: isAuthenticated && !!user }); + const { data: queueStats } = useProductionQueueStats(); const jobs = jobsData?.jobs || []; @@ -138,6 +139,7 @@ export function Dashboard() { case 'production': return ( + <>
@@ -185,6 +187,29 @@ export function Dashboard() { )}
+ + {/* Queue depth panel — refreshes every 10s */} + {queueStats && ( +
+
+

Celery Queue Depth

+ 0 ? 'bg-blue-100 text-blue-700' : 'bg-green-100 text-green-700'}`}> + {queueStats.total_pending} pending + +
+
+ {Object.entries(queueStats.queues) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([queue, count]) => ( +
+ 0 ? 'text-blue-600' : 'text-gray-400'}`}>{count} + {queue} +
+ ))} +
+
+ )} + ); case 'reviewer': diff --git a/frontend/src/routes/admin/FailuresList.tsx b/frontend/src/routes/admin/FailuresList.tsx index 630d559..c606632 100644 --- a/frontend/src/routes/admin/FailuresList.tsx +++ b/frontend/src/routes/admin/FailuresList.tsx @@ -1,8 +1,9 @@ -import { useState } from 'react'; +import { useRef, useState } from 'react'; import { Link } from 'react-router-dom'; -import { useFailures, useBulkRetry } from '../../hooks/useJob'; +import { useFailures, useBulkRetry, useUploadFinalVtt } from '../../hooks/useJob'; import { StatusBadge } from '../../components/StatusBadge'; import { useToastContext } from '../../contexts/ToastContext'; +import { useAuthStore } from '../../lib/auth'; const STEP_LABELS: Record = { ingestion: 'Ingestion', @@ -16,16 +17,23 @@ export function FailuresList() { const [stepFilter, setStepFilter] = useState(''); const [selected, setSelected] = useState>(new Set()); const [strategy, setStrategy] = useState<'auto' | 'from_scratch'>('auto'); + const [uploadTarget, setUploadTarget] = useState<{ jobId: string; title: string } | null>(null); + const [uploadLanguage, setUploadLanguage] = useState('en'); + const [uploadType, setUploadType] = useState<'captions' | 'ad'>('captions'); + const uploadFileRef = useRef(null); const toast = useToastContext(); + const { user } = useAuthStore(); + const canUploadVtt = user?.role === 'production' || user?.role === 'admin'; const { data: jobs = [], isLoading, error, refetch } = useFailures( stepFilter ? { step: stepFilter } : undefined ); const bulkRetryMutation = useBulkRetry(); + const uploadVttMutation = useUploadFinalVtt(); const toggle = (id: string) => { const next = new Set(selected); - next.has(id) ? next.delete(id) : next.add(id); + if (next.has(id)) { next.delete(id); } else { next.add(id); } setSelected(next); }; const selectAll = () => setSelected(new Set(jobs.map(j => j.id))); @@ -46,6 +54,23 @@ export function FailuresList() { } }; + const handleUploadVtt = async () => { + if (!uploadTarget || !uploadFileRef.current?.files?.[0]) return; + const file = uploadFileRef.current.files[0]; + try { + await uploadVttMutation.mutateAsync({ + jobId: uploadTarget.jobId, + language: uploadLanguage, + vttFile: file, + vttType: uploadType, + }); + toast.toastOnly.success(`VTT uploaded — job advanced to Pending QC`); + setUploadTarget(null); + } catch { + toast.toastOnly.error('VTT upload failed'); + } + }; + // Group by failure type for accordion const byType = jobs.reduce>((acc, job) => { const key = job.failure?.type || job.status; @@ -181,13 +206,22 @@ export function FailuresList() { {new Date(job.updated_at).toLocaleDateString()} - + View + {canUploadVtt && ( + + )} ))} @@ -199,6 +233,60 @@ export function FailuresList() {
)} + + {/* Upload final VTT modal */} + {uploadTarget && ( +
+
+

Upload Final VTT

+

+ Override AI output for {uploadTarget.title} and advance to Pending QC. +

+
+
+ + setUploadLanguage(e.target.value)} + placeholder="en" + className="w-full border border-gray-300 rounded px-3 py-1.5 text-sm" + /> +
+
+ + +
+
+ + +
+
+
+ + +
+
+
+ )} ); } diff --git a/frontend/src/utils/jobStatusMessages.ts b/frontend/src/utils/jobStatusMessages.ts index f36679d..87cc21d 100644 --- a/frontend/src/utils/jobStatusMessages.ts +++ b/frontend/src/utils/jobStatusMessages.ts @@ -163,4 +163,28 @@ export const JOB_STATUS_LABELS: Record = { }; export const getJobStatusLabel = (status: string): string => - JOB_STATUS_LABELS[status] ?? status.replace(/_/g, ' '); \ No newline at end of file + JOB_STATUS_LABELS[status] ?? status.replace(/_/g, ' '); + +/** Tailwind classes for job status badges (bg + text). */ +export function getJobStatusColor(status: string): string { + switch (status) { + case 'created': return 'bg-gray-100 text-gray-800'; + case 'ingesting': return 'bg-blue-100 text-blue-800'; + case 'ai_processing': return 'bg-purple-100 text-purple-800'; + case 'pending_qc': return 'bg-yellow-100 text-yellow-800'; + case 'approved_english': + case 'approved_source': + case 'completed': return 'bg-green-100 text-green-800'; + case 'rejected': + case 'tts_failed': + case 'render_failed': + case 'processing_failed': return 'bg-red-100 text-red-800'; + case 'qc_feedback': return 'bg-orange-100 text-orange-800'; + case 'translating': return 'bg-blue-100 text-blue-800'; + case 'tts_generating': return 'bg-indigo-100 text-indigo-800'; + case 'rendering_video': return 'bg-violet-100 text-violet-800'; + case 'rendering_qc': return 'bg-violet-100 text-violet-800'; + case 'pending_final_review':return 'bg-orange-100 text-orange-800'; + default: return 'bg-gray-100 text-gray-800'; + } +} \ No newline at end of file