ai-cost-tracker/backend/app/services/analytics_service.py
Vadym Samoilenko 9491a11903 fix: analytics query usage_events directly for real-time data
- analytics_service now queries usage_events (not usage_rollups)
  so data appears immediately without waiting for nightly rollup
- match on ts datetime range (was string 'date' field — broken)
- correct units field names: token_input/token_output/char
- rollup_daily.py: same field name fix for historical aggregation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 14:34:19 +01:00

175 lines
5.5 KiB
Python

"""MongoDB aggregation helpers for dashboard and pivot analytics."""
from datetime import datetime, timezone
from typing import Any, Optional
from motor.motor_asyncio import AsyncIOMotorDatabase
from ..core.logging import get_logger
# Module-level logger from the project's logging helper. None of the
# aggregation helpers in this module currently reference it.
logger = get_logger(__name__)
def _build_match(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
workspace_id: Optional[str] = None,
team_id: Optional[str] = None,
project_id: Optional[str] = None,
user_external_id: Optional[str] = None,
provider: Optional[str] = None,
model: Optional[str] = None,
source_app: Optional[str] = None,
) -> dict:
match: dict[str, Any] = {}
if start_date:
match.setdefault("ts", {})["$gte"] = datetime.fromisoformat(start_date + "T00:00:00+00:00")
if end_date:
match.setdefault("ts", {})["$lte"] = datetime.fromisoformat(end_date + "T23:59:59.999999+00:00")
if workspace_id:
match["workspace_id"] = workspace_id
if team_id:
match["team_id"] = team_id
if project_id:
match["project_id"] = project_id
if user_external_id:
match["user_external_id"] = user_external_id
if provider:
match["provider"] = provider
if model:
match["model"] = model
if source_app:
match["source_app"] = source_app
return match
async def get_summary(
    db: AsyncIOMotorDatabase,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **filters,
) -> dict:
    """Return grand totals over the ``usage_events`` that match the filters.

    Args:
        db: Motor database handle. The ``usage_events`` collection is
            aggregated.
        start_date: Inclusive lower UTC day bound, forwarded to
            ``_build_match``.
        end_date: Inclusive upper UTC day bound, forwarded to
            ``_build_match``.
        **filters: Extra equality filters accepted by ``_build_match``
            (``workspace_id``, ``team_id``, ``provider``, ...).

    Returns:
        Dict with ``total_cost_usd``, ``total_calls``, ``total_input_tokens``,
        ``total_output_tokens`` and ``total_chars``. When no event matches,
        every total is ``0``.
    """
    # One accumulator per reported total; also the key set of the empty result.
    totals = {
        "total_cost_usd": {"$sum": "$cost_usd"},
        "total_calls": {"$sum": 1},
        "total_input_tokens": {"$sum": "$units.token_input"},
        "total_output_tokens": {"$sum": "$units.token_output"},
        "total_chars": {"$sum": "$units.char"},
    }
    match_stage = {"$match": _build_match(start_date=start_date, end_date=end_date, **filters)}
    # _id=None collapses all matching events into a single summary document.
    group_stage = {"$group": {"_id": None, **totals}}

    rows = await db.usage_events.aggregate([match_stage, group_stage]).to_list(length=1)
    if not rows:
        return dict.fromkeys(totals, 0)

    summary = rows[0]
    summary.pop("_id", None)
    return summary
async def get_timeseries(
    db: AsyncIOMotorDatabase,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    metric: str = "total_cost_usd",
    **filters,
) -> list[dict]:
    """Return one ``{"date": "YYYY-MM-DD", "value": ...}`` point per day, oldest first.

    Events are bucketed by the calendar day of their ``ts`` field. No
    ``timezone`` is passed to ``$dateToString``, so MongoDB formats the day
    in UTC, which is consistent with the UTC day bounds built by
    ``_build_match``. Days with no matching events are absent from the
    output; they are not zero-filled.

    Args:
        db: Motor database handle. The ``usage_events`` collection is
            aggregated.
        start_date: Inclusive lower UTC day bound, forwarded to
            ``_build_match``.
        end_date: Inclusive upper UTC day bound, forwarded to
            ``_build_match``.
        metric: One of ``total_cost_usd``, ``total_calls``,
            ``total_input_tokens``, ``total_output_tokens`` or
            ``total_chars``. Unknown names fall back to ``total_cost_usd``.
        **filters: Extra equality filters accepted by ``_build_match``.

    Returns:
        List of ``{"date", "value"}`` dicts sorted by ascending date.
    """
    match = _build_match(start_date=start_date, end_date=end_date, **filters)
    # None means "count events" rather than summing a field.
    metric_fields: dict[str, Optional[str]] = {
        "total_cost_usd": "cost_usd",
        "total_calls": None,
        "total_input_tokens": "units.token_input",
        "total_output_tokens": "units.token_output",
        "total_chars": "units.char",
    }
    field = metric_fields.get(metric, "cost_usd")
    value_expr = {"$sum": 1} if field is None else {"$sum": f"${field}"}
    pipeline = [
        {"$match": match},
        {"$group": {
            "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$ts"}},
            "value": value_expr,
        }},
        {"$sort": {"_id": 1}},
        {"$project": {"date": "$_id", "value": 1, "_id": 0}},
    ]
    # The pipeline already emits at most one document per day that has data.
    # A fixed cap (previously 365) silently dropped the most recent days of
    # any range longer than a year, since results are sorted oldest-first.
    return await db.usage_events.aggregate(pipeline).to_list(length=None)
async def get_breakdown(
    db: AsyncIOMotorDatabase,
    dim: str,
    metric: str = "total_cost_usd",
    limit: int = 20,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **filters,
) -> list[dict]:
    """Return the top ``limit`` values of ``dim`` ranked by ``metric``.

    Args:
        db: Motor database handle. The ``usage_events`` collection is
            aggregated.
        dim: Name of the event field to group by (e.g. ``"model"``).
            Events lacking the field are grouped under ``name=None``.
        metric: One of ``total_cost_usd``, ``total_calls``,
            ``total_input_tokens``, ``total_output_tokens`` or
            ``total_chars``. Unknown names fall back to ``total_cost_usd``.
        limit: Maximum number of groups returned.
        start_date: Inclusive lower UTC day bound, forwarded to
            ``_build_match``.
        end_date: Inclusive upper UTC day bound, forwarded to
            ``_build_match``.
        **filters: Extra equality filters accepted by ``_build_match``.

    Returns:
        List of ``{"name", "value"}`` dicts. They are ordered by descending
        value, with ties ordered by ascending name.
    """
    match = _build_match(start_date=start_date, end_date=end_date, **filters)
    # None means "count events" rather than summing a field.
    metric_fields: dict[str, Optional[str]] = {
        "total_cost_usd": "cost_usd",
        "total_calls": None,
        "total_input_tokens": "units.token_input",
        "total_output_tokens": "units.token_output",
        "total_chars": "units.char",
    }
    field = metric_fields.get(metric, "cost_usd")
    value_expr = {"$sum": 1} if field is None else {"$sum": f"${field}"}
    pipeline = [
        {"$match": match},
        {"$group": {
            # NOTE(review): dim is interpolated into a field path unvalidated;
            # confirm callers restrict it to an allow-list of event fields.
            "_id": f"${dim}",
            "value": value_expr,
        }},
        # $sort is not stable for equal keys. Without the _id tie-breaker,
        # $limit could return a different top-N for tied values on each call.
        {"$sort": {"value": -1, "_id": 1}},
        {"$limit": limit},
        {"$project": {"name": "$_id", "value": 1, "_id": 0}},
    ]
    return await db.usage_events.aggregate(pipeline).to_list(length=limit)
async def get_pivot(
    db: AsyncIOMotorDatabase,
    row_dims: list[str],
    col_dim: str,
    metric: str = "total_cost_usd",
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    limit: int = 200,
    **filters,
) -> list[dict]:
    """Return pivot-table cells: ``metric`` per (row_dims..., col_dim) tuple.

    Each output dict has one key per entry in ``row_dims``, plus ``col_key``
    (the value of ``col_dim``) and ``value``. At most ``limit`` cells are
    returned, ranked by descending value. Because the limit applies to
    cells rather than to distinct rows, a row can be missing some of its
    columns.

    Args:
        db: Motor database handle. The ``usage_events`` collection is
            aggregated.
        row_dims: Event field names forming the row key.
        col_dim: Event field name forming the column key.
        metric: One of ``total_cost_usd``, ``total_calls``,
            ``total_input_tokens``, ``total_output_tokens`` or
            ``total_chars``. Unknown names fall back to ``total_cost_usd``.
        start_date: Inclusive lower UTC day bound, forwarded to
            ``_build_match``.
        end_date: Inclusive upper UTC day bound, forwarded to
            ``_build_match``.
        limit: Maximum number of cells returned.
        **filters: Extra equality filters accepted by ``_build_match``.

    Returns:
        List of cell dicts, ordered by descending value with a deterministic
        tie-break on the group key.
    """
    match = _build_match(start_date=start_date, end_date=end_date, **filters)
    # None means "count events" rather than summing a field.
    metric_fields: dict[str, Optional[str]] = {
        "total_cost_usd": "cost_usd",
        "total_calls": None,
        "total_input_tokens": "units.token_input",
        "total_output_tokens": "units.token_output",
        "total_chars": "units.char",
    }
    field = metric_fields.get(metric, "cost_usd")
    value_expr = {"$sum": 1} if field is None else {"$sum": f"${field}"}
    group_id = {dim: f"${dim}" for dim in row_dims}
    # Reserved key for the column dimension inside the compound group _id.
    group_id["__col__"] = f"${col_dim}"
    pipeline = [
        {"$match": match},
        {"$group": {
            "_id": group_id,
            "value": value_expr,
        }},
        # $sort is not stable for equal keys. Without the _id tie-breaker,
        # $limit could keep a different subset of tied cells on each call.
        {"$sort": {"value": -1, "_id": 1}},
        {"$limit": limit},
    ]
    raw = await db.usage_events.aggregate(pipeline).to_list(length=limit)
    result = []
    for r in raw:
        # .get(): a dimension absent from an event may be absent from _id.
        row = {dim: r["_id"].get(dim) for dim in row_dims}
        row["col_key"] = r["_id"].get("__col__")
        row["value"] = r["value"]
        result.append(row)
    return result