返回文章列表

KsqlDB資料流處理與SQL語法

本文介紹 ksqlDB 的基本查詢與資料操作,包含資料插入、過濾、轉換與扁平化等技巧。文章涵蓋了暫時性推播查詢、持久查詢以及衍生集合的建立,並探討如何使用 SQL 語法實作各種流處理模式,例如投影、篩選、使用萬用字元和邏輯運算元、範圍篩選以及處理巢狀結構。

資料函式庫 串流處理

ksqlDB 提供根據 SQL 的串流處理能力,簡化 Kafka 的資料存取與操作。透過 INSERT INTO 陳述式,可以將資料插入資料流或表格,如同附加記錄至 Kafka 主題。表格則儲存每個鍵的最新值,實作 upsert 操作。利用 SELECT 陳述式搭配 EMIT CHANGES 子句,可以執行暫時性推播查詢,即時檢視資料變化。設定 auto.offset.reset 為 ’earliest’ 可從最早的偏移量開始讀取。ksqlDB 也支援持久查詢,將結果寫回 Kafka 並在伺服器重啟後保留。文章也示範如何建立衍生集合,並使用 WITH 子句定義屬性。

基本查詢與資料操作

在ksqlDB中,資料過濾與轉換是流處理的基礎。本章節將探討基本的查詢方式以及如何利用SQL陳述式實作流處理模式。

插入資料

在ksqlDB中,INSERT VALUES陳述式用於將資料插入至資料流或表格中。與傳統資料函式庫不同,ksqlDB的插入語義更像是將記錄附加到底層的Kafka主題中。對於表格而言,由於只儲存每個鍵的最新表示形式,因此可視為一種upsert操作。

語法結構

INSERT INTO <collection_name> [ ( column_name [, ...]] ) ]
VALUES (
value [,...]
);

實際操作範例

首先,為了實驗各種SQL陳述式,我們需要預先填充一些測試資料到titles表格和production_changes資料流中。

INSERT INTO production_changes (
uuid,
title_id,
change_type,
before,
after,
created_at
) VALUES (
1,
1,
'season_length',
STRUCT(season_id := 1, episode_count := 12),
STRUCT(season_id := 1, episode_count := 8),
'2021-02-08 10:00:00'
);

INSERT INTO production_changes (
ROWKEY,
ROWTIME,
uuid,
title_id,
change_type,
before,
after,
created_at
) VALUES (
'2',
1581161400000,
2,
2,
'release_date',
STRUCT(season_id := 1, release_date := '2021-05-27'),
STRUCT(season_id := 1, release_date := '2021-08-18'),
'2021-02-08 10:00:00'
);

INSERT INTO titles VALUES (1, 'Stranger Things');
INSERT INTO titles VALUES (2, 'Black Mirror');
INSERT INTO titles VALUES (3, 'Bojack Horseman');

程式碼解析

  1. 插入陳述式的基本結構INSERT INTO陳述式後面跟著集合名稱(表格或資料流),然後是欄位名稱列表(可選),最後是VALUES關鍵字後面跟著要插入的值。

  2. 特殊偽列的使用:在第二個INSERT INTO範例中,我們指定了ROWKEYROWTIME,這兩個是ksqlDB自動建立的特殊偽列,分別代表記錄的鍵和時間戳記。

  3. 結構化資料的插入:使用STRUCT函式來建立結構化資料,例如在production_changes中,beforeafter欄位都是結構化資料,包含了特定的欄位(如season_idepisode_countrelease_date)。

簡單查詢(暫時性推播查詢)

暫時性推播查詢是一種簡單的查詢形式,使用SELECT陳述式並在末尾加上EMIT CHANGES子句。

語法結構

SELECT select_expr [, ...]
FROM from_item
[ LEFT JOIN join_collection ON join_criteria ]
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression ]
[ PARTITION BY partitioning_expression ]
[ HAVING having_expression ]
EMIT CHANGES
[ LIMIT count ];

實際操作範例

ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM production_changes EMIT CHANGES ;

#### 內容解密:

  1. SET 'auto.offset.reset' = 'earliest';:這條命令設定ksqlDB從最早的偏移量開始讀取資料,這對於檢視已有的測試資料非常有用。

  2. SELECT * FROM production_changes EMIT CHANGES ;:這是一條暫時性推播查詢,它會從production_changes資料流中選取所有欄位並持續輸出新到達的資料。

  3. 暫時性與永續性查詢的區別:暫時性查詢不會在ksqlDB伺服器重啟後繼續執行,而永續性查詢則會在重啟後繼續執行。

  4. 查詢結果的輸出:查詢結果將持續輸出到螢幕上,包括之前插入的測試資料以及之後到達的新資料。

此圖示說明瞭查詢的基本流程:

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title KsqlDB資料流處理與SQL語法

package "資料庫架構" {
    package "應用層" {
        component [連線池] as pool
        component [ORM 框架] as orm
    }

    package "資料庫引擎" {
        component [查詢解析器] as parser
        component [優化器] as optimizer
        component [執行引擎] as executor
    }

    package "儲存層" {
        database [主資料庫] as master
        database [讀取副本] as replica
        database [快取層] as cache
    }
}

pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中

master --> replica : 資料同步

note right of cache
  Redis/Memcached
  減少資料庫負載
end note

@enduml

此圖示說明瞭當執行查詢時,首先設定偏移量,然後執行SELECT陳述式,並持續輸出結果。當有新資料到達時,查詢結果也會相應更新。

基本查詢與資料轉換

在 ksqlDB 中,Transient push queries 針對資料表的操作與串流處理類別似。我們可以透過執行 SELECT 陳述式來驗證這一點,以我們的 titles 資料表為例:

ksql> SELECT * FROM titles EMIT CHANGES ;

這將會輸出資料表的初始內容,如下所示。同時,當新的資料到達時,輸出將會相應更新:


-+




–+ |ID |TITLE | +

-+




–+ |2 |Black Mirror | |3 |Bojack Horseman | |1 |Stranger Things |

簡單的 SELECT 陳述式是建立串流處理應用程式的有用起點,但處理資料通常需要以不同的方式轉換資料。接下來,我們將探討一些基本的資料轉換任務。

投影(Projection)

最簡單的資料轉換形式之一是從串流或資料表中選擇一部分的欄位,從而簡化下游操作的資料模型。這被稱為投影,僅需將 SELECT * 語法替換為我們想要使用的明確欄位名稱。例如,在本教程中,我們需要使用 production_changes 串流中的 title_idbeforeaftercreated_at 欄位,因此我們可以寫出以下陳述式來將這些欄位投影到新的簡化串流中:

SELECT title_id, before, after, created_at
FROM production_changes
EMIT CHANGES ;

內容解密:

  1. SELECT title_id, before, after, created_at:選擇特定的欄位進行投影。
  2. FROM production_changes:指定來源串流為 production_changes
  3. EMIT CHANGES:持續輸出變更。

此查詢的輸出顯示簡化的串流:



–+





–+









|TITLE_ID |BEFORE |AFTER |CREATED_AT | +


–+





–+





-+



–+ |2 |{SEASON_ID=1, |{SEASON_ID=1, |2021-02-08…| EPISODE_COUNT=null} EPISODE_COUNT=null} |1 |{SEASON_ID=1, |{SEASON_ID=1, |2021-02-08…| EPISODE_COUNT=12} EPISODE_COUNT=8}

篩選(Filtering)

ksqlDB 的 SQL 方言包含無處不在的 WHERE 子句,可用於篩選串流和資料表。由於我們的應用程式只對某種特定的製作變更(季長變更)感興趣,因此我們可以使用以下陳述式來篩選相關記錄:

SELECT title_id, before, after, created_at
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES ;

內容解密:

  1. WHERE change_type = 'season_length':篩選條件,只保留 change_type'season_length' 的記錄。
  2. EMIT CHANGES:持續輸出篩選後的變更。

此查詢的輸出如下:



–+





–+









|TITLE_ID |BEFORE |AFTER |CREATED_AT | +


–+





–+





-+



–+ |1 |{SEASON_ID=1, |{SEASON_ID=1, |2021-02-08…| EPISODE_COUNT=12} EPISODE_COUNT=8}

萬用字元與邏輯運算元

ksqlDB 支援萬用字元篩選,例如使用 LIKE 運算元搭配 % 字元,可以實作更強大的篩選條件:

SELECT title_id, before, after, created_at
FROM production_changes
WHERE change_type LIKE 'season%'
EMIT CHANGES ;

內容解密:

  1. WHERE change_type LIKE 'season%':篩選 change_type'season' 開頭的記錄。

此外,可以使用 ANDOR 邏輯運算元以及括號來組合多個篩選條件,並使用 NOT 運算元來否定條件:

SELECT title_id, before, after, created_at
FROM production_changes
WHERE NOT change_type = 'release_date'
AND ( after->episode_count >= 8 OR after->episode_count <= 20 )
EMIT CHANGES ;

內容解密:

  1. WHERE NOT change_type = 'release_date':排除 change_type'release_date' 的記錄。
  2. ( after->episode_count >= 8 OR after->episode_count <= 20 ):篩選出 episode_count 在 8 至 20 之間的記錄。

範圍篩選(Between)

對於需要在特定數值或字母範圍內篩選記錄的情況,可以使用 BETWEEN 運算元。例如:

SELECT title_id, before, after, created_at
FROM production_changes
WHERE change_type = 'season_length'
AND after->episode_count BETWEEN 8 AND 20
EMIT CHANGES ;

內容解密:

  1. after->episode_count BETWEEN 8 AND 20:篩選出 episode_count 在 8 至 20 之間(含)的記錄。

我們的串流現在已經被篩選,但正如範例輸出所示,兩個欄位(beforeafter)包含複雜的多變數結構。接下來,我們將探討如何將這些複雜結構扁平化或解除巢狀,以簡化資料結構。

扁平化複雜結構:簡化資料模型

在處理複雜的資料結構時,扁平化(Flattening)是一種非常有用的技術。它可以將巢狀欄位分解為頂層的單一值欄位,從而簡化資料模型,方便下游操作。

使用 -> 運算元存取巢狀欄位

在 ksqlDB 中,可以使用 -> 運算元來存取巢狀結構欄位中的值。例如,假設我們有一個名為 production_changes 的資料流,其中包含一個名為 after 的欄位,該欄位是一個包含 season_idepisode_count 的結構。我們可以使用以下查詢來提取這些值:

SELECT 
  title_id,
  after->season_id,
  after->episode_count,
  created_at
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;

執行上述查詢後,原本巢狀的資料點 {SEASON_ID=1, EPISODE_COUNT=8} 現在被分解為兩個獨立的欄位:season_idepisode_count

條件表示式:處理資料完整性問題

ksqlDB 支援多種條件表示式,這些表示式可以用於解決資料流或表格中的資料完整性問題。例如,當某個欄位的值為 NULL 時,可以使用條件表示式提供備用值。

COALESCE 函式

COALESCE 函式可以傳回一組值中第一個非 NULL 的值。其函式簽名如下:

COALESCE(first T, others T[])

例如,為了實作備用邏輯以選取非 NULL 的 season_id,我們可以更新 SELECT 陳述式如下:

SELECT COALESCE(after->season_id, before->season_id, 0) AS season_id
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;

在這種情況下,如果 after->season_id 為 NULL,則會回退到 before->season_id。如果 before->season_id 也為 NULL,則會回退到預設值 0。

IFNULL 函式

IFNULL 函式與 COALESCE 類別似,但它只有一個備用值。其函式簽名如下:

IFNULL(expression T, altValue T)

例如,如果 after->season_id 為 NULL,我們可以使用以下陳述式回退到 before->season_id

SELECT IFNULL(after->season_id, before->season_id) AS season_id
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;

CASE 陳述式

CASE 陳述式是 ksqlDB 中最強大的條件表示式。它允許評估任意數量的布林條件,並傳回第一個條件評估為 true 的值。與 COALESCE 和 IFNULL 不同,CASE 陳述式中評估的條件不限於簡單的 NULL 檢查。

CASE 陳述式的語法如下:

CASE expression
WHEN condition THEN result [, ...]
[ELSE result]
END

例如,以下程式碼展示瞭如何使用包含多個備用條件的 CASE 陳述式,如果 after->season_idbefore->season_id 都為 NULL,則傳回 0:

SELECT 
  CASE 
    WHEN after->season_id IS NOT NULL THEN after->season_id
    WHEN before->season_id IS NOT NULL THEN before->season_id
    ELSE 0
  END AS season_id
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;

內容解密:

  1. COALESCE、IFNULL 和 CASE 陳述式的比較:這三種條件表示式都可以用於處理 NULL 值,但它們的使用場景和功能有所不同。COALESCE 傳回第一個非 NULL 值,IFNULL 只允許一個備用值,而 CASE 陳述式則允許評估多個條件。
  2. 選擇適合的條件表示式:在簡單的 NULL 檢查中,使用 COALESCE 或 IFNULL 更為合適。而在需要評估多個條件的情況下,CASE 陳述式是更好的選擇。
  3. 實際應用:在處理資料流或表格時,條件表示式可以用於解決資料完整性問題,提供備用值以確保資料的一致性和準確性。

將結果寫回 Kafka(持久查詢)

到目前為止,我們一直在處理暫時查詢。這些查詢以 SELECT 開頭,輸出傳回給客戶端,但不會寫回 Kafka。此外,暫時查詢在伺服器重新啟動後不會保留。

ksqlDB 也允許我們建立持久查詢,這些查詢將結果寫入 Kafka,並且在伺服器重新啟動後仍然存在。這對於使過濾、轉換和豐富的資料流可供其他客戶端使用非常有用。

建立衍生集合

衍生集合是透過從其他資料流和表格建立資料流和表格而產生的。建立衍生集合的語法與建立來源集合的語法略有不同,因為我們不需要指定欄位模式,並且有一個附加的 AS SELECT 子句。

建立衍生集合的完整語法如下:

CREATE { STREAM | TABLE } [ IF NOT EXISTS ] <identifier>
WITH (
  property=value [, ... ]
)
AS SELECT select_expr [, ...]
FROM from_item
[ LEFT JOIN join_collection ON join_criteria ]
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression ]
[ PARTITION BY partitioning_expression ]
[ HAVING having_expression ]
EMIT CHANGES
[ LIMIT count ];

內容解密:

  1. 持久查詢與暫時查詢的區別:持久查詢將結果寫入 Kafka,並且在伺服器重新啟動後仍然存在,而暫時查詢則不會。
  2. 衍生集合的作用:衍生集合可以用於使轉換後的資料流可供其他客戶端使用,並且可以根據需要進行進一步的處理。
  3. 建立衍生集合的語法:建立衍生集合需要使用 CREATE STREAM 或 CREATE TABLE 陳述式,並且需要指定 AS SELECT 子句來定義衍生集合的內容。