apac-ops-bot/backend/app/services/chat_service.py
SamoilenkoVadym 2e6597ee08 Add admin analytics and update OpenAI integration
Backend changes:
- Add admin analytics endpoints for daily usage per user
- Add GET /tokens/daily-users endpoint with date/user breakdown
- Update OpenAI SDK from 1.58.1 to 2.6.1
- Switch from Assistants API to Responses API with file_search tool
- Implement strict RAG-only system instructions
- Add citation validation to prevent hallucinations
- Add get_daily_usage_by_user repository method
- Add DailyUserUsage schema for admin analytics

Frontend changes:
- Implement comprehensive admin usage dashboard
- Add overall system statistics (users, conversations, messages, tokens, cost)
- Add daily usage table with per-user breakdown
- Add chat state clearing on logout and user change for isolation
- Center welcome message and input field in chat interface
- Add admin-specific styling for usage analytics tables
- Fix useCallback dependencies to prevent infinite loops

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-27 21:36:36 +00:00

451 lines
14 KiB
Python

"""
Chat Service
Orchestrates chat functionality between API, repositories, and OpenAI service
"""
import logging
from typing import Dict, Optional, List
from uuid import UUID
from decimal import Decimal
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.openai_service import OpenAIService
from app.repositories.conversation_repository import ConversationRepository
from app.repositories.message_repository import MessageRepository
from app.repositories.token_usage_repository import TokenUsageRepository
from app.config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class ChatService:
"""
Main chat service for handling conversations and messages.
Orchestrates:
- Conversation management
- Message creation and retrieval
- OpenAI API integration
- Token usage tracking
"""
def __init__(
self,
session: AsyncSession,
openai_service: Optional[OpenAIService] = None
):
"""
Initialize chat service
Args:
session: Database session
openai_service: Optional OpenAI service instance
"""
self.session = session
self.openai_service = openai_service or OpenAIService()
# Initialize repositories
self.conversation_repo = ConversationRepository(session)
self.message_repo = MessageRepository(session)
self.token_repo = TokenUsageRepository(session)
async def create_conversation(
self,
user_id: UUID,
title: Optional[str] = None
) -> Dict:
"""
Create a new conversation
Args:
user_id: User UUID
title: Optional conversation title
Returns:
Dict with conversation data
"""
conversation = await self.conversation_repo.create(
user_id=user_id,
title=title or "New Conversation"
)
logger.info(f"Created conversation {conversation.id} for user {user_id}")
return {
"id": str(conversation.id),
"user_id": str(conversation.user_id),
"title": conversation.title,
"created_at": conversation.created_at.isoformat(),
"last_message_at": conversation.last_message_at.isoformat() if conversation.last_message_at else None,
"is_archived": conversation.is_archived,
}
async def get_conversation(self, conversation_id: UUID) -> Optional[Dict]:
"""
Get conversation by ID
Args:
conversation_id: Conversation UUID
Returns:
Dict with conversation data or None
"""
conversation = await self.conversation_repo.get_by_id(conversation_id)
if not conversation:
return None
return {
"id": str(conversation.id),
"user_id": str(conversation.user_id),
"title": conversation.title,
"created_at": conversation.created_at.isoformat(),
"last_message_at": conversation.last_message_at.isoformat() if conversation.last_message_at else None,
"is_archived": conversation.is_archived,
"last_response_id": conversation.last_response_id,
}
async def list_conversations(
self,
user_id: UUID,
include_archived: bool = False,
skip: int = 0,
limit: int = 50
) -> List[Dict]:
"""
List conversations for a user
Args:
user_id: User UUID
include_archived: Include archived conversations
skip: Number of records to skip
limit: Maximum number of records
Returns:
List of conversation dicts
"""
conversations = await self.conversation_repo.get_by_user(
user_id=user_id,
include_archived=include_archived,
skip=skip,
limit=limit
)
return [
{
"id": str(conv.id),
"title": conv.title,
"created_at": conv.created_at.isoformat(),
"last_message_at": conv.last_message_at.isoformat() if conv.last_message_at else None,
"is_archived": conv.is_archived,
}
for conv in conversations
]
async def send_message(
self,
user_id: UUID,
conversation_id: UUID,
message_content: str
) -> Dict:
"""
Send a message and get AI response
Args:
user_id: User UUID
conversation_id: Conversation UUID
message_content: User's message text
Returns:
Dict with user message and assistant response
"""
# Verify conversation exists and belongs to user
conversation = await self.conversation_repo.get_by_id(conversation_id)
if not conversation:
raise ValueError(f"Conversation {conversation_id} not found")
if conversation.user_id != user_id:
raise PermissionError(f"User {user_id} does not have access to conversation {conversation_id}")
logger.info(f"Processing message from user {user_id} in conversation {conversation_id}")
# 1. Save user message
user_message = await self.message_repo.create_user_message(
conversation_id=conversation_id,
content=message_content,
token_count=self._estimate_tokens(message_content)
)
# 2. Get previous response ID for multi-turn conversation
previous_response_id = conversation.last_response_id
# 3. Generate AI response
try:
openai_response = await self.openai_service.generate_response(
user_message=message_content,
previous_response_id=previous_response_id
)
logger.info(
f"Generated OpenAI response {openai_response['response_id']} "
f"with {openai_response['usage']['total_tokens']} tokens"
)
except Exception as e:
logger.error(f"OpenAI API error: {e}", exc_info=True)
raise
# 4. Save assistant message
assistant_message = await self.message_repo.create_assistant_message(
conversation_id=conversation_id,
content=openai_response["content"],
openai_response_id=openai_response["response_id"],
token_count=openai_response["usage"]["completion_tokens"],
metadata={
"file_search_results": openai_response["file_search_results"],
"has_citations": openai_response["has_citations"],
"needs_review": openai_response.get("needs_review", False)
}
)
# 5. Update conversation's last_response_id for next turn
await self.conversation_repo.update_last_response_id(
conversation_id=conversation_id,
response_id=openai_response["response_id"]
)
# 6. Record token usage
cost_usd = self._calculate_cost(
prompt_tokens=openai_response["usage"]["prompt_tokens"],
completion_tokens=openai_response["usage"]["completion_tokens"],
cached_tokens=openai_response["usage"].get("cached_tokens", 0)
)
await self.token_repo.record_usage(
user_id=user_id,
conversation_id=conversation_id,
message_id=assistant_message.id,
prompt_tokens=openai_response["usage"]["prompt_tokens"],
cached_tokens=openai_response["usage"].get("cached_tokens", 0),
completion_tokens=openai_response["usage"]["completion_tokens"],
total_tokens=openai_response["usage"]["total_tokens"],
model=settings.OPENAI_MODEL,
cost_usd=cost_usd,
operation_type="chat"
)
logger.info(
f"Completed message exchange in conversation {conversation_id}. "
f"Cost: ${cost_usd:.6f}"
)
# 7. Return response
return {
"user_message": {
"id": str(user_message.id),
"content": user_message.content,
"created_at": user_message.created_at.isoformat()
},
"assistant_message": {
"id": str(assistant_message.id),
"content": assistant_message.content,
"created_at": assistant_message.created_at.isoformat(),
"file_search_results": openai_response["file_search_results"],
"needs_review": openai_response.get("needs_review", False)
},
"usage": openai_response["usage"],
"cost_usd": float(cost_usd)
}
async def get_messages(
self,
conversation_id: UUID,
skip: int = 0,
limit: int = 100
) -> List[Dict]:
"""
Get messages for a conversation
Args:
conversation_id: Conversation UUID
skip: Number of messages to skip
limit: Maximum number of messages
Returns:
List of message dicts
"""
messages = await self.message_repo.get_by_conversation(
conversation_id=conversation_id,
skip=skip,
limit=limit
)
return [
{
"id": str(msg.id),
"role": msg.role,
"content": msg.content,
"created_at": msg.created_at.isoformat(),
"metadata": msg.metadata
}
for msg in messages
]
async def update_conversation_title(
self,
conversation_id: UUID,
title: str
) -> bool:
"""
Update conversation title
Args:
conversation_id: Conversation UUID
title: New title
Returns:
True if updated successfully
"""
await self.conversation_repo.update(conversation_id, title=title)
logger.info(f"Updated conversation {conversation_id} title to: {title}")
return True
async def archive_conversation(self, conversation_id: UUID) -> bool:
"""
Archive a conversation
Args:
conversation_id: Conversation UUID
Returns:
True if archived successfully
"""
await self.conversation_repo.archive(conversation_id)
logger.info(f"Archived conversation {conversation_id}")
return True
async def delete_conversation(
self,
user_id: UUID,
conversation_id: UUID
) -> bool:
"""
Delete a conversation (with permission check)
Args:
user_id: User UUID
conversation_id: Conversation UUID
Returns:
True if deleted successfully
"""
# Verify ownership
conversation = await self.conversation_repo.get_by_id(conversation_id)
if not conversation:
raise ValueError(f"Conversation {conversation_id} not found")
if conversation.user_id != user_id:
raise PermissionError(f"User {user_id} does not have access to conversation {conversation_id}")
# Delete (will cascade to messages)
await self.conversation_repo.delete(conversation_id)
logger.info(f"Deleted conversation {conversation_id} for user {user_id}")
return True
async def get_token_usage_summary(
self,
user_id: UUID,
days: Optional[int] = None
) -> Dict:
"""
Get token usage summary for user
Args:
user_id: User UUID
days: Optional filter for last N days
Returns:
Dict with usage statistics
"""
total_tokens = await self.token_repo.get_user_total_tokens(user_id, days)
total_cost = await self.token_repo.get_user_total_cost(user_id, days)
daily_usage = await self.token_repo.get_daily_usage(user_id, days or 30)
return {
"total_tokens": total_tokens,
"total_cost_usd": float(total_cost),
"daily_breakdown": daily_usage,
"period_days": days
}
async def get_all_users_token_usage(
self,
days: Optional[int] = None
) -> List[Dict]:
"""
Get token usage for all users (admin only)
Args:
days: Optional filter for last N days
Returns:
List of dicts with per-user statistics
"""
return await self.token_repo.get_all_users_stats(days)
async def get_daily_usage_by_user(
self,
days: Optional[int] = None
) -> List[Dict]:
"""
Get daily token usage breakdown by user (admin only)
Args:
days: Optional filter for last N days
Returns:
List of dicts with daily usage per user
"""
return await self.token_repo.get_daily_usage_by_user(days)
def _estimate_tokens(self, text: str) -> int:
"""
Estimate token count for text (rough approximation)
Args:
text: Input text
Returns:
Estimated token count
"""
# Rough estimate: ~4 characters per token
return len(text) // 4
def _calculate_cost(
self,
prompt_tokens: int,
completion_tokens: int,
cached_tokens: int = 0
) -> Decimal:
"""
Calculate cost in USD
Args:
prompt_tokens: Number of prompt tokens (total input tokens)
completion_tokens: Number of completion tokens
cached_tokens: Number of cached input tokens (charged at lower rate)
Returns:
Total cost in USD
"""
# Calculate non-cached prompt tokens
non_cached_prompt_tokens = prompt_tokens - cached_tokens
# Calculate costs
prompt_cost = Decimal(str(non_cached_prompt_tokens)) * Decimal(str(settings.PROMPT_TOKEN_COST)) / Decimal("1000")
cached_cost = Decimal(str(cached_tokens)) * Decimal(str(settings.CACHED_PROMPT_TOKEN_COST)) / Decimal("1000")
completion_cost = Decimal(str(completion_tokens)) * Decimal(str(settings.COMPLETION_TOKEN_COST)) / Decimal("1000")
return prompt_cost + cached_cost + completion_cost