semblance/backend/scripts/backfill_usage.py

301 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Backfill usage_events from existing focus-group messages and personas.
Creates estimated usage_event docs (is_estimated=True) so the admin dashboard
can show historical cost data for sessions that pre-date the usage tracking system.
Idempotent: skips documents that already have an estimated event in the collection.
Usage:
    cd backend
    python scripts/backfill_usage.py [--dry-run] [--delete-existing-estimates]
Environment:
MONGO_URI — connection string (falls back to localhost:27017 without auth)
DB_NAME — database name (default: semblance_db)
"""
import argparse
import os
import sys
from datetime import datetime, timezone
from pymongo import MongoClient
# ─────────────────────────────────────────────────────────────────────────────
# Token estimation helpers
# ─────────────────────────────────────────────────────────────────────────────
def _estimate_tokens(text: str, model: str) -> dict:
"""Estimate prompt/completion tokens for a piece of text."""
if not text:
return {"prompt": 0, "completion": 0}
if model and ("gpt" in model.lower() or "openai" in model.lower()):
try:
import tiktoken
enc = tiktoken.encoding_for_model("gpt-4")
n = len(enc.encode(text))
return {"prompt": n, "completion": 0}
except Exception:
pass
# Gemini / unknown: ~3.8 chars per token
n = max(1, int(len(text) / 3.8))
return {"prompt": n, "completion": 0}
# Module-level cache: model name -> (input rate, output rate), taken from the
# first pricing tier of each row. Filled by _load_pricing() and read by
# _estimate_cost(); rates are applied per 1,000,000 tokens.
_pricing_cache: dict = {}
def _load_pricing(db) -> None:
    """Populate ``_pricing_cache`` from the ``model_pricing`` collection.

    Rows are selected with ``{"effective_until": None}``. For every row that
    has a non-empty ``tiers`` list, the first tier's ``input_per_mtok`` and
    ``output_per_mtok`` (defaulting to 2.0 and 12.0 when the keys are absent)
    are cached under the row's ``model`` value. Rows without tiers are
    skipped. Existing cache entries are overwritten, never removed.

    Args:
        db: Database handle exposing a ``model_pricing`` collection.
    """
    current_rows = db.model_pricing.find({"effective_until": None})
    for pricing_doc in current_rows:
        tier_list = pricing_doc.get("tiers") or []
        if not tier_list:
            continue
        # Only the first tier is considered; further tiers are ignored.
        first_tier = tier_list[0]
        input_rate = first_tier.get("input_per_mtok", 2.0)
        output_rate = first_tier.get("output_per_mtok", 12.0)
        _pricing_cache[pricing_doc.get("model", "")] = (input_rate, output_rate)
def _estimate_cost(prompt_tokens: int, completion_tokens: int, model: str) -> float:
    """Estimate the USD cost of a call from its token counts.

    Rates are resolved in this order:
      1. exact ``model`` key in ``_pricing_cache``;
      2. the first cached key that contains ``model`` or is contained in it;
      3. hard-coded fallback rates (GPT-style vs. everything else).

    Args:
        prompt_tokens: Number of input tokens.
        completion_tokens: Number of output tokens.
        model: Model name used to look up rates; may be empty or None.

    Returns:
        Cost in USD, rounded to 8 decimal places.
    """
    rates = _pricing_cache.get(model)

    if not rates:
        # Substring match in either direction; the first cached key wins.
        rates = next(
            (
                cached_rates
                for cached_name, cached_rates in _pricing_cache.items()
                if model and cached_name and (cached_name in model or model in cached_name)
            ),
            None,
        )

    if not rates:
        # Final fallback matching seed_model_pricing.py values
        lowered = (model or "").lower()
        looks_like_gpt = "gpt-5" in lowered or "gpt-4" in lowered
        if looks_like_gpt:
            rates = (2.50, 15.00)  # gpt-5.4 pricing from seed
        else:
            rates = (2.00, 12.00)  # gemini-3.1-pro-preview pricing from seed

    input_rate, output_rate = rates
    input_cost = (prompt_tokens / 1_000_000) * input_rate
    output_cost = (completion_tokens / 1_000_000) * output_rate
    return round(input_cost + output_cost, 8)
# ─────────────────────────────────────────────────────────────────────────────
# DB connection (sync PyMongo)
# ─────────────────────────────────────────────────────────────────────────────
def connect():
    """Connect to MongoDB and return the configured database handle.

    Reads ``MONGO_URI`` (default ``mongodb://localhost:27017``) and
    ``DB_NAME`` (default ``semblance_db``) from the environment, creates a
    client with a 5000 ms server-selection timeout and issues a ``ping``.

    Returns:
        The database object ``client[DB_NAME]``.

    Exits:
        Prints an error and calls ``sys.exit(1)`` on any exception raised
        while connecting, pinging or selecting the database.
    """
    uri = os.environ.get("MONGO_URI", "mongodb://localhost:27017")
    database_name = os.environ.get("DB_NAME", "semblance_db")
    try:
        mongo = MongoClient(uri, serverSelectionTimeoutMS=5000)
        # Ping up front so connection problems are reported here.
        mongo.admin.command("ping")
        print(f"Connected to MongoDB: {database_name}")
        database = mongo[database_name]
    except Exception as exc:
        print(f"ERROR: Could not connect to MongoDB: {exc}")
        sys.exit(1)
    return database
# ─────────────────────────────────────────────────────────────────────────────
# Backfill focus-group messages
# Messages are in the separate `focus_group_messages` collection (NOT embedded).
# Fields: focus_group_id (str), text, type, senderId, created_at
# ─────────────────────────────────────────────────────────────────────────────
def _coerce_message_ts(raw):
    """Turn a message's stored timestamp into the value used for the event.

    ISO strings are parsed; unparseable strings and falsy values fall back to
    the current UTC time. Any other truthy value is returned unchanged.
    """
    if isinstance(raw, str):
        # datetime.fromisoformat() only accepts a trailing "Z" from Python
        # 3.11 on. Normalise it so older interpreters keep the historical
        # timestamp instead of silently replacing it with "now".
        candidate = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
        try:
            raw = datetime.fromisoformat(candidate)
        except ValueError:
            raw = None
    return raw or datetime.now(timezone.utc)


def backfill_messages(db, dry_run: bool) -> int:
    """Create estimated usage events for existing focus-group messages.

    For every document in ``focus_group_messages`` whose type is one of
    ``response``, ``question``, ``moderator``, ``ai`` or empty/missing, and
    that has no estimated event yet (matched on ``source_message_id``), a
    ``usage_events`` document with ``is_estimated=True`` is built from the
    message text. Model and user come from the owning focus group; unknown
    focus groups fall back to ``gemini-3.1-pro-preview`` and an empty user id.

    Args:
        db: Database handle exposing ``focus_groups``,
            ``focus_group_messages`` and ``usage_events`` collections.
        dry_run: When True nothing is inserted, but events are still counted.

    Returns:
        Number of events created (or that would be created in a dry run).
    """
    default_model = "gemini-3.1-pro-preview"

    # Build a lookup: focus_group_id -> {model, user_id}
    fg_meta = {}
    for fg in db.focus_groups.find({}, {"llm_model": 1, "created_by": 1}):
        fg_meta[str(fg["_id"])] = {
            "model": fg.get("llm_model") or default_model,
            "user_id": str(fg.get("created_by") or ""),
        }

    total_messages = db.focus_group_messages.count_documents({})
    print(f"\n[messages] Found {total_messages} messages across all focus groups")

    created = 0
    for msg in db.focus_group_messages.find({}):
        msg_id = str(msg["_id"])
        fg_id = str(msg.get("focus_group_id") or "")

        # Skip non-AI messages (only persona responses and moderator questions
        # cost money). `or ""` makes a stored null type behave like a missing
        # one instead of being skipped.
        msg_type = msg.get("type") or ""
        if msg_type not in ("response", "question", "moderator", "ai", ""):
            continue

        # Idempotent check
        if db.usage_events.find_one({"source_message_id": msg_id, "is_estimated": True}):
            continue

        meta = fg_meta.get(fg_id, {"model": default_model, "user_id": ""})
        fg_model = meta["model"]
        user_id = meta["user_id"]

        text = msg.get("text") or msg.get("content") or ""
        tokens = _estimate_tokens(text, fg_model)
        # Completion size is a separate heuristic (~5 chars per token, min 1).
        tokens["completion"] = max(1, int(len(text) / 5.0))

        # Price input and output separately with the same rates as the total,
        # so the stored breakdown reflects the token counts.
        total_cost = _estimate_cost(tokens["prompt"], tokens["completion"], fg_model)
        input_cost = _estimate_cost(tokens["prompt"], 0, fg_model)
        output_cost = _estimate_cost(0, tokens["completion"], fg_model)

        ts = _coerce_message_ts(msg.get("created_at") or msg.get("timestamp"))
        feature = "moderator" if msg_type in ("question", "moderator") else "persona_response"

        event = {
            "ts": ts,
            "provider": "gemini" if "gemini" in fg_model.lower() else "openai",
            "model": fg_model,
            "feature": feature,
            "user_id": user_id,
            "focus_group_id": fg_id,
            "persona_id": str(msg.get("senderId") or msg.get("persona_id") or ""),
            "prompt_tokens": tokens["prompt"],
            "completion_tokens": tokens["completion"],
            "cached_tokens": 0,
            "reasoning_tokens": 0,
            "total_tokens": tokens["prompt"] + tokens["completion"],
            "cost_usd": {
                "input": input_cost,
                "output": output_cost,
                "cached": 0,
                "reasoning": 0,
                "total": total_cost,
            },
            "duration_ms": 0,
            "retry_count": 0,
            "status": "success",
            "is_estimated": True,
            "estimate_method": "char_div_3_8",
            "source_message_id": msg_id,
        }
        if not dry_run:
            db.usage_events.insert_one(event)
        created += 1

    print(f"[messages] {'Would create' if dry_run else 'Created'} {created} estimated usage events")
    return created
# ─────────────────────────────────────────────────────────────────────────────
# Backfill persona generation
# Personas: text is built from background, description, goals; created_by (or user_id) = user_id
# ─────────────────────────────────────────────────────────────────────────────
def _persona_field_text(value) -> str:
    """Flatten one persona field to text.

    Lists are space-joined (falsy items dropped); other falsy values become
    an empty string; anything else is passed through ``str()``.
    """
    if isinstance(value, list):
        return " ".join(str(item) for item in value if item)
    return str(value) if value else ""


def _coerce_persona_ts(raw):
    """Turn a persona's stored timestamp into the value used for the event.

    ISO strings are parsed; unparseable strings and falsy values fall back to
    the current UTC time. Any other truthy value is returned unchanged.
    """
    if isinstance(raw, str):
        # datetime.fromisoformat() only accepts a trailing "Z" from Python
        # 3.11 on. Normalise it so older interpreters keep the historical
        # timestamp instead of silently replacing it with "now".
        candidate = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
        try:
            return datetime.fromisoformat(candidate)
        except ValueError:
            return datetime.now(timezone.utc)
    return raw or datetime.now(timezone.utc)


def backfill_personas(db, dry_run: bool) -> int:
    """Create estimated ``persona_generate`` usage events for existing personas.

    For every persona with non-empty text (``background``, ``description``
    and ``goals`` joined) and no estimated ``persona_generate`` event yet
    (matched on ``source_persona_id``), a ``usage_events`` document with
    ``is_estimated=True`` is built. The model is always
    ``gemini-3.1-pro-preview``.

    Args:
        db: Database handle exposing ``personas`` and ``usage_events``.
        dry_run: When True nothing is inserted, but events are still counted.

    Returns:
        Number of events created (or that would be created in a dry run).
    """
    model = "gemini-3.1-pro-preview"

    # Count on the server and stream the cursor instead of loading every
    # persona into memory just to print the total.
    persona_total = db.personas.count_documents({})
    print(f"\n[personas] Found {persona_total} personas to process")

    created = 0
    for persona in db.personas.find({}):
        persona_id = str(persona["_id"])

        # Use background + description + goals as the generation text
        parts = [
            _persona_field_text(persona.get("background")),
            _persona_field_text(persona.get("description")),
            _persona_field_text(persona.get("goals")),
        ]
        text = " ".join(filter(None, parts)).strip()
        if not text:
            continue

        # Idempotent check
        if db.usage_events.find_one({"source_persona_id": persona_id, "feature": "persona_generate", "is_estimated": True}):
            continue

        tokens = _estimate_tokens(text, model)
        # Completion size is a separate heuristic (~4 chars per token, min 1).
        tokens["completion"] = max(1, int(len(text) / 4.0))

        # Price input and output separately with the same rates as the total,
        # so the stored breakdown reflects the token counts.
        total_cost = _estimate_cost(tokens["prompt"], tokens["completion"], model)
        input_cost = _estimate_cost(tokens["prompt"], 0, model)
        output_cost = _estimate_cost(0, tokens["completion"], model)

        ts = _coerce_persona_ts(persona.get("created_at") or persona.get("updatedAt"))

        event = {
            "ts": ts,
            "provider": "gemini",
            "model": model,
            "feature": "persona_generate",
            "user_id": str(persona.get("created_by") or persona.get("user_id") or ""),
            "focus_group_id": "",
            "persona_id": persona_id,
            "prompt_tokens": tokens["prompt"],
            "completion_tokens": tokens["completion"],
            "cached_tokens": 0,
            "reasoning_tokens": 0,
            "total_tokens": tokens["prompt"] + tokens["completion"],
            "cost_usd": {
                "input": input_cost,
                "output": output_cost,
                "cached": 0,
                "reasoning": 0,
                "total": total_cost,
            },
            "duration_ms": 0,
            "retry_count": 0,
            "status": "success",
            "is_estimated": True,
            "estimate_method": "char_div_3_8",
            "source_persona_id": persona_id,
        }
        if not dry_run:
            db.usage_events.insert_one(event)
        created += 1

    print(f"[personas] {'Would create' if dry_run else 'Created'} {created} estimated usage events")
    return created
# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point for the backfill.

    Parses ``--dry-run`` and ``--delete-existing-estimates``, connects to the
    database, optionally deletes all previously estimated events (never
    during a dry run), loads pricing, then runs the message and persona
    backfills and prints the combined event count.
    """
    cli = argparse.ArgumentParser(description="Backfill usage_events from existing data")
    cli.add_argument("--dry-run", action="store_true", help="Preview without writing")
    cli.add_argument(
        "--delete-existing-estimates",
        action="store_true",
        help="Delete previously created estimated events before backfilling",
    )
    opts = cli.parse_args()
    dry_run = opts.dry_run

    if dry_run:
        print("=== DRY RUN — no data will be written ===\n")

    db = connect()

    # Deleting is a write, so it is skipped entirely during a dry run.
    if opts.delete_existing_estimates and not dry_run:
        purge = db.usage_events.delete_many({"is_estimated": True})
        print(f"Deleted {purge.deleted_count} existing estimated events\n")

    _load_pricing(db)
    print(f"Loaded {len(_pricing_cache)} pricing rows: {list(_pricing_cache.keys())}")

    message_events = backfill_messages(db, dry_run)
    persona_events = backfill_personas(db, dry_run)
    total = 0 + message_events + persona_events
    print(f"\n{'[DRY RUN] ' if dry_run else ''}Backfill complete — {total} events total")
# Run the backfill only when this file is executed as a script.
if __name__ == "__main__":
    main()