"""Admin API routes - Admin only access""" from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from sqlalchemy import func, desc from datetime import datetime, timedelta from typing import Optional from app.database import get_db from app.models.user import User from app.models.job import Job from app.models.usage import UsageLog from app.schemas.user import UserResponse router = APIRouter(prefix="/admin", tags=["admin"]) def get_current_admin_user(db: Session = Depends(get_db)) -> User: """Dependency to verify admin access - placeholder for real auth""" # TODO: Implement real auth with JWT/session user = db.query(User).filter(User.role.in_(['admin', 'super_admin'])).first() if not user: raise HTTPException(status_code=403, detail="Admin access required") return user @router.get("/stats") async def get_admin_stats( db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Get admin dashboard statistics""" today = datetime.utcnow().date() total_users = db.query(func.count(User.id)).scalar() active_users = db.query(func.count(User.id)).filter(User.is_active == True).scalar() total_jobs = db.query(func.count(Job.id)).scalar() jobs_today = db.query(func.count(Job.id)).filter( func.date(Job.created_at) == today ).scalar() failed_jobs = db.query(func.count(Job.id)).filter( func.date(Job.created_at) == today, Job.status == 'failed' ).scalar() # Calculate average processing time for completed jobs avg_time_result = db.query( func.avg( func.extract('epoch', Job.completed_at) - func.extract('epoch', Job.created_at) ) ).filter( Job.status == 'completed', Job.completed_at.isnot(None) ).scalar() avg_processing_time = round(avg_time_result or 0, 1) # Estimate API costs from usage logs total_cost = db.query(func.sum(UsageLog.estimated_cost_usd)).filter( func.date(UsageLog.created_at) >= today.replace(day=1) ).scalar() or 0 return { "totalUsers": total_users, "activeUsers": active_users, "totalJobs": total_jobs, "jobsToday": jobs_today, "failedJobs": failed_jobs, "avgProcessingTime": avg_processing_time, "apiCosts": round(total_cost, 2) } @router.get("/activity") async def get_recent_activity( limit: int = Query(10, le=50), db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Get recent system activity""" # Get recent jobs with user info recent_jobs = db.query(Job, User).join( User, Job.user_id == User.id ).order_by(desc(Job.created_at)).limit(limit).all() items = [] for job, user in recent_jobs: action_map = { 'pending': 'Started', 'processing': 'Processing', 'completed': 'Completed', 'failed': 'Failed' } action = f"{action_map.get(job.status, 'Created')} {job.module.replace('_', ' ')}" items.append({ "id": str(job.id), "user": user.email, "action": action, "module": job.module, "time": _format_relative_time(job.created_at) }) return {"items": items} @router.get("/users") async def list_users( page: int = Query(1, ge=1), limit: int = Query(20, le=100), role: Optional[str] = None, db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """List all users (admin only)""" query = db.query(User) if role: query = query.filter(User.role == role) total = query.count() users = query.order_by(desc(User.created_at)).offset((page - 1) * limit).limit(limit).all() return { "items": [ { "id": str(u.id), "email": u.email, "name": u.display_name, "role": u.role, "is_active": u.is_active, "created_at": u.created_at.isoformat(), "last_login": u.last_login_at.isoformat() if u.last_login_at else None } for u in users ], "total": total, "page": page, "limit": limit } @router.patch("/users/{user_id}") async def update_user( user_id: str, role: Optional[str] = None, is_active: Optional[bool] = None, db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Update user role or status (admin only)""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") if role and role in ['user', 'admin', 'super_admin']: # Only super_admin can create other super_admins if role == 'super_admin' and admin.role != 'super_admin': raise HTTPException(status_code=403, detail="Only super admins can create super admins") user.role = role if is_active is not None: user.is_active = is_active db.commit() db.refresh(user) return {"message": "User updated", "user_id": str(user.id)} @router.get("/reports") async def get_usage_reports( range: str = Query("7d"), db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Get usage reports and analytics""" days = {"7d": 7, "30d": 30, "90d": 90, "365d": 365}.get(range, 7) start_date = datetime.utcnow() - timedelta(days=days) # Usage over time usage_query = db.query( func.date(Job.created_at).label('date'), func.count(Job.id).label('jobs') ).filter( Job.created_at >= start_date ).group_by( func.date(Job.created_at) ).order_by( func.date(Job.created_at) ).all() usage_over_time = [ {"date": str(row.date), "jobs": row.jobs, "cost": row.jobs * 0.15} for row in usage_query ] # Module breakdown module_query = db.query( Job.module, func.count(Job.id).label('count') ).filter( Job.created_at >= start_date ).group_by(Job.module).all() total_jobs = sum(m.count for m in module_query) module_breakdown = [ { "module": m.module.replace('_', ' ').title(), "count": m.count, "percentage": round(m.count / total_jobs * 100 if total_jobs > 0 else 0) } for m in module_query ] # Top users top_users_query = db.query( User.id, User.email, func.count(Job.id).label('job_count') ).join( Job, Job.user_id == User.id ).filter( Job.created_at >= start_date ).group_by(User.id, User.email).order_by( desc(func.count(Job.id)) ).limit(10).all() top_users = [ { "user_id": str(u.id), "user_email": u.email, "job_count": u.job_count, "total_cost": round(u.job_count * 0.15, 2) } for u in top_users_query ] return { "usage_over_time": usage_over_time, "module_breakdown": module_breakdown, "top_users": top_users, "totals": { "totalJobs": total_jobs, "totalCost": round(total_jobs * 0.15, 2), "avgJobsPerDay": round(total_jobs / days, 1) if days > 0 else 0 } } @router.get("/logs/search") async def search_usage_logs( query: Optional[str] = None, provider: Optional[str] = None, user_id: Optional[str] = None, start_date: Optional[str] = None, # ISO format page: int = Query(1, ge=1), limit: int = Query(20, le=100), db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """ Search usage logs by filename, prompt, user, or provider. Surface detailed cost and metadata. """ sql_query = db.query(UsageLog, User).join(User, UsageLog.user_id == User.id) # 1. Text Search (Metadata) if query: # Search inside JSONB metadata fields (filename, prompt, etc) # Cast JSONB to text for searching search_term = f"%{query}%" sql_query = sql_query.filter( func.cast(UsageLog.request_metadata, String).ilike(search_term) | func.cast(UsageLog.response_metadata, String).ilike(search_term) | UsageLog.action.ilike(search_term) ) # 2. Filters if provider: sql_query = sql_query.filter(UsageLog.api_provider == provider) if user_id: sql_query = sql_query.filter(UsageLog.user_id == user_id) if start_date: try: dt = datetime.fromisoformat(start_date.replace('Z', '+00:00')) sql_query = sql_query.filter(UsageLog.created_at >= dt) except ValueError: pass # Pagination total = sql_query.count() logs = sql_query.order_by(desc(UsageLog.created_at)).offset((page - 1) * limit).limit(limit).all() items = [] for log, user in logs: items.append({ "id": str(log.id), "timestamp": log.created_at.isoformat(), "user": { "id": str(user.id), "email": user.email, "name": user.display_name }, "service": { "module": log.module, "provider": log.api_provider, "model": log.api_model }, "metrics": { "tokens_in": log.tokens_input, "tokens_out": log.tokens_output, "cost_usd": float(log.estimated_cost_usd or 0), "latency_ms": log.processing_time_ms }, # Return specific metadata fields relevant for UI "request_details": log.request_metadata, "response_details": log.response_metadata }) return { "items": items, "total": total, "page": page, "limit": limit } @router.get("/audit-logs") async def get_audit_logs( page: int = Query(1, ge=1), limit: int = Query(50, le=100), severity: Optional[str] = None, action: Optional[str] = None, db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Get audit logs""" # For now, generate from job history - in production would use dedicated audit table query = db.query(Job, User).join(User, Job.user_id == User.id) if action: if 'failed' in action: query = query.filter(Job.status == 'failed') elif 'completed' in action: query = query.filter(Job.status == 'completed') total = query.count() results = query.order_by(desc(Job.created_at)).offset((page - 1) * limit).limit(limit).all() items = [] for job, user in results: severity = 'error' if job.status == 'failed' else 'info' action = f"job.{job.status}" items.append({ "id": str(job.id), "user_id": str(user.id), "user_email": user.email, "action": action, "resource_type": "job", "resource_id": str(job.id), "details": { "module": job.module, "error": job.error_message if job.error_message else None }, "ip_address": "192.168.1.100", # Placeholder "created_at": job.created_at.isoformat(), "severity": severity }) return { "items": items, "total": total, "page": page, "limit": limit } def _format_relative_time(dt: datetime) -> str: """Format datetime as relative time string""" now = datetime.utcnow() diff = now - dt if diff.seconds < 60: return "Just now" elif diff.seconds < 3600: mins = diff.seconds // 60 return f"{mins} min{'s' if mins > 1 else ''} ago" elif diff.seconds < 86400: hours = diff.seconds // 3600 return f"{hours} hour{'s' if hours > 1 else ''} ago" else: days = diff.days return f"{days} day{'s' if days > 1 else ''} ago" # ============== VOICE MANAGEMENT ============== @router.get("/voices") async def get_voices( db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Get all ElevenLabs voices including custom cloned voices""" import httpx from app.config import settings if not settings.elevenlabs_api_key: raise HTTPException(status_code=500, detail="ElevenLabs API key not configured") async with httpx.AsyncClient(timeout=30) as client: response = await client.get( "https://api.elevenlabs.io/v1/voices", headers={"xi-api-key": settings.elevenlabs_api_key} ) response.raise_for_status() data = response.json() voices = [] for voice in data.get("voices", []): voices.append({ "voice_id": voice.get("voice_id"), "name": voice.get("name"), "category": voice.get("category"), "description": voice.get("description"), "labels": voice.get("labels", {}), "preview_url": voice.get("preview_url"), "available_for_tiers": voice.get("available_for_tiers", []), "settings": voice.get("settings"), "sharing": voice.get("sharing"), "high_quality_base_model_ids": voice.get("high_quality_base_model_ids", []), "samples": voice.get("samples", []) }) return { "voices": voices, "total": len(voices) } @router.get("/voices/{voice_id}") async def get_voice_details( voice_id: str, db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Get detailed information about a specific voice""" import httpx from app.config import settings if not settings.elevenlabs_api_key: raise HTTPException(status_code=500, detail="ElevenLabs API key not configured") async with httpx.AsyncClient(timeout=30) as client: response = await client.get( f"https://api.elevenlabs.io/v1/voices/{voice_id}", headers={"xi-api-key": settings.elevenlabs_api_key} ) if response.status_code == 404: raise HTTPException(status_code=404, detail="Voice not found") response.raise_for_status() return response.json() @router.post("/voices/clone") async def clone_voice( name: str, description: Optional[str] = None, files: list = None, labels: Optional[dict] = None, db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Clone a voice using audio samples (Instant Voice Cloning)""" import httpx from app.config import settings if not settings.elevenlabs_api_key: raise HTTPException(status_code=500, detail="ElevenLabs API key not configured") # For now, return instructions - actual implementation requires file upload return { "message": "Voice cloning requires audio file upload", "instructions": { "endpoint": "POST /api/v1/admin/voices/clone-with-files", "required": ["name", "files (audio samples)"], "optional": ["description", "labels"], "notes": [ "Upload 1-25 audio samples (max 10MB each)", "Supported formats: mp3, wav, m4a, ogg, flac", "Minimum sample length: 30 seconds combined", "Best results: clear speech, no background noise" ] } } @router.delete("/voices/{voice_id}") async def delete_voice( voice_id: str, db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Delete a custom voice (only works for cloned voices)""" import httpx from app.config import settings if not settings.elevenlabs_api_key: raise HTTPException(status_code=500, detail="ElevenLabs API key not configured") async with httpx.AsyncClient(timeout=30) as client: response = await client.delete( f"https://api.elevenlabs.io/v1/voices/{voice_id}", headers={"xi-api-key": settings.elevenlabs_api_key} ) if response.status_code == 404: raise HTTPException(status_code=404, detail="Voice not found") if response.status_code == 400: raise HTTPException(status_code=400, detail="Cannot delete premade voices") response.raise_for_status() return {"message": f"Voice {voice_id} deleted successfully"} @router.patch("/voices/{voice_id}/settings") async def update_voice_settings( voice_id: str, name: Optional[str] = None, description: Optional[str] = None, labels: Optional[dict] = None, db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Update voice name, description or labels""" import httpx from app.config import settings if not settings.elevenlabs_api_key: raise HTTPException(status_code=500, detail="ElevenLabs API key not configured") payload = {} if name: payload["name"] = name if description: payload["description"] = description if labels: payload["labels"] = labels if not payload: raise HTTPException(status_code=400, detail="No updates provided") async with httpx.AsyncClient(timeout=30) as client: response = await client.patch( f"https://api.elevenlabs.io/v1/voices/{voice_id}/edit", headers={ "xi-api-key": settings.elevenlabs_api_key, "Content-Type": "application/json" }, json=payload ) if response.status_code == 404: raise HTTPException(status_code=404, detail="Voice not found") response.raise_for_status() return {"message": f"Voice {voice_id} updated successfully"} @router.get("/voices/models") async def get_voice_models( db: Session = Depends(get_db), admin: User = Depends(get_current_admin_user) ): """Get available TTS models from ElevenLabs""" import httpx from app.config import settings if not settings.elevenlabs_api_key: raise HTTPException(status_code=500, detail="ElevenLabs API key not configured") async with httpx.AsyncClient(timeout=30) as client: response = await client.get( "https://api.elevenlabs.io/v1/models", headers={"xi-api-key": settings.elevenlabs_api_key} ) response.raise_for_status() return response.json()