Kafka Streams 提供強大的 Processor API,允許開發者以更底層的方式控制串流處理邏輯。Processor API 不僅能處理單筆記錄的轉換,還能結合狀態儲存實作更複雜的應用,例如維護數位孿生資料。藉由 ProcessorContext,開發者可以存取狀態儲存並排程週期性函式,例如定期清理過期資料。實作 TTL 邏輯時,可以利用根據牆鐘時間的排程策略,確保定時執行清理任務,而不受串流資料流量影響。理解 Processor API 的核心概念,包含 Processor 介面、ProcessorContext 和狀態儲存的互動,對於構建高效且可靠的串流應用至關重要。
無狀態處理器(Stateless Processors)的建立
在使用Processor API的addProcessor方法時,我們需要實作Processor介面,該介面包含了處理和轉換串流中記錄的邏輯。該介面有三個方法,如下所示:
public interface Processor<K, V> {
void init(ProcessorContext context);
void process(K key, V value);
void close();
}
注意到Processor介面指定了兩個泛型:一個用於鍵型別(K),另一個用於值型別(V)。我們將在後面看到如何利用這些泛型來實作自己的處理器。
init 方法
init方法在Processor第一次例項化時被呼叫。如果處理器需要執行任何初始化任務,可以在這個方法中指定初始化邏輯。傳遞給init方法的ProcessorContext非常有用,包含了許多我們將在本章中探討的方法。
process 方法
每當處理器接收到新的記錄時,就會呼叫process方法。它包含了每個記錄的資料轉換/處理邏輯。在我們的例子中,這是我們將新增邏輯以檢測風速是否超過渦輪機的安全操作水平的地方。
close 方法
當Kafka Streams完成使用這個運算元時(例如,在關閉期間),就會呼叫close方法。這個方法通常封裝了處理器及其本地資源所需的任何清理邏輯。但是,您不應該嘗試清理任何由Kafka Streams管理的資源,如狀態儲存,因為這是由程式函式庫本身處理的。
內容解密:
init方法的作用:初始化處理器,儲存ProcessorContext以便後續使用。process方法的邏輯:處理輸入記錄,並根據條件生成新的記錄。close方法的用途:執行清理邏輯,但不應清理 Kafka Streams 管理的資源。
範例:實作一個檢測危險風速的處理器
public class HighWindsFlatmapProcessor
implements Processor<String, TurbineState, String, TurbineState> {
private ProcessorContext<String, TurbineState> context;
@Override
public void init(ProcessorContext<String, TurbineState> context) {
this.context = context;
}
@Override
public void process(Record<String, TurbineState> record) {
TurbineState reported = record.value();
context.forward(record); // 將原始記錄轉發給下游處理器
// 檢查風速是否超過安全閾值且渦輪機目前已開啟
if (reported.getWindSpeedMph() > 65 && reported.getPower() == Power.ON) {
TurbineState desired = TurbineState.clone(reported);
desired.setPower(Power.OFF);
desired.setType(Type.DESIRED);
// 建立包含所需狀態的新記錄
Record<String, TurbineState> newRecord =
new Record<>(record.key(), desired, record.timestamp());
context.forward(newRecord); // 將新記錄轉發給下游處理器
}
}
@Override
public void close() {
// 無需執行任何操作
}
}
內容解密:
- 儲存
ProcessorContext:將ProcessorContext儲存為例項屬性,以便稍後存取。 - 轉發原始記錄:使用
context.forward(record)將原始記錄轉發給下游處理器。 - 檢查風速條件:檢查風速是否超過65 mph且渦輪機目前已開啟。
- 生成新記錄:如果條件滿足,生成包含所需狀態(關閉)的新記錄,並將其轉發給下游處理器。
close方法:在這個處理器中,無需執行任何特殊的清理邏輯。
使用 forward 方法
您可以呼叫 ProcessorContext 例項上的 forward 方法,將記錄傳送給下游處理器。這個方法接受您要轉發的記錄。在我們的處理器實作中,我們總是希望轉發報告的狀態記錄,這就是為什麼我們使用未修改的記錄呼叫 context.forward。
此外,還有一種 forward 方法的變體,可以接受下游處理器的名稱列表,告訴Kafka Streams將輸出轉發給哪些子處理器。例如:
context.forward(newRecord, "some-child-node");
這種變體允許您將輸出廣播給特定的下游處理器,而不是所有下游處理器。
使用狀態儲存的流處理器建立
在第98頁的「狀態儲存」章節中,我們瞭解到Kafka Streams中的狀態化操作需要所謂的狀態儲存來儲存之前所見的資料。為了建立我們的數位孿生記錄,我們需要將所需的狀態和記錄的狀態事件合併成一條記錄。由於這些記錄在給定的風力渦輪機上會在不同的時間到達,因此我們有個狀態化的需求,需要記住每個渦輪機的最後記錄和所需狀態記錄。
到目前為止,在本文中,我們主要專注於在DSL中使用狀態儲存。此外,DSL為我們提供了幾種使用狀態儲存的不同選項。我們可以使用預設的內部狀態儲存,只需使用狀態化的運算元而不指定狀態儲存,如下所示:
grouped.aggregate(initializer, adder);
或者,我們可以使用Stores工廠類別建立一個儲存供應者,並使用Materialized類別與狀態化的運算元一起實作狀態儲存,如下所示:
KeyValueBytesStoreSupplier storeSupplier =
Stores.persistentTimestampedKeyValueStore("my-store");
grouped.aggregate(
initializer,
adder,
Materialized.<String, String>as(storeSupplier));
在處理器API中使用狀態儲存與DSL略有不同。與DSL不同,處理器API不會為您建立內部狀態儲存。因此,當您需要執行狀態化操作時,您必須始終建立並連線狀態儲存到適當的流處理器。此外,雖然我們仍然可以使用Stores工廠類別,但我們將使用這個類別中可用的不同方法來建立狀態儲存。我們將使用建立儲存建構器的方法,而不是使用傳回儲存供應者的方法。
例如,為了儲存數位孿生記錄,我們需要一個簡單的鍵值儲存。檢索鍵值儲存建構器的工廠方法被稱為keyValueStoreBuilder,下面的程式碼演示瞭如何使用此方法建立我們的數位孿生儲存:
StoreBuilder<KeyValueStore<String, DigitalTwin>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("digital-twin-store"),
Serdes.String(),
JsonSerdes.DigitalTwin());
內容解密:
- 我們使用
Stores.keyValueStoreBuilder方法建立了一個鍵值儲存建構器,用於建立數位孿生記錄的狀態儲存。 Stores.persistentKeyValueStore("digital-twin-store")指定了狀態儲存的名稱和型別,這裡使用的是持久化的鍵值儲存。Serdes.String()和JsonSerdes.DigitalTwin()分別用於序列化和反序列化鍵和值。
一旦您為您的狀態化處理器建立了儲存建構器,就可以實作Processor介面。此過程與我們在第214頁「建立無狀態處理器」中新增無狀態處理器時所看到的非常相似。我們只需使用處理器API中的addProcessor方法,如下所示:
builder.addProcessor(
"Digital Twin Processor",
DigitalTwinProcessor::new,
"High Winds Flatmap Processor", "Desired State Events");
內容解密:
builder.addProcessor方法用於將新的處理器新增到拓撲結構中。"Digital Twin Processor"是這個流處理器的名稱。DigitalTwinProcessor::new是一個ProcessorSupplier,它是一個可以用來檢索Processor例項的方法。我們將很快實作這個DigitalTwinProcessor。"High Winds Flatmap Processor"和"Desired State Events"是父處理器的名稱。透過指定多個父處理器,我們實際上是在執行DSL中的合併操作。
在實作DigitalTwinProcessor之前,讓我們先將新的狀態儲存新增到拓撲結構中。我們可以使用Topology#addStateStore方法來做到這一點,其用法示例如範例7-4所示。
builder.addStateStore(
storeBuilder,
"Digital Twin Processor"
);
內容解密:
builder.addStateStore方法用於將狀態儲存新增到拓撲結構中。storeBuilder是用於取得狀態儲存的建構器。"Digital Twin Processor"是應該具有存取此儲存許可權的處理器名稱。
最後一步是實作我們的新的狀態化處理器:DigitalTwinProcessor。與無狀態流處理器一樣,我們需要實作Processor介面。但是,這次我們的實作將會稍微複雜一些,因為這個處理器需要與狀態儲存互動。範例7-5中的程式碼以及其後的註解將描述如何實作一個狀態化處理器。
public class DigitalTwinProcessor
implements Processor<String, TurbineState, String, DigitalTwin> {
private ProcessorContext<String, DigitalTwin> context;
private KeyValueStore<String, DigitalTwin> kvStore;
@Override
public void init(ProcessorContext<String, DigitalTwin> context) {
this.context = context;
this.kvStore = (KeyValueStore) context.getStateStore("digital-twin-store");
}
@Override
public void process(Record<String, TurbineState> record) {
String key = record.key();
TurbineState value = record.value();
DigitalTwin digitalTwin = kvStore.get(key);
if (digitalTwin == null) {
digitalTwin = new DigitalTwin();
}
if (value.getType() == Type.DESIRED) {
digitalTwin.setDesired(value);
} else if (value.getType() == Type.REPORTED) {
digitalTwin.setReported(value);
}
kvStore.put(key, digitalTwin);
Record<String, DigitalTwin> newRecord =
new Record<>(record.key(), digitalTwin, record.timestamp());
context.forward(newRecord);
}
@Override
public void close() {
// nothing to do
}
}
內容解密:
DigitalTwinProcessor類別實作了Processor介面,用於處理輸入記錄並產生輸出記錄。- 在
init方法中,我們從上下文中取得了狀態儲存的參照,並將其儲存在kvStore欄位中。 - 在
process方法中,我們根據輸入記錄的鍵從狀態儲存中檢索了數位孿生記錄。如果找不到,則建立一個新的數位孿生記錄。 - 根據輸入記錄的值的型別,我們更新了數位孿生記錄的所需或報告狀態。
- 然後,我們將更新後的數位孿生記錄放回狀態儲存中,並將其轉發到下游處理器。
- 最後,在
close方法中,由於沒有需要釋放的資源,因此沒有執行任何操作。
Kafka Streams 處理器 API:探討與實作
在 Kafka Streams 中,處理器 API(Processor API)提供了一種靈活的方式來處理串流資料。本文將探討處理器 API 的使用,特別是在實作週期性函式(Periodic Functions)方面的應用。
處理器介面與上下文
在 Kafka Streams 中,處理器介面(Processor Interface)定義了處理串流資料的基本操作。處理器的泛型引數分別代表輸入鍵值對的型別以及輸出鍵值對的型別。例如,在 Processor<String, TurbineState, String, DigitalTwin> 中,前兩個泛型引數 String 和 TurbineState 分別代表輸入鍵和值的型別,而後兩個泛型引數 String 和 DigitalTwin 則代表輸出鍵和值的型別。
處理器上下文(ProcessorContext)提供了對處理器執行環境的存取,包括狀態儲存(State Store)的存取。我們可以透過 ProcessorContext 的 getStateStore 方法來檢索之前附加到串流處理器的狀態儲存。
// 初始化處理器上下文和狀態儲存
@Override
public void init(ProcessorContext<String, DigitalTwin> context) {
this.context = context;
this.kvStore = (KeyValueStore<String, DigitalTwin>) context.getStateStore("digital-twin-store");
}
週期性函式的實作
在某些情況下,我們需要在 Kafka Streams 應用程式中執行週期性的任務。處理器 API 提供了一種簡單的方式來排程這些任務,使用 ProcessorContext#schedule 方法。根據使用情境的不同,我們可以選擇兩種不同的排程策略:根據串流時間(Stream Time)或根據牆鐘時間(Wall Clock Time)。
根據牆鐘時間的排程
在本文的例子中,我們需要實作一個 TTL(Time To Live)函式,以刪除在過去七天內沒有更新的數字孿生(Digital Twin)記錄。由於這個任務的執行不依賴於新資料的到達,因此我們選擇根據牆鐘時間的排程策略。
// 排程 TTL 函式
@Override
public void init(ProcessorContext<String, DigitalTwin> context) {
punctuator = this.context.schedule(
Duration.ofMinutes(5),
PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
// ...
}
// 實作 TTL 函式
public void enforceTtl(Long timestamp) {
try (KeyValueIterator<String, DigitalTwin> iter = kvStore.all()) {
while (iter.hasNext()) {
KeyValue<String, DigitalTwin> entry = iter.next();
TurbineState lastReportedState = entry.value.getReported();
if (lastReportedState == null) {
continue;
}
Instant lastUpdated = Instant.parse(lastReportedState.getTimestamp());
long daysSinceLastUpdate =
Duration.between(lastUpdated, Instant.now()).toDays();
if (daysSinceLastUpdate >= 7) {
kvStore.delete(entry.key);
}
}
}
}
程式碼解析
init方法中的排程: 在init方法中,我們使用ProcessorContext#schedule方法來排程enforceTtl函式,排程策略為根據牆鐘時間,每 5 分鐘執行一次。enforceTtl方法: 這個方法負責檢查數字孿生記錄是否超過七天沒有更新,如果是,則從狀態儲存中刪除該記錄。