返回文章列表

Python JWT Token RBAC 許可權控制實作

本文探討 Python 應用程式中 JWT token 的進階管理與 RBAC 實作,涵蓋 token 復原機制、middleware 整合、RBAC 模型設計與 API 端點保護,提供程式碼範例與最佳實踐,強化 API 安全性。

Web 開發 資安

在開發 Python Web 應用程式時,確保 API 的安全性至關重要。本文提供的程式碼範例示範瞭如何使用 JWT(JSON Web Token)實作 token 驗證和復原,並結合根據角色的存取控制(RBAC)機制,有效地保護 API 端點。透過 Redis 儲存已復原的 token,可以有效地管理 token 的生命週期,即使 token 未過期也能立即使其失效。此外,文章也示範瞭如何將 token 驗證邏輯整合到 FastAPI 的 middleware 中,簡化 API 請求的驗證流程,並確保只有持有有效 token 的使用者才能存取受保護的資源。RBAC 的實作則透過角色和許可權的定義,精細地控制不同使用者群組對 API 資源的存取許可權,提升應用程式的整體安全性。

# 此處省略程式碼,完整程式碼請參考原文

強化 JWT Token 管理與 Role-Based Access Control(RBAC)實作

在現代 Python 應用程式中,API 安全性的核心在於完善的 token 管理與精細的許可權控制。本文將探討 JWT token 的進階管理機制以及根據角色的存取控制(RBAC)實作,並提供具體的程式碼範例與最佳實踐。

JWT Token 進階管理機制

JWT token 的安全使用不僅僅依賴於正確的生成與驗證,還需要有效的復原機制。由於 JWT 是無狀態的,實作復原機制需要額外的策略。

Token 復原實作

import jwt
import redis
from datetime import datetime

# Redis 設定
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def revoke_token(token: str, secret_key: str) -> None:
    """
    將 token 標記為已復原
    """
    try:
        # 解碼 token 取得 token_id
        decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
        token_id = decoded.get('jti')
        
        if token_id:
            # 計算 token 的剩餘有效時間
            expires_in = decoded["exp"] - datetime.utcnow().timestamp()
            # 在 Redis 中儲存已復原的 token
            redis_client.setex(f"revoked_token:{token_id}", int(expires_in), "revoked")
    except jwt.InvalidTokenError:
        raise Exception("Invalid token")

def is_token_revoked(token: str, secret_key: str) -> bool:
    """
    檢查 token 是否已被復原
    """
    try:
        decoded = jwt.decode(token, secret_key, algorithms=["HS256"], options={"verify_signature": False})
        token_id = decoded.get('jti')
        
        if token_id and redis_client.get(f"revoked_token:{token_id}"):
            return True
    except jwt.InvalidTokenError:
        return True  # 無效 token 視為已復原
    
    return False

#### 內容解密:
1. `revoke_token` 函式負責將指定的 JWT token 標記為已復原它首先解碼 token 以取得 `token_id`(JWT ID),然後將該 `token_id` 存入 Redis並設定其過期時間與原始 token 的剩餘有效時間一致
2. `is_token_revoked` 函式檢查給定的 token 是否存在於 Redis 的復原列表中如果存在則傳回 `True`,表示該 token 已被復原
3. 使用 Redis 作為復原列表的儲存媒介能夠提供高效能且可擴充套件的解決方案

### 將 Token 驗證與復原檢查整合至 Middleware

為了在每個 API 請求中強制進行 token 驗證與復原檢查可以將相關邏輯整合至 middleware 中

```python
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

# 設定 FastAPI 應用
app = FastAPI()
security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        # 驗證 token 有效性
        claims = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        
        # 檢查 token 是否被復原
        if is_token_revoked(token, SECRET_KEY):
            raise HTTPException(status_code=401, detail="Token has been revoked")
        
        return claims
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token has expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# 使用範例
@app.get("/protected")
async def protected_route(claims: dict = Depends(verify_token)):
    return {"message": f"Hello, user {claims['sub']}"}

內容解密:

  1. verify_token 函式用於驗證傳入的 JWT token,包括檢查其有效性與是否被復原。
  2. 若 token 無效或已被復原,將丟擲相應的 HTTP 異常。
  3. 將此函式作為依賴項注入到受保護的路由中,能夠確保只有有效的、未被復原的 token 能夠存取受保護資源。

Role-Based Access Control(RBAC)實作

RBAC 提供了一種靈活且可擴充套件的許可權管理機制,透過將使用者對映到角色,並將角色與許可權繫結,簡化了許可權管理的複雜度。

基本 RBAC 模型設計

from enum import Enum
from typing import List

class Role(str, Enum):
    admin = "admin"
    user = "user"

class Permission(str, Enum):
    read = "read"
    write = "write"

# 定義角色與許可權的對映關係
role_permissions = {
    Role.admin: [Permission.read, Permission.write],
    Role.user: [Permission.read]
}

def check_permission(role: Role, permission: Permission) -> bool:
    """
    檢查指定角色是否擁有特定許可權
    """
    return permission in role_permissions.get(role, [])

# 使用範例
if check_permission(Role.user, Permission.read):
    print("User can read")
else:
    print("User cannot read")

內容解密:

  1. 定義了 RolePermission Enum,分別代表角色和許可權。
  2. role_permissions 字典建立了角色與許可權之間的對映關係。
  3. check_permission 函式用於檢查特定角色是否擁有指定的許可權。

將 RBAC 整合至 API

透過將 RBAC 邏輯整合至 middleware 或裝飾器,可以在 API 端點上強制執行根據角色的存取控制:

from functools import wraps
from fastapi import HTTPException

def role_required(required_role: Role):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 假設從 JWT 或其他認證機制取得使用者角色
            user_role = kwargs.get('user_role', Role.user)
            
            if user_role != required_role:
                raise HTTPException(status_code=403, detail="Forbidden")
            
            return await func(*args, **kwargs)
        return wrapper
    return decorator

# 使用範例
@app.get("/admin-only")
@role_required(Role.admin)
async def admin_only_route():
    return {"message": "Hello, admin"}

#### 圖表翻譯:
此圖示說明瞭 RBAC 的基本架構包括使用者角色許可權之間的關係以及如何透過 middleware 或裝飾器將 RBAC 邏輯應用於 API 端點
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 圖表翻譯:

rectangle "assigned to" as node1
rectangle "has" as node2
rectangle "grants access to" as node3
rectangle "protected by" as node4
rectangle "checks" as node5

node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5

@enduml

圖表翻譯: 上圖顯示了一個典型的 RBAC 模型,其中使用者被分配到特定的角色,而每個角色都擁有一組許可權。這些許可權決定了使用者能夠存取哪些資源。在 API 中,透過 middleware 或裝飾器實作對使用者角色的檢查,從而強制執行根據角色的存取控制。

進階根據角色的存取控制(RBAC)實作與動態許可權評估

在現代軟體系統中,根據角色的存取控制(RBAC)是實作細粒度安全性的關鍵元件。透過將使用者分配到不同的角色,並根據角色授權相應的許可權,系統能夠有效地控制對資源的存取。本文將探討RBAC的高階實作,包括動態許可權評估、角色階層結構以及與外部身份提供者的整合。

動態許可權評估機制

動態許可權評估允許系統在執行時根據使用者的角色和許可權動態地決定是否允許特定操作。這種方法特別適用於需要頻繁更新角色和許可權的分散式架構和微服務系統。以下是一個使用Python實作的動態許可權評估範例:

import json
import time
from functools import wraps
from typing import List, Dict, Any

# 模擬的記憶體許可權儲存
permission_store = {
    "admin": {"permissions": ["create", "read", "update", "delete"]},
    "manager": {"permissions": ["read", "update"]},
    "user": {"permissions": ["read"]}
}

# 簡單的快取機制
permission_cache = {}
CACHE_TTL = 300  # 秒

def get_role_permissions(role: str) -> list:
    """
    取得指定角色的許可權,並使用快取機制最佳化效能。
    """
    current_time = time.time()
    cached = permission_cache.get(role)
    if cached and current_time < cached["expires_at"]:
        return cached["permissions"]
    
    # 模擬從資料函式庫或外部服務查詢許可權
    perms = permission_store.get(role, {"permissions": []})["permissions"]
    permission_cache[role] = {"permissions": perms, "expires_at": current_time + CACHE_TTL}
    return perms

def check_permission(user: Dict[str, Any], permission: str) -> bool:
    """
    檢查使用者是否具有指定的許可權。
    """
    user_roles = user.get("roles", [])
    for role in user_roles:
        perms = get_role_permissions(role)
        if permission in perms:
            return True
    return False

def requires_permission(permission: str):
    """
    裝飾器:檢查呼叫者是否具有指定的許可權。
    """
    def decorator(func):
        @wraps(func)
        def wrapper(user: Dict[str, Any], *args, **kwargs):
            if not check_permission(user, permission):
                raise AuthorizationError(f"User {user.get('username')} lacks permission: {permission}")
            return func(user, *args, **kwargs)
        return wrapper
    return decorator

class AuthorizationError(Exception):
    pass

@requires_permission("delete")
def perform_delete_action(user: Dict[str, Any], item_id: str) -> str:
    """
    範例操作:刪除專案。需要"delete"許可權。
    """
    return f"User {user['username']} performed delete on item {item_id}"

# 範例用法
if __name__ == '__main__':
    user_admin = {"username": "admin_user", "roles": ["admin"]}
    user_manager = {"username": "manager_user", "roles": ["manager"]}
    
    print(perform_delete_action(user_admin, "item_100"))
    try:
        print(perform_delete_action(user_manager, "item_101"))
    except AuthorizationError as e:
        print("Authorization error:", e)

內容解密:

  1. get_role_permissions 函式:此函式負責根據角色取得對應的許可權列表。它使用了簡單的快取機制來提高效能,減少對外部資料來源的查詢次數。
  2. check_permission 函式:檢查給定使用者是否具有指定的許可權。它遍歷使用者的所有角色,並檢查這些角色是否包含所需的許可權。
  3. requires_permission 裝飾器:這是一個用於函式裝飾的工具,用於在執行特定操作前檢查呼叫者是否具有必要的許可權。如果沒有,則引發 AuthorizationError
  4. 動態許可權評估:透過將許可權儲存在外部資料結構(如 permission_store)中,並在執行時檢查使用者的許可權,系統能夠根據需要動態調整許可權控制。

角色階層結構的實作

在許多實際場景中,角色並非扁平結構,而是遵循某種階層關係,其中高層級角色繼承低層級角色的許可權。實作階層式RBAC需要在Python中構建可有效遍歷的繼承樹。以下是一個擴充套件的範例,展示瞭如何整合角色繼承:

# 定義角色階層
role_hierarchy = {
    "admin": ["manager", "user"],
    "manager": ["user"],
    "user": []
}

def get_inherited_permissions(role: str) -> set:
    """
    計算指定角色及其子角色所擁有的所有許可權。
    """
    perms = set(get_role_permissions(role))
    for child in role_hierarchy.get(role, []):
        perms.update(get_inherited_permissions(child))
    return perms

def check_permission_hierarchical(user: Dict[str, Any], permission: str) -> bool:
    """
    檢查使用者在階層式角色結構下是否具有指定許可權。
    """
    user_roles = user.get("roles", [])
    for role in user_roles:
        inherited_perms = get_inherited_permissions(role)
        if permission in inherited_perms:
            return True
    return False

def requires_permission_hierarchical(permission: str):
    """
    裝飾器:檢查呼叫者在階層式角色結構下是否具有指定許可權。
    """
    def decorator(func):
        @wraps(func)
        def wrapper(user: Dict[str, Any], *args, **kwargs):
            if not check_permission_hierarchical(user, permission):
                raise AuthorizationError(f"User {user.get('username')} lacks permission: {permission}")
            return func(user, *args, **kwargs)
        return wrapper
    return decorator

@requires_permission_hierarchical("update")
def update_resource(user: Dict[str, Any], resource_id: str) -> str:
    """
    範例操作:更新資源。需要"update"許可權。
    """
    return f"User {user['username']} updated resource {resource_id}"

# 範例用法
if __name__ == '__main__':
    user_manager = {"username": "manager_user", "roles": ["manager"]}
    user_basic = {"username": "basic_user", "roles": ["user"]}
    
    print(update_resource(user_manager, "res_200"))
    try:
        print(update_resource(user_basic, "res_201"))
    except AuthorizationError as e:
        print("Authorization error:", e)

內容解密:

  1. role_hierarchy 定義:描述了角色之間的階層關係,高層級角色列出其直接或間接子角色。
  2. get_inherited_permissions 函式:遞迴計算指定角色所擁有的所有許可權,包括其子角色所擁有的許可權。
  3. check_permission_hierarchical 函式:在階層式角色結構下檢查使用者的許可權。它利用 get_inherited_permissions 來取得角色及其子角色的所有許可權。
  4. requires_permission_hierarchical 裝飾器:用於檢查呼叫者在階層式角色結構下是否具有執行特定操作所需的許可權。

與外部身份提供者的整合

高階RBAC實作經常與外部身份提供者或目錄服務(如LDAP或Active Directory)整合。這需要實作跨系統的角色和許可權同步機制,以確保一致性和安全性。