440 lines
No EOL
17 KiB
Python
440 lines
No EOL
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Cost Calculator Module
|
|
Tracks OpenAI API usage and calculates costs for the master image detection application
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
|
|
|
|
@dataclass
|
|
class TokenUsage:
|
|
"""Data class to track token usage for a single API call"""
|
|
prompt_tokens: int
|
|
completion_tokens: int
|
|
total_tokens: int
|
|
cached_tokens: int = 0
|
|
|
|
def __post_init__(self):
|
|
"""Validate token counts"""
|
|
if self.prompt_tokens < 0 or self.completion_tokens < 0:
|
|
raise ValueError("Token counts cannot be negative")
|
|
if self.total_tokens != self.prompt_tokens + self.completion_tokens:
|
|
raise ValueError("Total tokens must equal prompt + completion tokens")
|
|
|
|
|
|
@dataclass
|
|
class ApiCallCost:
|
|
"""Data class to track cost information for a single API call"""
|
|
operation_type: str
|
|
timestamp: str
|
|
token_usage: TokenUsage
|
|
input_cost: float
|
|
output_cost: float
|
|
cached_cost: float
|
|
total_cost: float
|
|
layout_name: str = ""
|
|
master_id: str = ""
|
|
|
|
def to_dict(self) -> Dict:
|
|
"""Convert to dictionary for JSON serialization"""
|
|
return asdict(self)
|
|
|
|
|
|
@dataclass
|
|
class LayoutCostSummary:
|
|
"""Data class to track cost summary for a single layout"""
|
|
layout_name: str
|
|
total_cost: float
|
|
total_input_tokens: int
|
|
total_output_tokens: int
|
|
total_cached_tokens: int
|
|
api_calls_made: int
|
|
operation_types: List[str]
|
|
processing_time: float = 0.0
|
|
detected_masters: List[str] = None
|
|
|
|
def __post_init__(self):
|
|
if self.detected_masters is None:
|
|
self.detected_masters = []
|
|
|
|
def to_dict(self) -> Dict:
|
|
"""Convert to dictionary for JSON serialization"""
|
|
return asdict(self)
|
|
|
|
|
|
class CostCalculator:
|
|
"""
|
|
Main cost calculator class for tracking OpenAI API usage and costs
|
|
"""
|
|
|
|
# OpenAI o3 pricing as of 2025
|
|
INPUT_COST_PER_MILLION = 2.00
|
|
CACHED_INPUT_COST_PER_MILLION = 0.50
|
|
OUTPUT_COST_PER_MILLION = 8.00
|
|
|
|
def __init__(self, enable_tracking: bool = True):
|
|
"""
|
|
Initialize the cost calculator
|
|
|
|
Args:
|
|
enable_tracking: Whether to enable cost tracking (default: True)
|
|
"""
|
|
self.enable_tracking = enable_tracking
|
|
self.api_calls: List[ApiCallCost] = []
|
|
self.layout_costs: Dict[str, LayoutCostSummary] = {}
|
|
self.session_start_time = time.time()
|
|
|
|
# Session totals
|
|
self.total_input_tokens = 0
|
|
self.total_output_tokens = 0
|
|
self.total_cached_tokens = 0
|
|
self.total_cost = 0.0
|
|
self.total_api_calls = 0
|
|
|
|
# Only print initialization message once and only in main process
|
|
import multiprocessing
|
|
if multiprocessing.current_process().name == 'MainProcess':
|
|
if not hasattr(CostCalculator, '_main_process_initialized'):
|
|
CostCalculator._main_process_initialized = True
|
|
print(f"Cost Calculator initialized (tracking: {'enabled' if enable_tracking else 'disabled'})")
|
|
if enable_tracking:
|
|
print(f"Current OpenAI o3 pricing:")
|
|
print(f" Input tokens: ${self.INPUT_COST_PER_MILLION:.2f} per million")
|
|
print(f" Cached input: ${self.CACHED_INPUT_COST_PER_MILLION:.2f} per million")
|
|
print(f" Output tokens: ${self.OUTPUT_COST_PER_MILLION:.2f} per million")
|
|
|
|
def calculate_cost(self, prompt_tokens: int, completion_tokens: int, cached_tokens: int = 0) -> Tuple[float, float, float, float]:
|
|
"""
|
|
Calculate cost for a single API call
|
|
|
|
Args:
|
|
prompt_tokens: Number of input tokens
|
|
completion_tokens: Number of output tokens
|
|
cached_tokens: Number of cached input tokens
|
|
|
|
Returns:
|
|
Tuple of (input_cost, output_cost, cached_cost, total_cost)
|
|
"""
|
|
if not self.enable_tracking:
|
|
return 0.0, 0.0, 0.0, 0.0
|
|
|
|
# Calculate costs
|
|
input_cost = (prompt_tokens * self.INPUT_COST_PER_MILLION) / 1_000_000
|
|
output_cost = (completion_tokens * self.OUTPUT_COST_PER_MILLION) / 1_000_000
|
|
cached_cost = (cached_tokens * self.CACHED_INPUT_COST_PER_MILLION) / 1_000_000
|
|
total_cost = input_cost + output_cost + cached_cost
|
|
|
|
return input_cost, output_cost, cached_cost, total_cost
|
|
|
|
def track_api_call(self, operation_type: str, prompt_tokens: int, completion_tokens: int,
|
|
cached_tokens: int = 0, layout_name: str = "", master_id: str = "") -> ApiCallCost:
|
|
"""
|
|
Track a single API call and calculate its cost
|
|
|
|
Args:
|
|
operation_type: Type of operation (e.g., 'panel_counting', 'detection', 'fallback')
|
|
prompt_tokens: Number of input tokens
|
|
completion_tokens: Number of output tokens
|
|
cached_tokens: Number of cached input tokens
|
|
layout_name: Name of the layout being processed
|
|
master_id: ID of the master image (if applicable)
|
|
|
|
Returns:
|
|
ApiCallCost object with tracking information
|
|
"""
|
|
if not self.enable_tracking:
|
|
# Return dummy cost object when tracking is disabled
|
|
return ApiCallCost(
|
|
operation_type=operation_type,
|
|
timestamp=datetime.now().isoformat(),
|
|
token_usage=TokenUsage(0, 0, 0, 0),
|
|
input_cost=0.0,
|
|
output_cost=0.0,
|
|
cached_cost=0.0,
|
|
total_cost=0.0,
|
|
layout_name=layout_name,
|
|
master_id=master_id
|
|
)
|
|
|
|
# Create token usage object
|
|
token_usage = TokenUsage(
|
|
prompt_tokens=prompt_tokens,
|
|
completion_tokens=completion_tokens,
|
|
total_tokens=prompt_tokens + completion_tokens,
|
|
cached_tokens=cached_tokens
|
|
)
|
|
|
|
# Calculate costs
|
|
input_cost, output_cost, cached_cost, total_cost = self.calculate_cost(
|
|
prompt_tokens, completion_tokens, cached_tokens
|
|
)
|
|
|
|
# Create cost tracking object
|
|
api_call_cost = ApiCallCost(
|
|
operation_type=operation_type,
|
|
timestamp=datetime.now().isoformat(),
|
|
token_usage=token_usage,
|
|
input_cost=input_cost,
|
|
output_cost=output_cost,
|
|
cached_cost=cached_cost,
|
|
total_cost=total_cost,
|
|
layout_name=layout_name,
|
|
master_id=master_id
|
|
)
|
|
|
|
# Add to tracking
|
|
self.api_calls.append(api_call_cost)
|
|
|
|
# Update session totals
|
|
self.total_input_tokens += prompt_tokens
|
|
self.total_output_tokens += completion_tokens
|
|
self.total_cached_tokens += cached_tokens
|
|
self.total_cost += total_cost
|
|
self.total_api_calls += 1
|
|
|
|
# Update layout-specific tracking
|
|
if layout_name:
|
|
self._update_layout_cost(layout_name, api_call_cost)
|
|
|
|
return api_call_cost
|
|
|
|
def _update_layout_cost(self, layout_name: str, api_call_cost: ApiCallCost):
|
|
"""Update cost tracking for a specific layout"""
|
|
if layout_name not in self.layout_costs:
|
|
self.layout_costs[layout_name] = LayoutCostSummary(
|
|
layout_name=layout_name,
|
|
total_cost=0.0,
|
|
total_input_tokens=0,
|
|
total_output_tokens=0,
|
|
total_cached_tokens=0,
|
|
api_calls_made=0,
|
|
operation_types=[]
|
|
)
|
|
|
|
layout_summary = self.layout_costs[layout_name]
|
|
layout_summary.total_cost += api_call_cost.total_cost
|
|
layout_summary.total_input_tokens += api_call_cost.token_usage.prompt_tokens
|
|
layout_summary.total_output_tokens += api_call_cost.token_usage.completion_tokens
|
|
layout_summary.total_cached_tokens += api_call_cost.token_usage.cached_tokens
|
|
layout_summary.api_calls_made += 1
|
|
|
|
if api_call_cost.operation_type not in layout_summary.operation_types:
|
|
layout_summary.operation_types.append(api_call_cost.operation_type)
|
|
|
|
def get_layout_cost_breakdown(self, layout_name: str) -> Optional[Dict]:
|
|
"""
|
|
Get detailed cost breakdown for a specific layout
|
|
|
|
Args:
|
|
layout_name: Name of the layout
|
|
|
|
Returns:
|
|
Dictionary with cost breakdown or None if layout not found
|
|
"""
|
|
if not self.enable_tracking or layout_name not in self.layout_costs:
|
|
return None
|
|
|
|
layout_summary = self.layout_costs[layout_name]
|
|
return {
|
|
'layout_name': layout_name,
|
|
'total_cost': round(layout_summary.total_cost, 4),
|
|
'cost_breakdown': {
|
|
'input_tokens': layout_summary.total_input_tokens,
|
|
'output_tokens': layout_summary.total_output_tokens,
|
|
'cached_tokens': layout_summary.total_cached_tokens,
|
|
'api_calls_made': layout_summary.api_calls_made,
|
|
'operation_types': layout_summary.operation_types
|
|
},
|
|
'cost_per_token': {
|
|
'input': round(layout_summary.total_cost / max(layout_summary.total_input_tokens, 1) * 1000, 4),
|
|
'output': round(layout_summary.total_cost / max(layout_summary.total_output_tokens, 1) * 1000, 4)
|
|
}
|
|
}
|
|
|
|
def get_session_summary(self) -> Dict:
|
|
"""
|
|
Get summary of costs for the entire session
|
|
|
|
Returns:
|
|
Dictionary with session cost summary
|
|
"""
|
|
if not self.enable_tracking:
|
|
return {
|
|
'tracking_enabled': False,
|
|
'message': 'Cost tracking is disabled'
|
|
}
|
|
|
|
session_duration = time.time() - self.session_start_time
|
|
layouts_processed = len(self.layout_costs)
|
|
|
|
# Calculate averages
|
|
avg_cost_per_layout = self.total_cost / max(layouts_processed, 1)
|
|
avg_tokens_per_layout = (self.total_input_tokens + self.total_output_tokens) / max(layouts_processed, 1)
|
|
avg_api_calls_per_layout = self.total_api_calls / max(layouts_processed, 1)
|
|
|
|
# Calculate cost efficiency
|
|
total_tokens = self.total_input_tokens + self.total_output_tokens
|
|
cost_per_thousand_tokens = (self.total_cost / max(total_tokens, 1)) * 1000
|
|
|
|
# Operation type breakdown
|
|
operation_counts = {}
|
|
for api_call in self.api_calls:
|
|
op_type = api_call.operation_type
|
|
operation_counts[op_type] = operation_counts.get(op_type, 0) + 1
|
|
|
|
return {
|
|
'tracking_enabled': True,
|
|
'session_totals': {
|
|
'total_cost': round(self.total_cost, 4),
|
|
'total_input_tokens': self.total_input_tokens,
|
|
'total_output_tokens': self.total_output_tokens,
|
|
'total_cached_tokens': self.total_cached_tokens,
|
|
'total_api_calls': self.total_api_calls,
|
|
'layouts_processed': layouts_processed,
|
|
'session_duration_minutes': round(session_duration / 60, 2)
|
|
},
|
|
'averages': {
|
|
'cost_per_layout': round(avg_cost_per_layout, 4),
|
|
'tokens_per_layout': round(avg_tokens_per_layout, 1),
|
|
'api_calls_per_layout': round(avg_api_calls_per_layout, 1),
|
|
'cost_per_thousand_tokens': round(cost_per_thousand_tokens, 4)
|
|
},
|
|
'operation_breakdown': operation_counts,
|
|
'pricing_info': {
|
|
'input_cost_per_million': self.INPUT_COST_PER_MILLION,
|
|
'output_cost_per_million': self.OUTPUT_COST_PER_MILLION,
|
|
'cached_input_cost_per_million': self.CACHED_INPUT_COST_PER_MILLION
|
|
}
|
|
}
|
|
|
|
def save_cost_report(self, filename: str = None) -> str:
|
|
"""
|
|
Save detailed cost report to JSON file
|
|
|
|
Args:
|
|
filename: Output filename (optional)
|
|
|
|
Returns:
|
|
Path to saved file
|
|
"""
|
|
if not self.enable_tracking:
|
|
print("Cost tracking is disabled, no report to save")
|
|
return ""
|
|
|
|
if filename is None:
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"cost_report_{timestamp}.json"
|
|
|
|
# Ensure .json extension
|
|
if not filename.endswith('.json'):
|
|
filename += '.json'
|
|
|
|
output_path = Path("results") / filename
|
|
output_path.parent.mkdir(exist_ok=True)
|
|
|
|
report_data = {
|
|
'session_summary': self.get_session_summary(),
|
|
'layout_costs': {name: summary.to_dict() for name, summary in self.layout_costs.items()},
|
|
'detailed_api_calls': [call.to_dict() for call in self.api_calls],
|
|
'generated_at': datetime.now().isoformat(),
|
|
'pricing_model': 'OpenAI o3'
|
|
}
|
|
|
|
with open(output_path, 'w') as f:
|
|
json.dump(report_data, f, indent=2)
|
|
|
|
print(f"Cost report saved to: {output_path}")
|
|
return str(output_path)
|
|
|
|
def print_cost_summary(self):
|
|
"""Print a formatted cost summary to console"""
|
|
if not self.enable_tracking:
|
|
print("Cost tracking is disabled")
|
|
return
|
|
|
|
summary = self.get_session_summary()
|
|
|
|
print("\n" + "="*60)
|
|
print("COST TRACKING SUMMARY")
|
|
print("="*60)
|
|
|
|
session = summary['session_totals']
|
|
averages = summary['averages']
|
|
|
|
print(f"Total cost: ${session['total_cost']:.4f}")
|
|
print(f"Total tokens: {session['total_input_tokens'] + session['total_output_tokens']:,}")
|
|
print(f" - Input tokens: {session['total_input_tokens']:,}")
|
|
print(f" - Output tokens: {session['total_output_tokens']:,}")
|
|
print(f" - Cached tokens: {session['total_cached_tokens']:,}")
|
|
print(f"Total API calls: {session['total_api_calls']}")
|
|
print(f"Layouts processed: {session['layouts_processed']}")
|
|
|
|
print(f"\nAverages:")
|
|
print(f" - Cost per layout: ${averages['cost_per_layout']:.4f}")
|
|
print(f" - Tokens per layout: {averages['tokens_per_layout']:.1f}")
|
|
print(f" - API calls per layout: {averages['api_calls_per_layout']:.1f}")
|
|
print(f" - Cost per 1K tokens: ${averages['cost_per_thousand_tokens']:.4f}")
|
|
|
|
if summary['operation_breakdown']:
|
|
print(f"\nOperation breakdown:")
|
|
for op_type, count in summary['operation_breakdown'].items():
|
|
print(f" - {op_type}: {count} calls")
|
|
|
|
print("="*60)
|
|
|
|
def estimate_monthly_cost(self, layouts_per_month: int = 300) -> Dict:
|
|
"""
|
|
Estimate monthly cost based on current usage patterns
|
|
|
|
Args:
|
|
layouts_per_month: Estimated number of layouts to process per month
|
|
|
|
Returns:
|
|
Dictionary with cost estimates
|
|
"""
|
|
if not self.enable_tracking or len(self.layout_costs) == 0:
|
|
return {'error': 'No cost data available for estimation'}
|
|
|
|
avg_cost_per_layout = self.total_cost / len(self.layout_costs)
|
|
estimated_monthly_cost = avg_cost_per_layout * layouts_per_month
|
|
|
|
return {
|
|
'average_cost_per_layout': round(avg_cost_per_layout, 4),
|
|
'layouts_per_month': layouts_per_month,
|
|
'estimated_monthly_cost': round(estimated_monthly_cost, 2),
|
|
'estimated_annual_cost': round(estimated_monthly_cost * 12, 2),
|
|
'based_on_layouts': len(self.layout_costs)
|
|
}
|
|
|
|
|
|
def extract_token_usage_from_response(response) -> TokenUsage:
|
|
"""
|
|
Extract token usage from OpenAI API response
|
|
|
|
Args:
|
|
response: OpenAI API response object
|
|
|
|
Returns:
|
|
TokenUsage object with extracted token counts
|
|
"""
|
|
if not hasattr(response, 'usage') or response.usage is None:
|
|
# Fallback if usage information is not available
|
|
return TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0, cached_tokens=0)
|
|
|
|
usage = response.usage
|
|
return TokenUsage(
|
|
prompt_tokens=usage.prompt_tokens,
|
|
completion_tokens=usage.completion_tokens,
|
|
total_tokens=usage.total_tokens,
|
|
cached_tokens=getattr(usage, 'cached_tokens', 0)
|
|
)
|
|
|
|
|
|
# Global cost calculator instance (can be configured from CLI)
|
|
cost_calculator = CostCalculator(enable_tracking=False) # Disabled by default |