返回文章列表

Kafka Streams 範例解析與拓撲建構

本文探討 Kafka Streams 的實戰應用,涵蓋 Word Count、股市統計和使用者行為分析等多個範例,同時解析拓撲建構、測試和擴充套件的最佳實務。文章內容包含設定 Kafka Streams 屬性、建構 Streams Topology、執行應用程式、多重串流 Join

串流處理 大資料

Kafka Streams 提供了簡潔易用的 Java API,讓開發者能輕鬆建構高效能的串流處理應用程式。本文中的 Word Count 範例展現了其基本用法,從設定 Kafka 叢集連線到定義資料轉換流程,只需幾行程式碼即可完成。股市統計範例則進一步示範如何處理更複雜的商業邏輯,例如計算移動平均和交易量等統計資料。最後,使用者行為分析範例則展示了 Kafka Streams 如何整合多個資料串流,實作更全面的資料洞察。這些範例涵蓋了 Kafka Streams 的核心概念,包括 KStream、KTable、視窗操作、聚合函式和狀態儲存等,讓開發者能快速上手並應用於實際專案中。

Kafka Streams 實戰範例解析

Kafka Streams 是 Apache Kafka 提供的一個用於建構即時資料處理應用程式的 Java 函式庫。它允許開發者以高效、可擴充套件和容錯的方式處理 Kafka 中的資料流。本文將介紹兩個使用 Kafka Streams 的實戰範例:Word Count 和 Stock Market Statistics。

Word Count 範例

Word Count 是一個經典的範例,用於展示 Kafka Streams 的基本功能。該範例從一個名為 wordcount-input 的 Kafka 主題中讀取資料,將輸入的文字分割成單個詞語,計算每個詞語的出現次數,並將結果寫入 wordcount-output 主題。

設定 Kafka Streams

首先,需要設定 Kafka Streams 的相關屬性,例如應用程式 ID、Kafka 叢集的位置、預設的序列化/反序列化類別等。

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

建構 Streams Topology

接下來,使用 StreamsBuilder 物件定義資料處理流程:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("wordcount-input");
final Pattern pattern = Pattern.compile("\\W+");
KStream<String, String> counts = source.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
        .map((key, value) -> new KeyValue<String, String>(value, value))
        .filter((key, value) -> (!value.equals("the")))
        .groupByKey()
        .count().mapValues(value -> Long.toString(value)).toStream();
counts.to("wordcount-output");

執行 Kafka Streams 應用程式

最後,建立 KafkaStreams 物件並啟動應用程式:

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// 通常,stream 應用程式會一直執行,在此範例中,我們讓它執行一段時間後停止
Thread.sleep(5000L);
streams.close();

股市統計範例

股市統計範例展示瞭如何使用 Kafka Streams 處理股市交易事件,包括股票程式碼、賣出價格和賣出數量等資訊。該範例將計算不同的統計資料,例如每個股票的平均價格和交易量。

資料處理流程

該範例的資料處理流程包括讀取股市交易事件、計算統計資料和輸出結果等步驟。

// 讀取股市交易事件
KStream<String, StockTrade> trades = builder.stream("stock-trades");

// 計算統計資料
KTable<String, StockStats> stats = trades.groupByKey()
        .aggregate(
                () -> new StockStats(),
                (key, trade, stats) -> stats.add(trade),
                Materialized.with(Serdes.String(), new StockStatsSerde())
        );

// 輸出結果
stats.toStream().to("stock-stats");

內容解密:

  1. 讀取股市交易事件:使用 builder.stream("stock-trades")stock-trades 主題中讀取股市交易事件。
  2. 計算統計資料:使用 groupByKey()aggregate() 方法計算每個股票的統計資料,例如平均價格和交易量。
  3. 輸出結果:將計算出的統計資料寫入 stock-stats 主題。

串流處理實戰:股票交易資料分析

在串流處理的世界中,實時分析與處理資料是至關重要的。以下我們將以股票交易資料分析為例,展示如何使用 Kafka Streams 進行視窗聚合運算。

建立輸出串流

我們的目標是建立輸出串流,其中包含一些視窗統計資料:

  • 每五秒視窗的最低要價(ask price)
  • 每五秒視窗的交易數量
  • 每五秒視窗的平均要價

所有統計資料將每秒更新一次。

設定與組態

首先,我們需要設定 Kafka Streams 的組態引數。與「Word Count」範例相似,但這次我們使用了不同的 Serde 類別:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stockstat");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.BROKER);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TradeSerde.class.getName());

這裡的關鍵差異在於,我們使用了 TradeSerde 來序列化與反序列化 Trade 物件。這個 Serde 是根據 Google 的 Gson 函式庫建立的,用於將 Java 物件轉換為 JSON 格式。

TradeSerde 實作

static public final class TradeSerde extends WrapperSerde<Trade> {
    public TradeSerde() {
        super(new JsonSerializer<Trade>(), new JsonDeserializer<Trade>(Trade.class));
    }
}

內容解密:

  1. TradeSerde 繼承自 WrapperSerde<Trade>,表示它是用來處理 Trade 物件的序列化與反序列化。
  2. 在建構函式中,我們傳入了 JsonSerializer<Trade>()JsonDeserializer<Trade>(Trade.class),分別用於將 Trade 物件序列化為 JSON 字串和將 JSON 字串反序列化為 Trade 物件。
  3. 這種設計允許我們在 Kafka 中儲存和傳輸複雜的 Java 物件。

建立拓撲

接下來,我們建立 Kafka Streams 的拓撲:

KStream<Windowed<String>, TradeStats> stats = source
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).advanceBy(Duration.ofSeconds(1)))
    .aggregate(
        () -> new TradeStats(),
        (k, v, tradestats) -> tradestats.add(v),
        Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
            .withValueSerde(new TradeStatsSerde()))
    .toStream()
    .mapValues((trade) -> trade.computeAvgPrice());
stats.to("stockstats-output", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize)));

內容解密:

  1. .groupByKey() 操作確保事件串流根據記錄鍵進行分割。
  2. .windowedBy() 定義了一個五秒的視窗,每秒推進一次。
  3. .aggregate() 方法對視窗內的事件進行聚合,計算最小價格、交易數量和總價格。
  4. Materialized.as("trade-aggregates") 組態了狀態儲存的名稱和 Serde,用於儲存聚合結果。
  5. .toStream() 將表格轉換回事件串流。
  6. .mapValues() 更新平均價格。
  7. 最後,將結果寫入 stockstats-output 主題。

Kafka Streams 實戰:多重串流的 Join 操作

在前面的範例中,我們已經展示瞭如何使用 Kafka Streams 進行基本的資料處理。在這個章節中,我們將進一步探討如何使用 Kafka Streams 進行多重串流的 Join 操作,以實作對使用者行為的全面分析。

多重串流 Join 操作的實作

首先,我們需要建立三個不同的資料來源:模擬的網站點選串流、使用者資料函式庫更新串流和網頁搜尋串流。接著,我們將對這三個串流進行 Join 操作,以獲得對每個使用者行為的360度視角。

KStream<Integer, PageView> views = 
    builder.stream(Constants.PAGE_VIEW_TOPIC, 
                   Consumed.with(Serdes.Integer(), new PageViewSerde()));

KStream<Integer, Search> searches = 
    builder.stream(Constants.SEARCH_TOPIC, 
                   Consumed.with(Serdes.Integer(), new SearchSerde()));

KTable<Integer, UserProfile> profiles = 
    builder.table(Constants.USER_PROFILE_TOPIC, 
                  Consumed.with(Serdes.Integer(), new ProfileSerde()));

內容解密:

  • 首先,我們從三個不同的 Kafka 主題中讀取資料,分別代表網站點選、網頁搜尋和使用者資料。
  • KStream 用於處理無界的資料串流,而 KTable 則代表一個具狀態的、更新的表格。

進行串流與表格的 Join 操作

接下來,我們將網站點選串流與使用者資料表格進行 Left Join,以豐富點選事件的資訊。

KStream<Integer, UserActivity> viewsWithProfile = views.leftJoin(profiles,
    (page, profile) -> {
        if (profile != null)
            return new UserActivity(
                profile.getUserID(), profile.getUserName(),
                profile.getZipcode(), profile.getInterests(),
                "", page.getPage());
        else
            return new UserActivity(
                -1, "", "", null, "", page.getPage());
    });

內容解密:

  • leftJoin 方法用於將點選事件與對應的使用者資料進行結合。
  • 如果使用者資料存在,則建立一個新的 UserActivity 物件,包含使用者資訊和點選的頁面資訊。

進行串流與串流的 Join 操作

然後,我們將加入搜尋串流的資訊,進一步豐富使用者的行為資料。

KStream<Integer, UserActivity> userActivityKStream =
    viewsWithProfile.leftJoin(searches,
        (userActivity, search) -> {
            if (search != null)
                userActivity.updateSearch(search.getSearchTerms());
            else
                userActivity.updateSearch("");
            return userActivity;
        },
        JoinWindows.of(Duration.ofSeconds(1)).before(Duration.ofSeconds(0)),
        StreamJoined.with(Serdes.Integer(),
                          new UserActivitySerde(),
                          new SearchSerde()));

內容解密:

  • 這裡進行的是兩個串流之間的 Left Join 操作,將使用者的點選行為與搜尋行為結合。
  • JoinWindows.of(Duration.ofSeconds(1)) 定義了一個時間視窗,確保只有在搜尋後一秒內的點選行為才會被納入結果中。

Kafka Streams 的架構與最佳化

Kafka Streams 的應用程式本質上是定義了一個或多個處理拓撲(Topology),資料在這個拓撲中從輸入到輸出經過一系列的處理步驟。

圖示:Word Count 範例的拓撲結構

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Kafka Streams 範例解析與拓撲建構

package "統計分析流程" {
    package "資料收集" {
        component [樣本資料] as sample
        component [母體資料] as population
    }

    package "描述統計" {
        component [平均數/中位數] as central
        component [標準差/變異數] as dispersion
        component [分佈形狀] as shape
    }

    package "推論統計" {
        component [假設檢定] as hypothesis
        component [信賴區間] as confidence
        component [迴歸分析] as regression
    }
}

sample --> central : 計算
sample --> dispersion : 計算
central --> hypothesis : 檢驗
dispersion --> confidence : 估計
hypothesis --> regression : 建模

note right of hypothesis
  H0: 虛無假設
  H1: 對立假設
  α: 顯著水準
end note

@enduml

此圖示顯示了 Word Count 範例中的處理拓撲結構,從輸入到輸出的每一步驟都被清晰地展示出來。

內容解密:

  • Kafka Streams 的拓撲是由多個處理器(Processor)組成的有向無環圖(DAG)。
  • 每個處理器執行特定的操作,如過濾、對映或聚合。
  • 透過最佳化拓撲結構,可以提高應用程式的效能和可擴充套件性。

Kafka Streams 拓撲結構的測試與擴充套件

測試拓撲結構

在開發 Kafka Streams 應用程式時,測試是確保程式正確執行的重要步驟。我們需要進行自動化測試,以驗證應用程式在不同場景下的執行結果。

使用 TopologyTestDriver 進行測試

Kafka Streams 提供了一個名為 TopologyTestDriver 的測試工具,用於測試拓撲結構。從 1.1.0 版本開始,其 API 已經經過了多次改進,目前已經非常方便易用。

// 建立 TopologyTestDriver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);

// 定義輸入資料
testDriver.pipeInput(record);

// 讀取輸出結果
ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic");

// 驗證輸出結果
assertEquals("expected-value", outputRecord.value());

內容解密:

  1. 建立 TopologyTestDriver:使用 TopologyTestDriver 來模擬 Kafka Streams 的執行環境。
  2. 定義輸入資料:使用 pipeInput 方法將輸入資料推播到拓撲結構中。
  3. 讀取輸出結果:使用 readOutput 方法從輸出主題中讀取處理後的結果。
  4. 驗證輸出結果:將實際輸出結果與預期結果進行比較,以驗證拓撲結構的正確性。

整合測試框架

除了單元測試外,我們還需要進行整合測試,以驗證應用程式在真實環境中的執行結果。Kafka Streams 提供了兩種流行的整合測試框架:EmbeddedKafkaClusterTestcontainers

// 使用 Testcontainers 進行整合測試
@ClassRule
public static KafkaContainer kafka = new KafkaContainer();

// 建立 Kafka Streams 應用程式
KafkaStreams streams = new KafkaStreams(builder.build(), props);

// 啟動應用程式
streams.start();

內容解密:

  1. 使用 Testcontainers:利用 Docker 容器來隔離 Kafka 環境,以避免資源衝突。
  2. 建立 Kafka Streams 應用程式:使用 KafkaStreams 類別來建立應用程式例項。
  3. 啟動應用程式:呼叫 start 方法來啟動應用程式。

擴充套件拓撲結構

Kafka Streams 可以透過多執行緒和多例項的方式來擴充套件拓撲結構的執行效率。

多執行緒執行

Kafka Streams 可以在單個應用程式例項中執行多個執行緒,以提高處理效率。

// 設定執行緒數量
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

// 建立 Kafka Streams 應用程式
KafkaStreams streams = new KafkaStreams(builder.build(), props);

內容解密:

  1. 設定執行緒數量:透過 StreamsConfig.NUM_STREAM_THREADS_CONFIG 屬性來設定執行緒數量。
  2. 建立 Kafka Streams 應用程式:使用設定的屬性來建立應用程式例項。

多例項執行

Kafka Streams 可以在多個伺服器上執行多個應用程式例項,以實作負載平衡和提高處理效率。

// 在多個伺服器上啟動應用程式例項
KafkaStreams streams1 = new KafkaStreams(builder.build(), props);
KafkaStreams streams2 = new KafkaStreams(builder.build(), props);

streams1.start();
streams2.start();

內容解密:

  1. 在多個伺服器上啟動應用程式例項:在不同的伺服器上啟動多個應用程式例項。
  2. Kafka 自動協調工作:Kafka 會自動將任務分配給不同的例項,以實作負載平衡。