"""JWT auth — DB-backed users, bcrypt password hashing, token + user dependency.""" from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from app import db from app.config import settings from app.security import verify_password oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/v1/admin/auth/login") async def authenticate(username: str, password: str) -> dict | None: """Validate credentials against the DB users table.""" user = await db.get_user_auth(username) if not user: return None if user.get("status") != "active": return None if not verify_password(password, user.get("hashed_password") or ""): return None return { "username": user["username"], "full_name": user.get("full_name"), "roles": list(user.get("roles") or []), } def create_access_token(data: dict) -> str: payload = data.copy() payload["exp"] = datetime.now(timezone.utc) + timedelta(minutes=settings.auth_token_expire_minutes) return jwt.encode(payload, settings.auth_secret, algorithm=settings.auth_algorithm) def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> dict: """Identity is reconstructed from JWT claims (sub + roles) — no per-request DB hit.""" credentials_exc = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.auth_secret, algorithms=[settings.auth_algorithm]) username: str = payload.get("sub", "") if not username: raise credentials_exc except JWTError: raise credentials_exc return {"username": username, "roles": list(payload.get("roles") or [])} CurrentUser = Annotated[dict, Depends(get_current_user)]