返回文章列表

Python 單一登入與進階存取控制實作

本文探討 Python 中單一登入(SSO)與進階存取控制的實作,涵蓋根據角色的存取控制(RBAC)進階實作、OAuth 2.0 與 OpenID Connect 的 SSO 整合、SAML 協定應用、JWT 驗證、以及使用 Redis 進行 Session 管理,並以 FastAPI 示範 API Gateway

Web 開發 資安

現代應用程式中,安全存取控制和便捷的使用者經驗至關重要。本文將探討如何使用 Python 建構安全可靠的單一登入(SSO)系統,並結合進階存取控制機制,提升應用程式安全性。首先,我們將探討根據角色的存取控制(RBAC)的進階實作,包含動態許可權管理和稽核日誌,接著介紹如何整合 OAuth 2.0 和 OpenID Connect 以及 SAML 協定來實作 SSO,並示範 JWT 的驗證流程和使用 Redis 進行 Session 管理的最佳實務。最後,我們將探討如何使用 FastAPI 建構 API Gateway 並進行 Token 驗證,並深入研究資料保護與加密技術,包含對稱與非對稱加密、安全金鑰管理以及雜湊運算,以確保資料安全最佳實踐,保護應用程式免受未經授權的存取和資料洩漏。

進階存取控制與單一登入系統實作

在現代化的應用程式中,存取控制和單一登入(SSO)系統扮演著至關重要的角色。本章將探討根據角色的存取控制(RBAC)機制、其進階實作以及單一登入系統的技術細節。

根據角色的存取控制(RBAC)進階實作

RBAC 是一種廣泛採用的存取控制機制,其核心概念是將許可權分配給角色而非直接分配給使用者。使用者透過被分配不同的角色來獲得相應的系統許可權。

動態許可權管理與稽核

在複雜的系統環境中,動態許可權管理和稽核功能至關重要。透過 Python 開發者可以實作以下功能:

  1. 外部目錄服務整合:使用 ldap3 等套件與外部目錄服務進行整合,實作使用者角色與群組成員資格的同步。
  2. 結構化日誌記錄:實作結構化日誌記錄機制,記錄存取控制決策相關資訊,便於稽核和事件回應。
import logging
import json

#### 設定日誌記錄器
logger = logging.getLogger("rbac_logger")
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def log_rbac_event(user: dict, action: str, status: str, details: dict = None):
    """記錄 RBAC 事件"""
    event = {
        "user": user.get("username"),
        "roles": user.get("roles"),
        "action": action,
        "status": status,
        "details": details or {}
    }
    logger.info(json.dumps(event))

@requires_permission("create")
def create_resource(user: dict, resource_data: dict) -> str:
    """建立資源"""
    result = f"Resource created by {user['username']}"
    log_rbac_event(user, "create_resource", "success", {"resource": resource_data})
    return result

# 示例日誌記錄
if __name__ == '__main__':
    user_admin = {"username": "admin_user", "roles": ["admin"]}
    print(create_resource(user_admin, {"name": "Test Resource"}))

內容解密:

  1. 日誌設定:建立一個名為 rbac_logger 的日誌記錄器,並設定適當的日誌級別和格式。
  2. log_rbac_event 函式:負責記錄 RBAC 相關事件,包括使用者資訊、動作、狀態和詳細資訊。
  3. create_resource 函式:示範如何使用裝飾器進行許可權檢查,並在成功建立資源後記錄事件。

單一登入(SSO)系統實作

SSO 系統提供了一個集中式的身份驗證機制,允許使用者在多個應用程式之間無縫切換。實作 SSO 需要整合聯合身份驗證協定,如 SAML 和 OAuth 2.0 with OpenID Connect。

使用 OAuth 2.0 with OpenID Connect 實作 SSO

以下範例示範如何使用 Flask 和 Authlib 實作 SSO 登入流程:

from authlib.integrations.flask_client import OAuth
from flask import Flask, redirect, url_for, request, session
import os

app = Flask(__name__)
app.secret_key = os.urandom(24)

# 設定 OAuth for SSO
oauth = OAuth(app)
auth0 = oauth.register(
    'auth0',
    client_id='YOUR_CLIENT_ID',
    client_secret='YOUR_CLIENT_SECRET',
    api_base_url='https://your-idp-provider.com',
    access_token_url='https://your-idp-provider.com/oauth/token',
    authorize_url='https://your-idp-provider.com/authorize',
    client_kwargs={
        'scope': 'openid profile email',
    },
)

@app.route('/login')
def login():
    """SSO 登入端點"""
    redirect_uri = url_for('callback', _external=True)
    return auth0.authorize_redirect(redirect_uri)

@app.route('/callback')
def callback():
    """處理 SSO 回撥"""
    token = auth0.authorize_access_token()
    session['user'] = token
    return redirect('/dashboard')

@app.route('/dashboard')
def dashboard():
    """受保護的儀錶板頁面"""
    if 'user' not in session:
        return redirect(url_for('login'))
    return f"Welcome {session['user'].get('userinfo', {}).get('name', 'User')}"

#### 圖表翻譯:
此圖示呈現了 SSO 登入流程的主要步驟包括使用者驗證授權碼取得和存取令牌交換

內容解密:

  1. SSO 登入流程:使用者首先被重定向到 IdP 的驗證端點進行身份驗證。
  2. 授權碼交換:驗證成功後,IdP 傳回授權碼,SP 使用該碼交換存取令牌。
  3. 會話管理:令牌被儲存在使用者會話中,用於後續的身份驗證。

JWT 驗證實作

在 SSO 系統中,令牌驗證是至關重要的環節。以下範例示範如何使用 PyJWT 進行 JWT 驗證:

import jwt
import requests

def fetch_public_keys(jwks_url: str) -> dict:
    """從 JWKS 端點取得公鑰"""
    response = requests.get(jwks_url)
    response.raise_for_status()
    return response.json()

def verify_id_token(id_token: str, jwks_url: str, audience: str, issuer: str):
    """驗證 ID 令牌"""
    jwks = fetch_public_keys(jwks_url)
    public_key = jwks['keys'][0]
    try:
        decoded = jwt.decode(
            id_token,
            key=jwt.algorithms.RSAAlgorithm.from_jwk(public_key),
            algorithms=['RS256'],
            audience=audience,
            issuer=issuer
        )
        return decoded
    except jwt.ExpiredSignatureError:
        raise Exception("ID token expired")
    except jwt.InvalidTokenError as e:
        raise Exception("Invalid ID token: " + str(e))

#### 圖表翻譯:
此圖示展示了 JWT 驗證流程包括公鑰取得令牌解碼和驗證檢查

內容解密:

  1. 公鑰取得:從 JWKS 端點取得用於簽名驗證的公鑰。
  2. 令牌解碼:使用取得的公鑰對 JWT 進行解碼和驗證。
  3. 錯誤處理:處理可能的過期或無效令牌錯誤。

單一登入(SSO)在Python中的進階實作

使用Redis進行Session管理

在現代的單一登入(SSO)系統中,高效的session管理是確保使用者經驗流暢的關鍵。以下是一個使用Redis進行session管理的Python範例:

import redis
import json
import datetime

# 初始化Redis客戶端
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def store_user_session(session_id: str, session_data: dict, expiration_seconds: int):
    """
    將使用者session儲存到Redis中。
    
    :param session_id: 唯一的session ID
    :param session_data: session資料
    :param expiration_seconds: session過期時間(秒)
    """
    session_data['expires_at'] = (datetime.datetime.utcnow() + datetime.timedelta(seconds=expiration_seconds)).isoformat()
    redis_client.setex(f"session:{session_id}", expiration_seconds, json.dumps(session_data))

def retrieve_user_session(session_id: str) -> dict:
    """
    從Redis中檢索使用者session。
    
    :param session_id: 唯一的session ID
    :return: session資料,若不存在則傳回空字典
    """
    session_json = redis_client.get(f"session:{session_id}")
    if session_json:
        return json.loads(session_json)
    return {}

def delete_user_session(session_id: str):
    """
    從Redis中刪除使用者session。
    
    :param session_id: 唯一的session ID
    """
    redis_client.delete(f"session:{session_id}")

# 範例用法
session_id = "unique-user-session-id"
store_user_session(session_id, {"user_id": "user123", "roles": ["admin"]}, 3600)
session_data = retrieve_user_session(session_id)
print(session_data)
delete_user_session(session_id)

內容解密:

  1. store_user_session函式將session資料儲存到Redis中,並設定過期時間。過期時間的設定確保了session不會無限期存在於系統中。
  2. retrieve_user_session函式從Redis中檢索指定的session資料。若session存在,則傳回解析後的JSON資料;否則傳回空字典。
  3. delete_user_session函式用於刪除指定的session,確保在使用者登出或session過期時能夠正確清理資源。

SAML協定在SSO中的應用

SAML(Security Assertion Markup Language)是一種常見的SSO協定,用於在不同系統間交換認證和授權資料。以下是一個使用python3-saml函式庫實作SAML SSO的範例:

from onelogin.saml2.auth import OneLogin_Saml2_Auth
from flask import request, redirect, session

def init_saml_auth(req):
    # 從設定檔或環境變數載入SAML設定
    auth = OneLogin_Saml2_Auth(req, custom_base_path='/path/to/saml/settings')
    return auth

@app.route('/sso/saml', methods=['GET', 'POST'])
def saml_sso():
    req = prepare_flask_request(request)
    auth = init_saml_auth(req)
    
    if 'SAMLResponse' in request.form:
        # 處理SAML回應
        auth.process_response()
        errors = auth.get_errors()
        if not errors:
            # 將使用者屬性儲存到session中
            session['user'] = auth.get_attributes()
            return redirect('/dashboard')
        else:
            return "SAML回應錯誤: " + ','.join(errors)
    else:
        # 重定向到SAML IdP進行認證
        return redirect(auth.login())

內容解密:

  1. init_saml_auth函式初始化SAML Auth物件,載入SAML相關設定。
  2. /sso/saml路由中,若收到SAML回應,則處理該回應並驗證。若無錯誤,則將使用者屬性儲存到session並重定向到/dashboard
  3. 若未收到SAML回應,則重定向到SAML IdP進行使用者認證。

使用FastAPI進行Token驗證

在微服務架構中,使用API Gateway進行集中式的token驗證是常見的做法。以下是一個使用FastAPI進行token驗證的範例:

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

app = FastAPI()
security = HTTPBearer()

SECRET_KEY = "your-secret-key"

def token_validator(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        decoded = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return decoded
    except jwt.PyJWTError:
        raise HTTPException(status_code=403, detail="無效的認證token")

@app.get("/secure-endpoint")
async def secure_endpoint(user=Depends(token_validator)):
    return {"message": f"Access granted to {user['sub']}"}

內容解密:

  1. token_validator函式用於驗證傳入的Bearer Token。若token有效,則傳回解碼後的token內容;否則丟擲HTTPException。
  2. /secure-endpoint路由中,使用Depends機制依賴token_validator進行token驗證。只有驗證透過的使用者才能存取該端點。

資料保護與加密技術在Python中的應用

本章節探討透過密碼學方法進行資料保護,利用Python函式庫如PyCryptodome和cryptography。涵蓋對稱與非對稱加密技術、安全金鑰管理以及用於資料完整性的雜湊運算。強調資料安全最佳實踐,確保機密性,並保護傳輸中和靜態資料,從而加強Python應用程式抵禦未經授權的存取和資料洩漏。

瞭解資料保護原則

現代系統中的資料保護是一門多導向的學科,遠遠超出了簡單的加密程式。隱私、機密性、完整性和可用性(通常簡稱為CIA三元組,加上隱私作為一個整體概念)是構建安全軟體系統的基礎。每個原則不僅需要對密碼學技術有具體的瞭解,還需要對架構策略和軟體工程最佳實踐有深入的理解。在本文中,我們將分析這些方面,並提供對於進階從業者不可或缺的精確技術細節。

隱私原則的核心是控制和限制對敏感資料的存取。這涉及確保只有授權實體能夠檢視、操作或傳輸資料。在實際環境中,嚴格的隱私控制是透過根據角色的存取控制(RBAC)和資料分類別模式來實施的。進階實作通常整合了根據策略的管理框架,其中策略決策是從應用程式邏輯中外部化的。一個有效的機制是整合根據屬性的存取控制(ABAC)系統。透過ABAC,上下文屬性(如使用者身份、位置、時間和特定於會話的令牌)會根據定義的資料處理策略進行評估。在Python中,這類別系統的實作可以涉及將密碼學認證與細粒度的授權檢查相結合。以下程式碼片段展示了一個使用密碼學簽名驗證令牌完整性後再進行策略評估的ABAC檢查功能的例子:

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.exceptions import InvalidSignature

def verify_token_signature(token: bytes, signature: bytes, public_key: rsa.RSAPublicKey) -> bool:
    try:
        public_key.verify(
            signature,
            token,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return True
    except InvalidSignature:
        return False

內容解密:

此段程式碼定義了一個函式verify_token_signature,用於驗證令牌的密碼學簽名。函式接收三個引數:token(待驗證的令牌)、signature(簽名)和public_key(公鑰)。它使用cryptography函式庫中的verify方法來檢查簽名是否有效。如果簽名有效,函式傳回True;否則,捕捉InvalidSignature異常並傳回False。此過程確保了令牌在傳輸或儲存過程中的完整性和真實性。

嚴格執行的隱私政策不僅控制資料存取,還要求具備日誌記錄和稽核措施,使用密碼學上安全的軌跡日誌來確保防篡改。這些日誌的底層完整性是透過安全雜湊函式或默克爾樹結構來維持的。

機密性主要是透過對靜態資料和傳輸中資料進行密碼學手段來實作的。加密對於保護資料免受未經授權的暴露是不可或缺的。選擇強大的密碼學原語至關重要;例如,像Galois/計數器模式(GCM)中的高階加密標準(AES)這樣的對稱加密演算法同時提供了機密性和完整性屬性。保持機密性要求金鑰管理過程是安全的,並且金鑰根據已建立的生命週期管理協定頻繁輪換。進階的Python實作使用安全的金鑰派生函式(KDF),如PBKDF2、scrypt或Argon2,從密碼生成金鑰,同時減輕暴力破解風險。以下程式碼片段展示了使用cryptography函式庫中的PBKDF2從密碼短語派生金鑰的過程:

from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import os

def derive_key(password: bytes, salt: bytes = os.urandom(16)) -> bytes:
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend()
    )
    key = kdf.derive(password)
    return key

內容解密:

此函式derive_key使用PBKDF2演算法從給定的密碼和鹽值派生出一個金鑰。它接收兩個引數:password(密碼)和salt(鹽值,預設為隨機生成的16位元組)。PBKDF2是一種廣泛使用的金鑰派生函式,能夠減緩暴力破解攻擊,因為它需要大量的計算資源。此實作使用了SHA-256作為雜湊演算法,金鑰長度為32位元組,迭代次數為100,000次,以提高安全性。

在成熟的系統中,加密過程應該與防篡改的日誌記錄機制相結合。為了確保機密性和相關的資料完整性,建議將加密與訊息認證碼(MAC),如HMACs,相結合。作為一種進階技術,像AES-GCM或ChaCha20-Poly1305這樣的認證加密模式是較好的選擇,因為它們消除了單獨計算MAC的需求,從而降低了實作複雜度和密碼學誤用的可能性。

完整性涉及確保資料在儲存或傳輸過程中保持不變的機制。像SHA-256或SHA-3這樣的密碼學雜湊函式為給定的輸入資料提供了確定性的輸出,並且對於檢測任何修改或未經授權的更改至關重要。將資料完整性驗證整合到更大的系統中可能涉及數位簽章或用於分散式系統的默克爾樹,其中葉節點代表個別資料元素,而根雜湊提供整個資料集的整體完整性證明。

資料保護原則與技術關係圖示

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 資料保護原則與技術關係圖示

rectangle "存取控制" as node1
rectangle "屬性評估" as node2
rectangle "加密" as node3
rectangle "金鑰管理" as node4
rectangle "雜湊函式" as node5
rectangle "數位簽章" as node6

node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5
node5 --> node6

@enduml

圖表翻譯: 此圖示闡述了資料保護的三個主要原則(隱私、機密性和完整性)及其相關技術。隱私透過RBAC和ABAC來控制存取;機密性透過AES-GCM加密和PBKDF2金鑰派生來保護;完整性則透過SHA-256雜湊函式和ECDSA數位簽章來確保。每個技術都在其對應的原則下發揮著關鍵作用,共同構成了全面的資料保護體系。

綜上所述,Python中的資料保護與加密技術是一個多導向且複雜的話題,需要對密碼學原理、軟體工程最佳實踐以及架構策略有深入的理解。透過結合對稱和非對稱加密、安全的金鑰管理、以及用於完整性驗證的雜湊函式,可以有效地保護資料免受未經授權的存取和篡改,從而增強Python應用程式的安全性和可靠性。