ac-tool/backend/core/llm_service/google_provider.py
Vadym Samoilenko 72c50b2c92 Initial commit — AC Tool unified application
Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI)
into a single Docker app with React/TypeScript frontend.

Features:
- Brief upload → AI extraction → review → Activation Calendar import
- Handsontable v17 spreadsheet with dependent dropdowns (148 categories)
- AI natural language commands via Gemini (YOLO mode, voice input)
- Azure AD MSAL SPA PKCE authentication, user roles (user/admin)
- CSV Activation Calendar export
- Real-time WebSocket job progress
- Admin: user management, dropdown Excel upload
- Multi-stage Dockerfile, docker-compose, nginx proxy instructions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 13:24:46 +00:00

256 lines
No EOL
10 KiB
Python
Executable file

"""
Google provider implementation for Gemini 2.5 Pro using the new google-genai SDK
"""
import time
import json
import logging
from typing import List, Dict, Any, Optional
try:
from google import genai
from google.genai.types import GenerateContentConfig, ThinkingConfig
except ImportError:
genai = None
GenerateContentConfig = None
ThinkingConfig = None
from .base_provider import BaseLLMProvider, LLMResponse, TokenUsage
from ..config import config
class GoogleProvider(BaseLLMProvider):
"""Google Gemini 2.5 Pro provider using new google-genai SDK"""
def __init__(self, api_key: Optional[str] = None, model_name: Optional[str] = None, **kwargs):
if genai is None:
raise ImportError("google-genai package not installed. Run: pip install google-genai")
provider_config = config.get_provider_config('google')
super().__init__(
api_key=api_key or provider_config['api_key'],
model_name=model_name or provider_config['model'],
**kwargs
)
self.temperature = kwargs.get('temperature', provider_config['temperature'])
self.max_output_tokens = kwargs.get('max_output_tokens', provider_config['max_output_tokens'])
self.thinking_budget = kwargs.get('thinking_budget', provider_config['thinking_budget'])
self.timeout = kwargs.get('timeout', provider_config['timeout'])
self.client = None
self._setup_client()
def _setup_client(self):
"""Initialize Google GenAI client"""
try:
self.client = genai.Client(api_key=self.api_key)
self.logger.info(f"Google GenAI client initialized - Model: {self.model_name}")
except Exception as e:
self.logger.error(f"Failed to initialize Google GenAI client: {e}")
raise
async def generate_response(
self,
messages: List[Dict[str, str]],
schema: Optional[Dict[str, Any]] = None,
**kwargs
) -> LLMResponse:
"""Generate response using Google Gemini 2.5 Pro"""
start_time = time.time()
try:
self.logger.info(f"Google Request - Model: {self.model_name} (thinking enabled: {self.thinking_budget} budget)")
# Convert messages to Google format
content = self._prepare_content(messages)
# Configure generation with thinking capabilities
config_dict = {
'temperature': self.temperature,
'max_output_tokens': self.max_output_tokens,
'thinking_config': ThinkingConfig(thinking_budget=self.thinking_budget) if ThinkingConfig else None,
}
# Add JSON schema for structured output if provided
if schema:
config_dict['response_mime_type'] = 'application/json'
converted_schema = self._convert_schema_to_google_format(schema)
# Google GenAI SDK expects response_schema, not response_json_schema
config_dict['response_schema'] = converted_schema
self.logger.info("Using structured output with converted schema")
generation_config = GenerateContentConfig(**config_dict)
# Generate response using native async API
response = await self.client.aio.models.generate_content(
model=self.model_name,
contents=content,
config=generation_config
)
# Extract content
if hasattr(response, 'text'):
content = response.text
elif hasattr(response, 'candidates') and response.candidates:
content = response.candidates[0].content.parts[0].text
else:
content = str(response)
# Extract token usage
token_usage = TokenUsage()
if hasattr(response, 'usage_metadata'):
# Safely extract token counts with proper defaults
input_tokens = getattr(response.usage_metadata, 'prompt_token_count', None) or 0
output_tokens = getattr(response.usage_metadata, 'candidates_token_count', None) or 0
cached_tokens = getattr(response.usage_metadata, 'cached_content_token_count', None) or 0
usage_dict = {
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'cached_input_tokens': cached_tokens
}
self.logger.debug(f"Google token usage: {usage_dict}")
token_usage.add_usage(usage_dict)
else:
self.logger.warning("No usage_metadata found in Google response")
processing_time = time.time() - start_time
llm_response = LLMResponse(
content=content,
raw_response=response,
token_usage=token_usage,
model_used=self.model_name,
provider="google",
success=True,
processing_time=processing_time
)
self.log_response(llm_response)
return llm_response
except Exception as e:
processing_time = time.time() - start_time
self.logger.error(f"Google request failed: {e}")
return LLMResponse(
content="",
raw_response=None,
token_usage=TokenUsage(),
model_used=self.model_name,
provider="google",
success=False,
error=str(e),
processing_time=processing_time
)
def _prepare_content(self, messages: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""Convert standard messages to Google GenAI format"""
contents = []
for message in messages:
role = message['role']
text = message['content']
# Map roles to Google format
if role == 'system':
# System messages go into parts directly
contents.append({
'role': 'user', # Google doesn't have explicit system role
'parts': [{'text': f"System: {text}"}]
})
elif role == 'user':
contents.append({
'role': 'user',
'parts': [{'text': text}]
})
elif role == 'assistant':
contents.append({
'role': 'model',
'parts': [{'text': text}]
})
return contents
def _convert_schema_to_google_format(self, schema: Dict[str, Any]) -> Dict[str, Any]:
"""Convert OpenAI JSON schema to Google GenAI format"""
def convert_type(openai_type: str) -> str:
"""Convert OpenAI type to Google GenAI type"""
type_mapping = {
'string': 'STRING',
'array': 'ARRAY',
'object': 'OBJECT',
'integer': 'INTEGER',
'number': 'NUMBER',
'boolean': 'BOOLEAN'
}
return type_mapping.get(openai_type.lower(), 'STRING')
def convert_schema_node(node):
if isinstance(node, dict):
converted = {}
for key, value in node.items():
if key == 'type':
# Convert type to Google format
converted['type'] = convert_type(value)
elif key == 'oneOf':
# Google doesn't support oneOf - use the string type option
if isinstance(value, list) and len(value) > 0:
string_option = next((item for item in value if item.get('type') == 'string'), value[0])
return convert_schema_node(string_option)
elif key == 'items':
# Convert array items
converted['items'] = convert_schema_node(value)
elif key == 'properties':
# Convert object properties
converted['properties'] = {}
for prop_name, prop_schema in value.items():
converted['properties'][prop_name] = convert_schema_node(prop_schema)
elif key == 'required':
# Keep required fields as-is
converted['required'] = value
elif key == 'additionalProperties':
# Skip additionalProperties - not supported by Gemini API
self.logger.debug(f"Skipping unsupported 'additionalProperties' in Google schema")
continue
elif key in ['description', 'title']:
# Keep description and title
converted[key] = value
# Skip other OpenAI-specific fields like 'name'
return converted
elif isinstance(node, list):
return [convert_schema_node(item) for item in node]
else:
return node
# Extract the actual schema from OpenAI format
if 'schema' in schema:
google_schema = convert_schema_node(schema['schema'])
else:
google_schema = convert_schema_node(schema)
return google_schema
def validate_config(self) -> bool:
"""Validate Google configuration"""
if not self.api_key or self.api_key == 'your-google-api-key-here':
self.logger.error("Google API key not configured")
return False
if genai is None:
self.logger.error("google-genai package not installed")
return False
return True
def estimate_cost(self, input_tokens: int, output_tokens: int, cached_tokens: int = 0) -> float:
"""Estimate cost using Google Gemini pricing"""
return config.estimate_cost('google-gemini31', input_tokens, output_tokens, cached_tokens)
def get_max_tokens(self) -> int:
"""Get maximum token limit for Gemini 3.1 Pro"""
return 2000000 # Gemini 3.1 Pro context window