在建構分散式快取系統時,理解 Redis 核心命令 GET 和 SET 的底層實作至關重要。本文以 Rust 語言示範如何建構一個簡化的 Redis 儲存引擎,並著重於 GET 和 SET 命令的實作細節。首先,我們設計了一個名為 Storage 的結構體,用於管理鍵值對的儲存,並使用 HashMap 作為底層資料結構。接著,我們分別實作了 set 和 get 方法,用於設定和取得鍵值對。為了符合 Redis 協定,我們進一步設計了 command_set 和 command_get 函式,作為外部命令的入口,並處理命令語法解析和錯誤回傳。考量到實際應用場景,我們還引入了 Arc 和 Mutex,確保在非同步多執行緒環境下安全地分享 Storage 資源,避免資料競爭和效能問題。最後,我們撰寫了單元測試,驗證程式碼的正確性和穩定性。
第三章:實作 GET 和 SET 命令
在本章中,我們將實作 Redis 中最核心的兩個命令:GET 和 SET。這些命令是鍵值儲存的核心,我們將探討如何在記憶體中實作儲存管理。
步驟 1:建立儲存管理器
為了有效地管理儲存,我們需要一個資料結構來簡化資料的操作。因此,我們引入了一個名為 Storage 的結構體,用於封裝儲存的資料以及系統所提供的低階命令。
建立 StorageResult
首先,我們定義了一個新的檔案 storage_result.rs,用於處理儲存相關的錯誤。
// src/storage_result.rs
use std::fmt;
#[derive(Debug)]
pub enum StorageError {
IncorrectRequest,
CommandNotAvailable(String),
}
impl fmt::Display for StorageError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StorageError::IncorrectRequest => {
write!(f, "客戶端發送了錯誤的請求!")
}
StorageError::CommandNotAvailable(c) => {
write!(f, "請求的命令 {} 不可用!", c)
}
}
}
}
pub type StorageResult<T> = Result<T, StorageError>;
建立 Storage
接下來,我們建立了 storage.rs 檔案,定義了 Storage 結構體。
// src/storage.rs
use std::collections::HashMap;
#[derive(Debug, PartialEq)]
pub enum StorageValue {
String(String),
}
pub struct Storage {
store: HashMap<String, StorageValue>,
}
impl Storage {
pub fn new() -> Self {
let store: HashMap<String, StorageValue> = HashMap::new();
Self { store }
}
}
並在 main.rs 中加入新的模組。
// src/main.rs
mod resp;
mod resp_result;
mod server;
mod storage;
mod storage_result;
將 PING 和 ECHO 命令移至 Storage
我們將 PING 和 ECHO 命令的處理邏輯移至 Storage 結構體中,並新增了 process_command 方法。
// src/storage.rs
impl Storage {
pub fn process_command(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
match command[0].to_lowercase().as_str() {
"ping" => self.command_ping(&command),
"echo" => self.command_echo(&command),
_ => Err(StorageError::CommandNotAvailable(command[0].clone())),
}
}
fn command_ping(&self, _command: &Vec<String>) -> StorageResult<RESP> {
Ok(RESP::SimpleString("PONG".to_string()))
}
fn command_echo(&self, command: &Vec<String>) -> StorageResult<RESP> {
Ok(RESP::BulkString(command[1].clone()))
}
}
測試
我們為 Storage 模組新增了測試。
// src/storage.rs
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_create_new() {
let storage: Storage = Storage::new();
assert_eq!(storage.store.len(), 0);
}
#[test]
fn test_command_ping() {
let command = vec![String::from("ping")];
let storage: Storage = Storage::new();
let output = storage.command_ping(&command).unwrap();
assert_eq!(output, RESP::SimpleString(String::from("PONG")));
}
#[test]
fn test_command_echo() {
let command = vec![String::from("echo"), String::from("42")];
let storage: Storage = Storage::new();
let output = storage.command_echo(&command).unwrap();
assert_eq!(output, RESP::BulkString(String::from("42")));
}
}
內容解密:
- 建立儲存管理器:引入
Storage結構體來管理資料,提供process_command方法來處理命令。 - 定義錯誤處理:建立
StorageError列舉來處理儲存相關的錯誤,並定義StorageResult型別。 - 實作 PING 和 ECHO 命令:將 PING 和 ECHO 命令的處理邏輯移至
Storage結構體中。 - 新增測試:為
Storage模組新增測試,以確保其正確性。
在非同步任務中分享資源:使用 Arc 與 Mutex
在開發非同步程式時,我們經常需要面臨如何在多個任務之間分享資源的問題。以本章的例子來說,我們需要將一個 Storage 例項傳遞給多個連線處理任務。在 Rust 中,變數的所有權機制使得這個問題變得複雜。
所有權問題
Rust 的所有權規則保證了記憶體安全,但也限制了我們直接將一個變數傳遞給多個任務。在以下範例中,編譯器會拒絕我們的程式碼:
let mut storage = Storage::new();
loop {
match listener.accept().await {
Ok((stream, _)) => {
tokio::spawn(handle_connection(stream, storage));
}
Err(e) => {
println!("Error: {}", e);
continue;
}
}
}
編譯錯誤訊息指出 storage 的所有權在第一次迴圈迭代後就被移動到 handle_connection 函式中,導致後續迭代無法再使用它。
使用參照
一個可能的解決方案是傳遞 storage 的參照。然而,由於我們需要同時讀寫 storage,單純使用參照是不可行的。編譯器會報錯,因為在迴圈的不同迭代中,我們試圖多次借用 storage 為可變參照。
Arc 與 Mutex 的解決方案
為瞭解決上述問題,我們可以結合使用 std::sync::Arc 和 std::sync::Mutex。Arc 提供了原子性的參照計數,允許我們在多個任務之間分享資源。而 Mutex 則提供了一種鎖定機制,確保在同一時間只有一個任務可以存取分享資源。
程式碼範例
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6379").await?;
let storage = Arc::new(Mutex::new(Storage::new()));
loop {
match listener.accept().await {
Ok((stream, _)) => {
tokio::spawn(handle_connection(stream, storage.clone()));
}
Err(e) => {
println!("Error: {}", e);
continue;
}
}
}
}
async fn handle_connection(mut stream: TcpStream, storage: Arc<Mutex<Storage>>) {
// ...
let response = match process_request(request, storage.clone()) {
Ok(v) => v,
Err(e) => {
eprintln!("Error parsing command: {}", e);
return;
}
};
// ...
}
pub fn process_request(request: RESP, storage: Arc<Mutex<Storage>>) -> StorageResult<RESP> {
let elements = match request {
RESP::Array(v) => v,
_ => return Err(StorageError::IncorrectRequest),
};
let mut command = Vec::new();
// ...
let mut storage_lock = storage.lock().unwrap();
// 使用 storage_lock 進行操作
}
內容解密:
- 使用
Arc包裝Mutex:我們使用Arc::new(Mutex::new(Storage::new()))建立一個可以被多個任務分享的Storage例項。 - 克隆
Arc:在迴圈中,我們克隆storage以增加參照計數,確保Storage例項在所有任務完成前保持有效。 - 鎖定
Mutex:在process_request函式中,我們透過storage.lock().unwrap()取得Mutex的鎖定,從而安全地存取Storage例項。
實作 GET 和 SET 命令
在實作 GET 和 SET 命令的過程中,我們首先需要了解這兩個命令的基本功能。GET 命令用於從儲存中檢索特定鍵的值,而 SET 命令則用於設定特定鍵的值。
儲存結構的修改
為了支援 GET 和 SET 命令,我們需要在 Storage 結構中新增兩個方法:get 和 set。這兩個方法將直接與底層的 HashMap 互動,用於儲存和檢索值。
set 方法的實作
fn set(&mut self, key: String, value: String) -> StorageResult<String> {
self.store.insert(key, StorageValue::String(value));
Ok(String::from("OK"))
}
get 方法的實作
fn get(&self, key: String) -> StorageResult<Option<String>> {
match self.store.get(&key) {
Some(StorageValue::String(v)) => return Ok(Some(v.clone())),
None => return Ok(None),
}
}
內容解密:
set方法接收一個鍵值對,將其插入到store中,並傳回一個成功的訊息。get方法根據提供的鍵,從store中檢索對應的值。如果找到,則傳回該值的副本;否則,傳回None。
命令包裝器
為了將 get 和 set 方法暴露為命令,我們需要建立包裝器函式:command_set 和 command_get。這些包裝器函式負責錯誤處理,例如檢查命令語法的正確性。
command_set 的實作
fn command_set(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
if command.len() != 3 {
return Err(StorageError::CommandSyntaxError(command.join(" ")));
}
let _ = self.set(command[1].clone(), command[2].clone());
Ok(RESP::SimpleString(String::from("OK")))
}
command_get 的實作
fn command_get(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
if command.len() != 2 {
return Err(StorageError::CommandSyntaxError(command.join(" ")));
}
let output = self.get(command[1].clone());
match output {
Ok(Some(v)) => Ok(RESP::BulkString(v)),
Ok(None) => Ok(RESP::Null),
Err(_) => Err(StorageError::CommandInternalError(command.join(" "))),
}
}
內容解密:
command_set檢查命令是否包含正確的引數數量(3),然後呼叫set方法設定值。command_get檢查命令是否包含正確的引數數量(2),然後呼叫get方法檢索值,並根據結果傳回適當的 RESP 回應。
錯誤處理
為了支援新的命令,我們新增了兩個錯誤型別:CommandSyntaxError 和 CommandInternalError。
#[derive(Debug)]
pub enum StorageError {
IncorrectRequest,
CommandNotAvailable(String),
CommandSyntaxError(String),
CommandInternalError(String),
}
內容解密:
CommandSyntaxError用於表示命令語法錯誤。CommandInternalError用於表示命令內部錯誤。
測試案例
為了驗證新功能的正確性,我們新增了多個測試案例,涵蓋了 set、get、command_set 和 command_get 的不同場景。
範例測試程式碼
#[test]
fn test_set_value() {
let mut storage: Storage = Storage::new();
let output = storage.set(String::from("akey"), String::from("avalue")).unwrap();
assert_eq!(output, String::from("OK"));
assert_eq!(storage.store.len(), 1);
}
#[test]
fn test_get_value() {
let mut storage: Storage = Storage::new();
storage.store.insert(String::from("akey"), StorageValue::String(String::from("avalue")));
let result = storage.get(String::from("akey")).unwrap();
assert_eq!(result, Some(String::from("avalue")));
}
內容解密:
- 測試案例驗證了
set和get方法的基本功能。 - 測試案例確保了在不同場景下,方法的行為符合預期。