ksqlDB 簡化了 Kafka 串流資料的處理流程,讓開發者能以熟悉的 SQL 語法進行即時資料分析。本將引導讀者從專案設定開始,逐步瞭解 ksqlDB 的核心概念與操作方式。首先,我們會使用 Docker Compose 建立包含 Kafka、Schema Registry、ksqlDB 伺服器和 ksqlDB CLI 的開發環境。接著,將介紹如何連線 ksqlDB CLI 並執行 SQL 陳述式。文章會探討 ksqlDB 的資料型別、自訂型別以及集合的概念,並以實際案例說明如何建立和管理資料表和資料流,包含主鍵設定、資料格式指定,以及時間戳欄位的運用。最後,我們將會探討如何使用 SHOW、DESCRIBE、ALTER 和 DROP 等指令來管理 ksqlDB 中的資料流和資料表,讓讀者能有效地監控和維護 ksqlDB 應用程式。
專案設定與 ksqlDB 基礎
本章節程式碼位於 https://github.com/mitch-seymour/mastering-kafka-streams-and-ksqldb.git。若需參考程式碼,請複製儲存函式庫並切換至本章節的教學目錄。以下命令可完成此操作:
$ git clone [email protected]:mitch-seymour/mastering-kafka-streams-and-ksqldb.git
$ cd mastering-kafka-streams-and-ksqldb/chapter-10/
本教學使用 Docker Compose 啟動應用程式所需的各個元件,包括 Kafka、Schema Registry、ksqlDB 伺服器和 ksqlDB CLI。只需在複製儲存函式庫後執行以下命令,即可啟動所有元件:
docker-compose up
本章討論的 SQL 陳述式將在 ksqlDB CLI 中執行。您可以使用以下命令登入 ksqlDB CLI:
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
來源主題
當您擁有 Kafka 主題中的資料並希望使用 ksqlDB 處理時,首先需要檢視來源主題中的資料,並決定如何在 ksqlDB 中建模資料。在本教學中,我們有兩個主要來源主題:titles 和 production_changes。每個來源主題中的範例記錄如下表所示:
| 來源主題 | 範例記錄 |
|---|---|
titles | {"id": 1, "title": "Stranger Things", "on_schedule": false} |
production_changes | {"uuid": 1, "title_id": 1, "change_type": "season_length", "before": {"season_id": 1, "episode_count": 12}, "after": {"season_id": 1, "episode_count": 8}, "created_at": "2021-02-08 11:30:00"} |
資料型別
在 ksqlDB 中,有多種內建資料型別可供使用,如下表所示:
| 資料型別 | 描述 |
|---|---|
ARRAY<element-type> | 相同型別的元素集合(例如 ARRAY<STRING>) |
BOOLEAN | 布林值 |
INT | 32 位元帶符號整數 |
BIGINT | 64 位元帶符號整數 |
DOUBLE | 雙精確度(64 位元)IEEE 754 浮點數 |
DECIMAL(precision, scale) | 可組態總位數(精確度)和小數點右側位數(刻度)的浮點數 |
MAP<key-type, element-type> | 包含鍵和值的物件,每個鍵和值都與資料型別相符(例如 MAP<STRING, INT>) |
STRUCT<field-name field-type [, ...]> | 結構化的欄位集合(例如 STRUCT<FOO INT, BAR BOOLEAN>) |
VARCHAR 或 STRING | Unicode 字元序列(UTF8) |
對於某些序列化格式(例如 AVRO、PROTOBUF 和 JSON_SR),資料型別是可選的,因為 Schema Registry 已經儲存了欄位名稱和型別。因此,在 CREATE 陳述式中指定資料型別是多餘的。但是,當需要指定主鍵時,仍需在 CREATE 陳述式中指定主鍵欄位。
例如,如果我們的 titles 資料格式化為 AVRO,且相關的 Avro 結構描述儲存在 Schema Registry 中,我們可以使用以下任一 CREATE 陳述式來建立我們的 titles 表格:
-- 明確指定資料型別
CREATE TABLE titles (
id INT PRIMARY KEY,
title VARCHAR
) WITH (
KAFKA_TOPIC='titles',
VALUE_FORMAT='AVRO',
PARTITIONS=4
);
-- 推斷資料型別
CREATE TABLE titles (
id INT PRIMARY KEY
) WITH (
KAFKA_TOPIC='titles',
VALUE_FORMAT='AVRO',
PARTITIONS=4
);
程式碼解析
以下程式碼展示如何使用 ksqlDB 建立表格並指定資料型別:
CREATE TABLE titles (
id INT PRIMARY KEY,
title VARCHAR
) WITH (
KAFKA_TOPIC='titles',
VALUE_FORMAT='AVRO',
PARTITIONS=4
);
內容解密
此段程式碼建立了一個名為 titles 的表格,並指定了其主鍵欄位 id 和另一個欄位 title。WITH 子句指定了 Kafka 主題名稱、值格式和分割區數量。
CREATE TABLE titles: 建立一個名為titles的表格。(id INT PRIMARY KEY, title VARCHAR): 指定表格的欄位,包括主鍵欄位id和另一個欄位title。WITH子句:指定表格的屬性。KAFKA_TOPIC='titles': 指定 Kafka 主題名稱為titles。VALUE_FORMAT='AVRO': 指定值的格式為 AVRO。PARTITIONS=4: 指定分割區數量為 4。
此程式碼用於建立一個 ksqlDB 表格,並將其與 Kafka 主題進行對映,以便進行進一步的資料處理和分析。
ksqlDB 中的自訂型別與集合
ksqlDB 提供了自訂型別的功能,允許使用者定義一組欄位名稱及其相關聯的資料型別,並在後續的 SQL 陳述式中重複使用這個自訂型別。這種功能尤其適用於需要重複使用複雜的資料型別定義的情況。
自訂型別
在 ksqlDB 中,自訂型別(Custom Types)類別似於 PostgreSQL 中的複合型別(Composite Types),允許使用者指定一組欄位名稱及其相關聯的資料型別。以下是一個例子:
假設我們需要捕捉某個資料的變更前後狀態,且這些狀態具有相同的結構。我們可以定義一個名為 season_length 的自訂型別,如下所示:
CREATE TYPE season_length AS STRUCT<season_id INT, episode_count INT>;
內容解密:
CREATE TYPE陳述式用於建立一個新的自訂型別。season_length是自訂型別的名稱。STRUCT<season_id INT, episode_count INT>定義了這個自訂型別的結構,包含兩個欄位:season_id和episode_count,它們的資料型別都是整數。
建立自訂型別後,可以使用 SHOW TYPES 陳述式來檢視已註冊的自訂型別:
SHOW TYPES;
輸出結果如下:
Type Name | Schema
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
---
SEASON_LENGTH | STRUCT<SEASON_ID INTEGER, EPISODE_COUNT INTEGER>
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
---
如果需要刪除自訂型別,可以使用 DROP TYPE 陳述式:
DROP TYPE season_length;
集合
在 ksqlDB 中,集合(Collections)是流(Streams)和表格(Tables)的總稱。流和表格是 Kafka Streams 和 ksqlDB 中的兩個主要抽象概念。
表格
表格可以被視為一個持續更新的資料集的快照,其中每個唯一鍵的最新狀態或計算結果被儲存在底層的集合中。表格通常用於資料豐富(Data Enrichment)和聚合(Aggregations)操作。
流
流則被建模為一個不可變的事件序列。與表格不同,流中的每個事件都被視為獨立的個體,不受其他事件的影響。流是無狀態的,這意味著每個事件在被處理後就會被遺忘。
建立來源集合
在 ksqlDB 中,需要在 Kafka 主題上建立來源集合(Source Collections),然後才能開始處理資料。建立來源集合的語法如下:
CREATE [OR REPLACE] {STREAM | TABLE} [IF NOT EXISTS] <identifier> (
column_name data_type [, ...]
) WITH (
property=value [, ...]
);
例如,我們可以建立一個名為 titles 的表格,用於儲存影片和電視節目的元資料:
CREATE TABLE titles (
id INT PRIMARY KEY,
title VARCHAR
) WITH (
-- 其他屬性設定
);
內容解密:
CREATE TABLE陳述式用於建立一個新的表格。id和title是表格中的兩個欄位,分別代表節目的 ID 和標題。WITH子句用於指定表格的其他屬性,例如 Kafka 主題的名稱等。
ksqlDB 中的資料收集:資料表與資料流的建立與管理
ksqlDB 是一種用於處理 Kafka 資料的互動式查詢語言,它允許使用者建立和管理資料表(Table)與資料流(Stream)。在本章中,我們將探討如何使用 ksqlDB 建立和操作這兩種資料結構。
建立資料表
在 ksqlDB 中,資料表用於儲存具有更新語義的資料,也就是說,當有多個記錄具有相同的主鍵時,只有最新的記錄會被儲存。以下是一個建立資料表的範例:
CREATE TABLE titles (
title_id VARCHAR PRIMARY KEY,
type VARCHAR,
name VARCHAR,
release_year INT
) WITH (
KAFKA_TOPIC='titles',
VALUE_FORMAT='AVRO',
PARTITIONS=4
);
內容解密:
CREATE TABLE陳述式用於建立一個名為titles的資料表。title_id被指定為主鍵,這意味著具有相同title_id的記錄將被更新。WITH子句用於指定 Kafka 主題、資料格式和分割區數量等屬性。KAFKA_TOPIC='titles'指定了對應的 Kafka 主題名稱。VALUE_FORMAT='AVRO'指定了資料的序列化格式為 Avro。PARTITIONS=4指定了主題的分割區數量為 4。
建立資料流
資料流則用於儲存具有插入語義的資料,也就是說,每個記錄都是獨立的,不會被更新。以下是一個建立資料流的範例:
CREATE STREAM production_changes (
rowkey VARCHAR KEY,
uuid INT,
title_id INT,
change_type VARCHAR,
before season_length,
after season_length,
created_at VARCHAR
) WITH (
KAFKA_TOPIC='production_changes',
PARTITIONS='4',
VALUE_FORMAT='JSON',
TIMESTAMP='created_at',
TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'
);
內容解密:
CREATE STREAM陳述式用於建立一個名為production_changes的資料流。rowkey被指定為記錄鍵,這與 Kafka 記錄鍵相對應。before和after欄位使用自定義型別season_length,這是一種結構化型別。TIMESTAMP='created_at'指定了用於時間相關操作的時間戳欄位。TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'指定了時間戳的格式。
WITH 子句的屬性
在建立資料表或資料流時,可以使用 WITH 子句來指定多種屬性。以下是一些常見的屬性:
| 屬性名稱 | 描述 | 是否必要 |
|---|---|---|
| KAFKA_TOPIC | 指定對應的 Kafka 主題名稱 | 是 |
| VALUE_FORMAT | 指定資料的序列化格式 | 是 |
| PARTITIONS | 指定主題的分割區數量 | 否 |
| REPLICAS | 指定主題的副本數量 | 否 |
| TIMESTAMP | 指定用於時間相關操作的時間戳欄位 | 否 |
| TIMESTAMP_FORMAT | 指定時間戳的格式 | 否 |
管理資料表與資料流
ksqlDB 提供了多種陳述式來管理和查詢已建立的資料表和資料流,例如:
SHOW STREAMS;
SHOW TABLES;
這些陳述式可以用於檢視目前已註冊的資料流和資料表的資訊。
管理 ksqlDB 中的資料流與表格
ksqlDB 提供了多種指令來管理資料流(Streams)與表格(Tables),包括顯示、描述、修改和刪除等操作。這些指令可以幫助開發者有效地管理和維護 ksqlDB 中的資料集合。
顯示資料流與表格
使用 SHOW 指令可以列出 ksqlDB 中所有的資料流和表格。例如:
ksql> SHOW TABLES ; Table Name | Kafka Topic | Format | Windowed
TITLES | titles | AVRO | false
ksql> SHOW STREAMS ; Stream Name | Kafka Topic | Format
PRODUCTION_CHANGES | production_changes | JSON
若需要更多關於底層集合的資訊,可以使用 EXTENDED 版本的 SHOW 指令:
ksql> SHOW TABLES EXTENDED;
Name : TITLES
Type : TABLE
Timestamp field : Not set - using Key format : KAFKA
Value format : AVRO
Kafka topic : titles (partitions: 4, replication: 1)
Statement : CREATE TABLE titles (…)
Field | Type
ID | INTEGER (primary key) TITLE | VARCHAR(STRING)
Local runtime statistics
messages-per-sec: 0.90 total-messages: 292 last-message: 2020-06-12… (Statistics of the local KSQL server interaction with the Kafka topic titles)
內容解密:
SHOW TABLES和SHOW STREAMS用於列出 ksqlDB 中的所有表格和資料流。SHOW TABLES EXTENDED和SHOW STREAMS EXTENDED提供更詳細的資訊,包括資料格式、Kafka 主題的相關資訊和執行時統計資料。- 執行時統計資料對於監控資料流和表格的吞吐量和錯誤率非常有用。
描述資料流與表格
使用 DESCRIBE 指令可以描述特定的資料流或表格。例如:
ksql> DESCRIBE titles ; Name : TITLES Field | Type
ID | INTEGER (primary key) TITLE | VARCHAR(STRING)
若需要更多資訊,可以使用 DESCRIBE EXTENDED:
ksql> DESCRIBE EXTENDED titles ;
內容解密:
DESCRIBE指令用於取得特定資料流或表格的結構資訊。DESCRIBE EXTENDED提供更詳細的資訊,包括執行時統計資料,與SHOW { STREAMS | TABLES } EXTENDED輸出類別似,但僅針對指定的資料流或表格。
修改資料流與表格
ksqlDB 支援使用 ALTER 指令修改現有的資料流或表格。例如,新增欄位:
ksql> ALTER TABLE titles ADD COLUMN genre VARCHAR; Message
Table TITLES altered.
內容解密:
ALTER指令用於修改現有的資料流或表格結構。- 目前支援的操作包括新增欄位,未來可能會擴充套件到其他操作。
刪除資料流與表格
使用 DROP 指令可以刪除不再需要的資料流或表格。例如:
ksql> DROP STREAM IF EXISTS production_changes DELETE TOPIC ; Message
Source PRODUCTION_CHANGES (topic: production_changes) was dropped.
內容解密:
DROP { STREAM | TABLE }指令用於刪除指定的資料流或表格。- 使用
DELETE TOPIC子句會連同刪除底層的 Kafka 主題,需謹慎使用。