返回文章列表

Python多執行緒與SocketIO即時通訊

本文探討 Python 多執行緒技術與 Socket.IO 的整合應用,涵蓋執行緒管理、GIL 限制、ThreadPoolExecutor 的使用,以及構建即時通訊應用程式。文章提供程式碼範例,解析多執行緒的建立、同步、平行處理,以及如何使用 Socket.IO

Web 開發 後端開發

Python 的 threading 模組提供簡潔的介面管理多執行緒,但受限於全域解析器鎖(GIL),CPU 密集型任務無法充分發揮多核優勢。本文介紹如何利用 threading 模組建立及管理執行緒,包含繼承 threading.Thread 類別並覆寫 run() 方法,以及使用 join() 等待執行緒完成。同時也說明如何利用 ThreadPoolExecutor 和上下文管理器,更有效地管理執行緒池,提交非同步任務,以提升程式效能。此外,本文也示範如何整合 Socket.IO 建立即時通訊應用,包含伺服器端使用 aiohttp 與 asyncio 處理非同步連線,以及客戶端如何連線、傳送和接收訊息。透過結合多執行緒和 Socket.IO,開發者能有效地構建高效率、即時互動的網路應用。

管理Python中的執行緒

執行緒是作業系統可以排程的資料流,可以在單核心上平行執行,或在多核心上平行執行。執行緒與行程的概念相似:它們都是正在執行的程式碼。主要區別在於,執行緒是在行程內執行的,而行程之間分享資源,如記憶體。

建立簡單的執行緒

要在Python中處理執行緒,我們需要使用threading模組,該模組提供了一個更方便的介面,讓開發者可以處理多個執行緒。以下範例中,我們建立了四個執行緒,每個執行緒都列印出一條不同的訊息,這些訊息是作為引數傳遞給thread_message(message)方法的。

import threading
import time

num_threads = 4

def thread_message(message):
    global num_threads
    num_threads -= 1
    print('Message from thread %s\n' % message)
    while num_threads > 0:
        print("I am the %s thread" % num_threads)
        threading.Thread(target=thread_message, args=("I am the %s thread" % num_threads,)).start()
        time.sleep(0.1)

# #### 內容解密:
# 這段程式碼展示瞭如何使用`threading`模組建立多個執行緒。每個執行緒都會執行`thread_message`函式,並傳遞一條訊息作為引數。`num_threads`變數用於控制執行緒的數量。

### 使用threading模組

`threading`模組包含了一個`Thread`類別我們需要擴充這個類別來建立自己的執行緒。`run`方法將包含我們要在執行緒中執行的程式碼

```python
import threading

def myTask():
    print("Hello World: {}".format(threading.current_thread()))

myFirstThread = threading.Thread(target=myTask)
myFirstThread.start()

# #### 內容解密:
# 在這個範例中,我們建立了一個名為`myTask`的函式,並將其作為目標傳遞給`Thread`建構函式。然後,我們呼叫`start()`方法來啟動執行緒。

### 建立自定義執行緒類別

以下範例中我們建立了一個名為`MyThread`的類別繼承自`threading.Thread`。`run()`方法包含了每個執行緒要執行的程式碼

```python
import threading

class MyThread(threading.Thread):
    def __init__(self, message):
        threading.Thread.__init__(self)
        self.message = message

    def run(self):
        print(self.message)

def test():
    for num in range(0, 10):
        thread = MyThread("I am the " + str(num) + " thread")
        thread.name = num
        thread.start()

if __name__ == '__main__':
    import timeit
    print(timeit.timeit("test()", setup="from __main__ import test", number=5))

# #### 內容解密:
# 在這個範例中,我們定義了一個名為`MyThread`的類別,繼承自`threading.Thread`。我們覆寫了`run()`方法,使其列印出傳遞給建構函式的訊息。

### 使用join()方法等待執行緒完成

我們可以使用`join()`方法來等待執行緒完成

```python
import threading

class thread_message(threading.Thread):
    def __init__(self, message):
        threading.Thread.__init__(self)
        self.message = message

    def run(self):
        print(self.message)

threads = []

def test():
    for num in range(0, 10):
        thread = thread_message("I am the " + str(num) + " thread")
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

if __name__ == '__main__':
    import timeit
    print(timeit.timeit("test()", setup="from __main__ import test", number=5))

# #### 內容解密:
# 在這個範例中,我們使用`join()`方法來等待所有執行緒完成。這確保了主執行緒不會在子執行緒完成之前離開。

## Python中的多執行緒與平行處理

多執行緒應用程式的概念是它允許我們在額外的執行緒上提供程式碼副本並執行它們這允許同時執行多個操作此外當一個行程被阻塞時例如等待輸入/輸出操作作業系統可以將計算時間分配給其他行程

### 多執行緒圖表

```plantuml
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 多執行緒圖表

rectangle "產生" as node1
rectangle "執行" as node2

node1 --> node2

@enduml

此圖示展示了多執行緒的基本概念:主行程產生多個子執行緒,每個子執行緒獨立執行不同的任務。

Python中的多執行緒

Python有一個API,允許開發者編寫具有多個執行緒的應用程式。要開始使用多執行緒,我們將在Python類別中建立一個新的執行緒。這個類別繼承自threading.Thread,並包含管理一個執行緒的程式碼。

透過多執行緒,我們可以從主行程產生幾個行程,並使用每個執行緒以獨立的方式執行不同的任務。

多執行緒與平行處理在 Python 中的應用

在 Python 中,多執行緒是一種常見的平行處理技術,允許開發者同時執行多個任務,以提高程式的執行效率。本章節將探討 Python 中的多執行緒技術,包括其基本概念、使用方法以及相關限制。

建立多執行緒

首先,我們需要建立一個多執行緒的類別。在 ThreadWorker.py 檔案中,我們定義了一個 ThreadWorker 類別,繼承自 threading.Thread

import threading

class ThreadWorker(threading.Thread):
    def __init__(self):
        super(ThreadWorker, self).__init__()

    def run(self):
        for i in range(10):
            print(i)

內容解密:

  • threading.Thread 是 Python 中用於建立執行緒的基本類別。
  • super(ThreadWorker, self).__init__() 初始化父類別。
  • run 方法定義了執行緒要執行的任務,在此例中是一個簡單的迴圈列印數字。

接下來,在 main.py 檔案中,我們建立了一個 ThreadWorker 例項,並啟動它:

import threading
from ThreadWorker import ThreadWorker

def main():
    thread = ThreadWorker()
    thread.start()

if __name__ == "__main__":
    main()

內容解密:

  • thread = ThreadWorker() 建立了一個 ThreadWorker 例項。
  • thread.start() 啟動了執行緒,呼叫了 ThreadWorker 中的 run 方法。

Python 傳統執行緒的限制

Python 的傳統執行緒實作存在一個主要問題,即其執行並非完全非同步。Python 的執行緒執行受限於 全域解析器鎖(GIL),這意味著同一時間內只有一個執行緒能夠被執行,無論有多少個處理器可用。

內容解密:

  • GIL 是 Python 解析器用於同步執行緒執行的機制。
  • 由於 GIL 的存在,多執行緒在 CPU 密集型任務中無法充分利用多核處理器的優勢。

為了最小化 GIL 對效能的影響,可以使用 -O 旗標來呼叫解析器,生成最佳化後的位元碼,減少上下文切換。此外,也可以考慮使用 multiprocessing 模組來繞過 GIL 的限制。

使用 ThreadPoolExecutor 實作平行處理

ThreadPoolExecutor 提供了一個介面來非同步執行任務。我們可以定義一個 ThreadPoolExecutor 物件,並使用其 submit 方法來提交任務:

from concurrent.futures import ThreadPoolExecutor
import threading

def view_thread():
    print("執行中的執行緒")
    print("存取中的執行緒識別碼:{}".format(threading.get_ident()))
    print("執行完成的執行緒:{}".format(threading.current_thread()))

def main():
    executor = ThreadPoolExecutor(max_workers=3)
    thread1 = executor.submit(view_thread)
    thread2 = executor.submit(view_thread)
    thread3 = executor.submit(view_thread)

if __name__ == '__main__':
    main()

內容解密:

  • ThreadPoolExecutor(max_workers=3) 建立了一個最大工作者數為 3 的執行緒池。
  • executor.submit(view_thread) 提交了一個任務到執行緒池中。

使用上下文管理器(Context Manager)

我們也可以將 ThreadPoolExecutor 用作上下文管理器,使用 with 陳述式:

from concurrent.futures import ThreadPoolExecutor

def message(msg):
    print("處理中:{}".format(msg))

def main():
    print("啟動 ThreadPoolExecutor")
    with ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(message, ('訊息 1'))
        future = executor.submit(message, ('訊息 2'))
    print("所有任務完成")

if __name__ == '__main__':
    main()

內容解密:

  • with ThreadPoolExecutor(max_workers=2) as executor: 使用上下文管理器建立了一個最大工作者數為 2 的執行緒池。
  • with 陳述式區塊內提交的任務會在區塊結束時自動等待完成。

使用socket.io進行即時通訊

WebSockets是一種技術,能夠在客戶端和伺服器之間透過TCP連線實作即時通訊,無需客戶端不斷檢查API端點是否有更新或新內容。客戶端建立一個與WebSocket伺服器的連線,並等待監聽新的伺服器事件或訊息。

WebSockets的主要優勢在於它們更高效,因為減少了網路負載,並以訊息的形式向大量客戶端傳送資訊。WebSockets的主要特點包括:

  • 透過單一TCP連線提供雙向(全雙工)通訊。
  • 提供伺服器和其連線客戶端之間的即時通訊,從而實作了新的應用程式的發展,以非同步方式處理事件。
  • 提供平行性並提高效能,最佳化回應時間,從而實作更可靠的網路應用程式。

使用socket.io實作伺服器

要使用socket.io實作伺服器,我們需要引入其他模組,如asyncio和aiohttp:

  • asyncio是一個Python模組,幫助我們在Python中進行單執行緒的平行程式設計。它在Python 3.7中可用,檔案可在https://docs.python.org/3/library/asyncio.html找到。
  • aiohttp是一個用於建立根據asyncio的伺服器和客戶端應用程式的函式庫。該模組使用WebSockets的優勢,在應用程式的不同部分之間進行非同步通訊。檔案可在http://aiohttp.readthedocs.io/en/stable找到。

socket.io伺服器可在官方Python儲存函式庫中找到,可以透過pip安裝:

$ pip3 install python-socketio

完整的檔案可在https://python-socketio.readthedocs.io/en/latest找到。

以下是一個使用Python 3.5+的WebSockets範例,其中我們使用aiohttp框架實作了一個socket.io伺服器。您可以使用pip3 install aiohttp命令安裝此模組。

from aiohttp import web
import socketio

socket_io = socketio.AsyncServer()
app = web.Application()
socket_io.attach(app)

async def index(request):
    return web.Response(text='Hello world from socketio', content_type='text/html')

@socket_io.on('message')
def print_message(socket_id, data):
    print("Socket ID: ", socket_id)
    print("Data: ", data)

app.router.add_get('/', index)

if __name__ == '__main__':
    web.run_app(app)

在上述程式碼中,我們實作了一個根據socket.io的伺服器,使用了aiohttp模組。如您在程式碼中所見,我們定義了兩個方法:index()方法和print_message()方法。index()方法將根據“/”根端點請求傳回一個回應訊息,而print_message()方法將列印通訊端識別碼和由事件發出的資料。此方法使用@socketio.on('message')註解。

實作使用者端連線到伺服器

要實作客戶端,可以在web_socket_client.py檔案中找到以下程式碼:

import socketio

sio = socketio.Client()

@sio.event
def connect():
    print('connection established')

@sio.event
def disconnect():
    print('disconnected from server')

sio.connect('http://localhost:8080')
sio.emit('message', {'data': 'my_data'})
sio.wait()

在上述程式碼中,我們使用socketio.Client()類別的connect()方法連線到正在監聽8080埠的伺服器。我們定義了兩個方法,一個用於連線,另一個用於斷開連線。

要呼叫伺服器中的print_message()函式,我們需要發出訊息事件,並將資料作為物件字典傳遞。

要執行前兩個指令碼,我們需要分別執行兩個終端,一個用於客戶端,另一個用於伺服器。首先,您需要執行伺服器,然後執行客戶端,以檢查作為訊息傳送的資訊。

程式碼解析:

  1. 伺服器端實作

    • 使用aiohttp建立一個Web應用,並附加socketio.AsyncServer()例項。
    • 定義了一個index函式來處理根URL請求,傳回一個簡單的回應。
    • 使用@socket_io.on('message')裝飾器定義了一個事件處理函式print_message,當接收到message事件時列印出通訊端ID和傳遞的資料。
  2. 客戶端實作

    • 建立了一個socketio.Client()例項,並定義了連線建立和斷開時的事件處理函式。
    • 使用sio.connect()方法連線到伺服器,並使用sio.emit()方法發出一個message事件,將資料傳送到伺服器。
  3. 執行與測試

    • 分別在兩個終端中執行伺服器和客戶端指令碼,觀察伺服器是否正確接收並列印出客戶端傳送的資料。

本章節介紹瞭如何使用Python的socket.io函式庫來建立支援即時通訊的WebSocket伺服器和客戶端。透過使用asyncio和aiohttp等非同步程式設計工具,能夠有效地處理平行連線和訊息傳遞,為開發即時網路應用提供了有力的支援。