返回文章列表

Rust 分享狀態管理與同步機制

本文探討 Rust 中分享可變狀態的管理與同步機制,包含 Mutex、RwLock、Condvar 和原子操作的應用場景、實作細節以及避免死鎖的最佳實務。文章以實際程式碼範例說明如何使用 Mutex 保護分享資料、處理被毒化的 Mutex、使用 RwLock 保護不常變動的資料,以及如何使用 Condvar

程式語言 平行計算

在 Rust 的多執行緒環境中,安全地管理分享可變狀態至關重要。本文將會探討 Rust 提供的同步機制,包含 Mutex、RwLock、Condvar 和原子操作,並輔以程式碼範例說明如何正確地使用這些工具。Mutex 提供了對分享資源的獨佔存取,適用於需要修改分享資料的場景。RwLock 允許多個讀取者同時存取,但寫入者需要獨佔存取,適用於讀取操作頻繁的場景。Condvar 則允許執行緒等待特定條件,並在條件滿足時被喚醒,適用於需要執行緒間協作的場景。原子操作提供了一種無鎖的同步方式,適用於簡單的計數器或標誌位等場景。此外,文章也將探討如何避免 Mutex 死鎖以及如何處理被毒化的 Mutex。最後,文章還會介紹 Rust 中的全域變數的處理方式以及巨集的應用,例如使用 lazy_static 巨集初始化全域變數,以及使用巨集來產生更具彈性的程式碼。

Mutex 的應用與注意事項

在 Rust 中,Mutex 是一種用於管理分享可變狀態的同步機制。它允許在多執行緒環境中安全地存取和修改資料。建立一個新的 Mutex 與建立一個新的 BoxArc 相似,但 Mutex 主要關注的是鎖定機制。如果希望 Mutex 被分配在堆積上,需要明確指定,例如使用 Arc::new 來建立整個應用程式的例項,並使用 Mutex::new 來保護資料。

實作 join_waiting_list 方法

以下是一個使用 Mutex 的例子,實作了 join_waiting_list 方法,用於將玩家新增到等待列表中,並在達到一定數量時開始新遊戲:

impl FernEmpireApp {
    /// 將玩家新增到等待列表中,以便參與下一場遊戲。
    /// 當等待列表中的玩家數量達到一定數量時,立即開始新遊戲。
    fn join_waiting_list(&self, player: PlayerId) {
        // 鎖定 Mutex 並取得對內部資料的存取權。
        // `guard` 的作用域是一個臨界區。
        let mut guard = self.waiting_list.lock().unwrap();
        // 執行遊戲邏輯。
        guard.push(player);
        if guard.len() == GAME_SIZE {
            let players = guard.split_off(0);
            self.start_game(players);
        }
    }
}

內容解密:

  1. self.waiting_list.lock().unwrap():這行程式碼嘗試鎖定 Mutex。如果鎖定成功,則傳回一個 MutexGuard,它是一個對 WaitingList 的可變參照包裝器。
  2. guard.push(player):透過 guardWaitingList 進行操作,將新的玩家新增到列表中。
  3. if guard.len() == GAME_SIZE:檢查等待列表中的玩家數量是否達到遊戲所需的玩家數量。
  4. let players = guard.split_off(0):如果達到遊戲人數,則將等待列表中的玩家轉移到 players 變數中。
  5. self.start_game(players):呼叫 start_game 方法開始新遊戲。

Mutex 與 mut 的關係

在 Rust 中,mut 表示獨佔存取,而非 mut 表示分享存取。對於大多數型別,如果你沒有對父結構的獨佔存取權,你也就無法對其子結構進行獨佔存取。但是,Mutex 提供了一種機制,可以在多執行緒分享存取 Mutex 的情況下,對其內部資料進行獨佔存取。

為何 Mutex 不總是最佳選擇

雖然 Rust 的 Mutex 設計可以幫助開發者更系統化、更合理地使用互斥鎖,但仍然需要注意以下問題:

  • 競爭條件(Race Conditions):即使在安全的 Rust 程式碼中,也可能出現競爭條件,導致程式行為取決於執行緒之間的時序安排。
  • 分享可變狀態:使用 Mutex 可能會導致程式碼設計上的問題,例如使得程式碼變得更為相互關聯和難以測試。
  • 死鎖(Deadlock):執行緒可能會因為嘗試取得已經持有的鎖而導致死鎖。

死鎖問題

當一個執行緒嘗試取得它已經持有的鎖時,就會發生死鎖。例如:

let mut guard1 = self.waiting_list.lock().unwrap();
let mut guard2 = self.waiting_list.lock().unwrap(); // 死鎖

在這個例子中,第一個 lock() 呼叫成功取得了鎖,第二個呼叫則會阻塞,等待鎖被釋放,但由於第一個鎖沒有被釋放,因此會導致死鎖。

內容解密:

  1. 第一個 lock() 呼叫成功取得鎖。
  2. 第二個 lock() 呼叫會阻塞,因為鎖已經被持有。
  3. 由於執行緒持有鎖並等待鎖被釋放,因此發生死鎖。

總之,在使用 Mutex 時,需要謹慎處理鎖的取得和釋放,以避免死鎖等問題,並且要注意程式碼的設計,以減少分享可變狀態的複雜性。

分享可變狀態的管理與同步機制

在多執行緒程式設計中,分享可變狀態的管理是一個重要的課題。Rust 提供了一系列的同步機制來幫助開發者有效地管理分享狀態,避免資料競爭和死鎖等問題。

互斥鎖(Mutex)與死鎖

互斥鎖是一種常見的同步機制,用於保護分享資源。Rust 的 std::sync::Mutex 提供了互斥鎖的功能。然而,不當的使用互斥鎖可能會導致死鎖。死鎖發生在兩個或多個執行緒互相等待對方釋放資源的情況下。

避免死鎖的最佳實踐

  1. 保持臨界區小: 盡量縮短持有互斥鎖的時間,以減少死鎖的可能性。
  2. 設計良好的程式結構: 避免多個執行緒互相等待對方的資源。

被毒化的互斥鎖(Poisoned Mutex)

當一個執行緒在持有互斥鎖時發生 panic,Rust 會將該互斥鎖標記為被毒化。任何後續嘗試鎖定該互斥鎖的操作都會傳回錯誤結果。這種機制是為了防止其他執行緒無意中使用可能處於不一致狀態的資料。

處理被毒化的互斥鎖

雖然被毒化的互斥鎖看起來很嚴重,但實際上並不一定是致命的。開發者仍然可以透過 PoisonError::into_inner() 方法來存取被毒化的互斥鎖中的資料。

使用互斥鎖實作多生產者通道

Rust 的標準函式庫提供了單一接收者的多生產者通道(mpsc)。然而,透過在接收者周圍新增互斥鎖,可以實作多執行緒分享接收者的功能。

分享接收者的實作範例

pub mod shared_channel {
    use std::sync::{Arc, Mutex};
    use std::sync::mpsc::{channel, Sender, Receiver};

    #[derive(Clone)]
    pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);

    impl<T> Iterator for SharedReceiver<T> {
        type Item = T;

        fn next(&mut self) -> Option<T> {
            let guard = self.0.lock().unwrap();
            guard.recv().ok()
        }
    }

    pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
        let (sender, receiver) = channel();
        (sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
    }
}

內容解密:

  1. 使用 ArcMutex: 將 Receiver 包裝在 ArcMutex 中,以實作執行緒安全的分享。
  2. 實作 Iterator 特徵: 為 SharedReceiver 實作 Iterator 特徵,以便於遍歷接收到的訊息。
  3. next 方法的實作: 在 next 方法中,先鎖定 Mutex,然後從 Receiver 接收訊息。

讀寫鎖(RwLock)

讀寫鎖是一種允許多個讀取者同時存取分享資源,但寫入者具有獨佔存取權的同步機制。Rust 的 std::sync::RwLock 提供了讀寫鎖的功能。

使用讀寫鎖的最佳實踐

  1. 使用 RwLock 保護不常變動的資料: 例如,伺服器程式的組態資訊。
  2. 區分讀取和寫入操作: 使用 RwLock::read() 進行讀取操作,使用 RwLock::write() 進行寫入操作。

使用讀寫鎖的範例

use std::sync::RwLock;

struct FernEmpireApp {
    config: RwLock<AppConfig>,
}

impl FernEmpireApp {
    fn mushrooms_enabled(&self) -> bool {
        let config_guard = self.config.read().unwrap();
        config_guard.mushrooms_enabled
    }

    fn reload_config(&self) -> io::Result<()> {
        let new_config = AppConfig::load()?;
        let mut config_guard = self.config.write().unwrap();
        *config_guard = new_config;
        Ok(())
    }
}

內容解密:

  1. 使用 RwLock 保護組態資訊: 將 AppConfig 包裝在 RwLock 中,以實作執行緒安全的分享。
  2. mushrooms_enabled 方法的實作: 使用 RwLock::read() 方法讀取組態資訊。
  3. reload_config 方法的實作: 使用 RwLock::write() 方法更新組態資訊。

條件變數(Condvar)與原子操作(Atomics)在平行程式設計中的應用

在平行程式設計中,執行緒經常需要等待某個條件成立後才能繼續執行。Rust 提供兩種重要的同步機制來處理這種情況:條件變數(Condvar)和原子操作(Atomics)。

條件變數(Condvar)

條件變數是一種同步原語,允許執行緒等待某個條件成立。當條件成立時,其他執行緒可以通知等待中的執行緒繼續執行。

使用場景

  • 伺服器關閉時,主執行緒需要等待其他執行緒完成離開。
  • 工作執行緒需要等待資料準備好後才能處理。
  • 分散式共識協定的實作中,執行緒需要等待足夠多的節點回應。

Condvar 的基本用法

在 Rust 中,std::sync::Condvar 型別實作了條件變數。Condvar 提供了 .wait().notify_all() 方法,分別用於等待條件成立和通知等待中的執行緒。

當條件成立時,呼叫 Condvar::notify_all() 通知所有等待中的執行緒:

self.has_data_condvar.notify_all();

等待條件成立的執行緒使用 Condvar::wait() 方法:

while !guard.has_data() {
    guard = self.has_data_condvar.wait(guard).unwrap();
}

實作細節

  • Condvar::wait() 方法會釋放 Mutex,然後在條件成立時重新取得 Mutex。
  • Condvar::wait() 方法的簽名比較特殊,它會消耗 MutexGuard 物件,並在成功時傳回新的 MutexGuard。

原子操作(Atomics)

原子操作是一種無鎖的平行程式設計技術,提供了一種高效的方式來更新分享變數。

原子型別的使用

Rust 的 std::sync::atomic 模組提供了多種原子型別,如 AtomicIsizeAtomicUsizeAtomicBool。這些型別提供了原子操作方法,如 .fetch_add().load(),用於執行安全的更新操作。

例如,更新一個 AtomicIsize 變數:

use std::sync::atomic::Ordering;
atom.fetch_add(1, Ordering::SeqCst);

記憶體排序(Memory Ordering)

原子操作需要指定記憶體排序,以確保正確性和效能。Rust 繼承了 C++ 的記憶體排序模型,提供了多種排序選項,如 Ordering::SeqCstOrdering::AcquireOrdering::Release

使用場景

  • 取消非同步任務:使用 AtomicBool 來通知工作執行緒取消任務。
  • 簡單的全域變數:使用原子型別來實作簡單的全域變數。

範例:使用 AtomicBool 取消非同步任務

以下範例示範如何使用 AtomicBool 來取消非同步任務:

use std::sync::atomic::{AtomicBool, Ordering};
let cancel_flag = Arc::new(AtomicBool::new(false));
let worker_cancel_flag = cancel_flag.clone();

let worker_handle = spawn(move || {
    for pixel in animation.pixels_mut() {
        render(pixel); // ray-tracing - this takes a few microseconds
        if worker_cancel_flag.load(Ordering::SeqCst) {
            return None;
        }
    }
    Some(animation)
});

// Cancel rendering.
cancel_flag.store(true, Ordering::SeqCst);
// Discard the result, which is probably `None`.
worker_handle.join().unwrap();

內容解密:

  1. Arc的使用:透過Arc(原子參考計數)分享AtomicBool變數,讓主執行緒和工作執行緒都能存取同一個標誌位。
  2. AtomicBool的原子操作:使用.load().store()方法對標誌位進行讀寫,這些操作是原子的,能夠避免資料競爭。
  3. 取消任務的流程:主執行緒設定cancel_flagtrue,工作執行緒定期檢查該標誌位,一旦發現被設定為true,立即終止任務並傳回None
  4. Ordering::SeqCst的作用:使用最嚴格的記憶體排序模型,確保所有執行緒對標誌位的讀寫順序一致,避免因記憶體排序導致的錯誤。

Rust 中的全域變數與巨集

全域變數的挑戰與解決方案

在撰寫網路程式碼時,我們經常需要使用全域變數來記錄某些狀態,例如伺服器已成功處理的封包數量。以下是一個簡單的例子:

/// 伺服器已成功處理的封包數量。
static PACKETS_SERVED: usize = 0;

然而,這段程式碼存在一個問題:PACKETS_SERVED 是不可變的,因此我們無法修改它。

Rust 語言對於全域可變狀態有嚴格的限制。預設情況下,靜態變數是不可變的,這是為了確保執行緒安全。雖然我們可以宣告一個可變的靜態變數,但存取它是不安全的。

使用原子整數

最簡單的方法是將 PACKETS_SERVED 宣告為一個原子整數:

use std::sync::atomic::{AtomicUsize, Ordering};

static PACKETS_SERVED: AtomicUsize = AtomicUsize::new(0);

這樣,我們就可以使用 fetch_add 方法來原子地增加 PACKETS_SERVED 的值:

PACKETS_SERVED.fetch_add(1, Ordering::SeqCst);

使用 Mutex 或 RwLock

對於其他型別的全域變數,我們可以使用 MutexRwLock 來確保執行緒安全。例如:

#[macro_use]
extern crate lazy_static;

use std::sync::Mutex;

lazy_static! {
    static ref HOSTNAME: Mutex<String> = Mutex::new(String::new());
}

這裡使用了 lazy_static 巨集來初始化 HOSTNAME 變數。lazy_static 允許我們使用任意表達式來初始化靜態變數,並且只在第一次存取時執行初始化。

巨集的威力

Rust 的巨集是一種強大的工具,可以擴充套件語言的功能。巨集在編譯期間展開,產生新的 Rust 程式碼。例如,assert_eq! 巨集可以用於測試:

assert_eq!(gcd(6, 10), 2);

這個巨集呼叫會展開成以下程式碼:

match (&gcd(6, 10), &2) {
    (left_val, right_val) => {
        if !(*left_val == *right_val) {
            panic!("assertion failed: `(left == right)`, \
                    (left: `{:?}`, right: `{:?}`)", left_val, right_val);
        }
    }
}

內容解密:

  • assert_eq! 巨集用於比較兩個表示式的值是否相等。
  • 如果比較失敗,巨集會產生一個包含檔名和行號的錯誤訊息。
  • 這個巨集不能用函式實作,因為函式無法取得呼叫者的檔名和行號。

為什麼使用巨集?

  • 巨集可以在編譯期間產生程式碼,提供更高的靈活性。
  • 巨集可以取得呼叫者的上下文資訊,例如檔名和行號。