Kafka AdminClient 是管理 Kafka 叢集不可或缺的工具,讓開發者能動態操作主題、組態及消費者群組。本文除了示範如何使用 AdminClient 建立和修改主題,更進一步剖析 Kafka 的內部運作,包含叢整合員管理、控制器角色及資料儲存機制。同時,也探討了新一代根據 Raft 協定的控制器 KRaft,它取代 ZooKeeper 的優勢,以及如何解決現有控制器面臨的後設資料一致性、控制器重啟瓶頸等問題。最後,文章詳述了 Kafka 的複製機制,包含長官者副本和從屬副本的角色、從從屬副本讀取的機制以及高水位標記的應用。此外,Broker 的請求處理流程、生產請求、取得請求和後設資料請求的處理方式,以及 I/O 執行緒和延遲回應機制,都有詳細的說明。
Kafka AdminClient 與內部機制解析
前言
Kafka AdminClient 是開發者與維運人員的重要工具,用於動態管理 Kafka 主題、組態及消費者群組。本文將探討 AdminClient 的使用方法,並解析 Kafka 的內部運作機制,包括叢整合員管理、控制器角色及資料儲存機制。
使用 AdminClient 進行主題管理
AdminClient 提供了一系列 API 用於管理 Kafka 主題。以下是一個使用 TopicCreator 類別動態建立主題的範例:
public class TopicCreator {
private AdminClient admin;
public TopicCreator(AdminClient admin) {
this.admin = admin;
}
public void maybeCreateTopic(String topicName) throws ExecutionException, InterruptedException {
// 檢查主題是否存在
if (!admin.listTopics().names().get().contains(topicName)) {
// 建立主題
NewTopic newTopic = new NewTopic(topicName, Optional.of(1), Optional.of((short) 1));
admin.createTopics(Collections.singleton(newTopic)).all().get();
// 修改主題組態
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
Collection<AlterConfigOp> configOp = new ArrayList<>();
configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();
alterConf.put(configResource, configOp);
admin.incrementalAlterConfigs(alterConf).all().get();
}
}
}
程式碼解析:
- 檢查主題是否存在:使用
admin.listTopics()方法取得現有的主題列表,並檢查目標主題是否已存在。 - 建立主題:若主題不存在,則使用
NewTopic物件定義新主題的名稱、分割槽數及複製因子,並呼叫admin.createTopics()方法建立主題。 - 修改主題組態:建立主題後,使用
ConfigEntry物件修改主題的清理策略(cleanup policy),並呼叫admin.incrementalAlterConfigs()方法套用新的組態。
測試 AdminClient 相關功能
為了驗證 TopicCreator 類別的功能,我們可以使用 Mockito 框架模擬 AdminClient 並撰寫單元測試:
@Before
public void setUp() {
Node broker = new Node(0, "localhost", 9092);
this.admin = spy(new MockAdminClient(Collections.singletonList(broker), broker));
// 設定 mock 物件以避免未實作的方法丟擲例外
AlterConfigsResult emptyResult = mock(AlterConfigsResult.class);
doReturn(KafkaFuture.completedFuture(null)).when(emptyResult).all();
doReturn(emptyResult).when(admin).incrementalAlterConfigs(any());
}
@Test
public void testCreateTestTopic() throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(admin);
tc.maybeCreateTopic("test.is.a.test.topic");
verify(admin, times(1)).createTopics(any());
}
@Test
public void testNotTopic() throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(admin);
tc.maybeCreateTopic("not.a.test");
verify(admin, never()).createTopics(any());
}
測試解析:
- 模擬 AdminClient:使用 Mockito 的
spy方法包裝MockAdminClient,以便驗證TopicCreator的行為。 - 測試主題建立:驗證當主題名稱以 “test” 開頭時,
maybeCreateTopic方法是否正確呼叫createTopics方法。 - 測試非測試主題:驗證當主題名稱不以 “test” 開頭時,
maybeCreateTopic方法是否未呼叫createTopics方法。
Kafka 內部機制解析
叢整合員管理
Kafka 使用 Apache ZooKeeper 維護叢集中的活躍代理(broker)列表。每個代理在啟動時會在 ZooKeeper 中註冊一個臨時節點(ephemeral node),並在斷開連線時自動刪除該節點。這種機制確保了叢整合員的動態管理。
控制器角色
Kafka 控制器(controller)負責管理分割槽長官者的選舉。第一個啟動的代理成為控制器,並在 ZooKeeper 中建立一個臨時節點以宣告其控制器身份。其他代理會監聽控制器的狀態,以便在控制器故障時進行重新選舉。
Kafka 控制器機制與 KRaft 介紹
Kafka 叢集的控制器(Controller)是負責管理叢集後設資料(Metadata)和執行長官者選舉(Leader Election)的關鍵元件。控制器利用 ZooKeeper 的臨時節點(Ephemeral Node)來實作選舉和通知機制。
控制器選舉與通知機制
當 Kafka 叢集啟動時,第一個啟動的 Broker 會嘗試在 ZooKeeper 上建立 /controller 節點。其他 Broker 在啟動時也會嘗試建立該節點,但會收到「節點已存在」的異常,從而得知控制器已經存在。這些 Broker 會在 /controller 節點上設定監聽器(Watch),以便在控制器發生變化時收到通知。
此圖示說明瞭控制器選舉的過程。
內容解密:
- Broker 1 嘗試建立
/controller節點,成功成為控制器。 - Broker 2 嘗試建立
/controller節點,收到「節點已存在」異常,設定監聽器。 - 當控制器斷開連線或停止時,
/controller節點消失,其他 Broker 收到通知並嘗試成為新的控制器。
控制器失效與重新選舉
當控制器 Broker 停止或失去與 ZooKeeper 的連線時,/controller 節點會消失。其他 Broker 透過監聽器得知控制器已不存在,並嘗試建立新的 /controller 節點。第一個成功建立該節點的 Broker 成為新的控制器。
控制器 Epoch 號的作用
每次選舉出新的控制器時,ZooKeeper 會透過條件遞增操作(Conditional Increment Operation)為其分配一個新的、更高的控制器 Epoch 號。Broker 知道目前的控制器 Epoch 號,如果收到來自舊控制器的訊息(Epoch 號較低),則會忽略該訊息。這種機制有效地防止了「腦裂」(Split Brain)情況的發生,即兩個節點都認為自己是目前的控制器。
KRaft:Kafka 的新一代 Raft-based 控制器
自 2019 年起,Apache Kafka 社群開始開發根據 Raft 的控制器仲裁機制,稱為 KRaft。KRaft 的預覽版本隨 Apache Kafka 2.8 發布,而第一個生產版本將包含在 Apache Kafka 3.0 中。屆時,Kafka 叢集將能夠使用傳統的根據 ZooKeeper 的控制器或 KRaft。
為何替換原有的控制器?
儘管 Kafka 現有的控制器已經經過多次重寫,但仍存在多個已知問題促使社群決定替換它:
後設資料更新不一致:更新同步寫入 ZooKeeper,但非同步傳送給 Broker。此外,從 ZooKeeper 接收更新也是非同步的,這導致在某些邊緣情況下,後設資料在 Broker、控制器和 ZooKeeper 之間不一致。
控制器重啟瓶頸:每次控制器重啟,都需要從 ZooKeeper 中讀取所有 Broker 和分割槽的後設資料,然後將這些後設資料傳送給所有 Broker。隨著分割槽和 Broker 數量的增加,重啟控制器的時間變得更長。
內部架構問題:圍繞後設資料所有權的內部架構不夠完善,部分操作透過控制器執行,其他操作則由任意 Broker 或直接在 ZooKeeper 上執行。
ZooKeeper 的複雜性:ZooKeeper 是另一個分散式系統,需要一定的專業知識來操作。因此,使用 Kafka 的開發者需要學習兩個分散式系統,而不僅僅是一個。
Kafka 新一代控制器設計與複製機制解析
Kafka 的新控制器設計核心概念源自其自身的日誌基礎架構(log-based architecture),其中使用者將狀態表示為事件流。社群早已認可這種表示方式的優勢:多個消費者可以透過重播事件快速趕上最新狀態。日誌建立了事件之間的明確順序,確保消費者始終沿著單一時序推進。新控制器架構將相同的優勢帶到 Kafka 的中繼資料管理。
新控制器架構設計
在新架構中,控制器節點形成 Raft 共識組(Raft quorum),負責管理中繼資料事件的日誌。此日誌包含了叢集中繼資料每次變更的資訊。所有目前儲存在 ZooKeeper 中的資料,如主題、分割槽、ISR(in-sync replicas)、組態等,都將被儲存在此日誌中。
使用 Raft 演算法,控制器節點將在內部選舉出長官者(leader),無需依賴外部系統。中繼資料日誌的長官者被稱為活躍控制器(active controller)。活躍控制器處理來自代理伺服器(brokers)的所有 RPC 請求。從屬控制器(follower controllers)則複製寫入活躍控制器的資料,並在活躍控制器故障時作為熱備用。
由於控制器將持續追蹤最新狀態,控制器容錯移轉(failover)不再需要冗長的重新載入期,以將所有狀態轉移到新控制器。
中繼資料更新機制
新架構中,代理伺服器將透過新的 MetadataFetch API 從活躍控制器擷取更新,而不是由控制器推播更新給其他代理伺服器。與擷取請求類別似,代理伺服器將追蹤其擷取的最新中繼資料變更偏移量,並且只向控制器請求更新的內容。代理伺服器會將中繼資料持久化到磁碟,這使得即使擁有數百萬個分割槽,也能快速啟動。
註冊與狀態管理
代理伺服器將向控制器共識組註冊,並保持註冊狀態,直到管理員取消註冊。因此,一旦代理伺服器關閉,它將處於離線但仍註冊的狀態。未跟上最新中繼資料的代理伺服器將被隔離(fenced),無法提供客戶端請求。新隔離狀態防止客戶端向不再是長官者的代理伺服器生產事件,而該代理伺服器可能因過於過時而未察覺自己已不是長官者。
遷移與相容性
作為遷移到控制器共識組的一部分,所有先前涉及客戶端或代理伺服器直接與 ZooKeeper 通訊的操作,將透過控制器進行路由。這使得替換控制器時無需更改任何代理伺服器上的組態,從而實作無縫遷移。
新架構的整體設計在 KIP-500 中有詳細描述。Raft 協定如何適應 Kafka 的細節在 KIP-595 中說明。新的控制器共識組的詳細設計,包括控制器組態和與叢集中繼資料互動的新 CLI,在 KIP-631 中有詳細闡述。
Kafka 複製機制深度解析
複製是 Kafka 架構的核心。Kafka 經常被描述為「分散式、分割槽、複製的提交日誌服務」。複製至關重要,因為當個別節點不可避免地失敗時,它保證了可用性和永續性。
資料組織與複製
如前所述,Kafka 中的資料按主題組織。每個主題都被分割槽,每個分割槽可以有多個副本。這些副本儲存在代理伺服器上,每個代理伺服器通常儲存屬於不同主題和分割槽的數百甚至數千個副本。
副本型別
長官者副本(Leader Replica):每個分割槽都有一個指定的長官者副本。所有生產請求都透過長官者來保證一致性。客戶端可以從長官者副本或其從屬副本消費資料。
從屬副本(Follower Replica):分割槽中不是長官者的所有副本都稱為從屬副本。除非另有組態,從屬副本不提供客戶端請求;它們的主要任務是從長官者複製訊息並保持與長官者最新的訊息同步。如果分割槽的長官者副本當機,其中一個從屬副本將被提升為該分割槽的新長官者。
從從屬副本讀取
KIP-392 新增了從從屬副本讀取的功能。此功能的主要目標是透過允許客戶端從最近的同步副本消費,而不是從長官者副本消費,以減少網路流量成本。要使用此功能,消費者組態應包括 client.rack 以標識客戶端的位置。代理伺服器組態應包括 replica.selector.class。此組態預設為 LeaderSelector(始終從長官者消費),但可以設定為 RackAwareReplicaSelector,它將選擇一個位於具有與客戶端 client.rack 相比對的 rack.id 組態的代理伺服器上的副本。我們也可以透過實作 ReplicaSelector 介面並使用自己的實作來實作自己的副本選擇邏輯。
內容解密:
client.rack組態用於標識客戶端的位置,以幫助選擇最近的副本。replica.selector.class組態用於選擇消費資料的副本策略,預設是始終從長官者消費。- 自定義
ReplicaSelector介面允許開發者根據需求實作自定義的副本選擇邏輯。
複製協定擴充套件
複製協定被擴充套件以保證只有已提交的訊息才會在從從屬副本消費時可用。這意味著即使從從屬副本擷取,也能獲得相同的可靠性保證。為了提供此保證,所有副本都需要知道哪些訊息已被長官者提交。為此,長官者在其傳送給從屬副本的資料中包含目前的高水位標記(latest committed offset)。高水位標記的傳播引入了小延遲,這意味著資料在長官者處可供消費的時間早於在從屬副本上可供消費。記住這個額外的延遲非常重要,因為嘗試透過從長官者副本消費來減少消費者延遲是很誘人的。
內容解密:
- 高水位標記(high-water mark)代表目前已提交的最新偏移量。
- 長官者在傳送給從屬副本的資料中包含高水位標記,以確保所有副本都知道哪些訊息已提交。
- 高水位標記的傳播會引入一定的延遲,影響從從屬副本消費的時效性。
Kafka Broker 請求處理機制詳解
Kafka Broker 的核心功能之一是處理來自客戶端、副本和控制器的請求。Kafka 使用二進位制協定(根據 TCP)定義請求的格式以及 Broker 如何回應這些請求。
請求處理流程
- 客戶端發起連線和請求:客戶端主動發起連線並傳送請求,Broker 處理請求並回應。
- 請求處理執行緒:每個 Broker 監聽的埠上執行一個接收執行緒(acceptor thread),該執行緒建立連線並將其交給處理器執行緒(processor thread)進行處理。
- 網路執行緒:處理器執行緒負責從客戶端連線中取得請求,將其放入請求佇列(request queue),並從回應佇列(response queue)中取得回應並發送回客戶端。
請求型別
- 生產請求(Produce Requests):由生產者傳送,包含客戶端寫入 Kafka 的訊息。
- 取得請求(Fetch Requests):由消費者和後續副本傳送,用於從 Kafka 讀取訊息。
- 管理請求(Admin Requests):由管理客戶端傳送,用於執行後設資料操作,如建立和刪除主題。
生產請求與取得請求的處理
這兩種請求都必須傳送到分割槽的主副本。如果 Broker 收到特定分割槽的生產請求,但該分割槽的主副本位於不同的 Broker 上,客戶端將收到「Not a Leader for Partition」的錯誤回應。同樣的錯誤也會發生在取得請求上,如果請求到達的 Broker 不是該分割槽的主副本。
客戶端如何知道將請求傳送到哪裡?
Kafka 客戶端使用另一種型別的請求,稱為後設資料請求(metadata request),其中包含客戶端感興趣的主題列表。伺服器的回應指定了主題中的分割槽、每個分割槽的副本以及哪個副本是主副本。
請求處理內部機制
- I/O 執行緒:也稱為請求處理程式執行緒(request handler threads),負責從請求佇列中取出請求並處理它們。
- 延遲回應:在某些情況下,客戶端的回應需要被延遲,例如消費者只有在有可用資料時才會收到回應,或者管理客戶端在主題刪除進行中時才會收到刪除主題請求的回應。這些延遲的回應會被儲存在一個稱為「煉獄」(purgatory)的區域,直到它們可以被完成。
圖示說明:Kafka 請求處理流程
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title Kafka內部機制與AdminClient實務解析
package "系統架構" {
package "前端層" {
component [使用者介面] as ui
component [API 客戶端] as client
}
package "後端層" {
component [API 服務] as api
component [業務邏輯] as logic
component [資料存取] as dao
}
package "資料層" {
database [主資料庫] as db
database [快取] as cache
}
}
ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取
note right of api
RESTful API
或 GraphQL
end note
@enduml
圖示說明
此圖示呈現了 Kafka 請求處理的內部流程,包括客戶端發起請求、Broker 接收和處理請求、以及最終回應客戶端的整個過程。圖中清晰地展示了請求如何透過不同的執行緒和佇列進行處理,最終完成客戶端的請求。