- Move app.use(router) after await authStore.init() so the beforeEach guard sees the correct isAuthenticated state on page load - Fix backfill_session_costs._root_prefix: remove underscore→dash replacement that caused paths like /Users/ai_leed/... to not match ~/.claude/projects/ folders Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
221 lines
8.2 KiB
Python
221 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Backfill cost_usd / token counts for all existing sessions.
|
|
|
|
Reads ALL JSONL files in ~/.claude/projects/ (no lookback limit),
|
|
computes input_tokens / output_tokens / cost_usd per session-day bucket,
|
|
and re-POSTs them to /api/ingest (ON CONFLICT DO UPDATE updates cost fields).
|
|
|
|
Usage:
|
|
CC_API_KEY=cc_xxx CC_SERVER=https://optical-dev.oliver.solutions/cc-dashboard \
|
|
python3 scripts/backfill_session_costs.py
|
|
|
|
Optional env vars (same as cc-collector.py):
|
|
CC_ROOT_PATH — comma-separated project roots (default: $HOME)
|
|
CC_SERVER — dashboard base URL
|
|
CC_API_KEY — API key
|
|
"""
|
|
import json
|
|
import os
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Dashboard base URL. Trailing slashes are removed so that endpoint paths can
# be appended with a single "/".
SERVER = os.environ.get("CC_SERVER", "https://optical-dev.oliver.solutions/cc-dashboard").rstrip("/")
# API key; the script refuses to run when this is empty.
API_KEY = os.environ.get("CC_API_KEY", "")
# Comma-separated project roots; defaults to the current user's home directory.
_raw_root = os.environ.get("CC_ROOT_PATH", str(Path.home()))
# An empty or separator-only CC_ROOT_PATH ("" or " , ") produces no entries.
# Fall back to the home directory in that case so ROOT_PATHS[0] below cannot
# raise IndexError while the module is being imported.
ROOT_PATHS = [p.strip() for p in _raw_root.split(",") if p.strip()] or [str(Path.home())]
# First configured root; used as the "root_path" field of every ingest payload.
ROOT_PATH = ROOT_PATHS[0]

# Directory that holds one sub-folder of JSONL transcripts per project.
CLAUDE_PROJECTS = Path.home() / ".claude" / "projects"
|
|
|
|
# Pricing per million tokens: (input, output, cache_read, cache_creation)
# Keys have the shape "claude-<family>-<major version>".
_MODEL_PRICING: dict[str, tuple[float, float, float, float]] = {
    "claude-opus-4": (15.0, 75.0, 1.50, 18.75),
    "claude-sonnet-4": (3.0, 15.0, 0.30, 3.75),
    "claude-haiku-4": (0.80, 4.0, 0.08, 1.00),
    "claude-opus-3": (15.0, 75.0, 1.50, 18.75),
    "claude-sonnet-3": (3.0, 15.0, 0.30, 3.75),
    "claude-haiku-3": (0.25, 1.25, 0.03, 0.30),
}
# Used when the model name matches no table row.
_DEFAULT_PRICING = (15.0, 75.0, 1.50, 18.75)


def _get_pricing(model: str) -> tuple[float, float, float, float]:
    """Return the per-million-token price tuple for *model*.

    Matching is case-insensitive and done in two passes over the table, in
    table order:

    1. The table key appears verbatim inside the model name
       (family-first IDs such as ``claude-sonnet-4-20250514``).
    2. The model name contains both the key's family and
       ``claude-<major>-`` (version-first IDs such as
       ``claude-3-5-sonnet-20241022``).  Without this pass the
       ``claude-*-3`` rows never match and Claude 3.x usage is billed at
       ``_DEFAULT_PRICING``.

    Args:
        model: Model identifier taken from a transcript record; ``None``,
            empty, or non-string values are treated as unknown.

    Returns:
        ``(input, output, cache_read, cache_creation)`` prices, or
        ``_DEFAULT_PRICING`` when nothing matches.
    """
    name = model.lower() if isinstance(model, str) else ""

    # Pass 1: original behaviour, plain substring match on the table key.
    for key, price in _MODEL_PRICING.items():
        if key in name:
            return price

    # Pass 2: version-first naming.
    # NOTE(review): point releases (e.g. 3.5) share their major version's row;
    # confirm that is the intended price for each of them.
    for key, price in _MODEL_PRICING.items():
        family, _, major = key.removeprefix("claude-").rpartition("-")
        if not family or not major:
            continue  # key does not follow "claude-<family>-<major>"
        if family in name and f"claude-{major}-" in name:
            return price

    return _DEFAULT_PRICING
|
|
|
|
|
|
def _root_prefix(root_path: str) -> str:
    """Turn a root path into the dash-separated form used for folder matching.

    Trailing slashes are dropped first, then every remaining ``/`` becomes
    ``-``.  No other character is altered.
    """
    trimmed = root_path.rstrip("/")
    return "-".join(trimmed.split("/"))
|
|
|
|
|
|
def _match_root(folder_key: str) -> str | None:
    """Find the configured root that a projects-folder name belongs to.

    A root matches when *folder_key* equals its dash-encoded form or starts
    with that form followed by ``-``.  Roots are tried in ``ROOT_PATHS``
    order and the first match wins; ``None`` means no root matched.
    """

    def _owns(root: str) -> bool:
        encoded = _root_prefix(root)
        return folder_key == encoded or folder_key.startswith(f"{encoded}-")

    return next((root for root in ROOT_PATHS if _owns(root)), None)
|
|
|
|
|
|
def _infer_slug(folder_name: str, root_path: str) -> str:
    """Derive a project slug from a projects-folder name.

    Leading dashes are ignored on both the folder name and the encoded root.
    The folder that is exactly the root maps to ``"general"``; a folder below
    the root maps to whatever follows ``<root>-``; anything else falls back
    to the text after the last dash (or the whole name if that is empty).
    """
    root_key = _root_prefix(root_path).lstrip("-")
    folder_key = folder_name.lstrip("-")

    if folder_key == root_key:
        return "general"

    below_root = f"{root_key}-"
    if folder_key.startswith(below_root):
        return folder_key.removeprefix(below_root)

    last_segment = folder_key.rsplit("-", 1)[-1]
    return last_segment or folder_key
|
|
|
|
|
|
def _load_folder_events(folder: Path) -> tuple[dict[str, list[tuple[datetime, dict]]], int]:
    """Parse every ``*.jsonl`` file directly inside *folder*.

    Returns:
        ``(events, file_count)`` where *events* maps each ``sessionId`` to its
        ``(timestamp, record)`` pairs in read order, and *file_count* is the
        number of JSONL files visited (including unreadable ones).

    Lines are skipped individually when they are blank, not valid JSON, not a
    JSON object, or lack a usable string ``timestamp`` / ``sessionId``, so one
    malformed line never discards the rest of its file.
    """
    events: dict[str, list[tuple[datetime, dict]]] = defaultdict(list)
    file_count = 0

    for jf in sorted(folder.glob("*.jsonl")):
        file_count += 1
        try:
            with open(jf, encoding="utf-8", errors="ignore") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        obj = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(obj, dict):
                        continue  # e.g. a bare list or number on the line
                    ts = obj.get("timestamp")
                    sid = obj.get("sessionId")
                    if not ts or not sid:
                        continue
                    if not isinstance(ts, str) or not isinstance(sid, str):
                        continue
                    try:
                        dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
                    except ValueError:
                        continue
                    if dt.tzinfo is None:
                        # Naive and aware datetimes cannot be compared, so a
                        # single offset-less timestamp would break sorting.
                        # NOTE(review): assumes offset-less timestamps are UTC.
                        dt = dt.replace(tzinfo=timezone.utc)
                    events[sid].append((dt, obj))
        except Exception as e:
            # Best-effort boundary: an unreadable file is reported and skipped;
            # records read before the failure are kept.
            print(f"  Warning: could not read {jf}: {e}", file=sys.stderr)

    return events, file_count


def _usage_totals(messages: list[dict]) -> tuple[int, int, float]:
    """Sum token counts and estimated cost over assistant records in *messages*.

    Only records whose ``message`` is a dict with ``role == "assistant"`` and
    a dict ``usage`` contribute.  Token fields that are missing, ``null`` or
    not integers count as 0.

    Returns:
        ``(input_tokens, output_tokens, cost_usd)``.  Cache tokens contribute
        to the cost but not to the two token totals.
    """

    def _count(usage: dict, key: str) -> int:
        value = usage.get(key)
        return value if isinstance(value, int) else 0

    input_tokens = output_tokens = 0
    cost_usd = 0.0
    m = 1_000_000  # prices are quoted per million tokens

    for obj in messages:
        msg = obj.get("message", {})
        if not isinstance(msg, dict) or msg.get("role") != "assistant":
            continue
        usage = msg.get("usage")
        if not isinstance(usage, dict):
            continue
        inp_p, out_p, cr_p, cc_p = _get_pricing(msg.get("model", ""))
        i = _count(usage, "input_tokens")
        o = _count(usage, "output_tokens")
        cr = _count(usage, "cache_read_input_tokens")
        cc = _count(usage, "cache_creation_input_tokens")
        input_tokens += i
        output_tokens += o
        cost_usd += i * inp_p / m + o * out_p / m + cr * cr_p / m + cc * cc_p / m

    return input_tokens, output_tokens, cost_usd


def collect_all() -> list[dict]:
    """Build one ingest record per (session, calendar day) that has token usage.

    Walks every directory under ``CLAUDE_PROJECTS`` whose name matches one of
    the configured roots, groups its JSONL records by ``sessionId``, splits
    each session into day buckets (``%Y-%m-%d`` of each record's own
    timestamp) and totals tokens and cost per bucket.  Buckets with zero input
    and zero output tokens are dropped.

    Returns:
        A list of session dicts ready to be posted to the ingest endpoint;
        empty when the projects directory does not exist.  Fields other than
        ids, dates, message count, tokens and cost are sent as empty values.
    """
    if not CLAUDE_PROJECTS.exists():
        print("~/.claude/projects/ not found", file=sys.stderr)
        return []

    sessions_to_send: list[dict] = []
    total_files = 0

    for folder in sorted(CLAUDE_PROJECTS.iterdir()):
        if not folder.is_dir():
            continue
        matched_root = _match_root(folder.name)
        if matched_root is None:
            continue

        slug = _infer_slug(folder.name, matched_root)
        events_by_session, file_count = _load_folder_events(folder)
        total_files += file_count

        for sid, events in events_by_session.items():
            # Stable sort: records with equal timestamps keep their read order.
            events.sort(key=lambda pair: pair[0])

            day_buckets: dict[str, list[tuple[datetime, dict]]] = defaultdict(list)
            for dt, obj in events:
                day_buckets[dt.strftime("%Y-%m-%d")].append((dt, obj))

            for date_str, bucket in day_buckets.items():
                input_tokens, output_tokens, cost_usd = _usage_totals(
                    [obj for _, obj in bucket]
                )
                if input_tokens == 0 and output_tokens == 0:
                    continue  # skip sessions with no usage data

                sessions_to_send.append({
                    "session_id": sid,
                    "project_slug": slug,
                    "date": date_str,
                    "start_at": bucket[0][0].isoformat(),
                    "end_at": bucket[-1][0].isoformat(),
                    "message_count": len(bucket),
                    "active_hours": 0.0,  # not updating hours in backfill
                    "work_summary": "",
                    "commits": [],
                    "tools_used": {},
                    "files_changed": [],
                    "repo_url": "",
                    "raw_stats": {},
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens,
                    "cost_usd": round(cost_usd, 6),
                })

    print(f"Scanned {total_files} JSONL files, found {len(sessions_to_send)} session-days with token data")
    return sessions_to_send
|
|
|
|
|
|
def send_batch(sessions: list[dict], batch_size: int = 50) -> None:
    """POST *sessions* to ``{SERVER}/api/ingest`` in fixed-size batches.

    Each request body is ``{"root_path": ROOT_PATH, "sessions": [...]}`` and
    carries the API key in the ``X-API-Key`` header.  A failing batch is
    reported on stderr and the remaining batches are still sent; the number
    of failed batches is reported at the end so a partial upload is not
    mistaken for a complete one.

    Args:
        sessions: Session dicts to upload.
        batch_size: Maximum number of sessions per request (default 50).

    Raises:
        ValueError: If *batch_size* is smaller than 1.
    """
    import urllib.request

    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    total_accepted = total_skipped = 0
    failed_batches = 0

    for batch_no, start in enumerate(range(0, len(sessions), batch_size), start=1):
        batch = sessions[start:start + batch_size]
        payload = json.dumps({"root_path": ROOT_PATH, "sessions": batch}).encode()
        req = urllib.request.Request(
            f"{SERVER}/api/ingest",
            data=payload,
            headers={"Content-Type": "application/json", "X-API-Key": API_KEY},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                result = json.loads(resp.read())
            if not isinstance(result, dict):
                result = {}  # a non-object reply carries no counters
            total_accepted += result.get("accepted", 0)
            total_skipped += result.get("skipped", 0)
            print(f"  Batch {batch_no}: accepted={result.get('accepted', 0)}, skipped={result.get('skipped', 0)}")
        except Exception as e:
            # Best-effort upload: log the failure and continue with the next batch.
            failed_batches += 1
            print(f"  Batch {batch_no} failed: {e}", file=sys.stderr)

    print(f"\nDone. Total accepted={total_accepted}, skipped={total_skipped}")
    if failed_batches:
        print(f"Warning: {failed_batches} batch(es) failed and were not ingested", file=sys.stderr)
|
|
|
|
|
|
def main() -> None:
    """Script entry point: collect every session-day record and upload it.

    Exits with the message ``CC_API_KEY not set`` when no API key is
    configured, and with status 0 when there is nothing to send.
    """
    if not API_KEY:
        raise SystemExit("CC_API_KEY not set")

    print(f"Server: {SERVER}")
    print(f"Root paths: {ROOT_PATHS}\n")

    collected = collect_all()
    if not collected:
        print("No sessions with token data found — nothing to send.")
        sys.exit(0)

    estimated_total = sum(record["cost_usd"] for record in collected)
    print(f"Total estimated cost across all sessions: ${estimated_total:.4f}\n")

    send_batch(collected)


if __name__ == "__main__":
    main()
|