返回文章列表

分散式系統非同步程式設計應用

本文探討分散式系統中非同步程式設計的應用,涵蓋非同步 HTTP 操作、訊息消費者和斷路器模式等關鍵技術,並提供 Python 程式碼範例,同時也探討非同步應用程式中的安全性考量,例如保護分享狀態、輸入驗證和清理以及拒絕服務攻擊防護。

分散式系統 後端開發

非同步程式設計是建構高效能分散式系統的根本,特別是在微服務架構和事件驅動系統中,它能有效處理大量並發連線和服務間通訊。本文除了介紹非同步程式設計的核心概念和優勢,更著重於實務應用,提供 Python aiohttpasyncio 框架的程式碼範例,示範如何建構非同步 HTTP 伺服器和客戶端、非同步訊息消費者,以及如何應用斷路器模式提升系統容錯能力。此外,文章也探討了非同步應用程式中常見的安全性挑戰,例如分享狀態保護、輸入驗證和DoS攻擊防護,並提供相應的解決方案和程式碼示例,幫助開發者構建更安全可靠的非同步應用程式。

10.5 分散式系統中的非同步程式設計應用

非同步程式設計為分散式系統提供了一個強大的正規化,能夠實作高效的資源利用、降低延遲並提高並發性。在微服務架構、事件驅動的基礎設施和異質網路環境中,這些特性至關重要,因為可擴充套件性和回應速度是關鍵。利用分散式系統中的非同步方法涉及一個全面的方法,涵蓋網路I/O、分散式訊息傳遞和服務間通訊。

核心優勢:避免阻塞I/O操作

非同步程式設計的核心優勢在於透過避免阻塞I/O操作來減少網路延遲。在分散式系統中,遠端程式呼叫(RPC)和HTTP請求非常常見,使用非同步通訊端和I/O多路復用等非阻塞抽象確保單個服務例項能夠管理大量並發連線。這對於整合需要高吞吐量的協定(如WebSockets或HTTP/2)尤其有利。一個有效的技術是實作非同步事件迴圈來協調I/O活動,從而將網路操作與應用邏輯分離。

實際範例:使用aiohttp進行非同步HTTP客戶端和伺服器整合

在分散式環境中,微服務可能需要處理傳入的HTTP請求,同時查詢其他服務。aiohttp框架利用事件迴圈來管理多個HTTP連線。以下程式碼片段展示了服務如何同時提供HTTP請求和執行非同步的服務間通訊:

import aiohttp
import asyncio

async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.text()

async def handle_request(request):
    async with aiohttp.ClientSession() as session:
        data = await fetch_data(session, 'http://example.com/data')
        return aiohttp.web.Response(text=data)

app = aiohttp.web.Application()
app.router.add_get('/', handle_request)

async def main():
    runner = aiohttp.web.AppRunner(app)
    await runner.setup()
    site = aiohttp.web.TCPSite(runner, 'localhost', 8080)
    await site.start()

asyncio.run(main())

內容解密:

  1. fetch_data 函式:此函式使用 aiohttp.ClientSession 非同步地從指定的 URL 取得資料。它展示瞭如何使用非同步 HTTP 客戶端進行服務間通訊。
  2. handle_request 函式:此函式處理傳入的 HTTP 請求。它呼叫 fetch_data 從另一個服務取得資料,並將結果作為 HTTP 回應傳回,體現了非同步處理請求的能力。
  3. main 函式:設定並啟動 aiohttp web 伺服器,展示瞭如何將非同步應用程式佈署為 HTTP 服務。

非同步程式設計在分散式系統中的優勢

  1. 提高並發性:非同步程式設計允許服務處理多個請求而無需等待前一個請求完成,從而提高了並發性。
  2. 降低延遲:透過避免阻塞操作,非同步程式設計減少了整體延遲,提高了系統的回應速度。
  3. 高效的資源利用:非同步操作使得資源(如 CPU 和網路頻寬)能夠更有效地被利用,因為執行緒不需要長時間等待 I/O 操作完成。

分散式系統中的非同步程式設計

分散式系統的複雜性要求高效的非同步程式設計來處理網路延遲、服務間通訊和資料一致性等挑戰。非同步操作使系統能夠在等待遠端回應的同時處理其他任務,從而提高整體吞吐量並降低延遲。

非同步HTTP操作

在分散式系統中,非同步HTTP操作對於服務間通訊至關重要。以下範例展示瞭如何使用aiohttp函式庫建立非同步HTTP伺服器,並從遠端服務擷取資料:

from aiohttp import web, ClientSession
import asyncio

async def fetch_data(session: ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def handle_request(request: web.Request) -> web.Response:
    async with ClientSession() as session:
        data = await fetch_data(session, 'http://remote-service/api/data')
        processed_data = await process_data(data)
        return web.json_response(processed_data)

async def process_data(data: dict) -> dict:
    await asyncio.sleep(0.1)  # 模擬非同步處理
    return {k: v * 2 for k, v in data.items()}

app = web.Application()
app.router.add_get('/', handle_request)

if __name__ == '__main__':
    web.run_app(app, port=8080)

內容解密:

  1. fetch_data函式:使用ClientSession非同步擷取遠端服務的資料。
  2. handle_request函式:處理進入的請求,並呼叫fetch_data擷取資料後進行處理。
  3. process_data函式:模擬非同步資料處理,將輸入資料的每個值乘以2。

非同步訊息消費者

在分散式系統中,訊息代理(如RabbitMQ)用於服務間通訊。以下範例展示瞭如何使用aio_pika函式庫建立非同步訊息消費者:

import asyncio
import aio_pika

async def process_message(message: aio_pika.IncomingMessage) -> None:
    async with message.process():
        payload = message.body.decode()
        print(f"Processing message: {payload}")
        await asyncio.sleep(0.1)  # 模擬處理時間

async def consume(queue_name: str) -> None:
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    queue = await channel.declare_queue(queue_name, durable=True)
    await queue.consume(process_message)
    print("Consuming messages...")
    return connection

async def main():
    connection = await consume("distributed_queue")
    try:
        await asyncio.Future()
    finally:
        await connection.close()

if __name__ == "__main__":
    asyncio.run(main())

內容解密:

  1. process_message函式:處理接收到的訊息,解碼並模擬處理過程。
  2. consume函式:建立與RabbitMQ的連線,並設定訊息佇列的消費者。
  3. main函式:啟動訊息消費者的主函式,保持連線直到手動停止。

非同步斷路器模式

斷路器模式用於防止分散式系統中的級聯失敗。以下範例展示瞭如何實作非同步斷路器:

import asyncio
from aiohttp import ClientSession, ClientError

class AsyncCircuitBreaker:
    def __init__(self, threshold: int, recovery_timeout: float):
        self.failure_count = 0
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.state = "CLOSED"
        self.last_failure_time = None

    async def call(self, coro):
        if self.state == "OPEN":
            if (asyncio.get_event_loop().time() - self.last_failure_time) > self.recovery_timeout:
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit Breaker is OPEN")
        try:
            result = await coro
            self.failure_count = 0
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = asyncio.get_event_loop().time()
            if self.failure_count >= self.threshold:
                self.state = "OPEN"
            raise e

async def fetch_with_cb(url: str, cb: AsyncCircuitBreaker) -> dict:
    async with ClientSession() as session:
        async def fetch():
            async with session.get(url) as response:
                return await response.json()
        return await cb.call(fetch())

async def main():
    cb = AsyncCircuitBreaker(threshold=3, recovery_timeout=5)
    try:
        data = await fetch_with_cb("http://remote-service/api/info", cb)
        print("Fetched data:", data)
    except Exception as ex:
        print("Request failed:", ex)

if __name__ == "__main__":
    asyncio.run(main())

內容解密:

  1. AsyncCircuitBreaker類別:實作斷路器模式,追蹤失敗次數並在必要時開啟或關閉電路。
  2. call方法:包裝非同步操作,根據斷路器的狀態決定是否執行或丟擲異常。
  3. fetch_with_cb函式:使用斷路器包裝HTTP請求,增強系統的容錯能力。

非同步應用程式中的安全性考量

非同步應用程式由於其平行和非阻塞架構,引入了獨特的安全挑戰,需要謹慎的設計和實作。非同步正規化依賴事件迴圈和交錯任務執行,需要重新審視傳統的安全模型。進階開發人員必須考慮非同步漏洞的微妙之處,例如競爭條件、具有安全隱患的死鎖、不正確的狀態傳播,以及可能利用事件迴圈飽和的拒絕服務攻擊。

保護分享狀態

在非同步應用程式中,一個基本問題是保護分享狀態。多個協程平行執行可能會存取分享的敏感資料,而沒有同步執行所提供的典型序列化保證。這增加了競爭條件的風險,可能導致狀態不一致或資訊洩漏。開發人員應使用非同步鎖(如 asyncio.Lock)來強制存取可變分享資源時的互斥。

範例:安全地讀取和更新組態

import asyncio

shared_config = {"api_key": "secret_value"}
config_lock = asyncio.Lock()

async def read_config(key: str) -> str:
    async with config_lock:
        return shared_config.get(key, "")

async def update_config(key: str, value: str) -> None:
    async with config_lock:
        shared_config[key] = value

async def config_worker():
    # 安全地讀取和更新組態
    api_key = await read_config("api_key")
    print("Current API Key:", api_key)
    await update_config("api_key", "new_secret_value")

asyncio.run(config_worker())

內容解密:

  1. asyncio.Lock 用途:使用 asyncio.Lock 來確保在存取分享資源 shared_config 時的互斥,防止多個任務同時修改或讀取組態,導致競爭條件。
  2. read_configupdate_config 函式:這兩個函式透過 async with config_lock 來鎖定分享資源,確保在讀取或更新組態時不會被其他任務幹擾。
  3. config_worker 示例:展示如何安全地讀取和更新 API 金鑰,避免潛在的競爭條件。

輸入驗證和清理

在非同步應用程式中,輸入驗證和清理必須保持嚴格,特別是因為輸入請求通常是平行處理的。非同步 I/O 框架(如 aiohttp)需要中介軟體,可以在不阻塞事件迴圈的情況下執行令牌驗證、輸入檢查和速率限制。

範例:根據令牌的驗證中介軟體

from aiohttp import web
import asyncio
import jwt

SECRET = "very_secret_key"

@web.middleware
async def auth_middleware(request: web.Request, handler):
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not token:
        return web.json_response({"error": "Missing token"}, status=401)
    try:
        # 非同步解碼令牌(模擬密集的加密操作)
        decoded = await asyncio.to_thread(jwt.decode, token, SECRET, algorithms=["HS256"])
        request["user"] = decoded.get("user")
    except jwt.PyJWTError as ex:
        return web.json_response({"error": "Invalid token"}, status=403)
    return await handler(request)

async def protected_handler(request: web.Request) -> web.Response:
    user = request.get("user", "anonymous")
    return web.json_response({"message": f"Hello, {user}"})

app = web.Application(middlewares=[auth_middleware])
app.router.add_get("/protected", protected_handler)

if __name__ == "__main__":
    web.run_app(app, port=8081)

內容解密:

  1. auth_middleware 函式:此中介軟體檢查請求中的授權令牌,並使用 jwt.decode 驗證令牌的有效性。如果令牌無效或缺失,則傳回錯誤回應。
  2. asyncio.to_thread 用途:將 CPU 密集型的令牌解碼操作轉移到執行緒中,避免阻塞事件迴圈,確保系統吞吐量和低延遲。
  3. protected_handler 函式:處理受保護的路由,只有透過驗證的使用者才能存取。

拒絕服務(DoS)攻擊防護

非同步應用程式面臨著拒絕服務(DoS)攻擊的重大威脅。攻擊者可能利用事件迴圈的非阻塞特性,透過傳送大量輕量但資源密集的任務來使系統飽和。這類別攻擊可能導致排程佇列飽和,增加延遲並可能導致服務不可用。

防護措施:

  1. 速率限制:對輸入請求實施速率限制,防止過多的請求使系統飽和。
  2. 資源管理:有效管理任務佇列,避免無限制地接受新任務。
  3. 超時機制:為任務設定合理的超時機制,確保長時間執行的任務不會佔用資源過久。