返回文章列表

ksqlDB自定義函式開發與佈署

本文介紹如何使用 Java 開發 ksqlDB 自定義函式(UDF),並逐步講解 UDF 的構建、佈署和驗證過程。文章涵蓋了開發環境設定、業務邏輯實作、必要註解新增、Gradle 構建、佈署到 ksqlDB 擴充套件目錄以及使用 SQL 命令驗證 UDF 等關鍵步驟,讓開發者能夠輕鬆擴充套件 ksqlDB

串流處理 資料函式庫

在資料串流應用盛行的今日,ksqlDB 提供了簡化的串流處理方法。然而,內建函式可能無法滿足所有應用場景。此時,開發者可以利用 Java 建立自定義函式(UDF)來擴充套件 ksqlDB 的功能。本文將詳細介紹如何開發一個移除停用詞的 UDF,並整合至 ksqlDB 中。首先,需要設定開發環境,包括加入 Confluent Maven 倉函式庫和 ksqldb-udf 依賴項到專案的 build.gradle 檔案中。接著,撰寫 Java 程式碼實作移除停用詞的邏輯,並使用 @UdfDescription@Udf@UdfParameter 等註解標記 UDF 的相關資訊。完成後,使用 Gradle 構建 UDF 的 JAR 檔案,並將其佈署到 ksqlDB 的擴充套件目錄。最後,透過重啟 ksqlDB 服務並執行 SHOW FUNCTIONSDESCRIBE FUNCTION 命令驗證 UDF 是否成功載入。

使用ksqlDB建立自定義函式(UDF)

ksqlDB允許開發者建立自定義函式(UDF)以擴充套件其功能,滿足特定的業務需求。本篇文章將指導如何使用Java建立一個簡單的UDF,該函式能夠從給定的文字字串中移除停用詞(stop words)。

步驟1:設定開發環境

首先,需要在build.gradle檔案中新增Confluent Maven倉函式庫和ksqldb-udf依賴項。這樣可以取得必要的註解和其他資源,以便開發UDF。

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencies {
    implementation 'io.confluent.ksql:ksqldb-udf:6.0.0'
}

內容解密:

  1. repositories區塊用於指定依賴項的來源。本例中,除了預設的Maven中央倉函式庫外,還增加了Confluent的Maven倉函式庫。
  2. dependencies區塊則用於列出專案所需的依賴項。這裡增加了ksqldb-udf,這是開發ksqlDB UDF所必需的。

步驟2:實作業務邏輯

接下來,建立一個名為RemoveStopWordsUdf.java的檔案,並在其中實作業務邏輯。

public class RemoveStopWordsUdf {
    private final List<String> stopWords = Arrays.asList("a", "and", "are", "but", "or", "over", "the");

    private ArrayList<String> stringToWords(String source) {
        return Stream.of(source.toLowerCase().split(" "))
                .collect(Collectors.toCollection(ArrayList<String>::new));
    }

    private String wordsToString(ArrayList<String> words) {
        return words.stream().collect(Collectors.joining(" "));
    }

    public String apply(final String source) {
        ArrayList<String> words = stringToWords(source);
        words.removeAll(stopWords);
        return wordsToString(words);
    }
}

內容解密:

  1. stringToWords方法將輸入的字串轉換為單詞列表。
  2. wordsToString方法將單詞列表轉換回字串。
  3. apply方法是UDF的核心,負責從輸入字串中移除停用詞。

步驟3:新增註解

為了讓ksqlDB能夠識別並正確載入UDF,需要新增特定的註解。

@UdfDescription(
    name = "remove_stop_words",
    description = "A UDF that removes stop words from a string of text",
    version = "0.1.0",
    author = "Mitch Seymour")
public class RemoveStopWordsUdf {

    @Udf(description = "Remove the default stop words from a string of text")
    public String apply(
        @UdfParameter(value = "source", description = "the raw source string")
        final String source
    ) { 
        // 方法實作
    }
}

內容解密:

  1. @UdfDescription註解提供了關於UDF的後設資料,包括名稱、描述、版本和作者。
  2. @Udf註解標識了可以被ksqlDB呼叫的方法。
  3. @UdfParameter註解用於描述方法的引數。

步驟4:構建和佈署UDF

使用Gradle構建UDF的uber JAR檔案,並將其複製到ksqlDB的擴充套件目錄中。然後,重啟ksqlDB服務。

./gradlew build --info
cp build/libs/udf.jar /etc/ksqldb/extensions
ksql-server-stop
ksql-server-start

內容解密:

  1. ./gradlew build --info命令用於構建專案並生成uber JAR。
  2. 將生成的JAR檔案複製到ksqlDB指定的擴充套件目錄。
  3. 重啟ksqlDB服務以載入新的UDF。

步驟5:驗證UDF

在ksqlDB客戶端中使用SHOW FUNCTIONSDESCRIBE FUNCTION命令來驗證UDF是否正確載入。

ksql> SHOW FUNCTIONS ;
ksql> DESCRIBE FUNCTION REMOVE_STOP_WORDS ;

內容解密:

  1. SHOW FUNCTIONS命令列出了所有可用的函式,包括新新增的UDF。
  2. DESCRIBE FUNCTION REMOVE_STOP_WORDS命令提供了關於REMOVE_STOP_WORDS UDF的詳細資訊。

透過以上步驟,您可以成功地在ksqlDB中建立和佈署自定義函式,從而擴充套件ksqlDB的功能以滿足特定的業務需求。

ksqlDB 的進階串流處理與自訂函式

ksqlDB 不僅提供簡單的介面,還支援許多中級到進階的串流處理應用案例,包括在不同集合中連線資料、聚合資料、建立可使用鍵值查詢查詢的物化檢視等。此外,ksqlDB 還提供了一個豐富的內建函式庫,用於處理各種資料處理和豐富的應用案例。

使用自訂函式擴充套件 ksqlDB

ksqlDB 的一大亮點是能夠讓開發者使用自己的自訂 Java 函式擴充套件內建函式庫。這對於需要自訂業務邏輯的應用程式來說非常重要,因為它允許開發者繼續使用簡單的 SQL 方言構建串流處理應用程式。

建立自訂函式的步驟

  1. 開發 Java 函式:首先,需要開發一個 Java 函式,該函式將被用於 ksqlDB 中。

  2. 封裝 Java 函式:將開發好的 Java 函式封裝成 JAR 檔案。

  3. 註冊 JAR 檔案:將 JAR 檔案註冊到 ksqlDB 中,使其可被 ksqlDB 使用。

  4. 使用自訂函式:註冊完成後,即可在 ksqlDB 中像使用內建函式一樣使用自訂函式。

示例:移除停用詞的自訂函式

// 假設這是一個移除停用詞的 Java 函式
public class RemoveStopWords {
    public String removeStopWords(String text) {
        // 實作移除停用詞的邏輯
        // 例如:將 "The quick brown fox jumps over the lazy dog" 
        // 處理為 "quick brown fox jumps lazy dog"
        return processedText;
    }
}

在 ksqlDB 中使用自訂函式

-- 建立一個名為 model_inputs 的串流
CREATE STREAM model_inputs (
    text STRING
)
WITH (
    KAFKA_TOPIC='model_inputs',
    VALUE_FORMAT='JSON',
    PARTITIONS=4
);

-- 向 model_inputs 插入測試資料
INSERT INTO model_inputs VALUES ('The quick brown fox jumps over the lazy dog');

-- 使用自訂函式 remove_stop_words
SELECT 
    text AS original,
    remove_stop_words(text) AS no_stop_words
FROM model_inputs
EMIT CHANGES;

輸出結果將顯示原始文字和移除停用詞後的文字。

測試、監控和佈署

在開發完成後,需要對 ksqlDB 查詢或 Kafka Streams 應用程式進行測試、監控和佈署,以確保其在生產環境中的穩定性和效能。

測試 ksqlDB 查詢

ksqlDB 提供了一個名為 ksql-test-runner 的工具,用於測試 ksqlDB 查詢。該工具需要三個引數:

  • 包含要測試的 SQL 陳述式的檔案
  • 指定來源主題輸入資料的檔案
  • 指定預期輸出結果的檔案
-- statements.sql 中的查詢
CREATE STREAM users (
    ROWKEY INT KEY,
    USERNAME VARCHAR
) WITH (kafka_topic='users', value_format='JSON');

CREATE STREAM greetings
WITH (KAFKA_TOPIC = 'greetings') AS
SELECT ROWKEY, 'Hello, ' + USERNAME AS "greeting"
FROM users
EMIT CHANGES;
// input.json 中的輸入資料
[
    {
        "rowkey": 1,
        "username": "John"
    },
    {
        "rowkey": 2,
        "username": "Jane"
    }
]

使用 ksql-test-runner 執行測試,驗證查詢結果是否符合預期。

測試 ksqlDB 與 Kafka Streams 應用

在開發資料流處理應用時,測試是確保程式碼品質和穩定性的關鍵步驟。本文將介紹如何測試 ksqlDB 和 Kafka Streams 應用。

測試 ksqlDB 查詢

ksqlDB 提供了一個測試工具 ksql-test-runner,可以用於測試查詢的正確性。以下是測試 ksqlDB 查詢的步驟:

  1. 建立輸入資料檔案 input.json,包含測試資料。
  2. 建立查詢陳述式檔案 statements.sql,包含要測試的查詢陳述式。
  3. 建立輸出資料檔案 output.json,包含預期的輸出結果。
  4. 使用 ksql-test-runner 工具執行測試。

input.json

{
  "inputs": [
    {
      "topic": "users",
      "timestamp": 0,
      "value": {"USERNAME": "Isabelle"},
      "key": 0
    },
    {
      "topic": "users",
      "timestamp": 0,
      "value": {"USERNAME": "Elyse"},
      "key": 0
    }
  ]
}

statements.sql

CREATE STREAM users (ROWKEY INT KEY, USERNAME VARCHAR)
  WITH (KAFKA_TOPIC = 'users', VALUE_FORMAT = 'json');

CREATE STREAM greetings
  WITH (KAFKA_TOPIC = 'greetings') AS
  SELECT ROWKEY, 'Hello, ' + USERNAME AS greeting
  FROM users
  EMIT CHANGES;

output.json

{
  "outputs": [
    {
      "topic": "greetings",
      "timestamp": 0,
      "value": {"greeting": "Hello, Isabelle"},
      "key": 0
    },
    {
      "topic": "greetings",
      "timestamp": 0,
      "value": {"greeting": "Hello, Elyse"},
      "key": 0
    }
  ]
}

執行測試的命令如下:

docker run \
  -v "$(pwd)":/ksqldb/ \
  -w /ksqldb \
  -ti confluentinc/ksqldb-server:0.14.0 \
  ksql-test-runner -s statements.sql -i input.json -o output.json

#### 內容解密:

此命令會啟動一個 Docker 容器,掛載當前目錄到 /ksqldb/,並執行 ksql-test-runner 工具。該工具會讀取 statements.sqlinput.jsonoutput.json 檔案,並執行測試。如果測試透過,會輸出 >>> Test passed!

測試 Kafka Streams 應用

測試 Kafka Streams 應用可以使用 JUnit 和 AssertJ 等測試框架。以下是測試 Kafka Streams 應用的步驟:

  1. 新增測試依賴項到 build.gradle 檔案中。
  2. 建立測試類別,使用 JUnit 和 AssertJ 等測試框架。
  3. 編寫測試方法,測試 Kafka Streams 應用的正確性。

build.gradle

dependencies {
  testImplementation "org.apache.kafka:kafka-streams-test-utils:${kafkaVersion}"
  testImplementation 'org.assertj:assertj-core:3.15.0'
  testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
}

test {
  useJUnitPlatform()
}

#### 內容解密:

此組態會新增 Kafka Streams 測試工具、AssertJ 和 JUnit 等依賴項到專案中。並使用 JUnit 平台執行測試。

測試 Kafka Streams 應用程式

當將邏輯移到專用方法後,測試變得更加容易。事實上,我們甚至不需要 kafka-streams-test-utils 套件來測試這段程式碼,因為我們可以直接使用測試框架來單元測試這個方法,如範例 12-1 所示。

簡單的單元測試

範例 12-1. 一個簡單的單元測試,用於測試 selectKey 拓撲步驟,該步驟依賴於一個名為 MyTopology.decodeKey 的專用方法

class MyTopologyTest {
    @Test
    public void testDecodeId() {
        String key = "1XRZTUW3";
        byte[] value = new byte[] {};
        String actualValue = MyTopology.decodeKey(key, value);
        String expectedValue = "decoded-1XRZTUW3";
        assertThat(actualValue).isEqualTo(expectedValue);
    }
}

內容解密:

  1. 我們的測試將呼叫與 selectKey 處理器中使用的相同方法。
  2. 在這裡,我們傳遞一個硬編碼的鍵和值。如果需要,我們也可以使用 JUnit 的引數化測試功能,使用不同的鍵值對來執行多次迭代。
  3. 我們定義了對方法輸出的期望。
  4. 我們使用 AssertJ 來斷言 MyTopology.decodeKey 方法傳回的實際值與我們的期望相符。

使用 Processor API 進行測試

使用 Processor API 時,我們通常不使用 lambda 或方法參考來定義流處理器的邏輯。相反,我們使用實作 ProcessorTransformer 介面的類別。因此,我們的測試策略略有不同,因為我們需要模擬 Kafka Streams 在初始化時傳遞給這些底層流處理器的 ProcessorContext

狀態轉換器測試

讓我們實作一個狀態轉換器,追蹤每個唯一鍵所看到的記錄數量。我們將測試的轉換器實作如下:

public class CountTransformer 
    implements ValueTransformerWithKey<String, String, Long> {
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext context) {
        this.store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
    }

    @Override
    public Long transform(String key, String value) {
        // 處理 tombstones
        if (value == null) {
            store.delete(key);
            return null;
        }
        // 取得此鍵的前一個計數,如果是第一次看到此鍵,則設為 0
        Long previousCount = store.get(key);
        if (previousCount == null) {
            previousCount = 0L;
        }
        // 計算新的計數
        Long newCount = previousCount + 1;
        store.put(key, newCount);
        return newCount;
    }

    @Override
    public void close() {}
}

內容解密:

  1. ProcessorTransformer 實作有一個初始化函式,接受 ProcessorContext
  2. 此轉換器是有狀態的,因為它需要記住每個唯一鍵的計數,因此我們將儲存對狀態儲存的參考。
  3. transform 方法包含此流處理器的邏輯,註解中討論了實作細節。

使用 MockProcessorContext 進行測試

我們的轉換器就緒後,就可以開始編寫單元測試。與前面的簡單方法級測試不同,這個測試需要使用 kafka-streams-test-utils 套件中的一些輔助工具。該套件允許我們建立 MockProcessorContext 物件,使我們能夠存取狀態和排程週期函式,而無需實際執行拓撲。

public class CountTransformerTest {
    MockProcessorContext processorContext;

    @BeforeEach
    public void setup() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        processorContext = new MockProcessorContext(props);
        KeyValueStore<String, Long> store = 
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("my-store"),
                Serdes.String(), Serdes.Long())
            .withLoggingDisabled()
            .build();
        store.init(processorContext, store);
        processorContext.register(store, null);
    }
}

內容解密:

  1. 一種常見的方法是在設定函式中建立 MockProcessorContext,該函式在單元測試執行之前執行。
  2. 如果處理器是有狀態的,則還需要初始化和註冊狀態儲存。
  3. 雖然拓撲可能使用持久狀態儲存,但建議在單元測試中使用記憶體儲存。