Python 的多程式模組在 CPU 密集型任務中扮演關鍵角色。提升多程式應用效能的關鍵在於控制任務粒度,避免頻繁的行程間通訊。本文將探討任務聚合、例外處理、行程親和性、訊號處理等進階議題,並深入研究分享記憶體、管道、佇列等資料交換機制,最後示範如何結合多程式與 asyncio 進行高效平行處理,搭配實際程式碼案例,帶領讀者掌握多程式模組的最佳實踐。
多程式模組的高階應用與效能調校
在現代Python應用程式開發中,多程式(multiprocessing)模組扮演著至關重要的角色,特別是在處理CPU密集型任務時。本章將探討多程式模組的高階應用,包括效能調校、例外處理、以及進階的資料交換技術。
任務粒度控制與聚合
在使用多程式模組時,合理控制任務粒度(task granularity)對於避免過高的行程間通訊(IPC)成本至關重要。任務聚合(task aggregation)是一種有效的技術,透過將小任務批次處理後再分發給工作行程,可以減少資料交換的頻率並分攤序列化開銷。
程式碼範例:任務聚合
import multiprocessing as mp
import ctypes
def update_shared_array(shared_arr, index, value):
shared_arr[index] = value
if __name__ == '__main__':
size = 10
shared_arr = mp.Array(ctypes.c_double, size)
processes = [mp.Process(target=update_shared_array, args=(shared_arr, i, float(i))) for i in range(size)]
for p in processes:
p.start()
for p in processes:
p.join()
print("Shared array:", list(shared_arr))
內容解密:
- 使用
multiprocessing.Array建立分享陣列。 - 建立多個子行程平行更新分享陣列的不同索引位置。
- 主行程等待所有子行程完成後列印最終的分享陣列內容。
例外處理
在多程式環境中,子行程中的例外預設不會傳播到父行程。為瞭解決這一問題,可以使用自定義的包裝器或apply_async方法來處理例外。
程式碼範例:例外處理
import multiprocessing as mp
def task(x):
if x % 2 == 0:
raise ValueError(f"Invalid input: {x}")
return x * x
def error_callback(err):
print("Error encountered:", err)
if __name__ == '__main__':
with mp.Pool(processes=4) as pool:
results = []
for i in range(10):
results.append(pool.apply_async(task, args=(i,), error_callback=error_callback))
outputs = [r.get() for r in results if r.successful()]
print("Successful outputs:", outputs)
內容解密:
- 定義一個可能引發例外的任務函式
task。 - 使用
apply_async方法非同步執行任務,並指定例外回呼函式error_callback。 - 收整合功執行的任務結果。
行程親和性與訊號處理
在某些高效能應用中,需要控制行程親和性、優先處理任務或管理行程間訊號。雖然這通常需要與作業系統層級的API介面,但進階開發者可以使用os和signal模組來監控和調整行程行為。
程式碼範例:訊號處理
import multiprocessing as mp
import signal
import time
def worker():
print("Worker started with PID:", mp.current_process().pid)
while True:
time.sleep(1)
def init_worker():
signal.signal(signal.SIGTERM, signal.SIG_IGN)
if __name__ == '__main__':
pool = mp.Pool(processes=4, initializer=init_worker)
try:
pool.map_async(lambda x: time.sleep(5), range(10)).get(timeout=10)
except mp.TimeoutError:
print("Timeout reached, terminating the pool!")
pool.terminate()
finally:
pool.join()
內容解密:
- 定義一個持續執行的worker函式。
- 使用
signal.signal忽略SIGTERM訊號,使worker行程對該訊號免疫。 - 在主行程中建立一個行程池,並在超時後終止所有行程。
混合使用多程式與非同步程式設計
結合多程式模組與非同步程式設計框架(如asyncio)可以實作CPU密集型任務與I/O密集型任務的高效平行處理。
程式碼範例:混合使用多程式與asyncio
import asyncio
import concurrent.futures
import multiprocessing as mp
import time
def cpu_bound(x):
total = 0
for _ in range(1000000):
total += x * x
return total
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
tasks = [loop.run_in_executor(executor, cpu_bound, i) for i in range(10)]
results = await asyncio.gather(*tasks)
print("Async results:", results)
asyncio.run(main())
內容解密:
- 定義一個CPU密集型的計算函式
cpu_bound。 - 在非同步主函式
main中使用ProcessPoolExecutor平行執行CPU密集型任務。 - 使用
asyncio.gather收集所有任務的結果。
分享記憶體與資料交換
在平行程式設計中,高效的資料交換至關重要。Python的多程式模組提供了多種分享記憶體構件,如Value、Array以及新增的shared_memory模組。
程式碼範例:使用分享記憶體
import numpy as np
from multiprocessing import shared_memory, Process
def worker(shm_name, shape, dtype):
shm = shared_memory.SharedMemory(name=shm_name)
shared_array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
shared_array *= 2
shm.close()
if __name__ == '__main__':
data = np.arange(100, dtype=np.int64).reshape(10, 10)
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
np.copyto(shared_array, data)
p = Process(target=worker, args=(shm.name, data.shape, data.dtype))
p.start()
p.join()
print("Updated array:")
print(shared_array)
shm.close()
shm.unlink()
內容解密:
- 建立一個分享記憶體區塊並初始化為NumPy陣列。
- 在子行程中重新附加到該分享記憶體區塊並修改其內容。
- 主行程存取修改後的資料並列印結果。
進階多程式資料交換技術
在需要有序、事務性資料交換的場景中,例如傳播控制訊息或串流即時更新,管道(Pipes)是一個極佳的選擇。
管道實作範例
import multiprocessing as mp
def pipe_worker(conn):
# 等待來自父程式的訊息
msg = conn.recv()
conn.send(f"Received: {msg}")
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = mp.Pipe()
p = mp.Process(target=pipe_worker, args=(child_conn,))
p.start()
parent_conn.send("Hello from parent")
response = parent_conn.recv()
print(response)
p.join()
內容解密:
- 使用
mp.Pipe()建立父子連線。 - 子程式接收來自父程式的訊息並回傳確認訊息。
- 父程式傳送訊息並接收子程式的回應。
佇列(Queues)應用
佇列建立在管道之上,提供了一個 FIFO 介面,簡化了多程式間的大規模訊息傳遞。
import multiprocessing as mp
import time
def producer(q, items):
for item in items:
q.put(item)
time.sleep(0.1)
q.put(None) # 終止訊號
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"Consumed: {item}")
time.sleep(0.2)
if __name__ == '__main__':
queue = mp.Queue()
p = mp.Process(target=producer, args=(queue, range(10)))
c = mp.Process(target=consumer, args=(queue,))
p.start()
c.start()
p.join()
c.join()
內容解密:
- 生產者程式將專案放入佇列。
- 消費者程式從佇列中取出專案並處理。
- 使用
None作為終止訊號。
結合分享記憶體與佇列的混合架構
在處理大型資料集時,主程式可將資料載入分享記憶體,並透過佇列傳送控制訊息。工作程式重新連線到分享記憶體,處理其分配的段,並透過佇列回傳進度或結果。
同步化與鎖定機制
使用分享記憶體時,需注意同步化問題。可結合鎖定機制(如 multiprocessing.Lock)來安全地執行平行寫入。
import multiprocessing as mp
import ctypes
def safe_update(shared_arr, index, value, lock):
with lock:
shared_arr[index] = value
if __name__ == '__main__':
size = 10
shared_arr = mp.Array(ctypes.c_int, size)
lock = mp.Lock()
processes = [mp.Process(target=safe_update, args=(shared_arr, i, i*10, lock)) for i in range(size)]
for p in processes:
p.start()
for p in processes:
p.join()
print("Final shared array:", list(shared_arr))
內容解密:
- 使用
mp.Array建立分享陣列。 - 使用
mp.Lock確保寫入操作的原子性。
記憶體對映檔案
在某些系統中,記憶體對映檔案(Memory-Mapped Files)比分享記憶體更適合處理持久或大規模資料集。
import mmap
import os
filename = 'shared_data.bin'
size = 1024 # 分配 1KB
with open(filename, 'wb') as f:
f.write(b'\x00' * size)
with open(filename, 'r+b') as f:
mm = mmap.mmap(f.fileno(), size)
mm[0:11] = b'Hello World'
print(mm[0:11])
mm.close()
os.remove(filename)
內容解密:
- 建立一個檔案並寫入初始資料。
- 使用
mmap.mmap將檔案對映到記憶體。 - 對對映的記憶體進行讀寫操作。