返回文章列表

Rust 非同步觀察者模式與 Axum 框架實踐

本文探討 Rust 中非同步觀察者模式的實作,解決了`dyn Future` 的 unpin 問題,並以 Axum 框架為例,示範如何構建非同步 HTTP REST API 服務,涵蓋了 Tokio 執行環境管理、同步與非同步程式碼混合使用、錯誤處理、日誌追蹤以及單元測試等關鍵技巧。

Web 開發 Rust

在 Rust 生態系統中,非同步程式設計已成為主流,尤其在網路服務開發領域。本文將探討如何在 Rust 中實作非同步觀察者模式,並以 Axum 框架為例,展示如何構建高效能的非同步 Web 服務。首先,我們會深入剖析非同步觀察者模式的實作細節,並解決常見的 dyn Future unpin 問題。接著,我們將逐步講解如何使用 Axum 框架建立 RESTful API,涵蓋路由設定、請求處理、回應格式以及錯誤處理等方面。同時,我們也會探討 Tokio 執行環境的管理,以及如何有效地混合使用同步和非同步程式碼。此外,我們將介紹如何使用 tracing crate 和 tokio-console 工具來追蹤和除錯非同步程式碼,以及如何編寫有效的非同步程式碼單元測試。最後,我們會簡要比較 Axum 與其他 Web 框架(如 Rocket 和 Actix)的特性,並說明 Axum 的優勢和適用場景。

實作非同步觀察者模式

在 Rust 中實作觀察者模式時,我們需要處理非同步的情況。讓我們一步一步來分析如何完成這個任務。

遇到問題

當我們嘗試測試觀察者模式的實作時,遇到了編譯錯誤。錯誤訊息指出 dyn Future<Output = ()> 無法被 unpin。

#[tokio::main]
async fn main() {
    let subject = Subject;
    let observer = MyObserver;
    observer.observe(&subject).await;
}

錯誤訊息如下:

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:29:31
|
29 | observer.observe(&subject).await;
| ^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using `Box::pin`

解決問題

為瞭解決這個問題,我們需要了解 Future trait 的定義:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

poll 方法需要 self 引數是 Pin<&mut Self> 型別,這意味著在我們可以 poll 一個 future 之前,它需要被 pinned。

更新 Observer trait

我們更新 Observer trait 以傳回一個 pinned 的 Box<dyn Future>

pub trait Observer {
    type Subject;
    type Output;

    fn observe(
        &self,
        subject: &Self::Subject,
    ) -> Pin<Box<dyn Future<Output = Self::Output>>>;
}

並更新 MyObserver 的實作:

impl Observer for MyObserver {
    type Subject = Subject;
    type Output = ();

    fn observe(
        &self,
        _subject: &Self::Subject,
    ) -> Pin<Box<dyn Future<Output = Self::Output>>> {
        Box::pin(async {
            // 做一些非同步的事情!
            use tokio::time::{sleep, Duration};
            sleep(Duration::from_millis(100)).await;
        })
    }
}

更新 Observable trait

我們還需要更新 Observable trait 以支援非同步的 update 方法:

pub trait Observable {
    type Observer;

    fn update<'a>(
        &'a self,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>;

    fn attach(&mut self, observer: Self::Observer);
    fn detach(&mut self, observer: Self::Observer);
}

並實作 Observable for Subject

impl Observable for Subject {
    type Observer = Arc<dyn Observer<Subject = Self, Output = ()>>;

    fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>> {
        let observers: Vec<_> = self.observers.iter().flat_map(|o| o.upgrade()).collect();
        Box::pin(async move {
            futures::future::join_all(observers.iter().map(|o| o.observe(self))).await;
        })
    }

    // ...
}

測試非同步觀察者模式

現在,我們可以測試我們的非同步觀察者模式了:

#[tokio::main]
async fn main() {
    let mut subject = Subject::new("some subject state");
    let observer1 = MyObserver::new("observer1");
    let observer2 = MyObserver::new("observer2");

    subject.attach(observer1.clone());
    subject.attach(observer2.clone());

    // ... 做一些事情 ...
    subject.update().await;
}

輸出結果如下:

observed subject with state="some subject state" in observer1
observed subject with state="some subject state" in observer2

內容解密:

此範例程式碼展示瞭如何在 Rust 中實作非同步觀察者模式。首先,我們定義了 ObserverObservable traits,並更新它們以支援非同步操作。然後,我們實作了 MyObserverSubject 結構體,並測試了非同步觀察者模式的功能。

同步與非同步程式碼的混合使用

在 Rust 中,儘管非同步生態系統正在快速成長,但仍然存在需要混合使用同步和非同步程式碼的情況。我們應該盡量避免這種情況,但在某些情況下,這可能是必要的。

內容解密:

在混合使用同步和非同步程式碼時,我們需要注意潛在的問題。建議盡量使用非同步程式碼,以避免潛在的效能問題。

混契約步與非同步程式設計

在 Rust 的非同步程式設計中,一個常見的挑戰是混合使用同步和非同步程式碼,尤其是在使用不支援非同步的函式庫(如資料函式庫驅動程式或網路函式庫)時。例如,若要使用 Rocket 框架(https://crates.io/crates/rocket)建立一個支援非同步的 HTTP 服務,但所使用的資料函式庫尚不支援非同步操作。

從非同步上下文呼叫同步程式碼

要在非同步上下文中呼叫同步程式碼,建議使用 tokio::task::spawn_blocking() 函式。此函式接受一個函式並傳回一個 future。當呼叫 spawn_blocking() 時,它會在由 Tokio 管理的執行緒佇列上執行所提供的函式。然後,可以像其他非同步程式碼一樣使用 .await 等待該 future 的結果。

以下是一個使用 spawn_blocking() 的範例,展示如何非同步寫入檔案並同步讀取該檔案:

use tokio::io::{self, AsyncWriteExt};

async fn write_file(filename: &str) -> io::Result<()> {
    let mut f = tokio::fs::File::create(filename).await?;
    f.write(b"Hello, file!").await?;
    f.flush().await?;
    Ok(())
}

fn read_file(filename: &str) -> io::Result<String> {
    std::fs::read_to_string(filename)
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let filename = "mixed-sync-async.txt";
    write_file(filename).await?;
    let contents = tokio::task::spawn_blocking(|| read_file(filename)).await??;
    println!("File contents: {}", contents);
    tokio::fs::remove_file(filename).await?;
    Ok(())
}

內容解密:

  1. write_file 函式:此函式是非同步的,用於建立一個檔案並寫入內容。使用 tokio::fs::File::create 非同步地建立檔案,並使用 writeflush 方法寫入內容。
  2. read_file 函式:此函式是同步的,用於讀取檔案內容。它使用標準函式庫的 std::fs::read_to_string 函式來讀取檔案。
  3. main 函式:在 main 函式中,首先非同步地寫入檔案。然後,使用 tokio::task::spawn_blocking 在一個由 Tokio 管理的獨立執行緒上同步讀取檔案。注意這裡使用了雙重 ?,因為 spawn_blockingread_file 都傳回 Result
  4. Tokio 的執行緒管理:Tokio 會管理用於執行 spawn_blocking 中程式碼的執行緒數量。預設值通常足夠使用,但也可以透過 Tokio 的 runtime builder 進行組態。

在同步程式碼中使用非同步程式碼

雖然較少見,但也可以在同步程式碼中使用非同步程式碼。這通常涉及到使用 block_on() 方法來執行非同步程式碼。不過,這種做法應盡量避免,因為它可能會引入複雜性。

何時避免使用非同步程式設計

非同步程式設計非常適合 I/O 密集型的應用,如網路服務。然而,它也引入了一些複雜性。因此,當不需要平行處理時,同步程式設計可能是更好的選擇。例如,簡單的 CLI 工具或只進行少數順序 HTTP 請求的客戶端。

同步非同步程式碼

當需要在不同的非同步任務之間共用資料時,需要小心處理以避免競爭條件。Tokio 的 sync 模組提供了一些工具來幫助實作這一點,例如多生產者單消費者(mpsc)通道,可以安全地在不同的非同步任務之間傳遞訊息。

使用 mpsc 通道

mpsc 通道允許從多個生產者向單個消費者傳遞訊息,而無需顯式的鎖定。Tokio 提供了不同型別的通道,包括廣播、一次性(oneshot)和監看(watch)。

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 使用 mpsc 通道

rectangle "使用 mpsc 通道" as n1
rectangle "實作" as n2
rectangle "應用" as n3

n1 --> n2
n2 --> n3

@enduml

此圖示展示了多個生產者如何透過 mpsc 通道向單個消費者傳送訊息。

內容解密:

  1. 生產者與消費者:生產者可以是非同步任務,它們產生需要被處理的訊息。消費者則是處理這些訊息的任務。
  2. mpsc 通道:Tokio 的 mpsc 通道提供了一種機制,讓多個生產者可以安全地向單個消費者傳送訊息,無需擔心競爭條件。
  3. 通道型別:Tokio 提供了不同型別的通道,以滿足不同的需求。例如,廣播通道可以將訊息傳送給多個消費者。

總之,混合使用同步和非同步程式碼是可能的,但需要謹慎處理以避免引入複雜性和潛在的錯誤。Tokio 提供的工具,如 spawn_blocking() 和 mpsc 通道,可以幫助簡化這一過程。

8.9 追蹤與除錯非同步程式碼

對於任何足夠複雜的網路應用程式,檢測程式碼以衡量其效能和除錯問題至關重要。Tokio 專案提供了一個名為 tracing 的 crate,用於此目的。tracing 支援 OpenTelemetry 標準,能夠與許多流行的第三方追蹤和遙測工具整合,但也可以將追蹤結果輸出為日誌。

使用 Tokio 啟用追蹤功能

使用 Tokio 啟用追蹤功能後,可以解鎖 tokio-console,這是一個類別似於大多數 UNIX 系統中熟悉的 top 程式的 CLI 工具。tokio-console 允許你即時分析根據 Tokio 的非同步 Rust 程式碼。

設定追蹤的步驟

  1. 設定一個訂閱者(subscriber),以接收追蹤結果。
  2. 使用 #[tracing::instrument] 巨集檢測你想要測量的函式。

簡單範例

use tokio::time::{sleep, Duration};

#[tracing::instrument]
async fn sleep_1s() {
    sleep(Duration::from_secs(1)).await;
}

#[tracing::instrument]
async fn sleep_2s() {
    sleep(Duration::from_secs(2)).await;
}

#[tracing::instrument]
async fn sleep_3s() {
    sleep(Duration::from_secs(3)).await;
}

#[tokio::main]
async fn main() {
    console_subscriber::init();
    loop {
        tokio::spawn(sleep_1s());
        tokio::spawn(sleep_2s());
        sleep_3s().await;
    }
}

程式碼解析:

  • 使用 console_subscriber::init(); 初始化 console 訂閱者,以發出追蹤結果。
  • 三個函式 sleep_1ssleep_2ssleep_3s 被檢測並非同步執行。
  • sleep_1ssleep_2s 被觸發並忘記,而 sleep_3s 則被阻塞,直到 3 秒過去,然後無限重複這個過程。

相關依賴設定

[dependencies]
tokio = { version = "1", features = ["full", "tracing"] }
tracing = "0.1"
console-subscriber = "0.1"

編譯與執行

  1. 安裝 tokio-consolecargo install tokio-console
  2. 編譯並執行程式,需啟用 Tokio 的不穩定追蹤功能:RUSTFLAGS="--cfg tokio_unstable" cargo run
  3. 執行 tokio-console 以觀察程式的執行狀況。

使用 tokio-console 的好處

  • 即時觀察任務狀態和相關指標。
  • 監控資源使用情況。
  • 檢視個別任務的詳細資訊,包括 poll 時間直方圖。

8.10 處理非同步程式碼的測試

測試非同步程式碼時,有兩種主要策略:

  1. 為每個測試建立和銷毀非同步執行環境。
  2. 跨多個測試重複使用一個或多個非同步執行環境。

使用 #[tokio::test] 巨集

對於大多數情況,可以使用 #[tokio::test] 巨集來簡化測試的編寫。它會自動為測試函式建立非同步執行環境。

範例測試

#[tokio::test]
async fn sleep_test() {
    let start_time = Instant::now();
    sleep(Duration::from_secs(1)).await;
    let end_time = Instant::now();
    let seconds = end_time
        .checked_duration_since(start_time)
        .unwrap()
        .as_secs();
    assert_eq!(seconds, 1);
}

測試解析:

  • 使用 #[tokio::test] 巨集來標記測試函式,使其在非同步環境中執行。
  • 測試 sleep_1s 函式的功能,驗證是否正確睡眠了 1 秒。

手動管理執行環境

如果你需要,也可以手動管理非同步執行環境,但必須注意 Rust 的測試框架會平行執行測試,因此需要正確處理跨執行緒的情況。

使用 axum 框架建立 HTTP REST API 服務

在前面的章節中,已經學習了許多非同步 Rust 程式設計的相關知識。現在,將這些知識應用在實際的專案中,透過建立一個使用 axum 框架的 Web 服務來實踐所學的內容。本章節將會涵蓋以下幾個主題:

選擇適合的 Web 框架

在撰寫本文的過程中,非同步 Rust 的生態系統發生了相當大的變化,尤其是在可用於非同步 Rust 的工具和函式庫方面。Tokio 及其相關專案的進展令人印象深刻。

對於建立 Web 服務,強烈推薦使用 axum 框架。axum 是 Tokio 專案的一部分,它提供了簡潔且靈活的 API,並且大多數實作都不依賴巨集(macro)。axum 建構於 Tower 和 hyper 之上,分別提供了建立網路服務的抽象層和 HTTP 使用者端及伺服器實作。

axum 的最大優點在於它不會強加特定的模式或做法給開發者。當然,若要深入瞭解細節,學習 Tower 函式庫是必要的,但對於簡單的任務而言,這並不是必要的。axum 支援 tracing 和 metrics,只需要少量的設定即可啟用。

其他值得一提的框架

其他值得注意的框架包括 Rocket 和 Actix。Rocket 的目標是成為 Rust 版本的 Ruby on Rails,而 Actix 是最早的 Rust Web 框架之一。

Rocket 和 Actix 都大量使用了巨集來隱藏實作細節。相較之下,axum 的核心 API 不依賴巨集,這使得它更易於使用和理解。

值得注意的是,Rocket 和 Actix 都早於 Rust 的 Future 特性和 async/await 語法的穩定化,在那之前,使用巨集是必要的。最近的版本中,這兩個框架都努力減少了對巨集的依賴。

axum 的優勢

axum 的設計理念使其能夠快速建立 Web 服務,並且易於與其他函式庫和工具整合。它不強加特定的模式或做法,使得開發者能夠按照自己的方式進行開發。

使用 axum 的基本步驟

  1. 選擇 axum 作為 Web 框架:由於其簡潔、靈活且易於使用的特性,axum 成為建立 Web 服務的理想選擇。

  2. 設計 API:根據需求設計 RESTful API,包括定義路由、處理請求和回應等。

  3. 建模資料:根據業務邏輯定義資料模型,這通常涉及到資料函式庫的設計和操作。

  4. 實作 API:使用 axum 提供的方法實作 API,包括處理 CRUD(建立、讀取、更新、刪除)操作。

  5. 錯誤處理:優雅地處理錯誤,提供有用的錯誤訊息給客戶端。

程式碼解析:

此範例展示瞭如何使用 axum 建立一個簡單的 HTTP 伺服器。首先,定義了一個 User 結構體,用於表示使用者資料。然後,定義了兩個處理函式:create_user 用於處理建立使用者的請求,get_user 用於處理取得使用者資訊的請求。最後,在 main 函式中,建立了一個 Router 例項,並將其繫結到指定的位址和埠上,啟動伺服器。

重點說明:

  1. 使用 axumserde 等函式庫來建立和處理 JSON 資料。
  2. 定義了 User 結構體,並實作了 SerializeDeserialize 特性,以便於 JSON 的序列化和反序列化。
  3. 使用 Router 定義了兩個路由,分別對應到 create_userget_user 處理函式。
  4. 使用 tokio::main 巨集來標記非同步主函式,並啟動 axum 伺服器。