Files
bxh/app/auth.py

59 lines
2.0 KiB
Python

"""JWT auth — DB-backed users, bcrypt password hashing, token + user dependency."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from app import db
from app.config import settings
from app.security import verify_password
# Bearer-token scheme used as the dependency that hands the raw token to
# get_current_user. NOTE(review): tokenUrl is presumably the login route that
# issues these tokens — confirm against the router.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/v1/admin/auth/login")
async def authenticate(username: str, password: str) -> dict | None:
    """Check a username/password pair against the stored user record.

    Args:
        username: Login name passed to ``db.get_user_auth``.
        password: Plain-text password checked with ``verify_password``.

    Returns:
        A dict with ``username``, ``full_name`` and ``roles`` (always a list)
        on success. ``None`` when no record is found, when the record's
        ``status`` is not ``"active"``, or when the password does not verify.
    """
    record = await db.get_user_auth(username)

    # Missing or non-active accounts are rejected before the hash is checked.
    if not record or record.get("status") != "active":
        return None

    # A missing/empty stored hash is passed to verify_password as "".
    stored_hash = record.get("hashed_password") or ""
    if verify_password(password, stored_hash):
        identity = {
            "username": record["username"],
            "full_name": record.get("full_name"),
            "roles": list(record.get("roles") or []),
        }
        return identity

    return None
def create_access_token(data: dict) -> str:
    """Encode *data* as a signed JWT carrying an ``exp`` claim.

    The input dict is copied first, so the caller's mapping is not mutated.
    Expiry is the current UTC time plus ``settings.auth_token_expire_minutes``;
    any ``exp`` already present in *data* is overwritten.

    Args:
        data: Claims to embed in the token.

    Returns:
        The result of ``jwt.encode`` using ``settings.auth_secret`` and
        ``settings.auth_algorithm``.
    """
    claims = data.copy()
    lifetime = timedelta(minutes=settings.auth_token_expire_minutes)
    claims.update(exp=datetime.now(timezone.utc) + lifetime)
    return jwt.encode(
        claims,
        settings.auth_secret,
        algorithm=settings.auth_algorithm,
    )
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> dict:
    """Resolve the caller's identity from a bearer JWT.

    Identity is reconstructed from the JWT claims (``sub`` + ``roles``) — no
    per-request DB hit.

    Args:
        token: Raw bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        A dict with ``username`` (the ``sub`` claim) and ``roles`` (a list
        built from the ``roles`` claim; empty when the claim is missing or
        falsy).

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``jwt.decode`` raises ``JWTError`` or the ``sub`` claim is
            missing/empty.
    """
    credentials_exc = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body to the one call whose JWTError is handled here.
    try:
        payload = jwt.decode(
            token,
            settings.auth_secret,
            algorithms=[settings.auth_algorithm],
        )
    except JWTError as exc:
        # Chain explicitly so the decode failure stays attached as __cause__
        # for logs/debugging; the HTTP response itself is unchanged.
        raise credentials_exc from exc

    username: str = payload.get("sub", "")
    if not username:
        raise credentials_exc

    return {"username": username, "roles": list(payload.get("roles") or [])}
# Annotated alias: a parameter typed as `CurrentUser` is a dict resolved
# through the get_current_user dependency.
CurrentUser = Annotated[dict, Depends(get_current_user)]