from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.auth.providers.jwt_provider import JWTAuthProvider from app.models.user import User class AuthService: """Authentication service wrapping the JWT provider.""" def __init__(self) -> None: self.provider = JWTAuthProvider() async def login( self, email: str, password: str, db: AsyncSession ) -> dict[str, str] | None: """Authenticate a user and return tokens, or None if invalid.""" result = await db.execute(select(User).where(User.email == email)) user = result.scalar_one_or_none() if user is None: return None if not self.provider.verify_password(password, user.password_hash): return None if user.status.value != "active": return None token_data = { "sub": str(user.id), "email": user.email, "role": user.role.value, "name": user.name, } return { "access_token": self.provider.create_access_token(token_data), "refresh_token": self.provider.create_refresh_token(token_data), "token_type": "bearer", } def refresh_tokens(self, refresh_token: str) -> dict[str, str] | None: """Validate a refresh token and issue new token pair.""" claims = self.provider.validate_token(refresh_token) if claims is None: return None if claims.get("type") != "refresh": return None token_data = { "sub": claims["sub"], "email": claims.get("email", ""), "role": claims.get("role", ""), "name": claims.get("name", ""), } return { "access_token": self.provider.create_access_token(token_data), "refresh_token": self.provider.create_refresh_token(token_data), "token_type": "bearer", } def validate_token(self, token: str) -> dict[str, Any] | None: """Validate a token and return claims.""" return self.provider.validate_token(token) def hash_password(self, password: str) -> str: """Hash a password.""" return self.provider.hash_password(password)