返回文章列表

Python 非同步任務排程進階技巧

本文探討 Python `asyncio` 函式庫的進階用法,涵蓋任務排程、並發控制、逾時管理和任務取消等關鍵技術,並提供實務案例,演示如何使用 `asyncio.gather`、`asyncio.wait`

Python 後端開發

在非同步程式設計中,有效地管理大量平行任務對於提升應用程式效能至關重要。Python 的 asyncio 函式庫提供了一套完善的工具,讓開發者能夠精細地控制任務的執行流程。本文將探討如何使用 asyncio 進行進階任務排程,包含使用 asyncio.gather 執行多個非同步操作,並利用 asyncio.wait 實作更精細的任務控制,例如設定逾時或取消任務。此外,文章也將介紹如何設計管道模式,以串聯多個非同步操作,確保任務按順序執行。最後,將會探討如何結契約步和非同步程式碼,以及如何處理分享資源的同步問題,以確保應用程式的穩定性和可靠性。

高階非同步任務排程:深入理解與實踐

在現代軟體開發中,特別是在處理 I/O 繫結操作或需要高效能並發執行的應用程式時,Python 的 asyncio 函式庫提供了強大的非同步任務排程功能。本文將探討 asyncio 的進階用法,包括任務排程、並發控制、逾時管理和任務取消等關鍵技術。

使用 asyncio.gather 進行平行任務執行

asyncio.gather 是一種方便的方法,用於平行執行多個協程。它允許開發者將多個可等待物件聚合成一個,並等待它們全部完成。以下是一個簡單的例子:

import asyncio

async def compute_square(n: int) -> int:
    await asyncio.sleep(0.1)  # 模擬計算延遲
    return n * n

async def compute_sum_of_squares(numbers: list[int]) -> int:
    tasks = [compute_square(n) for n in numbers]
    results = await asyncio.gather(*tasks)
    return sum(results)

async def main() -> None:
    result = await compute_sum_of_squares([1, 2, 3, 4, 5])
    print("Sum of squares:", result)

asyncio.run(main())

內容解密:

  1. compute_square 函式使用 await asyncio.sleep(0.1) 模擬計算延遲。
  2. compute_sum_of_squares 函式建立多個任務,並使用 asyncio.gather 等待所有任務完成。
  3. asyncio.gather 傳回所有任務的結果,之後計算這些結果的總和。

使用 asyncio.wait 進行精細任務控制

當需要更精細地控制任務的執行順序或處理任務逾時和取消時,asyncio.wait 成為一個有力的工具。它傳回兩個集合:已完成的任務和待處理的任務。以下是一個範例:

import asyncio

async def delayed_task(id: int, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Task {id} finished after {delay} seconds"

async def scheduler() -> None:
    tasks = [asyncio.create_task(delayed_task(i, delay)) for i, delay in enumerate([3, 1, 2], start=1)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for completed in done:
        print("Completed:", await completed)
    for pending_task in pending:
        pending_task.cancel()
        print("Cancelled a pending task")

asyncio.run(scheduler())

內容解密:

  1. delayed_task 函式模擬延遲任務。
  2. scheduler 函式建立多個任務,並使用 asyncio.wait 等待第一個完成的任務。
  3. 已完成的任務被處理,而待處理的任務被取消。

使用管道模式進行依序任務執行

在某些場景下,任務需要依序執行,以確保資料流經各個階段的正確性。以下是一個管道模式的例子:

import asyncio

async def fetch_data(source: str) -> str:
    await asyncio.sleep(0.5)  # 模擬網路延遲
    return f"Data from {source}"

async def process_data(raw_data: str) -> str:
    await asyncio.sleep(0.2)  # 模擬處理延遲
    return raw_data.upper()

async def store_data(data: str) -> None:
    await asyncio.sleep(0.1)  # 模擬 I/O 延遲
    print("Stored:", data)

async def pipeline(source: str) -> None:
    raw_data = await fetch_data(source)
    processed = await process_data(raw_data)
    await store_data(processed)

async def main() -> None:
    await asyncio.gather(
        pipeline("Source A"),
        pipeline("Source B"),
        pipeline("Source C")
    )

asyncio.run(main())

內容解密:

  1. 資料從 fetch_data 取得,然後傳遞給 process_data 處理,最後由 store_data 儲存。
  2. 每個階段都使用 await 將控制權交還給事件迴圈,從而實作並發執行。

同步機制與分享資源保護

在多工環境中,保護分享資源免受競爭條件的影響是至關重要的。可以使用非同步鎖來實作這一點:

import asyncio

shared_resource = 0
lock = asyncio.Lock()

async def safe_increment(id: int) -> None:
    global shared_resource
    async with lock:
        local_copy = shared_resource
        await asyncio.sleep(0.1)  # 模擬計算延遲
        shared_resource = local_copy + 1
        print(f"Task {id} incremented to {shared_resource}")

async def run_concurrent_increments() -> None:
    tasks = [asyncio.create_task(safe_increment(i)) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(run_concurrent_increments())

內容解密:

  1. 使用 asyncio.Lock 確保在修改分享資源時的互斥存取。
  2. safe_increment 函式中,使用 async with lock 取得鎖,確保操作的原子性。

超時管理與任務取消

使用 asyncio.wait_for 可以對任務施加逾時限制,避免無限等待。而任務取消則允許應用程式在需要時終止正在執行的任務:

import asyncio

async def long_running_operation() -> str:
    await asyncio.sleep(5)
    return "Operation completed"

async def main() -> None:
    try:
        result = await asyncio.wait_for(long_running_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())

內容解密:

  1. 使用 asyncio.wait_forlong_running_operation 施加逾時限制。
  2. 當操作逾時時,會引發 asyncio.TimeoutError 例外。

自定義排程演算法

在某些情況下,開發者可能需要實作自定義的排程演算法,以滿足特定的效能需求或優先順序要求。這可以透過擴充套件 asyncio.create_task 或使用外部函式庫來實作。

非同步程式設計中的任務排程和平行控制

在非同步環境中,任務排程和平行控制是確保系統高效運作的關鍵。透過使用 await 關鍵字,可以有效地管理任務執行、同步依賴操作、實施超時和取消,以及整合自定義的排程策略。

優先順序任務排程

在某些應用場景中,任務的優先順序至關重要。以下是一個使用優先順序佇列排程任務的範例:

import asyncio
import heapq
from typing import Any

class PrioritizedTask:
    def __init__(self, priority: int, coro: Any):
        self.priority = priority
        self.coro = coro

    def __lt__(self, other: "PrioritizedTask") -> bool:
        return self.priority < other.priority

async def priority_scheduler(tasks: list[PrioritizedTask]) -> None:
    # 建立一個根據堆積的優先順序佇列
    heapq.heapify(tasks)
    while tasks:
        prioritized_task = heapq.heappop(tasks)
        result = await prioritized_task.coro
        print(f"優先順序 {prioritized_task.priority} 的任務結果:{result}")

async def sample_task(value: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return value

async def main() -> None:
    tasks = [
        PrioritizedTask(3, sample_task(100, 0.3)),
        PrioritizedTask(1, sample_task(200, 0.1)),
        PrioritizedTask(2, sample_task(300, 0.2))
    ]
    await priority_scheduler(tasks)

asyncio.run(main())

內容解密:

  1. PrioritizedTask 類別:定義了一個具有優先順序屬性的任務類別,用於在優先順序佇列中排序。
  2. priority_scheduler 函式:實作了一個優先順序排程器,將任務按照優先順序順序執行。
  3. sample_task 函式:模擬了一個非同步任務,延遲一定時間後傳回一個值。
  4. main 函式:建立了一組具有不同優先順序的任務,並呼叫 priority_scheduler 進行排程。

生產者-消費者模式

生產者-消費者模式是一種常見的平行設計模式,用於協調資料的生產和消費。以下是一個使用 asyncio.Queue 實作的範例:

import asyncio

async def producer(queue: asyncio.Queue, count: int) -> None:
    for i in range(count):
        await asyncio.sleep(0.1)  # 模擬生產延遲
        await queue.put(i)
        print(f"生產了 {i}")
    await queue.put(None)  # 傳送結束訊號

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"消費了 {item}")
        await asyncio.sleep(0.2)  # 模擬消費延遲

async def main() -> None:
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue)
    )

asyncio.run(main())

內容解密:

  1. producer 函式:模擬了一個生產者,不斷地將資料放入佇列中。
  2. consumer 函式:模擬了一個消費者,從佇列中取出資料並進行處理。
  3. main 函式:建立了一個佇列,並同時啟動了生產者和消費者。

結契約步和非同步程式碼

在實際應用中,往往需要結契約步和非同步程式碼,以充分利用兩者的優勢。以下是一些常見的整合策略:

使用 run_in_executor 方法

import asyncio
import concurrent.futures

def read_large_file(path: str) -> str:
    with open(path, 'r') as f:
        return f.read()

async def async_read_file(path: str) -> str:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        return await loop.run_in_executor(pool, read_large_file, path)

async def main() -> None:
    content = await async_read_file("large_data.txt")
    print(content[:200])

asyncio.run(main())

內容解密:

  1. read_large_file 函式:一個同步的檔案讀取函式。
  2. async_read_file 函式:將同步的檔案讀取函式封裝為非同步函式,使用 run_in_executor 方法線上程池中執行。
  3. main 函式:呼叫 async_read_file 函式讀取檔案內容。

使用裝飾器將同步函式轉換為非同步函式

import asyncio
import functools

def run_sync_in_executor(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args, **kwargs)
    return wrapper

@run_sync_in_executor
def blocking_fetch(url: str) -> str:
    import requests
    response = requests.get(url)
    response.raise_for_status()
    return response.text

async def main() -> None:
    url = "https://example.com"
    result = await blocking_fetch(url)
    print(result[:200])

asyncio.run(main())

內容解密:

  1. run_sync_in_executor 裝飾器:將同步函式轉換為非同步函式,使用 run_in_executor 方法線上程池中執行。
  2. blocking_fetch 函式:一個同步的網路請求函式,使用 requests 函式庫。
  3. main 函式:呼叫 blocking_fetch 函式進行網路請求。