ksqlDB 提供根據 SQL 的串流處理能力,簡化 Kafka 的資料存取與操作。透過 INSERT INTO 陳述式,可以將資料插入資料流或表格,如同附加記錄至 Kafka 主題。表格則儲存每個鍵的最新值,實作 upsert 操作。利用 SELECT 陳述式搭配 EMIT CHANGES 子句,可以執行暫時性推播查詢,即時檢視資料變化。設定 auto.offset.reset 為 ’earliest’ 可從最早的偏移量開始讀取。ksqlDB 也支援持久查詢,將結果寫回 Kafka 並在伺服器重啟後保留。文章也示範如何建立衍生集合,並使用 WITH 子句定義屬性。
基本查詢與資料操作
在ksqlDB中,資料過濾與轉換是流處理的基礎。本章節將探討基本的查詢方式以及如何利用SQL陳述式實作流處理模式。
插入資料
在ksqlDB中,INSERT VALUES陳述式用於將資料插入至資料流或表格中。與傳統資料函式庫不同,ksqlDB的插入語義更像是將記錄附加到底層的Kafka主題中。對於表格而言,由於只儲存每個鍵的最新表示形式,因此可視為一種upsert操作。
語法結構
INSERT INTO <collection_name> [ ( column_name [, ...]] ) ]
VALUES (
value [,...]
);
實際操作範例
首先,為了實驗各種SQL陳述式,我們需要預先填充一些測試資料到titles表格和production_changes資料流中。
INSERT INTO production_changes (
uuid,
title_id,
change_type,
before,
after,
created_at
) VALUES (
1,
1,
'season_length',
STRUCT(season_id := 1, episode_count := 12),
STRUCT(season_id := 1, episode_count := 8),
'2021-02-08 10:00:00'
);
INSERT INTO production_changes (
ROWKEY,
ROWTIME,
uuid,
title_id,
change_type,
before,
after,
created_at
) VALUES (
'2',
1581161400000,
2,
2,
'release_date',
STRUCT(season_id := 1, release_date := '2021-05-27'),
STRUCT(season_id := 1, release_date := '2021-08-18'),
'2021-02-08 10:00:00'
);
INSERT INTO titles VALUES (1, 'Stranger Things');
INSERT INTO titles VALUES (2, 'Black Mirror');
INSERT INTO titles VALUES (3, 'Bojack Horseman');
程式碼解析
插入陳述式的基本結構:
INSERT INTO陳述式後面跟著集合名稱(表格或資料流),然後是欄位名稱列表(可選),最後是VALUES關鍵字後面跟著要插入的值。特殊偽列的使用:在第二個
INSERT INTO範例中,我們指定了ROWKEY和ROWTIME,這兩個是ksqlDB自動建立的特殊偽列,分別代表記錄的鍵和時間戳記。結構化資料的插入:使用
STRUCT函式來建立結構化資料,例如在production_changes中,before和after欄位都是結構化資料,包含了特定的欄位(如season_id和episode_count或release_date)。
簡單查詢(暫時性推播查詢)
暫時性推播查詢是一種簡單的查詢形式,使用SELECT陳述式並在末尾加上EMIT CHANGES子句。
語法結構
SELECT select_expr [, ...]
FROM from_item
[ LEFT JOIN join_collection ON join_criteria ]
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression ]
[ PARTITION BY partitioning_expression ]
[ HAVING having_expression ]
EMIT CHANGES
[ LIMIT count ];
實際操作範例
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM production_changes EMIT CHANGES ;
#### 內容解密:
SET 'auto.offset.reset' = 'earliest';:這條命令設定ksqlDB從最早的偏移量開始讀取資料,這對於檢視已有的測試資料非常有用。SELECT * FROM production_changes EMIT CHANGES ;:這是一條暫時性推播查詢,它會從production_changes資料流中選取所有欄位並持續輸出新到達的資料。暫時性與永續性查詢的區別:暫時性查詢不會在ksqlDB伺服器重啟後繼續執行,而永續性查詢則會在重啟後繼續執行。
查詢結果的輸出:查詢結果將持續輸出到螢幕上,包括之前插入的測試資料以及之後到達的新資料。
此圖示說明瞭查詢的基本流程:
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title KsqlDB資料流處理與SQL語法
package "資料庫架構" {
package "應用層" {
component [連線池] as pool
component [ORM 框架] as orm
}
package "資料庫引擎" {
component [查詢解析器] as parser
component [優化器] as optimizer
component [執行引擎] as executor
}
package "儲存層" {
database [主資料庫] as master
database [讀取副本] as replica
database [快取層] as cache
}
}
pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中
master --> replica : 資料同步
note right of cache
Redis/Memcached
減少資料庫負載
end note
@enduml
此圖示說明瞭當執行查詢時,首先設定偏移量,然後執行SELECT陳述式,並持續輸出結果。當有新資料到達時,查詢結果也會相應更新。
基本查詢與資料轉換
在 ksqlDB 中,Transient push queries 針對資料表的操作與串流處理類別似。我們可以透過執行 SELECT 陳述式來驗證這一點,以我們的 titles 資料表為例:
ksql> SELECT * FROM titles EMIT CHANGES ;
這將會輸出資料表的初始內容,如下所示。同時,當新的資料到達時,輸出將會相應更新:
-+
–+ |ID |TITLE | +
-+
–+ |2 |Black Mirror | |3 |Bojack Horseman | |1 |Stranger Things |
簡單的 SELECT 陳述式是建立串流處理應用程式的有用起點,但處理資料通常需要以不同的方式轉換資料。接下來,我們將探討一些基本的資料轉換任務。
投影(Projection)
最簡單的資料轉換形式之一是從串流或資料表中選擇一部分的欄位,從而簡化下游操作的資料模型。這被稱為投影,僅需將 SELECT * 語法替換為我們想要使用的明確欄位名稱。例如,在本教程中,我們需要使用 production_changes 串流中的 title_id、before、after 和 created_at 欄位,因此我們可以寫出以下陳述式來將這些欄位投影到新的簡化串流中:
SELECT title_id, before, after, created_at
FROM production_changes
EMIT CHANGES ;
內容解密:
SELECT title_id, before, after, created_at:選擇特定的欄位進行投影。FROM production_changes:指定來源串流為production_changes。EMIT CHANGES:持續輸出變更。
此查詢的輸出顯示簡化的串流:
–+
–+
|TITLE_ID |BEFORE |AFTER |CREATED_AT | +
–+
–+
-+
–+ |2 |{SEASON_ID=1, |{SEASON_ID=1, |2021-02-08…| EPISODE_COUNT=null} EPISODE_COUNT=null} |1 |{SEASON_ID=1, |{SEASON_ID=1, |2021-02-08…| EPISODE_COUNT=12} EPISODE_COUNT=8}
篩選(Filtering)
ksqlDB 的 SQL 方言包含無處不在的 WHERE 子句,可用於篩選串流和資料表。由於我們的應用程式只對某種特定的製作變更(季長變更)感興趣,因此我們可以使用以下陳述式來篩選相關記錄:
SELECT title_id, before, after, created_at
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES ;
內容解密:
WHERE change_type = 'season_length':篩選條件,只保留change_type為'season_length'的記錄。EMIT CHANGES:持續輸出篩選後的變更。
此查詢的輸出如下:
–+
–+
|TITLE_ID |BEFORE |AFTER |CREATED_AT | +
–+
–+
-+
–+ |1 |{SEASON_ID=1, |{SEASON_ID=1, |2021-02-08…| EPISODE_COUNT=12} EPISODE_COUNT=8}
萬用字元與邏輯運算元
ksqlDB 支援萬用字元篩選,例如使用 LIKE 運算元搭配 % 字元,可以實作更強大的篩選條件:
SELECT title_id, before, after, created_at
FROM production_changes
WHERE change_type LIKE 'season%'
EMIT CHANGES ;
內容解密:
WHERE change_type LIKE 'season%':篩選change_type以'season'開頭的記錄。
此外,可以使用 AND、OR 邏輯運算元以及括號來組合多個篩選條件,並使用 NOT 運算元來否定條件:
SELECT title_id, before, after, created_at
FROM production_changes
WHERE NOT change_type = 'release_date'
AND ( after->episode_count >= 8 OR after->episode_count <= 20 )
EMIT CHANGES ;
內容解密:
WHERE NOT change_type = 'release_date':排除change_type為'release_date'的記錄。( after->episode_count >= 8 OR after->episode_count <= 20 ):篩選出episode_count在 8 至 20 之間的記錄。
範圍篩選(Between)
對於需要在特定數值或字母範圍內篩選記錄的情況,可以使用 BETWEEN 運算元。例如:
SELECT title_id, before, after, created_at
FROM production_changes
WHERE change_type = 'season_length'
AND after->episode_count BETWEEN 8 AND 20
EMIT CHANGES ;
內容解密:
after->episode_count BETWEEN 8 AND 20:篩選出episode_count在 8 至 20 之間(含)的記錄。
我們的串流現在已經被篩選,但正如範例輸出所示,兩個欄位(before 和 after)包含複雜的多變數結構。接下來,我們將探討如何將這些複雜結構扁平化或解除巢狀,以簡化資料結構。
扁平化複雜結構:簡化資料模型
在處理複雜的資料結構時,扁平化(Flattening)是一種非常有用的技術。它可以將巢狀欄位分解為頂層的單一值欄位,從而簡化資料模型,方便下游操作。
使用 -> 運算元存取巢狀欄位
在 ksqlDB 中,可以使用 -> 運算元來存取巢狀結構欄位中的值。例如,假設我們有一個名為 production_changes 的資料流,其中包含一個名為 after 的欄位,該欄位是一個包含 season_id 和 episode_count 的結構。我們可以使用以下查詢來提取這些值:
SELECT
title_id,
after->season_id,
after->episode_count,
created_at
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;
執行上述查詢後,原本巢狀的資料點 {SEASON_ID=1, EPISODE_COUNT=8} 現在被分解為兩個獨立的欄位:season_id 和 episode_count。
條件表示式:處理資料完整性問題
ksqlDB 支援多種條件表示式,這些表示式可以用於解決資料流或表格中的資料完整性問題。例如,當某個欄位的值為 NULL 時,可以使用條件表示式提供備用值。
COALESCE 函式
COALESCE 函式可以傳回一組值中第一個非 NULL 的值。其函式簽名如下:
COALESCE(first T, others T[])
例如,為了實作備用邏輯以選取非 NULL 的 season_id,我們可以更新 SELECT 陳述式如下:
SELECT COALESCE(after->season_id, before->season_id, 0) AS season_id
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;
在這種情況下,如果 after->season_id 為 NULL,則會回退到 before->season_id。如果 before->season_id 也為 NULL,則會回退到預設值 0。
IFNULL 函式
IFNULL 函式與 COALESCE 類別似,但它只有一個備用值。其函式簽名如下:
IFNULL(expression T, altValue T)
例如,如果 after->season_id 為 NULL,我們可以使用以下陳述式回退到 before->season_id:
SELECT IFNULL(after->season_id, before->season_id) AS season_id
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;
CASE 陳述式
CASE 陳述式是 ksqlDB 中最強大的條件表示式。它允許評估任意數量的布林條件,並傳回第一個條件評估為 true 的值。與 COALESCE 和 IFNULL 不同,CASE 陳述式中評估的條件不限於簡單的 NULL 檢查。
CASE 陳述式的語法如下:
CASE expression
WHEN condition THEN result [, ...]
[ELSE result]
END
例如,以下程式碼展示瞭如何使用包含多個備用條件的 CASE 陳述式,如果 after->season_id 和 before->season_id 都為 NULL,則傳回 0:
SELECT
CASE
WHEN after->season_id IS NOT NULL THEN after->season_id
WHEN before->season_id IS NOT NULL THEN before->season_id
ELSE 0
END AS season_id
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;
內容解密:
- COALESCE、IFNULL 和 CASE 陳述式的比較:這三種條件表示式都可以用於處理 NULL 值,但它們的使用場景和功能有所不同。COALESCE 傳回第一個非 NULL 值,IFNULL 只允許一個備用值,而 CASE 陳述式則允許評估多個條件。
- 選擇適合的條件表示式:在簡單的 NULL 檢查中,使用 COALESCE 或 IFNULL 更為合適。而在需要評估多個條件的情況下,CASE 陳述式是更好的選擇。
- 實際應用:在處理資料流或表格時,條件表示式可以用於解決資料完整性問題,提供備用值以確保資料的一致性和準確性。
將結果寫回 Kafka(持久查詢)
到目前為止,我們一直在處理暫時查詢。這些查詢以 SELECT 開頭,輸出傳回給客戶端,但不會寫回 Kafka。此外,暫時查詢在伺服器重新啟動後不會保留。
ksqlDB 也允許我們建立持久查詢,這些查詢將結果寫入 Kafka,並且在伺服器重新啟動後仍然存在。這對於使過濾、轉換和豐富的資料流可供其他客戶端使用非常有用。
建立衍生集合
衍生集合是透過從其他資料流和表格建立資料流和表格而產生的。建立衍生集合的語法與建立來源集合的語法略有不同,因為我們不需要指定欄位模式,並且有一個附加的 AS SELECT 子句。
建立衍生集合的完整語法如下:
CREATE { STREAM | TABLE } [ IF NOT EXISTS ] <identifier>
WITH (
property=value [, ... ]
)
AS SELECT select_expr [, ...]
FROM from_item
[ LEFT JOIN join_collection ON join_criteria ]
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression ]
[ PARTITION BY partitioning_expression ]
[ HAVING having_expression ]
EMIT CHANGES
[ LIMIT count ];
內容解密:
- 持久查詢與暫時查詢的區別:持久查詢將結果寫入 Kafka,並且在伺服器重新啟動後仍然存在,而暫時查詢則不會。
- 衍生集合的作用:衍生集合可以用於使轉換後的資料流可供其他客戶端使用,並且可以根據需要進行進一步的處理。
- 建立衍生集合的語法:建立衍生集合需要使用 CREATE STREAM 或 CREATE TABLE 陳述式,並且需要指定 AS SELECT 子句來定義衍生集合的內容。