Pipeline retry: idempotent all + retry endpoint + UI buttons
The pipeline `all` command now skips stages whose .state/stage{N}.done
sentinel is present (unless --force), so retrying picks up exactly where
the previous run failed without re-spending on completed stages. Stage 5
(validate) is the deliberate exception — it always re-runs because
--drop-failing changes the selection set.
- pipeline/cli.ts: `all` wraps every stage in a maybeRun() helper that
checks the sentinel + writes one after running. Validate runs every
retry; if it doesn't reach 100% coverage the pipeline now fails LOUDLY
with a clear error rather than crashing in Stage 6.
- routes/reports.ts: handleRetryReport — POST /api/reports/:id/retry,
resets reports.status to pending (clears error_message + finished_at),
spawns `pipeline cli all` with --drop-failing (and --force when the
client passes {force:true}). Same singleton-running guard as run.
- operator-app: useRetryReport hook + three new retry buttons on Reports detail:
  - "Retry pipeline" / "Force re-run" on the failure panel.
  - "Retry with drop-failing" on the manifest panel (Stage 5 specific).
Items explicitly deferred (documented in head):
- SSE for live progress (3s React Query polling already in place).
- Conversational CLI brief intake (V3 §0; the operator-app form covers it).
62/62 unit tests pass. SPA build 284 kB.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
7024acfdf0
commit
376802db41
4 changed files with 171 additions and 24 deletions
|
|
@ -113,3 +113,12 @@ export function useBuildReport(reportId: string | undefined) {
|
|||
onSuccess: () => qc.invalidateQueries({ queryKey: ['reports', reportId] }),
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Mutation hook that asks the server to retry the pipeline for one report.
 *
 * Issues `POST /reports/{reportId}/retry` through `fetcher`, sending the
 * mutation variables (or `{}` when none are given) as the JSON body. Pass
 * `{ force: true }` to request a forced re-run. On success the cached
 * `['reports', reportId]` query is invalidated so the detail view refetches.
 *
 * @param reportId - Id of the report to retry; interpolated into the URL as-is.
 */
export function useRetryReport(reportId: string | undefined) {
  const queryClient = useQueryClient();

  return useMutation<{ ok: true }, Error, { force?: boolean } | void>({
    mutationFn: (options) => {
      // An omitted argument is sent as an empty JSON object.
      const payload = JSON.stringify(options ?? {});
      return fetcher(`/reports/${reportId}/retry`, { method: 'POST', body: payload });
    },
    onSuccess: () => {
      return queryClient.invalidateQueries({ queryKey: ['reports', reportId] });
    },
  });
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import { useState } from 'react';
|
||||
import { useParams } from 'react-router-dom';
|
||||
import {
|
||||
useReport, useQaSignoff, useBuildReport,
|
||||
useReport, useQaSignoff, useBuildReport, useRetryReport,
|
||||
type CostEvent, type Report, type ReportStatus, TERMINAL_STATUSES,
|
||||
type ManifestSummary, type QaState,
|
||||
} from '../../api/reports';
|
||||
|
|
@ -80,14 +80,9 @@ export default function ReportDetail() {
|
|||
|
||||
<StageProgress report={report} />
|
||||
|
||||
{report.error_message && (
|
||||
<section className="bg-red-500/10 border border-red-500/30 rounded-lg p-4">
|
||||
<div className="text-red-400 text-sm font-medium mb-1">Pipeline failed</div>
|
||||
<pre className="text-xs text-red-300 whitespace-pre-wrap font-mono">{report.error_message}</pre>
|
||||
</section>
|
||||
)}
|
||||
{report.error_message && <FailurePanel reportId={report.id} error={report.error_message} />}
|
||||
|
||||
{manifest && manifest.missing.length > 0 && <ManifestPanel manifest={manifest} reportBriefSlug={report.brief_slug} />}
|
||||
{manifest && manifest.missing.length > 0 && <ManifestPanel manifest={manifest} reportId={report.id} reportBriefSlug={report.brief_slug} />}
|
||||
{showSignoffPanel && qa && <SignoffPanel reportId={report.id} qa={qa} />}
|
||||
|
||||
<CostSummary report={report} />
|
||||
|
|
@ -215,13 +210,75 @@ function EventLog({ events }: { events: CostEvent[] }) {
|
|||
);
|
||||
}
|
||||
|
||||
function ManifestPanel({ manifest, reportBriefSlug }: { manifest: ManifestSummary; reportBriefSlug: string }) {
|
||||
/**
 * Red failure banner for a report whose pipeline run ended with an error.
 *
 * Shows the error text plus two actions wired to `useRetryReport`:
 *  - "Retry pipeline" triggers the mutation with no variables.
 *  - "Force re-run" asks for confirmation, then triggers it with `{ force: true }`.
 * Both buttons are disabled while the mutation is pending. A failed retry
 * request is reported inline below the explanatory text.
 *
 * NOTE(review): `ApiError` is assumed to be imported elsewhere in this file.
 */
function FailurePanel({ reportId, error }: { reportId: string; error: string }) {
  const retryMutation = useRetryReport(reportId);
  const [retryError, setRetryError] = useState<string | null>(null);
  const busy = retryMutation.isPending;

  // Shared by both buttons: only ApiError messages are shown verbatim.
  const reportFailure = (e: Error) => setRetryError(e instanceof ApiError ? e.message : 'Retry failed');

  const handleRetry = () => {
    setRetryError(null);
    retryMutation.mutate(undefined, { onError: reportFailure });
  };

  const handleForceRerun = () => {
    // A forced run is gated behind an explicit confirmation.
    const confirmed = confirm('Re-run every stage from scratch (forces re-spend on Apify + Claude)?');
    if (!confirmed) return;
    setRetryError(null);
    retryMutation.mutate({ force: true }, { onError: reportFailure });
  };

  return (
    <section className="bg-red-500/10 border border-red-500/30 rounded-lg p-4 space-y-3">
      <div className="flex items-start justify-between gap-3 flex-wrap">
        <div>
          <div className="text-red-400 text-sm font-medium">Pipeline failed</div>
          <pre className="text-xs text-red-300 whitespace-pre-wrap font-mono mt-1">{error}</pre>
        </div>
        <div className="flex gap-2 shrink-0">
          <button
            type="button"
            onClick={handleRetry}
            disabled={busy}
            className="bg-accent hover:bg-accent-hover text-black font-medium px-4 py-2 rounded text-sm disabled:opacity-60"
          >
            {busy ? 'Retrying…' : 'Retry pipeline'}
          </button>
          <button
            type="button"
            onClick={handleForceRerun}
            disabled={busy}
            className="border border-border-input hover:border-accent text-text-body px-3 py-2 rounded text-sm disabled:opacity-60"
            title="Invalidate every stage sentinel and re-run from scratch"
          >
            Force re-run
          </button>
        </div>
      </div>
      <p className="text-xs text-text-dim">
        Retry resumes from the failed stage (idempotent via .state sentinels).
        --drop-failing is set automatically so Stage 5 backfills missing assets.
      </p>
      {retryError && <div className="text-red-400 text-sm">{retryError}</div>}
    </section>
  );
}
|
||||
|
||||
function ManifestPanel({ manifest, reportId, reportBriefSlug }: { manifest: ManifestSummary; reportId: string; reportBriefSlug: string }) {
|
||||
const s = manifest.summary;
|
||||
const retry = useRetryReport(reportId);
|
||||
return (
|
||||
<section className="bg-bg-panel border border-amber-500/40 rounded-lg p-4">
|
||||
<h2 className="text-sm font-medium mb-3 text-amber-400 uppercase tracking-wider">
|
||||
Manifest gate — {s.coverage_pct}% coverage ({s.all_ok}/{s.selected_count})
|
||||
</h2>
|
||||
<div className="flex items-start justify-between gap-3 flex-wrap mb-3">
|
||||
<h2 className="text-sm font-medium text-amber-400 uppercase tracking-wider">
|
||||
Manifest gate — {s.coverage_pct}% coverage ({s.all_ok}/{s.selected_count})
|
||||
</h2>
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => retry.mutate(undefined)}
|
||||
disabled={retry.isPending || s.coverage_pct === 100}
|
||||
className="bg-accent hover:bg-accent-hover text-black font-medium px-3 py-1.5 rounded text-xs disabled:opacity-40"
|
||||
title="Re-run validate with --drop-failing, then continue the rest of the pipeline"
|
||||
>
|
||||
{retry.isPending ? 'Retrying…' : 'Retry with drop-failing'}
|
||||
</button>
|
||||
</div>
|
||||
<div className="grid grid-cols-2 sm:grid-cols-3 lg:grid-cols-6 gap-2 text-xs mb-3">
|
||||
{([
|
||||
['metadata', s.metadata_ok],
|
||||
|
|
@ -251,7 +308,7 @@ function ManifestPanel({ manifest, reportBriefSlug }: { manifest: ManifestSummar
|
|||
</ul>
|
||||
</details>
|
||||
<p className="text-xs text-text-dim mt-3">
|
||||
To auto-backfill failures, on the server run:
|
||||
Click <strong>Retry with drop-failing</strong> above, or run on the server:
|
||||
<code className="block mt-1 font-mono text-text-muted">
|
||||
docker exec social-reporting-v2-app-v2-1 npx tsx pipeline/cli.ts validate --report {reportBriefSlug} --drop-failing
|
||||
</code>
|
||||
|
|
|
|||
|
|
@ -316,26 +316,58 @@ async function main(): Promise<void> {
|
|||
case 'all': {
|
||||
// Drive every stage end-to-end. validate runs with --drop-failing so the
|
||||
// manifest gate isn't a hard stop on first try; auto-backfill kicks in.
|
||||
// Each stage is idempotent: completed stages (sentinel present) are
|
||||
// skipped unless --force. That makes "retry pipeline" automatically
|
||||
// resume from where it failed.
|
||||
const { brief, briefRow } = await loadBriefAndRow(reportId);
|
||||
const dropFailing = flags['drop-failing'] !== false;
|
||||
const recipe = (typeof flags.recipe === 'string' ? flags.recipe.toUpperCase() : undefined) as RecipeId | undefined;
|
||||
|
||||
await withStage(runId, 1, 'seeds', () => runStage1Seeds({ reportId, brief }));
|
||||
await withStage(runId, 2, 'pass1', () => runStage2Pass1Scrape({ reportId, brief }));
|
||||
await withStage(runId, 3, 'select', () => {
|
||||
const rid = reportId; // close over a narrowed non-null reference for the helpers
|
||||
/**
 * Runs one pipeline stage unless `isStageDone` reports it as already complete.
 *
 * Cached path: logs that the stage was skipped, makes a best-effort
 * `updateReportStatus` call when a run id is present (failures are ignored),
 * and resolves to `undefined`.
 *
 * Fresh path: executes `fn` through `withStage`, then records a sentinel via
 * `writeStageDone` containing the command, an ISO timestamp, and any
 * `sentinelExtras`. The sentinel is written only after `withStage` resolves.
 *
 * @param stage - Numeric stage id used for the sentinel and status update.
 * @param name - Status label passed to `withStage` / `updateReportStatus`.
 * @param fn - The stage body.
 * @param sentinelExtras - Optional extra fields merged into the sentinel.
 * @returns The stage result, or `undefined` when the stage was skipped.
 */
async function maybeRun<T>(
  stage: number,
  name: ReportStatus,
  fn: () => Promise<T>,
  sentinelExtras?: Record<string, unknown>,
): Promise<T | undefined> {
  const alreadyDone = isStageDone(rid, stage, force);

  if (!alreadyDone) {
    const outcome = await withStage(runId, stage, name, fn);
    const sentinel = { command, at: new Date().toISOString(), ...(sentinelExtras ?? {}) };
    writeStageDone(rid, stage, sentinel);
    return outcome;
  }

  console.log(`[stage ${stage}] cached (use --force to re-run)`);
  if (runId) {
    // Best-effort status update: a failure here must not abort the run.
    await updateReportStatus(runId, name, stage).catch(() => {});
  }
  return undefined;
}
|
||||
|
||||
await maybeRun(1, 'seeds', () => runStage1Seeds({ reportId, brief }));
|
||||
await maybeRun(2, 'pass1', () => runStage2Pass1Scrape({ reportId, brief }));
|
||||
await maybeRun(3, 'select', () => {
|
||||
const a: Parameters<typeof runStage3Select>[0] = { reportId, brief };
|
||||
if (recipe) a.forceRecipe = recipe;
|
||||
return runStage3Select(a);
|
||||
});
|
||||
await withStage(runId, 4, 'pass2', () => runStage4Pass2Enrich({ reportId, brief }));
|
||||
await withStage(runId, 5, 'validate', () => runStage5Manifest({ reportId, brief, dropFailing }));
|
||||
await withStage(runId, 6, 'analyse', () => runStage6Analyse(reportId));
|
||||
await withStage(runId, 7, 'insights', () => runStage7AtomicInsights(reportId));
|
||||
await withStage(runId, 8, 'trends', () => runStage8Trends(reportId, brief));
|
||||
await maybeRun(4, 'pass2', () => runStage4Pass2Enrich({ reportId, brief }));
|
||||
// Validate is special: re-runs every time even if a sentinel exists,
|
||||
// because --drop-failing changes selected_video_ids and the gate must
|
||||
// re-evaluate on the (possibly updated) selection.
|
||||
const validateResult = await withStage(runId, 5, 'validate', () => runStage5Manifest({ reportId, brief, dropFailing }));
|
||||
writeStageDone(reportId, 5, { command, at: new Date().toISOString(), passed: validateResult.passed, coverage_pct: validateResult.manifest.summary.coverage_pct });
|
||||
if (!validateResult.passed) {
|
||||
// Fail loudly — the manifest panel + retry button in the UI will surface this.
|
||||
const missingCount = validateResult.manifest.videos.filter((v) => !v.all_ok).length;
|
||||
throw new Error(`Manifest gate failed at ${validateResult.manifest.summary.coverage_pct}% coverage; ${missingCount} videos missing assets. Retry to drop-failing-backfill, or relax MANIFEST_FRAMES_OPTIONAL=true if frames are out of scope.`);
|
||||
}
|
||||
await maybeRun(6, 'analyse', () => runStage6Analyse(reportId));
|
||||
await maybeRun(7, 'insights', () => runStage7AtomicInsights(reportId));
|
||||
await maybeRun(8, 'trends', () => runStage8Trends(reportId, brief));
|
||||
// 8c is optional — if it fails the rest of the pipeline still ships.
|
||||
try { await runStage8cLenses(reportId); }
|
||||
catch (e) { console.warn(`[stage 8c] lenses failed (continuing): ${(e as Error).message}`); }
|
||||
await withStage(runId, 9, 'qa', () => runStage9Qa(reportId));
|
||||
if (!isStageDone(reportId, 80, force)) {
|
||||
try {
|
||||
await runStage8cLenses(reportId);
|
||||
writeStageDone(reportId, 80, { command, at: new Date().toISOString() });
|
||||
} catch (e) { console.warn(`[stage 8c] lenses failed (continuing): ${(e as Error).message}`); }
|
||||
} else {
|
||||
console.log('[stage 8c] cached');
|
||||
}
|
||||
await maybeRun(9, 'qa', () => runStage9Qa(reportId));
|
||||
|
||||
// V3 §9: Stage 10 needs CM + Strategist sign-offs from two different
|
||||
// humans. The operator-app's Build button (POST /api/reports/:id/build)
|
||||
|
|
|
|||
|
|
@ -141,6 +141,55 @@ export async function handleRunPipeline(req: IncomingMessage, res: ServerRespons
|
|||
sendJSON(res, 201, { report: publicReport(report) });
|
||||
}
|
||||
|
||||
// ── Retry: re-run `all` against an existing report row ─────────────────
|
||||
//
|
||||
// The pipeline is idempotent via .state/stage{N}.done sentinels — completed
|
||||
// stages are skipped, so retry resumes from the failed step. To rerun every
// stage from scratch, send `{ "force": true }` in the JSON body (adds --force).
|
||||
import { sql } from '../db/client.js';
|
||||
|
||||
/**
 * POST /api/reports/:id/retry — re-runs `pipeline/cli.ts all` for an existing report.
 *
 * Flow:
 *  1. Requires an authenticated session with the `editor` role on the report's team.
 *  2. Rejects with 409 while another pipeline holds the singleton slot
 *     (`runningReportId`).
 *  3. Resets the report row (`status = 'pending'`, clears `error_message` and
 *     `finished_at`).
 *  4. Spawns the pipeline with `--drop-failing`, adding `--force` when the
 *     JSON body carries a truthy `force`.
 *  5. Responds 202 immediately; the child keeps running in the background.
 *
 * The singleton slot is claimed synchronously right after the guard, so two
 * concurrent requests cannot both pass it. It is released when the child
 * exits, when the child fails to start, or when setup throws.
 *
 * @param req - Incoming request; an empty or unparsable body is treated as `{}`.
 * @param res - Response used for all replies.
 * @param id - Report id taken from the route.
 */
export async function handleRetryReport(req: IncomingMessage, res: ServerResponse, id: string): Promise<void> {
  const session = requireAuth(req, res);
  if (!session) return;
  const report = await getReport(id);
  if (!report) { sendJSON(res, 404, { error: 'Report not found' }); return; }
  if (!await requireTeamRole(res, session, report.team_id, 'editor')) return;
  if (runningReportId) {
    sendJSON(res, 409, { error: `Another pipeline is already running (report ${runningReportId}).` });
    return;
  }

  // Claim the slot before the first await below; otherwise a second request
  // could pass the guard while this one is parsing the body or running SQL.
  runningReportId = report.id;
  const releaseSlot = (): void => {
    if (runningReportId === report.id) runningReportId = null;
  };

  try {
    let body: { force?: boolean } = {};
    try { body = await parseJSONBody<{ force?: boolean }>(req); } catch { /* tolerate empty body */ }

    // Reset the row so the UI re-polls and the cost ticker starts fresh on this retry.
    await sql`
      UPDATE reports SET status = 'pending', error_message = NULL, finished_at = NULL
      WHERE id = ${report.id}
    `;

    const cwd = resolve(process.cwd());
    // Prefer the locally installed tsx binary; fall back to `npx tsx`.
    const tsxBin = existsSync(resolve(cwd, 'node_modules/.bin/tsx'))
      ? resolve(cwd, 'node_modules/.bin/tsx')
      : 'npx';
    const baseArgs = ['pipeline/cli.ts', 'all', '--report', report.brief_id, '--run-id', report.id, '--drop-failing'];
    if (body.force) baseArgs.push('--force');
    const args = tsxBin === 'npx' ? ['tsx', ...baseArgs] : baseArgs;

    const child = spawn(tsxBin, args, { cwd, detached: true, stdio: ['ignore', 'pipe', 'pipe'], env: { ...process.env } });
    child.stdout?.on('data', (c) => process.stdout.write(`[retry ${report.id.slice(0, 8)}] ${c}`));
    child.stderr?.on('data', (c) => process.stderr.write(`[retry ${report.id.slice(0, 8)}] ${c}`));

    // 'error' and 'exit' can both fire for the same child; settle only once.
    let settled = false;
    const settle = async (failure: string | null): Promise<void> => {
      if (settled) return;
      settled = true;
      if (failure !== null) {
        await finishReport(report.id, 'failed', failure).catch(() => {});
      }
      releaseSlot();
    };

    // Without this listener a spawn failure (e.g. ENOENT) would surface as an
    // unhandled 'error' event and the slot would never be released.
    child.on('error', (err) => {
      void settle(`Pipeline failed to start: ${err.message}`);
    });
    child.on('exit', (code, signal) => {
      if (code === 0) {
        void settle(null);
        return;
      }
      // `code` is null when the child was terminated by a signal.
      void settle(code === null ? `Pipeline terminated by signal ${signal}` : `Pipeline exited with code ${code}`);
    });
    child.unref();
  } catch (e) {
    // Setup failed before the child's handlers took ownership of the slot.
    releaseSlot();
    throw e;
  }

  sendJSON(res, 202, { ok: true, message: 'Retry started.' });
}
|
||||
|
||||
// ── Dashboard static serve ───────────────────────────────────────────────
|
||||
const STATIC_MIME: Record<string, string> = {
|
||||
'.html': 'text/html; charset=utf-8',
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue