返回文章列表

Kafka Streams DSL 與 Processor API 整合應用

本文探討 Kafka Streams 中 DSL 與 Processor API 的整合應用,說明如何結合兩者優勢簡化串流處理應用程式開發,並深入比較 ksqlDB 與 Kafka Streams 的特性與應用場景,以及 ksqlDB 的演進過程和與 Kafka Streams

串流處理 Kafka

Kafka Streams 提供了 DSL 和 Processor API 兩種方式構建串流處理拓撲。DSL 簡潔易用,適合常見的轉換操作,而 Processor API 提供更細粒度的控制,適用於複雜的業務邏輯。本文將示範如何結合兩者,例如使用 DSL 定義主要流程,並在需要精細控制時嵌入 Processor API 實作客製化邏輯,例如狀態儲存操作或複雜的資料轉換。這種混合使用方式兼具 DSL 的簡潔性和 Processor API 的靈活性,可以提升程式碼可讀性和維護性。同時,本文也將探討 ksqlDB 作為一個根據 Kafka Streams 的串流 SQL 引擎,如何進一步簡化串流處理應用程式的開發,並比較 ksqlDB 和 Kafka Streams 的適用場景,幫助開發者根據實際需求選擇合適的工具。最後,文章還將回顧 ksqlDB 的演進歷程,以及它與 Kafka Streams 的整合方式,讓讀者更深入地理解 ksqlDB 的核心概念和運作機制。

重構 Digital Twin Processor 步驟

在重構 Stateful Digital Twin Processor 步驟時,我們首先需要建立一個 StoreBuilder 來建立一個持久化的 KeyValueStore,用於儲存 Digital Twin 的資料。

建立 KeyValueStore

StoreBuilder<KeyValueStore<String, DigitalTwin>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("digital-twin-store"),
        Serdes.String(),
        JsonSerdes.DigitalTwin());
builder.addStateStore(storeBuilder);

內容解密:

  1. Stores.keyValueStoreBuilder 用於建立一個 KeyValueStore 的構建器。
  2. Stores.persistentKeyValueStore("digital-twin-store") 指定了儲存的名稱和型別,這裡使用的是持久化的 KeyValue 儲存。
  3. Serdes.String()JsonSerdes.DigitalTwin() 分別指定了鍵和值的序列化/反序列化器。
  4. builder.addStateStore(storeBuilder) 將建立的 storeBuilder 新增到 StreamsBuilder 中。

選擇 Processor 或 Transformer

在重構 Digital Twin Processor 步驟時,我們需要決定是使用 Processor 還是 Transformer。由於我們需要將結果傳送到下游處理器,因此選擇使用 transformValues 操作,因為它允許我們傳回一個值並繼續處理管道。

使用 transformValues 操作

首先,我們需要實作 ValueTransformerWithKey 介面,並將原有的 Processor 邏輯遷移到 transform 方法中。

public class DigitalTwinValueTransformerWithKey 
    implements ValueTransformerWithKey<String, TurbineState, DigitalTwin> {
  
  @Override
  public void init(ProcessorContext context) {
    // 初始化邏輯
  }
  
  @Override
  public DigitalTwin transform(String key, TurbineState value) {
    // 轉換邏輯
    return digitalTwin;
  }
  
  @Override
  public void close() {
    // 清理邏輯
  }
  
  public void enforceTtl(Long timestamp) {
    // TTL 邏輯
  }
}

內容解密:

  1. DigitalTwinValueTransformerWithKey 實作了 ValueTransformerWithKey 介面,用於對輸入記錄的值進行轉換。
  2. transform 方法接收鍵和值,並傳回轉換後的值。
  3. 不再需要使用 context.forward 傳送記錄到下游處理器,而是直接傳回轉換後的值。

新增 transformValues 操作到管道

highWinds
    .transformValues(DigitalTwinValueTransformerWithKey::new, "digital-twin-store")
    .to("digital-twins", Produced.with(Serdes.String(), JsonSerdes.DigitalTwin()));

內容解密:

  1. transformValues 操作使用 DigitalTwinValueTransformerWithKey 的例項對 highWinds 流的值進行轉換。
  2. 指定了狀態儲存的名稱 "digital-twin-store"
  3. 結果被傳送到 "digital-twins" 主題,使用指定的序列化器。

對比兩種實作方式

Processor API onlyDSL + Processor API
使用純粹的 Processor API 構建 Topology結合 DSL 和 Processor API 構建 Topology
需要手動管理每個處理步驟和狀態儲存使用 DSL 簡化流處理步驟的管理

對比分析

兩種實作方式都可以達到相同的目標,但它們在程式碼的可讀性和維護性方面有所不同。純粹的 Processor API 需要更多的手動管理和組態,但提供了更細粒度的控制。結合 DSL 和 Processor API 的方式則利用了 DSL 的簡潔性和 Processor API 的靈活性。

使用混合DSL與Processor API的優勢

在串流處理應用程式的開發中,混合使用Kafka Streams的DSL(Domain Specific Language)與Processor API能夠帶來多項好處。這些好處包括:

  • 簡化資料流程的理解:透過連結運算元(operators),可以更容易地建立資料流程的思維地圖,而不必使用節點名稱和父名稱來定義處理器之間的關係。
  • Lambda支援:DSL中的大多數運算元都支援lambda表示式,這對於簡潔的轉換操作非常有益。相較之下,Processor API需要實作Processor介面,即使是簡單的操作也需要這樣做,這可能顯得繁瑣。
  • 簡化鍵值重新對映:在Processor API中,簡單的鍵值重新對映操作需要實作Processor介面,並且需要手動處理寫入中間重新分割槽主題的過程,這增加了程式碼的複雜性。
  • 標準化的詞彙:DSL運算元提供了一套標準化的詞彙,用於定義串流處理步驟中發生的事情。例如,從flatMap運算元可以推斷出它可能會產生與輸入不同的記錄數量,而無需瞭解具體的計算邏輯。相反,Processor API使得掩蓋特定Processor實作的本質變得容易,這會損害程式碼的可讀性並對維護產生負面影響。
  • 統一的串流型別詞彙:DSL還為不同型別的串流提供了一套共同的詞彙,包括純記錄串流、本地聚合串流(通常稱為表格)和全域聚合串流(稱為全域表格)。

ksqlDB簡介

ksqlDB的故事是一個簡化和演化的過程。它與Kafka Streams有著相同的目標:簡化構建串流處理應用程式的過程。然而,隨著ksqlDB的發展,其目標變得更加遠大。它不僅簡化了構建串流處理應用程式的方式,還簡化了這些應用程式與其他系統(包括Kafka外部的系統)的整合方式。所有這些都透過SQL介面實作,使得初學者和專家都能輕鬆利用Kafka的力量。

為什麼同時需要Kafka Streams和ksqlDB?

事實上,Kafka Streams和ksqlDB都是非常優秀的工具,可以互相補充。您可以使用ksqlDB構建可以用SQL表達的串流處理應用程式,並使用單一工具輕鬆設定資料來源和接收器,以建立端對端的資料處理管道。另一方面,您可以使用Kafka Streams構建更複雜的應用程式,並且您對該函式庫的瞭解將加深您對ksqlDB的理解,因為ksqlDB實際上是建立在Kafka Streams之上的。

SQL語法表示法

本文的第三部分將包含多個語法參考,用於描述如何在ksqlDB中構建特定的SQL陳述式。例如,用於列出ksqlDB伺服器上執行中的查詢的語法參考如下:

{ SHOW | LIST } QUERIES [EXTENDED];

我們將使用的SQL語法表示法如下:

  • 方括號 [ ] 包圍可選的元素或子句。
  • 大括號 { } 包圍一組可選的選項。
  • 圓括號 ( ) 是字面上的圓括號。
  • 豎線 | 代表邏輯或。
  • 由逗號前導的省略號 [, ... ] 表示前面的專案可以在逗號分隔的列表中重複。

ksqlDB語法範例

根據語法參考,我們可以看到我們的陳述式可以以SHOWLIST開頭,並且可以選擇性地在結尾附加EXTENDED關鍵字。因此,以下是一個有效的ksqlDB陳述式:

SHOW QUERIES;

ksqlDB 簡介與應用

ksqlDB 是由 Confluent 在 2017 年發布的開源事件流資料函式庫,簡化了串流處理應用程式的建立、佈署和維護。ksqlDB 結合了 Kafka 生態系統中的兩個專門元件(Kafka Connect 和 Kafka Streams),並提供高階 SQL 介面與這些元件互動。

ksqlDB 的功能與特點

  • 使用 SQL 將資料建模為流或表(在 ksqlDB 中被視為集合)
  • 套用多種 SQL 建構式(如連線、聚合、轉換、過濾和視窗化資料)建立新的衍生資料表示,而無需編寫 Java 程式碼
  • 使用 推播查詢(Push Queries) 查詢流和表,這些查詢會持續執行並在有新資料時將結果推播給客戶端
  • 從流和表中建立 物化檢視(Materialized Views),並使用 提取查詢(Pull Queries) 查詢這些檢視
  • 定義聯結器以將 ksqlDB 與外部資料儲存整合,實作端對端的串流 ETL 管道

何時使用 ksqlDB

ksqlDB 提供了許多好處,包括更互動的工作流程、更少的程式碼維護、更低的進入門檻、簡化的架構和更高的開發效率。這些好處使得 ksqlDB 成為許多專案的理想選擇,尤其是那些可以使用 SQL 簡潔表達串流處理應用程式的專案。

使用 ksqlDB 的優勢

  • 更互動的工作流程,藉助內建的 CLI 和 REST 服務來提交查詢
  • 更少的程式碼維護,因為串流處理拓撲使用 SQL 而非 JVM 語言表達
  • 更低的進入門檻,尤其是對於熟悉傳統 SQL 資料函式庫但新手於串流處理的人員
  • 簡化的架構,因為管理聯結器和轉換資料的介面被整合到一個系統中
  • 更高的開發效率,因為使用 SQL 表達串流處理應用程式所需的程式碼更少

與 Kafka Streams 的比較

雖然 ksqlDB 適合許多專案,但仍有一些情況下 Kafka Streams 更為合適。例如:

  • 需要更低階的應用程式狀態存取
  • 需要對資料執行週期性函式
  • 需要處理 ksqlDB 不支援的資料格式
  • 需要更彈性的應用程式效能監控(例如分散式追蹤、自訂指標收集等)
  • 有很多不易用 SQL 表達的業務邏輯

總之,ksqlDB 是一種功能強大的工具,能夠簡化串流處理應用程式的開發和維護。然而,在選擇使用 ksqlDB 或 Kafka Streams 時,需要根據專案的具體需求和限制進行評估。

ksqlDB 的安裝與執行

要開始使用 ksqlDB,首先需要了解如何安裝和執行它。ksqlDB 提供了多種發行選項,包括官方支援的 Docker 映像檔。對於那些希望簡化生產環境佈署的人,還有完全託管的雲端 ksqlDB 服務(例如 Confluent Cloud)可供選擇。

-- 建立一個簡單的 ksqlDB 流
CREATE STREAM my_stream (id INT, name VARCHAR) 
  WITH (kafka_topic='my_topic', value_format='json');

內容解密:

此 SQL 陳述式建立了一個名為 my_stream 的 ksqlDB 流,該流對映到 Kafka 主題 my_topic。該流有兩個欄位:idname,分別代表整數和字串型別。WITH 子句指定了 Kafka 主題名稱和值的格式(在此例中為 JSON)。

-- 對 my_stream 執行推播查詢
SELECT * FROM my_stream EMIT CHANGES;

內容解密:

此查詢會持續監控 my_stream 流,並在有新資料到達時輸出變更。這是推播查詢的一個例子,它允許客戶端即時接收更新。

-- 建立物化檢視
CREATE TABLE my_table AS 
  SELECT id, COUNT(*) AS count 
  FROM my_stream 
  GROUP BY id;

內容解密:

此陳述式建立了一個名為 my_table 的物化檢視,該檢視根據 my_stream 流中的 id 欄位進行分組,並計算每個 id 的出現次數。該檢視會持續更新,以反映流中的變更。

-- 對 my_table 執行提取查詢
SELECT * FROM my_table WHERE id = 1;

內容解密:

此查詢會從 my_table 物化檢視中檢索 id 為 1 的記錄。這是提取查詢的一個例子,它允許客戶端根據特定條件查詢檢視中的資料。

ksqlDB 的演進與 Kafka Streams 整合

瞭解 ksqlDB 如何隨著時間演進以及它所獲得的功能是非常有幫助的。雖然 ksqlDB 的演進過程很有趣,但本章節的目的不僅僅是回顧其歷史。由於 ksqlDB 曾經以不同的名稱(KSQL)為人所知,瞭解某些功能何時被引入,可以幫助區分不同世代的這項技術。

首先,我們來看看 ksqlDB 如何隨著時間推移改進其 Kafka Streams 整合,以及 Kafka Streams 如何支援 ksqlDB 最基本的功能之一:查詢資料。

Kafka Streams 整合

在 ksqlDB 的前兩年,它被稱為 KSQL。早期的開發重點在於其核心功能:一個能夠解析和編譯 SQL 陳述式成為完整的流處理應用程式的流式 SQL 引擎。在這個早期的演化形式中,KSQL 在概念上是傳統 SQL 資料函式庫和 Kafka Streams 的混合體,從關係型資料函式庫(RDBMS)借鑑了功能,同時使用 Kafka Streams 進行流處理層的重任。如圖 8-1 所示。

圖 8-1:ksqlDB 的第一階段演進結合了 Kafka Streams 與傳統 SQL 資料函式庫的功能,包括 SQL 介面

此圖示展示了 ksqlDB 如何將 Kafka Streams 與傳統 SQL 資料函式庫的功能結合起來。

KSQL 從 RDBMS 演化樹中借鑑的最顯著功能是 SQL 介面。這消除了在 Kafka 生態系統中建立流處理應用程式的語言障礙,因為使用者不再需要使用像 Java 或 Scala 這樣的 JVM 語言來使用 Kafka Streams。

為什麼選擇 SQL

SQL 本身是經過多輪產品開發和簡化後的產物。從永續性資料儲存中檢索資料曾經需要使用複雜語言編寫冗長的程式。然而,透過使用數學符號和宣告式結構,關係型語言變得更加緊湊和高效。額外的語言最佳化使得新的語言——SQL,能夠讀起來更像自然英語。

經過多輪簡化後,我們最終得到了一種高度可存取的語言,用於查詢永續性資料儲存,直到今天仍廣受歡迎。透過將 SQL 適應流式使用案例,ksqlDB 立即實作了使用經典 SQL 的相同好處:

  • 簡潔且富有表達力的語法,讀起來像英文
  • 宣告式程式設計風格
  • 低學習曲線

雖然 SQL 語法受到 ANSI SQL 的啟發,但仍需要一種特殊的 SQL 方言來對資料進行流和表的建模。傳統的 SQL 資料函式庫主要關注後者,沒有對無界資料集(流)的原生支援。這通常表現為經典 SQL 中的兩種型別的陳述式:

  • 經典的 DDL(資料定義語言)陳述式專注於建立和銷毀資料函式庫物件(通常是表,但有時是資料函式庫、檢視等):
CREATE TABLE users ...;
DROP TABLE users;

內容解密:

此段程式碼展示瞭如何使用 DDL 陳述式建立和刪除資料函式庫中的表。CREATE TABLE 用於建立一個新表,而 DROP TABLE 用於刪除一個現有的表。

  • 經典的 DML(資料操縱語言)陳述式專注於讀取和操縱表中的資料:
SELECT username from USERS;
INSERT INTO users (id, username) VALUES(2, "Izzy");

內容解密:

此段程式碼展示瞭如何使用 DML 陳述式查詢和修改表中的資料。SELECT 陳述式用於從表中檢索資料,而 INSERT INTO 陳述式用於向表中插入新資料。

在 KSQL(以及後來的 ksqlDB)中實作的 SQL 方言擴充套件了經典 SQL 以支援流。下一章將詳細探討擴充套件的 DDL 和 DML 陳述式,但正如您可能預期的那樣,CREATE TABLEDROP TABLE 陳述式有流等效陳述式(CREATE STREAMDROP STREAM),並且擴充套件的 DML 支援查詢流和表。

ksqlDB 的演進

早期形式的 KSQL 主要使用 Kafka Streams 來支援推查詢(push query)。這些是不斷執行的查詢,可以針對流或表執行,並且只要有新資料可用,就會向客戶端傳送(或推播)結果。推查詢的資料流程如圖 8-2 所示。

圖 8-2:推查詢自動傳送結果,只要有新資料可用;這允許應用程式/客戶端監聽新資料

此圖示展示了推查詢如何在新資料可用時自動向客戶端傳送結果。

隨著 SQL 引擎變得更加先進,當 KSQL 更名為 ksqlDB 時,一個重要的功能隨之而來:執行拉查詢(pull query)的能力。拉查詢的生命週期更類別似於傳統資料函式庫中的查詢,因為它們是用於執行鍵值查詢的短期查詢。在底層,它們利用 Kafka Streams 和狀態儲存。如您在第 4 章中所回憶的那樣,狀態儲存是本地嵌入式鍵值儲存,通常由 RocksDB 提供支援。圖 8-3 展示了 ksqlDB 中可用的推查詢和拉查詢的資料流程。

圖 8-3:ksqlDB 中的推查詢和拉查詢

此圖示展示了 ksqlDB 如何支援推查詢和拉查詢,並利用 Kafka Streams 和狀態儲存來執行這些查詢。