590 lines
18 KiB
Python
590 lines
18 KiB
Python
"""Admin API routes - Admin only access"""
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, desc
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
from app.models.job import Job
|
|
from app.models.usage import UsageLog
|
|
from app.schemas.user import UserResponse
|
|
|
|
# Router for the admin endpoints; every route registered on it is served under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin"])
|
|
|
|
|
|
def get_current_admin_user(db: Session = Depends(get_db)) -> User:
    """Resolve the admin user for a request (placeholder, not real auth).

    Fetches the first user whose role is ``admin`` or ``super_admin`` and
    returns it. The identity of the caller is not inspected at all, so this
    only proves that *some* admin account exists.

    Raises:
        HTTPException: 403 when no user with an admin role is found.
    """
    # TODO: Implement real auth with JWT/session
    admin_roles = ['admin', 'super_admin']
    admin_user = db.query(User).filter(User.role.in_(admin_roles)).first()
    if admin_user:
        return admin_user
    raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
|
|
@router.get("/stats")
async def get_admin_stats(
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Return the headline numbers for the admin dashboard.

    The response contains user totals, job totals, today's job and
    failed-job counts (``failedJobs`` only counts jobs created today), the
    mean seconds between creation and completion of completed jobs, and the
    summed estimated API cost since the first day of the current month.
    "Today" and the month start are derived from the UTC clock.
    """
    today = datetime.utcnow().date()
    month_start = today.replace(day=1)

    # Base queries are reused; SQLAlchemy queries are generative, so each
    # .filter() below yields a new query and leaves the base untouched.
    user_count = db.query(func.count(User.id))
    job_count = db.query(func.count(Job.id))
    created_today = func.date(Job.created_at) == today

    total_users = user_count.scalar()
    active_users = user_count.filter(User.is_active == True).scalar()

    total_jobs = job_count.scalar()
    jobs_today = job_count.filter(created_today).scalar()
    failed_jobs = job_count.filter(created_today, Job.status == 'failed').scalar()

    # Calculate average processing time for completed jobs
    elapsed_seconds = (
        func.extract('epoch', Job.completed_at) - func.extract('epoch', Job.created_at)
    )
    mean_elapsed = (
        db.query(func.avg(elapsed_seconds))
        .filter(Job.status == 'completed', Job.completed_at.isnot(None))
        .scalar()
    )

    # Estimate API costs from usage logs
    month_cost = (
        db.query(func.sum(UsageLog.estimated_cost_usd))
        .filter(func.date(UsageLog.created_at) >= month_start)
        .scalar()
    )

    # Aggregates over zero rows come back as NULL/None; report those as 0.
    return {
        "totalUsers": total_users,
        "activeUsers": active_users,
        "totalJobs": total_jobs,
        "jobsToday": jobs_today,
        "failedJobs": failed_jobs,
        "avgProcessingTime": round(mean_elapsed or 0, 1),
        "apiCosts": round(month_cost or 0, 2)
    }
|
|
|
|
|
|
@router.get("/activity")
async def get_recent_activity(
    # ge=0 rejects negative limits up front instead of letting them reach SQL LIMIT.
    limit: int = Query(10, ge=0, le=50),
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Return the most recently created jobs as human-readable activity items.

    Args:
        limit: Maximum number of items to return (0-50, default 10).

    Returns:
        ``{"items": [...]}`` where each item carries the job id, the owning
        user's email, an action label built from the job status and module,
        the raw module name, and a relative timestamp.
    """
    # Status -> verb used in the action label; unknown statuses read "Created".
    status_labels = {
        'pending': 'Started',
        'processing': 'Processing',
        'completed': 'Completed',
        'failed': 'Failed'
    }

    # Get recent jobs with user info
    recent_jobs = (
        db.query(Job, User)
        .join(User, Job.user_id == User.id)
        .order_by(desc(Job.created_at))
        .limit(limit)
        .all()
    )

    items = [
        {
            "id": str(job.id),
            "user": user.email,
            "action": f"{status_labels.get(job.status, 'Created')} {job.module.replace('_', ' ')}",
            "module": job.module,
            "time": _format_relative_time(job.created_at)
        }
        for job, user in recent_jobs
    ]

    return {"items": items}
|
|
|
|
|
|
@router.get("/users")
async def list_users(
    page: int = Query(1, ge=1),
    # ge=0 rejects negative limits up front instead of letting them reach SQL LIMIT.
    limit: int = Query(20, ge=0, le=100),
    role: Optional[str] = None,
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """List users, newest first, with optional role filter (admin only).

    Args:
        page: 1-based page number.
        limit: Page size (0-100, default 20).
        role: When given, only users with exactly this role are returned.

    Returns:
        A dict with the serialized users for the page under ``items`` plus
        ``total`` (count after filtering, before pagination), ``page`` and
        ``limit``.
    """
    query = db.query(User)
    if role:
        query = query.filter(User.role == role)

    # Count before applying offset/limit so the total covers every page.
    total = query.count()
    offset = (page - 1) * limit
    users = query.order_by(desc(User.created_at)).offset(offset).limit(limit).all()

    items = [
        {
            "id": str(u.id),
            "email": u.email,
            "name": u.display_name,
            "role": u.role,
            "is_active": u.is_active,
            "created_at": u.created_at.isoformat(),
            "last_login": u.last_login_at.isoformat() if u.last_login_at else None
        }
        for u in users
    ]

    return {
        "items": items,
        "total": total,
        "page": page,
        "limit": limit
    }
|
|
|
|
|
|
@router.patch("/users/{user_id}")
async def update_user(
    user_id: str,
    role: Optional[str] = None,
    is_active: Optional[bool] = None,
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Update a user's role and/or active flag (admin only).

    Args:
        user_id: Id of the user to modify.
        role: New role; must be ``user``, ``admin`` or ``super_admin``.
        is_active: New active flag; left unchanged when omitted.

    Returns:
        ``{"message": "User updated", "user_id": ...}`` on success.

    Raises:
        HTTPException: 400 for an unrecognised role, 404 when the user does
            not exist, 403 when a non-super-admin tries to grant
            ``super_admin``.
    """
    allowed_roles = ('user', 'admin', 'super_admin')

    # An unknown role used to be ignored while the endpoint still reported
    # "User updated"; reject it explicitly so callers notice the mistake.
    if role and role not in allowed_roles:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid role. Must be one of: {', '.join(allowed_roles)}"
        )

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # NOTE(review): only *granting* super_admin is restricted here; a plain
    # admin can still demote or deactivate an existing super_admin - confirm
    # whether that is intended.
    if role:
        # Only super_admin can create other super_admins
        if role == 'super_admin' and admin.role != 'super_admin':
            raise HTTPException(status_code=403, detail="Only super admins can create super admins")
        user.role = role

    if is_active is not None:
        user.is_active = is_active

    db.commit()
    db.refresh(user)

    return {"message": "User updated", "user_id": str(user.id)}
|
|
|
|
|
|
@router.get("/reports")
async def get_usage_reports(
    range: str = Query("7d"),  # name shadows the builtin but is the public query parameter
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Return usage analytics for a trailing window of days.

    Args:
        range: One of ``7d``, ``30d``, ``90d``, ``365d``; any other value
            falls back to 7 days.

    Returns:
        A dict with ``usage_over_time`` (jobs per day), ``module_breakdown``
        (job count and rounded percentage per module), ``top_users`` (ten
        users with the most jobs) and ``totals``. All cost figures are
        estimates computed as a flat per-job rate, not read from usage logs.
    """
    # Flat estimated cost per job (USD) used for every cost figure below.
    cost_per_job = 0.15

    days = {"7d": 7, "30d": 30, "90d": 90, "365d": 365}.get(range, 7)
    start_date = datetime.utcnow() - timedelta(days=days)

    in_window = Job.created_at >= start_date
    job_day = func.date(Job.created_at)

    # Result rows are unpacked positionally throughout: attribute access by
    # label is fragile for a column labelled 'count', which collides with the
    # sequence method Row.count() on SQLAlchemy 1.4+.

    # Usage over time
    daily_rows = (
        db.query(job_day.label('date'), func.count(Job.id).label('jobs'))
        .filter(in_window)
        .group_by(job_day)
        .order_by(job_day)
        .all()
    )
    usage_over_time = [
        # Rounded like the other cost fields to avoid float artefacts.
        {"date": str(day), "jobs": jobs, "cost": round(jobs * cost_per_job, 2)}
        for day, jobs in daily_rows
    ]

    # Module breakdown
    module_rows = (
        db.query(Job.module, func.count(Job.id).label('count'))
        .filter(in_window)
        .group_by(Job.module)
        .all()
    )
    total_jobs = sum(count for _, count in module_rows)
    module_breakdown = [
        {
            "module": module.replace('_', ' ').title(),
            "count": count,
            "percentage": round(count / total_jobs * 100) if total_jobs > 0 else 0
        }
        for module, count in module_rows
    ]

    # Top users
    job_total = func.count(Job.id)
    top_user_rows = (
        db.query(User.id, User.email, job_total.label('job_count'))
        .join(Job, Job.user_id == User.id)
        .filter(in_window)
        .group_by(User.id, User.email)
        .order_by(desc(job_total))
        .limit(10)
        .all()
    )
    top_users = [
        {
            "user_id": str(uid),
            "user_email": email,
            "job_count": job_count,
            "total_cost": round(job_count * cost_per_job, 2)
        }
        for uid, email, job_count in top_user_rows
    ]

    return {
        "usage_over_time": usage_over_time,
        "module_breakdown": module_breakdown,
        "top_users": top_users,
        "totals": {
            "totalJobs": total_jobs,
            "totalCost": round(total_jobs * cost_per_job, 2),
            "avgJobsPerDay": round(total_jobs / days, 1) if days > 0 else 0
        }
    }
|
|
|
|
|
|
@router.get("/logs/search")
async def search_usage_logs(
    query: Optional[str] = None,
    provider: Optional[str] = None,
    user_id: Optional[str] = None,
    start_date: Optional[str] = None,  # ISO format
    page: int = Query(1, ge=1),
    # ge=0 rejects negative limits up front instead of letting them reach SQL LIMIT.
    limit: int = Query(20, le=100, ge=0),
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """
    Search usage logs by free text, provider, user, or start date.

    Args:
        query: Case-insensitive substring matched against the request and
            response metadata (cast to text) and the log's action.
        provider: Exact match on the log's API provider.
        user_id: Exact match on the owning user id.
        start_date: ISO-8601 timestamp; only logs created at or after it are
            returned. An unparseable value is ignored (no date filter).
        page: 1-based page number.
        limit: Page size (0-100, default 20).

    Returns:
        A dict with ``items`` (newest first, each with user, service, metrics
        and raw request/response metadata), ``total``, ``page`` and ``limit``.
    """
    # Imported locally: the text search needs String/cast/or_, and String was
    # never imported at module level (the original raised NameError whenever
    # `query` was supplied).
    from sqlalchemy import String, cast, or_

    sql_query = db.query(UsageLog, User).join(User, UsageLog.user_id == User.id)

    # 1. Text Search (Metadata)
    if query:
        # Cast the metadata columns to text so ILIKE can search inside them
        # (filename, prompt, etc).
        search_term = f"%{query}%"
        sql_query = sql_query.filter(
            or_(
                cast(UsageLog.request_metadata, String).ilike(search_term),
                cast(UsageLog.response_metadata, String).ilike(search_term),
                UsageLog.action.ilike(search_term),
            )
        )

    # 2. Filters
    if provider:
        sql_query = sql_query.filter(UsageLog.api_provider == provider)

    if user_id:
        sql_query = sql_query.filter(UsageLog.user_id == user_id)

    if start_date:
        try:
            # fromisoformat() on older Pythons rejects a trailing "Z".
            dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
        except ValueError:
            # Deliberate best effort: a malformed date leaves results unfiltered.
            dt = None
        if dt is not None:
            sql_query = sql_query.filter(UsageLog.created_at >= dt)

    # Pagination
    total = sql_query.count()
    logs = (
        sql_query.order_by(desc(UsageLog.created_at))
        .offset((page - 1) * limit)
        .limit(limit)
        .all()
    )

    items = [
        {
            "id": str(log.id),
            "timestamp": log.created_at.isoformat(),
            "user": {
                "id": str(user.id),
                "email": user.email,
                "name": user.display_name
            },
            "service": {
                "module": log.module,
                "provider": log.api_provider,
                "model": log.api_model
            },
            "metrics": {
                "tokens_in": log.tokens_input,
                "tokens_out": log.tokens_output,
                "cost_usd": float(log.estimated_cost_usd or 0),
                "latency_ms": log.processing_time_ms
            },
            # Return specific metadata fields relevant for UI
            "request_details": log.request_metadata,
            "response_details": log.response_metadata
        }
        for log, user in logs
    ]

    return {
        "items": items,
        "total": total,
        "page": page,
        "limit": limit
    }
|
|
|
|
|
|
@router.get("/audit-logs")
async def get_audit_logs(
    page: int = Query(1, ge=1),
    # ge=0 rejects negative limits up front instead of letting them reach SQL LIMIT.
    limit: int = Query(50, ge=0, le=100),
    severity: Optional[str] = None,
    action: Optional[str] = None,
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Return audit-log entries synthesised from job history.

    Args:
        page: 1-based page number.
        limit: Page size (0-100, default 50).
        severity: ``error`` keeps only failed jobs, ``info`` keeps everything
            else; other values apply no filter.
        action: If it contains ``failed`` or ``completed``, only jobs with
            that status are returned; other values apply no filter.

    Returns:
        A dict with ``items`` (newest first), ``total``, ``page`` and
        ``limit``. Each item's severity is ``error`` for failed jobs and
        ``info`` otherwise; ``ip_address`` is a hard-coded placeholder.
    """
    # For now, generate from job history - in production would use dedicated audit table
    query = db.query(Job, User).join(User, Job.user_id == User.id)

    if action:
        if 'failed' in action:
            query = query.filter(Job.status == 'failed')
        elif 'completed' in action:
            query = query.filter(Job.status == 'completed')

    # The severity parameter used to be accepted but never applied. Mirror the
    # per-item rule below (failed -> error, anything else -> info).
    if severity == 'error':
        query = query.filter(Job.status == 'failed')
    elif severity == 'info':
        # IS DISTINCT FROM also keeps rows whose status is NULL, matching the
        # Python-side rule that treats every non-failed job as info.
        query = query.filter(Job.status.is_distinct_from('failed'))

    total = query.count()
    results = (
        query.order_by(desc(Job.created_at))
        .offset((page - 1) * limit)
        .limit(limit)
        .all()
    )

    # Loop locals are named distinctly so they no longer overwrite the
    # `severity` / `action` request parameters.
    items = []
    for job, user in results:
        item_severity = 'error' if job.status == 'failed' else 'info'
        item_action = f"job.{job.status}"

        items.append({
            "id": str(job.id),
            "user_id": str(user.id),
            "user_email": user.email,
            "action": item_action,
            "resource_type": "job",
            "resource_id": str(job.id),
            "details": {
                "module": job.module,
                "error": job.error_message if job.error_message else None
            },
            "ip_address": "192.168.1.100",  # Placeholder
            "created_at": job.created_at.isoformat(),
            "severity": item_severity
        })

    return {
        "items": items,
        "total": total,
        "page": page,
        "limit": limit
    }
|
|
|
|
|
|
def _format_relative_time(dt: datetime) -> str:
    """Format a datetime as a coarse "time ago" string.

    Args:
        dt: The moment to describe. Naive values are compared against the
            naive UTC clock; timezone-aware values are compared against the
            current time in their own timezone.

    Returns:
        ``"Just now"`` for anything under a minute old (including future
        timestamps), otherwise the age in whole minutes, hours or days.
    """
    # Pick a "now" of the same awareness as dt; subtracting naive from aware
    # datetimes raises TypeError.
    now = datetime.now(dt.tzinfo) if dt.tzinfo is not None else datetime.utcnow()

    # total_seconds() covers the whole interval. timedelta.seconds is only the
    # sub-day remainder, which made a 2-day-old job read as "Just now".
    elapsed = int((now - dt).total_seconds())

    if elapsed < 60:
        return "Just now"
    if elapsed < 3600:
        mins = elapsed // 60
        return f"{mins} min{'s' if mins > 1 else ''} ago"
    if elapsed < 86400:
        hours = elapsed // 3600
        return f"{hours} hour{'s' if hours > 1 else ''} ago"

    days = elapsed // 86400
    return f"{days} day{'s' if days > 1 else ''} ago"
|
|
|
|
|
|
# ============== VOICE MANAGEMENT ==============
|
|
|
|
@router.get("/voices")
async def get_voices(
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Get all ElevenLabs voices including custom cloned voices.

    Returns:
        ``{"voices": [...], "total": n}`` where each voice is a trimmed copy
        of the upstream record.

    Raises:
        HTTPException: 500 when no ElevenLabs API key is configured; 502 when
            the ElevenLabs API is unreachable or answers with an error status.
    """
    import httpx
    from app.config import settings

    if not settings.elevenlabs_api_key:
        raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")

    # Translate upstream failures into a 502 instead of letting httpx
    # exceptions escape as an opaque 500.
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.get(
                "https://api.elevenlabs.io/v1/voices",
                headers={"xi-api-key": settings.elevenlabs_api_key}
            )
            response.raise_for_status()
            data = response.json()
    except httpx.HTTPStatusError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"ElevenLabs API returned status {exc.response.status_code}"
        ) from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail="Could not reach ElevenLabs API") from exc

    voices = [
        {
            "voice_id": voice.get("voice_id"),
            "name": voice.get("name"),
            "category": voice.get("category"),
            "description": voice.get("description"),
            "labels": voice.get("labels", {}),
            "preview_url": voice.get("preview_url"),
            "available_for_tiers": voice.get("available_for_tiers", []),
            "settings": voice.get("settings"),
            "sharing": voice.get("sharing"),
            "high_quality_base_model_ids": voice.get("high_quality_base_model_ids", []),
            "samples": voice.get("samples", [])
        }
        for voice in data.get("voices", [])
    ]

    return {
        "voices": voices,
        "total": len(voices)
    }
|
|
|
|
|
|
@router.get("/voices/{voice_id}")
async def get_voice_details(
    voice_id: str,
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Get detailed information about a specific voice.

    Args:
        voice_id: ElevenLabs voice id taken from the URL path.

    Returns:
        The upstream JSON document for the voice, unmodified.

    Raises:
        HTTPException: 500 when no ElevenLabs API key is configured; 404 when
            ElevenLabs reports the voice as missing; 502 when the ElevenLabs
            API is unreachable or answers with another error status.
    """
    # "/voices/models" is registered later in this module, so a request for it
    # is matched by this route first with voice_id == "models". Forward it to
    # the intended handler. NOTE(review): registering the static route before
    # this one would make this branch unnecessary.
    if voice_id == "models":
        return await get_voice_models(db=db, admin=admin)

    import httpx
    from app.config import settings

    if not settings.elevenlabs_api_key:
        raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")

    # Translate upstream failures into a 502 instead of letting httpx
    # exceptions escape as an opaque 500.
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.get(
                f"https://api.elevenlabs.io/v1/voices/{voice_id}",
                headers={"xi-api-key": settings.elevenlabs_api_key}
            )
            if response.status_code == 404:
                raise HTTPException(status_code=404, detail="Voice not found")
            response.raise_for_status()
            return response.json()
    except httpx.HTTPStatusError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"ElevenLabs API returned status {exc.response.status_code}"
        ) from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail="Could not reach ElevenLabs API") from exc
|
|
|
|
|
|
@router.post("/voices/clone")
async def clone_voice(
    name: str,
    description: Optional[str] = None,
    files: list = None,
    labels: Optional[dict] = None,
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Describe how to clone a voice (stub for Instant Voice Cloning).

    No cloning is performed and none of the voice arguments are used: after
    checking that an ElevenLabs API key is configured, the handler returns a
    static set of upload instructions.

    Raises:
        HTTPException: 500 when no ElevenLabs API key is configured.
    """
    import httpx
    from app.config import settings

    if not settings.elevenlabs_api_key:
        raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")

    # For now, return instructions - actual implementation requires file upload
    upload_notes = [
        "Upload 1-25 audio samples (max 10MB each)",
        "Supported formats: mp3, wav, m4a, ogg, flac",
        "Minimum sample length: 30 seconds combined",
        "Best results: clear speech, no background noise"
    ]
    instructions = {
        "endpoint": "POST /api/v1/admin/voices/clone-with-files",
        "required": ["name", "files (audio samples)"],
        "optional": ["description", "labels"],
        "notes": upload_notes
    }

    return {
        "message": "Voice cloning requires audio file upload",
        "instructions": instructions
    }
|
|
|
|
|
|
@router.delete("/voices/{voice_id}")
async def delete_voice(
    voice_id: str,
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Delete a custom voice (only works for cloned voices).

    Args:
        voice_id: ElevenLabs voice id taken from the URL path.

    Returns:
        A confirmation message naming the deleted voice.

    Raises:
        HTTPException: 500 when no ElevenLabs API key is configured; 404 or
            400 when ElevenLabs answers with that status; 502 when the
            ElevenLabs API is unreachable or answers with another error
            status.
    """
    import httpx
    from app.config import settings

    if not settings.elevenlabs_api_key:
        raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")

    # Translate upstream failures into a 502 instead of letting httpx
    # exceptions escape as an opaque 500.
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.delete(
                f"https://api.elevenlabs.io/v1/voices/{voice_id}",
                headers={"xi-api-key": settings.elevenlabs_api_key}
            )
            if response.status_code == 404:
                raise HTTPException(status_code=404, detail="Voice not found")
            if response.status_code == 400:
                raise HTTPException(status_code=400, detail="Cannot delete premade voices")
            response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"ElevenLabs API returned status {exc.response.status_code}"
        ) from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail="Could not reach ElevenLabs API") from exc

    return {"message": f"Voice {voice_id} deleted successfully"}
|
|
|
|
|
|
@router.patch("/voices/{voice_id}/settings")
async def update_voice_settings(
    voice_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    labels: Optional[dict] = None,
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Update voice name, description or labels.

    Args:
        voice_id: ElevenLabs voice id taken from the URL path.
        name: New display name; ignored when empty.
        description: New description; ignored when empty.
        labels: New labels; ignored when empty.

    Returns:
        A confirmation message naming the updated voice.

    Raises:
        HTTPException: 500 when no ElevenLabs API key is configured; 400 when
            no non-empty field was supplied; 404 when ElevenLabs reports the
            voice as missing; 502 when the ElevenLabs API is unreachable or
            answers with another error status.
    """
    import httpx
    from app.config import settings

    if not settings.elevenlabs_api_key:
        raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")

    # Only forward fields that were supplied with a non-empty value.
    candidates = (("name", name), ("description", description), ("labels", labels))
    payload = {field: value for field, value in candidates if value}

    if not payload:
        raise HTTPException(status_code=400, detail="No updates provided")

    # NOTE(review): the ElevenLabs reference appears to document voice edit as
    # POST multipart/form-data with a required name - confirm that this
    # PATCH + JSON call is accepted upstream.
    # Translate upstream failures into a 502 instead of letting httpx
    # exceptions escape as an opaque 500.
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.patch(
                f"https://api.elevenlabs.io/v1/voices/{voice_id}/edit",
                headers={
                    "xi-api-key": settings.elevenlabs_api_key,
                    "Content-Type": "application/json"
                },
                json=payload
            )
            if response.status_code == 404:
                raise HTTPException(status_code=404, detail="Voice not found")
            response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"ElevenLabs API returned status {exc.response.status_code}"
        ) from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail="Could not reach ElevenLabs API") from exc

    return {"message": f"Voice {voice_id} updated successfully"}
|
|
|
|
|
|
# NOTE(review): this path also fits the "/voices/{voice_id}" pattern, and the
# router dispatches to whichever matching route was registered first - verify
# the registration order so that this handler is actually reachable.
@router.get("/voices/models")
async def get_voice_models(
    db: Session = Depends(get_db),
    admin: User = Depends(get_current_admin_user)
):
    """Get available TTS models from ElevenLabs.

    Returns:
        The upstream JSON response from the ElevenLabs models endpoint,
        unmodified.

    Raises:
        HTTPException: 500 when no ElevenLabs API key is configured; 502 when
            the ElevenLabs API is unreachable or answers with an error status.
    """
    import httpx
    from app.config import settings

    if not settings.elevenlabs_api_key:
        raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")

    # Translate upstream failures into a 502 instead of letting httpx
    # exceptions escape as an opaque 500.
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.get(
                "https://api.elevenlabs.io/v1/models",
                headers={"xi-api-key": settings.elevenlabs_api_key}
            )
            response.raise_for_status()
            return response.json()
    except httpx.HTTPStatusError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"ElevenLabs API returned status {exc.response.status_code}"
        ) from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail="Could not reach ElevenLabs API") from exc
|