返回文章列表

ksqlDB 事件流資料函式庫整合與架構解析

ksqlDB 作為事件流資料函式庫,整合 Kafka Streams 與 Kafka Connect,提供根據 SQL 的串流資料處理能力。本文探討 ksqlDB 架構、與 Kafka 生態圈整合方式,以及如何定義來源聯結器、執行 SQL 查詢等。同時,比較 ksqlDB 與傳統 SQL 資料函式庫的異同,包含

資料函式庫 串流處理

ksqlDB 建構於 Kafka Streams 之上,並透過 SQL 語法提供串流資料的即時處理能力。不同於傳統資料函式庫的提取式查詢,ksqlDB 支援推播式查詢,可即時反應資料變化。透過 Kafka Connect,ksqlDB 能輕易整合外部資料來源,簡化資料管線的建置。本文除了介紹 ksqlDB 的核心架構與運作原理,也涵蓋如何使用 SQL 定義串流、表格,以及如何透過 Kafka Connect 建立與外部系統的連線。更進一步,我們將探討 ksqlDB 與傳統 SQL 資料函式庫的差異,例如綱要管理、一致性模型等,並說明 ksqlDB 的不同佈署模式與操作方式,讓讀者能更全面地理解 ksqlDB 的應用場景和優勢。

ksqlDB 的架構與整合能力

ksqlDB 是一種事件流資料函式庫,它結合了 Kafka Streams 和 Kafka Connect 的強大功能,提供了一種新的資料處理方式。讓我們探討 ksqlDB 的架構和整合能力。

ksqlDB 的架構

ksqlDB 的架構建立在 Kafka Streams 和其自身的 SQL 引擎之上。這使得 ksqlDB 可以支援推式(push)和拉式(pull)查詢。推式查詢允許客戶端即時接收資料更新,而拉式查詢則允許客戶端主動向 ksqlDB 請求資料。

Kafka Connect 整合

ksqlDB 的 Kafka Connect 整合功能使得使用者可以輕鬆地將外部資料來源整合到 ksqlDB 中。透過這個整合,使用者可以使用 SQL 陳述式定義來源和匯出聯結器,從而簡化了資料整合的過程。

定義來源聯結器

CREATE SOURCE CONNECTOR `jdbc-connector` WITH (
  "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
  "connection.url"='jdbc:postgresql://localhost:5432/my.db',
  "mode"='bulk',
  "topic.prefix"='jdbc-',
  "table.whitelist"='users',
  "key"='username'
);

內容解密:

這段程式碼定義了一個名為 jdbc-connector 的來源聯結器,用於從 PostgreSQL 資料函式庫中讀取資料。其中:

  • "connector.class" 指定了聯結器的類別。
  • "connection.url" 指定了資料函式庫的連線 URL。
  • "mode" 指定了資料讀取模式,這裡使用的是批次模式。
  • "topic.prefix" 指定了 Kafka 主題的字首。
  • "table.whitelist" 指定了要讀取的資料表名稱。
  • "key" 指定了資料表的主鍵。

與傳統 SQL 資料函式庫的比較

儘管 ksqlDB 是一種事件流資料函式庫,但它與傳統 SQL 資料函式庫有許多相似之處。兩者都支援 SQL 語法、DDL 和 DML 陳述式、網路服務和客戶端、結構描述和內建函式等。

相似之處

  • SQL介面:ksqlDB 和傳統 SQL 資料函式庫都使用 SQL 語言進行資料操作。
  • DDL 和 DML 陳述式:兩者都支援用於定義和管理資料函式庫物件的 DDL 陳述式,以及用於讀取和操作資料的 DML 陳述式。
  • 網路服務和客戶端:ksqlDB 和傳統 SQL 資料函式庫都提供網路服務和客戶端工具,用於提交查詢和管理資料。
  • 結構描述:兩者都使用結構描述來定義資料的欄位名稱和型別。
  • 內建函式和運算元:ksqlDB 和許多傳統 SQL 資料函式庫都提供豐富的內建函式和運算元,用於資料轉換和操作。

ksqlDB:事件流資料函式庫的特性與差異

ksqlDB 是一種專為事件流資料設計的資料函式庫系統,它結合了傳統 SQL 資料函式庫的特性與事件流處理的能力。讓我們探討 ksqlDB 的特點及其與其他資料函式庫系統的差異。

資料處理能力

ksqlDB 提供了一系列豐富的函式,包括字串函式、數學函式、時間函式、表格函式、地理空間函式等。同時,它也支援多種運算子號,如 +, -, /, *, %, || 等。此外,ksqlDB 還提供了一個可擴充的介面,允許使用者使用 Java 自定義函式。

// 使用 Java 自定義函式的範例
public class CustomFunction {
    public static int add(int a, int b) {
        return a + b;
    }
}

內容解密:

  • 上述 Java 程式碼定義了一個名為 CustomFunction 的類別,其中包含一個名為 add 的靜態方法,用於將兩個整數相加。
  • 在 ksqlDB 中,可以透過可擴充介面註冊此自定義函式,從而在 SQL 查詢中使用。
  • 這種機制使得使用者能夠根據特定需求擴充 ksqlDB 的功能。

資料複製

ksqlDB 繼承了 Kafka 和 Kafka Streams 的資料複製策略。在互動模式下,ksqlDB 還使用根據陳述式的複製機制,將查詢寫入內部主題(command topic),以確保叢集中的多個節點能夠處理和執行相同的查詢。

與傳統 SQL 資料函式庫的差異

儘管 ksqlDB 與傳統 SQL 資料函式庫有許多相似之處,但它們之間也存在一些關鍵差異:

  1. 增強的 DDL 和 DML 陳述式:ksqlDB 的 SQL 方言支援對流和表格的建模和查詢,並且引入了聯結器(connectors)這一新的資料函式庫物件。

  2. 推播查詢(Push Queries):與傳統 SQL 資料函式庫不同,ksqlDB 支援持續查詢,能夠在新資料到達時發出結果。

    -- 推播查詢範例
    SELECT * FROM my_stream EMIT CHANGES;
    

    內容解密:

    • 上述 SQL 陳述式對 my_stream 執行了一個持續查詢,並在新資料到達時發出變更。
    • EMIT CHANGES 關鍵字表示這是一個推播查詢,會持續輸出結果。
  3. 簡單的查詢能力:ksqlDB 專注於查詢預先維護的物化檢視,適合於串流 ETL、物化快取和事件驅動微服務等場景。

  4. 更完善的綱要管理策略:ksqlDB 支援使用 SQL 定義綱要,同時也能夠將綱要儲存在獨立的綱要登入檔(Schema Registry)中,以實作綱要演進支援和相容性保證。

    -- 定義綱要範例
    CREATE STREAM my_stream (id INT, name VARCHAR) WITH (kafka_topic='my_topic', value_format='json');
    

    內容解密:

    • 上述 SQL 陳述式建立了一個名為 my_stream 的流,並定義了其綱要結構,包括 idname 兩個欄位。
    • WITH 子句指定了相關的 Kafka 主題和值格式。
  5. ANSI 啟發的 SQL,但不完全相容:ksqlDB 的 SQL 方言引入了一些非標準 SQL 的建構,以支援事件流處理。

  6. 高用性、容錯性和容錯移轉:這些特性被深度整合到 ksqlDB 中,並且具有高度可組態性。

  7. 本地和遠端儲存:ksqlDB 中的資料儲存在 Kafka 中,而表格資料則物化在本地狀態儲存中,這提供了計算與資料共置的效能優勢。

  8. 一致性模型:ksqlDB 遵循最終一致性和非同步一致性模型,而許多傳統系統則更接近於 ACID 模型。

ksqlDB 的最終評價與架構解析

在探討了 ksqlDB 和傳統 SQL 資料函式庫之間的相似性和差異之後,我們現在來看看 ksqlDB 的最終評價。ksqlDB 具備許多傳統資料函式庫的特性,但它並不旨在取代它們。ksqlDB 是一種高度專門化的工具,適用於串流使用案例,並且當您需要其他系統的功能時,ksqlDB 的 Kafka Connect 整合將幫助您將任何豐富、轉換或以其他方式處理的資料移動到您選擇的資料儲存中。

架構

由於 ksqlDB 是建立在 Kafka Streams 之上,因此您可以參考第 2 章中關於串流架構的討論,以更深入地瞭解 Kafka Streams 整合的工作原理。本文重點介紹 ksqlDB 架構中特定的元件,這些元件分為兩大類別:ksqlDB 伺服器和 ksqlDB 使用者端。

ksqlDB 伺服器

ksqlDB 伺服器負責執行您的串流處理應用程式(在 ksqlDB 中,這將是一組共同解決業務問題的查詢)。每個伺服器在概念上都類別似於 Kafka Streams 應用程式的單一例項,並且工作負載(由查詢集建立)可以透過相同的 ksql.service.id 組態分佈在多個 ksqlDB 伺服器上。與 Kafka Streams 應用程式一樣,ksqlDB 伺服器與 Kafka 叢集分開佈署(通常在與代理本身不同的機器/容器上)。

一組協作的 ksqlDB 伺服器被稱為 ksqlDB 叢集,通常建議在叢集層級隔離單一應用程式的工作負載。例如,圖 8-5 顯示了兩個 ksqlDB 叢集,每個叢集都有不同的服務 ID,執行可獨立擴充套件和管理的隔離工作負載。

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title ksqlDB 伺服器

rectangle "Service ID 1" as node1
rectangle "Service ID 2" as node2
rectangle "Data" as node3

node1 --> node2
node2 --> node3

@enduml

圖示說明

此圖示展示了兩個 ksqlDB 叢集如何獨立處理來自 Kafka 的資料。

當您需要為您的 ksqlDB 叢集新增容量時,您可以佈署更多的 ksqlDB 伺服器。您也可以隨時透過移除 ksqlDB 伺服器來縮減叢集規模。由於具有相同服務 ID 的 ksqlDB 伺服器是同一消費者群組的成員,因此 Kafka 自動處理工作重新分配/分佈,因為新的 ksqlDB 伺服器被新增或移除(移除可能是手動或自動的,例如,由於系統故障)。

每個 ksqlDB 伺服器由兩個子元件組成:SQL 引擎和 REST 服務。我們將在以下章節中討論這些單獨的元件。

SQL 引擎

SQL 引擎負責解析 SQL 陳述式,將其轉換為一個或多個 Kafka Streams拓撲,並最終執行 Kafka Streams 應用程式。此過程的視覺化如圖 8-6 所示。

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 圖示說明

rectangle "ANTLR" as node1
rectangle "ksqlDB" as node2
rectangle "Execution" as node3

node1 --> node2
node2 --> node3

@enduml
圖示說明

此圖示展示了 SQL 陳述式如何被轉換為 Kafka Streams 拓撲並執行。

解析器本身使用一個名為 ANTLR 的工具,該工具將 SQL 陳述式轉換為抽象語法樹(AST),其中樹中的每個節點代表輸入查詢中識別的短語或標記。ksqlDB 存取解析樹中的每個節點,並使用它找到的標記建立 Kafka Streams 拓撲。例如,如果您在查詢中包含 WHERE 子句,ksqlDB 知道在為您的查詢建立底層 Kafka Streams 拓撲時需要使用無狀態篩選運算元。同樣,如果您的查詢包含連線條件(例如,LEFT JOIN),ksqlDB 將為您將左連線運算元新增至拓撲。源處理器由 FROM 值決定,SELECT 陳述式用於投影。

一旦引擎建立了執行查詢所需的必要處理器拓撲,它就會實際執行所產生的 Kafka Streams 應用程式。現在,讓我們來看看將查詢傳遞給 SQL 引擎所需的元件:REST 服務。

REST 服務

ksqlDB 包含一個 REST 介面,允許使用者端與 SQL 引擎互動。它主要由 ksqlDB CLI、ksqlDB UI 和其他使用者端用於向引擎提交查詢(即以 SELECT 開頭的 DML 陳述式),執行其他型別的陳述式(例如,DDL 陳述式),檢查叢集狀態/健康狀況等。預設情況下,它監聽埠 8088,並透過 HTTP 通訊,但您可以使用 listeners 組態更改端點,並使用 ssl 組態啟用透過 HTTPS 的通訊。兩組組態如下所示:

listeners=http://0.0.0.0:8088
ssl.keystore.location=/path/to/ksql.server.keystore.jks
ssl.keystore.password=...
ssl.key.password=...

REST API 使用範例

curl -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d $'{
"ksql": "SELECT USERNAME FROM users EMIT CHANGES;",
"streamsProperties": {}
}'

程式碼說明

此範例展示瞭如何使用 curl 命令直接與 ksqlDB 的 REST API 互動,以提交查詢。

#### 內容解密:

此 curl 命令用於向 ksqlDB 的 REST API 提交一個查詢請求。其中:

  • -X "POST" 指定了 HTTP 請求方法為 POST。
  • "http://localhost:8088/query" 是 ksqlDB 的 REST API 端點,用於提交查詢。
  • -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" 設定了請求的內容型別為 JSON,並指定了字元編碼為 UTF-8。
  • -d $'{...}' 指定了請求的主體內容,即要執行的 ksqlDB 查詢及其相關屬性。

直接與 API 互動時,您應參考最新的 REST API 參考檔案。ksqlDB 仍在快速發展中,根據已提交至 ksqlDB 專案的一些設計提案,我預計這將是未來會發生變化的領域之一。本文中的大多數範例將透過官方支援的使用者端間接使用 API,我們將在下一節中討論這些使用者端。

ksqlDB 使用者端

在上一節中,我們瞭解到 ksqlDB 伺服器包含一個用於提交查詢和檢索有關 ksqlDB叢集資訊的 REST 介面。我們還瞭解到,我們可以使用 curl 或自定義使用者端與 REST 服務互動,但對於大多數使用案例,您可能希望使用官方使用者端之一與 ksqlDB 伺服器互動。在本文中,我們將從 ksqlDB CLI 開始,瞭解這些使用者端。

ksqlDB CLI

ksqlDB CLI是一個命令列應用程式,允許您與執行的 ksqlDB 伺服器互動。它是實驗性使用 ksqlDB 的絕佳工具,因為它允許您…

ksqlDB 基礎介紹與操作模式

ksqlDB 提供多種互動方式,包括命令列介面(CLI)和使用者介面(UI),讓使用者能夠以互動方式提交查詢、檢查主題、調整 ksqlDB 組態等。ksqlDB 以 Docker 映像檔(confluentinc/ksqldb-cli)形式發布,也包含在 Confluent 平台中(完全託管於 Confluent Cloud 或透過自我管理的佈署)。

使用 CLI 操作 ksqlDB

要呼叫 CLI,需要執行 ksql 命令並指定 ksqlDB 伺服器的主機/埠組合(對應於 listeners 組態)。命令如下:

ksql http://localhost:8088

執行 ksql 命令後,您將進入類別似以下的提示符號:

===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.14.0, Server v0.14.0 located at http://ksqldb-server:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>

ksqlDB UI

Confluent 平台還包含一個與 ksqlDB 互動的 UI。此 UI 是一個商業功能,您可以在商業授權版本的 Confluent 平台和 Confluent Cloud(在完全託管的雲端環境中執行 Confluent 平台)中找到它。除了能夠從根據 Web 的編輯器提交查詢之外,您還可以視覺化資料流、使用 Web 表單建立流和表、檢視執行中的查詢列表等。

圖 8-7:Confluent Cloud 中的 ksqlDB UI

佈署模式

ksqlDB 支援兩種不同的佈署模式,取決於您希望與執行中的 ksqlDB 伺服器進行的互動程度。本文將描述每種佈署模式,並討論何時使用它們。

互動模式

在互動模式下執行 ksqlDB 時,使用者端可以隨時透過 REST API 提交新查詢。如名稱所示,這會帶來互動式體驗,允許 ksqlDB 伺服器隨意建立和刪除流、表、查詢和聯結器。

圖 8-8:以互動模式執行的 ksqlDB,允許客戶端(例如 ksqlDB CLI、UI、Java 使用者端或甚至 curl)透過 REST API 提交查詢

互動模式是 ksqlDB 的預設佈署模式,您不需要任何特殊組態即可在此模式下執行。但是,如果您想停用互動模式,則需要以無頭模式執行,這確實需要特殊組態。

無頭模式

在某些情況下,您可能不希望使用者端與您的 ksqlDB 群集進行互動式查詢。例如,如果您想鎖定生產環境佈署,可以在無頭模式下執行(這會停用 REST API),以確保不會對正在執行的查詢進行任何更改。要在無頭模式下執行,您只需建立一個包含您希望 SQL 引擎執行的任何持久查詢的檔案,並使用 queries.file ksqlDB 伺服器組態指定該檔案的路徑。

圖 8-9:以無頭模式執行的 ksqlDB

教程

在本文中,我們將透過一個非常簡單的“Hello, world”教程來開始使用 ksqlDB。這將涉及使用 ksqlDB 構建一個簡單的流處理應用程式,該程式將向我們寫入名為 users 的 Kafka 主題的每個使用者打招呼。

安裝 ksqlDB

有多種方法可以開始使用 ksqlDB。最流行的選項列在下表中:

安裝方法連結註解
下載 Confluent 平台https://www.confluent.io/downloadksqlDB 是自我管理版本的 Confluent 平台中的社群許可軟體元件
使用 Confluent Cloudhttps://confluent.cloud無需下載,只需建立帳戶
下載並執行官方 Docker 映象ksqldb-server:https://hub.docker.com/r/confluentinc/ksqldb-server
ksqldb-cli:https://hub.docker.com/r/confluentinc/ksqldb-cli
需要單獨執行所有依賴項和相關軟體(Kafka、Schema Registry、ksqlDB CLI)
從 GitHub 克隆開源儲存函式庫並從原始碼構建https://github.com/confluentinc/ksql這是最複雜的選項

使用 Docker 安裝 ksqlDB 的程式碼範例

docker run -d --name ksqldb-server \
  -p 8088:8088 \
  -e KSQL_LISTENERS=http://0.0.0.0:8088 \
  -e KSQL_BOOTSTRAP_SERVERS=<Kafka Broker>:9092 \
  confluentinc/ksqldb-server:latest

內容解密:

此段落程式碼展示瞭如何使用 Docker 執行 ksqlDB 伺服器。其中:

  • -d 表示以分離模式執行容器。
  • --name ksqldb-server 指定了容器的名稱。
  • -p 8088:8088 將容器的 8088 埠對映到主機的 8088 埠。
  • -e KSQL_LISTENERS=http://0.0.0.0:8088 設定 ksqlDB 的監聽地址和埠。
  • -e KSQL_BOOTSTRAP_SERVERS=<Kafka Broker>:9092 設定 Kafka Broker 的地址和埠,需要替換 <Kafka Broker> 為實際的 Kafka Broker 地址。
  • confluentinc/ksqldb-server:latest 指定了要執行的 Docker 映象名稱和標籤。

這個命令允許您快速啟動一個 ksqlDB 伺服器例項,並與 Kafka 等其他元件進行整合,是開始使用 ksqlDB 的便捷方式。