forge/backend/app/api/v1/admin.py

590 lines
18 KiB
Python

"""Admin API routes - Admin only access"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from datetime import datetime, timedelta
from typing import Optional
from app.database import get_db
from app.models.user import User
from app.models.job import Job
from app.models.usage import UsageLog
from app.schemas.user import UserResponse
router = APIRouter(prefix="/admin", tags=["admin"])
def get_current_admin_user(db: Session = Depends(get_db)) -> User:
"""Dependency to verify admin access - placeholder for real auth"""
# TODO: Implement real auth with JWT/session
user = db.query(User).filter(User.role.in_(['admin', 'super_admin'])).first()
if not user:
raise HTTPException(status_code=403, detail="Admin access required")
return user
@router.get("/stats")
async def get_admin_stats(
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Get admin dashboard statistics"""
today = datetime.utcnow().date()
total_users = db.query(func.count(User.id)).scalar()
active_users = db.query(func.count(User.id)).filter(User.is_active == True).scalar()
total_jobs = db.query(func.count(Job.id)).scalar()
jobs_today = db.query(func.count(Job.id)).filter(
func.date(Job.created_at) == today
).scalar()
failed_jobs = db.query(func.count(Job.id)).filter(
func.date(Job.created_at) == today,
Job.status == 'failed'
).scalar()
# Calculate average processing time for completed jobs
avg_time_result = db.query(
func.avg(
func.extract('epoch', Job.completed_at) - func.extract('epoch', Job.created_at)
)
).filter(
Job.status == 'completed',
Job.completed_at.isnot(None)
).scalar()
avg_processing_time = round(avg_time_result or 0, 1)
# Estimate API costs from usage logs
total_cost = db.query(func.sum(UsageLog.estimated_cost_usd)).filter(
func.date(UsageLog.created_at) >= today.replace(day=1)
).scalar() or 0
return {
"totalUsers": total_users,
"activeUsers": active_users,
"totalJobs": total_jobs,
"jobsToday": jobs_today,
"failedJobs": failed_jobs,
"avgProcessingTime": avg_processing_time,
"apiCosts": round(total_cost, 2)
}
@router.get("/activity")
async def get_recent_activity(
limit: int = Query(10, le=50),
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Get recent system activity"""
# Get recent jobs with user info
recent_jobs = db.query(Job, User).join(
User, Job.user_id == User.id
).order_by(desc(Job.created_at)).limit(limit).all()
items = []
for job, user in recent_jobs:
action_map = {
'pending': 'Started',
'processing': 'Processing',
'completed': 'Completed',
'failed': 'Failed'
}
action = f"{action_map.get(job.status, 'Created')} {job.module.replace('_', ' ')}"
items.append({
"id": str(job.id),
"user": user.email,
"action": action,
"module": job.module,
"time": _format_relative_time(job.created_at)
})
return {"items": items}
@router.get("/users")
async def list_users(
page: int = Query(1, ge=1),
limit: int = Query(20, le=100),
role: Optional[str] = None,
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""List all users (admin only)"""
query = db.query(User)
if role:
query = query.filter(User.role == role)
total = query.count()
users = query.order_by(desc(User.created_at)).offset((page - 1) * limit).limit(limit).all()
return {
"items": [
{
"id": str(u.id),
"email": u.email,
"name": u.display_name,
"role": u.role,
"is_active": u.is_active,
"created_at": u.created_at.isoformat(),
"last_login": u.last_login_at.isoformat() if u.last_login_at else None
}
for u in users
],
"total": total,
"page": page,
"limit": limit
}
@router.patch("/users/{user_id}")
async def update_user(
user_id: str,
role: Optional[str] = None,
is_active: Optional[bool] = None,
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Update user role or status (admin only)"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if role and role in ['user', 'admin', 'super_admin']:
# Only super_admin can create other super_admins
if role == 'super_admin' and admin.role != 'super_admin':
raise HTTPException(status_code=403, detail="Only super admins can create super admins")
user.role = role
if is_active is not None:
user.is_active = is_active
db.commit()
db.refresh(user)
return {"message": "User updated", "user_id": str(user.id)}
@router.get("/reports")
async def get_usage_reports(
range: str = Query("7d"),
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Get usage reports and analytics"""
days = {"7d": 7, "30d": 30, "90d": 90, "365d": 365}.get(range, 7)
start_date = datetime.utcnow() - timedelta(days=days)
# Usage over time
usage_query = db.query(
func.date(Job.created_at).label('date'),
func.count(Job.id).label('jobs')
).filter(
Job.created_at >= start_date
).group_by(
func.date(Job.created_at)
).order_by(
func.date(Job.created_at)
).all()
usage_over_time = [
{"date": str(row.date), "jobs": row.jobs, "cost": row.jobs * 0.15}
for row in usage_query
]
# Module breakdown
module_query = db.query(
Job.module,
func.count(Job.id).label('count')
).filter(
Job.created_at >= start_date
).group_by(Job.module).all()
total_jobs = sum(m.count for m in module_query)
module_breakdown = [
{
"module": m.module.replace('_', ' ').title(),
"count": m.count,
"percentage": round(m.count / total_jobs * 100 if total_jobs > 0 else 0)
}
for m in module_query
]
# Top users
top_users_query = db.query(
User.id,
User.email,
func.count(Job.id).label('job_count')
).join(
Job, Job.user_id == User.id
).filter(
Job.created_at >= start_date
).group_by(User.id, User.email).order_by(
desc(func.count(Job.id))
).limit(10).all()
top_users = [
{
"user_id": str(u.id),
"user_email": u.email,
"job_count": u.job_count,
"total_cost": round(u.job_count * 0.15, 2)
}
for u in top_users_query
]
return {
"usage_over_time": usage_over_time,
"module_breakdown": module_breakdown,
"top_users": top_users,
"totals": {
"totalJobs": total_jobs,
"totalCost": round(total_jobs * 0.15, 2),
"avgJobsPerDay": round(total_jobs / days, 1) if days > 0 else 0
}
}
@router.get("/logs/search")
async def search_usage_logs(
query: Optional[str] = None,
provider: Optional[str] = None,
user_id: Optional[str] = None,
start_date: Optional[str] = None, # ISO format
page: int = Query(1, ge=1),
limit: int = Query(20, le=100),
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""
Search usage logs by filename, prompt, user, or provider.
Surface detailed cost and metadata.
"""
sql_query = db.query(UsageLog, User).join(User, UsageLog.user_id == User.id)
# 1. Text Search (Metadata)
if query:
# Search inside JSONB metadata fields (filename, prompt, etc)
# Cast JSONB to text for searching
search_term = f"%{query}%"
sql_query = sql_query.filter(
func.cast(UsageLog.request_metadata, String).ilike(search_term) |
func.cast(UsageLog.response_metadata, String).ilike(search_term) |
UsageLog.action.ilike(search_term)
)
# 2. Filters
if provider:
sql_query = sql_query.filter(UsageLog.api_provider == provider)
if user_id:
sql_query = sql_query.filter(UsageLog.user_id == user_id)
if start_date:
try:
dt = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
sql_query = sql_query.filter(UsageLog.created_at >= dt)
except ValueError:
pass
# Pagination
total = sql_query.count()
logs = sql_query.order_by(desc(UsageLog.created_at)).offset((page - 1) * limit).limit(limit).all()
items = []
for log, user in logs:
items.append({
"id": str(log.id),
"timestamp": log.created_at.isoformat(),
"user": {
"id": str(user.id),
"email": user.email,
"name": user.display_name
},
"service": {
"module": log.module,
"provider": log.api_provider,
"model": log.api_model
},
"metrics": {
"tokens_in": log.tokens_input,
"tokens_out": log.tokens_output,
"cost_usd": float(log.estimated_cost_usd or 0),
"latency_ms": log.processing_time_ms
},
# Return specific metadata fields relevant for UI
"request_details": log.request_metadata,
"response_details": log.response_metadata
})
return {
"items": items,
"total": total,
"page": page,
"limit": limit
}
@router.get("/audit-logs")
async def get_audit_logs(
page: int = Query(1, ge=1),
limit: int = Query(50, le=100),
severity: Optional[str] = None,
action: Optional[str] = None,
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Get audit logs"""
# For now, generate from job history - in production would use dedicated audit table
query = db.query(Job, User).join(User, Job.user_id == User.id)
if action:
if 'failed' in action:
query = query.filter(Job.status == 'failed')
elif 'completed' in action:
query = query.filter(Job.status == 'completed')
total = query.count()
results = query.order_by(desc(Job.created_at)).offset((page - 1) * limit).limit(limit).all()
items = []
for job, user in results:
severity = 'error' if job.status == 'failed' else 'info'
action = f"job.{job.status}"
items.append({
"id": str(job.id),
"user_id": str(user.id),
"user_email": user.email,
"action": action,
"resource_type": "job",
"resource_id": str(job.id),
"details": {
"module": job.module,
"error": job.error_message if job.error_message else None
},
"ip_address": "192.168.1.100", # Placeholder
"created_at": job.created_at.isoformat(),
"severity": severity
})
return {
"items": items,
"total": total,
"page": page,
"limit": limit
}
def _format_relative_time(dt: datetime) -> str:
"""Format datetime as relative time string"""
now = datetime.utcnow()
diff = now - dt
if diff.seconds < 60:
return "Just now"
elif diff.seconds < 3600:
mins = diff.seconds // 60
return f"{mins} min{'s' if mins > 1 else ''} ago"
elif diff.seconds < 86400:
hours = diff.seconds // 3600
return f"{hours} hour{'s' if hours > 1 else ''} ago"
else:
days = diff.days
return f"{days} day{'s' if days > 1 else ''} ago"
# ============== VOICE MANAGEMENT ==============
@router.get("/voices")
async def get_voices(
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Get all ElevenLabs voices including custom cloned voices"""
import httpx
from app.config import settings
if not settings.elevenlabs_api_key:
raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(
"https://api.elevenlabs.io/v1/voices",
headers={"xi-api-key": settings.elevenlabs_api_key}
)
response.raise_for_status()
data = response.json()
voices = []
for voice in data.get("voices", []):
voices.append({
"voice_id": voice.get("voice_id"),
"name": voice.get("name"),
"category": voice.get("category"),
"description": voice.get("description"),
"labels": voice.get("labels", {}),
"preview_url": voice.get("preview_url"),
"available_for_tiers": voice.get("available_for_tiers", []),
"settings": voice.get("settings"),
"sharing": voice.get("sharing"),
"high_quality_base_model_ids": voice.get("high_quality_base_model_ids", []),
"samples": voice.get("samples", [])
})
return {
"voices": voices,
"total": len(voices)
}
@router.get("/voices/{voice_id}")
async def get_voice_details(
voice_id: str,
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Get detailed information about a specific voice"""
import httpx
from app.config import settings
if not settings.elevenlabs_api_key:
raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(
f"https://api.elevenlabs.io/v1/voices/{voice_id}",
headers={"xi-api-key": settings.elevenlabs_api_key}
)
if response.status_code == 404:
raise HTTPException(status_code=404, detail="Voice not found")
response.raise_for_status()
return response.json()
@router.post("/voices/clone")
async def clone_voice(
name: str,
description: Optional[str] = None,
files: list = None,
labels: Optional[dict] = None,
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Clone a voice using audio samples (Instant Voice Cloning)"""
import httpx
from app.config import settings
if not settings.elevenlabs_api_key:
raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")
# For now, return instructions - actual implementation requires file upload
return {
"message": "Voice cloning requires audio file upload",
"instructions": {
"endpoint": "POST /api/v1/admin/voices/clone-with-files",
"required": ["name", "files (audio samples)"],
"optional": ["description", "labels"],
"notes": [
"Upload 1-25 audio samples (max 10MB each)",
"Supported formats: mp3, wav, m4a, ogg, flac",
"Minimum sample length: 30 seconds combined",
"Best results: clear speech, no background noise"
]
}
}
@router.delete("/voices/{voice_id}")
async def delete_voice(
voice_id: str,
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Delete a custom voice (only works for cloned voices)"""
import httpx
from app.config import settings
if not settings.elevenlabs_api_key:
raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")
async with httpx.AsyncClient(timeout=30) as client:
response = await client.delete(
f"https://api.elevenlabs.io/v1/voices/{voice_id}",
headers={"xi-api-key": settings.elevenlabs_api_key}
)
if response.status_code == 404:
raise HTTPException(status_code=404, detail="Voice not found")
if response.status_code == 400:
raise HTTPException(status_code=400, detail="Cannot delete premade voices")
response.raise_for_status()
return {"message": f"Voice {voice_id} deleted successfully"}
@router.patch("/voices/{voice_id}/settings")
async def update_voice_settings(
voice_id: str,
name: Optional[str] = None,
description: Optional[str] = None,
labels: Optional[dict] = None,
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Update voice name, description or labels"""
import httpx
from app.config import settings
if not settings.elevenlabs_api_key:
raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")
payload = {}
if name:
payload["name"] = name
if description:
payload["description"] = description
if labels:
payload["labels"] = labels
if not payload:
raise HTTPException(status_code=400, detail="No updates provided")
async with httpx.AsyncClient(timeout=30) as client:
response = await client.patch(
f"https://api.elevenlabs.io/v1/voices/{voice_id}/edit",
headers={
"xi-api-key": settings.elevenlabs_api_key,
"Content-Type": "application/json"
},
json=payload
)
if response.status_code == 404:
raise HTTPException(status_code=404, detail="Voice not found")
response.raise_for_status()
return {"message": f"Voice {voice_id} updated successfully"}
@router.get("/voices/models")
async def get_voice_models(
db: Session = Depends(get_db),
admin: User = Depends(get_current_admin_user)
):
"""Get available TTS models from ElevenLabs"""
import httpx
from app.config import settings
if not settings.elevenlabs_api_key:
raise HTTPException(status_code=500, detail="ElevenLabs API key not configured")
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(
"https://api.elevenlabs.io/v1/models",
headers={"xi-api-key": settings.elevenlabs_api_key}
)
response.raise_for_status()
return response.json()