Kafka Streams 提供了豐富的狀態化運算元,例如 join、aggregate 和 windowedBy,讓開發者得以捕捉和記憶事件資訊,進而執行更進階的串流處理操作。狀態化處理能識別事件串流中的模式、執行聚合操作,並透過連線豐富資料。Kafka Streams 提供了 KTable 和 GlobalKTable 等抽象概念來表示本地和全域狀態,並可透過互動式查詢功能公開狀態。理解事實和行為的差異是理解狀態化處理的關鍵,事實代表單一事件,而行為則是由多個事實累積而成。透過狀態化處理,我們可以捕捉和分析行為,進而產生更大的商業價值。
有狀態的串流處理
在前一章中,我們學習瞭如何使用 KStream 抽象和 Kafka Streams 中豐富的無狀態運算元來執行無狀態的記錄串流轉換。由於無狀態轉換不需要任何對先前事件的記憶,因此它們易於理解和使用。我們將每個事件視為不可變的事實,並獨立於其他事件進行處理。
然而,Kafka Streams 也使我們能夠捕捉和記住有關我們消費的事件的資訊。捕捉的資訊或狀態使我們能夠執行更進階的串流處理操作,包括連線和聚合資料。在本章中,我們將詳細探討有狀態的串流處理。我們將涵蓋的一些主題包括:
- 有狀態串流處理的好處
- 事實和行為之間的差異
- Kafka Streams 中有哪些有狀態運算元可用
- 如何在 Kafka Streams 中捕捉和查詢狀態
- 如何使用
KTable抽象來表示本地、分割槽狀態 - 如何使用
GlobalKTable抽象來表示全域、複製狀態 - 如何執行有狀態操作,包括連線和聚合資料
- 如何使用互動式查詢來公開狀態
與前一章一樣,我們將透過根據教程的方法來探討這些概念。本章的教程靈感來自電子遊戲產業,我們將建立一個需要使用許多 Kafka Streams 有狀態運算元的即時排行榜。此外,我們將花大量時間討論連線,因為這是有狀態應用程式中最常見的資料豐富形式之一。但在開始教程之前,讓我們先看看有狀態處理的一些好處。
有狀態處理的好處
有狀態處理幫助我們瞭解事件之間的關係,並利用這些關係進行更進階的串流處理使用案例。當我們能夠瞭解一個事件與其他事件的關係時,我們可以:
- 識別事件串流中的模式和行為
- 執行聚合操作
- 使用連線以更複雜的方式豐富資料
有狀態串流處理的另一個好處是,它為我們提供了一個額外的抽象層來表示資料。透過重放事件串流中的每個事件,並將每個鍵的最新狀態儲存在嵌入式鍵值儲存中,我們可以建立一個連續且無界的記錄串流的時間點表示。這些時間點表示或快照被稱為表,Kafka Streams 包含不同型別的表抽象,我們將在本章中瞭解。
表不僅是有狀態串流處理的核心,而且當它們被物化時,也可以被查詢。這種查詢即時快照快速移動事件串流的能力使 Kafka Streams 成為一個串流關聯處理平台,並且使我們能夠建立低延遲、事件驅動的微服務。
最後,有狀態串流處理使我們能夠使用更複雜的心智模型來瞭解我們的資料。Neil Avery 提出了一個特別有趣的觀點,他在討論事件優先思維時闡述了事實和行為之間的差異:
一個事件代表一個事實,某件事情發生了;它是不可變的…… 無狀態應用程式,如我們在前一章中討論的那樣,是事實驅動的。每個事件都被視為一個獨立且原子性的事實,可以使用不可變語義(想像一下永無止境的串流中的插入操作)進行處理,然後被遺忘。
然而,除了利用無狀態運算元來過濾、分支、合併和轉換事實之外,如果我們學會如何使用有狀態運算元來建模行為,我們就可以提出更進階的問題。那麼什麼是行為?根據 Neil 的說法:
事實的累積捕捉了行為。
你看,事件(或事實)在現實世界中很少孤立地發生。一切都是相互關聯的,透過捕捉和記住事實,我們可以開始瞭解它們的含義。這可以透過瞭解事件在其更大的歷史背景下,或透過檢視已被我們的應用程式捕捉和儲存的其他相關事件來實作。
一個流行的例子是購物車放棄,這是一種由多個事實組成的行為:使用者將一或多個物品新增到購物車,然後會話被手動(例如,使用者登出)或自動(例如,由於長時間不活動)終止。獨立處理任一事實告訴我們關於使用者在結帳流程中的位置知之甚少。然而,收集、記住和分析每個事實(這是有狀態處理所啟用的)使我們能夠識別和回應這種行為,並提供比將世界視為一系列無關事件更大的商業價值。
既然我們瞭解了有狀態串流處理的好處,以及事實和行為之間的差異,那麼就讓我們來預覽一下 Kafka Streams 中的有狀態運算元。
Kafka Streams 中的有狀態運算元預覽
Kafka Streams 包含幾個有狀態運算元,我們可以在處理器拓撲中使用。表 4-1 包含了我們將在本文中使用的幾個運算元的概述。
表 4-1. 有狀態運算元及其用途
| 使用案例 | 用途 | 運算元 |
|---|---|---|
| 連線資料 | 用其他串流或表中捕捉的額外資訊或上下文豐富事件 | join (內連線)、leftJoin、outerJoin |
| 聚合資料 | 計算相關事件的持續更新數學或組合轉換 | aggregate、count、reduce |
| 視窗資料 | 將具有接近時間接近性的事件分組 | windowedBy |
// 使用範例
KStream<String, String> stream = builder.stream("input-topic");
// 使用 join 進行內連線
stream.join(
otherStream,
(value1, value2) -> value1 + value2,
Joined.with(
Serdes.String(),
Serdes.String(),
Serdes.String()
)
)
#### 內容解密:
1. `stream.join(otherStream, ...)`:這行程式碼對兩個 `KStream` 物件進行內連線操作,將兩個串流中具有相同鍵的記錄合併。
2. `(value1, value2) -> value1 + value2`:這是一個 `ValueJoiner`,定義瞭如何合併兩個匹配記錄的值。在此範例中,它簡單地將兩個值字串連線起來。
3. `Joined.with(Serdes.String(), Serdes.String(), Serdes.String())`:這指定了鍵和值的序列化/反序列化器(Serdes)。在此範例中,假設所有鍵和值都是字串。
狀態儲存(State Stores)在 Kafka Streams 中的應用
在 Kafka Streams 中,狀態儲存扮演著至關重要的角色,尤其是在處理狀態化操作(stateful operations)時。狀態化操作需要應用程式記住之前處理過的事件,這些記憶的資料或狀態需要被有效地儲存和檢索。Kafka Streams 提供了一種名為狀態儲存(state store)的抽象概念來滿足這些需求。
狀態儲存的需求
當使用 Kafka Streams 進行狀態化操作,如計數(count)、聚合(aggregate)、連線(join)等時,應用程式需要維護一些歷史資料或狀態。例如,一個計算錯誤日誌數量的應用程式需要為每個鍵值維護一個滾動計數(rolling count),每當新的錯誤日誌被消費時,這個計數就會被更新。這個計數代表了記錄的歷史上下文,並與記錄鍵一起成為應用程式狀態的一部分。
狀態儲存的實作
Kafka Streams 提供了多種狀態儲存實作和組態選擇,每種都有其特定的優勢、權衡和適用場景。當在 Kafka Streams 應用程式中使用狀態化運算子時,考慮使用哪種型別的狀態儲存以及如何根據最佳化標準(如高吞吐量、操作簡便性、故障快速還原等)組態狀態儲存是非常重要的。在大多數情況下,如果沒有明確指定狀態儲存型別或覆寫其組態屬性,Kafka Streams 會選擇一個合理的預設值。
狀態儲存的共同特性
Kafka Streams 中預設的狀態儲存實作具有一些共同的特性,包括:
- 嵌入式(Embedded):預設的狀態儲存實作被嵌入到 Kafka Streams 應用程式的任務層級中。這種設計避免了使用外部儲存引擎時可能帶來的網路延遲和處理瓶頸。
- 使用 RocksDB:所有預設的狀態儲存都利用 RocksDB 作為底層儲存。RocksDB 是一個快速的嵌入式鍵值儲存,最初由 Facebook 開發。它支援任意位元組流來儲存鍵值對,這與 Kafka 的設計理念相吻合。
狀態儲存的類別
Kafka Streams 中的狀態儲存大致可以分為兩類別:永續性(persistent)儲存和記憶體內(in-memory)儲存。永續性儲存能夠在應用程式重新啟動或故障後還原狀態,而記憶體內儲存則不具備這種能力。
永續性與記憶體內儲存
- 永續性儲存:這類別儲存能夠將狀態持久化到磁碟上,即使在應用程式重新啟動或發生故障的情況下,也能夠還原之前的狀態。
- 記憶體內儲存:這類別儲存將狀態儲存在記憶體中,提供更快的存取速度,但不具備永續性。
程式碼範例與解析
以下是一個簡單的 Kafka Streams 程式碼範例,展示瞭如何使用狀態儲存來實作一個簡單的計數器:
// 建立 Kafka Streams 建構器
StreamsBuilder builder = new StreamsBuilder();
// 定義源頭 Topic
String sourceTopic = "source-topic";
// 定義流處理
KStream<String, String> stream = builder.stream(sourceTopic);
// 使用狀態儲存進行計數
KTable<String, Long> countTable = stream
.groupBy((key, value) -> key)
.count();
// 將結果寫入目標 Topic
countTable.toStream().to("count-topic");
// 建立 Kafka Streams 組態
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 建立 Kafka Streams 例項
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// 啟動 Kafka Streams
streams.start();
內容解密:
- 建立 Kafka Streams 建構器:首先,我們建立了一個
StreamsBuilder例項,用於定義流處理拓撲。 - 定義源頭 Topic:指定了源頭 Topic 的名稱。
- 定義流處理:使用
builder.stream()方法建立了一個KStream物件,表示從源頭 Topic 讀取的資料流。 - 使用狀態儲存進行計數:透過
groupBy()方法對資料流進行分組,然後使用count()方法進行計數。這裡使用了狀態儲存來維護每個鍵值的計數。 - 將結果寫入目標 Topic:將計數結果轉換為
KStream後,寫入目標 Topic。 - 建立 Kafka Streams 組態:設定了 Kafka Streams 的應用程式 ID 和引導伺服器地址。
- 建立 Kafka Streams 例項:使用
StreamsBuilder建立的拓撲結構和組態,建立了一個KafkaStreams例項。 - 啟動 Kafka Streams:最後,啟動了 Kafka Streams 例項,開始處理資料流。
這個範例展示瞭如何在 Kafka Streams 中使用狀態儲存來實作一個簡單的計數器。透過這種方式,可以有效地處理和分析資料流。
Kafka Streams 中的狀態儲存:深入解析
Kafka Streams 提供多種狀態儲存(State Stores)機制,以支援其流處理應用程式。這些狀態儲存機制具有高度可組態性,並且支援多種存取模式和查詢模式。
狀態儲存的特點
Kafka Streams 中的狀態儲存具有以下特點:
- 多種存取模式:狀態儲存支援多種存取模式,包括讀取和寫入。處理器拓撲(Processor Topologies)需要讀取和寫入狀態儲存,而使用 Kafka Streams 的互動式查詢功能(Interactive Queries)時,使用者端只需要讀取狀態儲存。
- 容錯性:預設情況下,狀態儲存由 Kafka 的變更日誌主題(Changelog Topics)支援。在發生故障時,狀態儲存可以透過重放變更日誌主題中的事件來重建應用程式的狀態。
- 根據鍵的操作:狀態儲存的操作是根據鍵的。記錄的鍵定義了當前事件與其他事件之間的關係。底層資料結構會根據所使用的狀態儲存型別而有所不同,但每個實作都可以被視為某種形式的鍵值儲存。
永續性與記憶體內儲存
Kafka Streams 中的狀態儲存可以分為兩大類別:永續性儲存和記憶體內儲存。
- 永續性儲存:永續性儲存會非同步地將狀態重新整理到磁碟(到可組態的狀態目錄)。這有兩個主要好處:
- 狀態可以超過可用記憶體的大小。
- 在發生故障時,永續性儲存可以比記憶體內儲存更快地還原。
- 記憶體內儲存:記憶體內儲存將資料儲存在 RAM 中。雖然記憶體內儲存可以提供更好的效能,但其容量受限於可用記憶體的大小,並且在發生故障時需要更長的時間來還原。
RocksDB:永續性儲存的實作
Kafka Streams 預設使用 RocksDB 作為永續性儲存的實作。RocksDB 是一種高效能的鍵值儲存系統,支援非同步重新整理到磁碟。透過使用 RocksDB,Kafka Streams 可以將狀態儲存擴充套件到超出可用記憶體的大小,並且可以在發生故障時快速還原。
// 設定 RocksDB 的狀態目錄
Properties props = new Properties();
props.put(StreamsConfig.STATE_DIR_CONFIG, "/path/to/state/dir");
記憶體內儲存的限制
雖然記憶體內儲存可以提供更好的效能,但其容量受限於可用記憶體的大小。在發生故障時,記憶體內儲存需要重放整個主題來重建狀態,這可能會導致較長的還原時間。
內容解密:
本段落詳細介紹了 Kafka Streams 中的狀態儲存機制,包括其特點、永續性與記憶體內儲存的區別,以及 RocksDB 的實作。透過瞭解這些內容,開發者可以更好地使用 Kafka Streams 構建流處理應用程式。
本章重點
- Kafka Streams 中的狀態儲存機制具有高度可組態性。
- 狀態儲存支援多種存取模式和查詢模式。
- 永續性儲存和記憶體內儲存是兩種主要的狀態儲存型別。
- RocksDB 是 Kafka Streams 預設的永續性儲存實作。
隨著流處理技術的不斷發展,Kafka Streams 中的狀態儲存機制也將繼續演進。未來,我們可以期待看到更多高效、可靠的狀態儲存機制的出現,以滿足日益增長的流處理需求。
內容解密:
本段落總結了本章的重點,並對未來進行了展望。透過瞭解這些內容,開發者可以更好地把握 Kafka Streams 中的狀態儲存機制的發展趨勢。
實作視訊遊戲排行榜:狀態化處理與Kafka Streams
在上一章中,我們探討了Kafka Streams的基本概念與無狀態處理的應用。本章將進一步深入狀態化處理的世界,透過實作一個視訊遊戲排行榜來說明如何利用Kafka Streams進行複雜的資料處理。
視訊遊戲排行榜的架構設計
我們的目標是建立一個即時的視訊遊戲排行榜,能夠處理來自不同來源的資料並提供即時的查詢服務。整個系統的架構如圖4-1所示。
圖4-1:視訊遊戲排行榜的拓撲設計
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 圖4-1:視訊遊戲排行榜的拓撲設計
rectangle "未鍵值資料" as node1
rectangle "已加入玩家資料的評分事件" as node2
rectangle "已加入產品資料的評分事件" as node3
rectangle "分組後的資料" as node4
rectangle "前三名最高分數" as node5
rectangle "提供外部查詢介面" as node6
node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5
node5 --> node6
@enduml
我們的Kafka叢集包含三個主題:
score-events主題:包含遊戲評分事件,記錄未鍵值且以輪詢方式分散於主題分割區。players主題:包含玩家個人資料,每筆記錄以玩家ID為鍵值。products主題:包含視訊遊戲產品資訊,每筆記錄以產品ID為鍵值。
資料模型的定義
由於來源主題包含JSON資料,我們將使用POJO(Plain Old Java Object)資料類別來定義資料模型,並透過JSON序列化函式庫(本文使用Gson)進行序列化和反序列化。
資料類別定義
// ScoreEvent.java
public class ScoreEvent {
private Long playerId;
private Long productId;
private Double score;
}
// Player.java
public class Player {
private Long id;
private String name;
}
// Product.java
public class Product {
private Long id;
private String name;
}
內容解密:
- 我們為每個主題定義了對應的資料類別,例如
ScoreEvent、Player和Product。 - 每個資料類別都包含與主題記錄相對應的屬性。
- 我們使用POJO來簡化資料模型的定義和操作。
實作步驟
- 資料豐富化:將
score-events資料與玩家和產品資訊進行join操作,以豐富評分事件的內容。 - 分組:對豐富化後的資料進行分組,以便進行聚合運算。
- 聚合運算:計算每個遊戲的最高分數,並找出前三名。
- 提供外部查詢介面:利用Kafka Streams的互動式查詢功能,建立RESTful微服務,提供即時的排行榜查詢服務。
內容解密:
- 我們利用Kafka Streams的join操作,將不同來源的資料進行整合。
- 透過分組和聚合運算,我們能夠計算出每個遊戲的最高分數。
- 最後,我們利用互動式查詢功能,提供即時的排行榜查詢服務。
專案設定
本章的程式碼位於https://github.com/mitch-seymour/mastering-kafka-streams-and-ksqldb.git。讀者可以參考該程式碼來瞭解每個拓撲步驟的實作細節。
內容解密:
- 讀者可以透過git clone指令下載程式碼,並切換到對應的目錄。
- 使用Gradle建置工具,可以輕鬆地建置專案。