返回文章列表

Python OAuth OpenID Connect 安全應用

本文探討在 Python 應用程式中整合 OAuth 2.0 和 OpenID Connect 的實務做法,涵蓋授權流程、Token 管理、ID Token 驗證、Token 重新整理機制、錯誤處理與日誌記錄,以及安全性最佳實踐。文章提供程式碼範例,演示如何使用 Authlib 和 requests-oauthlib

Web 開發 資安

在現今的網路應用程式開發中,OAuth 2.0 和 OpenID Connect 已成為不可或缺的授權和驗證機制。OAuth 2.0 負責授權客戶端應用程式存取使用者資源,而 OpenID Connect 則建立在 OAuth 2.0 之上,提供使用者身份驗證的功能。透過妥善運用 Python 的相關函式庫,開發者可以有效地整合這些協定,確保應用程式的安全性。文章將進一步探討如何安全地生成、儲存和驗證 Token,以及如何使用 JWT 和 bcrypt 等技術強化 API 的安全性,並提供實用的程式碼範例,幫助開發者理解並應用這些安全機制。更重要的是,文章強調了安全最佳實踐,例如保護客戶端金鑰、使用 PKCE 和實作 Token 快取,以降低潛在的風險。

OAuth 與 OpenID Connect 在 Python 中的應用

在 Python 應用程式中整合 OAuth 和 OpenID Connect 需要深入瞭解分散式安全協定和根據令牌的驗證機制。OAuth 2.0 是一個授權框架,允許客戶端透過取得存取令牌來存取受保護的資源,而 OpenID Connect 則擴充套件了 OAuth 2.0,透過在 JSON Web Tokens(JWT)中嵌入標準化的宣告來提供身份驗證。

OAuth 2.0 和 OpenID Connect 的工作原理

OAuth 2.0 和 OpenID Connect 的運作依賴於客戶端應用程式、授權伺服器和資源伺服器之間的互動。客戶端發起授權請求,通常將使用者重定向到授權伺服器進行身份驗證。驗證成功後,授權伺服器發出授權碼,客戶端然後將其兌換為存取令牌和(如果適用)ID 令牌。這種程式碼流程對於伺服器端應用程式來說尤其有利,因為它具有令牌壽命短和能夠在伺服器上安全儲存秘密等安全特性。

Python 中的 OAuth 2.0 和 OpenID Connect 實作

在 Python 中,像 Authlib 和 requests-oauthlib 這樣的函式庫為 OAuth 2.0 和 OpenID Connect 流程提供了全面的支援。這些函式庫抽象出了許多底層的 HTTP 重定向和令牌交換的複雜性。然而,進階開發者必須仔細檢查組態細節,確保像 client_idclient_secretredirect_uriscope 這樣的引數以防止像開放重定向或狀態引數篡改等漏洞的方式被解析。

範例程式碼

from authlib.integrations.requests_client import OAuth2Session
import jwt
import datetime
import requests

# 組態 OAuth2 引數以用於 OpenID Connect 提供者
CLIENT_ID = 'your_client_id'
CLIENT_SECRET = 'your_client_secret'
REDIRECT_URI = 'https://yourapp.example.com/callback'
AUTHORIZATION_ENDPOINT = 'https://provider.example.com/oauth/authorize'
TOKEN_ENDPOINT = 'https://provider.example.com/oauth/token'
USERINFO_ENDPOINT = 'https://provider.example.com/oauth/userinfo'
SCOPE = 'openid profile email'

# 初始化 OAuth2 會話
def create_oauth_session(state: str = None) -> OAuth2Session:
    return OAuth2Session(
        CLIENT_ID,
        CLIENT_SECRET,
        scope=SCOPE,
        redirect_uri=REDIRECT_URI,
        state=state
    )

# 將使用者重定向到提供者的授權 URL
def get_authorization_url(oauth: OAuth2Session) -> (str, str):
    uri, state = oauth.create_authorization_url(AUTHORIZATION_ENDPOINT)
    return uri, state

# 將授權碼兌換為令牌
def fetch_tokens(oauth: OAuth2Session, authorization_response: str) -> dict:
    token = oauth.fetch_token(
        TOKEN_ENDPOINT,
        authorization_response=authorization_response,
        client_secret=CLIENT_SECRET,
    )
    return token

# 使用存取令牌檢索使用者資訊
def fetch_userinfo(oauth: OAuth2Session) -> dict:
    return oauth.get(USERINFO_ENDPOINT).json()

# 使用 JWT 驗證 ID 令牌的範例實作
def validate_id_token(id_token: str, issuer: str, audience: str, public_key: str):
    try:
        claims = jwt.decode(
            id_token,
            public_key,
            algorithms=['RS256'],
            issuer=issuer,
            audience=audience,
        )
        # 可以在此處插入額外的宣告檢查
        return claims
    except jwt.ExpiredSignatureError:
        raise Exception("ID 令牌已過期")
    except jwt.InvalidTokenError as e:
        raise Exception("無效的 ID 令牌: " + str(e))

# 範例用法:
if __name__ == '__main__':
    oauth = create_oauth_session()
    authorization_url, state = get_authorization_url(oauth)
    print(f"請前往 {authorization_url} 進行授權")

程式碼解密:

  1. 匯入必要的模組:匯入 OAuth2Sessionjwtdatetimerequests 以支援 OAuth 2.0 和 OpenID Connect 的功能。
  2. 組態 OAuth2 引數:設定客戶端 ID、客戶端秘密、重定向 URI、授權端點、令牌端點、使用者資訊端點和範圍,以初始化 OAuth2 會話。
  3. create_oauth_session 函式:根據提供的狀態建立一個 OAuth2 會話,用於管理和重新整理令牌。
  4. get_authorization_url 函式:生成授權 URL 和狀態,將使用者重定向到授權伺服器進行身份驗證。
  5. fetch_tokens 函式:使用授權碼取得存取令牌和 ID 令牌。
  6. fetch_userinfo 函式:使用存取令牌從使用者資訊端點取得使用者資訊。
  7. validate_id_token 函式:使用公鑰和指定的演算法驗證 ID 令牌的有效性,包括檢查簽名過期和無效令牌錯誤。

多因素身份驗證(MFA)的實作

多因素身份驗證(MFA)是增強應用程式安全性的重要手段。透過結合密碼和一次性密碼(TOTP),可以顯著提高帳戶安全性。

範例程式碼

async def async_verify_totp(token: str, secret: str) -> bool:
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, verify_totp, token, secret)

async def mfa_flow(user_id: str, totp_secret: str):
    # 模擬非同步等待使用者輸入或外部挑戰回應
    token = await asyncio.to_thread(input, "輸入 TOTP 令牌: ")
    valid = await async_verify_totp(token, totp_secret)
    if valid:
        print("使用者 MFA 成功:", user_id)
        log_mfa_event(user_id, "totp_verification", {"status": "success"})
    else:
        print("使用者 MFA 失敗:", user_id)
        log_mfa_event(user_id, "totp_verification", {"status": "failure"})

if __name__ == '__main__':
    totp_secret = generate_totp_secret()
    # 為了演示,註冊 URI 將提供給使用者
    print("使用以下 URI 註冊您的裝置:", totp_provisioning_uri("[email protected]", totp_secret))
    asyncio.run(mfa_flow("user42", totp_secret))

程式碼解密:

  1. async_verify_totp 函式:非同步驗證 TOTP 令牌,使用 asyncio.get_event_loop().run_in_executor 在執行緒池中執行同步的 verify_totp 函式。
  2. mfa_flow 函式:執行 MFA 流程,包括等待使用者輸入 TOTP 令牌、驗證令牌並記錄 MFA 事件。
  3. 主程式:生成 TOTP 秘密,顯示註冊 URI 給使用者,並執行 MFA 流程。

OAuth 與 OpenID Connect 的進階實作與安全性考量

在現代的應用程式開發中,OAuth 和 OpenID Connect 已成為驗證和授權的標準協定。這些協定提供了一種安全的方式來管理使用者身份和資源存取許可權。本文將探討 OAuth 和 OpenID Connect 的進階實作,以及相關的安全性考量。

OAuth 流程與 Token 管理

OAuth 流程涉及多個步驟,包括客戶端註冊、授權請求、授權碼交換和 Token 取得。以下是一個簡單的範例,展示瞭如何使用 OAuth2Session 管理 OAuth 流程:

oauth = create_oauth_session()
auth_url, state = get_authorization_url(oauth)
print("Visit this URL to authorize:", auth_url)

# 模擬回撥處理
authorization_response = input("Enter the full callback URL: ")
tokens = fetch_tokens(oauth, authorization_response)
print("Access Token:", tokens.get('access_token'))
print("ID Token:", tokens.get('id_token'))

userinfo = fetch_userinfo(oauth)
print("User Info:", userinfo)

內容解密:

  • create_oauth_session():建立一個 OAuth2Session 例項,用於管理 OAuth 流程。
  • get_authorization_url(oauth):取得授權 URL 和狀態值。
  • fetch_tokens(oauth, authorization_response):使用授權回撥 URL 取得 Token。
  • fetch_userinfo(oauth):使用取得的 Token 取得使用者資訊。

ID Token 驗證

在 OpenID Connect 中,ID Token 是一個 JWT(JSON Web Token),包含使用者的身份資訊。驗證 ID Token 的完整性和真實性是非常重要的。以下是一個範例,展示瞭如何驗證 ID Token:

def fetch_jwks(jwks_uri: str) -> dict:
    response = requests.get(jwks_uri)
    response.raise_for_status()
    return response.json()

JWKS_URI = 'https://provider.example.com/.well-known/jwks.json'
jwks = fetch_jwks(JWKS_URI)
# 使用 jwks 提取相關的公鑰進行簽名驗證

內容解密:

  • fetch_jwks(jwks_uri):從指定的 JWKS URI 取得 JSON Web Key Set。
  • 使用取得的 JWKS 提取相關的公鑰,用於驗證 ID Token 的簽名。

Token 重新整理機制

為了保持無縫的使用者經驗,Token 重新整理機制是必要的。以下是一個範例,展示瞭如何重新整理 Access Token:

def refresh_access_token(oauth: OAuth2Session, refresh_token: str) -> dict:
    new_token = oauth.refresh_token(
        TOKEN_ENDPOINT,
        refresh_token=refresh_token,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
    )
    return new_token

# 示例用法
if 'refresh_token' in tokens:
    tokens = refresh_access_token(oauth, tokens['refresh_token'])
    print("New Access Token:", tokens.get('access_token'))

內容解密:

  • refresh_access_token(oauth, refresh_token):使用重新整理 Token 取得新的 Access Token。
  • oauth.refresh_token():呼叫 refresh_token 方法重新整理 Token。

錯誤處理與日誌記錄

在 OAuth 和 OpenID Connect 流程中,錯誤處理和日誌記錄是非常重要的。以下是一個範例,展示瞭如何使用 try-except 子句捕捉錯誤:

try:
    # OAuth 和 OpenID Connect 流程
except requests.exceptions.HTTPError as http_err:
    print(f'HTTP error occurred: {http_err}')
except Exception as err:
    print(f'Other error occurred: {err}')

內容解密:

  • 使用 try-except 子句捕捉 HTTP 錯誤和其他異常。
  • 日誌記錄錯誤和相關的上下文資訊,以便快速故障排除和稽核。

安全最佳實踐

  1. 保護客戶端金鑰:將客戶端金鑰儲存在安全的位置,如安全保管函式庫或環境變數中。
  2. 使用 PKCE:在公共客戶端中使用 PKCE(Proof Key for Code Exchange)擴充套件,以防止授權碼攔截攻擊。
  3. 實作 Token 快取:使用快取機制(如 Redis)儲存 Token,以避免重複的 Token 交換。
import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def cache_token(user_id: str, token_data: dict) -> None:
    expires_in = token_data.get('expires_in', 3600)
    redis_client.setex(f"token:{user_id}", expires_in, json.dumps(token_data))

def get_cached_token(user_id: str) -> dict:
    token_json = redis_client.get(f"token:{user_id}")
    if token_json:
        return json.loads(token_json)
    return {}

# 示例用法
cache_token("user42", tokens)
cached_tokens = get_cached_token("user42")
print("Cached tokens:", cached_tokens)

內容解密:

  • cache_token(user_id, token_data):將 Token 儲存在 Redis 中,並設定過期時間。
  • get_cached_token(user_id):從 Redis 中取得快取的 Token。

2.5 保護 API 存取的 Token 安全

安全地生成、儲存和驗證 Token 是保護 API 存取和使用者會話管理的關鍵要素。一個健全的根據 Token 的驗證機制,不僅能夠將使用者身份與個別 API 呼叫分離,還能透過嚴格的過期策略、密碼學簽名和安全儲存實踐來最小化潛在的攻擊向量。進階的 Python 實作需要綜合安全隨機 Token 生成、防篡改 Token 編碼和動態復原能力等策略。

Token 生成與安全儲存

Token 的生成應使用密碼學上安全的偽隨機數生成器。Python 的 secrets 模組提供了一套函式來生成不可預測的 Token。開發者通常會使用 Token 作為不透明的會話識別符號或自包含的斷言(如 JSON Web Tokens)。

使用 secrets 生成不透明 Token

import secrets

def generate_opaque_token(length: int = 32) -> str:
    # 生成一個 URL 安全、Base64 編碼的隨機 Token
    return secrets.token_urlsafe(length)

# 使用範例:
opaque_token = generate_opaque_token()

內容解密:

  • secrets.token_urlsafe(length) 用於生成一個 URL 安全的隨機 Token,適合用於會話識別或其他安全相關的用途。
  • length 引數控制生成的 Token 長度,預設為 32 位元組。

使用 JSON Web Tokens (JWT)

當 Token 包含使用者宣告或後設資料時,JSON Web Tokens (JWT) 提供了一個安全且可擴充套件的解決方案。在這種情況下,Token 使用非對稱或對稱金鑰進行數位簽名。PyJWT 函式庫在生成和驗證 JWT 時非常有用。以下範例展示瞭如何使用對稱金鑰生成 JWT,包括自定義宣告和過期強制執行:

import jwt
import datetime

SECRET_KEY = "your-super-secure-secret-key"

def generate_jwt_token(user_id: str, scopes: list, expiration_seconds: int = 3600) -> str:
    payload = {
        "sub": user_id,
        "scopes": scopes,
        "iat": datetime.datetime.utcnow(),
        "exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=expiration_seconds)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def decode_and_verify_jwt(token: str) -> dict:
    try:
        decoded = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return decoded
    except jwt.ExpiredSignatureError:
        raise Exception("Token 已過期")
    except jwt.InvalidTokenError as e:
        raise Exception("無效的 Token: {}".format(e))

# 使用範例:
token = generate_jwt_token("user123", ["read", "write"])
claims = decode_and_verify_jwt(token)

內容解密:

  • jwt.encode(payload, SECRET_KEY, algorithm="HS256") 使用 HS256 演算法對 payload 進行簽名,生成 JWT。
  • jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) 用於驗證和解碼 JWT,若 Token 過期或無效,會丟擲相應的異常。

安全儲存 Token

對於不透明 Token,通常需要在伺服器端持久化層(如關聯式資料函式庫或 Redis)中儲存其與會話後設資料的關聯。儲存 Token 時,絕不能以明文形式儲存,而應使用密碼學雜湊函式結合鹽值進行雜湊後再儲存。

以下範例展示瞭如何使用 Redis 安全地儲存不透明 Token,並使用 bcrypt 對 Token 進行雜湊:

import bcrypt
import redis
import json
import datetime

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def hash_token(token: str) -> bytes:
    salt = bcrypt.gensalt(rounds=12)
    return bcrypt.hashpw(token.encode('utf-8'), salt)

def store_token(user_id: str, token: str, expiration_seconds: int):
    hashed = hash_token(token)
    token_data = {
        "hash": hashed.decode('utf-8'),
        "expires_at": (datetime.datetime.utcnow() + datetime.timedelta(seconds=expiration_seconds)).isoformat()
    }
    redis_client.setex(f"api_token:{user_id}", expiration_seconds, json.dumps(token_data))

def verify_stored_token(user_id: str, token: str) -> bool:
    token_json = redis_client.get(f"api_token:{user_id}")
    if not token_json:
        return False
    token_data = json.loads(token_json)
    stored_hash = token_data["hash"].encode('utf-8')
    return bcrypt.checkpw(token.encode('utf-8'), stored_hash)

# 使用範例:
user_token = generate_opaque_token()
store_token("user123", user_token, 3600)
assert verify_stored_token("user123", user_token)

內容解密:

  • bcrypt.hashpw(token.encode('utf-8'), salt) 對 Token 進行雜湊處理,使用鹽值增加安全性。
  • redis_client.setex(key, expiration_seconds, value) 將雜湊後的 Token 資料儲存到 Redis,並設定過期時間。

Token 驗證與 API 端點保護

在 API 端點中整合 Token 驗證,可以透過裝飾器(decorator)優雅地實作。以下是一個使用 JWT 進行 Token 驗證的範例:

from functools import wraps
from flask import request, jsonify

def token_required(func):
    @wraps(func)
    def decorated(*args, **kwargs):
        token = None
        if 'Authorization' in request.headers:
            auth_header = request.headers['Authorization']
            parts = auth_header.split()
            if parts[0].lower() == 'bearer' and len(parts) > 1:
                token = parts[1]
        if not token:
            return jsonify({'message': 'Token 缺失!'}), 401
        try:
            claims = decode_and_verify_jwt(token)
            request.user_claims = claims
        except Exception as e:
            return jsonify({'message': str(e)}), 401
        return func(*args, **kwargs)
    return decorated

# 受保護的 API 端點範例
@app.route('/api/secure-data')
@token_required
def secure_data():
    return jsonify({
        "data": "使用者 {} 可存取的安全資料".format(request.user_claims['sub'])
    })

內容解密:

  • 裝飾器 token_required 用於檢查請求頭中的 Authorization,提取並驗證 JWT。
  • 若驗證失敗,傳回相應的錯誤訊息和 HTTP 狀態碼。