ksqlDB 雖然沿用檢視的命名,但實際上並無獨立檢視物件,而是透過查詢建立的表集合。建立物化檢視需使用聚合查詢,例如以翻滾視窗聚合資料,指定 Kafka 主題、值格式及分割槽數等。完成後,即可執行提取查詢,透過 SQL 語法篩選所需資料,並可使用 CLI 或 curl 等使用者端工具進行查詢。ksqlDB 提供 Pull Queries 和 Push Queries 兩種查詢方式,Pull Queries 查詢現有資料,Push Queries 則持續監聽資料變化。Pull Queries 可使用時間戳或日期時間字串過濾資料,並可透過 curl 執行,回傳包含欄位名稱及資料列的 JSON 格式結果。Push Queries 則持續回傳資料更新,直到連線終止。ksqlDB 也提供豐富的函式與運算元,包含算術、字串、下標及結構體解參照等運算元,以及 Scalar、Aggregate 和 Table 等函式型別。使用者可透過 SHOW FUNCTIONS 指令檢視可用函式。
ksqlDB 中的物化檢視與查詢
在 ksqlDB 中,雖然採用了傳統系統中的檢視命名法,但實際上並沒有獨立的檢視物件。因此,當提到 ksqlDB 中的檢視時,可以理解為可用於查詢式提取查詢的表集合。
建立物化檢視
要建立物化檢視,需要使用聚合查詢建立派生表集合。例如,利用範例 11-6 中的視窗聚合查詢,可以建立一個名為 season_length_change_counts 的物化檢視,具體的 SQL 陳述式如範例 11-7 所示。
CREATE TABLE season_length_change_counts
WITH (
KAFKA_TOPIC = 'season_length_change_counts',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 1
) AS
SELECT
title_id,
season_id,
COUNT(*) AS change_count,
LATEST_BY_OFFSET(new_episode_count) AS episode_count
FROM season_length_changes_enriched
WINDOW TUMBLING (
SIZE 1 HOUR,
RETENTION 2 DAYS,
GRACE PERIOD 10 MINUTES
)
GROUP BY title_id, season_id
EMIT CHANGES;
內容解密:
CREATE TABLE: 使用CREATE TABLE陳述式建立一個名為season_length_change_counts的物化檢視。WITH: 指定物化檢視的屬性,例如 Kafka 主題、值格式和分割槽數。SELECT: 從season_length_changes_enriched流中選擇所需的欄位,並進行聚合計算。WINDOW TUMBLING: 定義一個翻滾視窗,大小為 1 小時,保留期為 2 天,寬限期為 10 分鐘。GROUP BY: 按照title_id和season_id分組。
提取查詢
有了物化檢視後,就可以對其執行提取查詢。提取查詢的語法如下:
SELECT select_expr [, ...]
FROM from_item
WHERE condition;
例如,可以使用範例 11-8 中的提取查詢來查詢 season_length_change_counts 檢視中的特定資料。
SELECT *
FROM season_length_change_counts
WHERE KSQL_COL_0 = '1|+|1';
內容解密:
SELECT *: 選擇所有欄位。FROM season_length_change_counts: 指定要查詢的物化檢視。WHERE KSQL_COL_0 = '1|+|1': 根據主鍵KSQL_COL_0的值進行過濾。
執行上述查詢後,將傳回符合條件的資料,包括 WINDOWSTART 和 WINDOWEND 這兩個偽列。
使用者端查詢
ksqlDB 支援多種使用者端查詢方式,包括 CLI 和 curl。可以使用這些工具對物化檢視和流執行查詢。
隨著 ksqlDB 的不斷發展,未來可能會出現更多的新功能和改進。例如,Java 使用者端的介面可能會發生變化,或者會出現新的官方使用者端,如 Python 或 Go 使用者端。這些變化將進一步增強 ksqlDB 的功能和易用性。
此圖示說明 ksqlDB 中物化檢視的建立和查詢流程
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title KsqlDB 物化檢視與查詢技術
package "資料庫架構" {
package "應用層" {
component [連線池] as pool
component [ORM 框架] as orm
}
package "資料庫引擎" {
component [查詢解析器] as parser
component [優化器] as optimizer
component [執行引擎] as executor
}
package "儲存層" {
database [主資料庫] as master
database [讀取副本] as replica
database [快取層] as cache
}
}
pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中
master --> replica : 資料同步
note right of cache
Redis/Memcached
減少資料庫負載
end note
@enduml
圖示內容解密:
建立物化檢視: 使用聚合查詢建立派生表集合。執行提取查詢: 對物化檢視執行提取查詢。傳回查詢結果: 傳回符合條件的資料。使用使用者端工具查詢: 使用 CLI 或 curl 等使用者端工具進行查詢。
ksqlDB查詢與函式操作
查詢資料:Pull Queries 與 Push Queries
ksqlDB 提供了兩種主要的查詢方式:Pull Queries 和 Push Queries。Pull Queries 用於查詢資料函式庫中的現有資料,而 Push Queries 則用於持續監聽資料函式庫中的變化。
Pull Queries
Pull Queries 允許使用者查詢 ksqlDB 中的物化檢視(Materialized Views)。使用者可以使用時間戳或更易讀的日期時間字串來過濾資料。以下兩個查詢範例展示了這兩種型別的查詢:
SELECT * FROM season_length_change_counts
WHERE KSQL_COL_0 = '1|+|1' AND WINDOWSTART = 1614164400000;
SELECT * FROM season_length_change_counts
WHERE KSQL_COL_0 = '1|+|1' AND WINDOWSTART = '2021-02-24T10:00:00';
這兩個查詢的輸出結果相同:
+
---
-
---
-
---
+
---
-
---
-
---
---
+
---
-
---
-
---
---
+
---
-
---
-
---
---
+
---
-
---
-
---
--+
|KSQL_COL_0 |WINDOWSTART |WINDOWEND |CHANGE_COUNT |EPISODE_COUNT|
+
---
-
---
-
---
+
---
-
---
-
---
-
---
-
---
-
---
-
---
--+
---
-
---
-
---
-
---
-
---
-
---
-
---
-+
|1|+|1 |1614160800000 |1614164400000 |2 |8 |
使用 Curl 執行 Pull Queries
使用者可以使用 curl 命令列工具來執行 Pull Queries。以下範例展示瞭如何使用 curl 執行 Pull Query:
curl -X POST "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
--data $'{
"ksql":"SELECT * FROM season_length_change_counts WHERE KSQL_COL_0=\'1|+|1\';",
"streamsProperties": {}
}'
輸出結果包含欄位名稱和一或多行資料:
[
{
"header": {
"queryId": "query_1604158332837",
"schema": "`KSQL_COL_0` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `CHANGE_COUNT` BIGINT, `EPISODE_COUNT` INTEGER"
}
},
{
"row": {
"columns": [
"1|+|1",
1614160800000,
1614164400000,
2,
8
]
}
}
]
Push Queries
Push Queries 用於持續監聽資料函式庫中的變化。以下範例展示瞭如何使用 curl 執行 Push Query:
curl -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d $'{
"ksql": "SELECT * FROM season_length_changes_enriched EMIT CHANGES ;",
"streamsProperties": {}
}'
輸出結果將持續更新,直到連線終止:
[
{"header":{"queryId":"none","schema":"`ROWTIME` BIGINT, `ROWKEY` INTEGER, `TITLE_ID` INTEGER, `CHANGE_COUNT` BIGINT"}},
{"row":{"columns":[1,"Stranger Things",1,12,8,"2021-02-24 10:00:00"]}},
{"row":{"columns":[1,"Stranger Things",1,8,10,"2021-02-24 11:00:00"]}},
{"row":{"columns":[1,"Stranger Things",1,10,8,"2021-02-24 10:59:00"]}},
{"row":{"columns":[1,"Stranger Things",1,8,12,"2021-02-24 11:10:00"]}},
{"row":{"columns":[1,"Stranger Things",1,12,8,"2021-02-24 10:59:00"]}}
]
程式碼解析
curl -X POST "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
--data $'{
"ksql":"SELECT * FROM season_length_change_counts WHERE KSQL_COL_0=\'1|+|1\';",
"streamsProperties": {}
}'
內容解密:
- 使用
curl傳送 POST 請求:透過指定-X POST,使用者向 ksqlDB 的/query端點傳送請求。 - 設定內容型別:
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8"指定了請求的內容型別和字元編碼。 - 傳遞 SQL 查詢:在
--data中,使用 JSON 格式傳遞 SQL 查詢陳述式。 - streamsProperties 組態:在 JSON 物件中,
streamsProperties為空物件,表示未指定額外的流處理屬性。
ksqlDB 的函式與運算元
ksqlDB 提供豐富的函式和運算元,用於處理資料。
運算元
ksqlDB 支援多種運算元,包括:
- 算術運算元(
+,-,/,*,%) - 字串串接運算元(
+,||) - 下標運算元(
[]),用於存取陣列索引或對映鍵 - 結構體解參照運算元(
->)
函式
ksqlDB 提供多種型別的函式,包括:
- SCALAR:無狀態函式,對單一行資料進行操作並傳回一個輸出值。
- AGGREGATE:有狀態函式,用於聚合資料並傳回一個輸出值。
- TABLE:無狀態函式,接受一個輸入並產生零或多個輸出。
使用者可以使用 SHOW FUNCTIONS; 陳述式列出可用的函式:
ksql> SHOW FUNCTIONS ;
輸出結果包含函式名稱和型別:
Function Name | Type
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
AVG | AGGREGATE
CEIL | SCALAR
CONCAT | SCALAR
COUNT | AGGREGATE
...
程式碼解析
SHOW FUNCTIONS ;
內容解密:
SHOW FUNCTIONS;陳述式:用於列出 ksqlDB 中所有可用的函式。- 函式型別:輸出結果包含函式名稱和型別(SCALAR、AGGREGATE、TABLE)。
- 函式用途:不同型別的函式具有不同的用途,使用者可以根據需求選擇合適的函式。
使用ksqlDB函式與運算子
描述ksqlDB函式
當你在瀏覽函式庫時,可能會希望獲得更多關於某個特定函式的資訊。在下一節中,我們將瞭解如何實作這一點。
描述函式
如果你想了解更多關於ksqlDB函式的資訊,你可以存取ksqlDB官方網站上的官方檔案。然而,你不需要離開CLI來描述一個函式,因為ksqlDB包含一個特殊的陳述式,可以提供你所需的資訊。該陳述式的語法如下:
DESCRIBE FUNCTION <identifier>
例如,如果你想了解更多關於內建的EARLIEST_BY_OFFSET函式的資訊,你可以執行範例11-9中的SQL陳述式。
範例11-9:如何在ksqlDB中描述一個函式
ksql> DESCRIBE FUNCTION EARLIEST_BY_OFFSET;
Name : EARLIEST_BY_OFFSET
Author : Confluent
Overview : This function returns the oldest value for the column,
computed by offset.
Type : AGGREGATE
Jar : internal
Variations :
Variation : EARLIEST_BY_OFFSET(val BOOLEAN)
Returns : BOOLEAN
Description : return the earliest value of a Boolean column
Variation : EARLIEST_BY_OFFSET(val INT)
Returns : INT
Description : return the earliest value of an integer column
...
輸出結果包括:
- 函式的描述。
- 函式的型別(參見表11-3以取得可用函式型別的列表)。
- 當檢視ksqlDB中包含的內建函式時,Jar值將顯示為internal。在下一節中,我們將瞭解如何構建自己的函式,這將包含實際Jar在磁碟上的路徑。
- Variations部分將包含函式的所有有效方法簽名的列表。為了簡潔起見,這一部分被截斷了,但你可以看到這個函式至少有兩個變體:一個變體接受一個BOOLEAN引數並傳回一個BOOLEAN值,而另一個變體接受並傳回一個INT。
內容解密:
DESCRIBE FUNCTION陳述式用於取得ksqlDB中特定函式的詳細資訊。- 輸出結果提供了函式的名稱、作者、概述、型別和有效的方法簽名。
- 內建函式的Jar值顯示為internal,而自定義函式將顯示實際的Jar檔案路徑。
建立自定義函式
有時,你可能希望建立自定義函式以在ksqlDB查詢中使用。例如,你可能希望對列應用專門的數學函式,或使用資料流中的輸入呼叫機器學習模型。無論你的使用案例有多簡單或複雜,ksqlDB都包含一個Java介面,使你能夠使用自己的使用者定義函式擴充套件內建函式庫。
使用者定義函式型別
ksqlDB中有三種型別的使用者定義函式,每種型別都與我們在上一節中討論的內建函式型別(標量、聚合和表函式)相關。下表總結了使用者定義函式型別:
| 型別 | 描述 |
|---|---|
| 使用者定義函式(UDFs) | 自定義的SCALAR函式。UDFs是無狀態的並傳回一個值。 |
| 使用者定義聚合函式(UDAFs) | 自定義的AGGREGATE函式。UDAFs是有狀態的並傳回一個值。 |
| 使用者定義表函式(UDTFs) | 自定義的TABLE函式。UDTFs是無狀態的並傳回零或多個值。 |
停止詞移除UDF
一個常見的資料預處理任務是從文字字串中移除所謂的停止詞。停止詞是常見詞彙(如“a”、“and”、“are”、“but”、“or”、“the”等),它們對底層文字的意義貢獻不大。如果你有一個機器學習或自然語言模型試圖從文字中提取資訊,通常會先從模型輸入中移除停止詞。因此,我們將建立一個名為REMOVE_STOP_WORDS的UDF。
建立自定義函式的步驟
- 建立一個Java專案來包含我們的函式程式碼。
- 將
io.confluent.ksql:ksql-udf依賴項新增到我們的專案中,該依賴項包含我們UDF類別所需的註解。 - 新增實作我們的函式所需的任何其他依賴項。例如,如果有想要在程式碼中利用的第三方Maven依賴項,請將其新增到構建檔案中。
- 使用適當的註解編寫我們的UDF邏輯(我們將很快介紹這些)。
- 將程式碼構建並封裝為uber JAR,它將我們的函式原始碼及其依賴的所有第三方程式碼組合成一個單獨的JAR檔案。
- 將uber JAR複製到ksqlDB擴充套件目錄。這是ksqlDB伺服器上可組態的檔案路徑,可以使用
ksql.extension.dir組態屬性定義。 - 重啟ksqlDB伺服器。當ksqlDB伺服器重新上線時,它將載入新函式,使其可在SQL陳述式中使用。
內容解密:
- 建立自定義UDF需要建立Java專案並新增必要的依賴項。
- 使用適當的註解編寫UDF邏輯,並將程式碼封裝為uber JAR。
- 將uber JAR複製到ksqlDB擴充套件目錄並重啟ksqlDB伺服器,以使新函式可用。
建立新Java專案
首先,讓我們使用Gradle的init命令建立一個新的Java專案。以下程式碼顯示瞭如何執行此操作:
mkdir udf && cd udf
gradle init \
--type java-library \
--dsl groovy \
--test-framework junit-jupiter \
--project-name udf \
--package com.magicalpipelines.ksqldb
接下來,讓我們將ksql-udf依賴項新增到我們的構建檔案(build.gradle)中。該依賴項位於Confluent的Maven儲存函式庫中,因此我們還需要更新repositories塊:
repositories {
// ...
maven {
url = uri('http://packages.confluent.io/maven/')
}
}
dependencies {
// 新增ksql-udf依賴項
}
內容解密:
- 使用Gradle建立新的Java專案,並指定專案名稱和包名。
- 將Confluent的Maven儲存函式庫新增到repositories塊中,以便存取
ksql-udf依賴項。 - 在dependencies塊中新增
ksql-udf依賴項,以包含必要的註解和類別。