ksqlDB 提供了視窗化 Join 和資料聚合功能,方便處理和分析串流資料。視窗化 Join 允許在特定時間範圍內關聯不同串流的資料,例如結合開始和結束事件計算持續時間。聚合函式則能統計資料,例如計算事件次數或取得最新值。搭配視窗函式,可以針對特定時間區間進行聚合,例如計算每小時的事件數量。設定寬限期能處理延遲資料,避免遺漏資訊。物化檢視則能儲存聚合結果,方便後續查詢和分析。
ksqlDB 中的視窗化 Join 操作與資料聚合
在處理流式資料時,ksqlDB 提供了豐富的功能來處理和分析資料。本篇文章將探討如何在 ksqlDB 中實作視窗化 Join 操作以及如何進行資料聚合。
視窗化 Join 操作
視窗化 Join 操作允許我們在特定的時間視窗內對資料進行 Join 操作。要實作視窗化 Join,需要在 Join 子句中包含 WITHIN 表示式。其語法如下:
WITHIN <number> <time_unit>
支援的時間單位包括(單數和複數形式):
- DAY, DAYS
- HOUR, HOURS
- MINUTE, MINUTES
- SECOND, SECONDS
- MILLISECOND, MILLISECONDS
使用範例
假設我們正在為 Netflix 工作,需要捕捉所有觀看時間少於兩分鐘的觀看記錄。開始觀看和停止觀看的事件被寫入不同的 Kafka 主題中。這是視窗化 Join 的一個很好的應用場景,因為我們需要根據 session_id(標識觀看會話)和事件時間對記錄進行 Join。
首先,建立兩個來源流,分別用於開始觀看和停止觀看的事件:
CREATE STREAM start_watching_events (
session_id STRING,
title_id INT,
created_at STRING
)
WITH (
KAFKA_TOPIC='start_watching_events',
VALUE_FORMAT='JSON',
PARTITIONS=4,
TIMESTAMP='created_at',
TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'
);
CREATE STREAM stop_watching_events (
session_id STRING,
title_id INT,
created_at STRING
)
WITH (
KAFKA_TOPIC='stop_watching_events',
VALUE_FORMAT='JSON',
PARTITIONS=4,
TIMESTAMP='created_at',
TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'
);
然後,插入一些開始觀看和停止觀看的事件:
INSERT INTO start_watching_events
VALUES ('session_123', 1, '2021-02-08 02:00:00');
INSERT INTO stop_watching_events
VALUES ('session_123', 1, '2021-02-08 02:01:30');
INSERT INTO start_watching_events
VALUES ('session_456', 1, '2021-02-08 02:00:00');
INSERT INTO stop_watching_events
VALUES ('session_456', 1, '2021-02-08 02:25:00');
最後,使用視窗化 Join 捕捉觀看時間少於兩分鐘的觀看會話:
SELECT
A.title_id as title_id,
A.session_id as session_id
FROM start_watching_events A
INNER JOIN stop_watching_events B
WITHIN 2 MINUTES
ON A.session_id = B.session_id
EMIT CHANGES;
程式碼解密:
CREATE STREAM陳述式:這兩個陳述式用於建立來源流,分別用於開始觀看和停止觀看的事件。這些流與 Kafka 主題繫結,並指定了資料格式、分割區數量以及時間戳欄位和格式。KAFKA_TOPIC:指定了 Kafka 主題名稱。VALUE_FORMAT:指定了資料的格式為 JSON。PARTITIONS:指定了分割區的數量。TIMESTAMP和TIMESTAMP_FORMAT:用於指定事件時間欄位及其格式。
INSERT INTO陳述式:用於向來源流中插入測試資料,模擬開始觀看和停止觀看的事件。SELECT陳述式與INNER JOIN:使用視窗化 Join 對開始觀看和停止觀看的事件進行 Join 操作,捕捉觀看時間少於兩分鐘的會話。WITHIN 2 MINUTES:指定了視窗大小為 2 分鐘,即在開始觀看事件發生後的 2 分鐘內,如果有對應的停止觀看事件,則會被捕捉。
資料聚合
資料聚合允許我們對相關記錄進行分組並計算匯總值。在 ksqlDB 中,聚合操作可以應用於流和表,但總是傳回一個表。因為聚合函式的結果需要儲存在一個可變的結構中,以便在新記錄到來時可以輕易檢索和更新。
聚合型別
ksqlDB 中的聚合操作主要分為兩類別:視窗化聚合和非視窗化聚合。視窗化聚合允許我們在特定的時間視窗內對資料進行聚合計算。
使用範例
在我們的 Netflix 變更追蹤應用程式中,我們可以使用視窗化聚合來計算一段時間內的季長度變更次數。這有助於改善新電視節目或電影的規劃流程,或者提供專案執行中的營運問題指標。
-- 示例中未直接給出視窗化聚合的 SQL 陳述式,但可以根據前面的內容推斷出類別似的操作。
ksqlDB中的聚合運算基礎與進階應用
在ksqlDB中,聚合運算是處理資料流的重要功能。聚合運算允許使用者對資料流進行匯總和分析,以獲得有價值的洞察。在本章中,我們將探討ksqlDB中的聚合運算基礎知識,包括如何使用聚合函式和GROUP BY子句,以及如何進行視窗聚合運算。
聚合運算基礎
在ksqlDB中進行聚合運算需要兩個主要步驟:
- 建立SELECT表示式,使用聚合函式:使用者需要選擇適合的聚合函式來匯總資料,例如COUNT、AVG、MAX、MIN、SUM等。
- 使用GROUP BY子句對相關記錄進行分組:GROUP BY子句用於將記錄分組,聚合函式將對每個組進行計算。
示例:使用COUNT和LATEST_BY_OFFSET聚合函式
SELECT
title_id,
COUNT(*) AS change_count,
LATEST_BY_OFFSET(new_episode_count) AS latest_episode_count
FROM season_length_changes_enriched
GROUP BY title_id
EMIT CHANGES;
在上述查詢中,我們使用COUNT函式計算每個title_id的記錄數量,並使用LATEST_BY_OFFSET函式取得最新的new_episode_count值。
GROUP BY子句的重要性
GROUP BY子句對於聚合運算是至關重要的。它根據指定的列將記錄分成不同的組。每個組將被單獨處理,聚合函式將對每個組的資料進行計算。
新增非聚合列到SELECT表示式
如果需要在SELECT表示式中包含非聚合列,則必須將這些列新增到GROUP BY子句中。例如:
SELECT
title_id,
season_id,
COUNT(*) AS change_count,
LATEST_BY_OFFSET(new_episode_count) AS latest_episode_count
FROM season_length_changes_enriched
GROUP BY title_id, season_id
EMIT CHANGES;
錯誤示例:未將非聚合列新增到GROUP BY子句
SELECT
title_id,
season_id,
COUNT(*) AS change_count,
LATEST_BY_OFFSET(new_episode_count) AS latest_episode_count
FROM season_length_changes_enriched
GROUP BY title_id
EMIT CHANGES;
上述查詢將導致錯誤,因為season_id未包含在GROUP BY子句中。
視窗聚合運算
視窗聚合運算是ksqlDB中的另一種重要功能,它允許使用者根據時間視窗對資料進行聚合。視窗型別包括Tumbling視窗、Hopping視窗和Session視窗。
視窗型別示例
| 視窗型別 | 示例 |
|---|---|
| Tumbling視窗 | WINDOW TUMBLING (SIZE 30 SECONDS) |
| Hopping視窗 | WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) |
| Session視窗 | WINDOW SESSION (60 SECONDS) |
視窗聚合運算為使用者提供了更靈活的資料分析能力,允許根據不同的時間視窗進行資料匯總和分析。
ksqlDB 中的視窗表示式與延遲資料處理
在 ksqlDB 中,視窗表示式(Window Expression)是一種強大的工具,能夠根據時間或其他條件對資料流進行分組和聚合運算。視窗表示式在處理包含時間戳的資料流時特別有用,因為它允許根據時間範圍對資料進行分組。
在查詢中加入視窗表示式
要在查詢中加入視窗表示式,只需在 GROUP BY 子句之前新增視窗表示式的定義。下面是一個例子:
SELECT
title_id,
season_id,
COUNT(*) AS change_count,
LATEST_BY_OFFSET(new_episode_count) AS latest_episode_count
FROM
season_length_changes_enriched
WINDOW
TUMBLING (SIZE 1 HOUR)
GROUP BY
title_id, season_id
EMIT CHANGES;
內容解密:
WINDOW TUMBLING (SIZE 1 HOUR):定義了一個翻滾視窗(Tumbling Window),大小為 1 小時。這意味著資料將根據時間戳被分成連續的、不重疊的 1 小時段。GROUP BY title_id, season_id:資料根據title_id和season_id進行分組,這樣每個分組內的資料都會被單獨處理。COUNT(*) AS change_count:計算每個分組內的記錄數。LATEST_BY_OFFSET(new_episode_count) AS latest_episode_count:取得每個分組內最新的new_episode_count值。
處理延遲資料
在 ksqlDB 中,延遲資料是指那些到達時間晚於預期的資料。這些資料可能會影響視窗聚合的結果,因為它們可能屬於已經關閉的視窗。
寬限期(Grace Period)
為瞭解決這個問題,ksqlDB 引入了寬限期(Grace Period)的概念。寬限期允許延遲到達的資料在一定時間內仍然被納入相關視窗的計算中。
WINDOW <window_type> (
<window_properties>,
GRACE PERIOD <number> <time_unit>
)
例如,若要定義一個寬限期為 10 分鐘的翻滾視窗,可以這樣寫:
SELECT
title_id,
season_id,
COUNT(*) AS change_count,
LATEST_BY_OFFSET(new_episode_count) AS episode_count
FROM
season_length_changes_enriched
WINDOW
TUMBLING (SIZE 1 HOUR, GRACE PERIOD 10 MINUTES)
GROUP BY
title_id, season_id
EMIT CHANGES;
內容解密:
GRACE PERIOD 10 MINUTES:定義了一個寬限期為 10 分鐘。這意味著在視窗關閉後 10 分鐘內到達的資料仍然會被納入該視窗的計算中。- 這種設定允許系統容忍一定程度的延遲資料,從而提高結果的準確性。
示例與驗證
透過執行以下插入陳述式,可以觀察寬限期對延遲資料的處理效果:
INSERT INTO production_changes VALUES (
'1', 1, 1, 'season_length',
STRUCT(season_id := 1, episode_count := 12),
STRUCT(season_id := 1, episode_count := 8),
'2021-02-24 10:00:00'
);
INSERT INTO production_changes VALUES (
'1', 1, 1, 'season_length',
STRUCT(season_id := 1, episode_count := 8),
STRUCT(season_id := 1, episode_count := 10),
'2021-02-24 11:00:00'
);
INSERT INTO production_changes VALUES (
'1', 1, 1, 'season_length',
STRUCT(season_id := 1, episode_count := 10),
STRUCT(season_id := 1, episode_count := 8),
'2021-02-24 10:59:00'
);
INSERT INTO production_changes VALUES (
'1', 1, 1, 'season_length',
STRUCT(season_id := 1, episode_count := 8),
STRUCT(season_id := 1, episode_count := 12),
'2021-02-24 11:10:00'
);
INSERT INTO production_changes VALUES (
'1', 1, 1, 'season_length',
STRUCT(season_id := 1, episode_count := 12),
STRUCT(season_id := 1, episode_count := 8),
'2021-02-24 10:59:00'
);
此圖示說明瞭流時間(Stream Time)與寬限期的關係:
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 此圖示說明瞭流時間(Stream Time)與寬限期的關係:
rectangle "根據時間戳更新" as node1
rectangle "判定是否延遲" as node2
rectangle "是" as node3
rectangle "否" as node4
node1 --> node2
node2 --> node3
node3 --> node4
@enduml
圖示內容解密:
- 記錄時間戳:每條記錄都有一個時間戳,用於更新流時間。
- 流時間:系統根據記錄的時間戳維護的一個不斷增加的時間值,用於判定記錄是否延遲。
- 寬限期判定:系統檢查延遲記錄是否在寬限期內,以決定是否納入相關視窗的計算中。
ksqlDB 中的視窗聚合與物化檢視
ksqlDB 是一種強大的流處理引擎,提供了視窗聚合(Windowed Aggregations)和物化檢視(Materialized Views)等功能,以支援實時資料處理和分析。在本篇文章中,我們將探討 ksqlDB 中的視窗聚合和物化檢視,包括其工作原理、組態選項以及如何使用它們來構建實時資料處理管道。
視窗聚合的寬限期
在 ksqlDB 中,視窗聚合是一種將資料分組到固定時間範圍內的操作。然而,由於資料可能不會按時到達,因此需要設定一個寬限期(Grace Period)來處理延遲資料。寬限期是指在視窗關閉之前,ksqlDB 等待延遲資料到達的時間。如果資料在寬限期內到達,則會被納入視窗聚合中;否則,將被忽略。
WINDOW TUMBLING (
SIZE 1 HOUR,
GRACE PERIOD 10 MINUTES
)
內容解密:
WINDOW TUMBLING指定了視窗型別為滾動視窗,每小時計算一次。SIZE 1 HOUR設定了視窗大小為一小時。GRACE PERIOD 10 MINUTES設定了寬限期為 10 分鐘,允許資料延遲到達。
視窗保留
視窗保留(Window Retention)是指 ksqlDB 保留視窗資料的時間長度。預設情況下,ksqlDB 會保留視窗資料一段時間,以便支援查詢。如果需要查詢舊的視窗資料,可以透過設定 RETENTION 屬性來控制視窗保留時間。
WINDOW TUMBLING (
SIZE 1 HOUR,
RETENTION 2 DAYS,
GRACE PERIOD 10 MINUTES
)
內容解密:
RETENTION 2 DAYS設定了視窗保留時間為兩天,確保資料在兩天內可供查詢。- 這裡需要注意的是,保留時間必須大於或等於視窗大小加上寬限期。
物化檢視
物化檢視(Materialized Views)是 ksqlDB 中的一種特殊表格,用於儲存查詢結果。物化檢視可以自動重新整理,以反映最新的資料變化。在 ksqlDB 中,物化檢視通常用於支援查詢操作,例如 pull queries。
CREATE TABLE season_length_changes_enriched_table AS
SELECT
title_id,
season_id,
LATEST_BY_OFFSET(new_episode_count) AS episode_count,
COUNT(*) AS change_count
FROM season_length_changes_enriched
WINDOW TUMBLING (
SIZE 1 HOUR,
RETENTION 2 DAYS,
GRACE PERIOD 10 MINUTES
)
GROUP BY title_id, season_id
EMIT CHANGES;
內容解密:
CREATE TABLE陳述式建立了一個物化檢視,用於儲存查詢結果。SELECT陳述式指定了要查詢的欄位和聚合操作。WINDOW TUMBLING指定了視窗型別和相關組態。GROUP BY陳述式指定了分組欄位。EMIT CHANGES陳述式指定了輸出變更資料。