在非同步程式設計中,有效地管理大量平行任務對於提升應用程式效能至關重要。Python 的 asyncio 函式庫提供了一套完善的工具,讓開發者能夠精細地控制任務的執行流程。本文將探討如何使用 asyncio 進行進階任務排程,包含使用 asyncio.gather 執行多個非同步操作,並利用 asyncio.wait 實作更精細的任務控制,例如設定逾時或取消任務。此外,文章也將介紹如何設計管道模式,以串聯多個非同步操作,確保任務按順序執行。最後,將會探討如何結契約步和非同步程式碼,以及如何處理分享資源的同步問題,以確保應用程式的穩定性和可靠性。
高階非同步任務排程:深入理解與實踐
在現代軟體開發中,特別是在處理 I/O 繫結操作或需要高效能並發執行的應用程式時,Python 的 asyncio 函式庫提供了強大的非同步任務排程功能。本文將探討 asyncio 的進階用法,包括任務排程、並發控制、逾時管理和任務取消等關鍵技術。
使用 asyncio.gather 進行平行任務執行
asyncio.gather 是一種方便的方法,用於平行執行多個協程。它允許開發者將多個可等待物件聚合成一個,並等待它們全部完成。以下是一個簡單的例子:
import asyncio
async def compute_square(n: int) -> int:
await asyncio.sleep(0.1) # 模擬計算延遲
return n * n
async def compute_sum_of_squares(numbers: list[int]) -> int:
tasks = [compute_square(n) for n in numbers]
results = await asyncio.gather(*tasks)
return sum(results)
async def main() -> None:
result = await compute_sum_of_squares([1, 2, 3, 4, 5])
print("Sum of squares:", result)
asyncio.run(main())
內容解密:
compute_square函式使用await asyncio.sleep(0.1)模擬計算延遲。compute_sum_of_squares函式建立多個任務,並使用asyncio.gather等待所有任務完成。asyncio.gather傳回所有任務的結果,之後計算這些結果的總和。
使用 asyncio.wait 進行精細任務控制
當需要更精細地控制任務的執行順序或處理任務逾時和取消時,asyncio.wait 成為一個有力的工具。它傳回兩個集合:已完成的任務和待處理的任務。以下是一個範例:
import asyncio
async def delayed_task(id: int, delay: float) -> str:
await asyncio.sleep(delay)
return f"Task {id} finished after {delay} seconds"
async def scheduler() -> None:
tasks = [asyncio.create_task(delayed_task(i, delay)) for i, delay in enumerate([3, 1, 2], start=1)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for completed in done:
print("Completed:", await completed)
for pending_task in pending:
pending_task.cancel()
print("Cancelled a pending task")
asyncio.run(scheduler())
內容解密:
delayed_task函式模擬延遲任務。scheduler函式建立多個任務,並使用asyncio.wait等待第一個完成的任務。- 已完成的任務被處理,而待處理的任務被取消。
使用管道模式進行依序任務執行
在某些場景下,任務需要依序執行,以確保資料流經各個階段的正確性。以下是一個管道模式的例子:
import asyncio
async def fetch_data(source: str) -> str:
await asyncio.sleep(0.5) # 模擬網路延遲
return f"Data from {source}"
async def process_data(raw_data: str) -> str:
await asyncio.sleep(0.2) # 模擬處理延遲
return raw_data.upper()
async def store_data(data: str) -> None:
await asyncio.sleep(0.1) # 模擬 I/O 延遲
print("Stored:", data)
async def pipeline(source: str) -> None:
raw_data = await fetch_data(source)
processed = await process_data(raw_data)
await store_data(processed)
async def main() -> None:
await asyncio.gather(
pipeline("Source A"),
pipeline("Source B"),
pipeline("Source C")
)
asyncio.run(main())
內容解密:
- 資料從
fetch_data取得,然後傳遞給process_data處理,最後由store_data儲存。 - 每個階段都使用
await將控制權交還給事件迴圈,從而實作並發執行。
同步機制與分享資源保護
在多工環境中,保護分享資源免受競爭條件的影響是至關重要的。可以使用非同步鎖來實作這一點:
import asyncio
shared_resource = 0
lock = asyncio.Lock()
async def safe_increment(id: int) -> None:
global shared_resource
async with lock:
local_copy = shared_resource
await asyncio.sleep(0.1) # 模擬計算延遲
shared_resource = local_copy + 1
print(f"Task {id} incremented to {shared_resource}")
async def run_concurrent_increments() -> None:
tasks = [asyncio.create_task(safe_increment(i)) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(run_concurrent_increments())
內容解密:
- 使用
asyncio.Lock確保在修改分享資源時的互斥存取。 - 在
safe_increment函式中,使用async with lock取得鎖,確保操作的原子性。
超時管理與任務取消
使用 asyncio.wait_for 可以對任務施加逾時限制,避免無限等待。而任務取消則允許應用程式在需要時終止正在執行的任務:
import asyncio
async def long_running_operation() -> str:
await asyncio.sleep(5)
return "Operation completed"
async def main() -> None:
try:
result = await asyncio.wait_for(long_running_operation(), timeout=2)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(main())
內容解密:
- 使用
asyncio.wait_for對long_running_operation施加逾時限制。 - 當操作逾時時,會引發
asyncio.TimeoutError例外。
自定義排程演算法
在某些情況下,開發者可能需要實作自定義的排程演算法,以滿足特定的效能需求或優先順序要求。這可以透過擴充套件 asyncio.create_task 或使用外部函式庫來實作。
非同步程式設計中的任務排程和平行控制
在非同步環境中,任務排程和平行控制是確保系統高效運作的關鍵。透過使用 await 關鍵字,可以有效地管理任務執行、同步依賴操作、實施超時和取消,以及整合自定義的排程策略。
優先順序任務排程
在某些應用場景中,任務的優先順序至關重要。以下是一個使用優先順序佇列排程任務的範例:
import asyncio
import heapq
from typing import Any
class PrioritizedTask:
def __init__(self, priority: int, coro: Any):
self.priority = priority
self.coro = coro
def __lt__(self, other: "PrioritizedTask") -> bool:
return self.priority < other.priority
async def priority_scheduler(tasks: list[PrioritizedTask]) -> None:
# 建立一個根據堆積的優先順序佇列
heapq.heapify(tasks)
while tasks:
prioritized_task = heapq.heappop(tasks)
result = await prioritized_task.coro
print(f"優先順序 {prioritized_task.priority} 的任務結果:{result}")
async def sample_task(value: int, delay: float) -> int:
await asyncio.sleep(delay)
return value
async def main() -> None:
tasks = [
PrioritizedTask(3, sample_task(100, 0.3)),
PrioritizedTask(1, sample_task(200, 0.1)),
PrioritizedTask(2, sample_task(300, 0.2))
]
await priority_scheduler(tasks)
asyncio.run(main())
內容解密:
PrioritizedTask類別:定義了一個具有優先順序屬性的任務類別,用於在優先順序佇列中排序。priority_scheduler函式:實作了一個優先順序排程器,將任務按照優先順序順序執行。sample_task函式:模擬了一個非同步任務,延遲一定時間後傳回一個值。main函式:建立了一組具有不同優先順序的任務,並呼叫priority_scheduler進行排程。
生產者-消費者模式
生產者-消費者模式是一種常見的平行設計模式,用於協調資料的生產和消費。以下是一個使用 asyncio.Queue 實作的範例:
import asyncio
async def producer(queue: asyncio.Queue, count: int) -> None:
for i in range(count):
await asyncio.sleep(0.1) # 模擬生產延遲
await queue.put(i)
print(f"生產了 {i}")
await queue.put(None) # 傳送結束訊號
async def consumer(queue: asyncio.Queue) -> None:
while True:
item = await queue.get()
if item is None:
break
print(f"消費了 {item}")
await asyncio.sleep(0.2) # 模擬消費延遲
async def main() -> None:
queue = asyncio.Queue()
await asyncio.gather(
producer(queue, 10),
consumer(queue)
)
asyncio.run(main())
內容解密:
producer函式:模擬了一個生產者,不斷地將資料放入佇列中。consumer函式:模擬了一個消費者,從佇列中取出資料並進行處理。main函式:建立了一個佇列,並同時啟動了生產者和消費者。
結契約步和非同步程式碼
在實際應用中,往往需要結契約步和非同步程式碼,以充分利用兩者的優勢。以下是一些常見的整合策略:
使用 run_in_executor 方法
import asyncio
import concurrent.futures
def read_large_file(path: str) -> str:
with open(path, 'r') as f:
return f.read()
async def async_read_file(path: str) -> str:
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
return await loop.run_in_executor(pool, read_large_file, path)
async def main() -> None:
content = await async_read_file("large_data.txt")
print(content[:200])
asyncio.run(main())
內容解密:
read_large_file函式:一個同步的檔案讀取函式。async_read_file函式:將同步的檔案讀取函式封裝為非同步函式,使用run_in_executor方法線上程池中執行。main函式:呼叫async_read_file函式讀取檔案內容。
使用裝飾器將同步函式轉換為非同步函式
import asyncio
import functools
def run_sync_in_executor(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, func, *args, **kwargs)
return wrapper
@run_sync_in_executor
def blocking_fetch(url: str) -> str:
import requests
response = requests.get(url)
response.raise_for_status()
return response.text
async def main() -> None:
url = "https://example.com"
result = await blocking_fetch(url)
print(result[:200])
asyncio.run(main())
內容解密:
run_sync_in_executor裝飾器:將同步函式轉換為非同步函式,使用run_in_executor方法線上程池中執行。blocking_fetch函式:一個同步的網路請求函式,使用requests函式庫。main函式:呼叫blocking_fetch函式進行網路請求。