#!/usr/bin/env python3
"""
Backfill usage_events from existing focus-group messages and personas.

Creates estimated usage_event docs (is_estimated=True) so the admin dashboard
can show historical cost data for sessions that pre-date the usage tracking
system.

Token estimation approach:
  - completion = actual output text length / 3.8 chars-per-token
  - prompt     = base template size + ALL prior messages in conversation
                 (accumulated context)

This mirrors the real LLM call: each turn sends the full conversation history.

Usage:
    cd backend
    python scripts/backfill_usage.py [--dry-run] [--delete-existing-estimates]

Environment:
    MONGO_URI  — connection string (falls back to localhost:27017 without auth)
    DB_NAME    — database name (default: semblance_db)
"""

import argparse
import os
import sys
from collections import defaultdict
from datetime import datetime, timezone

from pymongo import MongoClient

# ─────────────────────────────────────────────────────────────────────────────
# Prompt template size constants (measured from actual files in backend/prompts/)
# These are the BASE tokens before any dynamic content is added.
# ─────────────────────────────────────────────────────────────────────────────

# focus-group-response.md (~941 tok) + persona details (~350 tok) + topic/instructions (~200 tok)
BASE_PROMPT_PERSONA_RESPONSE = 1_500

# ai-moderator-system.md (~738 tok) + focus group context (~500 tok)
BASE_PROMPT_MODERATOR = 1_200

# persona-detailed-generation.md (~2307 tok) + focus group brief (~200 tok)
BASE_PROMPT_PERSONA_GENERATE = 2_500

CHARS_PER_TOKEN = 3.8  # Gemini approximation


# ─────────────────────────────────────────────────────────────────────────────
# Token helpers
# ─────────────────────────────────────────────────────────────────────────────

def _chars_to_tokens(chars: int) -> int:
    """Convert a character count to an estimated token count (never below 1)."""
    return max(1, int(chars / CHARS_PER_TOKEN))


def _to_str(v) -> str:
    """Flatten a scalar or list field to text.

    Lists are space-joined with falsy items dropped; any other falsy value
    becomes the empty string.
    """
    if isinstance(v, list):
        return " ".join(str(i) for i in v if i)
    return str(v) if v else ""


# ─────────────────────────────────────────────────────────────────────────────
# Pricing
# ─────────────────────────────────────────────────────────────────────────────

# model name -> (input USD per million tokens, output USD per million tokens)
_pricing_cache: dict = {}


def _load_pricing(db) -> None:
    """Populate ``_pricing_cache`` from ``model_pricing`` rows still in effect.

    Only the first tier of each row is used. Rows with no model name or no
    tiers are skipped. A rate that is missing or stored as null falls back to
    the default (2.0 input / 12.0 output); an explicit 0 is kept as-is.
    """
    for row in db.model_pricing.find({"effective_until": None}):
        model = row.get("model") or ""
        tiers = row.get("tiers") or []
        if not model or not tiers:
            # A row without a model name would be filed under "" and then
            # price every event whose model is unknown.
            continue
        first_tier = tiers[0]
        input_rate = first_tier.get("input_per_mtok")
        output_rate = first_tier.get("output_per_mtok")
        # `is None` rather than `or`, so a legitimate rate of 0 survives.
        _pricing_cache[model] = (
            2.0 if input_rate is None else input_rate,
            12.0 if output_rate is None else output_rate,
        )


def _estimate_cost(prompt_tokens: int, completion_tokens: int, model: str) -> dict:
    """Estimate the USD cost of one call.

    Rate lookup order:
      1. exact model name in ``_pricing_cache``;
      2. a cached name that contains, or is contained in, ``model``. When
         several match, the one closest in length to ``model`` wins, so a
         short key such as "gemini" cannot shadow "gemini-3.1-pro";
      3. hard-coded defaults (2.50/15.00 for gpt-4*/gpt-5*, else 2.00/12.00).

    Returns a dict with ``input``, ``output``, ``cached``, ``reasoning`` and
    ``total`` keys, each rounded to 8 decimal places.
    """
    rates = _pricing_cache.get(model)
    if not rates and model:
        candidates = [
            key for key in _pricing_cache
            if key and (key in model or model in key)
        ]
        if candidates:
            closest = min(candidates, key=lambda key: abs(len(key) - len(model)))
            rates = _pricing_cache[closest]
    if not rates:
        m = (model or "").lower()
        if "gpt-5" in m or "gpt-4" in m:
            rates = (2.50, 15.00)
        else:
            rates = (2.00, 12.00)

    input_rate, output_rate = rates
    cost_input = (prompt_tokens / 1_000_000) * input_rate
    cost_output = (completion_tokens / 1_000_000) * output_rate
    total = round(cost_input + cost_output, 8)
    return {
        "input": round(cost_input, 8),
        "output": round(cost_output, 8),
        "cached": 0.0,
        "reasoning": 0.0,
        "total": total,
    }
# ─────────────────────────────────────────────────────────────────────────────
# DB connection (sync PyMongo)
# ─────────────────────────────────────────────────────────────────────────────

def connect():
    """Open a PyMongo connection and return the configured database handle.

    Reads ``MONGO_URI`` (default ``mongodb://localhost:27017``) and ``DB_NAME``
    (default ``semblance_db``) from the environment and pings the server so an
    unreachable host fails within the 5 s server-selection timeout. On any
    failure the error is printed and the process exits with status 1.
    """
    mongo_uri = os.environ.get("MONGO_URI", "mongodb://localhost:27017")
    db_name = os.environ.get("DB_NAME", "semblance_db")
    try:
        client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
        client.admin.command("ping")
        print(f"Connected to MongoDB: {db_name}")
        return client[db_name]
    except Exception as e:  # script boundary: report the failure and stop
        print(f"ERROR: Could not connect to MongoDB: {e}")
        sys.exit(1)


# ─────────────────────────────────────────────────────────────────────────────
# Shared message helpers
# ─────────────────────────────────────────────────────────────────────────────

# Model assumed when a focus group does not record one.
_DEFAULT_MODEL = "gemini-3.1-pro-preview"

# Message types treated as AI-generated (a missing type counts as AI).
_AI_MESSAGE_TYPES = ("response", "question", "moderator", "ai", "")

# Message types produced by the AI moderator rather than by a persona.
_MODERATOR_TYPES = ("question", "moderator")

# Sort position for messages without a usable timestamp: before everything.
_TS_MIN = datetime.min.replace(tzinfo=timezone.utc)


def _parse_ts(raw):
    """Return ``raw`` as a timezone-aware datetime, or None if unusable.

    Accepts datetimes and ISO-8601 strings (a trailing "Z" is understood).
    Naive values are taken to be UTC. Making every value aware lets
    timestamps from different sources be compared without a TypeError.
    """
    if isinstance(raw, str):
        text = raw.strip()
        if text.endswith(("Z", "z")):
            # datetime.fromisoformat only accepts "Z" from Python 3.11 on.
            text = text[:-1] + "+00:00"
        try:
            raw = datetime.fromisoformat(text)
        except ValueError:
            return None
    if not isinstance(raw, datetime):
        return None
    if raw.utcoffset() is None:
        return raw.replace(tzinfo=timezone.utc)
    return raw


def _message_ts(msg):
    """Return the message's ``created_at`` (else ``timestamp``) or None."""
    for field in ("created_at", "timestamp"):
        parsed = _parse_ts(msg.get(field))
        if parsed is not None:
            return parsed
    return None


def _message_sort_key(msg):
    """Chronological sort key; messages without a timestamp sort first."""
    return _message_ts(msg) or _TS_MIN


def _message_text(msg) -> str:
    """Return the message body from ``text`` (else ``content``) as a string."""
    text = msg.get("text") or msg.get("content") or ""
    return text if isinstance(text, str) else str(text)


def _group_ai_messages(messages) -> dict:
    """Bucket AI-generated messages by focus-group id, oldest first."""
    by_fg: dict = defaultdict(list)
    for msg in messages:
        # Only AI-generated messages cost money
        if msg.get("type", "") not in _AI_MESSAGE_TYPES:
            continue
        by_fg[str(msg.get("focus_group_id") or "")].append(msg)
    for msgs in by_fg.values():
        msgs.sort(key=_message_sort_key)
    return by_fg


# ─────────────────────────────────────────────────────────────────────────────
# Backfill focus-group messages
#
# Real prompt structure per call:
#   system prompt template (~1200-1500 tok) + all prior messages (accumulated)
# Real completion:
#   the response text
#
# We sort messages per focus group by timestamp and accumulate context,
# so that message N has all N-1 prior messages as context — matching reality.
# ─────────────────────────────────────────────────────────────────────────────

def backfill_messages(db, dry_run: bool, *, ignore_existing: bool = False) -> int:
    """Create one estimated usage event per AI-generated focus-group message.

    Args:
        db: PyMongo database handle.
        dry_run: When true, count the events but write nothing.
        ignore_existing: When true, do not skip messages that already have an
            estimated event. Used to preview a run that would first delete
            the existing estimates.

    Returns:
        The number of events created (or that would be created).
    """
    created = 0
    # One shared fallback so undated messages in a run get the same stamp.
    run_started = datetime.now(timezone.utc)

    # Build focus-group metadata lookup
    fg_meta = {
        str(fg["_id"]): {
            "model": fg.get("llm_model") or _DEFAULT_MODEL,
            "user_id": str(fg.get("created_by") or ""),
        }
        for fg in db.focus_groups.find({}, {"llm_model": 1, "created_by": 1})
    }

    # Collect all messages, group by focus_group_id, sort chronologically
    all_messages = list(db.focus_group_messages.find({}))
    print(f"\n[messages] Found {len(all_messages)} messages across all focus groups")
    by_fg = _group_ai_messages(all_messages)

    # Already-estimated message IDs (for idempotency)
    if ignore_existing:
        existing_ids = set()
    else:
        existing_ids = {
            str(e["source_message_id"])
            for e in db.usage_events.find(
                {"is_estimated": True, "source_message_id": {"$exists": True}},
                {"source_message_id": 1},
            )
        }

    for fg_id, msgs in by_fg.items():
        meta = fg_meta.get(fg_id) or {"model": _DEFAULT_MODEL, "user_id": ""}
        fg_model = meta["model"]
        user_id = meta["user_id"]
        provider = "gemini" if "gemini" in fg_model.lower() else "openai"

        accumulated_context_chars = 0  # sum of all prior message text lengths

        for msg in msgs:
            msg_id = str(msg["_id"])
            text = _message_text(msg)

            if msg_id not in existing_ids:
                is_moderator = msg.get("type", "") in _MODERATOR_TYPES

                # completion = what the model actually generated
                completion_tokens = _chars_to_tokens(len(text))

                # prompt = base template + full conversation history so far
                base_tokens = (
                    BASE_PROMPT_MODERATOR if is_moderator
                    else BASE_PROMPT_PERSONA_RESPONSE
                )
                prompt_tokens = base_tokens + _chars_to_tokens(accumulated_context_chars)

                event = {
                    # Undated messages are stamped with the run time, not year 0001.
                    "ts": _message_ts(msg) or run_started,
                    "provider": provider,
                    "model": fg_model,
                    "feature": "moderator" if is_moderator else "persona_response",
                    "user_id": user_id,
                    "focus_group_id": fg_id,
                    "persona_id": str(msg.get("senderId") or msg.get("persona_id") or ""),
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens,
                    "cached_tokens": 0,
                    "reasoning_tokens": 0,
                    "total_tokens": prompt_tokens + completion_tokens,
                    "cost_usd": _estimate_cost(prompt_tokens, completion_tokens, fg_model),
                    "duration_ms": 0,
                    "retry_count": 0,
                    "status": "success",
                    "is_estimated": True,
                    "estimate_method": "accumulated_context",
                    "source_message_id": msg_id,
                }

                if not dry_run:
                    db.usage_events.insert_one(event)
                created += 1

            # Every message, skipped or not, is context for the ones after it.
            accumulated_context_chars += len(text)

    print(f"[messages] {'Would create' if dry_run else 'Created'} {created} estimated usage events")
    return created


# ─────────────────────────────────────────────────────────────────────────────
# Backfill persona generation
#
# Real prompt:     persona-detailed-generation.md template (~2307 tok) + fg brief (~200 tok)
# Real completion: the generated persona profile text
# ─────────────────────────────────────────────────────────────────────────────

def backfill_personas(db, dry_run: bool, *, ignore_existing: bool = False) -> int:
    """Create one estimated ``persona_generate`` usage event per persona.

    Args:
        db: PyMongo database handle.
        dry_run: When true, count the events but write nothing.
        ignore_existing: When true, do not skip personas that already have an
            estimated event (see ``backfill_messages``).

    Returns:
        The number of events created (or that would be created).
    """
    created = 0
    run_started = datetime.now(timezone.utc)

    personas = list(db.personas.find({}))
    print(f"\n[personas] Found {len(personas)} personas to process")

    if ignore_existing:
        existing_persona_ids = set()
    else:
        existing_persona_ids = {
            str(e["source_persona_id"])
            for e in db.usage_events.find(
                {
                    "is_estimated": True,
                    "source_persona_id": {"$exists": True},
                    "feature": "persona_generate",
                },
                {"source_persona_id": 1},
            )
        }

    for persona in personas:
        persona_id = str(persona["_id"])
        if persona_id in existing_persona_ids:
            continue

        # The generated output is the persona profile text
        text = " ".join(filter(None, [
            _to_str(persona.get("background")),
            _to_str(persona.get("description")),
            _to_str(persona.get("goals")),
            _to_str(persona.get("name")),
        ])).strip()
        if not text:
            continue

        model = _DEFAULT_MODEL

        # completion = the generated persona text
        completion_tokens = _chars_to_tokens(len(text))
        # prompt = template + focus group brief (fixed base)
        prompt_tokens = BASE_PROMPT_PERSONA_GENERATE

        ts = (
            _parse_ts(persona.get("created_at"))
            or _parse_ts(persona.get("updatedAt"))
            or run_started
        )

        event = {
            "ts": ts,
            "provider": "gemini",
            "model": model,
            "feature": "persona_generate",
            "user_id": str(persona.get("created_by") or persona.get("user_id") or ""),
            "focus_group_id": str(persona.get("focus_group_id") or ""),
            "persona_id": persona_id,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "cached_tokens": 0,
            "reasoning_tokens": 0,
            "total_tokens": prompt_tokens + completion_tokens,
            "cost_usd": _estimate_cost(prompt_tokens, completion_tokens, model),
            "duration_ms": 0,
            "retry_count": 0,
            "status": "success",
            "is_estimated": True,
            # NOTE(review): the persona prompt is a fixed base with no
            # accumulated history, so this label looks copied from the message
            # backfill. Left unchanged in case consumers filter on it.
            "estimate_method": "accumulated_context",
            "source_persona_id": persona_id,
        }

        if not dry_run:
            db.usage_events.insert_one(event)
        created += 1

    print(f"[personas] {'Would create' if dry_run else 'Created'} {created} estimated usage events")
    return created


# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────

def main():
    """Parse CLI flags, optionally clear old estimates, and run both backfills."""
    parser = argparse.ArgumentParser(description="Backfill usage_events from existing data")
    parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
    parser.add_argument("--delete-existing-estimates", action="store_true",
                        help="Delete previously created estimated events before backfilling")
    args = parser.parse_args()

    if args.dry_run:
        print("=== DRY RUN — no data will be written ===\n")

    db = connect()

    if args.delete_existing_estimates:
        if args.dry_run:
            pending = db.usage_events.count_documents({"is_estimated": True})
            print(f"[DRY RUN] Would delete {pending} existing estimated events\n")
        else:
            result = db.usage_events.delete_many({"is_estimated": True})
            print(f"Deleted {result.deleted_count} existing estimated events\n")

    _load_pricing(db)
    print(f"Loaded {len(_pricing_cache)} pricing rows: {list(_pricing_cache.keys())}")

    # Dry-run: show a sample of what the cost distribution looks like
    if args.dry_run:
        _dry_run_sample(db)

    # A dry run that would have deleted the old estimates must count as if
    # they were gone, otherwise the preview under-reports the real run.
    ignore_existing = args.dry_run and args.delete_existing_estimates

    total = 0
    total += backfill_messages(db, args.dry_run, ignore_existing=ignore_existing)
    total += backfill_personas(db, args.dry_run, ignore_existing=ignore_existing)

    print(f"\n{'[DRY RUN] ' if args.dry_run else ''}Backfill complete — {total} events total")


def _focus_group_name(db, fg_id: str) -> str:
    """Return a display name for a focus group, falling back to its id prefix."""
    # Imported here because only the dry-run preview needs bson directly.
    from bson import ObjectId
    from bson.errors import InvalidId

    fg = None
    if fg_id:
        try:
            fg = db.focus_groups.find_one({"_id": ObjectId(fg_id)}, {"name": 1})
        except InvalidId:
            # The id stored on the message is not an ObjectId string.
            fg = None
    return (fg or {}).get("name") or fg_id[:8] or "(unknown)"


def _dry_run_sample(db):
    """Print a sample of estimated costs to sanity-check before real run.

    Uses the same grouping, ordering and base-prompt choice as
    ``backfill_messages`` so the preview matches what a real run would write.
    """
    by_fg = _group_ai_messages(db.focus_group_messages.find({}))

    print("\n[dry-run sample] Estimated cost per focus group (top 5 by message count):")
    fg_models = {
        str(fg["_id"]): fg.get("llm_model") or _DEFAULT_MODEL
        for fg in db.focus_groups.find({}, {"llm_model": 1})
    }

    rows = []
    for fg_id, msgs in by_fg.items():
        model = fg_models.get(fg_id, _DEFAULT_MODEL)
        accumulated = 0
        total_cost = 0.0
        for msg in msgs:
            text = _message_text(msg)
            base_tokens = (
                BASE_PROMPT_MODERATOR if msg.get("type", "") in _MODERATOR_TYPES
                else BASE_PROMPT_PERSONA_RESPONSE
            )
            cost = _estimate_cost(
                base_tokens + _chars_to_tokens(accumulated),
                _chars_to_tokens(len(text)),
                model,
            )
            total_cost += cost["total"]
            accumulated += len(text)
        rows.append((fg_id, len(msgs), total_cost))

    for fg_id, count, cost in sorted(rows, key=lambda r: -r[1])[:5]:
        name = _focus_group_name(db, fg_id)
        print(f"  {name}: {count} messages → estimated ${cost:.4f}")


if __name__ == "__main__":
    main()