Kafka Streams 提供了簡潔易用的 Java API,讓開發者能輕鬆建構高效能的串流處理應用程式。本文中的 Word Count 範例展現了其基本用法,從設定 Kafka 叢集連線到定義資料轉換流程,只需幾行程式碼即可完成。股市統計範例則進一步示範如何處理更複雜的商業邏輯,例如計算移動平均和交易量等統計資料。最後,使用者行為分析範例則展示了 Kafka Streams 如何整合多個資料串流,實作更全面的資料洞察。這些範例涵蓋了 Kafka Streams 的核心概念,包括 KStream、KTable、視窗操作、聚合函式和狀態儲存等,讓開發者能快速上手並應用於實際專案中。
Kafka Streams 實戰範例解析
Kafka Streams 是 Apache Kafka 提供的一個用於建構即時資料處理應用程式的 Java 函式庫。它允許開發者以高效、可擴充套件和容錯的方式處理 Kafka 中的資料流。本文將介紹兩個使用 Kafka Streams 的實戰範例:Word Count 和 Stock Market Statistics。
Word Count 範例
Word Count 是一個經典的範例,用於展示 Kafka Streams 的基本功能。該範例從一個名為 wordcount-input 的 Kafka 主題中讀取資料,將輸入的文字分割成單個詞語,計算每個詞語的出現次數,並將結果寫入 wordcount-output 主題。
設定 Kafka Streams
首先,需要設定 Kafka Streams 的相關屬性,例如應用程式 ID、Kafka 叢集的位置、預設的序列化/反序列化類別等。
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
建構 Streams Topology
接下來,使用 StreamsBuilder 物件定義資料處理流程:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("wordcount-input");
final Pattern pattern = Pattern.compile("\\W+");
KStream<String, String> counts = source.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.map((key, value) -> new KeyValue<String, String>(value, value))
.filter((key, value) -> (!value.equals("the")))
.groupByKey()
.count().mapValues(value -> Long.toString(value)).toStream();
counts.to("wordcount-output");
執行 Kafka Streams 應用程式
最後,建立 KafkaStreams 物件並啟動應用程式:
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// 通常,stream 應用程式會一直執行,在此範例中,我們讓它執行一段時間後停止
Thread.sleep(5000L);
streams.close();
股市統計範例
股市統計範例展示瞭如何使用 Kafka Streams 處理股市交易事件,包括股票程式碼、賣出價格和賣出數量等資訊。該範例將計算不同的統計資料,例如每個股票的平均價格和交易量。
資料處理流程
該範例的資料處理流程包括讀取股市交易事件、計算統計資料和輸出結果等步驟。
// 讀取股市交易事件
KStream<String, StockTrade> trades = builder.stream("stock-trades");
// 計算統計資料
KTable<String, StockStats> stats = trades.groupByKey()
.aggregate(
() -> new StockStats(),
(key, trade, stats) -> stats.add(trade),
Materialized.with(Serdes.String(), new StockStatsSerde())
);
// 輸出結果
stats.toStream().to("stock-stats");
內容解密:
- 讀取股市交易事件:使用
builder.stream("stock-trades")從stock-trades主題中讀取股市交易事件。 - 計算統計資料:使用
groupByKey()和aggregate()方法計算每個股票的統計資料,例如平均價格和交易量。 - 輸出結果:將計算出的統計資料寫入
stock-stats主題。
串流處理實戰:股票交易資料分析
在串流處理的世界中,實時分析與處理資料是至關重要的。以下我們將以股票交易資料分析為例,展示如何使用 Kafka Streams 進行視窗聚合運算。
建立輸出串流
我們的目標是建立輸出串流,其中包含一些視窗統計資料:
- 每五秒視窗的最低要價(ask price)
- 每五秒視窗的交易數量
- 每五秒視窗的平均要價
所有統計資料將每秒更新一次。
設定與組態
首先,我們需要設定 Kafka Streams 的組態引數。與「Word Count」範例相似,但這次我們使用了不同的 Serde 類別:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stockstat");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.BROKER);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TradeSerde.class.getName());
這裡的關鍵差異在於,我們使用了 TradeSerde 來序列化與反序列化 Trade 物件。這個 Serde 是根據 Google 的 Gson 函式庫建立的,用於將 Java 物件轉換為 JSON 格式。
TradeSerde 實作
static public final class TradeSerde extends WrapperSerde<Trade> {
public TradeSerde() {
super(new JsonSerializer<Trade>(), new JsonDeserializer<Trade>(Trade.class));
}
}
內容解密:
TradeSerde繼承自WrapperSerde<Trade>,表示它是用來處理Trade物件的序列化與反序列化。- 在建構函式中,我們傳入了
JsonSerializer<Trade>()和JsonDeserializer<Trade>(Trade.class),分別用於將Trade物件序列化為 JSON 字串和將 JSON 字串反序列化為Trade物件。 - 這種設計允許我們在 Kafka 中儲存和傳輸複雜的 Java 物件。
建立拓撲
接下來,我們建立 Kafka Streams 的拓撲:
KStream<Windowed<String>, TradeStats> stats = source
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).advanceBy(Duration.ofSeconds(1)))
.aggregate(
() -> new TradeStats(),
(k, v, tradestats) -> tradestats.add(v),
Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
.withValueSerde(new TradeStatsSerde()))
.toStream()
.mapValues((trade) -> trade.computeAvgPrice());
stats.to("stockstats-output", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize)));
內容解密:
.groupByKey()操作確保事件串流根據記錄鍵進行分割。.windowedBy()定義了一個五秒的視窗,每秒推進一次。.aggregate()方法對視窗內的事件進行聚合,計算最小價格、交易數量和總價格。Materialized.as("trade-aggregates")組態了狀態儲存的名稱和 Serde,用於儲存聚合結果。.toStream()將表格轉換回事件串流。.mapValues()更新平均價格。- 最後,將結果寫入
stockstats-output主題。
Kafka Streams 實戰:多重串流的 Join 操作
在前面的範例中,我們已經展示瞭如何使用 Kafka Streams 進行基本的資料處理。在這個章節中,我們將進一步探討如何使用 Kafka Streams 進行多重串流的 Join 操作,以實作對使用者行為的全面分析。
多重串流 Join 操作的實作
首先,我們需要建立三個不同的資料來源:模擬的網站點選串流、使用者資料函式庫更新串流和網頁搜尋串流。接著,我們將對這三個串流進行 Join 操作,以獲得對每個使用者行為的360度視角。
KStream<Integer, PageView> views =
builder.stream(Constants.PAGE_VIEW_TOPIC,
Consumed.with(Serdes.Integer(), new PageViewSerde()));
KStream<Integer, Search> searches =
builder.stream(Constants.SEARCH_TOPIC,
Consumed.with(Serdes.Integer(), new SearchSerde()));
KTable<Integer, UserProfile> profiles =
builder.table(Constants.USER_PROFILE_TOPIC,
Consumed.with(Serdes.Integer(), new ProfileSerde()));
內容解密:
- 首先,我們從三個不同的 Kafka 主題中讀取資料,分別代表網站點選、網頁搜尋和使用者資料。
KStream用於處理無界的資料串流,而KTable則代表一個具狀態的、更新的表格。
進行串流與表格的 Join 操作
接下來,我們將網站點選串流與使用者資料表格進行 Left Join,以豐富點選事件的資訊。
KStream<Integer, UserActivity> viewsWithProfile = views.leftJoin(profiles,
(page, profile) -> {
if (profile != null)
return new UserActivity(
profile.getUserID(), profile.getUserName(),
profile.getZipcode(), profile.getInterests(),
"", page.getPage());
else
return new UserActivity(
-1, "", "", null, "", page.getPage());
});
內容解密:
leftJoin方法用於將點選事件與對應的使用者資料進行結合。- 如果使用者資料存在,則建立一個新的
UserActivity物件,包含使用者資訊和點選的頁面資訊。
進行串流與串流的 Join 操作
然後,我們將加入搜尋串流的資訊,進一步豐富使用者的行為資料。
KStream<Integer, UserActivity> userActivityKStream =
viewsWithProfile.leftJoin(searches,
(userActivity, search) -> {
if (search != null)
userActivity.updateSearch(search.getSearchTerms());
else
userActivity.updateSearch("");
return userActivity;
},
JoinWindows.of(Duration.ofSeconds(1)).before(Duration.ofSeconds(0)),
StreamJoined.with(Serdes.Integer(),
new UserActivitySerde(),
new SearchSerde()));
內容解密:
- 這裡進行的是兩個串流之間的 Left Join 操作,將使用者的點選行為與搜尋行為結合。
JoinWindows.of(Duration.ofSeconds(1))定義了一個時間視窗,確保只有在搜尋後一秒內的點選行為才會被納入結果中。
Kafka Streams 的架構與最佳化
Kafka Streams 的應用程式本質上是定義了一個或多個處理拓撲(Topology),資料在這個拓撲中從輸入到輸出經過一系列的處理步驟。
圖示:Word Count 範例的拓撲結構
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title Kafka Streams 範例解析與拓撲建構
package "統計分析流程" {
package "資料收集" {
component [樣本資料] as sample
component [母體資料] as population
}
package "描述統計" {
component [平均數/中位數] as central
component [標準差/變異數] as dispersion
component [分佈形狀] as shape
}
package "推論統計" {
component [假設檢定] as hypothesis
component [信賴區間] as confidence
component [迴歸分析] as regression
}
}
sample --> central : 計算
sample --> dispersion : 計算
central --> hypothesis : 檢驗
dispersion --> confidence : 估計
hypothesis --> regression : 建模
note right of hypothesis
H0: 虛無假設
H1: 對立假設
α: 顯著水準
end note
@enduml
此圖示顯示了 Word Count 範例中的處理拓撲結構,從輸入到輸出的每一步驟都被清晰地展示出來。
內容解密:
- Kafka Streams 的拓撲是由多個處理器(Processor)組成的有向無環圖(DAG)。
- 每個處理器執行特定的操作,如過濾、對映或聚合。
- 透過最佳化拓撲結構,可以提高應用程式的效能和可擴充套件性。
Kafka Streams 拓撲結構的測試與擴充套件
測試拓撲結構
在開發 Kafka Streams 應用程式時,測試是確保程式正確執行的重要步驟。我們需要進行自動化測試,以驗證應用程式在不同場景下的執行結果。
使用 TopologyTestDriver 進行測試
Kafka Streams 提供了一個名為 TopologyTestDriver 的測試工具,用於測試拓撲結構。從 1.1.0 版本開始,其 API 已經經過了多次改進,目前已經非常方便易用。
// 建立 TopologyTestDriver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
// 定義輸入資料
testDriver.pipeInput(record);
// 讀取輸出結果
ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic");
// 驗證輸出結果
assertEquals("expected-value", outputRecord.value());
內容解密:
- 建立
TopologyTestDriver:使用TopologyTestDriver來模擬 Kafka Streams 的執行環境。 - 定義輸入資料:使用
pipeInput方法將輸入資料推播到拓撲結構中。 - 讀取輸出結果:使用
readOutput方法從輸出主題中讀取處理後的結果。 - 驗證輸出結果:將實際輸出結果與預期結果進行比較,以驗證拓撲結構的正確性。
整合測試框架
除了單元測試外,我們還需要進行整合測試,以驗證應用程式在真實環境中的執行結果。Kafka Streams 提供了兩種流行的整合測試框架:EmbeddedKafkaCluster 和 Testcontainers。
// 使用 Testcontainers 進行整合測試
@ClassRule
public static KafkaContainer kafka = new KafkaContainer();
// 建立 Kafka Streams 應用程式
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// 啟動應用程式
streams.start();
內容解密:
- 使用
Testcontainers:利用 Docker 容器來隔離 Kafka 環境,以避免資源衝突。 - 建立 Kafka Streams 應用程式:使用
KafkaStreams類別來建立應用程式例項。 - 啟動應用程式:呼叫
start方法來啟動應用程式。
擴充套件拓撲結構
Kafka Streams 可以透過多執行緒和多例項的方式來擴充套件拓撲結構的執行效率。
多執行緒執行
Kafka Streams 可以在單個應用程式例項中執行多個執行緒,以提高處理效率。
// 設定執行緒數量
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// 建立 Kafka Streams 應用程式
KafkaStreams streams = new KafkaStreams(builder.build(), props);
內容解密:
- 設定執行緒數量:透過
StreamsConfig.NUM_STREAM_THREADS_CONFIG屬性來設定執行緒數量。 - 建立 Kafka Streams 應用程式:使用設定的屬性來建立應用程式例項。
多例項執行
Kafka Streams 可以在多個伺服器上執行多個應用程式例項,以實作負載平衡和提高處理效率。
// 在多個伺服器上啟動應用程式例項
KafkaStreams streams1 = new KafkaStreams(builder.build(), props);
KafkaStreams streams2 = new KafkaStreams(builder.build(), props);
streams1.start();
streams2.start();
內容解密:
- 在多個伺服器上啟動應用程式例項:在不同的伺服器上啟動多個應用程式例項。
- Kafka 自動協調工作:Kafka 會自動將任務分配給不同的例項,以實作負載平衡。