Files
bxh/app/api/rbac.py

156 lines
5.0 KiB
Python

"""RBAC & Accounts (P1) — roles, capability matrix, users CRUD."""
from fastapi import APIRouter, HTTPException
from app.auth import CurrentUser
from app.contracts import (
CapabilityCreate,
RoleCapCell,
RoleCreate,
RoleUpdate,
UserAreasSet,
UserCreate,
UserUpdate,
)
from app.db import (
create_capability,
create_role,
create_user,
delete_capability,
delete_role,
delete_user,
get_permission_matrix,
list_capabilities,
list_roles,
list_user_areas,
list_users,
set_role_cap,
set_user_areas,
update_role,
update_user,
upsert_custom_area,
)
from app.security import hash_password
router = APIRouter()
# ── Roles ────────────────────────────────────────────────────────────────────
@router.get("/roles")
async def _list_roles(_user: CurrentUser = None):
return await list_roles()
@router.post("/roles")
async def _create_role(body: RoleCreate, _user: CurrentUser):
try:
return await create_role(body.model_dump())
except Exception as e: # unique violation etc.
raise HTTPException(400, f"创建角色失败:{str(e)[:200]}")
@router.patch("/roles/{role_key}")
async def _update_role(role_key: str, body: RoleUpdate, _user: CurrentUser):
row = await update_role(role_key, body.model_dump(exclude_none=True))
if not row:
raise HTTPException(404, "角色不存在")
return row
@router.delete("/roles/{role_key}")
async def _delete_role(role_key: str, _user: CurrentUser):
try:
ok = await delete_role(role_key)
except ValueError as e:
raise HTTPException(400, str(e))
if not ok:
raise HTTPException(404, "角色不存在")
return {"deleted": role_key}
# ── Capabilities ─────────────────────────────────────────────────────────────
@router.get("/capabilities")
async def _list_caps(_user: CurrentUser = None):
return await list_capabilities()
@router.post("/capabilities")
async def _create_cap(body: CapabilityCreate, _user: CurrentUser):
try:
return await create_capability(body.model_dump())
except Exception as e:
raise HTTPException(400, f"创建能力项失败:{str(e)[:200]}")
@router.delete("/capabilities/{cap_key}")
async def _delete_cap(cap_key: str, _user: CurrentUser):
await delete_capability(cap_key)
return {"deleted": cap_key}
# ── Permission matrix ────────────────────────────────────────────────────────
@router.get("/permission-matrix")
async def _matrix(_user: CurrentUser = None):
return await get_permission_matrix()
@router.put("/permission-matrix")
async def _set_cell(body: RoleCapCell, _user: CurrentUser):
await set_role_cap(body.role_key, body.cap_key, body.value)
return {"ok": True, "role_key": body.role_key, "cap_key": body.cap_key, "value": body.value}
# ── Users ────────────────────────────────────────────────────────────────────
@router.get("/users")
async def _list_users(_user: CurrentUser = None):
return await list_users()
@router.post("/users")
async def _create_user(body: UserCreate, _user: CurrentUser):
data = body.model_dump()
roles = data.pop("roles", [])
pw = data.pop("password")
data["hashed_password"] = hash_password(pw)
try:
return await create_user(data, roles)
except Exception as e:
raise HTTPException(400, f"创建用户失败:{str(e)[:200]}")
@router.patch("/users/{user_id}")
async def _update_user(user_id: int, body: UserUpdate, _user: CurrentUser):
data = body.model_dump(exclude_none=True)
roles = data.pop("roles", None)
if "password" in data:
data["hashed_password"] = hash_password(data.pop("password"))
row = await update_user(user_id, data, roles)
if not row:
raise HTTPException(404, "用户不存在")
return row
@router.delete("/users/{user_id}")
async def _delete_user(user_id: int, _user: CurrentUser):
await delete_user(user_id)
return {"deleted": user_id}
@router.get("/users/{user_id}/areas")
async def _get_user_areas(user_id: int, _user: CurrentUser = None):
return await list_user_areas(user_id)
@router.put("/users/{user_id}/areas")
async def _set_user_areas(user_id: int, body: UserAreasSet, _user: CurrentUser):
"""Set the areas this user is responsible for (existing + free-text)."""
area_ids = list(body.area_ids)
for name in body.custom_areas:
if name and name.strip():
area_ids.append(await upsert_custom_area(name))
await set_user_areas(user_id, area_ids)
return await list_user_areas(user_id)