非同步程式設計是建構高效能分散式系統的根本,特別是在微服務架構和事件驅動系統中,它能有效處理大量並發連線和服務間通訊。本文除了介紹非同步程式設計的核心概念和優勢,更著重於實務應用,提供 Python aiohttp 和 asyncio 框架的程式碼範例,示範如何建構非同步 HTTP 伺服器和客戶端、非同步訊息消費者,以及如何應用斷路器模式提升系統容錯能力。此外,文章也探討了非同步應用程式中常見的安全性挑戰,例如分享狀態保護、輸入驗證和DoS攻擊防護,並提供相應的解決方案和程式碼示例,幫助開發者構建更安全可靠的非同步應用程式。
10.5 分散式系統中的非同步程式設計應用
非同步程式設計為分散式系統提供了一個強大的正規化,能夠實作高效的資源利用、降低延遲並提高並發性。在微服務架構、事件驅動的基礎設施和異質網路環境中,這些特性至關重要,因為可擴充套件性和回應速度是關鍵。利用分散式系統中的非同步方法涉及一個全面的方法,涵蓋網路I/O、分散式訊息傳遞和服務間通訊。
核心優勢:避免阻塞I/O操作
非同步程式設計的核心優勢在於透過避免阻塞I/O操作來減少網路延遲。在分散式系統中,遠端程式呼叫(RPC)和HTTP請求非常常見,使用非同步通訊端和I/O多路復用等非阻塞抽象確保單個服務例項能夠管理大量並發連線。這對於整合需要高吞吐量的協定(如WebSockets或HTTP/2)尤其有利。一個有效的技術是實作非同步事件迴圈來協調I/O活動,從而將網路操作與應用邏輯分離。
實際範例:使用aiohttp進行非同步HTTP客戶端和伺服器整合
在分散式環境中,微服務可能需要處理傳入的HTTP請求,同時查詢其他服務。aiohttp框架利用事件迴圈來管理多個HTTP連線。以下程式碼片段展示了服務如何同時提供HTTP請求和執行非同步的服務間通訊:
import aiohttp
import asyncio
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.text()
async def handle_request(request):
async with aiohttp.ClientSession() as session:
data = await fetch_data(session, 'http://example.com/data')
return aiohttp.web.Response(text=data)
app = aiohttp.web.Application()
app.router.add_get('/', handle_request)
async def main():
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, 'localhost', 8080)
await site.start()
asyncio.run(main())
內容解密:
fetch_data函式:此函式使用aiohttp.ClientSession非同步地從指定的 URL 取得資料。它展示瞭如何使用非同步 HTTP 客戶端進行服務間通訊。handle_request函式:此函式處理傳入的 HTTP 請求。它呼叫fetch_data從另一個服務取得資料,並將結果作為 HTTP 回應傳回,體現了非同步處理請求的能力。main函式:設定並啟動 aiohttp web 伺服器,展示瞭如何將非同步應用程式佈署為 HTTP 服務。
非同步程式設計在分散式系統中的優勢
- 提高並發性:非同步程式設計允許服務處理多個請求而無需等待前一個請求完成,從而提高了並發性。
- 降低延遲:透過避免阻塞操作,非同步程式設計減少了整體延遲,提高了系統的回應速度。
- 高效的資源利用:非同步操作使得資源(如 CPU 和網路頻寬)能夠更有效地被利用,因為執行緒不需要長時間等待 I/O 操作完成。
分散式系統中的非同步程式設計
分散式系統的複雜性要求高效的非同步程式設計來處理網路延遲、服務間通訊和資料一致性等挑戰。非同步操作使系統能夠在等待遠端回應的同時處理其他任務,從而提高整體吞吐量並降低延遲。
非同步HTTP操作
在分散式系統中,非同步HTTP操作對於服務間通訊至關重要。以下範例展示瞭如何使用aiohttp函式庫建立非同步HTTP伺服器,並從遠端服務擷取資料:
from aiohttp import web, ClientSession
import asyncio
async def fetch_data(session: ClientSession, url: str) -> dict:
async with session.get(url) as response:
return await response.json()
async def handle_request(request: web.Request) -> web.Response:
async with ClientSession() as session:
data = await fetch_data(session, 'http://remote-service/api/data')
processed_data = await process_data(data)
return web.json_response(processed_data)
async def process_data(data: dict) -> dict:
await asyncio.sleep(0.1) # 模擬非同步處理
return {k: v * 2 for k, v in data.items()}
app = web.Application()
app.router.add_get('/', handle_request)
if __name__ == '__main__':
web.run_app(app, port=8080)
內容解密:
fetch_data函式:使用ClientSession非同步擷取遠端服務的資料。handle_request函式:處理進入的請求,並呼叫fetch_data擷取資料後進行處理。process_data函式:模擬非同步資料處理,將輸入資料的每個值乘以2。
非同步訊息消費者
在分散式系統中,訊息代理(如RabbitMQ)用於服務間通訊。以下範例展示瞭如何使用aio_pika函式庫建立非同步訊息消費者:
import asyncio
import aio_pika
async def process_message(message: aio_pika.IncomingMessage) -> None:
async with message.process():
payload = message.body.decode()
print(f"Processing message: {payload}")
await asyncio.sleep(0.1) # 模擬處理時間
async def consume(queue_name: str) -> None:
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
channel = await connection.channel()
queue = await channel.declare_queue(queue_name, durable=True)
await queue.consume(process_message)
print("Consuming messages...")
return connection
async def main():
connection = await consume("distributed_queue")
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
內容解密:
process_message函式:處理接收到的訊息,解碼並模擬處理過程。consume函式:建立與RabbitMQ的連線,並設定訊息佇列的消費者。main函式:啟動訊息消費者的主函式,保持連線直到手動停止。
非同步斷路器模式
斷路器模式用於防止分散式系統中的級聯失敗。以下範例展示瞭如何實作非同步斷路器:
import asyncio
from aiohttp import ClientSession, ClientError
class AsyncCircuitBreaker:
def __init__(self, threshold: int, recovery_timeout: float):
self.failure_count = 0
self.threshold = threshold
self.recovery_timeout = recovery_timeout
self.state = "CLOSED"
self.last_failure_time = None
async def call(self, coro):
if self.state == "OPEN":
if (asyncio.get_event_loop().time() - self.last_failure_time) > self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise Exception("Circuit Breaker is OPEN")
try:
result = await coro
self.failure_count = 0
if self.state == "HALF_OPEN":
self.state = "CLOSED"
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = asyncio.get_event_loop().time()
if self.failure_count >= self.threshold:
self.state = "OPEN"
raise e
async def fetch_with_cb(url: str, cb: AsyncCircuitBreaker) -> dict:
async with ClientSession() as session:
async def fetch():
async with session.get(url) as response:
return await response.json()
return await cb.call(fetch())
async def main():
cb = AsyncCircuitBreaker(threshold=3, recovery_timeout=5)
try:
data = await fetch_with_cb("http://remote-service/api/info", cb)
print("Fetched data:", data)
except Exception as ex:
print("Request failed:", ex)
if __name__ == "__main__":
asyncio.run(main())
內容解密:
AsyncCircuitBreaker類別:實作斷路器模式,追蹤失敗次數並在必要時開啟或關閉電路。call方法:包裝非同步操作,根據斷路器的狀態決定是否執行或丟擲異常。fetch_with_cb函式:使用斷路器包裝HTTP請求,增強系統的容錯能力。
非同步應用程式中的安全性考量
非同步應用程式由於其平行和非阻塞架構,引入了獨特的安全挑戰,需要謹慎的設計和實作。非同步正規化依賴事件迴圈和交錯任務執行,需要重新審視傳統的安全模型。進階開發人員必須考慮非同步漏洞的微妙之處,例如競爭條件、具有安全隱患的死鎖、不正確的狀態傳播,以及可能利用事件迴圈飽和的拒絕服務攻擊。
保護分享狀態
在非同步應用程式中,一個基本問題是保護分享狀態。多個協程平行執行可能會存取分享的敏感資料,而沒有同步執行所提供的典型序列化保證。這增加了競爭條件的風險,可能導致狀態不一致或資訊洩漏。開發人員應使用非同步鎖(如 asyncio.Lock)來強制存取可變分享資源時的互斥。
範例:安全地讀取和更新組態
import asyncio
shared_config = {"api_key": "secret_value"}
config_lock = asyncio.Lock()
async def read_config(key: str) -> str:
async with config_lock:
return shared_config.get(key, "")
async def update_config(key: str, value: str) -> None:
async with config_lock:
shared_config[key] = value
async def config_worker():
# 安全地讀取和更新組態
api_key = await read_config("api_key")
print("Current API Key:", api_key)
await update_config("api_key", "new_secret_value")
asyncio.run(config_worker())
內容解密:
asyncio.Lock用途:使用asyncio.Lock來確保在存取分享資源shared_config時的互斥,防止多個任務同時修改或讀取組態,導致競爭條件。read_config和update_config函式:這兩個函式透過async with config_lock來鎖定分享資源,確保在讀取或更新組態時不會被其他任務幹擾。config_worker示例:展示如何安全地讀取和更新 API 金鑰,避免潛在的競爭條件。
輸入驗證和清理
在非同步應用程式中,輸入驗證和清理必須保持嚴格,特別是因為輸入請求通常是平行處理的。非同步 I/O 框架(如 aiohttp)需要中介軟體,可以在不阻塞事件迴圈的情況下執行令牌驗證、輸入檢查和速率限制。
範例:根據令牌的驗證中介軟體
from aiohttp import web
import asyncio
import jwt
SECRET = "very_secret_key"
@web.middleware
async def auth_middleware(request: web.Request, handler):
token = request.headers.get("Authorization", "").replace("Bearer ", "")
if not token:
return web.json_response({"error": "Missing token"}, status=401)
try:
# 非同步解碼令牌(模擬密集的加密操作)
decoded = await asyncio.to_thread(jwt.decode, token, SECRET, algorithms=["HS256"])
request["user"] = decoded.get("user")
except jwt.PyJWTError as ex:
return web.json_response({"error": "Invalid token"}, status=403)
return await handler(request)
async def protected_handler(request: web.Request) -> web.Response:
user = request.get("user", "anonymous")
return web.json_response({"message": f"Hello, {user}"})
app = web.Application(middlewares=[auth_middleware])
app.router.add_get("/protected", protected_handler)
if __name__ == "__main__":
web.run_app(app, port=8081)
內容解密:
auth_middleware函式:此中介軟體檢查請求中的授權令牌,並使用jwt.decode驗證令牌的有效性。如果令牌無效或缺失,則傳回錯誤回應。asyncio.to_thread用途:將 CPU 密集型的令牌解碼操作轉移到執行緒中,避免阻塞事件迴圈,確保系統吞吐量和低延遲。protected_handler函式:處理受保護的路由,只有透過驗證的使用者才能存取。
拒絕服務(DoS)攻擊防護
非同步應用程式面臨著拒絕服務(DoS)攻擊的重大威脅。攻擊者可能利用事件迴圈的非阻塞特性,透過傳送大量輕量但資源密集的任務來使系統飽和。這類別攻擊可能導致排程佇列飽和,增加延遲並可能導致服務不可用。
防護措施:
- 速率限制:對輸入請求實施速率限制,防止過多的請求使系統飽和。
- 資源管理:有效管理任務佇列,避免無限制地接受新任務。
- 超時機制:為任務設定合理的超時機制,確保長時間執行的任務不會佔用資源過久。