Backend: - token_version in JWT (bump_token_version, get_token_version on User model); jwt_required checks tv claim → 401 on mismatch; login routes embed version - Quota pre-flight in all 3 LLM public methods (QuotaExceededError bubbles up) - AI runner catches QuotaExceededError → sets status paused_quota + emits WS event - Admin routes: POST /users (create), POST /users/<id>/reset-password, POST /pricing, GET /focus-groups with aggregated cost; PUT /users/<id> now bumps token_version on disable or role change - backfill_usage.py: idempotent estimated-event generator for historical data, tiktoken for GPT models, char/3.8 for Gemini, --dry-run flag Frontend: - 402 interceptor dispatches quota_exceeded CustomEvent - adminApi: createUser, resetPassword, createPricing, listFocusGroups - UsersTab: New User dialog + Reset Password in edit dialog - PricingTab: New Price dialog (model, provider, input/output/cached prices) - FocusGroupsTab: focus groups table sorted by total cost - Admin.tsx: 4th tab (Focus Groups) - FocusGroupSession: admin-only cost badge + dismissable quota exceeded banner Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
251 lines
10 KiB
Python
251 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Backfill usage_events from existing focus-group messages and personas.
|
|
|
|
Creates estimated usage_event docs (is_estimated=True) so the admin dashboard
|
|
can show historical cost data for sessions that pre-date the usage tracking system.
|
|
|
|
Idempotent: skips documents that already have an estimated event in the collection.
|
|
|
|
Usage:
|
|
cd backend
|
|
python scripts/backfill_usage.py [--dry-run]
|
|
|
|
Environment:
|
|
MONGO_URI — connection string (falls back to localhost:27017 without auth)
|
|
DB_NAME — database name (default: semblance_db)
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pymongo import MongoClient
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Token estimation helpers
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
def _estimate_tokens(text: str, model: str) -> dict:
|
|
"""Estimate prompt/completion tokens for a piece of text."""
|
|
if not text:
|
|
return {"prompt": 0, "completion": 0}
|
|
|
|
# Try tiktoken for OpenAI models, fall back to char-based estimate
|
|
if model and ("gpt" in model.lower() or "openai" in model.lower()):
|
|
try:
|
|
import tiktoken
|
|
enc = tiktoken.encoding_for_model("gpt-4")
|
|
n = len(enc.encode(text))
|
|
return {"prompt": n, "completion": 0}
|
|
except Exception:
|
|
pass
|
|
|
|
# Gemini / unknown: ~3.8 chars per token
|
|
n = max(1, int(len(text) / 3.8))
|
|
return {"prompt": n, "completion": 0}
|
|
|
|
|
|
def _estimate_cost(prompt_tokens: int, completion_tokens: int, model: str) -> float:
|
|
"""Very rough cost estimate in USD (used only for backfill estimates)."""
|
|
# Approximate per-million-token prices for common models
|
|
rate_per_m = {
|
|
"gemini": (0.35, 1.05), # input, output USD/1M tokens
|
|
"gpt-4": (30.00, 60.00),
|
|
"gpt-3": (0.50, 1.50),
|
|
}
|
|
key = "gemini"
|
|
if model:
|
|
m = model.lower()
|
|
if "gpt-4" in m or "gpt-5" in m:
|
|
key = "gpt-4"
|
|
elif "gpt-3" in m:
|
|
key = "gpt-3"
|
|
|
|
input_rate, output_rate = rate_per_m[key]
|
|
cost = (prompt_tokens / 1_000_000) * input_rate + (completion_tokens / 1_000_000) * output_rate
|
|
return round(cost, 8)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# DB connection (sync PyMongo)
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
def connect():
    """Connect to MongoDB and return the configured database handle.

    Reads ``MONGO_URI`` (default ``mongodb://localhost:27017``) and
    ``DB_NAME`` (default ``semblance_db``) from the environment, then pings
    the server with a 5 second server-selection timeout to verify the
    connection before returning.

    Returns:
        The database object ``client[DB_NAME]``.

    On any failure an error line is printed and the process exits with
    status 1.
    """
    uri = os.getenv("MONGO_URI", "mongodb://localhost:27017")
    database_name = os.getenv("DB_NAME", "semblance_db")

    try:
        mongo = MongoClient(uri, serverSelectionTimeoutMS=5000)
        # Force a round trip now so connection problems surface here.
        mongo.admin.command("ping")
        print(f"Connected to MongoDB: {database_name}")
        return mongo[database_name]
    except Exception as exc:
        print(f"ERROR: Could not connect to MongoDB: {exc}")
        sys.exit(1)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Backfill focus-group messages
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
def backfill_messages(db, dry_run: bool) -> int:
    """Create estimated usage events for focus-group messages.

    Walks every document in ``db.focus_groups`` and, for each entry of its
    ``messages`` list that carries an ``id`` or ``_id``, inserts one
    ``is_estimated=True`` document into ``db.usage_events``. A message is
    skipped when an estimated event with the same ``source_message_id``
    already exists, so re-running the script does not duplicate events.

    Args:
        db: Database handle exposing ``focus_groups`` and ``usage_events``
            collections.
        dry_run: When true, nothing is written; the events that would be
            created are only counted.

    Returns:
        Number of events created (or that would be created in a dry run).
    """
    created = 0
    focus_groups = list(db.focus_groups.find({}))
    print(f"\n[messages] Found {len(focus_groups)} focus groups to process")

    for fg in focus_groups:
        fg_id = str(fg["_id"])
        fg_model = fg.get("llm_model") or "gemini-3.1-pro-preview"
        provider = "gemini" if "gemini" in fg_model.lower() else "openai"
        user_id = str(fg.get("user_id") or "")
        # `or []` also covers a "messages" key that is stored as null.
        messages = fg.get("messages") or []

        for msg in messages:
            msg_id = str(msg.get("id") or msg.get("_id") or "")
            if not msg_id:
                continue

            # Idempotent: skip if an estimated event already exists for this message
            existing = db.usage_events.find_one({
                "source_message_id": msg_id,
                "is_estimated": True,
            })
            if existing:
                continue

            text = msg.get("content") or ""
            tokens = _estimate_tokens(text, fg_model)
            # For responses we add a rough output token estimate
            tokens["completion"] = max(1, int(len(text) / 5.0))
            prompt_tokens = tokens["prompt"]
            completion_tokens = tokens["completion"]

            # Price each direction with its own rate instead of splitting the
            # total by a fixed ratio, so input/output match the rate table.
            input_cost = _estimate_cost(prompt_tokens, 0, fg_model)
            output_cost = _estimate_cost(0, completion_tokens, fg_model)
            total_cost = _estimate_cost(prompt_tokens, completion_tokens, fg_model)

            ts = msg.get("timestamp")
            if isinstance(ts, str):
                try:
                    # fromisoformat() only accepts a trailing "Z" on
                    # Python 3.11+; normalise it to an explicit UTC offset.
                    iso = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
                    ts = datetime.fromisoformat(iso)
                except ValueError:
                    ts = None
            # Fall back to the focus group's date, then to "now".
            ts = ts or fg.get("date") or datetime.now(timezone.utc)

            event = {
                "ts": ts,
                "provider": provider,
                "model": fg_model,
                "feature": "autonomous_conversation",
                "user_id": user_id,
                "focus_group_id": fg_id,
                "persona_id": str(msg.get("personaId") or msg.get("persona_id") or ""),
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "cached_tokens": 0,
                "reasoning_tokens": 0,
                "cost_usd": {
                    "input": input_cost,
                    "output": output_cost,
                    "total": total_cost,
                },
                "duration_ms": 0,
                "retry_count": 0,
                "status": "estimated",
                "is_estimated": True,
                "source_message_id": msg_id,
            }

            if not dry_run:
                db.usage_events.insert_one(event)
            created += 1

    print(f"[messages] {'Would create' if dry_run else 'Created'} {created} estimated usage events")
    return created
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Backfill persona generation
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
def backfill_personas(db, dry_run: bool) -> int:
    """Create estimated usage events for persona narrative generation.

    Walks every document in ``db.personas`` that has a non-empty
    ``narrative`` and inserts one ``is_estimated=True`` event with feature
    ``persona_generate`` into ``db.usage_events``. A persona is skipped when
    such an estimated event already exists for its id, so re-running the
    script does not duplicate events.

    Args:
        db: Database handle exposing ``personas`` and ``usage_events``
            collections.
        dry_run: When true, nothing is written; the events that would be
            created are only counted.

    Returns:
        Number of events created (or that would be created in a dry run).
    """
    # default; personas are usually generated via default model
    model = "gemini-3.1-pro-preview"

    created = 0
    personas = list(db.personas.find({}))
    print(f"\n[personas] Found {len(personas)} personas to process")

    for persona in personas:
        persona_id = str(persona["_id"])
        narrative = persona.get("narrative") or ""
        if not narrative:
            continue  # No narrative to estimate from — skip

        # Idempotent check
        existing = db.usage_events.find_one({
            "persona_id": persona_id,
            "feature": "persona_generate",
            "is_estimated": True,
        })
        if existing:
            continue

        tokens = _estimate_tokens(narrative, model)
        tokens["completion"] = max(1, int(len(narrative) / 4.0))
        prompt_tokens = tokens["prompt"]
        completion_tokens = tokens["completion"]

        # Price each direction with its own rate instead of splitting the
        # total by a fixed ratio, so input/output match the rate table.
        input_cost = _estimate_cost(prompt_tokens, 0, model)
        output_cost = _estimate_cost(0, completion_tokens, model)
        total_cost = _estimate_cost(prompt_tokens, completion_tokens, model)

        ts = persona.get("created_at") or persona.get("updatedAt") or datetime.now(timezone.utc)
        if isinstance(ts, str):
            try:
                # fromisoformat() only accepts a trailing "Z" on
                # Python 3.11+; normalise it to an explicit UTC offset.
                iso = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
                ts = datetime.fromisoformat(iso)
            except ValueError:
                # Unparseable timestamp: fall back to "now".
                ts = datetime.now(timezone.utc)

        event = {
            "ts": ts,
            "provider": "gemini",
            "model": model,
            "feature": "persona_generate",
            "user_id": str(persona.get("user_id") or ""),
            "focus_group_id": str(persona.get("focus_group_id") or ""),
            "persona_id": persona_id,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "cached_tokens": 0,
            "reasoning_tokens": 0,
            "cost_usd": {
                "input": input_cost,
                "output": output_cost,
                "total": total_cost,
            },
            "duration_ms": 0,
            "retry_count": 0,
            "status": "estimated",
            "is_estimated": True,
        }

        if not dry_run:
            db.usage_events.insert_one(event)
        created += 1

    print(f"[personas] {'Would create' if dry_run else 'Created'} {created} estimated usage events")
    return created
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Main
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Command-line entry point: run both backfills and print a summary.

    Accepts a single optional ``--dry-run`` flag. With the flag set, the
    backfill steps are asked only to count what they would create.
    """
    cli = argparse.ArgumentParser(description="Backfill usage_events from existing data")
    cli.add_argument("--dry-run", action="store_true", help="Preview what would be created without writing")
    dry_run = cli.parse_args().dry_run

    if dry_run:
        print("=== DRY RUN — no data will be written ===\n")

    db = connect()

    # Messages first, then personas; each step returns its own event count.
    steps = (backfill_messages, backfill_personas)
    total = sum(step(db, dry_run) for step in steps)

    prefix = "[DRY RUN] " if dry_run else ""
    print(f"\n{prefix}Backfill complete — {total} events total")


# Run the backfill only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|