在資料串流應用盛行的今日,ksqlDB 提供了簡化的串流處理方法。然而,內建函式可能無法滿足所有應用場景。此時,開發者可以利用 Java 建立自定義函式(UDF)來擴充套件 ksqlDB 的功能。本文將詳細介紹如何開發一個移除停用詞的 UDF,並整合至 ksqlDB 中。首先,需要設定開發環境,包括加入 Confluent Maven 倉函式庫和 ksqldb-udf 依賴項到專案的 build.gradle 檔案中。接著,撰寫 Java 程式碼實作移除停用詞的邏輯,並使用 @UdfDescription、@Udf 和 @UdfParameter 等註解標記 UDF 的相關資訊。完成後,使用 Gradle 構建 UDF 的 JAR 檔案,並將其佈署到 ksqlDB 的擴充套件目錄。最後,透過重啟 ksqlDB 服務並執行 SHOW FUNCTIONS 和 DESCRIBE FUNCTION 命令驗證 UDF 是否成功載入。
使用ksqlDB建立自定義函式(UDF)
ksqlDB允許開發者建立自定義函式(UDF)以擴充套件其功能,滿足特定的業務需求。本篇文章將指導如何使用Java建立一個簡單的UDF,該函式能夠從給定的文字字串中移除停用詞(stop words)。
步驟1:設定開發環境
首先,需要在build.gradle檔案中新增Confluent Maven倉函式庫和ksqldb-udf依賴項。這樣可以取得必要的註解和其他資源,以便開發UDF。
repositories {
mavenCentral()
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
implementation 'io.confluent.ksql:ksqldb-udf:6.0.0'
}
內容解密:
repositories區塊用於指定依賴項的來源。本例中,除了預設的Maven中央倉函式庫外,還增加了Confluent的Maven倉函式庫。dependencies區塊則用於列出專案所需的依賴項。這裡增加了ksqldb-udf,這是開發ksqlDB UDF所必需的。
步驟2:實作業務邏輯
接下來,建立一個名為RemoveStopWordsUdf.java的檔案,並在其中實作業務邏輯。
public class RemoveStopWordsUdf {
private final List<String> stopWords = Arrays.asList("a", "and", "are", "but", "or", "over", "the");
private ArrayList<String> stringToWords(String source) {
return Stream.of(source.toLowerCase().split(" "))
.collect(Collectors.toCollection(ArrayList<String>::new));
}
private String wordsToString(ArrayList<String> words) {
return words.stream().collect(Collectors.joining(" "));
}
public String apply(final String source) {
ArrayList<String> words = stringToWords(source);
words.removeAll(stopWords);
return wordsToString(words);
}
}
內容解密:
stringToWords方法將輸入的字串轉換為單詞列表。wordsToString方法將單詞列表轉換回字串。apply方法是UDF的核心,負責從輸入字串中移除停用詞。
步驟3:新增註解
為了讓ksqlDB能夠識別並正確載入UDF,需要新增特定的註解。
@UdfDescription(
name = "remove_stop_words",
description = "A UDF that removes stop words from a string of text",
version = "0.1.0",
author = "Mitch Seymour")
public class RemoveStopWordsUdf {
@Udf(description = "Remove the default stop words from a string of text")
public String apply(
@UdfParameter(value = "source", description = "the raw source string")
final String source
) {
// 方法實作
}
}
內容解密:
@UdfDescription註解提供了關於UDF的後設資料,包括名稱、描述、版本和作者。@Udf註解標識了可以被ksqlDB呼叫的方法。@UdfParameter註解用於描述方法的引數。
步驟4:構建和佈署UDF
使用Gradle構建UDF的uber JAR檔案,並將其複製到ksqlDB的擴充套件目錄中。然後,重啟ksqlDB服務。
./gradlew build --info
cp build/libs/udf.jar /etc/ksqldb/extensions
ksql-server-stop
ksql-server-start
內容解密:
./gradlew build --info命令用於構建專案並生成uber JAR。- 將生成的JAR檔案複製到ksqlDB指定的擴充套件目錄。
- 重啟ksqlDB服務以載入新的UDF。
步驟5:驗證UDF
在ksqlDB客戶端中使用SHOW FUNCTIONS和DESCRIBE FUNCTION命令來驗證UDF是否正確載入。
ksql> SHOW FUNCTIONS ;
ksql> DESCRIBE FUNCTION REMOVE_STOP_WORDS ;
內容解密:
SHOW FUNCTIONS命令列出了所有可用的函式,包括新新增的UDF。DESCRIBE FUNCTION REMOVE_STOP_WORDS命令提供了關於REMOVE_STOP_WORDSUDF的詳細資訊。
透過以上步驟,您可以成功地在ksqlDB中建立和佈署自定義函式,從而擴充套件ksqlDB的功能以滿足特定的業務需求。
ksqlDB 的進階串流處理與自訂函式
ksqlDB 不僅提供簡單的介面,還支援許多中級到進階的串流處理應用案例,包括在不同集合中連線資料、聚合資料、建立可使用鍵值查詢查詢的物化檢視等。此外,ksqlDB 還提供了一個豐富的內建函式庫,用於處理各種資料處理和豐富的應用案例。
使用自訂函式擴充套件 ksqlDB
ksqlDB 的一大亮點是能夠讓開發者使用自己的自訂 Java 函式擴充套件內建函式庫。這對於需要自訂業務邏輯的應用程式來說非常重要,因為它允許開發者繼續使用簡單的 SQL 方言構建串流處理應用程式。
建立自訂函式的步驟
開發 Java 函式:首先,需要開發一個 Java 函式,該函式將被用於 ksqlDB 中。
封裝 Java 函式:將開發好的 Java 函式封裝成 JAR 檔案。
註冊 JAR 檔案:將 JAR 檔案註冊到 ksqlDB 中,使其可被 ksqlDB 使用。
使用自訂函式:註冊完成後,即可在 ksqlDB 中像使用內建函式一樣使用自訂函式。
示例:移除停用詞的自訂函式
// 假設這是一個移除停用詞的 Java 函式
public class RemoveStopWords {
public String removeStopWords(String text) {
// 實作移除停用詞的邏輯
// 例如:將 "The quick brown fox jumps over the lazy dog"
// 處理為 "quick brown fox jumps lazy dog"
return processedText;
}
}
在 ksqlDB 中使用自訂函式
-- 建立一個名為 model_inputs 的串流
CREATE STREAM model_inputs (
text STRING
)
WITH (
KAFKA_TOPIC='model_inputs',
VALUE_FORMAT='JSON',
PARTITIONS=4
);
-- 向 model_inputs 插入測試資料
INSERT INTO model_inputs VALUES ('The quick brown fox jumps over the lazy dog');
-- 使用自訂函式 remove_stop_words
SELECT
text AS original,
remove_stop_words(text) AS no_stop_words
FROM model_inputs
EMIT CHANGES;
輸出結果將顯示原始文字和移除停用詞後的文字。
測試、監控和佈署
在開發完成後,需要對 ksqlDB 查詢或 Kafka Streams 應用程式進行測試、監控和佈署,以確保其在生產環境中的穩定性和效能。
測試 ksqlDB 查詢
ksqlDB 提供了一個名為 ksql-test-runner 的工具,用於測試 ksqlDB 查詢。該工具需要三個引數:
- 包含要測試的 SQL 陳述式的檔案
- 指定來源主題輸入資料的檔案
- 指定預期輸出結果的檔案
-- statements.sql 中的查詢
CREATE STREAM users (
ROWKEY INT KEY,
USERNAME VARCHAR
) WITH (kafka_topic='users', value_format='JSON');
CREATE STREAM greetings
WITH (KAFKA_TOPIC = 'greetings') AS
SELECT ROWKEY, 'Hello, ' + USERNAME AS "greeting"
FROM users
EMIT CHANGES;
// input.json 中的輸入資料
[
{
"rowkey": 1,
"username": "John"
},
{
"rowkey": 2,
"username": "Jane"
}
]
使用 ksql-test-runner 執行測試,驗證查詢結果是否符合預期。
測試 ksqlDB 與 Kafka Streams 應用
在開發資料流處理應用時,測試是確保程式碼品質和穩定性的關鍵步驟。本文將介紹如何測試 ksqlDB 和 Kafka Streams 應用。
測試 ksqlDB 查詢
ksqlDB 提供了一個測試工具 ksql-test-runner,可以用於測試查詢的正確性。以下是測試 ksqlDB 查詢的步驟:
- 建立輸入資料檔案
input.json,包含測試資料。 - 建立查詢陳述式檔案
statements.sql,包含要測試的查詢陳述式。 - 建立輸出資料檔案
output.json,包含預期的輸出結果。 - 使用
ksql-test-runner工具執行測試。
input.json
{
"inputs": [
{
"topic": "users",
"timestamp": 0,
"value": {"USERNAME": "Isabelle"},
"key": 0
},
{
"topic": "users",
"timestamp": 0,
"value": {"USERNAME": "Elyse"},
"key": 0
}
]
}
statements.sql
CREATE STREAM users (ROWKEY INT KEY, USERNAME VARCHAR)
WITH (KAFKA_TOPIC = 'users', VALUE_FORMAT = 'json');
CREATE STREAM greetings
WITH (KAFKA_TOPIC = 'greetings') AS
SELECT ROWKEY, 'Hello, ' + USERNAME AS greeting
FROM users
EMIT CHANGES;
output.json
{
"outputs": [
{
"topic": "greetings",
"timestamp": 0,
"value": {"greeting": "Hello, Isabelle"},
"key": 0
},
{
"topic": "greetings",
"timestamp": 0,
"value": {"greeting": "Hello, Elyse"},
"key": 0
}
]
}
執行測試的命令如下:
docker run \
-v "$(pwd)":/ksqldb/ \
-w /ksqldb \
-ti confluentinc/ksqldb-server:0.14.0 \
ksql-test-runner -s statements.sql -i input.json -o output.json
#### 內容解密:
此命令會啟動一個 Docker 容器,掛載當前目錄到 /ksqldb/,並執行 ksql-test-runner 工具。該工具會讀取 statements.sql、input.json 和 output.json 檔案,並執行測試。如果測試透過,會輸出 >>> Test passed!。
測試 Kafka Streams 應用
測試 Kafka Streams 應用可以使用 JUnit 和 AssertJ 等測試框架。以下是測試 Kafka Streams 應用的步驟:
- 新增測試依賴項到
build.gradle檔案中。 - 建立測試類別,使用 JUnit 和 AssertJ 等測試框架。
- 編寫測試方法,測試 Kafka Streams 應用的正確性。
build.gradle
dependencies {
testImplementation "org.apache.kafka:kafka-streams-test-utils:${kafkaVersion}"
testImplementation 'org.assertj:assertj-core:3.15.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
}
test {
useJUnitPlatform()
}
#### 內容解密:
此組態會新增 Kafka Streams 測試工具、AssertJ 和 JUnit 等依賴項到專案中。並使用 JUnit 平台執行測試。
測試 Kafka Streams 應用程式
當將邏輯移到專用方法後,測試變得更加容易。事實上,我們甚至不需要 kafka-streams-test-utils 套件來測試這段程式碼,因為我們可以直接使用測試框架來單元測試這個方法,如範例 12-1 所示。
簡單的單元測試
範例 12-1. 一個簡單的單元測試,用於測試 selectKey 拓撲步驟,該步驟依賴於一個名為 MyTopology.decodeKey 的專用方法
class MyTopologyTest {
@Test
public void testDecodeId() {
String key = "1XRZTUW3";
byte[] value = new byte[] {};
String actualValue = MyTopology.decodeKey(key, value);
String expectedValue = "decoded-1XRZTUW3";
assertThat(actualValue).isEqualTo(expectedValue);
}
}
內容解密:
- 我們的測試將呼叫與
selectKey處理器中使用的相同方法。 - 在這裡,我們傳遞一個硬編碼的鍵和值。如果需要,我們也可以使用 JUnit 的引數化測試功能,使用不同的鍵值對來執行多次迭代。
- 我們定義了對方法輸出的期望。
- 我們使用 AssertJ 來斷言
MyTopology.decodeKey方法傳回的實際值與我們的期望相符。
使用 Processor API 進行測試
使用 Processor API 時,我們通常不使用 lambda 或方法參考來定義流處理器的邏輯。相反,我們使用實作 Processor 或 Transformer 介面的類別。因此,我們的測試策略略有不同,因為我們需要模擬 Kafka Streams 在初始化時傳遞給這些底層流處理器的 ProcessorContext。
狀態轉換器測試
讓我們實作一個狀態轉換器,追蹤每個唯一鍵所看到的記錄數量。我們將測試的轉換器實作如下:
public class CountTransformer
implements ValueTransformerWithKey<String, String, Long> {
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext context) {
this.store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
}
@Override
public Long transform(String key, String value) {
// 處理 tombstones
if (value == null) {
store.delete(key);
return null;
}
// 取得此鍵的前一個計數,如果是第一次看到此鍵,則設為 0
Long previousCount = store.get(key);
if (previousCount == null) {
previousCount = 0L;
}
// 計算新的計數
Long newCount = previousCount + 1;
store.put(key, newCount);
return newCount;
}
@Override
public void close() {}
}
內容解密:
Processor和Transformer實作有一個初始化函式,接受ProcessorContext。- 此轉換器是有狀態的,因為它需要記住每個唯一鍵的計數,因此我們將儲存對狀態儲存的參考。
transform方法包含此流處理器的邏輯,註解中討論了實作細節。
使用 MockProcessorContext 進行測試
我們的轉換器就緒後,就可以開始編寫單元測試。與前面的簡單方法級測試不同,這個測試需要使用 kafka-streams-test-utils 套件中的一些輔助工具。該套件允許我們建立 MockProcessorContext 物件,使我們能夠存取狀態和排程週期函式,而無需實際執行拓撲。
public class CountTransformerTest {
MockProcessorContext processorContext;
@BeforeEach
public void setup() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
processorContext = new MockProcessorContext(props);
KeyValueStore<String, Long> store =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
Serdes.String(), Serdes.Long())
.withLoggingDisabled()
.build();
store.init(processorContext, store);
processorContext.register(store, null);
}
}
內容解密:
- 一種常見的方法是在設定函式中建立
MockProcessorContext,該函式在單元測試執行之前執行。 - 如果處理器是有狀態的,則還需要初始化和註冊狀態儲存。
- 雖然拓撲可能使用持久狀態儲存,但建議在單元測試中使用記憶體儲存。