返回文章列表

Python 非同步程式設計

本文探討如何將舊有 Python 同步程式碼轉換為非同步操作,提升系統效能和可擴充套件性。文章涵蓋建立非同步介面卡、依賴注入、版本控制、日誌記錄等關鍵技術,並提供程式碼範例和最佳實踐,幫助開發者逐步實作系統現代化。

Web 開發 系統設計

在 Python 開發中,將同步程式碼轉換為非同步操作是提升系統效能的常見手段。本文介紹如何使用非同步介面卡封裝舊有同步函式,並結合依賴注入、版本控制和日誌記錄等技術,逐步實作系統現代化,同時兼顧程式碼的可維護性和穩定性。文章提供多個程式碼範例,演示如何使用 asynciocProfile 等工具進行效能監控和最佳化,並探討了事件迴圈延遲、Profiling 工具、系統層級監控等進階議題,幫助開發者更好地掌握非同步程式設計的技巧。

將同步舊系統元件改寫為非同步操作的技術

在維護和升級舊有系統的過程中,將同步(synchronous)操作改寫為非同步(asynchronous)操作是一項重要的技術挑戰。這種轉變不僅能提升系統的效能和可擴充套件性,還能改善資源利用率,特別是在處理大量平行操作時。本文將探討如何透過建立非同步介面卡(asynchronous adapters)來封裝舊有的同步函式,以及在這個過程中需要注意的關鍵技術和最佳實踐。

1. 建立非同步介面卡

要將舊有的同步函式整合到非同步環境中,建立一個非同步介面卡是一種有效的策略。介面卡模式允許開發者將現有的同步程式碼包裝在一個非同步介面後面,從而使其能夠在非同步框架中無縫執行。

程式碼範例:建立非同步介面卡

import asyncio
import functools

def legacy_function(param):
    # 模擬阻塞式操作
    import time
    time.sleep(1)
    return f"已處理 {param} 同步操作"

async def legacy_adapter(param):
    loop = asyncio.get_running_loop()
    # 將同步來電轉接到執行器(executor)
    result = await loop.run_in_executor(None, functools.partial(legacy_function, param))
    return result

async def modern_operation():
    # 將非同步介面卡整合到非阻塞流程中
    result = await legacy_adapter("輸入資料")
    return result

if __name__ == '__main__':
    print(asyncio.run(modern_operation()))

內容解密:

  1. legacy_function:這是一個模擬舊有同步操作的函式,它使用 time.sleep(1) 來模擬一個耗時的操作。
  2. legacy_adapter:這個非同步函式作為介面卡,將 legacy_function 封裝在一個非同步操作中。它使用 asyncio.get_running_loop().run_in_executor 將同步函式的執行轉移到執行器中,避免阻塞事件迴圈。
  3. modern_operation:展示瞭如何將 legacy_adapter 整合到一個非阻塞的非同步流程中。

2. 依賴注入與模組化解耦

依賴注入(Dependency Injection, DI)是一種設計模式,用於減少元件之間的耦合度。在舊系統現代化的過程中,DI 可以幫助開發者輕鬆地在同步和非同步實作之間切換,而無需修改高層次的業務邏輯。

程式碼範例:依賴注入實作

import asyncio
from concurrent.futures import ThreadPoolExecutor

class LegacyRepository:
    def fetch_record(self, query):
        import time
        time.sleep(0.5)  # 模擬阻塞式 I/O 操作
        return {"result": "舊資料", "query": query}

class AsyncRepositoryAdapter:
    def __init__(self, legacy_repo, executor=None):
        self.legacy_repo = legacy_repo
        self.executor = executor or ThreadPoolExecutor(max_workers=4)

    async def fetch_record(self, query):
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(self.executor, lambda: self.legacy_repo.fetch_record(query))
        return result

class BusinessService:
    def __init__(self, repository):
        self.repository = repository

    async def process_query(self, query):
        record = await self.repository.fetch_record(query)
        # 在非同步上下文中進行進一步處理
        await asyncio.sleep(0.1)
        return record

if __name__ == '__main__':
    legacy_repo = LegacyRepository()
    async_repo = AsyncRepositoryAdapter(legacy_repo)
    service = BusinessService(async_repo)
    result = asyncio.run(service.process_query("SELECT * FROM table"))
    print(result)

內容解密:

  1. LegacyRepository:代表舊有的資料存取層,使用同步操作。
  2. AsyncRepositoryAdapter:作為介面卡,將 LegacyRepository 的同步操作封裝為非同步介面。
  3. BusinessService:展示瞭如何透過依賴注入,使用 AsyncRepositoryAdapter 來處理查詢。

3. 版本控制與漸進式現代化

在管理舊系統的現代化過程中,制定明確的版本控制策略和漸進式現代化路線圖至關重要。這有助於確保系統的穩定性,同時逐步引入新的非同步元件。

4. 維護透明度與日誌記錄

在橋接舊元件與非同步框架時,全面的日誌記錄和可觀察性變得不可或缺。開發者應當在非同步介面卡中加入日誌記錄,以捕捉關鍵指標,如任務持續時間、狀態轉換和異常事件。

程式碼範例:加入日誌記錄

import asyncio
import logging

# 設定日誌記錄
logging.basicConfig(level=logging.INFO)

async def legacy_adapter_with_logging(param):
    try:
        # 省略實作細節...
        result = await legacy_adapter(param)
        logging.info(f"成功處理 {param}")
        return result
    except Exception as e:
        logging.error(f"處理 {param} 時發生錯誤: {e}")
        raise

# 在現代化操作中使用帶有日誌記錄的介面卡
async def modern_operation_with_logging():
    result = await legacy_adapter_with_logging("輸入資料")
    return result

if __name__ == '__main__':
    print(asyncio.run(modern_operation_with_logging()))

內容解密:

  1. legacy_adapter_with_logging:在原有的 legacy_adapter 上加入了日誌記錄功能,用於捕捉成功或失敗的日誌。
  2. modern_operation_with_logging:展示瞭如何在現代化操作中使用帶有日誌記錄的介面卡。

效能監控與非同步整合的最佳實踐

在將非同步方法整合到舊有或混合程式碼函式庫後,詳細的效能監控變得至關重要,以識別瓶頸、確保回應性,並驗證新的平行模型是否帶來預期的效率提升。進階的開發者必須採用多層次的監控策略,涵蓋應用層級的檢測和系統層級的可觀測性。本文闡述了在混契約步-非同步環境中監控效能的技術和最佳實踐,重點關注細粒度指標、診斷日誌記錄和自動化的效能分析工具。

應用層級的監控

在應用層級,開發者應該利用結構化日誌記錄和自定義檢測來追蹤任務執行時間、資源使用情況和錯誤頻率。在非同步上下文中,挑戰包括準確測量協程的延遲、追蹤等待 I/O 所花費的時間,以及跨平行任務關聯事件。一種有效的方法是整合細粒度的計時裝飾器,記錄非同步操作的開始和結束時間。以下程式碼片段展示了一個用於非同步函式的進階計時器裝飾器:

import asyncio
import time
import functools
import logging

logging.basicConfig(level=logging.INFO)

def async_timer(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            elapsed = time.perf_counter() - start
            logging.info("函式 %s 執行時間 %.4f 秒", func.__name__, elapsed)
    return wrapper

@async_timer
async def simulated_io_operation(duration):
    await asyncio.sleep(duration)
    return f"在 {duration} 秒內完成"

async def main_monitor():
    tasks = [simulated_io_operation(0.5), simulated_io_operation(1)]
    results = await asyncio.gather(*tasks)
    return results

if __name__ == '__main__':
    asyncio.run(main_monitor())

內容解密:

  1. async_timer 裝飾器:此裝飾器用於測量非同步函式的執行時間。它記錄函式執行的開始和結束時間,並計算差值。
  2. simulated_io_operation 函式:模擬一個 I/O 操作,使用 asyncio.sleep 來模擬延遲。
  3. main_monitor 函式:建立多個任務並使用 asyncio.gather 平行執行它們。
  4. 日誌記錄:透過 logging.info 將函式執行時間記錄到日誌中,提供效能監控的依據。

效能分析

在效能分析方面,Python 的內建 cProfile 模組可以與自定義鉤子結合使用,以捕捉非同步操作的效能特徵。由於 cProfile 本質上是同步的,因此必須設計包裝器來排程非同步任務,同時收集效能分析資料。以下方法結合使用 asynciocProfile

import asyncio
import cProfile
import pstats
import io

async def workload():
    await asyncio.sleep(0.75)
    return "工作負載已完成"

async def run_profiled_task():
    return await workload()

def profile_async(coro):
    pr = cProfile.Profile()
    pr.enable()
    result = asyncio.run(coro)
    pr.disable()
    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumtime')
    print(s.getvalue())
    return result

if __name__ == '__main__':
    profile_async(run_profiled_task())

內容解密:

  1. workload 函式:模擬一個非同步工作負載,使用 asyncio.sleep 來模擬延遲。
  2. run_profiled_task 函式:呼叫 workload 函式以執行效能分析。
  3. profile_async 函式:使用 cProfile 對非同步協程進行效能分析,記錄執行時間並輸出統計資料。
  4. 效能統計:透過 pstats.Stats 對效能資料進行排序和輸出,提供詳細的效能分析結果。

監控與最佳化非同步程式的效能

在開發非同步應用程式時,監控和最佳化效能是至關重要的。非同步程式的特性使得傳統的效能監控方法可能不再適用,因此需要採用更為進階的技術和工具。

使用 Profiling 工具分析非同步程式

為了了解非同步程式的效能瓶頸,首先需要使用 profiling 工具對程式進行分析。Python 中的 cProfile 模組可以與非同步程式結合使用,以收集函式呼叫頻率和累積時間等資訊。

import asyncio
import cProfile
import io
import pstats

async def run_profiled_task():
    # 模擬一些非同步操作
    await asyncio.sleep(0.1)
    await asyncio.sleep(0.2)

def profile_async(coro):
    pr = cProfile.Profile()
    pr.enable()
    result = asyncio.run(coro)
    pr.disable()
    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats()
    print(s.getvalue())
    return result

if __name__ == '__main__':
    result = profile_async(run_profiled_task())
    print(result)

內容解密:

  1. cProfile.Profile():建立一個 profiling 物件,用於收集程式執行期間的效能資料。
  2. pr.enable()pr.disable():啟動和停止 profiling,以捕捉指定程式碼段的效能資訊。
  3. pstats.Stats():將 profiling 結果排序並輸出,幫助開發者找出效能瓶頸。

監控事件迴圈延遲

事件迴圈的延遲是衡量非同步程式效能的重要指標。透過定期檢查預期執行時間與實際執行時間之間的差異,可以監測事件迴圈的延遲。

import asyncio
import time
import logging

logging.basicConfig(level=logging.INFO)

async def monitor_loop_delay(interval=1.0):
    expected = time.monotonic() + interval
    while True:
        await asyncio.sleep(interval)
        delay = time.monotonic() - expected
        logging.info("事件迴圈延遲:%.4f 秒", delay)
        expected = time.monotonic() + interval

async def main_with_monitoring():
    monitor_task = asyncio.create_task(monitor_loop_delay())
    simulated_tasks = [asyncio.create_task(asyncio.sleep(0.5)) for _ in range(10)]
    await asyncio.gather(*simulated_tasks)
    monitor_task.cancel()
    try:
        await monitor_task
    except asyncio.CancelledError:
        pass

if __name__ == '__main__':
    asyncio.run(main_with_monitoring())

內容解密:

  1. monitor_loop_delay():此函式用於監控事件迴圈的延遲,透過計算預期執行時間與實際執行時間的差異來得出延遲值。
  2. asyncio.create_task():將 monitor_loop_delay() 轉為任務並在背景執行,以持續監控事件迴圈。
  3. logging.info():記錄事件迴圈的延遲,有助於開發者瞭解系統的即時效能狀態。

系統層級的效能監控

除了應用程式內部的監控外,系統層級的效能監控同樣重要。使用 htopiotopperf 等工具,可以深入瞭解非同步操作對硬體資源的影響。

分散式追蹤

分散式追蹤框架(如 Jaeger 或 Zipkin)與 OpenTelemetry 的結合,可以實作跨多個服務的非同步流程追蹤。透過在非同步呼叫中傳遞追蹤識別碼,開發者能夠重建完整的執行路徑。

import asyncio
import contextvars
import logging

trace_id = contextvars.ContextVar("trace_id", default="unknown_trace")

async def traced_operation():
    current_trace = trace_id.get()
    logging.info("開始操作,追蹤 ID:%s", current_trace)
    await asyncio.sleep(0.2)
    logging.info("操作 %s 完成", current_trace)
    return f"來自追蹤 {current_trace} 的結果"

async def main_trace():
    trace_id.set("trace-001")
    result = await traced_operation()
    return result

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(main_trace()))

內容解密:

  1. contextvars.ContextVar():建立一個上下文變數,用於在非同步操作中傳遞追蹤 ID。
  2. trace_id.get()trace_id.set():取得和設定當前操作的追蹤 ID,以實作跨多個非同步操作的追蹤。
  3. logging.info():記錄操作的日誌,並包含追蹤 ID,有助於日後分析和除錯。

即時儀錶板與自動化測試

結合 Prometheus 和 Grafana,可以建立即時儀錶板以視覺化自定義指標。此外,將效能基準測試納入持續整合流程,有助於及早發現效能退化問題。