返回文章列表

KsqlDB 資料處理入門

本介紹如何使用 ksqlDB 進行資料處理,涵蓋專案設定、ksqlDB 基礎、自訂型別、集合、資料表與資料流的建立與管理等核心概念。文章將引導讀者設定開發環境,瞭解 ksqlDB 的基本操作,並逐步學習如何建立及管理資料表和資料流,最後說明如何使用 SQL 陳述式進行資料處理與分析。

資料函式庫 串流處理

ksqlDB 簡化了 Kafka 串流資料的處理流程,讓開發者能以熟悉的 SQL 語法進行即時資料分析。本將引導讀者從專案設定開始,逐步瞭解 ksqlDB 的核心概念與操作方式。首先,我們會使用 Docker Compose 建立包含 Kafka、Schema Registry、ksqlDB 伺服器和 ksqlDB CLI 的開發環境。接著,將介紹如何連線 ksqlDB CLI 並執行 SQL 陳述式。文章會探討 ksqlDB 的資料型別、自訂型別以及集合的概念,並以實際案例說明如何建立和管理資料表和資料流,包含主鍵設定、資料格式指定,以及時間戳欄位的運用。最後,我們將會探討如何使用 SHOWDESCRIBEALTERDROP 等指令來管理 ksqlDB 中的資料流和資料表,讓讀者能有效地監控和維護 ksqlDB 應用程式。

專案設定與 ksqlDB 基礎

本章節程式碼位於 https://github.com/mitch-seymour/mastering-kafka-streams-and-ksqldb.git。若需參考程式碼,請複製儲存函式庫並切換至本章節的教學目錄。以下命令可完成此操作:

$ git clone [email protected]:mitch-seymour/mastering-kafka-streams-and-ksqldb.git
$ cd mastering-kafka-streams-and-ksqldb/chapter-10/

本教學使用 Docker Compose 啟動應用程式所需的各個元件,包括 Kafka、Schema Registry、ksqlDB 伺服器和 ksqlDB CLI。只需在複製儲存函式庫後執行以下命令,即可啟動所有元件:

docker-compose up

本章討論的 SQL 陳述式將在 ksqlDB CLI 中執行。您可以使用以下命令登入 ksqlDB CLI:

docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088

來源主題

當您擁有 Kafka 主題中的資料並希望使用 ksqlDB 處理時,首先需要檢視來源主題中的資料,並決定如何在 ksqlDB 中建模資料。在本教學中,我們有兩個主要來源主題:titlesproduction_changes。每個來源主題中的範例記錄如下表所示:

來源主題範例記錄
titles{"id": 1, "title": "Stranger Things", "on_schedule": false}
production_changes{"uuid": 1, "title_id": 1, "change_type": "season_length", "before": {"season_id": 1, "episode_count": 12}, "after": {"season_id": 1, "episode_count": 8}, "created_at": "2021-02-08 11:30:00"}

資料型別

在 ksqlDB 中,有多種內建資料型別可供使用,如下表所示:

資料型別描述
ARRAY<element-type>相同型別的元素集合(例如 ARRAY<STRING>
BOOLEAN布林值
INT32 位元帶符號整數
BIGINT64 位元帶符號整數
DOUBLE雙精確度(64 位元)IEEE 754 浮點數
DECIMAL(precision, scale)可組態總位數(精確度)和小數點右側位數(刻度)的浮點數
MAP<key-type, element-type>包含鍵和值的物件,每個鍵和值都與資料型別相符(例如 MAP<STRING, INT>
STRUCT<field-name field-type [, ...]>結構化的欄位集合(例如 STRUCT<FOO INT, BAR BOOLEAN>
VARCHARSTRINGUnicode 字元序列(UTF8)

對於某些序列化格式(例如 AVRO、PROTOBUF 和 JSON_SR),資料型別是可選的,因為 Schema Registry 已經儲存了欄位名稱和型別。因此,在 CREATE 陳述式中指定資料型別是多餘的。但是,當需要指定主鍵時,仍需在 CREATE 陳述式中指定主鍵欄位。

例如,如果我們的 titles 資料格式化為 AVRO,且相關的 Avro 結構描述儲存在 Schema Registry 中,我們可以使用以下任一 CREATE 陳述式來建立我們的 titles 表格:

-- 明確指定資料型別
CREATE TABLE titles (
    id INT PRIMARY KEY,
    title VARCHAR
) WITH (
    KAFKA_TOPIC='titles',
    VALUE_FORMAT='AVRO',
    PARTITIONS=4
);

-- 推斷資料型別
CREATE TABLE titles (
    id INT PRIMARY KEY
) WITH (
    KAFKA_TOPIC='titles',
    VALUE_FORMAT='AVRO',
    PARTITIONS=4
);

程式碼解析

以下程式碼展示如何使用 ksqlDB 建立表格並指定資料型別:

CREATE TABLE titles (
    id INT PRIMARY KEY,
    title VARCHAR
) WITH (
    KAFKA_TOPIC='titles',
    VALUE_FORMAT='AVRO',
    PARTITIONS=4
);

內容解密

此段程式碼建立了一個名為 titles 的表格,並指定了其主鍵欄位 id 和另一個欄位 titleWITH 子句指定了 Kafka 主題名稱、值格式和分割區數量。

  1. CREATE TABLE titles: 建立一個名為 titles 的表格。
  2. (id INT PRIMARY KEY, title VARCHAR): 指定表格的欄位,包括主鍵欄位 id 和另一個欄位 title
  3. WITH 子句:指定表格的屬性。
    • KAFKA_TOPIC='titles': 指定 Kafka 主題名稱為 titles
    • VALUE_FORMAT='AVRO': 指定值的格式為 AVRO。
    • PARTITIONS=4: 指定分割區數量為 4。

此程式碼用於建立一個 ksqlDB 表格,並將其與 Kafka 主題進行對映,以便進行進一步的資料處理和分析。

ksqlDB 中的自訂型別與集合

ksqlDB 提供了自訂型別的功能,允許使用者定義一組欄位名稱及其相關聯的資料型別,並在後續的 SQL 陳述式中重複使用這個自訂型別。這種功能尤其適用於需要重複使用複雜的資料型別定義的情況。

自訂型別

在 ksqlDB 中,自訂型別(Custom Types)類別似於 PostgreSQL 中的複合型別(Composite Types),允許使用者指定一組欄位名稱及其相關聯的資料型別。以下是一個例子:

假設我們需要捕捉某個資料的變更前後狀態,且這些狀態具有相同的結構。我們可以定義一個名為 season_length 的自訂型別,如下所示:

CREATE TYPE season_length AS STRUCT<season_id INT, episode_count INT>;

內容解密:

  • CREATE TYPE 陳述式用於建立一個新的自訂型別。
  • season_length 是自訂型別的名稱。
  • STRUCT<season_id INT, episode_count INT> 定義了這個自訂型別的結構,包含兩個欄位:season_idepisode_count,它們的資料型別都是整數。

建立自訂型別後,可以使用 SHOW TYPES 陳述式來檢視已註冊的自訂型別:

SHOW TYPES;

輸出結果如下:

Type Name     | Schema
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
---
SEASON_LENGTH | STRUCT<SEASON_ID INTEGER, EPISODE_COUNT INTEGER>
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
---

如果需要刪除自訂型別,可以使用 DROP TYPE 陳述式:

DROP TYPE season_length;

集合

在 ksqlDB 中,集合(Collections)是流(Streams)和表格(Tables)的總稱。流和表格是 Kafka Streams 和 ksqlDB 中的兩個主要抽象概念。

表格

表格可以被視為一個持續更新的資料集的快照,其中每個唯一鍵的最新狀態或計算結果被儲存在底層的集合中。表格通常用於資料豐富(Data Enrichment)和聚合(Aggregations)操作。

流則被建模為一個不可變的事件序列。與表格不同,流中的每個事件都被視為獨立的個體,不受其他事件的影響。流是無狀態的,這意味著每個事件在被處理後就會被遺忘。

建立來源集合

在 ksqlDB 中,需要在 Kafka 主題上建立來源集合(Source Collections),然後才能開始處理資料。建立來源集合的語法如下:

CREATE [OR REPLACE] {STREAM | TABLE} [IF NOT EXISTS] <identifier> (
  column_name data_type [, ...]
) WITH (
  property=value [, ...]
);

例如,我們可以建立一個名為 titles 的表格,用於儲存影片和電視節目的元資料:

CREATE TABLE titles (
  id INT PRIMARY KEY,
  title VARCHAR
) WITH (
  -- 其他屬性設定
);

內容解密:

  • CREATE TABLE 陳述式用於建立一個新的表格。
  • idtitle 是表格中的兩個欄位,分別代表節目的 ID 和標題。
  • WITH 子句用於指定表格的其他屬性,例如 Kafka 主題的名稱等。

ksqlDB 中的資料收集:資料表與資料流的建立與管理

ksqlDB 是一種用於處理 Kafka 資料的互動式查詢語言,它允許使用者建立和管理資料表(Table)與資料流(Stream)。在本章中,我們將探討如何使用 ksqlDB 建立和操作這兩種資料結構。

建立資料表

在 ksqlDB 中,資料表用於儲存具有更新語義的資料,也就是說,當有多個記錄具有相同的主鍵時,只有最新的記錄會被儲存。以下是一個建立資料表的範例:

CREATE TABLE titles (
    title_id VARCHAR PRIMARY KEY,
    type VARCHAR,
    name VARCHAR,
    release_year INT
) WITH (
    KAFKA_TOPIC='titles',
    VALUE_FORMAT='AVRO',
    PARTITIONS=4
);

內容解密:

  1. CREATE TABLE 陳述式用於建立一個名為 titles 的資料表。
  2. title_id 被指定為主鍵,這意味著具有相同 title_id 的記錄將被更新。
  3. WITH 子句用於指定 Kafka 主題、資料格式和分割區數量等屬性。
  4. KAFKA_TOPIC='titles' 指定了對應的 Kafka 主題名稱。
  5. VALUE_FORMAT='AVRO' 指定了資料的序列化格式為 Avro。
  6. PARTITIONS=4 指定了主題的分割區數量為 4。

建立資料流

資料流則用於儲存具有插入語義的資料,也就是說,每個記錄都是獨立的,不會被更新。以下是一個建立資料流的範例:

CREATE STREAM production_changes (
    rowkey VARCHAR KEY,
    uuid INT,
    title_id INT,
    change_type VARCHAR,
    before season_length,
    after season_length,
    created_at VARCHAR
) WITH (
    KAFKA_TOPIC='production_changes',
    PARTITIONS='4',
    VALUE_FORMAT='JSON',
    TIMESTAMP='created_at',
    TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'
);

內容解密:

  1. CREATE STREAM 陳述式用於建立一個名為 production_changes 的資料流。
  2. rowkey 被指定為記錄鍵,這與 Kafka 記錄鍵相對應。
  3. beforeafter 欄位使用自定義型別 season_length,這是一種結構化型別。
  4. TIMESTAMP='created_at' 指定了用於時間相關操作的時間戳欄位。
  5. TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss' 指定了時間戳的格式。

WITH 子句的屬性

在建立資料表或資料流時,可以使用 WITH 子句來指定多種屬性。以下是一些常見的屬性:

屬性名稱描述是否必要
KAFKA_TOPIC指定對應的 Kafka 主題名稱
VALUE_FORMAT指定資料的序列化格式
PARTITIONS指定主題的分割區數量
REPLICAS指定主題的副本數量
TIMESTAMP指定用於時間相關操作的時間戳欄位
TIMESTAMP_FORMAT指定時間戳的格式

管理資料表與資料流

ksqlDB 提供了多種陳述式來管理和查詢已建立的資料表和資料流,例如:

SHOW STREAMS;
SHOW TABLES;

這些陳述式可以用於檢視目前已註冊的資料流和資料表的資訊。

管理 ksqlDB 中的資料流與表格

ksqlDB 提供了多種指令來管理資料流(Streams)與表格(Tables),包括顯示、描述、修改和刪除等操作。這些指令可以幫助開發者有效地管理和維護 ksqlDB 中的資料集合。

顯示資料流與表格

使用 SHOW 指令可以列出 ksqlDB 中所有的資料流和表格。例如:

ksql> SHOW TABLES ; Table Name | Kafka Topic | Format | Windowed












TITLES | titles | AVRO | false












ksql> SHOW STREAMS ; Stream Name | Kafka Topic | Format















PRODUCTION_CHANGES | production_changes | JSON















若需要更多關於底層集合的資訊,可以使用 EXTENDED 版本的 SHOW 指令:

ksql> SHOW TABLES EXTENDED; Name : TITLES Type : TABLE Timestamp field : Not set - using Key format : KAFKA Value format : AVRO Kafka topic : titles (partitions: 4, replication: 1) Statement : CREATE TABLE titles (…) Field | Type










ID | INTEGER (primary key) TITLE | VARCHAR(STRING)










Local runtime statistics






messages-per-sec: 0.90 total-messages: 292 last-message: 2020-06-12… (Statistics of the local KSQL server interaction with the Kafka topic titles)

內容解密:

  • SHOW TABLESSHOW STREAMS 用於列出 ksqlDB 中的所有表格和資料流。
  • SHOW TABLES EXTENDEDSHOW STREAMS EXTENDED 提供更詳細的資訊,包括資料格式、Kafka 主題的相關資訊和執行時統計資料。
  • 執行時統計資料對於監控資料流和表格的吞吐量和錯誤率非常有用。

描述資料流與表格

使用 DESCRIBE 指令可以描述特定的資料流或表格。例如:

ksql> DESCRIBE titles ; Name : TITLES Field | Type










ID | INTEGER (primary key) TITLE | VARCHAR(STRING)










若需要更多資訊,可以使用 DESCRIBE EXTENDED

ksql> DESCRIBE EXTENDED titles ;

內容解密:

  • DESCRIBE 指令用於取得特定資料流或表格的結構資訊。
  • DESCRIBE EXTENDED 提供更詳細的資訊,包括執行時統計資料,與 SHOW { STREAMS | TABLES } EXTENDED 輸出類別似,但僅針對指定的資料流或表格。

修改資料流與表格

ksqlDB 支援使用 ALTER 指令修改現有的資料流或表格。例如,新增欄位:

ksql> ALTER TABLE titles ADD COLUMN genre VARCHAR; Message






Table TITLES altered.






內容解密:

  • ALTER 指令用於修改現有的資料流或表格結構。
  • 目前支援的操作包括新增欄位,未來可能會擴充套件到其他操作。

刪除資料流與表格

使用 DROP 指令可以刪除不再需要的資料流或表格。例如:

ksql> DROP STREAM IF EXISTS production_changes DELETE TOPIC ; Message


















Source PRODUCTION_CHANGES (topic: production_changes) was dropped.


















內容解密:

  • DROP { STREAM | TABLE } 指令用於刪除指定的資料流或表格。
  • 使用 DELETE TOPIC 子句會連同刪除底層的 Kafka 主題,需謹慎使用。