social-mi-bi/scripts/run_e2e_cif.py
Dave Porter fb5acdb892 Initial commit: social MI/BI multi-agent reporting tool
4-stage agent pipeline (Data Mapper, Input Collector, Strategy Writer, Report
Builder) with React wizard, Postgres persistence, HITL + YOLO modes, Apify
embed hydration, clone-for-next-month, and slide-deck HTML output.

Proven end-to-end against real Cif Meltwater data (Instagram + TikTok) with
Anthropic Opus 4.7 (strategy) and Sonnet 4.6 (report builder).
2026-04-17 17:07:44 -04:00

167 lines
5.5 KiB
Python

#!/usr/bin/env python3
"""Standalone end-to-end harness: run the full pipeline against the two real
Cif Excel exports and write the final HTML slide deck to ``/app/rendered``.

Stages 3 and 4 are skipped when ``ANTHROPIC_API_KEY`` is not set.

Run from inside the api container:

    docker compose run --rm api python scripts/run_e2e_cif.py
"""
from __future__ import annotations
import asyncio
import os
import sys
from datetime import date
from pathlib import Path
from uuid import uuid4
async def _main() -> int:
    """Run the pipeline stages against the Cif fixtures and save the deck.

    Stage 1 (Data Mapper) and stage 2 (Input Collector) always run. Stage 3
    (Strategy Writer) and stage 4 (Report Builder) run only when
    ``ANTHROPIC_API_KEY`` is set in the environment; otherwise the harness
    stops after stage 2 and reports success.

    Returns:
        Process exit code: ``0`` on success (including the early stop when no
        API key is set), ``2`` when a fixture workbook is missing, and ``1``
        when stage 4 yields no HTML to write.
    """
    # Project-local stage classes; imported at call time rather than at
    # module import time.
    from app.agents.base import StageInput
    from app.agents.data_mapper import DataMapperStage
    from app.agents.input_collector import InputCollectorStage
    from app.agents.strategy_writer import StrategyWriterStage
    from app.agents.report_builder import ReportBuilderStage

    fixture_dir = Path("/app/Build-Information/Sample Data Input")
    ig = fixture_dir / "Cif MW YTD Instagram 2026.xlsx"
    tk = fixture_dir / "Cif MW YTD TikTok 2026.xlsx"
    # Report only the workbooks that are actually absent, so the message
    # does not blame a fixture that is present.
    missing = [path for path in (ig, tk) if not path.exists()]
    if missing:
        print(
            f"missing fixtures: {' / '.join(str(path) for path in missing)}",
            file=sys.stderr,
        )
        return 2

    brand = {
        "id": str(uuid4()),
        "name": "Cif",
        "slug": "cif",
        "primary_colour": "#00A19A",
        "secondary_colour": "#111827",
        "accent_colour": "#67E8F9",
        "logo_data_uri": None,
    }
    report_id = uuid4()
    benchmarks = {
        "instagram": {"posts": 10, "views_growth": 40, "er": 2.0, "vtr": 10.0},
        "tiktok": {"posts": 10, "views_growth": 40, "er": 2.0, "vtr": 10.0},
    }

    def stage_input(stage_no, upstream, inputs, context=""):
        """Build the StageInput for one stage of this report run."""
        return StageInput(
            report_id=report_id,
            stage_no=stage_no,
            attempt=1,
            upstream_artefacts=upstream,
            user_inputs=inputs,
            brand=brand,
            upfront_context_md=context,
        )

    print("Stage 1: Data Mapper…")
    s1 = await DataMapperStage().run(
        stage_input(
            1,
            {},
            {
                "selected_platforms": ["instagram", "tiktok"],
                "reporting_month": date(2026, 4, 1),
                "prev_month": date(2026, 3, 1),
                "benchmarks": benchmarks,
                "uploads": [
                    {
                        "kind": "meltwater_export",
                        "platform": "Instagram",
                        "path": str(ig),
                        "original_filename": ig.name,
                    },
                    {
                        "kind": "meltwater_export",
                        "platform": "TikTok",
                        "path": str(tk),
                        "original_filename": tk.name,
                    },
                ],
            },
        )
    )
    print(f"{len(s1.inline_json.get('top_posts', {}))} top-post groups")

    print("Stage 2: Input Collector…")
    # Placeholder embed markup per key prefix; three posts are synthesised
    # for each platform with the same canned insight text.
    embed_markup = {
        "Cif_Instagram": (
            "<blockquote class='instagram-media'></blockquote>"
            "<script async src='//www.instagram.com/embed.js'></script>"
        ),
        "Cif_TikTok": (
            "<blockquote class='tiktok-embed'></blockquote>"
            "<script async src='https://www.tiktok.com/embed.js'></script>"
        ),
    }
    embeds = {}
    insights = {}
    for key_prefix, markup in embed_markup.items():
        for i in range(1, 4):
            key = f"{key_prefix}_Post{i}"
            embeds[key] = markup
            insights[key] = "Strong hook, clear benefit, trending audio."
    s2 = await InputCollectorStage().run(
        stage_input(
            2,
            {"stage_1_output": s1.inline_json},
            {
                "logo_data_uri": None,
                "embeds": embeds,
                "insights": insights,
            },
        )
    )
    print("")

    # Stages 3 and 4 are gated on the API key; stopping here is a success.
    if not os.environ.get("ANTHROPIC_API_KEY"):
        print("ANTHROPIC_API_KEY not set — skipping stages 3 & 4.")
        return 0

    print("Stage 3: Strategy Writer…")
    s3 = await StrategyWriterStage().run(
        stage_input(
            3,
            {
                "stage_1_output": s1.inline_json,
                "stage_2_output": s2.inline_json,
            },
            {"benchmarks": benchmarks},
            context="Cif — affordable, trusted, satisfying clean.",
        )
    )
    print(f"{len((s3.inline_text or '').split())} words")

    print("Stage 4: Report Builder…")
    s4 = await ReportBuilderStage().run(
        stage_input(
            4,
            {
                "stage_1_output": s1.inline_json,
                "stage_2_output": s2.inline_json,
                # Fall back to wrapping the text when stage 3 returned no JSON.
                "stage_3_output": s3.inline_json or {"text": s3.inline_text},
                "stage_3_markdown": s3.inline_text or "",
            },
            {"benchmarks": benchmarks},
        )
    )

    html = s4.inline_text or ""
    if not html and s4.storage_key:
        # No inline HTML: fetch the artefact from storage instead.
        from app.config import get_settings
        from app.services.storage import get_storage

        settings = get_settings()
        raw = get_storage().get_bytes(
            settings.minio_bucket_reports, s4.storage_key
        )
        # Undecodable bytes are dropped rather than raising.
        html = raw.decode("utf-8", errors="ignore")

    if not html:
        # Fail loudly instead of writing an empty deck and reporting success.
        print("stage 4 produced no HTML; nothing written", file=sys.stderr)
        return 1

    out_path = Path("/app/rendered") / f"cif-e2e-{report_id}.html"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(html, encoding="utf-8")
    print(f" ✓ wrote {out_path} ({len(html):,} chars)")
    return 0
# Script entry point: run the async harness and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(asyncio.run(_main()))