返回文章列表

Redis核心命令GET和SET實作

本文探討如何在 Rust 中實作 Redis 的 GET 和 SET 命令,包含儲存管理、錯誤處理以及非同步資源分享等關鍵技術。文章首先介紹了 Storage 結構體的設計與實作,接著詳細說明瞭 GET 和 SET 命令的處理邏輯,並涵蓋了錯誤處理和測試案例。此外,文章還探討了在非同步環境下使用 Arc 和

資料函式庫 系統設計

在建構分散式快取系統時,理解 Redis 核心命令 GET 和 SET 的底層實作至關重要。本文以 Rust 語言示範如何建構一個簡化的 Redis 儲存引擎,並著重於 GET 和 SET 命令的實作細節。首先,我們設計了一個名為 Storage 的結構體,用於管理鍵值對的儲存,並使用 HashMap 作為底層資料結構。接著,我們分別實作了 set 和 get 方法,用於設定和取得鍵值對。為了符合 Redis 協定,我們進一步設計了 command_set 和 command_get 函式,作為外部命令的入口,並處理命令語法解析和錯誤回傳。考量到實際應用場景,我們還引入了 Arc 和 Mutex,確保在非同步多執行緒環境下安全地分享 Storage 資源,避免資料競爭和效能問題。最後,我們撰寫了單元測試,驗證程式碼的正確性和穩定性。

第三章:實作 GET 和 SET 命令

在本章中,我們將實作 Redis 中最核心的兩個命令:GET 和 SET。這些命令是鍵值儲存的核心,我們將探討如何在記憶體中實作儲存管理。

步驟 1:建立儲存管理器

為了有效地管理儲存,我們需要一個資料結構來簡化資料的操作。因此,我們引入了一個名為 Storage 的結構體,用於封裝儲存的資料以及系統所提供的低階命令。

建立 StorageResult

首先,我們定義了一個新的檔案 storage_result.rs,用於處理儲存相關的錯誤。

// src/storage_result.rs
use std::fmt;

#[derive(Debug)]
pub enum StorageError {
    IncorrectRequest,
    CommandNotAvailable(String),
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::IncorrectRequest => {
                write!(f, "客戶端發送了錯誤的請求!")
            }
            StorageError::CommandNotAvailable(c) => {
                write!(f, "請求的命令 {} 不可用!", c)
            }
        }
    }
}

pub type StorageResult<T> = Result<T, StorageError>;

建立 Storage

接下來,我們建立了 storage.rs 檔案,定義了 Storage 結構體。

// src/storage.rs
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum StorageValue {
    String(String),
}

pub struct Storage {
    store: HashMap<String, StorageValue>,
}

impl Storage {
    pub fn new() -> Self {
        let store: HashMap<String, StorageValue> = HashMap::new();
        Self { store }
    }
}

並在 main.rs 中加入新的模組。

// src/main.rs
mod resp;
mod resp_result;
mod server;
mod storage;
mod storage_result;

將 PING 和 ECHO 命令移至 Storage

我們將 PING 和 ECHO 命令的處理邏輯移至 Storage 結構體中,並新增了 process_command 方法。

// src/storage.rs
impl Storage {
    pub fn process_command(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
        match command[0].to_lowercase().as_str() {
            "ping" => self.command_ping(&command),
            "echo" => self.command_echo(&command),
            _ => Err(StorageError::CommandNotAvailable(command[0].clone())),
        }
    }

    fn command_ping(&self, _command: &Vec<String>) -> StorageResult<RESP> {
        Ok(RESP::SimpleString("PONG".to_string()))
    }

    fn command_echo(&self, command: &Vec<String>) -> StorageResult<RESP> {
        Ok(RESP::BulkString(command[1].clone()))
    }
}

測試

我們為 Storage 模組新增了測試。

// src/storage.rs
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_new() {
        let storage: Storage = Storage::new();
        assert_eq!(storage.store.len(), 0);
    }

    #[test]
    fn test_command_ping() {
        let command = vec![String::from("ping")];
        let storage: Storage = Storage::new();
        let output = storage.command_ping(&command).unwrap();
        assert_eq!(output, RESP::SimpleString(String::from("PONG")));
    }

    #[test]
    fn test_command_echo() {
        let command = vec![String::from("echo"), String::from("42")];
        let storage: Storage = Storage::new();
        let output = storage.command_echo(&command).unwrap();
        assert_eq!(output, RESP::BulkString(String::from("42")));
    }
}

內容解密:

  1. 建立儲存管理器:引入 Storage 結構體來管理資料,提供 process_command 方法來處理命令。
  2. 定義錯誤處理:建立 StorageError 列舉來處理儲存相關的錯誤,並定義 StorageResult 型別。
  3. 實作 PING 和 ECHO 命令:將 PING 和 ECHO 命令的處理邏輯移至 Storage 結構體中。
  4. 新增測試:為 Storage 模組新增測試,以確保其正確性。

在非同步任務中分享資源:使用 ArcMutex

在開發非同步程式時,我們經常需要面臨如何在多個任務之間分享資源的問題。以本章的例子來說,我們需要將一個 Storage 例項傳遞給多個連線處理任務。在 Rust 中,變數的所有權機制使得這個問題變得複雜。

所有權問題

Rust 的所有權規則保證了記憶體安全,但也限制了我們直接將一個變數傳遞給多個任務。在以下範例中,編譯器會拒絕我們的程式碼:

let mut storage = Storage::new();
loop {
    match listener.accept().await {
        Ok((stream, _)) => {
            tokio::spawn(handle_connection(stream, storage));
        }
        Err(e) => {
            println!("Error: {}", e);
            continue;
        }
    }
}

編譯錯誤訊息指出 storage 的所有權在第一次迴圈迭代後就被移動到 handle_connection 函式中,導致後續迭代無法再使用它。

使用參照

一個可能的解決方案是傳遞 storage 的參照。然而,由於我們需要同時讀寫 storage,單純使用參照是不可行的。編譯器會報錯,因為在迴圈的不同迭代中,我們試圖多次借用 storage 為可變參照。

ArcMutex 的解決方案

為瞭解決上述問題,我們可以結合使用 std::sync::Arcstd::sync::MutexArc 提供了原子性的參照計數,允許我們在多個任務之間分享資源。而 Mutex 則提供了一種鎖定機制,確保在同一時間只有一個任務可以存取分享資源。

程式碼範例

use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6379").await?;
    let storage = Arc::new(Mutex::new(Storage::new()));
    loop {
        match listener.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(handle_connection(stream, storage.clone()));
            }
            Err(e) => {
                println!("Error: {}", e);
                continue;
            }
        }
    }
}

async fn handle_connection(mut stream: TcpStream, storage: Arc<Mutex<Storage>>) {
    // ...
    let response = match process_request(request, storage.clone()) {
        Ok(v) => v,
        Err(e) => {
            eprintln!("Error parsing command: {}", e);
            return;
        }
    };
    // ...
}

pub fn process_request(request: RESP, storage: Arc<Mutex<Storage>>) -> StorageResult<RESP> {
    let elements = match request {
        RESP::Array(v) => v,
        _ => return Err(StorageError::IncorrectRequest),
    };
    let mut command = Vec::new();
    // ...
    let mut storage_lock = storage.lock().unwrap();
    // 使用 storage_lock 進行操作
}

內容解密:

  1. 使用 Arc 包裝 Mutex:我們使用 Arc::new(Mutex::new(Storage::new())) 建立一個可以被多個任務分享的 Storage 例項。
  2. 克隆 Arc:在迴圈中,我們克隆 storage 以增加參照計數,確保 Storage 例項在所有任務完成前保持有效。
  3. 鎖定 Mutex:在 process_request 函式中,我們透過 storage.lock().unwrap() 取得 Mutex 的鎖定,從而安全地存取 Storage 例項。

實作 GET 和 SET 命令

在實作 GET 和 SET 命令的過程中,我們首先需要了解這兩個命令的基本功能。GET 命令用於從儲存中檢索特定鍵的值,而 SET 命令則用於設定特定鍵的值。

儲存結構的修改

為了支援 GET 和 SET 命令,我們需要在 Storage 結構中新增兩個方法:getset。這兩個方法將直接與底層的 HashMap 互動,用於儲存和檢索值。

set 方法的實作

fn set(&mut self, key: String, value: String) -> StorageResult<String> {
    self.store.insert(key, StorageValue::String(value));
    Ok(String::from("OK"))
}

get 方法的實作

fn get(&self, key: String) -> StorageResult<Option<String>> {
    match self.store.get(&key) {
        Some(StorageValue::String(v)) => return Ok(Some(v.clone())),
        None => return Ok(None),
    }
}

內容解密:

  1. set 方法接收一個鍵值對,將其插入到 store 中,並傳回一個成功的訊息。
  2. get 方法根據提供的鍵,從 store 中檢索對應的值。如果找到,則傳回該值的副本;否則,傳回 None

命令包裝器

為了將 getset 方法暴露為命令,我們需要建立包裝器函式:command_setcommand_get。這些包裝器函式負責錯誤處理,例如檢查命令語法的正確性。

command_set 的實作

fn command_set(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
    if command.len() != 3 {
        return Err(StorageError::CommandSyntaxError(command.join(" ")));
    }
    let _ = self.set(command[1].clone(), command[2].clone());
    Ok(RESP::SimpleString(String::from("OK")))
}

command_get 的實作

fn command_get(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
    if command.len() != 2 {
        return Err(StorageError::CommandSyntaxError(command.join(" ")));
    }
    let output = self.get(command[1].clone());
    match output {
        Ok(Some(v)) => Ok(RESP::BulkString(v)),
        Ok(None) => Ok(RESP::Null),
        Err(_) => Err(StorageError::CommandInternalError(command.join(" "))),
    }
}

內容解密:

  1. command_set 檢查命令是否包含正確的引數數量(3),然後呼叫 set 方法設定值。
  2. command_get 檢查命令是否包含正確的引數數量(2),然後呼叫 get 方法檢索值,並根據結果傳回適當的 RESP 回應。

錯誤處理

為了支援新的命令,我們新增了兩個錯誤型別:CommandSyntaxErrorCommandInternalError

#[derive(Debug)]
pub enum StorageError {
    IncorrectRequest,
    CommandNotAvailable(String),
    CommandSyntaxError(String),
    CommandInternalError(String),
}

內容解密:

  1. CommandSyntaxError 用於表示命令語法錯誤。
  2. CommandInternalError 用於表示命令內部錯誤。

測試案例

為了驗證新功能的正確性,我們新增了多個測試案例,涵蓋了 setgetcommand_setcommand_get 的不同場景。

範例測試程式碼

#[test]
fn test_set_value() {
    let mut storage: Storage = Storage::new();
    let output = storage.set(String::from("akey"), String::from("avalue")).unwrap();
    assert_eq!(output, String::from("OK"));
    assert_eq!(storage.store.len(), 1);
}

#[test]
fn test_get_value() {
    let mut storage: Storage = Storage::new();
    storage.store.insert(String::from("akey"), StorageValue::String(String::from("avalue")));
    let result = storage.get(String::from("akey")).unwrap();
    assert_eq!(result, Some(String::from("avalue")));
}

內容解密:

  1. 測試案例驗證了 setget 方法的基本功能。
  2. 測試案例確保了在不同場景下,方法的行為符合預期。