Python 的 threading 模組提供簡潔的介面管理多執行緒,但受限於全域解析器鎖(GIL),CPU 密集型任務無法充分發揮多核優勢。本文介紹如何利用 threading 模組建立及管理執行緒,包含繼承 threading.Thread 類別並覆寫 run() 方法,以及使用 join() 等待執行緒完成。同時也說明如何利用 ThreadPoolExecutor 和上下文管理器,更有效地管理執行緒池,提交非同步任務,以提升程式效能。此外,本文也示範如何整合 Socket.IO 建立即時通訊應用,包含伺服器端使用 aiohttp 與 asyncio 處理非同步連線,以及客戶端如何連線、傳送和接收訊息。透過結合多執行緒和 Socket.IO,開發者能有效地構建高效率、即時互動的網路應用。
管理Python中的執行緒
執行緒是作業系統可以排程的資料流,可以在單核心上平行執行,或在多核心上平行執行。執行緒與行程的概念相似:它們都是正在執行的程式碼。主要區別在於,執行緒是在行程內執行的,而行程之間分享資源,如記憶體。
建立簡單的執行緒
要在Python中處理執行緒,我們需要使用threading模組,該模組提供了一個更方便的介面,讓開發者可以處理多個執行緒。以下範例中,我們建立了四個執行緒,每個執行緒都列印出一條不同的訊息,這些訊息是作為引數傳遞給thread_message(message)方法的。
import threading
import time
num_threads = 4
def thread_message(message):
global num_threads
num_threads -= 1
print('Message from thread %s\n' % message)
while num_threads > 0:
print("I am the %s thread" % num_threads)
threading.Thread(target=thread_message, args=("I am the %s thread" % num_threads,)).start()
time.sleep(0.1)
# #### 內容解密:
# 這段程式碼展示瞭如何使用`threading`模組建立多個執行緒。每個執行緒都會執行`thread_message`函式,並傳遞一條訊息作為引數。`num_threads`變數用於控制執行緒的數量。
### 使用threading模組
`threading`模組包含了一個`Thread`類別,我們需要擴充這個類別來建立自己的執行緒。`run`方法將包含我們要在執行緒中執行的程式碼。
```python
import threading
def myTask():
print("Hello World: {}".format(threading.current_thread()))
myFirstThread = threading.Thread(target=myTask)
myFirstThread.start()
# #### 內容解密:
# 在這個範例中,我們建立了一個名為`myTask`的函式,並將其作為目標傳遞給`Thread`建構函式。然後,我們呼叫`start()`方法來啟動執行緒。
### 建立自定義執行緒類別
以下範例中,我們建立了一個名為`MyThread`的類別,繼承自`threading.Thread`。`run()`方法包含了每個執行緒要執行的程式碼。
```python
import threading
class MyThread(threading.Thread):
def __init__(self, message):
threading.Thread.__init__(self)
self.message = message
def run(self):
print(self.message)
def test():
for num in range(0, 10):
thread = MyThread("I am the " + str(num) + " thread")
thread.name = num
thread.start()
if __name__ == '__main__':
import timeit
print(timeit.timeit("test()", setup="from __main__ import test", number=5))
# #### 內容解密:
# 在這個範例中,我們定義了一個名為`MyThread`的類別,繼承自`threading.Thread`。我們覆寫了`run()`方法,使其列印出傳遞給建構函式的訊息。
### 使用join()方法等待執行緒完成
我們可以使用`join()`方法來等待執行緒完成。
```python
import threading
class thread_message(threading.Thread):
def __init__(self, message):
threading.Thread.__init__(self)
self.message = message
def run(self):
print(self.message)
threads = []
def test():
for num in range(0, 10):
thread = thread_message("I am the " + str(num) + " thread")
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if __name__ == '__main__':
import timeit
print(timeit.timeit("test()", setup="from __main__ import test", number=5))
# #### 內容解密:
# 在這個範例中,我們使用`join()`方法來等待所有執行緒完成。這確保了主執行緒不會在子執行緒完成之前離開。
## Python中的多執行緒與平行處理
多執行緒應用程式的概念是,它允許我們在額外的執行緒上提供程式碼副本並執行它們。這允許同時執行多個操作。此外,當一個行程被阻塞時,例如等待輸入/輸出操作,作業系統可以將計算時間分配給其他行程。
### 多執行緒圖表
```plantuml
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 多執行緒圖表
rectangle "產生" as node1
rectangle "執行" as node2
node1 --> node2
@enduml
此圖示展示了多執行緒的基本概念:主行程產生多個子執行緒,每個子執行緒獨立執行不同的任務。
Python中的多執行緒
Python有一個API,允許開發者編寫具有多個執行緒的應用程式。要開始使用多執行緒,我們將在Python類別中建立一個新的執行緒。這個類別繼承自threading.Thread,並包含管理一個執行緒的程式碼。
透過多執行緒,我們可以從主行程產生幾個行程,並使用每個執行緒以獨立的方式執行不同的任務。
多執行緒與平行處理在 Python 中的應用
在 Python 中,多執行緒是一種常見的平行處理技術,允許開發者同時執行多個任務,以提高程式的執行效率。本章節將探討 Python 中的多執行緒技術,包括其基本概念、使用方法以及相關限制。
建立多執行緒
首先,我們需要建立一個多執行緒的類別。在 ThreadWorker.py 檔案中,我們定義了一個 ThreadWorker 類別,繼承自 threading.Thread:
import threading
class ThreadWorker(threading.Thread):
def __init__(self):
super(ThreadWorker, self).__init__()
def run(self):
for i in range(10):
print(i)
內容解密:
threading.Thread是 Python 中用於建立執行緒的基本類別。super(ThreadWorker, self).__init__()初始化父類別。run方法定義了執行緒要執行的任務,在此例中是一個簡單的迴圈列印數字。
接下來,在 main.py 檔案中,我們建立了一個 ThreadWorker 例項,並啟動它:
import threading
from ThreadWorker import ThreadWorker
def main():
thread = ThreadWorker()
thread.start()
if __name__ == "__main__":
main()
內容解密:
thread = ThreadWorker()建立了一個ThreadWorker例項。thread.start()啟動了執行緒,呼叫了ThreadWorker中的run方法。
Python 傳統執行緒的限制
Python 的傳統執行緒實作存在一個主要問題,即其執行並非完全非同步。Python 的執行緒執行受限於 全域解析器鎖(GIL),這意味著同一時間內只有一個執行緒能夠被執行,無論有多少個處理器可用。
內容解密:
- GIL 是 Python 解析器用於同步執行緒執行的機制。
- 由於 GIL 的存在,多執行緒在 CPU 密集型任務中無法充分利用多核處理器的優勢。
為了最小化 GIL 對效能的影響,可以使用 -O 旗標來呼叫解析器,生成最佳化後的位元碼,減少上下文切換。此外,也可以考慮使用 multiprocessing 模組來繞過 GIL 的限制。
使用 ThreadPoolExecutor 實作平行處理
ThreadPoolExecutor 提供了一個介面來非同步執行任務。我們可以定義一個 ThreadPoolExecutor 物件,並使用其 submit 方法來提交任務:
from concurrent.futures import ThreadPoolExecutor
import threading
def view_thread():
print("執行中的執行緒")
print("存取中的執行緒識別碼:{}".format(threading.get_ident()))
print("執行完成的執行緒:{}".format(threading.current_thread()))
def main():
executor = ThreadPoolExecutor(max_workers=3)
thread1 = executor.submit(view_thread)
thread2 = executor.submit(view_thread)
thread3 = executor.submit(view_thread)
if __name__ == '__main__':
main()
內容解密:
ThreadPoolExecutor(max_workers=3)建立了一個最大工作者數為 3 的執行緒池。executor.submit(view_thread)提交了一個任務到執行緒池中。
使用上下文管理器(Context Manager)
我們也可以將 ThreadPoolExecutor 用作上下文管理器,使用 with 陳述式:
from concurrent.futures import ThreadPoolExecutor
def message(msg):
print("處理中:{}".format(msg))
def main():
print("啟動 ThreadPoolExecutor")
with ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(message, ('訊息 1'))
future = executor.submit(message, ('訊息 2'))
print("所有任務完成")
if __name__ == '__main__':
main()
內容解密:
with ThreadPoolExecutor(max_workers=2) as executor:使用上下文管理器建立了一個最大工作者數為 2 的執行緒池。- 在
with陳述式區塊內提交的任務會在區塊結束時自動等待完成。
使用socket.io進行即時通訊
WebSockets是一種技術,能夠在客戶端和伺服器之間透過TCP連線實作即時通訊,無需客戶端不斷檢查API端點是否有更新或新內容。客戶端建立一個與WebSocket伺服器的連線,並等待監聽新的伺服器事件或訊息。
WebSockets的主要優勢在於它們更高效,因為減少了網路負載,並以訊息的形式向大量客戶端傳送資訊。WebSockets的主要特點包括:
- 透過單一TCP連線提供雙向(全雙工)通訊。
- 提供伺服器和其連線客戶端之間的即時通訊,從而實作了新的應用程式的發展,以非同步方式處理事件。
- 提供平行性並提高效能,最佳化回應時間,從而實作更可靠的網路應用程式。
使用socket.io實作伺服器
要使用socket.io實作伺服器,我們需要引入其他模組,如asyncio和aiohttp:
- asyncio是一個Python模組,幫助我們在Python中進行單執行緒的平行程式設計。它在Python 3.7中可用,檔案可在https://docs.python.org/3/library/asyncio.html找到。
- aiohttp是一個用於建立根據asyncio的伺服器和客戶端應用程式的函式庫。該模組使用WebSockets的優勢,在應用程式的不同部分之間進行非同步通訊。檔案可在http://aiohttp.readthedocs.io/en/stable找到。
socket.io伺服器可在官方Python儲存函式庫中找到,可以透過pip安裝:
$ pip3 install python-socketio
完整的檔案可在https://python-socketio.readthedocs.io/en/latest找到。
以下是一個使用Python 3.5+的WebSockets範例,其中我們使用aiohttp框架實作了一個socket.io伺服器。您可以使用pip3 install aiohttp命令安裝此模組。
from aiohttp import web
import socketio
socket_io = socketio.AsyncServer()
app = web.Application()
socket_io.attach(app)
async def index(request):
return web.Response(text='Hello world from socketio', content_type='text/html')
@socket_io.on('message')
def print_message(socket_id, data):
print("Socket ID: ", socket_id)
print("Data: ", data)
app.router.add_get('/', index)
if __name__ == '__main__':
web.run_app(app)
在上述程式碼中,我們實作了一個根據socket.io的伺服器,使用了aiohttp模組。如您在程式碼中所見,我們定義了兩個方法:index()方法和print_message()方法。index()方法將根據“/”根端點請求傳回一個回應訊息,而print_message()方法將列印通訊端識別碼和由事件發出的資料。此方法使用@socketio.on('message')註解。
實作使用者端連線到伺服器
要實作客戶端,可以在web_socket_client.py檔案中找到以下程式碼:
import socketio
sio = socketio.Client()
@sio.event
def connect():
print('connection established')
@sio.event
def disconnect():
print('disconnected from server')
sio.connect('http://localhost:8080')
sio.emit('message', {'data': 'my_data'})
sio.wait()
在上述程式碼中,我們使用socketio.Client()類別的connect()方法連線到正在監聽8080埠的伺服器。我們定義了兩個方法,一個用於連線,另一個用於斷開連線。
要呼叫伺服器中的print_message()函式,我們需要發出訊息事件,並將資料作為物件字典傳遞。
要執行前兩個指令碼,我們需要分別執行兩個終端,一個用於客戶端,另一個用於伺服器。首先,您需要執行伺服器,然後執行客戶端,以檢查作為訊息傳送的資訊。
程式碼解析:
伺服器端實作:
- 使用
aiohttp建立一個Web應用,並附加socketio.AsyncServer()例項。 - 定義了一個
index函式來處理根URL請求,傳回一個簡單的回應。 - 使用
@socket_io.on('message')裝飾器定義了一個事件處理函式print_message,當接收到message事件時列印出通訊端ID和傳遞的資料。
- 使用
客戶端實作:
- 建立了一個
socketio.Client()例項,並定義了連線建立和斷開時的事件處理函式。 - 使用
sio.connect()方法連線到伺服器,並使用sio.emit()方法發出一個message事件,將資料傳送到伺服器。
- 建立了一個
執行與測試:
- 分別在兩個終端中執行伺服器和客戶端指令碼,觀察伺服器是否正確接收並列印出客戶端傳送的資料。
本章節介紹瞭如何使用Python的socket.io函式庫來建立支援即時通訊的WebSocket伺服器和客戶端。透過使用asyncio和aiohttp等非同步程式設計工具,能夠有效地處理平行連線和訊息傳遞,為開發即時網路應用提供了有力的支援。