- Remove contact references from system prompt, add language matching rule - Add copy-to-clipboard button on assistant messages with iframe fallback - Increase token lifetime to 24h/30d, add refresh queue, remove hard redirect - Fix adaptive layout for iframe/standalone, pin input at bottom - Fix CSS specificity conflict (8px→2px spacing), add markdown post-processing Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
134 lines
2.6 KiB
Python
134 lines
2.6 KiB
Python
"""
|
|
Security utilities
|
|
|
|
JWT token management and password hashing
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict
|
|
import jwt
|
|
import hashlib
|
|
|
|
from app.config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
settings = get_settings()
|
|
|
|
|
|
def create_access_token(
|
|
data: Dict,
|
|
expires_delta: Optional[timedelta] = None
|
|
) -> str:
|
|
"""
|
|
Create JWT access token
|
|
|
|
Args:
|
|
data: Token payload data
|
|
expires_delta: Optional expiration time delta
|
|
|
|
Returns:
|
|
Encoded JWT token
|
|
"""
|
|
to_encode = data.copy()
|
|
|
|
if expires_delta:
|
|
expire = datetime.utcnow() + expires_delta
|
|
else:
|
|
expire = datetime.utcnow() + timedelta(hours=24)
|
|
|
|
to_encode.update({"exp": expire, "iat": datetime.utcnow()})
|
|
|
|
encoded_jwt = jwt.encode(
|
|
to_encode,
|
|
settings.SECRET_KEY,
|
|
algorithm="HS256"
|
|
)
|
|
|
|
return encoded_jwt
|
|
|
|
|
|
def create_refresh_token(data: Dict) -> str:
|
|
"""
|
|
Create JWT refresh token (longer expiration)
|
|
|
|
Args:
|
|
data: Token payload data
|
|
|
|
Returns:
|
|
Encoded JWT refresh token
|
|
"""
|
|
return create_access_token(
|
|
data,
|
|
expires_delta=timedelta(days=30)
|
|
)
|
|
|
|
|
|
def decode_token(token: str) -> Optional[Dict]:
|
|
"""
|
|
Decode and verify JWT token
|
|
|
|
Args:
|
|
token: JWT token string
|
|
|
|
Returns:
|
|
Decoded token payload or None if invalid
|
|
"""
|
|
try:
|
|
payload = jwt.decode(
|
|
token,
|
|
settings.SECRET_KEY,
|
|
algorithms=["HS256"]
|
|
)
|
|
return payload
|
|
|
|
except jwt.ExpiredSignatureError:
|
|
logger.warning("Token has expired")
|
|
return None
|
|
|
|
except jwt.InvalidTokenError as e:
|
|
logger.warning(f"Invalid token: {e}")
|
|
return None
|
|
|
|
|
|
def hash_token(token: str) -> str:
|
|
"""
|
|
Hash token for secure storage using SHA-256
|
|
|
|
Args:
|
|
token: Token string to hash
|
|
|
|
Returns:
|
|
Hashed token (hex digest)
|
|
"""
|
|
return hashlib.sha256(token.encode('utf-8')).hexdigest()
|
|
|
|
|
|
def verify_token_hash(token: str, hashed: str) -> bool:
|
|
"""
|
|
Verify token against hash
|
|
|
|
Args:
|
|
token: Plain token string
|
|
hashed: Hashed token
|
|
|
|
Returns:
|
|
True if token matches hash
|
|
"""
|
|
return hash_token(token) == hashed
|
|
|
|
|
|
def get_token_subject(token: str) -> Optional[str]:
|
|
"""
|
|
Extract subject (user_id) from token
|
|
|
|
Args:
|
|
token: JWT token string
|
|
|
|
Returns:
|
|
User ID or None
|
|
"""
|
|
payload = decode_token(token)
|
|
if payload:
|
|
return payload.get("sub")
|
|
return None
|