Kafka Streams 提供了便捷的 API 處理串流資料,但底層仍以位元組陣列形式傳輸。因此,開發者需要將位元組陣列轉換為可操作的物件,或反過來將物件序列化成位元組陣列。本文將逐步講解如何使用 Kafka Streams 處理原始位元組資料,並利用 Gson 函式庫實作 JSON 資料的序列化和反序列化,構建自定義的 Serdes。同時,也涵蓋了無狀態處理操作,例如使用 filter、filterNot 和 branch 運算子進行資料過濾和分支,以及使用 map 和 mapValues 進行資料轉換,例如翻譯推文的功能。這些操作是構建複雜串流處理應用程式的基礎,讓開發者能更有效地處理和分析串流資料。
Kafka Streams 應用程式開發:從原始位元組到高階物件的處理
在前一章中,我們學習瞭如何使用 KStream 抽象來表示無狀態的記錄流。現在,我們將探討如何將資料從源主題匯入 Kafka Streams 應用程式,並逐步瞭解如何處理這些資料。
建立 KStream 源處理器
首先,我們需要建立一個 KStream 源處理器來讀取源主題中的資料。下面的程式碼展示瞭如何使用 StreamsBuilder 來建立一個 KStream 例項:
StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], byte[]> stream = builder.stream("tweets");
內容解密:
StreamsBuilder是用於構建 Kafka Streams 處理器拓撲的類別。builder.stream("tweets")方法用於建立一個KStream例項,該例項代表從名為 “tweets” 的 Kafka 主題讀取的資料流。KStream<byte[], byte[]>表示記錄的鍵和值都被編碼為位元組陣列。
Kafka Streams 中的資料表示
Kafka Streams 預設將資料表示為位元組陣列,這是由於 Kafka 本身儲存和傳輸資料的方式。這種表示方式使得 Kafka 具有靈活性,因為它不會對客戶端施加特定的資料格式要求。
public interface KStream<K, V> {
// 省略其他方法
}
內容解密:
KStream介面使用兩個泛型引數,分別代表鍵(K)和值(V)的型別。- 在我們的例子中,
KStream<byte[], byte[]>表示鍵和值都是位元組陣列。
新增列印運算元以檢視資料
為了便於開發除錯,我們可以新增一個列印運算元來檢視流經應用程式的資料。下面的程式碼展示瞭如何修改 CryptoTopology 類別以包含列印運算元:
class CryptoTopology {
public static Topology build() {
StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], byte[]> stream = builder.stream("tweets");
stream.print(Printed.<byte[], byte[]>toSysOut().withLabel("tweets-stream"));
return builder.build();
}
}
內容解密:
stream.print()方法允許我們將資料列印到控制檯,以便於除錯。Printed.toSysOut().withLabel("tweets-stream")指定了輸出目標為控制檯,並為輸出增加了標籤 “tweets-stream”。
執行 Kafka Streams 應用程式
為了執行我們的 Kafka Streams 應用程式,我們需要建立一個 App 類別,該類別將例項化並執行拓撲:
class App {
public static void main(String[] args) {
Topology topology = CryptoTopology.build();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
KafkaStreams streams = new KafkaStreams(topology, config);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
System.out.println("Starting Twitter streams");
streams.start();
}
}
內容解密:
Properties config物件用於設定 Kafka Streams 的基本組態,包括應用程式 ID 和引導伺服器地址。KafkaStreams streams物件是用於執行處理器拓撲的例項。Runtime.getRuntime().addShutdownHook()方法用於在接收到全域性關閉訊號時優雅地停止 Kafka Streams 應用程式。streams.start()方法啟動 Kafka Streams 應用程式。
資料序列化與反序列化在 Kafka Streams 中的應用
Kafka Streams 是一個強大的串流處理平台,但其底層本質上是處理位元組流(byte arrays)。這使得開發者在處理資料時需要進行額外的轉換工作,將位元組流轉換為更高層級的物件,或將物件轉換回位元組流以便寫入 Kafka。這兩個過程分別被稱為反序列化(deserialization)和序列化(serialization)。
為何需要自定義 Serdes
Kafka Streams 提供了多種內建的 Serdes 類別,如 Serdes.String()、Serdes.Integer() 等,可以滿足常見的資料型別需求。然而,對於 JSON、Avro 或 Protobuf 等格式,Kafka Streams 並未提供內建的支援。因此,當我們處理的資料是 JSON 格式時,就需要建立自定義的 Serdes。
使用 Gson 建立自定義 JSON Serdes
Gson 是 Google 開發的一個用於處理 JSON 的 Java 函式庫,提供了直觀的 API 將 JSON 位元組陣列轉換為 Java 物件,或將 Java 物件序列化為 JSON 位元組陣列。以下程式碼展示瞭如何使用 Gson 進行反序列化和序列化:
反序列化範例
Gson gson = new Gson();
byte[] bytes = ...; // 需要反序列化的原始位元組
Type type = ...; // 用於表示反序列化記錄的 Java 類別
gson.fromJson(new String(bytes), type); // 將原始位元組轉換為 Java 物件
序列化範例
Gson gson = new Gson();
gson.toJson(instance).getBytes(StandardCharsets.UTF_8); // 將 Java 物件轉換為原始位元組
自定義 Serdes 的實作步驟
- 定義資料類別:首先,我們需要定義一個資料類別(POJO),用於表示我們要從 JSON 資料中提取的欄位。例如,對於 Tweet 資料,我們可以定義一個
Tweet類別:
public class Tweet {
private Long createdAt;
// 其他欄位...
// getter 和 setter 方法...
}
- 實作自定義 Serdes:接下來,我們可以使用 Gson 來實作自定義的 JSON Serdes。這個 Serdes 將負責將 JSON 位元組陣列反序列化為
Tweet物件,以及將Tweet物件序列化為 JSON 位元組陣列。
內容解密:
- 首先,我們使用 Gson 函式庫來處理 JSON 資料的序列化和反序列化。
- 定義
Tweet類別來表示我們要處理的 JSON 資料結構。 - 使用 Gson 的
fromJson方法將 JSON 位元組陣列轉換為Tweet物件。 - 使用 Gson 的
toJson方法將Tweet物件轉換為 JSON 位元組陣列。 - 自定義 Serdes 需要實作 Serializer 和 Deserializer 介面,並在其中使用 Gson 進行實際的序列化和反序列化工作。
自定義序列化與反序列化處理
在 Kafka Streams 應用程式中,處理資料的序列化和反序列化是非常重要的環節。本篇文章將介紹如何自定義序列化器和反序列化器,以實作更高效的資料處理。
建立資料類別
首先,我們需要建立一個代表推文(Tweet)的資料類別。這個類別將包含推文的相關屬性,如 ID、語言、是否為轉推文、以及推文內容等。
public class Tweet {
private Long id;
private String lang;
private Boolean retweet;
private String text;
// 省略 getters 和 setters 以簡潔程式碼
}
內容解密:
Tweet類別定義了推文的基本屬性,包括id、lang、retweet和text。- 這些屬性對應於原始 JSON 資料中的欄位。
- 省略 getters 和 setters 是為了簡化程式碼,在實際應用中應該補上這些方法以便存取屬性。
實作自定義反序列化器
接下來,我們需要實作一個自定義的反序列化器,將原始的位元組陣列轉換為 Tweet 物件。這裡我們使用 Gson 函式庫來簡化反序列化的過程。
public class TweetDeserializer implements Deserializer<Tweet> {
private Gson gson = new GsonBuilder()
.setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
.create();
@Override
public Tweet deserialize(String topic, byte[] bytes) {
if (bytes == null) return null;
return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), Tweet.class);
}
}
內容解密:
TweetDeserializer類別實作了Deserializer<Tweet>介面,用於將位元組陣列反序列化為Tweet物件。- 使用 Gson 函式庫進行反序列化,並設定欄位命名策略為大駝峰命名法,以符合 Twitter Kafka 聯結器的欄位命名慣例。
- 如果輸入的位元組陣列為空,則傳回
null。
實作自定義序列化器
同樣地,我們也需要實作一個自定義的序列化器,將 Tweet 物件轉換為位元組陣列。
class TweetSerializer implements Serializer<Tweet> {
private Gson gson = new Gson();
@Override
public byte[] serialize(String topic, Tweet tweet) {
if (tweet == null) return null;
return gson.toJson(tweet).getBytes(StandardCharsets.UTF_8);
}
}
內容解密:
TweetSerializer類別實作了Serializer<Tweet>介面,用於將Tweet物件序列化為位元組陣列。- 使用 Gson 函式庫進行序列化,如果輸入的
Tweet物件為空,則傳回null。
建構自定義 Serdes
有了自定義的序列化和反序列化器後,我們可以建構一個自定義的 Serdes 類別,將這兩個功能結合起來。
public class TweetSerdes implements Serde<Tweet> {
@Override
public Serializer<Tweet> serializer() {
return new TweetSerializer();
}
@Override
public Deserializer<Tweet> deserializer() {
return new TweetDeserializer();
}
}
內容解密:
TweetSerdes類別實作了Serde<Tweet>介面,用於結合序列化和反序列化功能。- 提供了
serializer()和deserializer()方法,分別傳回TweetSerializer和TweetDeserializer的例項。
在 Kafka Streams 中使用自定義 Serdes
最後,我們可以在 Kafka Streams 中使用自定義的 Serdes。
KStream<byte[], Tweet> stream = builder.stream("tweets", Consumed.with(Serdes.ByteArray(), new TweetSerdes()));
stream.print(Printed.<byte[], Tweet>toSysOut().withLabel("tweets-stream"));
內容解密:
- 使用
Consumed.with()方法指定鍵和值的 Serdes,其中值的 Serdes 使用我們自定義的TweetSerdes。 - 這樣,Kafka Streams 就會將原始的位元組陣列轉換為
Tweet物件,方便後續的處理。
處理反序列化錯誤
在實際應用中,我們需要處理反序列化錯誤。Kafka Streams 提供了 DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 組態項,允許我們指定一個反序列化例外處理器。
篩選資料
在串流處理應用程式中,篩選資料是一個常見的無狀態任務。篩選涉及選擇要處理的記錄子集,而忽略其餘部分。下圖展示了篩選的基本概念。
此圖示說明瞭如何透過篩選資料來處理事件串流中的子集資料。
篩選邏輯說明:
- 篩選操作允許我們根據特定條件選擇要處理的資料。
- 這種操作在串流處理中非常常見,可以幫助我們關注感興趣的資料。
無狀態處理:過濾與分支資料流
在 Kafka Streams 中,無狀態處理是一種重要的資料處理方式,它允許開發者對資料流進行各種操作,如過濾、轉換和分支等。本章將探討 Kafka Streams 中的過濾和分支操作,並介紹如何使用這些操作來實作無狀態處理。
過濾資料流
過濾是無狀態處理中的一個基本操作,它允許開發者根據特定的條件篩選出不需要的資料。在 Kafka Streams 中,filter 和 filterNot 是兩個主要的過濾運算子。
使用 filter 運算子
filter 運算子需要一個布林表示式(Predicate)來決定是否保留某個訊息。如果 Predicate 傳回 true,則該事件將被轉發到下游處理器;否則,該記錄將被排除在進一步處理之外。
例如,假設我們需要過濾掉轉發的推文(retweets),可以使用以下程式碼:
KStream<byte[], Tweet> filtered = stream.filter((key, tweet) -> !tweet.isRetweet());
使用 filterNot 運算子
filterNot 運算子與 filter 相反,它會根據 Predicate 的結果進行相反的操作。如果 Predicate 傳回 true,則該記錄將被丟棄。
KStream<byte[], Tweet> filtered = stream.filterNot((key, tweet) -> tweet.isRetweet());
這兩個運算子的功能是等價的,但當過濾邏輯包含否定時,使用 filterNot 可以提高可讀性。
內容解密:
filter和filterNot的區別:filter保留滿足條件的記錄,而filterNot丟棄滿足條件的記錄。- Predicate 的使用:Predicate 是一個布林表示式,用於決定是否保留或丟棄某個記錄。
- Lambda 表示式的應用:在
filter和filterNot中使用 lambda 表示式可以簡化程式碼,使其更易讀。
分支資料流
分支操作允許開發者根據事件的屬性將資料流分成多個子流。在 Kafka Streams 中,可以使用 branch 運算子來實作分支。
使用 branch 運算子
假設我們需要根據推文的語言將資料流分成兩個子流:英文推文和非英文推文。可以使用以下程式碼:
Predicate<byte[], Tweet> englishTweets = (key, tweet) -> tweet.getLang().equals("en");
Predicate<byte[], Tweet> nonEnglishTweets = (key, tweet) -> !tweet.getLang().equals("en");
KStream<byte[], Tweet>[] branches = filtered.branch(englishTweets, nonEnglishTweets);
KStream<byte[], Tweet> englishStream = branches[0];
KStream<byte[], Tweet> nonEnglishStream = branches[1];
內容解密:
branch運算子的作用:根據多個 Predicate 將資料流分成多個子流。- Predicate 的順序:Predicate 的順序決定了子流的順序,第一個 Predicate 對應的第一個子流,依此類別推。
- 未匹配的記錄:如果某個記錄未匹配任何 Predicate,則該記錄將被丟棄。
翻譯推文
在某些情況下,我們需要對非英文推文進行翻譯,以便進行情感分析。Kafka Streams 提供了 map 和 mapValues 運算子來實作資料轉換。
使用 map 和 mapValues 運算子
假設我們需要將非英文推文翻譯成英文,可以使用以下程式碼:
nonEnglishStream.mapValues(value -> translate(value));
內容解密:
map和mapValues的區別:map可以修改記錄的鍵和值,而mapValues只修改記錄的值。- 翻譯邏輯:翻譯邏輯可以根據具體需求實作,例如呼叫第三方翻譯 API。
- 1:1 轉換:
map和mapValues都是一對一的轉換操作,即每個輸入記錄對應一個輸出記錄。
本章介紹了 Kafka Streams 中的無狀態處理操作,包括過濾、分支和轉換。這些操作是構建複雜資料處理流程的基礎。接下來的章節將繼續探討 Kafka Streams 的更多功能和應用場景。