forge/backend/app/api/v1/auth.py
DJP 7a804e896d Initial commit - FORGE AI unified platform
Features:
- Image generation (OpenAI, Gemini, Leonardo, Bria, Stability, Flux)
- Nano Banana iterative editing
- Video generation and upscaling
- Audio TTS, STT, sound effects (ElevenLabs)
- Text prompt studio and alt text
- User authentication with JWT/cookies
- Admin panel with voice management
- Job queue with Celery
- PostgreSQL + Redis backend
- Next.js 15 + FastAPI architecture

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2025-12-09 20:39:00 -05:00

261 lines
7.7 KiB
Python

"""Authentication API Routes"""
from fastapi import APIRouter, Depends, HTTPException, status, Response, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID
from app.database import get_db
from app.models.user import User
from app.schemas.user import (
UserSignUp, UserLogin, UserResponse, TokenResponse,
PasswordChange, UserUpdate
)
from app.config import settings
# Router that collects the authentication endpoints declared in this module.
router = APIRouter()
# Bearer-token extractor used as a dependency by the user-resolution helpers.
# NOTE(review): auto_error=False should make a missing Authorization header
# resolve to None instead of an automatic error response, which is what lets
# those helpers fall back to the "access_token" cookie — confirm against the
# FastAPI security docs.
security = HTTPBearer(auto_error=False)
# JWT settings, read once from the application config at import time.
# Key used both to sign tokens and to verify them.
SECRET_KEY = settings.jwt_secret_key
# Signing algorithm name handed to the JWT encoder/decoder.
ALGORITHM = settings.jwt_algorithm
# Default token lifetime in minutes; also multiplied by 60 for cookie max_age
# and the "expires_in" field of token responses.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.jwt_expire_minutes
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token (for example ``{"sub": "<user id>"}``).
            The mapping is copied, so the caller's dict is never mutated. Any
            ``"exp"`` key it contains is overwritten.
        expires_delta: Lifetime of the token. When omitted (or falsy, such as
            a zero ``timedelta``), ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Function-scope import: the file header only pulls in ``datetime`` and
    # ``timedelta`` from the datetime module.
    from datetime import timezone

    lifetime = expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Use an aware UTC timestamp. ``datetime.utcnow()`` returns a naive value
    # and is deprecated since Python 3.12; the resulting "exp" claim is the
    # same instant either way.
    expires_at = datetime.now(timezone.utc) + lifetime
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> Optional[dict]:
    """Decode a JWT and hand back its claims.

    The token is decoded with ``SECRET_KEY`` and restricted to ``ALGORITHM``.

    Returns:
        The decoded payload, or ``None`` if the decoder raises ``JWTError``.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Callers treat None as "not authenticated"; no detail is propagated.
        return None
    else:
        return claims
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    access_token: Optional[str] = Cookie(None),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the authenticated, active user for the current request.

    The token is taken from the Authorization header when credentials are
    present, otherwise from the ``access_token`` cookie.

    Raises:
        HTTPException: 401 when no token is supplied, the token fails
            verification, the payload has no ``sub`` claim, or no user
            matches it; 403 when the matching user is not active.
    """

    def _unauthorized(detail: str, *, challenge: bool = False) -> HTTPException:
        # Only the "missing token" and "bad token" responses carry the
        # WWW-Authenticate challenge header.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"} if challenge else None,
        )

    # Header credentials win over the cookie whenever they are present.
    raw_token = credentials.credentials if credentials else (access_token or None)
    if not raw_token:
        raise _unauthorized("Not authenticated", challenge=True)

    claims = verify_token(raw_token)
    if not claims:
        raise _unauthorized("Invalid or expired token", challenge=True)

    subject = claims.get("sub")
    if not subject:
        raise _unauthorized("Invalid token payload")

    account = db.query(User).filter(User.id == subject).first()
    if not account:
        raise _unauthorized("User not found")

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )
    return account
async def get_optional_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    access_token: Optional[str] = Cookie(None),
    db: Session = Depends(get_db)
) -> Optional[User]:
    """Return the active user behind the request's token, if there is one.

    Unlike ``get_current_user`` this never raises for a missing or invalid
    token: every failure simply results in ``None``.
    """
    # Same precedence as get_current_user: header credentials, then cookie.
    raw_token = credentials.credentials if credentials else access_token
    if not raw_token:
        return None

    claims = verify_token(raw_token)
    subject = claims.get("sub") if claims else None
    if not subject:
        return None

    # Inactive users are filtered out in the query itself.
    return (
        db.query(User)
        .filter(User.id == subject, User.is_active == True)  # noqa: E712
        .first()
    )
@router.post("/signup", response_model=TokenResponse)
async def signup(user_data: UserSignUp, response: Response, db: Session = Depends(get_db)):
    """Register a new user and sign them in.

    Creates an active user with role ``"user"``, issues an access token for
    it, stores that token in an HTTP-only ``access_token`` cookie and also
    returns it in the response body.

    Args:
        user_data: Email, display name and password for the new account.
        response: Outgoing response, used to attach the cookie.
        db: Database session.

    Returns:
        A ``TokenResponse`` with the token, its lifetime in seconds and the
        created user.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    # Function-scope import; SQLAlchemy is already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    def _email_taken() -> bool:
        """Tell whether a user with the requested email already exists."""
        return db.query(User).filter(User.email == user_data.email).first() is not None

    if _email_taken():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    user = User(
        email=user_data.email,
        display_name=user_data.display_name,
        hashed_password=User.hash_password(user_data.password),
        role="user",
        is_active=True,
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError:
        # A concurrent signup can insert the same email between the check
        # above and this commit (assuming the column is unique — the model is
        # not visible here). Roll back so the session stays usable, then
        # report the duplicate as the usual 400 instead of a server error.
        db.rollback()
        if _email_taken():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            ) from None
        # Some other constraint failed: do not mislabel it.
        raise
    db.refresh(user)

    access_token = create_access_token(data={"sub": str(user.id)})
    max_age_seconds = ACCESS_TOKEN_EXPIRE_MINUTES * 60

    response.set_cookie(
        key="access_token",
        value=access_token,
        httponly=True,
        max_age=max_age_seconds,
        samesite="lax",
        secure=False,  # Set to True in production with HTTPS
    )
    return TokenResponse(
        access_token=access_token,
        expires_in=max_age_seconds,
        user=UserResponse.model_validate(user)
    )
@router.post("/login", response_model=TokenResponse)
async def login(credentials: UserLogin, response: Response, db: Session = Depends(get_db)):
    """Authenticate with email and password and issue an access token.

    On success the user's ``last_login_at`` is updated, the token is stored
    in an HTTP-only ``access_token`` cookie and is also returned in the body.

    Raises:
        HTTPException: 401 when the email is unknown or the password does not
            verify; 403 when the account is not active.
    """
    account = db.query(User).filter(User.email == credentials.email).first()

    # Unknown email and wrong password deliberately share one error message.
    if not (account and account.verify_password(credentials.password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password"
        )
    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled"
        )

    # Record the successful login before issuing the token.
    account.last_login_at = datetime.utcnow()
    db.commit()

    token = create_access_token(data={"sub": str(account.id)})
    ttl_seconds = ACCESS_TOKEN_EXPIRE_MINUTES * 60

    response.set_cookie(
        key="access_token",
        value=token,
        httponly=True,
        max_age=ttl_seconds,
        samesite="lax",
        secure=False,  # Set to True in production with HTTPS
    )
    return TokenResponse(
        access_token=token,
        expires_in=ttl_seconds,
        user=UserResponse.model_validate(account)
    )
@router.post("/logout")
async def logout(response: Response):
    """Log out by removing the ``access_token`` cookie from the client."""
    # Only the cookie is cleared here; nothing is done to the token itself.
    response.delete_cookie("access_token")
    confirmation = {"message": "Successfully logged out"}
    return confirmation
@router.get("/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the profile of the user resolved by ``get_current_user``.

    Authentication failures are raised by that dependency before this body
    runs; the user is serialized through ``UserResponse``.
    """
    return current_user
@router.patch("/me", response_model=UserResponse)
async def update_me(
    user_data: UserUpdate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Apply a partial profile update to the authenticated user.

    Only ``display_name`` and ``avatar_url`` can be changed. Fields the
    client did not send, and fields sent as ``None``, are left untouched.

    Returns:
        The user, refreshed from the database after the commit.
    """
    editable = ("display_name", "avatar_url")
    submitted = user_data.model_dump(exclude_unset=True)

    for field_name, new_value in submitted.items():
        # Skip anything outside the whitelist and explicit nulls.
        if new_value is None or field_name not in editable:
            continue
        setattr(current_user, field_name, new_value)

    db.commit()
    db.refresh(current_user)
    return current_user
@router.post("/me/change-password")
async def change_password(
    password_data: PasswordChange,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Replace the authenticated user's password.

    The current password must verify before the new one is hashed and saved.

    Raises:
        HTTPException: 400 when the supplied current password is wrong.
    """
    current_ok = current_user.verify_password(password_data.current_password)
    if not current_ok:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect"
        )

    # Only the hash is stored; the plain-text value is never persisted here.
    new_hash = User.hash_password(password_data.new_password)
    current_user.hashed_password = new_hash
    db.commit()
    return {"message": "Password changed successfully"}
@router.get("/verify")
async def verify_auth(current_user: User = Depends(get_current_user)):
    """Confirm that the request carries a valid token.

    Reaching this body means ``get_current_user`` succeeded, so the response
    is always ``valid: True`` together with the user's id as a string.
    """
    user_id = str(current_user.id)
    return {"valid": True, "user_id": user_id}