返回文章列表

Kafka Streams 資料流處理深度解析

本文探討 Kafka Streams 的資料流處理機制,包含深度優先處理策略、處理器拓撲優勢、任務與執行緒關係以及效能最佳化。同時,也透過視覺化圖示解析任務與執行緒組態的影響,並比較高階 DSL 與低階 Processor API 的特性與使用場景。最後,提供使用 Gradle 初始化 Kafka Streams

串流處理 大資料

Kafka Streams 提供強大的資料流處理能力,採用深度優先策略處理資料,並以有向圖表示程式流程,讓資料流更加清晰易懂。理解任務和執行緒的關係對於效能調校至關重要,可根據 CPU 核心數量和任務數量調整執行緒數量以達最佳效能。Kafka Streams 提供高階 DSL 和低階 Processor API 兩種開發方式,DSL 適合快速開發簡單應用,而 Processor API 則提供更底層的控制,適用於複雜場景。

Kafka Streams 資料流處理深度解析

Kafka Streams 是一種用於建構即時資料處理應用程式的強大工具。它採用資料流程式設計模型,將程式表示為有向圖,使得資料流經應用程式的流程更加直觀易懂。在本文中,我們將探討 Kafka Streams 的資料流處理機制,包括深度優先處理、處理器拓撲、任務和執行緒之間的關係等。

深度優先處理

Kafka Streams 採用深度優先策略處理資料。當接收到新記錄時,它會在處理另一個記錄之前,將該記錄路由透過拓撲中的每個串流處理器。這種深度優先策略使得資料流更加容易理解,但也意味著緩慢的串流處理操作可能會阻塞同一執行緒中其他記錄的處理。

圖示說明

此圖示展示了 Kafka Streams 中的深度優先處理流程。 在此圖示中,我們可以看到單一記錄如何透過整個拓撲結構進行處理。

處理器拓撲的優勢

使用 Kafka Streams 和資料流程式設計模型建構即時資料處理應用程式具有多項優勢。首先,將程式表示為有向圖使得理解資料流經應用程式的流程變得更加容易。其次,這種表示方式允許我們標準化即時資料處理問題的框架,從而簡化了串流解決方案的建構。

具體優勢

  • 易於理解和維護:有向圖表示使得非技術利益相關者也能理解資料流經應用程式的流程。
  • 可擴充套件性:處理器拓撲可以輕易地在多個執行緒和應使用案例項中例項化和平行化,從而實作效能和可擴充套件性優勢。

任務和執行緒

在 Kafka Streams 中,任務是可平行執行的最小工作單元。任務的數量取決於輸入主題的分割槽數量。另一方面,執行緒負責執行任務。您可以透過組態屬性 num.stream.threads 指定執行緒數量。

任務數量計算公式

max(source_topic_1_partitions, ... source_topic_n_partitions)

例如,如果您的拓撲結構從一個包含 16 個分割槽的源主題讀取資料,則 Kafka Streams 將建立 16 個任務,每個任務將例項化其自身的底層處理器拓撲結構。

效能最佳化

瞭解任務和執行緒之間的關係對於實作良好的 Kafka Streams 效能至關重要。您可以根據可用核心數量和任務數量來決定執行緒數量。

範例程式碼:組態執行緒數量

Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); // 設定執行緒數量為4
StreamsConfig config = new StreamsConfig(props);

在此範例中,我們將執行緒數量設定為 4,以充分利用 4 核 CPU 的資源。

Kafka Streams 任務與執行緒組態視覺化解析

在深入瞭解 Kafka Streams 的運作原理後,我們可以透過視覺化的方式來理解任務(tasks)與執行緒(threads)之間的關係。首先,我們假設有一個 Kafka Streams 應用程式正在讀取一個包含四個分割槽(partitions)的源主題(source topic),並探討不同的執行緒組態對任務分配的影響。

執行緒組態與任務分配

首先,假設我們將 num.stream.threads 設定為 2。由於源主題有四個分割槽,Kafka Streams 將會建立四個任務並將它們分配到兩個執行緒中。最終的任務/執行緒佈局如 圖 2-8 所示。

此圖示

圖示解密:

  • 圖中展示了兩個執行緒(Thread 1 和 Thread 2)分別執行兩個任務。
  • 每個任務對應一個分割槽(p1 至 p4)。
  • 當執行緒數量少於任務數量時,多個任務會被分配到同一個執行緒中執行。

在某些情況下,為了充分利用可用的 CPU 資源,我們可能希望增加執行緒數量。將 num.stream.threads 設定為 4 時,任務的數量保持不變,但任務在執行緒之間的分配方式發生了變化。新的任務/執行緒佈局如 圖 2-9 所示。

此圖示

圖示解密:

  • 當執行緒數量等於任務數量時,每個執行緒執行一個任務。
  • 這種組態可以更好地利用多核 CPU 的優勢,提高處理效率。

Kafka Streams API:高階 DSL 與低階 Processor API

Kafka Streams 為開發者提供了兩種 API:高階 DSL(Domain Specific Language)和低階 Processor API。這兩種 API 的抽象層級不同,適用於不同的使用場景。

  • 高階 DSL:適合使用函式式程式設計風格的開發者,提供更高層級的抽象,方便處理資料流和表格。
  • 低階 Processor API:提供更底層的控制,例如存取記錄元資料、排程週期性函式、更精細的應用程式狀態存取等。

高階 DSL 與低階 Processor API 對比圖示

@startuml
skinparam backgroundColor #FEFEFE
skinparam sequenceArrowThickness 2

title Kafka Streams 資料流處理深度解析

actor "客戶端" as client
participant "API Gateway" as gateway
participant "認證服務" as auth
participant "業務服務" as service
database "資料庫" as db
queue "訊息佇列" as mq

client -> gateway : HTTP 請求
gateway -> auth : 驗證 Token
auth --> gateway : 認證結果

alt 認證成功
    gateway -> service : 轉發請求
    service -> db : 查詢/更新資料
    db --> service : 回傳結果
    service -> mq : 發送事件
    service --> gateway : 回應資料
    gateway --> client : HTTP 200 OK
else 認證失敗
    gateway --> client : HTTP 401 Unauthorized
end

@enduml

圖示解密:

  • 圖中展示了 Kafka Streams 的兩種 API 及其特點。
  • 高階 DSL 適合快速開發和簡單應用,而低階 Processor API 則提供更底層的控制,適合複雜場景。

第一個 Kafka Streams 教學:Hello Streams

在本文中,我們將動手實作第一個 Kafka Streams 應用程式。這個範例根據經典的 “Hello, world” 教學,包含兩個實作版本:一個使用高階 DSL,另一個使用低階 Processor API。兩個版本的功能相同,都會在接收到來自 users 主題的訊息時列印問候訊息。

程式碼範例(高階 DSL 版本)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class HelloStreams {
    public static void main(String[] args) {
        // 設定 Kafka Streams 組態
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 建立 StreamsBuilder
        StreamsBuilder builder = new StreamsBuilder();

        // 定義資料流處理拓撲
        KStream<String, String> source = builder.stream("users");
        source.foreach((key, value) -> System.out.println("Hello, " + value));

        // 建立 Topology
        Topology topology = builder.build();

        // 建立 KafkaStreams 例項並啟動
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}

程式碼範例(低階 Processor API 版本)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

import java.util.Properties;

public class HelloStreamsProcessorAPI {
    public static void main(String[] args) {
        // 設定 Kafka Streams 組態
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-streams-processor-api");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 定義 Topology
        Topology topology = new Topology();
        topology.addSource("Source", "users")
                .addProcessor("Processor", new ProcessorSupplier<String, String>() {
                    @Override
                    public Processor<String, String> get() {
                        return new Processor<String, String>() {
                            private ProcessorContext context;

                            @Override
                            public void init(ProcessorContext context) {
                                this.context = context;
                            }

                            @Override
                            public void process(String key, String value) {
                                System.out.println("Hello, " + value);
                            }

                            @Override
                            public void punctuate(long timestamp) {
                            }

                            @Override
                            public void close() {
                            }
                        };
                    }
                }, "Source");

        // 建立 KafkaStreams 例項並啟動
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}

程式碼解密:

  • 高階 DSL 版本使用 StreamsBuilder 簡化了資料流處理拓撲的定義。
  • 低階 Processor API 版本則提供了更底層的控制,允許自定義 Processor 處理訊息。
  • 兩種實作都達到了相同的功能,但在程式設計風格和靈活性方面有所不同。

使用Gradle初始化Kafka Streams專案

在開始使用Kafka Streams之前,我們需要初始化專案結構。雖然本文的原始碼已經包含了每個教學的初始化專案結構,但瞭解如何從頭開始建立專案仍然很重要。

初始化專案結構

使用Gradle初始化新的Kafka Streams專案,可以執行以下命令:

gradle init \
--type java-application \
--dsl groovy \
--test-framework junit-jupiter \
--project-name my-project \
--package com.example

這個命令會建立基本的專案結構,如下所示:

.
├── build.gradle
└── src
    ├── main
    │   ├── java
    │   └── resources
    └── test
        └── java

檔案與目錄說明

  • build.gradle:專案的建置檔案,用於指定專案的依賴關係,包括Kafka Streams函式庫。
  • src/main/java:存放原始碼和拓撲定義。
  • src/main/resources:通常用於存放設定檔。
  • src/test/java:存放單元測試和拓撲測試。

新增Kafka Streams依賴

要在專案中使用Kafka Streams,需要在build.gradle檔案中新增Kafka Streams函式庫作為依賴。範例如下:

plugins {
    id 'java'
    id 'application'
}

repositories {
    jcenter()
}

dependencies {
    implementation 'org.apache.kafka:kafka-streams:2.7.0'
}

內容解密:

  1. plugins區塊:指定使用Java和應用程式外掛。
  2. repositories區塊:指定從jcenter()倉函式庫下載依賴。
  3. dependencies區塊:新增Kafka Streams函式庫作為實作依賴。

建置專案

執行以下命令建置專案,並下載所需的依賴:

./gradlew build

這樣,Kafka Streams就已經安裝並準備好使用了。