返回文章列表

Kafka Streams 資料處理與序列化應用

本文探討 Kafka Streams 應用程式開發,從原始位元組到高階物件的處理,涵蓋建立 KStream 源處理器、資料表示、列印運算元、應用程式執行、資料序列化與反序列化、自定義 Serdes、錯誤處理、過濾與分支資料流等關鍵技術。文章使用 Java 程式碼範例,詳細說明如何使用 Gson 函式庫處理 JSON

串流處理 Kafka

Kafka Streams 提供了便捷的 API 處理串流資料,但底層仍以位元組陣列形式傳輸。因此,開發者需要將位元組陣列轉換為可操作的物件,或反過來將物件序列化成位元組陣列。本文將逐步講解如何使用 Kafka Streams 處理原始位元組資料,並利用 Gson 函式庫實作 JSON 資料的序列化和反序列化,構建自定義的 Serdes。同時,也涵蓋了無狀態處理操作,例如使用 filterfilterNotbranch 運算子進行資料過濾和分支,以及使用 mapmapValues 進行資料轉換,例如翻譯推文的功能。這些操作是構建複雜串流處理應用程式的基礎,讓開發者能更有效地處理和分析串流資料。

Kafka Streams 應用程式開發:從原始位元組到高階物件的處理

在前一章中,我們學習瞭如何使用 KStream 抽象來表示無狀態的記錄流。現在,我們將探討如何將資料從源主題匯入 Kafka Streams 應用程式,並逐步瞭解如何處理這些資料。

建立 KStream 源處理器

首先,我們需要建立一個 KStream 源處理器來讀取源主題中的資料。下面的程式碼展示瞭如何使用 StreamsBuilder 來建立一個 KStream 例項:

StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], byte[]> stream = builder.stream("tweets");

內容解密:

  • StreamsBuilder 是用於構建 Kafka Streams 處理器拓撲的類別。
  • builder.stream("tweets") 方法用於建立一個 KStream 例項,該例項代表從名為 “tweets” 的 Kafka 主題讀取的資料流。
  • KStream<byte[], byte[]> 表示記錄的鍵和值都被編碼為位元組陣列。

Kafka Streams 中的資料表示

Kafka Streams 預設將資料表示為位元組陣列,這是由於 Kafka 本身儲存和傳輸資料的方式。這種表示方式使得 Kafka 具有靈活性,因為它不會對客戶端施加特定的資料格式要求。

public interface KStream<K, V> {
    // 省略其他方法
}

內容解密:

  • KStream 介面使用兩個泛型引數,分別代表鍵(K)和值(V)的型別。
  • 在我們的例子中,KStream<byte[], byte[]> 表示鍵和值都是位元組陣列。

新增列印運算元以檢視資料

為了便於開發除錯,我們可以新增一個列印運算元來檢視流經應用程式的資料。下面的程式碼展示瞭如何修改 CryptoTopology 類別以包含列印運算元:

class CryptoTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<byte[], byte[]> stream = builder.stream("tweets");
        stream.print(Printed.<byte[], byte[]>toSysOut().withLabel("tweets-stream"));
        return builder.build();
    }
}

內容解密:

  • stream.print() 方法允許我們將資料列印到控制檯,以便於除錯。
  • Printed.toSysOut().withLabel("tweets-stream") 指定了輸出目標為控制檯,並為輸出增加了標籤 “tweets-stream”。

執行 Kafka Streams 應用程式

為了執行我們的 Kafka Streams 應用程式,我們需要建立一個 App 類別,該類別將例項化並執行拓撲:

class App {
    public static void main(String[] args) {
        Topology topology = CryptoTopology.build();
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        KafkaStreams streams = new KafkaStreams(topology, config);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        System.out.println("Starting Twitter streams");
        streams.start();
    }
}

內容解密:

  • Properties config 物件用於設定 Kafka Streams 的基本組態,包括應用程式 ID 和引導伺服器地址。
  • KafkaStreams streams 物件是用於執行處理器拓撲的例項。
  • Runtime.getRuntime().addShutdownHook() 方法用於在接收到全域性關閉訊號時優雅地停止 Kafka Streams 應用程式。
  • streams.start() 方法啟動 Kafka Streams 應用程式。

資料序列化與反序列化在 Kafka Streams 中的應用

Kafka Streams 是一個強大的串流處理平台,但其底層本質上是處理位元組流(byte arrays)。這使得開發者在處理資料時需要進行額外的轉換工作,將位元組流轉換為更高層級的物件,或將物件轉換回位元組流以便寫入 Kafka。這兩個過程分別被稱為反序列化(deserialization)序列化(serialization)

為何需要自定義 Serdes

Kafka Streams 提供了多種內建的 Serdes 類別,如 Serdes.String()Serdes.Integer() 等,可以滿足常見的資料型別需求。然而,對於 JSON、Avro 或 Protobuf 等格式,Kafka Streams 並未提供內建的支援。因此,當我們處理的資料是 JSON 格式時,就需要建立自定義的 Serdes。

使用 Gson 建立自定義 JSON Serdes

Gson 是 Google 開發的一個用於處理 JSON 的 Java 函式庫,提供了直觀的 API 將 JSON 位元組陣列轉換為 Java 物件,或將 Java 物件序列化為 JSON 位元組陣列。以下程式碼展示瞭如何使用 Gson 進行反序列化和序列化:

反序列化範例

Gson gson = new Gson();
byte[] bytes = ...; // 需要反序列化的原始位元組
Type type = ...; // 用於表示反序列化記錄的 Java 類別
gson.fromJson(new String(bytes), type); // 將原始位元組轉換為 Java 物件

序列化範例

Gson gson = new Gson();
gson.toJson(instance).getBytes(StandardCharsets.UTF_8); // 將 Java 物件轉換為原始位元組

自定義 Serdes 的實作步驟

  1. 定義資料類別:首先,我們需要定義一個資料類別(POJO),用於表示我們要從 JSON 資料中提取的欄位。例如,對於 Tweet 資料,我們可以定義一個 Tweet 類別:
public class Tweet {
    private Long createdAt;
    // 其他欄位...
    // getter 和 setter 方法...
}
  1. 實作自定義 Serdes:接下來,我們可以使用 Gson 來實作自定義的 JSON Serdes。這個 Serdes 將負責將 JSON 位元組陣列反序列化為 Tweet 物件,以及將 Tweet 物件序列化為 JSON 位元組陣列。

內容解密:

  • 首先,我們使用 Gson 函式庫來處理 JSON 資料的序列化和反序列化。
  • 定義 Tweet 類別來表示我們要處理的 JSON 資料結構。
  • 使用 Gson 的 fromJson 方法將 JSON 位元組陣列轉換為 Tweet 物件。
  • 使用 Gson 的 toJson 方法將 Tweet 物件轉換為 JSON 位元組陣列。
  • 自定義 Serdes 需要實作 Serializer 和 Deserializer 介面,並在其中使用 Gson 進行實際的序列化和反序列化工作。

自定義序列化與反序列化處理

在 Kafka Streams 應用程式中,處理資料的序列化和反序列化是非常重要的環節。本篇文章將介紹如何自定義序列化器和反序列化器,以實作更高效的資料處理。

建立資料類別

首先,我們需要建立一個代表推文(Tweet)的資料類別。這個類別將包含推文的相關屬性,如 ID、語言、是否為轉推文、以及推文內容等。

public class Tweet {
    private Long id;
    private String lang;
    private Boolean retweet;
    private String text;
    // 省略 getters 和 setters 以簡潔程式碼
}

內容解密:

  • Tweet 類別定義了推文的基本屬性,包括 idlangretweettext
  • 這些屬性對應於原始 JSON 資料中的欄位。
  • 省略 getters 和 setters 是為了簡化程式碼,在實際應用中應該補上這些方法以便存取屬性。

實作自定義反序列化器

接下來,我們需要實作一個自定義的反序列化器,將原始的位元組陣列轉換為 Tweet 物件。這裡我們使用 Gson 函式庫來簡化反序列化的過程。

public class TweetDeserializer implements Deserializer<Tweet> {
    private Gson gson = new GsonBuilder()
            .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
            .create();

    @Override
    public Tweet deserialize(String topic, byte[] bytes) {
        if (bytes == null) return null;
        return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), Tweet.class);
    }
}

內容解密:

  • TweetDeserializer 類別實作了 Deserializer<Tweet> 介面,用於將位元組陣列反序列化為 Tweet 物件。
  • 使用 Gson 函式庫進行反序列化,並設定欄位命名策略為大駝峰命名法,以符合 Twitter Kafka 聯結器的欄位命名慣例。
  • 如果輸入的位元組陣列為空,則傳回 null

實作自定義序列化器

同樣地,我們也需要實作一個自定義的序列化器,將 Tweet 物件轉換為位元組陣列。

class TweetSerializer implements Serializer<Tweet> {
    private Gson gson = new Gson();

    @Override
    public byte[] serialize(String topic, Tweet tweet) {
        if (tweet == null) return null;
        return gson.toJson(tweet).getBytes(StandardCharsets.UTF_8);
    }
}

內容解密:

  • TweetSerializer 類別實作了 Serializer<Tweet> 介面,用於將 Tweet 物件序列化為位元組陣列。
  • 使用 Gson 函式庫進行序列化,如果輸入的 Tweet 物件為空,則傳回 null

建構自定義 Serdes

有了自定義的序列化和反序列化器後,我們可以建構一個自定義的 Serdes 類別,將這兩個功能結合起來。

public class TweetSerdes implements Serde<Tweet> {
    @Override
    public Serializer<Tweet> serializer() {
        return new TweetSerializer();
    }

    @Override
    public Deserializer<Tweet> deserializer() {
        return new TweetDeserializer();
    }
}

內容解密:

  • TweetSerdes 類別實作了 Serde<Tweet> 介面,用於結合序列化和反序列化功能。
  • 提供了 serializer()deserializer() 方法,分別傳回 TweetSerializerTweetDeserializer 的例項。

在 Kafka Streams 中使用自定義 Serdes

最後,我們可以在 Kafka Streams 中使用自定義的 Serdes。

KStream<byte[], Tweet> stream = builder.stream("tweets", Consumed.with(Serdes.ByteArray(), new TweetSerdes()));
stream.print(Printed.<byte[], Tweet>toSysOut().withLabel("tweets-stream"));

內容解密:

  • 使用 Consumed.with() 方法指定鍵和值的 Serdes,其中值的 Serdes 使用我們自定義的 TweetSerdes
  • 這樣,Kafka Streams 就會將原始的位元組陣列轉換為 Tweet 物件,方便後續的處理。

處理反序列化錯誤

在實際應用中,我們需要處理反序列化錯誤。Kafka Streams 提供了 DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 組態項,允許我們指定一個反序列化例外處理器。

篩選資料

在串流處理應用程式中,篩選資料是一個常見的無狀態任務。篩選涉及選擇要處理的記錄子集,而忽略其餘部分。下圖展示了篩選的基本概念。

此圖示說明瞭如何透過篩選資料來處理事件串流中的子集資料。

篩選邏輯說明:
  • 篩選操作允許我們根據特定條件選擇要處理的資料。
  • 這種操作在串流處理中非常常見,可以幫助我們關注感興趣的資料。

無狀態處理:過濾與分支資料流

在 Kafka Streams 中,無狀態處理是一種重要的資料處理方式,它允許開發者對資料流進行各種操作,如過濾、轉換和分支等。本章將探討 Kafka Streams 中的過濾和分支操作,並介紹如何使用這些操作來實作無狀態處理。

過濾資料流

過濾是無狀態處理中的一個基本操作,它允許開發者根據特定的條件篩選出不需要的資料。在 Kafka Streams 中,filterfilterNot 是兩個主要的過濾運算子。

使用 filter 運算子

filter 運算子需要一個布林表示式(Predicate)來決定是否保留某個訊息。如果 Predicate 傳回 true,則該事件將被轉發到下游處理器;否則,該記錄將被排除在進一步處理之外。

例如,假設我們需要過濾掉轉發的推文(retweets),可以使用以下程式碼:

KStream<byte[], Tweet> filtered = stream.filter((key, tweet) -> !tweet.isRetweet());

使用 filterNot 運算子

filterNot 運算子與 filter 相反,它會根據 Predicate 的結果進行相反的操作。如果 Predicate 傳回 true,則該記錄將被丟棄。

KStream<byte[], Tweet> filtered = stream.filterNot((key, tweet) -> tweet.isRetweet());

這兩個運算子的功能是等價的,但當過濾邏輯包含否定時,使用 filterNot 可以提高可讀性。

內容解密:

  1. filterfilterNot 的區別filter 保留滿足條件的記錄,而 filterNot 丟棄滿足條件的記錄。
  2. Predicate 的使用:Predicate 是一個布林表示式,用於決定是否保留或丟棄某個記錄。
  3. Lambda 表示式的應用:在 filterfilterNot 中使用 lambda 表示式可以簡化程式碼,使其更易讀。

分支資料流

分支操作允許開發者根據事件的屬性將資料流分成多個子流。在 Kafka Streams 中,可以使用 branch 運算子來實作分支。

使用 branch 運算子

假設我們需要根據推文的語言將資料流分成兩個子流:英文推文和非英文推文。可以使用以下程式碼:

Predicate<byte[], Tweet> englishTweets = (key, tweet) -> tweet.getLang().equals("en");
Predicate<byte[], Tweet> nonEnglishTweets = (key, tweet) -> !tweet.getLang().equals("en");

KStream<byte[], Tweet>[] branches = filtered.branch(englishTweets, nonEnglishTweets);
KStream<byte[], Tweet> englishStream = branches[0];
KStream<byte[], Tweet> nonEnglishStream = branches[1];

內容解密:

  1. branch 運算子的作用:根據多個 Predicate 將資料流分成多個子流。
  2. Predicate 的順序:Predicate 的順序決定了子流的順序,第一個 Predicate 對應的第一個子流,依此類別推。
  3. 未匹配的記錄:如果某個記錄未匹配任何 Predicate,則該記錄將被丟棄。

翻譯推文

在某些情況下,我們需要對非英文推文進行翻譯,以便進行情感分析。Kafka Streams 提供了 mapmapValues 運算子來實作資料轉換。

使用 mapmapValues 運算子

假設我們需要將非英文推文翻譯成英文,可以使用以下程式碼:

nonEnglishStream.mapValues(value -> translate(value));

內容解密:

  1. mapmapValues 的區別map 可以修改記錄的鍵和值,而 mapValues 只修改記錄的值。
  2. 翻譯邏輯:翻譯邏輯可以根據具體需求實作,例如呼叫第三方翻譯 API。
  3. 1:1 轉換mapmapValues 都是一對一的轉換操作,即每個輸入記錄對應一個輸出記錄。

本章介紹了 Kafka Streams 中的無狀態處理操作,包括過濾、分支和轉換。這些操作是構建複雜資料處理流程的基礎。接下來的章節將繼續探討 Kafka Streams 的更多功能和應用場景。