UsageLog now records input_tokens and output_tokens separately and costs each side at its real rate. The old single 'blended' rate underpriced input-heavy workloads (vision/QC) and overpriced output-heavy ones. COST_PER_MILLION_TOKENS rebuilt against the live OpenAI, Gemini and Anthropic pricing pages (GPT-5.4 family, GPT-4.x, o4-mini; Gemini 2.5 Pro/Flash/Flash-Lite + 1.5 legacy; Claude 4.7/4.6/4.5 + 3.x legacy). Unknown models now warn instead of silently defaulting to $5/1M. Adds idempotent ALTER TABLE migration on startup so existing SQLite DBs pick up the new columns. Dashboard + API surface the input/output split. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
118 lines
4.2 KiB
Python
118 lines
4.2 KiB
Python
"""Usage Dashboard Routes."""
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from flask import Blueprint, render_template, request, jsonify
|
|
from sqlalchemy import func, desc
|
|
from core.models.usage_log import UsageLog
|
|
from core.models.database import db
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Blueprint that groups the usage-dashboard routes; every route registered
# on it is served under the /usage URL prefix.
usage_bp = Blueprint(
    'usage',
    __name__,
    url_prefix='/usage',
    template_folder='templates',
)
|
|
|
|
|
|
@usage_bp.route('/')
@usage_bp.route('/index')
def index():
    """Serve the usage dashboard page.

    Registered for both ``/`` and ``/index`` on the blueprint. Renders the
    dashboard template, passing ``active_tab='usage'`` to it.
    """
    template_name = 'usage/dashboard.html'
    return render_template(template_name, active_tab='usage')
|
|
|
|
|
|
def _stats_row(base, r):
    """Merge identifying fields with the aggregate columns of one result row.

    Args:
        base: Dict of identifying keys for the row (e.g. ``{'provider': ...}``).
        r: Query result row exposing the labelled aggregates ``calls``,
            ``input_tokens``, ``output_tokens``, ``tokens`` and ``cost``.

    Returns:
        A JSON-serialisable dict; SUM results that came back as NULL are
        reported as 0 and the cost is rounded to 4 decimal places.
    """
    return {
        **base,
        'calls': r.calls,
        'input_tokens': int(r.input_tokens or 0),
        'output_tokens': int(r.output_tokens or 0),
        'tokens': int(r.tokens or 0),
        'cost': round(float(r.cost or 0), 4),
    }


@usage_bp.route('/api/stats')
def api_stats():
    """Get usage statistics as JSON, with optional time filter.

    Query parameters:
        days: Optional integer look-back window in days. Missing,
            unparsable, zero or negative values all mean "no time filter".

    Returns:
        A JSON object with the keys ``summary``, ``by_provider``,
        ``by_model``, ``by_module``, ``by_user``, ``daily`` and ``recent``.
        The ``daily`` series never reaches back more than 30 days, and
        ``recent`` holds the 20 newest log entries. On any failure the
        response is ``{'error': <message>}`` with HTTP status 500.
    """
    try:
        days = request.args.get('days', type=int)
        # An unparsable value already arrives here as None; treat a
        # non-positive window the same way instead of building a cutoff
        # that lies in the future and matches nothing.
        if days is not None and days <= 0:
            days = None

        try:
            filters = _time_filter(days)
        except OverflowError:
            # The window reaches further back than datetime can represent,
            # which is equivalent to not restricting by time at all.
            filters = []

        query = UsageLog.query
        if filters:
            query = query.filter(*filters)

        # Aggregates shared by the grand total and by every breakdown.
        group_cols = [
            func.count(UsageLog.id).label('calls'),
            func.sum(UsageLog.input_tokens).label('input_tokens'),
            func.sum(UsageLog.output_tokens).label('output_tokens'),
            func.sum(UsageLog.tokens_used).label('tokens'),
            func.sum(UsageLog.estimated_cost_usd).label('cost'),
        ]

        # Grand totals: one query returns the call count together with the
        # sums, so no separate COUNT round-trip is needed.
        totals = db.session.query(*group_cols).filter(*filters).one()

        by_provider = db.session.query(UsageLog.provider, *group_cols).filter(
            *filters
        ).group_by(UsageLog.provider).all()

        by_model = db.session.query(
            UsageLog.provider, UsageLog.model, *group_cols
        ).filter(*filters).group_by(UsageLog.provider, UsageLog.model).all()

        by_module = db.session.query(UsageLog.module, *group_cols).filter(
            *filters
        ).group_by(UsageLog.module).all()

        by_user = db.session.query(UsageLog.user, *group_cols).filter(
            *filters
        ).group_by(UsageLog.user).all()

        # Daily usage (last 30 days max); a shorter requested window
        # narrows it further.
        daily_cutoff = datetime.utcnow() - timedelta(days=min(days or 30, 30))
        daily = db.session.query(
            func.date(UsageLog.timestamp).label('date'), *group_cols
        ).filter(UsageLog.timestamp >= daily_cutoff).group_by(
            func.date(UsageLog.timestamp)
        ).order_by(func.date(UsageLog.timestamp)).all()

        # Newest 20 entries inside the requested window.
        recent = query.order_by(desc(UsageLog.timestamp)).limit(20).all()

        return jsonify({
            'summary': {
                'total_calls': int(totals.calls or 0),
                'total_tokens': int(totals.tokens or 0),
                'total_input_tokens': int(totals.input_tokens or 0),
                'total_output_tokens': int(totals.output_tokens or 0),
                'total_cost': round(float(totals.cost or 0), 4),
            },
            'by_provider': [
                _stats_row({'provider': r.provider}, r) for r in by_provider
            ],
            'by_model': [
                _stats_row({'provider': r.provider, 'model': r.model}, r)
                for r in by_model
            ],
            'by_module': [
                _stats_row({'module': r.module or 'unknown'}, r)
                for r in by_module
            ],
            'by_user': [
                _stats_row({'user': r.user or 'anonymous'}, r) for r in by_user
            ],
            'daily': [_stats_row({'date': str(r.date)}, r) for r in daily],
            'recent': [r.to_dict() for r in recent],
        })

    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback and
        # answer with a JSON error instead of an HTML error page.
        logger.exception("Usage stats error: %s", e)
        # NOTE(review): str(e) may expose internal details (e.g. SQL text)
        # to the client; kept unchanged for interface compatibility.
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
def _time_filter(days):
    """Build time filter conditions.

    Args:
        days: Size of the look-back window in days, or None.

    Returns:
        A list containing a single ``UsageLog.timestamp >= cutoff``
        condition, where the cutoff is ``days`` days before the current UTC
        time. The list is empty (no restriction) when ``days`` is missing,
        zero or negative, or so large that the cutoff cannot be represented
        as a datetime.
    """
    # A missing or non-positive window means "do not restrict by time";
    # a negative value would otherwise put the cutoff in the future.
    if not days or days < 0:
        return []

    try:
        cutoff = datetime.utcnow() - timedelta(days=days)
    except OverflowError:
        # The window reaches further back than datetime can represent,
        # so it covers every record: same as no filter.
        return []

    return [UsageLog.timestamp >= cutoff]
|