在 Rust 生態系統中,非同步程式設計已成為主流,尤其在網路服務開發領域。本文將探討如何在 Rust 中實作非同步觀察者模式,並以 Axum 框架為例,展示如何構建高效能的非同步 Web 服務。首先,我們會深入剖析非同步觀察者模式的實作細節,並解決常見的 dyn Future unpin 問題。接著,我們將逐步講解如何使用 Axum 框架建立 RESTful API,涵蓋路由設定、請求處理、回應格式以及錯誤處理等方面。同時,我們也會探討 Tokio 執行環境的管理,以及如何有效地混合使用同步和非同步程式碼。此外,我們將介紹如何使用 tracing crate 和 tokio-console 工具來追蹤和除錯非同步程式碼,以及如何編寫有效的非同步程式碼單元測試。最後,我們會簡要比較 Axum 與其他 Web 框架(如 Rocket 和 Actix)的特性,並說明 Axum 的優勢和適用場景。
實作非同步觀察者模式
在 Rust 中實作觀察者模式時,我們需要處理非同步的情況。讓我們一步一步來分析如何完成這個任務。
遇到問題
當我們嘗試測試觀察者模式的實作時,遇到了編譯錯誤。錯誤訊息指出 dyn Future<Output = ()> 無法被 unpin。
#[tokio::main]
async fn main() {
let subject = Subject;
let observer = MyObserver;
observer.observe(&subject).await;
}
錯誤訊息如下:
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:29:31
|
29 | observer.observe(&subject).await;
| ^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using `Box::pin`
解決問題
為瞭解決這個問題,我們需要了解 Future trait 的定義:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
poll 方法需要 self 引數是 Pin<&mut Self> 型別,這意味著在我們可以 poll 一個 future 之前,它需要被 pinned。
更新 Observer trait
我們更新 Observer trait 以傳回一個 pinned 的 Box<dyn Future>:
pub trait Observer {
type Subject;
type Output;
fn observe(
&self,
subject: &Self::Subject,
) -> Pin<Box<dyn Future<Output = Self::Output>>>;
}
並更新 MyObserver 的實作:
impl Observer for MyObserver {
type Subject = Subject;
type Output = ();
fn observe(
&self,
_subject: &Self::Subject,
) -> Pin<Box<dyn Future<Output = Self::Output>>> {
Box::pin(async {
// 做一些非同步的事情!
use tokio::time::{sleep, Duration};
sleep(Duration::from_millis(100)).await;
})
}
}
更新 Observable trait
我們還需要更新 Observable trait 以支援非同步的 update 方法:
pub trait Observable {
type Observer;
fn update<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>;
fn attach(&mut self, observer: Self::Observer);
fn detach(&mut self, observer: Self::Observer);
}
並實作 Observable for Subject:
impl Observable for Subject {
type Observer = Arc<dyn Observer<Subject = Self, Output = ()>>;
fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>> {
let observers: Vec<_> = self.observers.iter().flat_map(|o| o.upgrade()).collect();
Box::pin(async move {
futures::future::join_all(observers.iter().map(|o| o.observe(self))).await;
})
}
// ...
}
測試非同步觀察者模式
現在,我們可以測試我們的非同步觀察者模式了:
#[tokio::main]
async fn main() {
let mut subject = Subject::new("some subject state");
let observer1 = MyObserver::new("observer1");
let observer2 = MyObserver::new("observer2");
subject.attach(observer1.clone());
subject.attach(observer2.clone());
// ... 做一些事情 ...
subject.update().await;
}
輸出結果如下:
observed subject with state="some subject state" in observer1
observed subject with state="some subject state" in observer2
內容解密:
此範例程式碼展示瞭如何在 Rust 中實作非同步觀察者模式。首先,我們定義了 Observer 和 Observable traits,並更新它們以支援非同步操作。然後,我們實作了 MyObserver 和 Subject 結構體,並測試了非同步觀察者模式的功能。
同步與非同步程式碼的混合使用
在 Rust 中,儘管非同步生態系統正在快速成長,但仍然存在需要混合使用同步和非同步程式碼的情況。我們應該盡量避免這種情況,但在某些情況下,這可能是必要的。
內容解密:
在混合使用同步和非同步程式碼時,我們需要注意潛在的問題。建議盡量使用非同步程式碼,以避免潛在的效能問題。
混契約步與非同步程式設計
在 Rust 的非同步程式設計中,一個常見的挑戰是混合使用同步和非同步程式碼,尤其是在使用不支援非同步的函式庫(如資料函式庫驅動程式或網路函式庫)時。例如,若要使用 Rocket 框架(https://crates.io/crates/rocket)建立一個支援非同步的 HTTP 服務,但所使用的資料函式庫尚不支援非同步操作。
從非同步上下文呼叫同步程式碼
要在非同步上下文中呼叫同步程式碼,建議使用 tokio::task::spawn_blocking() 函式。此函式接受一個函式並傳回一個 future。當呼叫 spawn_blocking() 時,它會在由 Tokio 管理的執行緒佇列上執行所提供的函式。然後,可以像其他非同步程式碼一樣使用 .await 等待該 future 的結果。
以下是一個使用 spawn_blocking() 的範例,展示如何非同步寫入檔案並同步讀取該檔案:
use tokio::io::{self, AsyncWriteExt};
async fn write_file(filename: &str) -> io::Result<()> {
let mut f = tokio::fs::File::create(filename).await?;
f.write(b"Hello, file!").await?;
f.flush().await?;
Ok(())
}
fn read_file(filename: &str) -> io::Result<String> {
std::fs::read_to_string(filename)
}
#[tokio::main]
async fn main() -> io::Result<()> {
let filename = "mixed-sync-async.txt";
write_file(filename).await?;
let contents = tokio::task::spawn_blocking(|| read_file(filename)).await??;
println!("File contents: {}", contents);
tokio::fs::remove_file(filename).await?;
Ok(())
}
內容解密:
write_file函式:此函式是非同步的,用於建立一個檔案並寫入內容。使用tokio::fs::File::create非同步地建立檔案,並使用write和flush方法寫入內容。read_file函式:此函式是同步的,用於讀取檔案內容。它使用標準函式庫的std::fs::read_to_string函式來讀取檔案。main函式:在main函式中,首先非同步地寫入檔案。然後,使用tokio::task::spawn_blocking在一個由 Tokio 管理的獨立執行緒上同步讀取檔案。注意這裡使用了雙重?,因為spawn_blocking和read_file都傳回Result。- Tokio 的執行緒管理:Tokio 會管理用於執行
spawn_blocking中程式碼的執行緒數量。預設值通常足夠使用,但也可以透過 Tokio 的 runtime builder 進行組態。
在同步程式碼中使用非同步程式碼
雖然較少見,但也可以在同步程式碼中使用非同步程式碼。這通常涉及到使用 block_on() 方法來執行非同步程式碼。不過,這種做法應盡量避免,因為它可能會引入複雜性。
何時避免使用非同步程式設計
非同步程式設計非常適合 I/O 密集型的應用,如網路服務。然而,它也引入了一些複雜性。因此,當不需要平行處理時,同步程式設計可能是更好的選擇。例如,簡單的 CLI 工具或只進行少數順序 HTTP 請求的客戶端。
同步非同步程式碼
當需要在不同的非同步任務之間共用資料時,需要小心處理以避免競爭條件。Tokio 的 sync 模組提供了一些工具來幫助實作這一點,例如多生產者單消費者(mpsc)通道,可以安全地在不同的非同步任務之間傳遞訊息。
使用 mpsc 通道
mpsc 通道允許從多個生產者向單個消費者傳遞訊息,而無需顯式的鎖定。Tokio 提供了不同型別的通道,包括廣播、一次性(oneshot)和監看(watch)。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 使用 mpsc 通道
rectangle "使用 mpsc 通道" as n1
rectangle "實作" as n2
rectangle "應用" as n3
n1 --> n2
n2 --> n3
@enduml
此圖示展示了多個生產者如何透過 mpsc 通道向單個消費者傳送訊息。
內容解密:
- 生產者與消費者:生產者可以是非同步任務,它們產生需要被處理的訊息。消費者則是處理這些訊息的任務。
- mpsc 通道:Tokio 的 mpsc 通道提供了一種機制,讓多個生產者可以安全地向單個消費者傳送訊息,無需擔心競爭條件。
- 通道型別:Tokio 提供了不同型別的通道,以滿足不同的需求。例如,廣播通道可以將訊息傳送給多個消費者。
總之,混合使用同步和非同步程式碼是可能的,但需要謹慎處理以避免引入複雜性和潛在的錯誤。Tokio 提供的工具,如 spawn_blocking() 和 mpsc 通道,可以幫助簡化這一過程。
8.9 追蹤與除錯非同步程式碼
對於任何足夠複雜的網路應用程式,檢測程式碼以衡量其效能和除錯問題至關重要。Tokio 專案提供了一個名為 tracing 的 crate,用於此目的。tracing 支援 OpenTelemetry 標準,能夠與許多流行的第三方追蹤和遙測工具整合,但也可以將追蹤結果輸出為日誌。
使用 Tokio 啟用追蹤功能
使用 Tokio 啟用追蹤功能後,可以解鎖 tokio-console,這是一個類別似於大多數 UNIX 系統中熟悉的 top 程式的 CLI 工具。tokio-console 允許你即時分析根據 Tokio 的非同步 Rust 程式碼。
設定追蹤的步驟
- 設定一個訂閱者(subscriber),以接收追蹤結果。
- 使用
#[tracing::instrument]巨集檢測你想要測量的函式。
簡單範例
use tokio::time::{sleep, Duration};
#[tracing::instrument]
async fn sleep_1s() {
sleep(Duration::from_secs(1)).await;
}
#[tracing::instrument]
async fn sleep_2s() {
sleep(Duration::from_secs(2)).await;
}
#[tracing::instrument]
async fn sleep_3s() {
sleep(Duration::from_secs(3)).await;
}
#[tokio::main]
async fn main() {
console_subscriber::init();
loop {
tokio::spawn(sleep_1s());
tokio::spawn(sleep_2s());
sleep_3s().await;
}
}
程式碼解析:
- 使用
console_subscriber::init();初始化 console 訂閱者,以發出追蹤結果。 - 三個函式
sleep_1s、sleep_2s和sleep_3s被檢測並非同步執行。 sleep_1s和sleep_2s被觸發並忘記,而sleep_3s則被阻塞,直到 3 秒過去,然後無限重複這個過程。
相關依賴設定
[dependencies]
tokio = { version = "1", features = ["full", "tracing"] }
tracing = "0.1"
console-subscriber = "0.1"
編譯與執行
- 安裝
tokio-console:cargo install tokio-console - 編譯並執行程式,需啟用 Tokio 的不穩定追蹤功能:
RUSTFLAGS="--cfg tokio_unstable" cargo run - 執行
tokio-console以觀察程式的執行狀況。
使用 tokio-console 的好處
- 即時觀察任務狀態和相關指標。
- 監控資源使用情況。
- 檢視個別任務的詳細資訊,包括 poll 時間直方圖。
8.10 處理非同步程式碼的測試
測試非同步程式碼時,有兩種主要策略:
- 為每個測試建立和銷毀非同步執行環境。
- 跨多個測試重複使用一個或多個非同步執行環境。
使用 #[tokio::test] 巨集
對於大多數情況,可以使用 #[tokio::test] 巨集來簡化測試的編寫。它會自動為測試函式建立非同步執行環境。
範例測試
#[tokio::test]
async fn sleep_test() {
let start_time = Instant::now();
sleep(Duration::from_secs(1)).await;
let end_time = Instant::now();
let seconds = end_time
.checked_duration_since(start_time)
.unwrap()
.as_secs();
assert_eq!(seconds, 1);
}
測試解析:
- 使用
#[tokio::test]巨集來標記測試函式,使其在非同步環境中執行。 - 測試
sleep_1s函式的功能,驗證是否正確睡眠了 1 秒。
手動管理執行環境
如果你需要,也可以手動管理非同步執行環境,但必須注意 Rust 的測試框架會平行執行測試,因此需要正確處理跨執行緒的情況。
使用 axum 框架建立 HTTP REST API 服務
在前面的章節中,已經學習了許多非同步 Rust 程式設計的相關知識。現在,將這些知識應用在實際的專案中,透過建立一個使用 axum 框架的 Web 服務來實踐所學的內容。本章節將會涵蓋以下幾個主題:
選擇適合的 Web 框架
在撰寫本文的過程中,非同步 Rust 的生態系統發生了相當大的變化,尤其是在可用於非同步 Rust 的工具和函式庫方面。Tokio 及其相關專案的進展令人印象深刻。
對於建立 Web 服務,強烈推薦使用 axum 框架。axum 是 Tokio 專案的一部分,它提供了簡潔且靈活的 API,並且大多數實作都不依賴巨集(macro)。axum 建構於 Tower 和 hyper 之上,分別提供了建立網路服務的抽象層和 HTTP 使用者端及伺服器實作。
axum 的最大優點在於它不會強加特定的模式或做法給開發者。當然,若要深入瞭解細節,學習 Tower 函式庫是必要的,但對於簡單的任務而言,這並不是必要的。axum 支援 tracing 和 metrics,只需要少量的設定即可啟用。
其他值得一提的框架
其他值得注意的框架包括 Rocket 和 Actix。Rocket 的目標是成為 Rust 版本的 Ruby on Rails,而 Actix 是最早的 Rust Web 框架之一。
Rocket 和 Actix 都大量使用了巨集來隱藏實作細節。相較之下,axum 的核心 API 不依賴巨集,這使得它更易於使用和理解。
值得注意的是,Rocket 和 Actix 都早於 Rust 的 Future 特性和 async/await 語法的穩定化,在那之前,使用巨集是必要的。最近的版本中,這兩個框架都努力減少了對巨集的依賴。
axum 的優勢
axum 的設計理念使其能夠快速建立 Web 服務,並且易於與其他函式庫和工具整合。它不強加特定的模式或做法,使得開發者能夠按照自己的方式進行開發。
使用 axum 的基本步驟
選擇 axum 作為 Web 框架:由於其簡潔、靈活且易於使用的特性,axum 成為建立 Web 服務的理想選擇。
設計 API:根據需求設計 RESTful API,包括定義路由、處理請求和回應等。
建模資料:根據業務邏輯定義資料模型,這通常涉及到資料函式庫的設計和操作。
實作 API:使用 axum 提供的方法實作 API,包括處理 CRUD(建立、讀取、更新、刪除)操作。
錯誤處理:優雅地處理錯誤,提供有用的錯誤訊息給客戶端。
程式碼解析:
此範例展示瞭如何使用 axum 建立一個簡單的 HTTP 伺服器。首先,定義了一個 User 結構體,用於表示使用者資料。然後,定義了兩個處理函式:create_user 用於處理建立使用者的請求,get_user 用於處理取得使用者資訊的請求。最後,在 main 函式中,建立了一個 Router 例項,並將其繫結到指定的位址和埠上,啟動伺服器。
重點說明:
- 使用
axum和serde等函式庫來建立和處理 JSON 資料。 - 定義了
User結構體,並實作了Serialize和Deserialize特性,以便於 JSON 的序列化和反序列化。 - 使用
Router定義了兩個路由,分別對應到create_user和get_user處理函式。 - 使用
tokio::main巨集來標記非同步主函式,並啟動 axum 伺服器。