返回文章列表

Python多程式模組進階應用與效能調校

本文探討 Python 多程式模組的高階應用,包含任務粒度控制、例外處理、行程親和性與訊號處理、非同步程式設計整合、以及分享記憶體與資料交換技術,涵蓋 Value、Array、shared_memory、管道和佇列等方法,並提供程式碼範例與詳細解說,有效提升多程式應用程式的效能和穩定性。

Python 多程式

Python 的多程式模組在 CPU 密集型任務中扮演關鍵角色。提升多程式應用效能的關鍵在於控制任務粒度,避免頻繁的行程間通訊。本文將探討任務聚合、例外處理、行程親和性、訊號處理等進階議題,並深入研究分享記憶體、管道、佇列等資料交換機制,最後示範如何結合多程式與 asyncio 進行高效平行處理,搭配實際程式碼案例,帶領讀者掌握多程式模組的最佳實踐。

多程式模組的高階應用與效能調校

在現代Python應用程式開發中,多程式(multiprocessing)模組扮演著至關重要的角色,特別是在處理CPU密集型任務時。本章將探討多程式模組的高階應用,包括效能調校、例外處理、以及進階的資料交換技術。

任務粒度控制與聚合

在使用多程式模組時,合理控制任務粒度(task granularity)對於避免過高的行程間通訊(IPC)成本至關重要。任務聚合(task aggregation)是一種有效的技術,透過將小任務批次處理後再分發給工作行程,可以減少資料交換的頻率並分攤序列化開銷。

程式碼範例:任務聚合

import multiprocessing as mp
import ctypes

def update_shared_array(shared_arr, index, value):
    shared_arr[index] = value

if __name__ == '__main__':
    size = 10
    shared_arr = mp.Array(ctypes.c_double, size)
    processes = [mp.Process(target=update_shared_array, args=(shared_arr, i, float(i))) for i in range(size)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print("Shared array:", list(shared_arr))

內容解密:

  1. 使用multiprocessing.Array建立分享陣列。
  2. 建立多個子行程平行更新分享陣列的不同索引位置。
  3. 主行程等待所有子行程完成後列印最終的分享陣列內容。

例外處理

在多程式環境中,子行程中的例外預設不會傳播到父行程。為瞭解決這一問題,可以使用自定義的包裝器或apply_async方法來處理例外。

程式碼範例:例外處理

import multiprocessing as mp

def task(x):
    if x % 2 == 0:
        raise ValueError(f"Invalid input: {x}")
    return x * x

def error_callback(err):
    print("Error encountered:", err)

if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:
        results = []
        for i in range(10):
            results.append(pool.apply_async(task, args=(i,), error_callback=error_callback))
        outputs = [r.get() for r in results if r.successful()]
    print("Successful outputs:", outputs)

內容解密:

  1. 定義一個可能引發例外的任務函式task
  2. 使用apply_async方法非同步執行任務,並指定例外回呼函式error_callback
  3. 收整合功執行的任務結果。

行程親和性與訊號處理

在某些高效能應用中,需要控制行程親和性、優先處理任務或管理行程間訊號。雖然這通常需要與作業系統層級的API介面,但進階開發者可以使用ossignal模組來監控和調整行程行為。

程式碼範例:訊號處理

import multiprocessing as mp
import signal
import time

def worker():
    print("Worker started with PID:", mp.current_process().pid)
    while True:
        time.sleep(1)

def init_worker():
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

if __name__ == '__main__':
    pool = mp.Pool(processes=4, initializer=init_worker)
    try:
        pool.map_async(lambda x: time.sleep(5), range(10)).get(timeout=10)
    except mp.TimeoutError:
        print("Timeout reached, terminating the pool!")
        pool.terminate()
    finally:
        pool.join()

內容解密:

  1. 定義一個持續執行的worker函式。
  2. 使用signal.signal忽略SIGTERM訊號,使worker行程對該訊號免疫。
  3. 在主行程中建立一個行程池,並在超時後終止所有行程。

混合使用多程式與非同步程式設計

結合多程式模組與非同步程式設計框架(如asyncio)可以實作CPU密集型任務與I/O密集型任務的高效平行處理。

程式碼範例:混合使用多程式與asyncio

import asyncio
import concurrent.futures
import multiprocessing as mp
import time

def cpu_bound(x):
    total = 0
    for _ in range(1000000):
        total += x * x
    return total

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
        tasks = [loop.run_in_executor(executor, cpu_bound, i) for i in range(10)]
        results = await asyncio.gather(*tasks)
    print("Async results:", results)

asyncio.run(main())

內容解密:

  1. 定義一個CPU密集型的計算函式cpu_bound
  2. 在非同步主函式main中使用ProcessPoolExecutor平行執行CPU密集型任務。
  3. 使用asyncio.gather收集所有任務的結果。

分享記憶體與資料交換

在平行程式設計中,高效的資料交換至關重要。Python的多程式模組提供了多種分享記憶體構件,如ValueArray以及新增的shared_memory模組。

程式碼範例:使用分享記憶體

import numpy as np
from multiprocessing import shared_memory, Process

def worker(shm_name, shape, dtype):
    shm = shared_memory.SharedMemory(name=shm_name)
    shared_array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    shared_array *= 2
    shm.close()

if __name__ == '__main__':
    data = np.arange(100, dtype=np.int64).reshape(10, 10)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    np.copyto(shared_array, data)
    
    p = Process(target=worker, args=(shm.name, data.shape, data.dtype))
    p.start()
    p.join()
    
    print("Updated array:")
    print(shared_array)
    shm.close()
    shm.unlink()

內容解密:

  1. 建立一個分享記憶體區塊並初始化為NumPy陣列。
  2. 在子行程中重新附加到該分享記憶體區塊並修改其內容。
  3. 主行程存取修改後的資料並列印結果。

進階多程式資料交換技術

在需要有序、事務性資料交換的場景中,例如傳播控制訊息或串流即時更新,管道(Pipes)是一個極佳的選擇。

管道實作範例

import multiprocessing as mp

def pipe_worker(conn):
    # 等待來自父程式的訊息
    msg = conn.recv()
    conn.send(f"Received: {msg}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=pipe_worker, args=(child_conn,))
    p.start()
    parent_conn.send("Hello from parent")
    response = parent_conn.recv()
    print(response)
    p.join()

內容解密:

  1. 使用 mp.Pipe() 建立父子連線。
  2. 子程式接收來自父程式的訊息並回傳確認訊息。
  3. 父程式傳送訊息並接收子程式的回應。

佇列(Queues)應用

佇列建立在管道之上,提供了一個 FIFO 介面,簡化了多程式間的大規模訊息傳遞。

import multiprocessing as mp
import time

def producer(q, items):
    for item in items:
        q.put(item)
        time.sleep(0.1)
    q.put(None)  # 終止訊號

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        time.sleep(0.2)

if __name__ == '__main__':
    queue = mp.Queue()
    p = mp.Process(target=producer, args=(queue, range(10)))
    c = mp.Process(target=consumer, args=(queue,))
    p.start()
    c.start()
    p.join()
    c.join()

內容解密:

  1. 生產者程式將專案放入佇列。
  2. 消費者程式從佇列中取出專案並處理。
  3. 使用 None 作為終止訊號。

結合分享記憶體與佇列的混合架構

在處理大型資料集時,主程式可將資料載入分享記憶體,並透過佇列傳送控制訊息。工作程式重新連線到分享記憶體,處理其分配的段,並透過佇列回傳進度或結果。

同步化與鎖定機制

使用分享記憶體時,需注意同步化問題。可結合鎖定機制(如 multiprocessing.Lock)來安全地執行平行寫入。

import multiprocessing as mp
import ctypes

def safe_update(shared_arr, index, value, lock):
    with lock:
        shared_arr[index] = value

if __name__ == '__main__':
    size = 10
    shared_arr = mp.Array(ctypes.c_int, size)
    lock = mp.Lock()
    processes = [mp.Process(target=safe_update, args=(shared_arr, i, i*10, lock)) for i in range(size)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print("Final shared array:", list(shared_arr))

內容解密:

  1. 使用 mp.Array 建立分享陣列。
  2. 使用 mp.Lock 確保寫入操作的原子性。

記憶體對映檔案

在某些系統中,記憶體對映檔案(Memory-Mapped Files)比分享記憶體更適合處理持久或大規模資料集。

import mmap
import os

filename = 'shared_data.bin'
size = 1024  # 分配 1KB

with open(filename, 'wb') as f:
    f.write(b'\x00' * size)

with open(filename, 'r+b') as f:
    mm = mmap.mmap(f.fileno(), size)
    mm[0:11] = b'Hello World'
    print(mm[0:11])
    mm.close()

os.remove(filename)

內容解密:

  1. 建立一個檔案並寫入初始資料。
  2. 使用 mmap.mmap 將檔案對映到記憶體。
  3. 對對映的記憶體進行讀寫操作。