Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI) into a single Docker app with React/TypeScript frontend. Features: - Brief upload → AI extraction → review → Activation Calendar import - Handsontable v17 spreadsheet with dependent dropdowns (148 categories) - AI natural language commands via Gemini (YOLO mode, voice input) - Azure AD MSAL SPA PKCE authentication, user roles (user/admin) - CSV Activation Calendar export - Real-time WebSocket job progress - Admin: user management, dropdown Excel upload - Multi-stage Dockerfile, docker-compose, nginx proxy instructions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
256 lines
No EOL
10 KiB
Python
Executable file
256 lines
No EOL
10 KiB
Python
Executable file
"""
|
|
Google provider implementation for Gemini 2.5 Pro using the new google-genai SDK
|
|
"""
|
|
|
|
import time
|
|
import json
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
try:
|
|
from google import genai
|
|
from google.genai.types import GenerateContentConfig, ThinkingConfig
|
|
except ImportError:
|
|
genai = None
|
|
GenerateContentConfig = None
|
|
ThinkingConfig = None
|
|
|
|
from .base_provider import BaseLLMProvider, LLMResponse, TokenUsage
|
|
from ..config import config
|
|
|
|
class GoogleProvider(BaseLLMProvider):
    """LLM provider for Google Gemini models, built on the google-genai SDK.

    The model name, sampling settings and thinking budget come from the
    ``google`` provider section of the project config unless overridden by
    constructor arguments.
    """

    # JSON-schema primitive names -> type names used in Gemini response schemas.
    _GOOGLE_TYPE_NAMES = {
        'string': 'STRING',
        'array': 'ARRAY',
        'object': 'OBJECT',
        'integer': 'INTEGER',
        'number': 'NUMBER',
        'boolean': 'BOOLEAN',
    }

    # Incoming chat role -> (Gemini role, prefix prepended to the text).
    # Gemini contents only know 'user' and 'model'; system messages are sent
    # as a prefixed user turn.
    _GOOGLE_ROLE_MAP = {
        'system': ('user', 'System: '),
        'user': ('user', ''),
        'assistant': ('model', ''),
        'model': ('model', ''),
    }

    def __init__(self, api_key: Optional[str] = None, model_name: Optional[str] = None, **kwargs):
        """Create the provider and its google-genai client.

        Args:
            api_key: API key; falls back to the configured key when falsy.
            model_name: Model identifier; falls back to the configured model
                when falsy.
            **kwargs: Forwarded to ``BaseLLMProvider.__init__``. The keys
                ``temperature``, ``max_output_tokens``, ``thinking_budget``
                and ``timeout`` additionally override the configured values.

        Raises:
            ImportError: If the google-genai package is not installed.
        """
        if genai is None:
            raise ImportError("google-genai package not installed. Run: pip install google-genai")

        provider_config = config.get_provider_config('google')

        super().__init__(
            api_key=api_key or provider_config['api_key'],
            model_name=model_name or provider_config['model'],
            **kwargs
        )

        self.temperature = kwargs.get('temperature', provider_config['temperature'])
        self.max_output_tokens = kwargs.get('max_output_tokens', provider_config['max_output_tokens'])
        self.thinking_budget = kwargs.get('thinking_budget', provider_config['thinking_budget'])
        # NOTE(review): timeout is stored here but nothing in this class
        # applies it to the request - confirm whether it should be enforced.
        self.timeout = kwargs.get('timeout', provider_config['timeout'])

        self.client = None
        self._setup_client()

    def _setup_client(self):
        """Initialize the Google GenAI client; log and re-raise on failure."""
        try:
            self.client = genai.Client(api_key=self.api_key)
            self.logger.info(f"Google GenAI client initialized - Model: {self.model_name}")
        except Exception as e:
            self.logger.error(f"Failed to initialize Google GenAI client: {e}")
            raise

    async def generate_response(
        self,
        messages: List[Dict[str, str]],
        schema: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> LLMResponse:
        """Send the conversation to Gemini and wrap the result.

        Args:
            messages: Chat messages as dicts with ``role`` and ``content``.
            schema: Optional JSON schema (plain, or wrapped OpenAI-style under
                a ``schema`` key). When given, JSON structured output is
                requested.
            **kwargs: Accepted for interface compatibility; currently unused.

        Returns:
            An ``LLMResponse``. ``success`` is False (with ``error`` set) when
            the request raised or when the model returned no text; this
            method does not propagate request exceptions.
        """
        start_time = time.monotonic()

        try:
            self.logger.info(f"Google Request - Model: {self.model_name} (thinking enabled: {self.thinking_budget} budget)")

            request_contents = self._prepare_content(messages)
            generation_config = self._build_generation_config(schema)

            # Generate response using the SDK's native async API.
            response = await self.client.aio.models.generate_content(
                model=self.model_name,
                contents=request_contents,
                config=generation_config
            )

            text = self._extract_text(response)
            token_usage = self._extract_token_usage(response)
            processing_time = time.monotonic() - start_time

            if text:
                llm_response = LLMResponse(
                    content=text,
                    raw_response=response,
                    token_usage=token_usage,
                    model_used=self.model_name,
                    provider="google",
                    success=True,
                    processing_time=processing_time
                )
            else:
                # The SDK returns text=None when the answer was blocked or
                # the output budget was used up before any text was emitted.
                # Report that as a failure instead of a "successful" None,
                # but keep the usage so the call is still accounted for.
                error = f"Google response contained no text (finish_reason={self._finish_reason(response)})"
                self.logger.warning(error)
                llm_response = LLMResponse(
                    content="",
                    raw_response=response,
                    token_usage=token_usage,
                    model_used=self.model_name,
                    provider="google",
                    success=False,
                    error=error,
                    processing_time=processing_time
                )

            self.log_response(llm_response)
            return llm_response

        except Exception as e:
            processing_time = time.monotonic() - start_time
            # exc_info keeps the traceback, which the message alone loses.
            self.logger.error(f"Google request failed: {e}", exc_info=True)

            return LLMResponse(
                content="",
                raw_response=None,
                token_usage=TokenUsage(),
                model_used=self.model_name,
                provider="google",
                success=False,
                error=str(e),
                processing_time=processing_time
            )

    def _build_generation_config(self, schema: Optional[Dict[str, Any]] = None):
        """Build the GenerateContentConfig for one request."""
        config_dict = {
            'temperature': self.temperature,
            'max_output_tokens': self.max_output_tokens,
            'thinking_config': ThinkingConfig(thinking_budget=self.thinking_budget) if ThinkingConfig else None,
        }

        if schema:
            config_dict['response_mime_type'] = 'application/json'
            # Google GenAI SDK expects response_schema, not response_json_schema
            config_dict['response_schema'] = self._convert_schema_to_google_format(schema)
            self.logger.info("Using structured output with converted schema")

        return GenerateContentConfig(**config_dict)

    @staticmethod
    def _extract_text(response) -> str:
        """Return the answer text of a response, or '' when there is none."""
        text = getattr(response, 'text', None)
        if text:
            return text

        # Fall back to the text parts of the first candidate, skipping parts
        # flagged as thoughts.
        candidates = getattr(response, 'candidates', None) or []
        if not candidates:
            return ""
        parts = getattr(getattr(candidates[0], 'content', None), 'parts', None) or []
        return "".join(
            part.text
            for part in parts
            if getattr(part, 'text', None) and not getattr(part, 'thought', False)
        )

    @staticmethod
    def _finish_reason(response):
        """Return the first candidate's finish_reason, or None if unavailable."""
        candidates = getattr(response, 'candidates', None) or []
        if not candidates:
            return None
        return getattr(candidates[0], 'finish_reason', None)

    def _extract_token_usage(self, response) -> TokenUsage:
        """Collect token counts from ``response.usage_metadata``."""
        token_usage = TokenUsage()

        usage = getattr(response, 'usage_metadata', None)
        if usage is None:
            self.logger.warning("No usage_metadata found in Google response")
            return token_usage

        # Counts can be missing or None; treat both as zero.
        input_tokens = getattr(usage, 'prompt_token_count', None) or 0
        answer_tokens = getattr(usage, 'candidates_token_count', None) or 0
        # Thinking tokens are reported separately from the answer tokens but
        # are billed as output, so they are added to the output count.
        thinking_tokens = getattr(usage, 'thoughts_token_count', None) or 0
        cached_tokens = getattr(usage, 'cached_content_token_count', None) or 0

        usage_dict = {
            'input_tokens': input_tokens,
            'output_tokens': answer_tokens + thinking_tokens,
            'cached_input_tokens': cached_tokens
        }

        self.logger.debug(f"Google token usage: {usage_dict}")
        token_usage.add_usage(usage_dict)
        return token_usage

    def _prepare_content(self, messages: List[Dict[str, str]]) -> List[Dict[str, Any]]:
        """Convert standard chat messages to the Google GenAI contents format.

        ``system`` messages become user turns prefixed with ``System: ``,
        ``assistant`` maps to Google's ``model`` role, and ``model`` is
        accepted as-is. Messages with any other role are skipped with a
        warning.

        Raises:
            KeyError: If a message lacks ``role`` or ``content``.
        """
        contents = []

        for message in messages:
            role = message['role']
            text = message['content']

            mapping = self._GOOGLE_ROLE_MAP.get(role)
            if mapping is None:
                self.logger.warning(f"Skipping message with unsupported role: {role!r}")
                continue

            google_role, prefix = mapping
            contents.append({
                'role': google_role,
                'parts': [{'text': f"{prefix}{text}"}]
            })

        return contents

    def _convert_schema_to_google_format(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an OpenAI-style JSON schema to the Google GenAI format.

        Accepts either the bare JSON schema or the OpenAI wrapper that holds
        it under a ``schema`` key. Only ``type``, ``items``, ``properties``,
        ``required``, ``description`` and ``title`` are carried over;
        ``oneOf``/``anyOf`` are collapsed to a single option and all other
        keywords (``additionalProperties``, ``name``, ...) are dropped.
        """
        root = schema['schema'] if 'schema' in schema else schema
        return self._convert_schema_node(root)

    def _convert_schema_node(self, node):
        """Recursively convert one schema node (dict, list or scalar)."""
        if isinstance(node, list):
            return [self._convert_schema_node(item) for item in node]
        if not isinstance(node, dict):
            return node

        converted: Dict[str, Any] = {}

        # Collapse a union first so that the node's own keys (description,
        # title, ...) handled below are kept instead of being discarded.
        for union_key in ('oneOf', 'anyOf'):
            options = node.get(union_key)
            if isinstance(options, list) and options:
                chosen, nullable = self._pick_union_option(options)
                chosen_schema = self._convert_schema_node(chosen)
                if isinstance(chosen_schema, dict):
                    converted.update(chosen_schema)
                if nullable:
                    converted['nullable'] = True
                break

        for key, value in node.items():
            if key == 'type':
                type_name, nullable = self._convert_type(value)
                converted['type'] = type_name
                if nullable:
                    converted['nullable'] = True
            elif key == 'items':
                converted['items'] = self._convert_schema_node(value)
            elif key == 'properties':
                converted['properties'] = {
                    prop_name: self._convert_schema_node(prop_schema)
                    for prop_name, prop_schema in value.items()
                }
            elif key == 'required':
                converted['required'] = value
            elif key == 'additionalProperties':
                # Skip additionalProperties - not supported by Gemini API
                self.logger.debug("Skipping unsupported 'additionalProperties' in Google schema")
            elif key in ('description', 'title'):
                converted[key] = value
            # Any other key (e.g. OpenAI's 'name') is dropped.
            # NOTE(review): 'enum' is dropped as well, as before - confirm
            # whether that is intentional.

        return converted

    def _convert_type(self, json_type):
        """Map a JSON-schema ``type`` to ``(google_type_name, nullable)``.

        ``json_type`` may be a single name or a list such as
        ``["string", "null"]``; for a list the first non-null entry is used
        and ``nullable`` reports whether ``null`` was present. Unknown names
        map to ``STRING``.
        """
        if isinstance(json_type, (list, tuple)):
            names = [str(name).lower() for name in json_type]
            nullable = 'null' in names
            concrete = [name for name in names if name != 'null']
            primary = concrete[0] if concrete else 'string'
        else:
            primary = str(json_type).lower()
            nullable = False

        return self._GOOGLE_TYPE_NAMES.get(primary, 'STRING'), nullable

    @staticmethod
    def _pick_union_option(options):
        """Choose one option of a oneOf/anyOf list.

        Preference order: the first ``string`` option, then the first option
        that is not ``null``, then the first option. Also reports whether a
        ``null`` option was present.
        """
        def option_type(option):
            return option.get('type') if isinstance(option, dict) else None

        nullable = any(option_type(option) == 'null' for option in options)

        for option in options:
            if option_type(option) == 'string':
                return option, nullable
        for option in options:
            if option_type(option) != 'null':
                return option, nullable
        return options[0], nullable

    def validate_config(self) -> bool:
        """Return True when an API key is set and google-genai is importable."""
        if not self.api_key or self.api_key == 'your-google-api-key-here':
            self.logger.error("Google API key not configured")
            return False

        if genai is None:
            self.logger.error("google-genai package not installed")
            return False

        return True

    def estimate_cost(self, input_tokens: int, output_tokens: int, cached_tokens: int = 0) -> float:
        """Estimate the request cost via the config's Google Gemini pricing."""
        # NOTE(review): the pricing key is fixed and independent of
        # self.model_name - confirm it matches the configured model.
        return config.estimate_cost('google-gemini31', input_tokens, output_tokens, cached_tokens)

    def get_max_tokens(self) -> int:
        """Return the maximum token limit assumed for the Gemini model."""
        # NOTE(review): value was labelled "Gemini 3.1 Pro context window";
        # verify against the limit of the model actually configured.
        return 2000000