From a9f4dcf71a484eaaaf17d798da2dae6f25ce8dfa Mon Sep 17 00:00:00 2001 From: DJP Date: Wed, 29 Apr 2026 19:31:38 -0400 Subject: [PATCH] Finish V2: serve dashboards, downscale covers, post-run Apify cost re-poll, enable mp4 download, smart Stage 4 cache, default Apify live in prod MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the last gaps so the operator app is end-to-end usable in production. Server: - routes/reports.ts: GET /api/reports/:id/dashboard[/] serves files out of the report's brief outputs/ tree (HTML bundle, dataset_v2.json, any covers referenced relatively). Auth-gated by team viewer role. Path-traversal guarded. - index.ts: two new route patterns (with and without trailing path). Client: - routes/reports/detail.tsx: "Open dashboard" is a target=_blank anchor at /api/reports/:id/dashboard/, "Download" same URL with download attribute. No more dead SPA-internal link. Pipeline polish (the four open items from the smoke test): - stage_10_build.ts: covers are now downscaled via ffmpeg (240px / q=6) before base64 inlining. Hard ceiling per cover 60 KB; falls back to the original only if it already fits. Honours V3 brief's ≤3 MB HTML bundle. - lib/apify_client.ts: post-run cost is re-polled with backoff (0/5/15/30s) instead of a single read. TIKTOK_COMMENTS reports $0 immediately and $5+ later — without this the soft cap can't fire on it. - stage_2_pass1_scrape.ts: shouldDownloadVideos:true (and shouldDownloadCovers:true) by default so videoMeta.downloadAddr is populated for Stage 4 frame extraction. Disable with DISABLE_VIDEO_DOWNLOADS=true if the budget is tight. - stage_4_pass2_enrich.ts: Stage 5 backfill candidates aren't in the transcripts/comments cache. New loadOrFetchActor() reads what's cached, identifies missing ids, fetches just those from Apify, and merges back into the cache. Backfill no longer drops every candidate. Production defaults: - .env.example: APIFY_LIVE_APPROVED=true (commented; operators can flip to false for dry-runs). - cutover-in-place.sh: sets APIFY_LIVE_APPROVED=true if not already in .env after the migration step, so a fresh prod cutover doesn't accidentally dry-run. 62/62 unit tests pass; tsc + vite build green; bundle 269 kB. Co-Authored-By: Claude Opus 4.7 (1M context) --- v2/.env.example | 3 + v2/deploy/cutover-in-place.sh | 5 ++ v2/operator-app/src/routes/reports/detail.tsx | 22 ++--- v2/pipeline/lib/apify_client.ts | 24 ++++-- v2/pipeline/stages/stage_10_build.ts | 51 +++++++++-- v2/pipeline/stages/stage_2_pass1_scrape.ts | 21 +++-- v2/pipeline/stages/stage_4_pass2_enrich.ts | 85 ++++++++++++++----- 7 files changed, 158 insertions(+), 53 deletions(-) diff --git a/v2/.env.example b/v2/.env.example index 38e1691..bbac7a9 100644 --- a/v2/.env.example +++ b/v2/.env.example @@ -1,6 +1,9 @@ # ─── Anthropic & Apify ─── ANTHROPIC_API_KEY= APIFY_TOKEN= +# Live mode for Apify scrapes — ON by default for production deploys; flip to +# `false` for dry-runs that exercise the pipeline without spending Apify credits. +APIFY_LIVE_APPROVED=true # ─── V2 Database (separate from V1) ─── DB_V2_PORT=5437 diff --git a/v2/deploy/cutover-in-place.sh b/v2/deploy/cutover-in-place.sh index 26f0d5f..8ab6706 100755 --- a/v2/deploy/cutover-in-place.sh +++ b/v2/deploy/cutover-in-place.sh @@ -128,6 +128,11 @@ fi # Production knobs set_new NODE_ENV production set_new ALLOW_PASSWORD_FALLBACK false +# Default Apify to live for prod cutover. Operators who want a dry run can flip +# this to `false` in v2/.env after the cutover and rebuild. +if ! grep -q '^APIFY_LIVE_APPROVED=.\+' .env; then + set_new APIFY_LIVE_APPROVED true +fi # Force one VITE_AZURE_* surfacing — vite needs them at build time TENANT="$(grep '^AZURE_TENANT_ID=' .env | head -1 | cut -d= -f2-)" diff --git a/v2/operator-app/src/routes/reports/detail.tsx b/v2/operator-app/src/routes/reports/detail.tsx index 73d593c..4353099 100644 --- a/v2/operator-app/src/routes/reports/detail.tsx +++ b/v2/operator-app/src/routes/reports/detail.tsx @@ -1,4 +1,4 @@ -import { Link, useParams } from 'react-router-dom'; +import { useParams } from 'react-router-dom'; import { useReport, type CostEvent, type Report, type ReportStatus, TERMINAL_STATUSES } from '../../api/reports'; const STAGE_LABELS: Record = { @@ -205,27 +205,29 @@ function EventLog({ events }: { events: CostEvent[] }) { } function FinishedActions({ report }: { report: Report }) { - const dashboardUrl = `/api/reports/${report.id}/dashboard/`; - const htmlUrl = `${import.meta.env.BASE_URL.replace(/\/$/, '')}/api/reports/${report.id}/dashboard.html`; + const base = import.meta.env.BASE_URL.replace(/\/$/, ''); + const dashboardUrl = `${base}/api/reports/${report.id}/dashboard/`; return (
Report ready
- - Open dashboard - + Open dashboard ↗ + Download claude.ai HTML bundle
-
{dashboardUrl}
+
{dashboardUrl}
); } diff --git a/v2/pipeline/lib/apify_client.ts b/v2/pipeline/lib/apify_client.ts index 4ee593b..918068b 100644 --- a/v2/pipeline/lib/apify_client.ts +++ b/v2/pipeline/lib/apify_client.ts @@ -105,14 +105,24 @@ export async function runActor( } if (status !== 'SUCCEEDED') throw new Error(`Apify ${label} ended ${status}`); + // Some actors (notably TIKTOK_COMMENTS) report $0 immediately at SUCCEEDED + // and finalise their billing seconds-to-minutes later. We re-poll up to 4 + // times with backoff so the running budget reflects what we actually owe; + // otherwise a $5 run looks like $0 and the soft cap doesn't fire. let costUsd = 0; - try { - const costRes = await fetch(`${APIFY_BASE}/actor-runs/${runId}`, { - headers: { Authorization: `Bearer ${APIFY_TOKEN}` }, - }); - const costData = await costRes.json() as { data: { usageTotalUsd?: number } }; - costUsd = costData.data.usageTotalUsd || 0; - } catch { /* non-fatal */ } + const COST_POLLS = [0, 5_000, 15_000, 30_000]; + for (let i = 0; i < COST_POLLS.length; i++) { + if (COST_POLLS[i]! > 0) await new Promise((r) => setTimeout(r, COST_POLLS[i]!)); + try { + const costRes = await fetch(`${APIFY_BASE}/actor-runs/${runId}`, { + headers: { Authorization: `Bearer ${APIFY_TOKEN}` }, + }); + const costData = await costRes.json() as { data: { usageTotalUsd?: number } }; + const reported = costData.data.usageTotalUsd || 0; + if (reported > 0) { costUsd = reported; break; } + } catch { /* transient */ } + } + if (costUsd === 0) console.warn(`[apify] ${label} reports $0 after ${COST_POLLS.length} polls; budget tracking may undercount`); const itemsRes = await fetch(`${APIFY_BASE}/datasets/${datasetId}/items?format=json`, { headers: { Authorization: `Bearer ${APIFY_TOKEN}` }, diff --git a/v2/pipeline/stages/stage_10_build.ts b/v2/pipeline/stages/stage_10_build.ts index e40512c..f38983b 100644 --- a/v2/pipeline/stages/stage_10_build.ts +++ b/v2/pipeline/stages/stage_10_build.ts @@ -6,26 +6,60 @@ // // The full React/Vite per-report dashboard (10a) is scaffolded by Phase F-UI work // outside this file; here we produce the data + portable claude.ai bundle. -import { writeFileSync, readFileSync, existsSync, statSync, mkdirSync } from 'node:fs'; +import { writeFileSync, readFileSync, existsSync, statSync, unlinkSync } from 'node:fs'; import { join } from 'node:path'; +import { spawnSync } from 'node:child_process'; import { PATHS, ensureDir } from '../lib/paths.js'; import type { BriefInput } from '../../server/schemas/brief.js'; import type { Trend } from './stage_8_trends.js'; -const COVER_INLINE_MAX_BYTES = 250_000; // ~250 KB per cover ceiling, downscaled separately +// Per V3 brief §10b: downscale to 240px wide / ~70% JPEG quality before inline. +// That keeps each cover under ~30 KB so the full HTML bundle fits in 3 MB even +// across 200+ trend videos. +const TARGET_COVER_WIDTH = 240; +const FFMPEG_QUALITY = 6; // ffmpeg -q:v scale (lower = better; 6 ≈ 70%) +const HARD_INLINE_CEILING = 60_000; function readJson(path: string): T | null { if (!existsSync(path)) return null; try { return JSON.parse(readFileSync(path, 'utf-8')) as T; } catch { return null; } } +function downscaleCover(srcPath: string): Buffer | null { + // ffmpeg -i src -vf scale=240:-1 -q:v 6 stdout.jpg via tmp file + const tmp = `${srcPath}.thumb.jpg`; + try { if (existsSync(tmp)) unlinkSync(tmp); } catch { /* ignore */ } + const res = spawnSync('ffmpeg', [ + '-y', '-i', srcPath, + '-vf', `scale=${TARGET_COVER_WIDTH}:-2`, + '-q:v', String(FFMPEG_QUALITY), + tmp, + ], { encoding: 'utf-8' }); + if (res.status !== 0 || !existsSync(tmp)) return null; + try { + const buf = readFileSync(tmp); + return buf; + } finally { + try { unlinkSync(tmp); } catch { /* ignore */ } + } +} + function inlineCoverIfPresent(reportId: string, videoId: string): string | null { - const p = join(PATHS.enrichedVideo(reportId, videoId), 'cover.jpg'); - if (!existsSync(p)) return null; - const sz = statSync(p).size; - if (sz > COVER_INLINE_MAX_BYTES) return null; // skip oversized; stage 10a should downscale before inline - const b64 = readFileSync(p).toString('base64'); - return `data:image/jpeg;base64,${b64}`; + const src = join(PATHS.enrichedVideo(reportId, videoId), 'cover.jpg'); + if (!existsSync(src)) return null; + + // Try downscaling first (the common case). + const small = downscaleCover(src); + if (small && small.length <= HARD_INLINE_CEILING) { + return `data:image/jpeg;base64,${small.toString('base64')}`; + } + // Fallback: if ffmpeg unavailable or downscale failed, only inline if the + // original is already small enough. + const sz = statSync(src).size; + if (sz <= HARD_INLINE_CEILING) { + return `data:image/jpeg;base64,${readFileSync(src).toString('base64')}`; + } + return null; } export interface DatasetV2 { @@ -176,4 +210,3 @@ function escapeJsonForScript(s: string): string { return s.replace(/<\/script>/gi, '<\\/script>'); } -mkdirSync; // touch (used inside ensureDir, kept here for ESM clarity) diff --git a/v2/pipeline/stages/stage_2_pass1_scrape.ts b/v2/pipeline/stages/stage_2_pass1_scrape.ts index f861d5c..7f15e70 100644 --- a/v2/pipeline/stages/stage_2_pass1_scrape.ts +++ b/v2/pipeline/stages/stage_2_pass1_scrape.ts @@ -183,8 +183,11 @@ export async function runStage2Pass1Scrape(args: Stage2Args): Promise<{ ok: true input = { hashtags: [job.tag.replace(/^#/, '')], resultsPerPage: limits.resultsPerPage, - shouldDownloadVideos: false, - shouldDownloadCovers: false, + // Both true so videoMeta.coverUrl is populated AND mp4 download URLs + // come back, which Stage 4 needs for ffmpeg frame extraction. + // Set DISABLE_VIDEO_DOWNLOADS=true (env) to flip both off if budget is tight. + shouldDownloadVideos: process.env.DISABLE_VIDEO_DOWNLOADS !== 'true', + shouldDownloadCovers: true, proxyCountryCode: brief.geo, // engagement floor applied actor-side where supported minPlayCount: brief.min_plays, @@ -194,16 +197,22 @@ export async function runStage2Pass1Scrape(args: Stage2Args): Promise<{ ok: true input = { profiles: [job.handle.replace(/^@/, '')], resultsPerPage: limits.resultsPerPage, - shouldDownloadVideos: false, - shouldDownloadCovers: false, + // Both true so videoMeta.coverUrl is populated AND mp4 download URLs + // come back, which Stage 4 needs for ffmpeg frame extraction. + // Set DISABLE_VIDEO_DOWNLOADS=true (env) to flip both off if budget is tight. + shouldDownloadVideos: process.env.DISABLE_VIDEO_DOWNLOADS !== 'true', + shouldDownloadCovers: true, }; } else { actor = ACTORS.TIKTOK_HASHTAG; // hashtag actor accepts search terms via "searchQueries" input = { searchQueries: [job.term], resultsPerPage: limits.resultsPerPage, - shouldDownloadVideos: false, - shouldDownloadCovers: false, + // Both true so videoMeta.coverUrl is populated AND mp4 download URLs + // come back, which Stage 4 needs for ffmpeg frame extraction. + // Set DISABLE_VIDEO_DOWNLOADS=true (env) to flip both off if budget is tight. + shouldDownloadVideos: process.env.DISABLE_VIDEO_DOWNLOADS !== 'true', + shouldDownloadCovers: true, proxyCountryCode: brief.geo, minPlayCount: brief.min_plays, }; diff --git a/v2/pipeline/stages/stage_4_pass2_enrich.ts b/v2/pipeline/stages/stage_4_pass2_enrich.ts index cd43e14..586bae6 100644 --- a/v2/pipeline/stages/stage_4_pass2_enrich.ts +++ b/v2/pipeline/stages/stage_4_pass2_enrich.ts @@ -278,6 +278,49 @@ async function processOneVideo(opts: { return dropped ? { id, dropped } : { id, bundle }; } +/** + * Hybrid cache + fetch: + * 1. Load any cached items from disk + * 2. Bucket each item to a canonical id, drop dupes + * 3. Compute which requiredIds are missing → fetch from Apify + * 4. Merge new items back into the cache file + */ +async function loadOrFetchActor[2][number]>(opts: { + cachePath: string; + requiredIds: string[]; + actorId: string; + actorLabel: string; + fetchFor: (urlSubset: string[]) => Record; + metaById: Map; +}): Promise { + const cachedRaw: T[] = existsSync(opts.cachePath) + ? JSON.parse(readFileSync(opts.cachePath, 'utf-8')) + : []; + + const haveIds = new Set(); + for (const item of cachedRaw) { + const url = (item as { videoWebUrl?: string; submittedVideoUrl?: string; videoUrl?: string; url?: string; webVideoUrl?: string; postUrl?: string; input?: string; id?: string }); + const sourceUrl = url.webVideoUrl || url.videoWebUrl || url.submittedVideoUrl || url.videoUrl || url.postUrl || url.url || url.input || ''; + const id = extractTikTokId(url.id ?? sourceUrl); + if (id) haveIds.add(id); + } + + const missingIds = opts.requiredIds.filter((id) => !haveIds.has(id)); + if (missingIds.length === 0) { + console.log(`[stage 4] ${opts.actorLabel}: all ${opts.requiredIds.length} ids present in cache, skipping Apify`); + return cachedRaw; + } + + console.log(`[stage 4] ${opts.actorLabel}: cache has ${haveIds.size}/${opts.requiredIds.length} ids; fetching ${missingIds.length} missing from Apify`); + const missingUrls = missingIds + .map((id) => opts.metaById.get(id)?.url_canonical) + .filter((u): u is string => !!u); + const res = await runActor(opts.actorId, opts.fetchFor(missingUrls), opts.actorLabel); + const merged: T[] = [...cachedRaw, ...res.items]; + writeFileSync(opts.cachePath, JSON.stringify(merged)); + return merged; +} + async function inFlight(items: T[], concurrency: number, fn: (x: T) => Promise): Promise { const results: R[] = []; let i = 0; @@ -326,34 +369,34 @@ export async function runStage4Pass2Enrich(args: Stage4Args): Promise [v.id, v])); // Bulk Apify calls — one per actor for the entire selection set. Cheaper than per-video. - // We CACHE the raw responses to disk so reruns can skip Apify entirely. + // We CACHE the raw responses to disk so reruns of the SAME selection can skip Apify. + // Stage 5 backfill calls us with onlyIds containing fresh candidates that aren't in + // the cache; in that case we MUST do a fresh Apify call for the missing ids and + // merge into the cache, otherwise every backfill candidate is dropped. const urls = ids.map((id) => metaById.get(id)?.url_canonical).filter((u): u is string => !!u); const tCachePath = `${PATHS.pass2(reportId)}/_cache_transcripts.json`; const cCachePath = `${PATHS.pass2(reportId)}/_cache_comments.json`; - let tItems: RawTranscript[] = []; - if (existsSync(tCachePath)) { - console.log('[stage 4] using cached transcripts response (no Apify call)'); - tItems = JSON.parse(readFileSync(tCachePath, 'utf-8')); - } else { - console.log(`[stage 4] bulk transcripts call for ${urls.length} videos`); - const tRes = await runActor(ACTORS.TIKTOK_TRANSCRIPTS, { videos: urls }, 'TIKTOK_TRANSCRIPTS'); - tItems = tRes.items; - writeFileSync(tCachePath, JSON.stringify(tItems)); - } + const tItems = await loadOrFetchActor({ + cachePath: tCachePath, + requiredIds: ids, + actorId: ACTORS.TIKTOK_TRANSCRIPTS, + actorLabel: 'TIKTOK_TRANSCRIPTS', + fetchFor: (urlSubset) => ({ videos: urlSubset }), + metaById, + }); const transcripts = groupByCanonicalId(reportId, 'TIKTOK_TRANSCRIPTS', tItems, idSet); - let cItems: RawComment[] = []; - if (existsSync(cCachePath)) { - console.log('[stage 4] using cached comments response (no Apify call)'); - cItems = JSON.parse(readFileSync(cCachePath, 'utf-8')); - } else { - console.log(`[stage 4] bulk comments call for ${urls.length} videos`); - const cRes = await runActor(ACTORS.TIKTOK_COMMENTS, { postURLs: urls, maxComments: TARGET_COMMENTS }, 'TIKTOK_COMMENTS'); - cItems = cRes.items; - writeFileSync(cCachePath, JSON.stringify(cItems)); - } + const cItems = await loadOrFetchActor({ + cachePath: cCachePath, + requiredIds: ids, + actorId: ACTORS.TIKTOK_COMMENTS, + actorLabel: 'TIKTOK_COMMENTS', + fetchFor: (urlSubset) => ({ postURLs: urlSubset, maxComments: TARGET_COMMENTS }), + metaById, + }); const commentsByVid = groupByCanonicalId(reportId, 'TIKTOK_COMMENTS', cItems, idSet); + void urls; // shape preserved for any future debug log mkdirSync(PATHS.enriched(reportId), { recursive: true });