""" Admin Endpoints API endpoints for administrative functions (user management, analytics, system settings) """ import logging from typing import List from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from datetime import datetime, timedelta from app.database import get_db from app.core.middleware import get_current_active_user from app.core.rbac import require_role, UserRole, Permission, require_permission from app.models.user import User from app.models.conversation import Conversation from app.models.message import Message from app.models.token_usage import TokenUsage from pydantic import BaseModel, EmailStr logger = logging.getLogger(__name__) router = APIRouter(prefix="/admin", tags=["Admin"]) # Schemas class UserListItem(BaseModel): """User list item for admin""" id: str email: EmailStr display_name: str role: str is_active: bool created_at: datetime last_login_at: datetime | None total_conversations: int total_tokens: int class Config: from_attributes = True class UpdateUserRoleRequest(BaseModel): """Request to update user role""" role: str class SystemAnalytics(BaseModel): """System-wide analytics""" total_users: int active_users: int total_conversations: int total_messages: int total_tokens_used: int total_cost_usd: float avg_tokens_per_user: float avg_messages_per_conversation: float class UserAnalytics(BaseModel): """Per-user analytics""" user_id: str email: str display_name: str conversations_count: int messages_count: int tokens_used: int cost_usd: float last_activity: datetime | None class MessageDetail(BaseModel): """Message detail for user details view""" id: str role: str content: str created_at: datetime token_count: int cost_usd: float class ConversationDetail(BaseModel): """Conversation detail for user details view""" id: str title: str created_at: datetime last_message_at: datetime | None message_count: int total_tokens: int total_cost_usd: float messages: List[MessageDetail] class UserDetails(BaseModel): """Detailed user information with conversations and messages""" user_id: str email: str display_name: str role: str is_active: bool created_at: datetime last_login_at: datetime | None total_conversations: int total_messages: int total_tokens: int prompt_tokens: int completion_tokens: int cached_tokens: int total_cost_usd: float avg_tokens_per_message: float conversations: List[ConversationDetail] # Endpoints @router.get("/users", response_model=List[UserListItem]) async def list_all_users( current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), skip: int = 0, limit: int = 100 ): """ List all users (admin only) Args: current_user: Current authenticated user db: Database session skip: Number of users to skip limit: Max number of users to return Returns: List of users with stats Raises: HTTPException: If user is not admin """ require_permission(current_user.role, Permission.READ_ALL_USERS) # Query users with aggregated stats query = ( select( User, func.count(Conversation.id).label("total_conversations"), func.sum(TokenUsage.total_tokens).label("total_tokens") ) .outerjoin(Conversation, User.id == Conversation.user_id) .outerjoin(TokenUsage, User.id == TokenUsage.user_id) .group_by(User.id) .order_by(User.created_at.desc()) .offset(skip) .limit(limit) ) result = await db.execute(query) users_with_stats = result.all() return [ UserListItem( id=str(user.id), email=user.email, display_name=user.display_name, role=user.role, is_active=user.is_active, created_at=user.created_at, last_login_at=user.last_login_at, total_conversations=total_conversations or 0, total_tokens=int(total_tokens or 0) ) for user, total_conversations, total_tokens in users_with_stats ] @router.put("/users/{user_id}/role") async def update_user_role( user_id: str, request: UpdateUserRoleRequest, current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db) ): """ Update user role (admin only) Args: user_id: User ID to update request: New role current_user: Current authenticated user db: Database session Returns: Updated user info Raises: HTTPException: If user is not admin or user not found """ require_permission(current_user.role, Permission.UPDATE_USER_ROLE) # Validate role try: new_role = UserRole(request.role) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid role. Must be one of: {[r.value for r in UserRole]}" ) # Get user result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Update role user.role = new_role.value await db.commit() logger.info(f"Admin {current_user.id} updated user {user_id} role to {new_role.value}") return { "success": True, "user_id": str(user.id), "email": user.email, "new_role": user.role } @router.put("/users/{user_id}/activate") async def activate_user( user_id: str, current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db) ): """ Activate user account (admin only) Args: user_id: User ID to activate current_user: Current authenticated user db: Database session Returns: Success message Raises: HTTPException: If user is not admin or user not found """ require_permission(current_user.role, Permission.READ_ALL_USERS) result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user.is_active = True await db.commit() logger.info(f"Admin {current_user.id} activated user {user_id}") return {"success": True, "message": f"User {user.email} activated"} @router.put("/users/{user_id}/deactivate") async def deactivate_user( user_id: str, current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db) ): """ Deactivate user account (admin only) Args: user_id: User ID to deactivate current_user: Current authenticated user db: Database session Returns: Success message Raises: HTTPException: If user is not admin or user not found """ require_permission(current_user.role, Permission.READ_ALL_USERS) result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Can't deactivate self if user.id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot deactivate your own account" ) user.is_active = False await db.commit() logger.info(f"Admin {current_user.id} deactivated user {user_id}") return {"success": True, "message": f"User {user.email} deactivated"} @router.get("/analytics/system", response_model=SystemAnalytics) async def get_system_analytics( current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db) ): """ Get system-wide analytics (admin only) Args: current_user: Current authenticated user db: Database session Returns: System analytics Raises: HTTPException: If user is not admin """ require_permission(current_user.role, Permission.VIEW_ANALYTICS) # Get counts total_users = await db.scalar(select(func.count(User.id))) active_users = await db.scalar( select(func.count(User.id)).where(User.is_active == True) ) total_conversations = await db.scalar(select(func.count(Conversation.id))) total_messages = await db.scalar(select(func.count(Message.id))) # Get token usage stats token_stats = await db.execute( select( func.sum(TokenUsage.total_tokens).label("total_tokens"), func.sum(TokenUsage.cost_usd).label("total_cost") ) ) token_row = token_stats.first() total_tokens = int(token_row.total_tokens or 0) total_cost = float(token_row.total_cost or 0.0) # Calculate averages avg_tokens_per_user = total_tokens / active_users if active_users > 0 else 0 avg_messages_per_conv = total_messages / total_conversations if total_conversations > 0 else 0 return SystemAnalytics( total_users=total_users or 0, active_users=active_users or 0, total_conversations=total_conversations or 0, total_messages=total_messages or 0, total_tokens_used=total_tokens, total_cost_usd=total_cost, avg_tokens_per_user=avg_tokens_per_user, avg_messages_per_conversation=avg_messages_per_conv ) @router.get("/analytics/users", response_model=List[UserAnalytics]) async def get_users_analytics( current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), days: int = 30, limit: int = 50 ): """ Get per-user analytics (admin only) Args: current_user: Current authenticated user db: Database session days: Number of days to analyze limit: Max number of users to return Returns: List of user analytics Raises: HTTPException: If user is not admin """ require_permission(current_user.role, Permission.VIEW_ANALYTICS) since_date = datetime.utcnow() - timedelta(days=days) query = ( select( User, func.count(Conversation.id).label("conversations_count"), func.count(Message.id).label("messages_count"), func.sum(TokenUsage.total_tokens).label("tokens_used"), func.sum(TokenUsage.cost_usd).label("cost_usd"), func.max(Conversation.last_message_at).label("last_activity") ) .outerjoin(Conversation, User.id == Conversation.user_id) .outerjoin(Message, Conversation.id == Message.conversation_id) .outerjoin(TokenUsage, User.id == TokenUsage.user_id) .where( (Conversation.created_at >= since_date) | (Conversation.id == None) ) .group_by(User.id) .order_by(func.sum(TokenUsage.total_tokens).desc().nullslast()) .limit(limit) ) result = await db.execute(query) users_analytics = result.all() return [ UserAnalytics( user_id=str(user.id), email=user.email, display_name=user.display_name, conversations_count=conversations_count or 0, messages_count=messages_count or 0, tokens_used=int(tokens_used or 0), cost_usd=float(cost_usd or 0.0), last_activity=last_activity ) for user, conversations_count, messages_count, tokens_used, cost_usd, last_activity in users_analytics ] @router.get("/conversations/all") async def get_all_conversations( current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), skip: int = 0, limit: int = 50 ): """ Get all conversations across all users (admin only) Args: current_user: Current authenticated user db: Database session skip: Number to skip limit: Max number to return Returns: List of conversations Raises: HTTPException: If user is not admin """ require_permission(current_user.role, Permission.READ_ALL_CHATS) query = ( select(Conversation, User) .join(User, Conversation.user_id == User.id) .order_by(Conversation.created_at.desc()) .offset(skip) .limit(limit) ) result = await db.execute(query) conversations = result.all() return [ { "id": str(conv.id), "title": conv.title, "user_email": user.email, "user_display_name": user.display_name, "created_at": conv.created_at, "last_message_at": conv.last_message_at, "is_archived": conv.is_archived } for conv, user in conversations ] @router.get("/users/{user_id}/details", response_model=UserDetails) async def get_user_details( user_id: str, current_user: User = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), limit_conversations: int = 20 ): """ Get detailed information for a specific user (admin only) Args: user_id: User ID to get details for current_user: Current authenticated user db: Database session limit_conversations: Max number of conversations to include Returns: Detailed user information with conversations and messages Raises: HTTPException: If user is not admin or user not found """ require_permission(current_user.role, Permission.READ_ALL_CHATS) # Get user user_result = await db.execute( select(User).where(User.id == user_id) ) user = user_result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Get user's token usage stats token_stats = await db.execute( select( func.count(TokenUsage.id).label("total_messages"), func.sum(TokenUsage.total_tokens).label("total_tokens"), func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"), func.sum(TokenUsage.completion_tokens).label("completion_tokens"), func.sum(TokenUsage.cached_tokens).label("cached_tokens"), func.sum(TokenUsage.cost_usd).label("total_cost_usd") ).where(TokenUsage.user_id == user_id) ) stats_row = token_stats.first() total_messages = int(stats_row.total_messages or 0) total_tokens = int(stats_row.total_tokens or 0) prompt_tokens = int(stats_row.prompt_tokens or 0) completion_tokens = int(stats_row.completion_tokens or 0) cached_tokens = int(stats_row.cached_tokens or 0) total_cost_usd = float(stats_row.total_cost_usd or 0.0) avg_tokens_per_message = total_tokens / total_messages if total_messages > 0 else 0 # Get user's conversations with message counts and token stats conv_query = ( select( Conversation, func.count(Message.id).label("message_count"), func.sum(TokenUsage.total_tokens).label("total_tokens"), func.sum(TokenUsage.cost_usd).label("total_cost") ) .outerjoin(Message, Conversation.id == Message.conversation_id) .outerjoin(TokenUsage, Conversation.id == TokenUsage.conversation_id) .where(Conversation.user_id == user_id) .group_by(Conversation.id) .order_by(Conversation.last_message_at.desc().nullslast()) .limit(limit_conversations) ) conv_result = await db.execute(conv_query) conversations_data = conv_result.all() # Build conversation details with messages conversations = [] for conv, message_count, conv_tokens, conv_cost in conversations_data: # Get messages for this conversation msg_query = ( select(Message, TokenUsage) .outerjoin(TokenUsage, (TokenUsage.conversation_id == Message.conversation_id) & (TokenUsage.message_id == Message.id)) .where(Message.conversation_id == conv.id) .order_by(Message.created_at.asc()) ) msg_result = await db.execute(msg_query) messages_data = msg_result.all() messages = [ MessageDetail( id=str(msg.id), role=msg.role, content=msg.content, created_at=msg.created_at, token_count=int(token.total_tokens) if token else 0, cost_usd=float(token.cost_usd) if token else 0.0 ) for msg, token in messages_data ] conversations.append( ConversationDetail( id=str(conv.id), title=conv.title, created_at=conv.created_at, last_message_at=conv.last_message_at, message_count=int(message_count or 0), total_tokens=int(conv_tokens or 0), total_cost_usd=float(conv_cost or 0.0), messages=messages ) ) # Get total conversation count total_conv_count = await db.scalar( select(func.count(Conversation.id)) .where(Conversation.user_id == user_id) ) logger.info(f"Admin {current_user.id} viewed details for user {user_id}") return UserDetails( user_id=str(user.id), email=user.email, display_name=user.display_name, role=user.role, is_active=user.is_active, created_at=user.created_at, last_login_at=user.last_login_at, total_conversations=total_conv_count or 0, total_messages=total_messages, total_tokens=total_tokens, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, cached_tokens=cached_tokens, total_cost_usd=total_cost_usd, avg_tokens_per_message=round(avg_tokens_per_message, 1), conversations=conversations )