返回文章列表

Rust 實作 Redis Key 過期機制

本文探討如何在 Rust 中實作類別似 Redis 的 Key 過期機制,包含引數解析、Storage 更新和測試驗證。文章首先解析 SET 命令的 NX、XX、EX、PX 等引數,接著更新 Storage 結構以支援 Key 過期,最後提供測試程式碼驗證實作的正確性。此外,文章還介紹瞭如何使用 Actor 模型和

後端開發 資料函式庫

在設計 Key-Value 儲存系統時,Key 的過期機制扮演著重要的角色。本文將逐步說明如何在 Rust 中實作此機制,並涵蓋引數解析、儲存更新以及測試驗證等關鍵步驟。程式碼範例將展示如何處理 SET 命令的各種引數,例如設定過期時間的 EX 和 PX 選項,並說明如何更新儲存結構以支援 Key 的過期。此外,我們也將探討如何使用 Actor 模型和 Tokio 處理非同步任務和訊息傳遞,以提升系統的效能和可維護性。

實作 Redis 式 Key 過期機制

在建構高效能的 Key-Value 儲存系統時,實作 Key 的過期機制是一項重要的任務。本章節將探討如何在 Rust 語言中實作類別似 Redis 的 Key 過期功能。

解析 SET 命令引數

首先,我們需要對 SET 命令的引數進行解析,以支援 NX、XX、EX 和 PX 等選項。以下程式碼展示瞭如何解析這些引數:

match arguments[idx].to_lowercase().as_str() {
    "ex" => {
        if let Some(KeyExpiry::PX(_)) = args.expiry {
            return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
        }
        if idx + 1 == arguments.len() {
            return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
        }
        let value: u64 = arguments[idx + 1].parse().map_err(|_| StorageError::CommandSyntaxError(arguments.join(" ")))?;
        args.expiry = Some(KeyExpiry::EX(value));
        idx += 2;
    }
    "px" => {
        if let Some(KeyExpiry::EX(_)) = args.expiry {
            return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
        }
        if idx + 1 == arguments.len() {
            return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
        }
        let value: u64 = arguments[idx + 1].parse().map_err(|_| StorageError::CommandSyntaxError(arguments.join(" ")))?;
        args.expiry = Some(KeyExpiry::PX(value));
        idx += 2;
    }
    _ => {}
}

程式碼解密:

  1. 引數檢查:首先檢查目前的引數是否為 “ex” 或 “px”,並驗證其後是否跟隨一個數值。
  2. 互斥檢查:確保 “ex” 和 “px” 不會同時出現,因為它們代表不同的時間單位(秒和毫秒)。
  3. 錯誤處理:若解析失敗或缺少必要引數,則傳回 StorageError::CommandSyntaxError
  4. 更新引數:成功解析後,將對應的過期時間存入 args.expiry

更新 Storage 以支援 Key 過期

接下來,我們需要更新 Storage 結構以支援 Key 的過期機制。主要涉及以下變更:

impl Storage {
    fn set(&mut self, key: String, value: String, args: SetArgs) -> StorageResult<String> {
        let mut data = StorageData::from(value);
        if let Some(value) = args.expiry {
            let expiry = match value {
                KeyExpiry::EX(v) => Duration::from_secs(v),
                KeyExpiry::PX(v) => Duration::from_millis(v),
            };
            data.add_expiry(expiry);
            self.expiry.insert(key.clone(), SystemTime::now().add(expiry));
        }
        self.store.insert(key.clone(), data);
        Ok(String::from("OK"))
    }

    fn get(&mut self, key: String) -> StorageResult<Option<String>> {
        if let Some(&expiry) = self.expiry.get(&key) {
            if SystemTime::now() >= expiry {
                self.expiry.remove(&key);
                self.store.remove(&key);
                return Ok(None);
            }
        }
        match self.store.get(&key) {
            Some(StorageData { value: StorageValue::String(v), .. }) => Ok(Some(v.clone())),
            None => Ok(None),
        }
    }
}

程式碼解密:

  1. 設定過期時間:在 set 方法中,若 args.expiry 存在,則根據其值(EX 或 PX)計算過期時間並更新 dataself.expiry
  2. 檢查過期:在 get 方法中,首先檢查該 Key 是否已過期,若已過期則移除並傳回 None
  3. 傳回結果:若 Key 仍有效,則傳回其對應的值。

測試與驗證

最後,為了確保實作的正確性,我們需要撰寫相關的測試程式碼:

#[test]
fn test_parse_ex() {
    let commands: Vec<String> = vec![String::from("EX"), String::from("100")];
    let args = parse_set_arguments(&commands).unwrap();
    assert_eq!(args.expiry, Some(KeyExpiry::EX(100)));
}

#[test]
fn test_set_value() {
    let mut storage: Storage = Storage::new();
    let output = storage.set(String::from("akey"), String::from("avalue"), SetArgs::new()).unwrap();
    assert_eq!(output, String::from("OK"));
    assert_eq!(storage.store.len(), 1);
}

程式碼解密:

  1. 引數解析測試:驗證 parse_set_arguments 對 “EX” 和數值的解析是否正確。
  2. 儲存測試:檢查 set 方法是否能正確儲存資料並傳回 “OK”。

第五章:使用 Actor 實作平行處理

在開發遠端鍵值儲存系統(如 Redis)的過程中,我們已經建立了一個基本的內部核心來管理資料,並實作了一個能夠路由命令的請求處理器。然而,當前的系統架構並不適合處理像鍵過期機制這樣的非同步任務。為瞭解決這個問題,我們將引入 Actor 模型來重構系統,使其能夠更好地支援平行處理。

Actor 模型

Actor 模型是一種簡單而強大的平行處理模式。在這個模型中,獨立的元件(稱為 Actor)透過訊息進行通訊。這種根據訊息的通訊機制有助於減少系統各部分之間的耦合度。

為什麼選擇 Actor 模型?

  1. 解耦合:透過訊息傳遞,Actor 之間的依賴關係被降到最低,使得系統更容易維護和擴充套件。
  2. 符合 Rust 的所有權模型:在 Rust 中,每個值都有一個所有者。Actor 模型確保每個資源都有一個明確的所有者(Actor),其他 Actor 透過訊息來存取這些資源,避免了使用 Arc<Mutex<T>> 這樣的同步原語。

系統架構

新的系統架構將包括以下主要元件:

  1. Server(伺服器):擁有儲存資源,並接收來自其他元件的訊息。
  2. Connection Handlers(連線處理器):負責接收客戶端的 RESP 命令,並將其轉換為伺服器能夠理解的 Request 訊息。

元件之間的互動

  • 伺服器與儲存資源:由於伺服器是單一實體,因此無需對儲存資源進行鎖定,直接進行方法呼叫即可。
  • 連線處理器與伺服器:連線處理器將 RESP 命令轉換為 Request 訊息,透過 Tokio 通道傳送給伺服器。

實作細節

Tokio 通道

Tokio 提供了一套非同步通道,用於在不同的任務(Task)之間傳遞訊息。這非常適合 Actor 模型,因為它允許不同的 Actor 透過訊息進行通訊。

// 使用 Tokio 的 mpsc(多生產者,單消費者)通道
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(100); // 建立一個容量為 100 的通道

    // 在另一個任務中傳送訊息
    tokio::spawn(async move {
        tx.send("Hello, world!".to_string()).await.unwrap();
    });

    // 在當前任務中接收訊息
    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}

程式碼實作解析

建立 Server 和 Connection Handler

Server 將擁有儲存資源,並監聽來自 Connection Handler 的訊息。

// server.rs
use tokio::sync::mpsc;

pub struct Server {
    storage: Storage,
    receiver: mpsc::Receiver<Request>,
}

impl Server {
    pub async fn run(&mut self) {
        while let Some(request) = self.receiver.recv().await {
            // 處理請求
            self.handle_request(request).await;
        }
    }

    async fn handle_request(&mut self, request: Request) {
        // 根據請求型別進行處理
        match request {
            Request::Get(key) => self.get(key).await,
            Request::Set(key, value, args) => self.set(key, value, args).await,
            // 其他請求處理...
        }
    }
}

Connection Handler 負責將 RESP 命令轉換為 Request 訊息,並透過通道傳送給 Server。

// connection_handler.rs
use tokio::sync::mpsc;

pub struct ConnectionHandler {
    sender: mpsc::Sender<Request>,
}

impl ConnectionHandler {
    pub async fn handle_connection(&self, stream: TcpStream) {
        // 解析 RESP 命令並轉換為 Request
        let request = parse_resp(stream).await;
        self.sender.send(request).await.unwrap();
    }
}

內容解密:

  1. Server 結構體:包含 storagereceiver,用於接收來自 Connection Handler 的請求。
  2. run 方法:持續監聽並處理來自通道的請求。
  3. handle_request 方法:根據請求型別呼叫相應的處理邏輯。
  4. ConnectionHandler 結構體:包含 sender,用於向 Server 傳送請求。
  5. handle_connection 方法:解析客戶端的 RESP 命令,並將其轉換為 Request 傳送給 Server。

使用Actor模型實作平行處理

在探討如何利用Actor模型實作平行處理的過程中,我們首先需要了解Actor的基本概念及其在非同步任務管理中的角色。在本案例中,Actor本質上是非同步任務,而Tokio執行環境為其提供了執行的基礎。

Actor與非同步任務

Rust並非物件導向程式語言,因此Actor在本質上是以函式的形式存在。Tokio提供了建立和管理Actor的機制,尤其是透過tokio::spawn來建立新的非同步任務。

訊息傳遞機制

為了實作Actor之間的通訊,Tokio提供了兩種主要的通道(Channel)型別:一次性通道(One-shot Channel)和多生產者單消費者通道(MPSC)。這兩種通道型別允許在不同的非同步任務之間傳遞資料。

One-shot vs MPSC

  • 一次性通道適用於在生產者和消費者之間傳送單一訊息的場景。這種模式類別似於郵政投票中使用的預印信封,接收者可以透過該信封回傳檔案。
  • MPSC通道則允許建立更持久的通訊管道,支援多個生產者向單一消費者傳送訊息。在本文的後續章節中,MPSC通道將被廣泛使用。

建立Actor模型

為了將伺服器轉換為Actor模型,我們將按照以下步驟進行:

  1. 定義伺服器結果和錯誤型別:建立ServerErrorServerResult型別,以便在伺服器內部處理錯誤和結果。
  2. 建立請求和訊息型別:定義RequestConnectionMessage結構,用於在連線處理器和伺服器之間傳遞資料。
  3. 重構連線處理器:將連線處理器修改為使用select!迴圈,並利用請求型別處理客戶端的請求。
  4. 將伺服器轉換為Actor:使伺服器監聽來自連線處理器的訊息,並對其進行處理。
  5. 整合訊息處理:讓連線處理器使用訊息與伺服器進行通訊。

程式碼實作

// src/server_result.rs
use std::fmt;

#[derive(Debug, PartialEq)]
pub enum ServerError {
    CommandError,
}

impl fmt::Display for ServerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ServerError::CommandError => write!(f, "Error while processing!"),
        }
    }
}

pub type ServerResult<T> = Result<T, ServerError>;

// src/connection.rs
use crate::request::Request;

#[derive(Debug)]
pub enum ConnectionMessage {
    Request(Request),
}

// src/request.rs
use crate::{resp::RESP, server_result::ServerMessage};
use tokio::sync::mpsc;

#[derive(Debug)]
pub struct Request {
    pub value: RESP,
    pub sender: mpsc::Sender<ServerMessage>,
}

// src/server_result.rs
use crate::resp::RESP;

#[derive(Debug)]
pub enum ServerMessage {
    Data(RESP),
    Error(ServerError),
}

內容解密:

  1. ServerErrorServerResult的定義:這兩個型別用於處理伺服器內部的錯誤和結果,提供了一種統一的方式來表示成功或失敗的操作結果。
  2. ConnectionMessageRequest的定義:這兩個結構體用於在連線處理器和伺服器之間傳遞資料。ConnectionMessage是一種列舉,可以攜帶不同型別的訊息,而Request則封裝了客戶端的請求和回應通道。
  3. ServerMessage的定義:這個列舉代表了伺服器的回應,可以是資料或錯誤。它提供了一種靈活的方式來表示伺服器的輸出。