返回文章列表

KafkaStreams抑制中間結果與狀態管理

本文探討 Kafka Streams 中的抑制運算元與狀態管理機制,包含如何抑制中間結果、組態緩衝區策略、管理狀態儲存以及查詢視窗化鍵值儲存等關鍵技術,並輔以程式碼範例與圖示說明,讓開發者能更有效地控制串流處理流程並建構更可靠的應用程式。

串流處理 Kafka

在串流處理中,有效管理狀態和控制輸出結果至關重要。Kafka Streams 提供了抑制運算元(suppress)和多種狀態管理機制,幫助開發者精確控制資料流處理流程。本文將探討這些技術,並提供實務上的程式碼範例和圖示說明。透過抑制運算元,我們可以避免釋出中間結果,只輸出最終計算結果,有效減少下游負載。同時,理解 Kafka Streams 的狀態儲存機制,包含磁碟佈局、容錯處理和再平衡策略,對於建構可靠的串流應用至關重要。文章也涵蓋了視窗化鍵值儲存的查詢方法,讓開發者能有效地提取和分析資料。

視窗結果釋出的最佳化:抑制中間結果

在串流處理系統中,如何處理延遲資料是一個巨大的挑戰。許多框架,包括那些遵循Dataflow模型的框架(如Apache Flink),都使用水印(Watermarks)來估計何時應該接收到給定視窗的所有資料。使用者可以指定如何處理延遲事件(由水印決定),預設情況下會丟棄延遲事件。

與水印方法類別似,Kafka Streams允許我們透過設定寬限期(Grace Period)來組態允許的事件延遲。設定寬限期將保持視窗開啟一段時間,以便接收延遲或無序事件。例如,我們可以使用以下程式碼組態翻滾視窗:

TimeWindows tumblingWindow = TimeWindows.of(Duration.ofSeconds(60));

如果我們希望能夠容忍脈搏事件的五秒鐘延遲(用於計算心率),我們可以定義具有寬限期的翻滾視窗:

TimeWindows tumblingWindow = TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(5));

內容解密:

  • TimeWindows.of(Duration.ofSeconds(60)) 定義了一個每60秒進行一次聚合的翻滾視窗。
  • .grace(Duration.ofSeconds(5)) 設定了5秒的寬限期,允許事件在此期間內到達並被納入視窗計算。

然而,在我們的病人監測應用程式中,我們不能使用少於60秒的資料來計算心率,因此需要僅釋出視窗的最終結果。這是suppress運算元的作用所在。suppress運算元可以用來僅釋出視窗的最終計算結果,並抑制所有其他事件。

為了使用suppress運算元,我們需要決定三件事:

  1. 使用哪種抑制策略來抑制中間視窗計算。
  2. 為被抑制的事件緩衝分配多少記憶體(使用Buffer Config設定)。
  3. 當記憶體限制被超過時該怎麼辦(使用Buffer Full Strategy控制)。

抑制策略

Kafka Streams提供了兩種抑制策略,如表5-1所示。

策略描述
Suppressed.untilWindowCloses僅釋出視窗的最終結果。
Suppressed.untilTimeLimit在可組態的時間限制後釋出視窗結果,如果在時間限制內收到相同鍵的新事件,則替換緩衝區中的第一個事件。

在我們的病人監測應用程式中,我們只想在完整的60秒過去後釋出心率視窗的結果。因此,我們將使用Suppressed.untilWindowCloses抑制策略。

內容解密:

  • Suppressed.untilWindowCloses 確保只有在視窗關閉後才釋出最終結果。
  • Suppressed.untilTimeLimit 用於速率限制更新,在特定時間限制後釋出結果。

緩衝區組態

為了定義我們的緩衝策略,我們需要使用Buffer Configs,如表5-2所示。

Buffer Config描述
BufferConfig.maxBytes()緩衝區受組態的位元組數限制。
BufferConfig.maxRecords()緩衝區受組態的鍵數限制。
BufferConfig.unbounded()緩衝區使用盡可能多的堆積空間來儲存被抑制的記錄,如果應用程式記憶體不足,將丟擲OutOfMemoryError。

我們選擇了BufferConfig.unbounded(),因為我們預期我們的鍵空間相對較小。

內容解密:

  • BufferConfig.unbounded() 表示緩衝區將使用盡可能多的堆積空間,這可能導致OutOfMemoryError,但適用於鍵空間較小的情況。

緩衝區已滿策略

當緩衝區已滿時,Kafka Streams提供了幾種Buffer Full Strategies,如表5-3所示。

Buffer Full Strategy描述
shutDownWhenFull當緩衝區已滿時正常關閉應用程式,使用此策略不會看到中間視窗計算結果。
emitEarlyWhenFull當緩衝區已滿時釋出最舊的結果,而不是關閉應用程式,使用此策略仍可能看到中間視窗計算結果。

我們選擇了shutDownWhenFull,因為我們永遠不想提前釋出結果,這可能會導致心率計算不準確。

內容解密:

  • shutDownWhenFull 確保應用程式在緩衝區已滿時關閉,避免釋出不完整的結果。
  • emitEarlyWhenFull 允許在緩衝區已滿時釋出最舊的結果,但可能會導致看到中間結果。

綜合上述,我們可以更新我們的病人監測拓撲以使用suppress運算元,如範例5-4所示。

// 假設KStream物件為heartRateStream
heartRateStream
    .groupByKey()
    .windowedBy(tumblingWindow)
    .aggregate(
        // 初始化和聚合邏輯
    )
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
    .toStream()
    .map((windowedKey, value) -> // 處理最終結果);

內容解密:

  • .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) 使用了untilWindowCloses策略和無界緩衝區組態,確保只有最終結果被釋出,並且使用盡可能多的堆積空間來儲存被抑制的記錄。

時間驅動的資料流處理

在前面的章節中,我們已經討論瞭如何使用Kafka Streams進行視窗化聚合和視窗化連線。在本章中,我們將更深入地探討時間如何影響資料流處理的行為。

使用抑制運算元最佳化視窗化結果

在某些情況下,我們可能只對視窗化操作的最終結果感興趣。在這種情況下,我們可以使用suppress運算元來抑制中間結果,只發出最終結果。

程式碼範例:使用抑制運算元

TimeWindows tumblingWindow = 
    TimeWindows
        .of(Duration.ofSeconds(60))
        .grace(Duration.ofSeconds(5));

KTable<Windowed<String>, Long> pulseCounts = 
    pulseEvents
        .groupByKey()
        .windowedBy(tumblingWindow)
        .count(Materialized.as("pulse-counts"))
        .suppress(
            Suppressed.untilWindowCloses(BufferConfig.unbounded().shutDownWhenFull()));

內容解密:

  1. 視窗化設定:使用TimeWindows定義了一個時間視窗,大小為60秒,並允許5秒的延遲處理。
  2. 分組與計數:將pulseEvents按照鍵分組,並在定義的視窗內進行計數。
  3. 抑制運算元:使用suppress運算元來抑制中間結果,只在視窗關閉時發出最終計數結果。
  4. 緩衝組態:使用BufferConfig.unbounded().shutDownWhenFull()組態無界緩衝,並在緩衝滿時關閉。

過濾和重新鍵值化視窗化KTable

在進行視窗化操作後,我們的鍵值型別可能會發生變化。例如,在上述範例中,鍵值型別從String變成了Windowed<String>。為了能夠與其他資料流進行連線,我們需要重新鍵值化資料流。同時,我們也需要過濾資料,只保留符合特定條件的記錄。

程式碼範例:過濾和重新鍵值化

KStream<String, Long> highPulse = 
    pulseCounts
        .toStream()
        .filter((key, value) -> value >= 100)
        .map(
            (windowedKey, value) -> {
                return KeyValue.pair(windowedKey.key(), value);
            });

KStream<String, BodyTemp> highTemp = 
    tempEvents.filter((key, value) -> value.getTemperature() > 100.4);

內容解密:

  1. 轉換為資料流:將pulseCounts轉換為資料流,以便進行進一步的操作。
  2. 過濾心率資料:只保留心率大於或等於100的記錄。
  3. 重新鍵值化:使用原始鍵值替換視窗化鍵值。
  4. 過濾體溫資料:只保留體溫大於100.4°F的記錄。

視窗化連線

視窗化連線需要使用滑動視窗來比較兩個資料流中事件的時間戳,以確定哪些記錄應該被連線起來。

程式碼範例:視窗化連線

StreamJoined<String, Long, BodyTemp> joinParams = 
    StreamJoined.with(Serdes.String(), Serdes.Long(), JsonSerdes.BodyTemp());

JoinWindows joinWindows = 
    JoinWindows
        .of(Duration.ofSeconds(60))
        .grace(Duration.ofSeconds(10));

ValueJoiner<Long, BodyTemp, CombinedVitals> valueJoiner = 
    (pulseRate, bodyTemp) -> new CombinedVitals(pulseRate.intValue(), bodyTemp);

KStream<String, CombinedVitals> vitalsJoined = 
    highPulse.join(highTemp, valueJoiner, joinWindows, joinParams);

內容解密:

  1. 連線引數設定:定義了連線操作中使用的序列化器。
  2. 滑動視窗設定:定義了一個大小為60秒的滑動視窗,並允許10秒的延遲處理。
  3. 值聯結器:定義瞭如何將心率和體溫資料合併成一個CombinedVitals物件。
  4. 執行連線操作:將highPulsehighTemp資料流按照定義的視窗和聯結器進行連線。

時間驅動的資料流控制

Kafka Streams透過比較不同分割槽中待處理記錄的時間戳來控制資料流的處理順序。具有最低時間戳的記錄將被優先處理。

Stream Time的概念

  • Stream Time是指Kafka Streams內部維護的一個時間概念,代表了某個主題分割槽中觀察到的最高時間戳。
  • Stream Time只會增加或保持不變,只有當新的資料到來時才會更新。

圖示說明:資料流控制流程

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 圖示說明:資料流控制流程

rectangle "Record Queue" as node1
rectangle "Select Record with Lowest Timestamp" as node2
rectangle "Process Record" as node3

node1 --> node2
node2 --> node3

@enduml

此圖示展示了Kafka Streams如何透過優先佇列來選擇具有最低時間戳的記錄進行處理。

時間驅動的資料流處理與查詢視窗化鍵值儲存

在 Kafka Streams 應用程式中,時間同步和資料處理順序至關重要。透過設定 max.task.idle.ms 組態,可以控制等待新記錄到達的時間,從而改善時間同步問題。預設值為 0,但增加此值可讓應用程式等待更長時間,以處理無序資料。

圖示:記錄時間戳比較示意圖

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 圖示:記錄時間戳比較示意圖

rectangle "時間戳" as node1
rectangle "最大時間戳" as node2

node1 --> node2

@enduml

此圖示展示了 Kafka Streams 如何比較記錄的時間戳,以決定資料的流動順序。

內容解密:

  1. 記錄1和記錄2代表輸入的資料流。
  2. 比較模組負責比較兩條記錄的時間戳。
  3. 輸出記錄的時間戳為兩條輸入記錄中的最大值。

警示接收器(Alerts Sink)

為了使連線結果可供下游消費者使用,需要將豐富的資料寫回 Kafka。在 Kafka Streams 中新增接收器(sink)非常簡單。以下程式碼展示瞭如何新增警示接收器:

vitalsJoined.to(
    "alerts",
    Produced.with(Serdes.String(), JsonSerdes.CombinedVitals())
);

內容解密:

  1. vitalsJoined 是連線操作的結果流。
  2. to 方法將結果流寫入名為 “alerts” 的 Kafka 主題。
  3. Produced.with 方法指定了輸出記錄的鍵和值的序列化器。

查詢視窗化鍵值儲存

視窗化鍵值儲存支援不同於非視窗化鍵值儲存的查詢型別。由於記錄鍵是多維的(包含原始鍵和視窗範圍),因此查詢方式也有所不同。

鍵 + 視窗範圍掃描

這種查詢型別需要三個引數:要搜尋的鍵、視窗範圍的下限和上限。以下程式碼展示瞭如何執行此類別查詢:

String key = "1";
Instant fromTime = Instant.parse("2020-11-12T09:02:00.00Z");
Instant toTime = Instant.parse("2020-11-12T09:03:00.00Z");
WindowStoreIterator<Long> range = getBpmStore().fetch(key, fromTime, toTime);
while (range.hasNext()) {
    KeyValue<Long, Long> next = range.next();
    Long timestamp = next.key;
    Long count = next.value;
    // 處理提取的值
}
range.close();

內容解密:

  1. getBpmStore().fetch 方法根據指定的鍵和視窗範圍檢索資料。
  2. WindowStoreIterator 用於迭代查詢結果。
  3. next.keynext.value 分別代表記錄的時間戳和計數值。

視窗範圍掃描

這種查詢型別需要兩個引數:視窗範圍的下限和上限。以下程式碼展示瞭如何執行此類別查詢:

Instant fromTime = Instant.parse("2020-11-12T09:02:00.00Z");
Instant toTime = Instant.parse("2020-11-12T09:03:00.00Z");
KeyValueIterator<Windowed<String>, Long> range = getBpmStore().fetchAll(fromTime, toTime);
while (range.hasNext()) {
    KeyValue<Windowed<String>, Long> next = range.next();
    String key = next.key.key();
    Window window = next.key.window();
    Long start = window.start();
    Long end = window.end();
    Long count = next.value;
    // 處理提取的值
}
range.close();

內容解密:

  1. getBpmStore().fetchAll 方法檢索指定視窗範圍內的所有鍵值對。
  2. KeyValueIterator 用於迭代查詢結果。
  3. next.key.key() 取得原始鍵,next.key.window() 取得視窗資訊。

高階狀態管理

在過去的兩章中,我們討論了 Kafka Streams 中的有狀態處理。隨著我們學習如何執行聚合、連線和視窗操作,很明顯有狀態處理很容易入門。

然而,正如之前所暗示的,狀態儲存帶來了額外的操作複雜性。當您擴充套件應用程式、遇到故障並執行例行維護時,您將瞭解到有狀態處理需要更深入地瞭解底層機制,以確保您的應用程式在一段時間內繼續平穩執行。

本章的目標是更深入地探討狀態儲存,以便在構建有狀態流處理應用程式時實作更高的可靠性。本章的大部分內容致力於再平衡的話題,當需要在消費者群組中重新分配工作時就會發生再平衡。再平衡對有狀態應用程式可能產生特別大的影響,因此我們將發展我們的理解,以便您能夠在自己的應用程式中處理這種情況。

我們將回答的一些問題包括:

  • 持久狀態儲存如何在磁碟上表示?
  • 有狀態應用程式如何實作容錯能力?
  • 我們如何組態內建狀態儲存?
  • 哪些型別的事件對有狀態應用程式最具影響力?
  • 可以採取哪些措施來最小化有狀態任務的還原時間?
  • 如何確保狀態儲存不會無限增長?
  • 如何使用DSL快取來限制下游更新的速率?
  • 如何使用狀態還原監聽器來跟蹤狀態還原的進度?
  • 如何使用狀態監聽器來檢測再平衡?

讓我們從檢查持久狀態儲存的磁碟佈局開始。

持久儲存磁碟佈局

Kafka Streams 包括記憶體和持久狀態儲存。後者通常是首選,因為它們可以幫助減少在需要重新初始化狀態(例如,在故障或任務遷移的情況下)時應用程式的還原時間。

預設情況下,持久狀態儲存位於 /tmp/kafka-streams 目錄中。您可以透過設定 StreamsConfig.STATE_DIR_CONFIG 屬性來覆寫此設定,並且考慮到 /tmp 目錄的臨時性質(該目錄的內容在系統重啟/當機期間被刪除),您應該選擇另一個位置來持久化您的應用程式狀態。

由於持久狀態儲存位於磁碟上,我們可以很容易地檢查檔案。這使我們能夠從目錄和檔名中收集大量資訊。範例 6-1 中的檔案樹取自我們在上一章中建立的患者監控應用程式。註解提供了有關重要目錄和檔案的額外詳細資訊。

範例 6-1:持久狀態儲存在磁碟上的表示範例

└── dev-consumer
    ├── 0_0
    │   ├── .lock
    │   └── pulse-counts
    ├── 0_1
    │   ├── .lock
    │   └── pulse-counts
    ├── 0_2
    │   ├── .lock
    │   └── pulse-counts
    ├── 0_3
    │   ├── .checkpoint
    │   ├── .lock
    │   └── pulse-counts
    │   └── ...
    ├── 1_0
    │   ├── ...

內容解密:

此範例展示了持久狀態儲存在磁碟上的佈局。其中,dev-consumer 是 Kafka Streams 應用程式的根目錄。子目錄(如 0_00_1 等)代表不同的任務。每個任務目錄包含一個 .lock 檔案和一個或多個狀態儲存檔案(如 pulse-counts)。.lock 檔案用於鎖定任務目錄,以防止多個例項同時存取。.checkpoint 檔案用於記錄狀態儲存的檢查點,以便在故障還原時使用。

透過檢查磁碟佈局,我們可以更好地瞭解 Kafka Streams 如何管理狀態儲存,並採取措施最佳化其效能和可靠性。