返回文章列表

Kafka Streams Processor API 定期函式與後設資料存取

本文探討 Kafka Streams 中 Processor API 的使用,包含如何排程定期函式、存取記錄後設資料以及新增接收器處理器,並示範如何結合 Processor API 與 DSL 建立數位分身服務,包含建立 Processor 拓撲、實作 REST 服務以及測試驗證,同時說明如何利用 Processor

串流處理 Kafka

Kafka Streams 提供 Processor API 實作更精細的流處理控制。本文介紹如何使用 Processor API 排程定期函式,例如定期清理狀態儲存中過期資料;如何存取記錄的後設資料,例如時間戳、標頭等,以便進行更全面的資料分析和處理;以及如何新增接收器處理器將處理結果輸出到指定的 Kafka 主題。此外,文章也示範如何結合 Processor API 和高階 DSL 建構更複雜的應用,例如數位分身服務,並提供 RESTful API 進行查詢。透過整合 Processor API 和 DSL,開發者可以兼顧底層控制和程式碼簡潔性,更有效率地開發 Kafka Streams 應用。

使用 Processor API 實作定期函式排程與後設資料存取

在 Kafka Streams 中,Processor API 為開發者提供了更底層的控制能力,以實作複雜的流處理邏輯。本文將介紹如何使用 Processor API 來排程定期函式、存取記錄後設資料以及新增接收器處理器。

排程定期函式

在某些場景下,我們需要週期性地執行某些任務,例如檢查狀態儲存中的記錄是否過期。為此,我們可以使用 context.schedule() 方法來排程一個週期函式。

// 排程一個每五分鐘執行的週期函式
Cancellable punctuator = context.schedule(
    Duration.ofMinutes(5),
    PunctuationType.WALL_CLOCK_TIME,
    (timestamp) -> {
        // 在這裡執行週期性任務
        try (KeyValueIterator<String, DigitalTwin> iterator = stateStore.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, DigitalTwin> record = iterator.next();
                DigitalTwin digitalTwin = record.value;
                long daysSinceLastUpdate = ChronoUnit.DAYS.between(
                    Instant.ofEpochMilli(digitalTwin.getLastUpdateTimestamp()),
                    Instant.now()
                );
                if (daysSinceLastUpdate >= 7) {
                    stateStore.delete(record.key);
                }
            }
        }
    }
);

內容解密:

  1. context.schedule() 方法:用於排程一個週期函式,引數包括執行間隔、觸發型別(WALL_CLOCK_TIME 表示根據系統時間)和回撥函式。
  2. 週期函式邏輯:遍歷狀態儲存中的所有記錄,檢查每個數字孿生記錄的最後更新時間。如果某記錄超過七天未更新,則從狀態儲存中刪除。
  3. try-with-resources 陳述式:確保迭代器在使用後正確關閉,防止資源洩漏。

存取記錄後設資料

在處理 Kafka 記錄時,除了鍵和值之外,還有許多額外的後設資料可以被存取,例如記錄頭、偏移量、分割區、時間戳和主題。這些後設資料可以透過 ProcessorContext 物件存取。

// 存取當前記錄的後設資料
Headers headers = context.headers();
long offset = context.offset();
int partition = context.partition();
long timestamp = context.timestamp();
String topic = context.topic();

// 新增、刪除或檢索記錄頭
headers.add("hello", "world".getBytes(StandardCharsets.UTF_8));
headers.remove("goodbye");
Header[] headerArray = headers.toArray();

內容解密:

  1. ProcessorContext 物件:提供了存取當前記錄後設資料的方法,如 headers()offset()partition()timestamp()topic()
  2. 記錄頭操作:可以新增、刪除或檢索記錄頭,用於注入額外的後設資料或進行分散式追蹤。
  3. 後設資料應用場景:用於除錯、日誌記錄或將額外上下文資訊寫入下游系統。

新增接收器處理器

接收器處理器用於將處理後的記錄寫入 Kafka 主題。使用 addSink() 方法可以輕鬆實作這一步驟。

// 新增一個接收器處理器,將數字孿生記錄寫入輸出主題
builder.addSink(
    "Digital Twin Sink", // 接收器節點名稱
    "digital-twins", // 輸出主題名稱
    Serdes.String().serializer(), // 鍵序列化器
    JsonSerdes.DigitalTwin().serializer(), // 值序列化器
    "Digital Twin Processor" // 上游處理器名稱
);

內容解密:

  1. addSink() 方法:用於新增一個接收器處理器,引數包括接收器名稱、輸出主題、鍵和值的序列化器,以及上游處理器名稱。
  2. 自定義選項:可以指定自定義的分割槽器或使用預設的序列化器,以滿足不同的應用需求。

互動式查詢

最後,Kafka Streams 的互動式查詢功能允許我們將數字孿生記錄暴露給外部服務。這部分內容在前面的章節中已經詳細介紹過,這裡不再贅述。

使用Kafka Streams Processor API建立數位分身服務

在前面的章節中,我們探討瞭如何使用Kafka Streams的DSL來建立資料處理流程。不過,在某些情況下,我們需要更底層的控制能力,這時就可以使用Processor API。本章節將介紹如何使用Processor API來建立一個數位分身服務。

建立Processor拓撲

首先,我們需要建立一個Processor拓撲。拓撲是由多個Processor節點組成的有向無環圖,每個節點負責處理特定的資料。下面是一個簡單的例子:

Topology builder = new Topology();
builder.addSource(
    "Desired State Events",
    Serdes.String().deserializer(),
    JsonSerdes.TurbineState().deserializer(),
    "desired-state-events");
builder.addSource(
    "Reported State Events",
    Serdes.String().deserializer(),
    JsonSerdes.TurbineState().deserializer(),
    "reported-state-events");
builder.addProcessor(
    "High Winds Flatmap Processor",
    HighWindsFlatmapProcessor::new,
    "Reported State Events");
builder.addProcessor(
    "Digital Twin Processor",
    DigitalTwinProcessor::new,
    "High Winds Flatmap Processor",
    "Desired State Events");
StoreBuilder<KeyValueStore<String, DigitalTwin>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("digital-twin-store"),
        Serdes.String(),
        JsonSerdes.DigitalTwin());
builder.addStateStore(storeBuilder, "Digital Twin Processor");
builder.addSink(
    "Digital Twin Sink",
    "digital-twins",
    Serdes.String().serializer(),
    JsonSerdes.DigitalTwin().serializer(),
    "Digital Twin Processor");

內容解密:

  1. Topology建立:使用Topology類別建立一個新的拓撲。
  2. Source Processor:新增兩個Source Processor,分別用於讀取desired-state-eventsreported-state-events主題的資料。
  3. Processor節點:新增兩個Processor節點,分別是High Winds Flatmap ProcessorDigital Twin Processor。前者負責偵測高風速並產生關機訊號,後者負責建立數位分身記錄。
  4. State Store:建立一個持久化的Key-Value Store,用於儲存數位分身記錄。
  5. Sink Processor:新增一個Sink Processor,將數位分身記錄寫入digital-twins主題。

實作REST服務

接下來,我們需要實作一個REST服務,用於查詢數位分身記錄。下面是一個簡單的例子:

class RestService {
    private final HostInfo hostInfo;
    private final KafkaStreams streams;

    RestService(HostInfo hostInfo, KafkaStreams streams) {
        this.hostInfo = hostInfo;
        this.streams = streams;
    }

    ReadOnlyKeyValueStore<String, DigitalTwin> getStore() {
        return streams.store(
            StoreQueryParameters.fromNameAndType(
                "digital-twin-store", QueryableStoreTypes.keyValueStore()));
    }

    void start() {
        Javalin app = Javalin.create().start(hostInfo.port());
        app.get("/devices/:id", this::getDevice);
    }

    void getDevice(Context ctx) {
        String deviceId = ctx.pathParam("id");
        DigitalTwin latestState = getStore().get(deviceId);
        ctx.json(latestState);
    }
}

內容解密:

  1. REST服務實作:使用Javalin框架建立一個REST服務。
  2. 查詢數位分身記錄:實作了一個GET請求,用於查詢指定裝置ID的數位分身記錄。
  3. State Store查詢:使用KafkaStreamsstore方法查詢State Store中的數位分身記錄。

測試與驗證

最後,我們需要測試和驗證我們的應用程式。下面是一個簡單的測試例子:

$ curl localhost:7000/devices/1 | jq '.'
{
  "desired": {
    "timestamp": "2020-11-23T09:02:01+08:00",
    "windSpeedMph": 68,
    "power": "OFF",
    "type": "DESIRED"
  },
  "reported": {
    "timestamp": "2020-11-23T09:02:01+08:00",
    "windSpeedMph": 68,
    "power": "ON",
    "type": "REPORTED"
  }
}

內容解密:

  1. 測試資料:產生測試資料到reported-state-events主題中。
  2. 查詢結果:使用curl命令查詢REST服務,驗證數位分身記錄是否正確。

結合處理器 API 與 DSL

我們已經驗證了應用程式的運作。然而,如果仔細觀察我們的程式碼,會發現只有其中一個拓撲步驟需要處理器 API 提供的低階存取。也就是數位孿生處理器(Digital Twin Processor)步驟(參見圖 7-1 中的步驟 3),它利用了處理器 API 的重要功能:排程週期性函式的能力。

由於 Kafka Streams 允許我們結合處理器 API 和 DSL,因此可以輕易地重構應用程式,只在數位孿生處理器步驟中使用處理器 API,而其他部分則使用 DSL。這種重構的最大好處是其他串流處理步驟可以被簡化。在本教程中,高風速扁平對映處理器(High Winds Flatmap Processor)提供了最大的簡化機會,但在更大的應用程式中,這種重構可以大幅降低複雜度。

處理器與轉換器

DSL 包含了一組特殊的運算元,允許我們在需要低階存取狀態儲存、記錄元資料和處理器上下文時使用處理器 API。這些特殊的運算元分為兩類別:處理器和轉換器。以下概述了這兩組之間的區別:

  • 處理器是一種終端操作(意味著它傳回 void 且下游運算元無法連結),且必須使用處理器介面實作計算邏輯。處理器應該在需要從 DSL 使用處理器 API 但不需要連結任何下游運算元時使用。目前只有一種這種型別的運算元,如下表所示:
DSL 運算元介面實作描述
processProcessor對每個記錄套用 Processor
  • 轉換器是一組更多樣化的運算元,可以傳回一個或多個記錄(取決於使用的變體),因此如果需要連結下游運算元,則更為最佳。轉換運算元的變體如表 7-4 所示。

表 7-4 Kafka Streams 中可用的各種轉換運算元

DSL 運算元介面實作描述輸入/輸出比率
transformTransformer對每個記錄套用 Transformer,產生一個或多個輸出記錄。單一記錄可以從 Transformer#transform 方法傳回,多個值可以使用 ProcessorContext#forward 發出。1:N
transformValuesValueTransformer與 transform 類別似,但無法存取記錄鍵,且無法使用 ProcessorContext#forward 發出多個記錄。如果嘗試發出多個記錄,將會得到 StreamsException。由於狀態儲存操作是根據鍵的,因此如果需要對狀態儲存執行查詢,則此運算元並不理想。此外,輸出記錄將具有與輸入記錄相同的鍵,且下游自動重新分割槽不會被觸發,因為鍵無法被修改(這是有優勢的,因為它可以幫助避免網路傳輸)。1:1
transformValuesValueTransformerWithKey與 transform 類別似,但記錄鍵是唯讀的,不應被修改。同時,您無法使用 ProcessorContext#forward 發出多個記錄(如果嘗試發出多個記錄,將會得到 StreamsException)。1:1
flatTransformTransformer(具有可迭代的傳回值)與 transform 類別似,但不是依賴 ProcessorContext#forward 傳回多個記錄,您可以直接傳回一個值的集合。因此,建議使用 flatTransform 而不是 transform,如果您需要發出多個記錄,因為此方法是型別安全的,而後者不是(因為它依賴於 ProcessorContext#forward)。1:N
flatTransformValuesValueTransformer(具有可迭代的傳回值)對每個記錄套用 Transformer,直接從 ValueTransformer#transform 方法傳回一個或多個輸出記錄。1:N
flatTransformValuesValueTransformerWithKey(具有可迭代的傳回值)flatTransformValues 的有狀態變體,其中一個唯讀鍵被傳遞給 transform 方法,可用於狀態查詢。一個或多個輸出記錄直接從 ValueTransformerWithKey#transform 方法傳回。1:N

程式碼範例:使用 DSL 重構拓撲

StreamsBuilder builder = new StreamsBuilder();
KStream<String, TurbineState> desiredStateEvents = 
    builder.stream("desired-state-events", 
        Consumed.with(Serdes.String(), JsonSerdes.TurbineState()));
KStream<String, TurbineState> highWinds = 
    builder.stream("reported-state-events", 
        Consumed.with(Serdes.String(), JsonSerdes.TurbineState()))
    .flatMapValues((key, reported) -> ...)
    .merge(desiredStateEvents);

內容解密:

  1. 使用 DSL 重構拓撲的第一步:使用 StreamsBuilder 建立 KStream 物件,分別代表所需的狀態事件和報告的狀態事件。
  2. flatMapValues 操作:對報告的狀態事件進行扁平對映操作,以擴充套件或轉換事件內容。
  3. merge 操作:將所需的狀態事件與經過扁平對映的報告狀態事件合併,創造出新的事件流。
  4. 型別安全與輸入/輸出比率:根據不同的轉換需求選擇合適的轉換器(如 transformtransformValuesflatTransform 等),確保正確的輸入/輸出比率和型別安全。

無論選擇哪種變體,如果運算元是有狀態的,則需要在新增運算元之前將狀態儲存連線到拓撲建構器。透過結合處理器 API 和 DSL,我們可以充分利用兩者的優勢,提高開發效率和程式碼的可讀性。