master_adapt_detect/cost_calculator.py
2025-10-01 14:32:55 -05:00

440 lines
No EOL
17 KiB
Python

#!/usr/bin/env python3
"""
Cost Calculator Module
Tracks OpenAI API usage and calculates costs for the master image detection application
"""
import json
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
@dataclass
class TokenUsage:
    """Token counts reported for a single API call.

    Attributes:
        prompt_tokens: Number of input (prompt) tokens.
        completion_tokens: Number of output (completion) tokens.
        total_tokens: Must equal ``prompt_tokens + completion_tokens``.
        cached_tokens: Number of cached input tokens (defaults to 0).

    Raises:
        ValueError: If any of the prompt, completion or cached counts is
            negative, or if ``total_tokens`` is not the sum of prompt and
            completion tokens.
    """

    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    cached_tokens: int = 0

    def __post_init__(self):
        """Validate token counts."""
        # cached_tokens is checked as well: a negative cached count would
        # otherwise slip through and produce a negative cached cost.
        if self.prompt_tokens < 0 or self.completion_tokens < 0 or self.cached_tokens < 0:
            raise ValueError("Token counts cannot be negative")
        if self.total_tokens != self.prompt_tokens + self.completion_tokens:
            raise ValueError("Total tokens must equal prompt + completion tokens")
@dataclass
class ApiCallCost:
    """Cost record for one API call.

    Holds the operation label, the ISO timestamp string, the token usage,
    the three cost components with their total, and the optional layout /
    master identifiers the call was made for.
    """

    operation_type: str
    timestamp: str
    token_usage: TokenUsage
    input_cost: float
    output_cost: float
    cached_cost: float
    total_cost: float
    layout_name: str = ""
    master_id: str = ""

    def to_dict(self) -> Dict:
        """Return this record as a plain dictionary suitable for JSON output."""
        # asdict() recurses into nested dataclasses, so token_usage is
        # converted to a dictionary as well.
        record = asdict(self)
        return record
@dataclass
class LayoutCostSummary:
    """Accumulated cost and token totals for a single layout.

    Attributes:
        layout_name: Name of the layout the totals belong to.
        total_cost: Sum of the total cost of every call for this layout.
        total_input_tokens: Sum of prompt tokens.
        total_output_tokens: Sum of completion tokens.
        total_cached_tokens: Sum of cached input tokens.
        api_calls_made: Number of calls recorded for this layout.
        operation_types: Distinct operation types seen for this layout.
        processing_time: Processing time for the layout (defaults to 0.0).
        detected_masters: Master identifiers for the layout. ``None`` (the
            default) is replaced by a fresh empty list per instance.
    """

    layout_name: str
    total_cost: float
    total_input_tokens: int
    total_output_tokens: int
    total_cached_tokens: int
    api_calls_made: int
    operation_types: List[str]
    processing_time: float = 0.0
    # Optional because None is the declared default; __post_init__ swaps it
    # for a new list so instances never share one mutable default.
    detected_masters: Optional[List[str]] = None

    def __post_init__(self):
        """Replace a missing ``detected_masters`` with a new empty list."""
        if self.detected_masters is None:
            self.detected_masters = []

    def to_dict(self) -> Dict:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)
class CostCalculator:
    """
    Main cost calculator class for tracking OpenAI API usage and costs

    Every tracked call is stored as an ``ApiCallCost`` in ``api_calls``,
    added to the running session totals and, when a layout name is given,
    folded into a per-layout ``LayoutCostSummary`` in ``layout_costs``.
    When ``enable_tracking`` is False nothing is recorded and every cost is
    reported as zero.
    """

    # OpenAI o3 pricing as of 2025 (dollars per one million tokens)
    INPUT_COST_PER_MILLION = 2.00
    CACHED_INPUT_COST_PER_MILLION = 0.50
    OUTPUT_COST_PER_MILLION = 8.00

    def __init__(self, enable_tracking: bool = True):
        """
        Initialize the cost calculator

        Args:
            enable_tracking: Whether to enable cost tracking (default: True)
        """
        self.enable_tracking = enable_tracking
        self.api_calls: List[ApiCallCost] = []
        self.layout_costs: Dict[str, LayoutCostSummary] = {}
        self.session_start_time = time.time()

        # Session totals
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_cached_tokens = 0
        self.total_cost = 0.0
        self.total_api_calls = 0

        # Only print initialization message once and only in main process.
        # The class-level flag is shared by all instances, so only the first
        # instance created in the main process prints anything.
        import multiprocessing

        in_main_process = multiprocessing.current_process().name == 'MainProcess'
        if in_main_process and not hasattr(CostCalculator, '_main_process_initialized'):
            CostCalculator._main_process_initialized = True
            print(f"Cost Calculator initialized (tracking: {'enabled' if enable_tracking else 'disabled'})")
            if enable_tracking:
                print("Current OpenAI o3 pricing:")
                print(f" Input tokens: ${self.INPUT_COST_PER_MILLION:.2f} per million")
                print(f" Cached input: ${self.CACHED_INPUT_COST_PER_MILLION:.2f} per million")
                print(f" Output tokens: ${self.OUTPUT_COST_PER_MILLION:.2f} per million")

    def calculate_cost(self, prompt_tokens: int, completion_tokens: int, cached_tokens: int = 0) -> Tuple[float, float, float, float]:
        """
        Calculate cost for a single API call

        ``prompt_tokens`` is treated as the full input count, cached tokens
        included (this is how OpenAI reports usage). Cached tokens are
        therefore billed only at the cached rate, and only the uncached
        remainder is billed at the regular input rate.

        Args:
            prompt_tokens: Number of input tokens (including cached ones)
            completion_tokens: Number of output tokens
            cached_tokens: Number of cached input tokens

        Returns:
            Tuple of (input_cost, output_cost, cached_cost, total_cost);
            all zeros when tracking is disabled
        """
        if not self.enable_tracking:
            return 0.0, 0.0, 0.0, 0.0

        # Charging all prompt tokens at the full rate AND the cached ones at
        # the cached rate would bill the cached portion twice. Clamp at zero
        # in case a caller passes more cached than prompt tokens.
        uncached_tokens = max(prompt_tokens - cached_tokens, 0)

        input_cost = (uncached_tokens * self.INPUT_COST_PER_MILLION) / 1_000_000
        output_cost = (completion_tokens * self.OUTPUT_COST_PER_MILLION) / 1_000_000
        cached_cost = (cached_tokens * self.CACHED_INPUT_COST_PER_MILLION) / 1_000_000
        total_cost = input_cost + output_cost + cached_cost
        return input_cost, output_cost, cached_cost, total_cost

    def track_api_call(self, operation_type: str, prompt_tokens: int, completion_tokens: int,
                       cached_tokens: int = 0, layout_name: str = "", master_id: str = "") -> ApiCallCost:
        """
        Track a single API call and calculate its cost

        Args:
            operation_type: Type of operation (e.g., 'panel_counting', 'detection', 'fallback')
            prompt_tokens: Number of input tokens
            completion_tokens: Number of output tokens
            cached_tokens: Number of cached input tokens
            layout_name: Name of the layout being processed
            master_id: ID of the master image (if applicable)

        Returns:
            ApiCallCost object with tracking information. When tracking is
            disabled a zeroed record is returned and nothing is stored.

        Raises:
            ValueError: Propagated from TokenUsage validation when the token
                counts are rejected (only when tracking is enabled).
        """
        if not self.enable_tracking:
            # Return dummy cost object when tracking is disabled
            return ApiCallCost(
                operation_type=operation_type,
                timestamp=datetime.now().isoformat(),
                token_usage=TokenUsage(0, 0, 0, 0),
                input_cost=0.0,
                output_cost=0.0,
                cached_cost=0.0,
                total_cost=0.0,
                layout_name=layout_name,
                master_id=master_id,
            )

        # Building the usage object first validates the counts before any
        # totals are modified.
        token_usage = TokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            cached_tokens=cached_tokens,
        )

        input_cost, output_cost, cached_cost, total_cost = self.calculate_cost(
            prompt_tokens, completion_tokens, cached_tokens
        )

        api_call_cost = ApiCallCost(
            operation_type=operation_type,
            timestamp=datetime.now().isoformat(),
            token_usage=token_usage,
            input_cost=input_cost,
            output_cost=output_cost,
            cached_cost=cached_cost,
            total_cost=total_cost,
            layout_name=layout_name,
            master_id=master_id,
        )

        # Add to tracking
        self.api_calls.append(api_call_cost)

        # Update session totals
        self.total_input_tokens += prompt_tokens
        self.total_output_tokens += completion_tokens
        self.total_cached_tokens += cached_tokens
        self.total_cost += total_cost
        self.total_api_calls += 1

        # Update layout-specific tracking (calls without a layout name only
        # count towards the session totals)
        if layout_name:
            self._update_layout_cost(layout_name, api_call_cost)
        return api_call_cost

    def _update_layout_cost(self, layout_name: str, api_call_cost: ApiCallCost):
        """Update cost tracking for a specific layout, creating its summary on first use"""
        if layout_name not in self.layout_costs:
            self.layout_costs[layout_name] = LayoutCostSummary(
                layout_name=layout_name,
                total_cost=0.0,
                total_input_tokens=0,
                total_output_tokens=0,
                total_cached_tokens=0,
                api_calls_made=0,
                operation_types=[],
            )

        layout_summary = self.layout_costs[layout_name]
        usage = api_call_cost.token_usage
        layout_summary.total_cost += api_call_cost.total_cost
        layout_summary.total_input_tokens += usage.prompt_tokens
        layout_summary.total_output_tokens += usage.completion_tokens
        layout_summary.total_cached_tokens += usage.cached_tokens
        layout_summary.api_calls_made += 1
        # Keep operation types distinct, in first-seen order
        if api_call_cost.operation_type not in layout_summary.operation_types:
            layout_summary.operation_types.append(api_call_cost.operation_type)

    def get_layout_cost_breakdown(self, layout_name: str) -> Optional[Dict]:
        """
        Get detailed cost breakdown for a specific layout

        Args:
            layout_name: Name of the layout

        Returns:
            Dictionary with cost breakdown or None if layout not found
            (or tracking is disabled)
        """
        if not self.enable_tracking or layout_name not in self.layout_costs:
            return None

        layout_summary = self.layout_costs[layout_name]
        return {
            'layout_name': layout_name,
            'total_cost': round(layout_summary.total_cost, 4),
            'cost_breakdown': {
                'input_tokens': layout_summary.total_input_tokens,
                'output_tokens': layout_summary.total_output_tokens,
                'cached_tokens': layout_summary.total_cached_tokens,
                'api_calls_made': layout_summary.api_calls_made,
                # Copy so callers cannot mutate the tracker's own list
                'operation_types': list(layout_summary.operation_types),
            },
            # Despite the key name, each value is the layout's TOTAL cost
            # divided by that token count and scaled by 1000; max(..., 1)
            # avoids division by zero.
            'cost_per_token': {
                'input': round(layout_summary.total_cost / max(layout_summary.total_input_tokens, 1) * 1000, 4),
                'output': round(layout_summary.total_cost / max(layout_summary.total_output_tokens, 1) * 1000, 4),
            },
        }

    def get_session_summary(self) -> Dict:
        """
        Get summary of costs for the entire session

        Returns:
            Dictionary with session cost summary, or a short
            ``tracking_enabled: False`` dictionary when tracking is disabled
        """
        if not self.enable_tracking:
            return {
                'tracking_enabled': False,
                'message': 'Cost tracking is disabled'
            }

        session_duration = time.time() - self.session_start_time
        layouts_processed = len(self.layout_costs)

        # Calculate averages; max(..., 1) keeps an empty session from
        # dividing by zero
        layout_divisor = max(layouts_processed, 1)
        total_tokens = self.total_input_tokens + self.total_output_tokens
        avg_cost_per_layout = self.total_cost / layout_divisor
        avg_tokens_per_layout = total_tokens / layout_divisor
        avg_api_calls_per_layout = self.total_api_calls / layout_divisor

        # Calculate cost efficiency
        cost_per_thousand_tokens = (self.total_cost / max(total_tokens, 1)) * 1000

        # Operation type breakdown: number of calls per operation type
        operation_counts: Dict[str, int] = {}
        for api_call in self.api_calls:
            op_type = api_call.operation_type
            operation_counts[op_type] = operation_counts.get(op_type, 0) + 1

        return {
            'tracking_enabled': True,
            'session_totals': {
                'total_cost': round(self.total_cost, 4),
                'total_input_tokens': self.total_input_tokens,
                'total_output_tokens': self.total_output_tokens,
                'total_cached_tokens': self.total_cached_tokens,
                'total_api_calls': self.total_api_calls,
                'layouts_processed': layouts_processed,
                'session_duration_minutes': round(session_duration / 60, 2)
            },
            'averages': {
                'cost_per_layout': round(avg_cost_per_layout, 4),
                'tokens_per_layout': round(avg_tokens_per_layout, 1),
                'api_calls_per_layout': round(avg_api_calls_per_layout, 1),
                'cost_per_thousand_tokens': round(cost_per_thousand_tokens, 4)
            },
            'operation_breakdown': operation_counts,
            'pricing_info': {
                'input_cost_per_million': self.INPUT_COST_PER_MILLION,
                'output_cost_per_million': self.OUTPUT_COST_PER_MILLION,
                'cached_input_cost_per_million': self.CACHED_INPUT_COST_PER_MILLION
            }
        }

    def save_cost_report(self, filename: Optional[str] = None) -> str:
        """
        Save detailed cost report to JSON file

        The report is written below the ``results`` directory, relative to
        the current working directory.

        Args:
            filename: Output filename (optional). A timestamped name is
                generated when omitted; ``.json`` is appended if missing.

        Returns:
            Path to saved file, or an empty string when tracking is disabled
        """
        if not self.enable_tracking:
            print("Cost tracking is disabled, no report to save")
            return ""

        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"cost_report_{timestamp}.json"

        # Ensure .json extension
        if not filename.endswith('.json'):
            filename += '.json'

        output_path = Path("results") / filename
        # parents=True so a filename containing sub-directories also works
        output_path.parent.mkdir(parents=True, exist_ok=True)

        report_data = {
            'session_summary': self.get_session_summary(),
            'layout_costs': {name: summary.to_dict() for name, summary in self.layout_costs.items()},
            'detailed_api_calls': [call.to_dict() for call in self.api_calls],
            'generated_at': datetime.now().isoformat(),
            'pricing_model': 'OpenAI o3'
        }

        # Explicit encoding keeps the output independent of the platform's
        # default locale encoding.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2)

        print(f"Cost report saved to: {output_path}")
        return str(output_path)

    def print_cost_summary(self):
        """Print a formatted cost summary to console"""
        if not self.enable_tracking:
            print("Cost tracking is disabled")
            return

        summary = self.get_session_summary()
        session = summary['session_totals']
        averages = summary['averages']

        print("\n" + "="*60)
        print("COST TRACKING SUMMARY")
        print("="*60)
        print(f"Total cost: ${session['total_cost']:.4f}")
        print(f"Total tokens: {session['total_input_tokens'] + session['total_output_tokens']:,}")
        print(f" - Input tokens: {session['total_input_tokens']:,}")
        print(f" - Output tokens: {session['total_output_tokens']:,}")
        print(f" - Cached tokens: {session['total_cached_tokens']:,}")
        print(f"Total API calls: {session['total_api_calls']}")
        print(f"Layouts processed: {session['layouts_processed']}")
        print("\nAverages:")
        print(f" - Cost per layout: ${averages['cost_per_layout']:.4f}")
        print(f" - Tokens per layout: {averages['tokens_per_layout']:.1f}")
        print(f" - API calls per layout: {averages['api_calls_per_layout']:.1f}")
        print(f" - Cost per 1K tokens: ${averages['cost_per_thousand_tokens']:.4f}")
        if summary['operation_breakdown']:
            print("\nOperation breakdown:")
            for op_type, count in summary['operation_breakdown'].items():
                print(f" - {op_type}: {count} calls")
        print("="*60)

    def estimate_monthly_cost(self, layouts_per_month: int = 300) -> Dict:
        """
        Estimate monthly cost based on current usage patterns

        Args:
            layouts_per_month: Estimated number of layouts to process per month

        Returns:
            Dictionary with cost estimates, or a dictionary with an
            ``error`` key when tracking is disabled or no layout has been
            tracked yet
        """
        if not self.enable_tracking or len(self.layout_costs) == 0:
            return {'error': 'No cost data available for estimation'}

        layouts_tracked = len(self.layout_costs)
        # Uses the session-wide total cost, so calls tracked without a
        # layout name are included in the per-layout average.
        avg_cost_per_layout = self.total_cost / layouts_tracked
        estimated_monthly_cost = avg_cost_per_layout * layouts_per_month
        return {
            'average_cost_per_layout': round(avg_cost_per_layout, 4),
            'layouts_per_month': layouts_per_month,
            'estimated_monthly_cost': round(estimated_monthly_cost, 2),
            'estimated_annual_cost': round(estimated_monthly_cost * 12, 2),
            'based_on_layouts': layouts_tracked
        }
def extract_token_usage_from_response(response) -> TokenUsage:
    """
    Extract token usage from OpenAI API response

    The cached-token count is read from ``usage.prompt_tokens_details``
    (where the Chat Completions API reports it). A flat
    ``usage.cached_tokens`` attribute is still honoured as a fallback, and
    a missing or ``None`` value is treated as 0.

    Args:
        response: OpenAI API response object

    Returns:
        TokenUsage object with extracted token counts; all zeros when the
        response carries no usage information

    Raises:
        ValueError: Propagated from TokenUsage when the reported counts
            fail its validation.
    """
    usage = getattr(response, 'usage', None)
    if usage is None:
        # Fallback if usage information is not available
        return TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0, cached_tokens=0)

    # Prefer the nested details object; getattr on None simply yields None.
    details = getattr(usage, 'prompt_tokens_details', None)
    cached_tokens = getattr(details, 'cached_tokens', None)
    if cached_tokens is None:
        cached_tokens = getattr(usage, 'cached_tokens', None)

    return TokenUsage(
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
        total_tokens=usage.total_tokens,
        cached_tokens=cached_tokens or 0,
    )
# Global cost calculator instance (can be configured from CLI).
# It is created at import time with tracking switched off, so until tracking
# is enabled its methods record nothing and report zero costs.
cost_calculator = CostCalculator(enable_tracking=False) # Disabled by default