Kafka Streams 提供 Processor API 實作更精細的流處理控制。本文介紹如何使用 Processor API 排程定期函式,例如定期清理狀態儲存中過期資料;如何存取記錄的後設資料,例如時間戳、標頭等,以便進行更全面的資料分析和處理;以及如何新增接收器處理器將處理結果輸出到指定的 Kafka 主題。此外,文章也示範如何結合 Processor API 和高階 DSL 建構更複雜的應用,例如數位分身服務,並提供 RESTful API 進行查詢。透過整合 Processor API 和 DSL,開發者可以兼顧底層控制和程式碼簡潔性,更有效率地開發 Kafka Streams 應用。
使用 Processor API 實作定期函式排程與後設資料存取
在 Kafka Streams 中,Processor API 為開發者提供了更底層的控制能力,以實作複雜的流處理邏輯。本文將介紹如何使用 Processor API 來排程定期函式、存取記錄後設資料以及新增接收器處理器。
排程定期函式
在某些場景下,我們需要週期性地執行某些任務,例如檢查狀態儲存中的記錄是否過期。為此,我們可以使用 context.schedule() 方法來排程一個週期函式。
// 排程一個每五分鐘執行的週期函式
Cancellable punctuator = context.schedule(
Duration.ofMinutes(5),
PunctuationType.WALL_CLOCK_TIME,
(timestamp) -> {
// 在這裡執行週期性任務
try (KeyValueIterator<String, DigitalTwin> iterator = stateStore.all()) {
while (iterator.hasNext()) {
KeyValue<String, DigitalTwin> record = iterator.next();
DigitalTwin digitalTwin = record.value;
long daysSinceLastUpdate = ChronoUnit.DAYS.between(
Instant.ofEpochMilli(digitalTwin.getLastUpdateTimestamp()),
Instant.now()
);
if (daysSinceLastUpdate >= 7) {
stateStore.delete(record.key);
}
}
}
}
);
內容解密:
context.schedule()方法:用於排程一個週期函式,引數包括執行間隔、觸發型別(WALL_CLOCK_TIME表示根據系統時間)和回撥函式。- 週期函式邏輯:遍歷狀態儲存中的所有記錄,檢查每個數字孿生記錄的最後更新時間。如果某記錄超過七天未更新,則從狀態儲存中刪除。
try-with-resources陳述式:確保迭代器在使用後正確關閉,防止資源洩漏。
存取記錄後設資料
在處理 Kafka 記錄時,除了鍵和值之外,還有許多額外的後設資料可以被存取,例如記錄頭、偏移量、分割區、時間戳和主題。這些後設資料可以透過 ProcessorContext 物件存取。
// 存取當前記錄的後設資料
Headers headers = context.headers();
long offset = context.offset();
int partition = context.partition();
long timestamp = context.timestamp();
String topic = context.topic();
// 新增、刪除或檢索記錄頭
headers.add("hello", "world".getBytes(StandardCharsets.UTF_8));
headers.remove("goodbye");
Header[] headerArray = headers.toArray();
內容解密:
ProcessorContext物件:提供了存取當前記錄後設資料的方法,如headers()、offset()、partition()、timestamp()和topic()。- 記錄頭操作:可以新增、刪除或檢索記錄頭,用於注入額外的後設資料或進行分散式追蹤。
- 後設資料應用場景:用於除錯、日誌記錄或將額外上下文資訊寫入下游系統。
新增接收器處理器
接收器處理器用於將處理後的記錄寫入 Kafka 主題。使用 addSink() 方法可以輕鬆實作這一步驟。
// 新增一個接收器處理器,將數字孿生記錄寫入輸出主題
builder.addSink(
"Digital Twin Sink", // 接收器節點名稱
"digital-twins", // 輸出主題名稱
Serdes.String().serializer(), // 鍵序列化器
JsonSerdes.DigitalTwin().serializer(), // 值序列化器
"Digital Twin Processor" // 上游處理器名稱
);
內容解密:
addSink()方法:用於新增一個接收器處理器,引數包括接收器名稱、輸出主題、鍵和值的序列化器,以及上游處理器名稱。- 自定義選項:可以指定自定義的分割槽器或使用預設的序列化器,以滿足不同的應用需求。
互動式查詢
最後,Kafka Streams 的互動式查詢功能允許我們將數字孿生記錄暴露給外部服務。這部分內容在前面的章節中已經詳細介紹過,這裡不再贅述。
使用Kafka Streams Processor API建立數位分身服務
在前面的章節中,我們探討瞭如何使用Kafka Streams的DSL來建立資料處理流程。不過,在某些情況下,我們需要更底層的控制能力,這時就可以使用Processor API。本章節將介紹如何使用Processor API來建立一個數位分身服務。
建立Processor拓撲
首先,我們需要建立一個Processor拓撲。拓撲是由多個Processor節點組成的有向無環圖,每個節點負責處理特定的資料。下面是一個簡單的例子:
Topology builder = new Topology();
builder.addSource(
"Desired State Events",
Serdes.String().deserializer(),
JsonSerdes.TurbineState().deserializer(),
"desired-state-events");
builder.addSource(
"Reported State Events",
Serdes.String().deserializer(),
JsonSerdes.TurbineState().deserializer(),
"reported-state-events");
builder.addProcessor(
"High Winds Flatmap Processor",
HighWindsFlatmapProcessor::new,
"Reported State Events");
builder.addProcessor(
"Digital Twin Processor",
DigitalTwinProcessor::new,
"High Winds Flatmap Processor",
"Desired State Events");
StoreBuilder<KeyValueStore<String, DigitalTwin>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("digital-twin-store"),
Serdes.String(),
JsonSerdes.DigitalTwin());
builder.addStateStore(storeBuilder, "Digital Twin Processor");
builder.addSink(
"Digital Twin Sink",
"digital-twins",
Serdes.String().serializer(),
JsonSerdes.DigitalTwin().serializer(),
"Digital Twin Processor");
內容解密:
- Topology建立:使用
Topology類別建立一個新的拓撲。 - Source Processor:新增兩個Source Processor,分別用於讀取
desired-state-events和reported-state-events主題的資料。 - Processor節點:新增兩個Processor節點,分別是
High Winds Flatmap Processor和Digital Twin Processor。前者負責偵測高風速並產生關機訊號,後者負責建立數位分身記錄。 - State Store:建立一個持久化的Key-Value Store,用於儲存數位分身記錄。
- Sink Processor:新增一個Sink Processor,將數位分身記錄寫入
digital-twins主題。
實作REST服務
接下來,我們需要實作一個REST服務,用於查詢數位分身記錄。下面是一個簡單的例子:
class RestService {
private final HostInfo hostInfo;
private final KafkaStreams streams;
RestService(HostInfo hostInfo, KafkaStreams streams) {
this.hostInfo = hostInfo;
this.streams = streams;
}
ReadOnlyKeyValueStore<String, DigitalTwin> getStore() {
return streams.store(
StoreQueryParameters.fromNameAndType(
"digital-twin-store", QueryableStoreTypes.keyValueStore()));
}
void start() {
Javalin app = Javalin.create().start(hostInfo.port());
app.get("/devices/:id", this::getDevice);
}
void getDevice(Context ctx) {
String deviceId = ctx.pathParam("id");
DigitalTwin latestState = getStore().get(deviceId);
ctx.json(latestState);
}
}
內容解密:
- REST服務實作:使用Javalin框架建立一個REST服務。
- 查詢數位分身記錄:實作了一個GET請求,用於查詢指定裝置ID的數位分身記錄。
- State Store查詢:使用
KafkaStreams的store方法查詢State Store中的數位分身記錄。
測試與驗證
最後,我們需要測試和驗證我們的應用程式。下面是一個簡單的測試例子:
$ curl localhost:7000/devices/1 | jq '.'
{
"desired": {
"timestamp": "2020-11-23T09:02:01+08:00",
"windSpeedMph": 68,
"power": "OFF",
"type": "DESIRED"
},
"reported": {
"timestamp": "2020-11-23T09:02:01+08:00",
"windSpeedMph": 68,
"power": "ON",
"type": "REPORTED"
}
}
內容解密:
- 測試資料:產生測試資料到
reported-state-events主題中。 - 查詢結果:使用curl命令查詢REST服務,驗證數位分身記錄是否正確。
結合處理器 API 與 DSL
我們已經驗證了應用程式的運作。然而,如果仔細觀察我們的程式碼,會發現只有其中一個拓撲步驟需要處理器 API 提供的低階存取。也就是數位孿生處理器(Digital Twin Processor)步驟(參見圖 7-1 中的步驟 3),它利用了處理器 API 的重要功能:排程週期性函式的能力。
由於 Kafka Streams 允許我們結合處理器 API 和 DSL,因此可以輕易地重構應用程式,只在數位孿生處理器步驟中使用處理器 API,而其他部分則使用 DSL。這種重構的最大好處是其他串流處理步驟可以被簡化。在本教程中,高風速扁平對映處理器(High Winds Flatmap Processor)提供了最大的簡化機會,但在更大的應用程式中,這種重構可以大幅降低複雜度。
處理器與轉換器
DSL 包含了一組特殊的運算元,允許我們在需要低階存取狀態儲存、記錄元資料和處理器上下文時使用處理器 API。這些特殊的運算元分為兩類別:處理器和轉換器。以下概述了這兩組之間的區別:
- 處理器是一種終端操作(意味著它傳回 void 且下游運算元無法連結),且必須使用處理器介面實作計算邏輯。處理器應該在需要從 DSL 使用處理器 API 但不需要連結任何下游運算元時使用。目前只有一種這種型別的運算元,如下表所示:
| DSL 運算元 | 介面實作 | 描述 |
|---|---|---|
| process | Processor | 對每個記錄套用 Processor |
- 轉換器是一組更多樣化的運算元,可以傳回一個或多個記錄(取決於使用的變體),因此如果需要連結下游運算元,則更為最佳。轉換運算元的變體如表 7-4 所示。
表 7-4 Kafka Streams 中可用的各種轉換運算元
| DSL 運算元 | 介面實作 | 描述 | 輸入/輸出比率 |
|---|---|---|---|
| transform | Transformer | 對每個記錄套用 Transformer,產生一個或多個輸出記錄。單一記錄可以從 Transformer#transform 方法傳回,多個值可以使用 ProcessorContext#forward 發出。 | 1:N |
| transformValues | ValueTransformer | 與 transform 類別似,但無法存取記錄鍵,且無法使用 ProcessorContext#forward 發出多個記錄。如果嘗試發出多個記錄,將會得到 StreamsException。由於狀態儲存操作是根據鍵的,因此如果需要對狀態儲存執行查詢,則此運算元並不理想。此外,輸出記錄將具有與輸入記錄相同的鍵,且下游自動重新分割槽不會被觸發,因為鍵無法被修改(這是有優勢的,因為它可以幫助避免網路傳輸)。 | 1:1 |
| transformValues | ValueTransformerWithKey | 與 transform 類別似,但記錄鍵是唯讀的,不應被修改。同時,您無法使用 ProcessorContext#forward 發出多個記錄(如果嘗試發出多個記錄,將會得到 StreamsException)。 | 1:1 |
| flatTransform | Transformer(具有可迭代的傳回值) | 與 transform 類別似,但不是依賴 ProcessorContext#forward 傳回多個記錄,您可以直接傳回一個值的集合。因此,建議使用 flatTransform 而不是 transform,如果您需要發出多個記錄,因為此方法是型別安全的,而後者不是(因為它依賴於 ProcessorContext#forward)。 | 1:N |
| flatTransformValues | ValueTransformer(具有可迭代的傳回值) | 對每個記錄套用 Transformer,直接從 ValueTransformer#transform 方法傳回一個或多個輸出記錄。 | 1:N |
| flatTransformValues | ValueTransformerWithKey(具有可迭代的傳回值) | flatTransformValues 的有狀態變體,其中一個唯讀鍵被傳遞給 transform 方法,可用於狀態查詢。一個或多個輸出記錄直接從 ValueTransformerWithKey#transform 方法傳回。 | 1:N |
程式碼範例:使用 DSL 重構拓撲
StreamsBuilder builder = new StreamsBuilder();
KStream<String, TurbineState> desiredStateEvents =
builder.stream("desired-state-events",
Consumed.with(Serdes.String(), JsonSerdes.TurbineState()));
KStream<String, TurbineState> highWinds =
builder.stream("reported-state-events",
Consumed.with(Serdes.String(), JsonSerdes.TurbineState()))
.flatMapValues((key, reported) -> ...)
.merge(desiredStateEvents);
內容解密:
- 使用 DSL 重構拓撲的第一步:使用
StreamsBuilder建立KStream物件,分別代表所需的狀態事件和報告的狀態事件。 flatMapValues操作:對報告的狀態事件進行扁平對映操作,以擴充套件或轉換事件內容。merge操作:將所需的狀態事件與經過扁平對映的報告狀態事件合併,創造出新的事件流。- 型別安全與輸入/輸出比率:根據不同的轉換需求選擇合適的轉換器(如
transform、transformValues、flatTransform等),確保正確的輸入/輸出比率和型別安全。
無論選擇哪種變體,如果運算元是有狀態的,則需要在新增運算元之前將狀態儲存連線到拓撲建構器。透過結合處理器 API 和 DSL,我們可以充分利用兩者的優勢,提高開發效率和程式碼的可讀性。