Kafka Streams 應用程式中,狀態儲存管理至關重要,影響效能、一致性和可靠性。本文將探討如何最佳化狀態儲存,包含調整日誌壓縮、使用 LRU 快取、減少寫入操作及監控狀態儲存。日誌壓縮可透過設定 segment.bytes 和 min.cleanable.dirty.ratio 等引數來調整壓縮頻率,減少儲存空間。LRU 快取則透過 Stores.lruMap 方法設定固定大小,自動淘汰較少使用的資料。利用記錄快取可減少狀態更新寫入次數,cache.max.bytes.buffering 和 commit.interval.ms 引數控制快取大小和提交頻率,需權衡記憶體用量、延遲與故障還原成本。狀態儲存監控方面,可使用狀態監聽器追蹤應用程式狀態變化,例如偵測重新平衡事件。狀態還原監聽器則提供 onRestoreStart、onRestoreEnd 和 onBatchRestored 方法,監控狀態還原過程。Kafka Streams 內建 JMX 指標提供狀態儲存操作和查詢的速率、執行時間等資訊,RocksDB 儲存區更提供位元組級 I/O 流量指標。互動式查詢方面,Kafka Streams 2.5 版本起支援備用副本提供過時結果,提升高用性。
進階狀態管理:最佳化 Kafka Streams 狀態儲存
調整日誌壓縮以最小化狀態儲存大小
在 Kafka Streams 中,狀態儲存的大小可能會無限增長,因為底層儲存媒體理論上是無界的。為了最小化狀態儲存的大小,可以採用日誌壓縮(log compaction)機制。日誌壓縮會移除重複的記錄,只保留最新的值,從而減少需要重播的記錄數量。
相關組態引數
下表列出了可用於觸發更頻繁日誌壓縮的 Topic 組態引數:
| 組態引數 | 預設值 | 定義 |
|---|---|---|
| segment.bytes | 1073741824 (1 GB) | 控制日誌段檔案的大小。壓縮始終以檔案為單位進行,因此較大的段大小意味著較少的檔案,但對保留的控制較不精細。 |
| segment.ms | 604800000 (7 週) | 控制 Kafka 強制日誌滾動的時間段,即使段檔案未滿,以確保舊資料可以被壓縮或刪除。 |
| min.cleanable.dirty.ratio | 0.5 | 控制日誌壓縮器清理日誌的頻率。預設情況下,如果日誌中超過 50% 的內容已經被壓縮,則避免清理該日誌。這個比例限制了日誌中因重複內容而浪費的最大空間。 |
| max.compaction.lag.ms | Long.MAX_VALUE - 1 | 訊息在日誌中保持不可壓縮狀態的最大時間。僅適用於正在被壓縮的日誌。 |
| min.compaction.lag.ms | 0 | 訊息在日誌中保持未壓縮狀態的最小時間。僅適用於正在被壓縮的日誌。 |
調整組態範例
Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put("segment.bytes", "536870912"); // 將段大小減少到 512 MB
topicConfigs.put("min.cleanable.dirty.ratio", "0.3"); // 將最小可清理的汙穢比例減少到 30%
StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], String> stream = builder.stream("patient-events");
KTable<byte[], Long> counts = stream
.groupByKey()
.count(
Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("counts")
.withKeySerde(Serdes.ByteArray())
.withValueSerde(Serdes.Long())
.withLoggingEnabled(topicConfigs));
內容解密:
segment.bytes組態:將段大小從預設的 1 GB 調整為 512 MB,有助於更頻繁地進行日誌壓縮,因為壓縮操作是按檔案進行的。min.cleanable.dirty.ratio組態:將最小可清理的汙穢比例從 0.5 調整為 0.3,使得日誌壓縮更加頻繁,減少因重複內容而浪費的空間。
使用固定大小的 LRU 快取
另一種控制狀態儲存大小的方法是使用具有固定大小的最近最少使用(LRU)快取。這是一種簡單的鍵值儲存,具有可組態的固定容量,當狀態超過組態的大小時,會自動刪除最近最少使用的條目。
使用 LRU 快取範例
KeyValueBytesStoreSupplier storeSupplier = Stores.lruMap("counts", 10); // 建立一個最大容量為 10 的 LRU 快取
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("patient-events");
stream
.groupByKey()
.count(
Materialized.<String, Long>as(storeSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
return builder.build();
使用 StoreBuilder 建立 LRU 快取(Processor API)
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.lruMap("counts", 10);
StoreBuilder<KeyValueStore<String, Long>> lruStoreBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.Long());
builder.addStateStore(lruStoreBuilder);
內容解密:
- 建立 LRU 快取:使用
Stores.lruMap方法建立一個名為 “counts” 的 LRU 快取,最大容量為 10。 - 組態 Materialized:使用
Materialized.as方法將 LRU 快取應用於count操作。 - Processor API 中的使用:展示瞭如何使用
StoreBuilder在 Processor API 中新增 LRU 快取。
進階狀態管理:狀態儲存與效能最佳化
在 Kafka Streams 應用程式中,狀態管理扮演著至關重要的角色。適當的狀態管理不僅能提升應用程式的效能,還能確保資料的一致性和可靠性。本章將探討 Kafka Streams 的狀態管理機制,包括如何最佳化狀態儲存、減少寫入操作以及監控狀態儲存。
使用記錄快取減少寫入操作
在 Kafka Streams 中,某些 DSL 方法(如 suppress)可以用於限制視窗化儲存中的更新頻率。此外,還有一個操作引數可以用來控制狀態更新寫入底層狀態儲存和下游處理器的頻率。這些引數如表 6-2 所示。
表 6-2. 可用於減少寫入狀態儲存和下游處理器的主題組態
| 原始組態 | StreamsConfig 屬性 | 預設值 | 定義 |
|---|---|---|---|
| cache.max.bytes.buffering | CACHE_MAX_BYTES_BUFFERING_CONFIG | 1048576 (10 MB) | 用於跨執行緒緩衝的最大記憶體量(以位元組為單位) |
| commit.interval.ms | COMMIT_INTERVAL_MS_CONFIG | 30000 (30 秒) | 儲存處理器位置的頻率 |
較大的快取大小和較高的提交間隔有助於對相同鍵的連續更新進行重複資料刪除。這帶來了多個好處,包括:
- 減少讀取延遲
- 減少寫入量:
- 狀態儲存
- 其底層變更日誌主題(如果啟用)
- 下游串流處理器
因此,如果瓶頸似乎出現在讀取/寫入狀態儲存或網路 I/O(這可能是由於頻繁更新變更日誌主題所致),則應考慮調整這些引數。當然,較大的記錄快取也有一些權衡:
- 更高的記憶體使用量
- 更高的延遲(記錄發出的頻率較低)
快取大小與提交間隔的權衡
由 cache.max.bytes.buffering 引數控制的記錄快取總記憶體量在所有串流執行緒之間共用。記憶體池將被均勻地細分,因此處理“熱區”(即資料量相對較高的分割區)的執行緒將更頻繁地重新整理其快取。無論快取大小或提交間隔,最終的有狀態計算結果將保持相同。
使用較大的提交間隔也有一個權衡,即在發生故障後需要重做的作業量將隨著此組態值的增加而增加。
有時,可能需要檢視每個中間狀態的變化,而不進行任何快取。事實上,有時剛接觸 Kafka Streams 的人會觀察到重複資料刪除或快取重新整理的延遲,並認為拓撲結構有問題,因為他們向源主題產生了一定數量的記錄,但只看到一部分狀態變化被重新整理(可能在幾秒鐘的延遲之後,而不是立即)。因此,有時人們會在開發環境中完全停用快取並設定較小的提交間隔。但請注意在生產環境中執行此操作,因為它可能會影響效能。
狀態儲存監控
在將應用程式佈署到生產環境之前,確保對應用程式有足夠的可見性以正確支援它至關重要。本文將討論監控有狀態應用程式的常見方法,以便您可以減少維運負擔並在發生錯誤時擁有足夠的資訊進行偵錯。
新增狀態監聽器
Kafka Streams 應用程式可以處於多種狀態之一(不要與狀態儲存混淆)。圖 6-5 顯示了這些狀態及其有效轉換。
如前所述,重新平衡狀態對有狀態的 Kafka Streams 應用程式可能特別具有影響。因此,能夠追蹤應用程式何時轉換到重新平衡狀態,以及這種情況發生的頻率,對於監控目的非常有用。幸運的是,Kafka Streams 使我們能夠非常容易地監控應用程式狀態何時變更,使用所謂的狀態監聽器。狀態監聽器只是一個回呼方法,每當應用程式狀態變更時就會被呼叫。
根據您的應用程式,您可能希望在重新平衡發生時採取某些行動。例如,在 Mailchimp,我們在 Kafka Streams 應用程式中建立了一個特殊的指標,每當觸發重新平衡時,該指標就會遞增。該指標被傳送到我們的監控系統(Prometheus),可以在那裡進行查詢,甚至用於建立警示。
圖 6-5. Kafka Streams 中的應用程式狀態及其有效轉換
以下程式碼顯示瞭如何向 Kafka Streams 拓撲結構新增狀態監聽器的範例,該監聽器專門監聽到重新平衡狀態的轉換:
KafkaStreams streams = new KafkaStreams(...);
streams.setStateListener(
(oldState, newState) -> {
if (newState.equals(State.REBALANCING)) {
// 做些事情
}
});
- 使用
KafkaStreams.setStateListener方法在應用程式狀態變更時呼叫一個方法。 StateListener類別的方法簽章包括舊狀態和新狀態。- 當應用程式進入重新平衡狀態時,有條件地執行某些動作。
雖然狀態監聽器非常有用,但它們並不是我們可以在 Kafka Streams 應用程式中利用的唯一監聽器。下一節將討論另一種可用於提高有狀態應用程式可見性的方法。
程式碼解析
KafkaStreams streams = new KafkaStreams(...);
streams.setStateListener(
(oldState, newState) -> {
if (newState.equals(State.REBALANCING)) {
// 做些事情
}
});
內容解密:
KafkaStreams streams = new KafkaStreams(...);:建立一個KafkaStreams例項,用於管理 Kafka Streams 應用程式的生命週期。streams.setStateListener((oldState, newState) -> {...});:設定一個狀態監聽器,用於監控應用程式的狀態變化。(oldState, newState) -> {...}:這是一個 lambda 表示式,定義了當應用程式狀態變化時要執行的動作。它接收兩個引數:oldState(舊狀態)和newState(新狀態)。if (newState.equals(State.REBALANCING)) {...}:檢查新狀態是否為重新平衡(REBALANCING)狀態。如果是,則執行特定的動作。
這個範例展示瞭如何在 Kafka Streams 應用程式中新增一個狀態監聽器,以便在應用程式進入重新平衡狀態時執行特定的動作。透過這種方式,可以實作對應用程式狀態變化的精細控制和監控。
新增狀態還原監聽器
在上一節中,我們學習瞭如何在 Kafka Streams 應用程式中監聽再平衡(rebalance)觸發事件。然而,再平衡主要是在狀態儲存區被重新初始化時才會引起關注。Kafka Streams 提供了一個名為 StateRestoreListener 的監聽器,可以在狀態儲存區被重新初始化時被呼叫。以下程式碼展示瞭如何在 Kafka Streams 應用程式中新增 StateRestoreListener:
KafkaStreams streams = new KafkaStreams(...);
streams.setGlobalStateRestoreListener(new MyRestoreListener());
MyRestoreListener 類別是 StateRestoreListener 的實作,需要實作三個方法,每個方法都與狀態還原過程的生命週期相關聯。以下程式碼區塊描述了每個方法的使用場景:
class MyRestoreListener implements StateRestoreListener {
private static final Logger log = LoggerFactory.getLogger(MyRestoreListener.class);
@Override
public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
log.info("以下狀態儲存區正在被還原:{}", storeName);
}
@Override
public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
log.info("以下狀態儲存區還原完成:{}", storeName);
}
@Override
public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
// 這裡的記錄非常詳細,不建議在此處記錄任何資訊
}
}
內容解密:
onRestoreStart方法:在狀態重新初始化開始時被呼叫。startingOffset引數尤其重要,因為它指示是否需要重播整個狀態(當使用記憶體儲存區或持久化儲存區且之前的狀態遺失時發生)。如果startingOffset為 0,則需要完全重新初始化;如果大於 0,則只需要部分還原。onRestoreEnd方法:在狀態還原完成時被呼叫。onBatchRestored方法:在單一批次記錄被還原時被呼叫。批次的最大大小與MAX_POLL_RECORDS組態相同。由於該方法可能會被頻繁呼叫,因此在該方法中進行同步處理時需謹慎,以避免拖慢還原過程。
內建指標
Kafka Streams 包含一組內建的 JMX 指標,其中許多與狀態儲存區相關。例如,可以存取某些狀態儲存區操作和查詢的速率(例如 get、put、delete、all、range)、這些操作的平均和最大執行時間,以及抑制緩衝區的大小。對於根據 RocksDB 的儲存區,還有許多指標,例如 bytes-written-rate 和 bytes-read-rate,在檢視位元組級別的 I/O 流量時尤其有用。
互動式查詢
在 Kafka Streams 2.5 之前,對於使用互動式查詢公開其狀態的應用程式來說,再平衡尤其痛苦。在這些舊版本的函式庫中,離線或重新平衡的分割槽將導致對狀態儲存區的互動式查詢失敗。然而,從 Kafka Streams 2.5 開始,備用副本可以用於在新遷移的狀態儲存區正在初始化時提供過時的結果。這使得即使在應用程式進入重新平衡狀態時,API 仍然保持高用性。
圖表說明
以下 Plantuml 圖表展示了 Kafka Streams 狀態儲存區的還原過程:
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title KafkaStreams 狀態儲存最佳化實務
package "系統架構" {
package "前端層" {
component [使用者介面] as ui
component [API 客戶端] as client
}
package "後端層" {
component [API 服務] as api
component [業務邏輯] as logic
component [資料存取] as dao
}
package "資料層" {
database [主資料庫] as db
database [快取] as cache
}
}
ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取
note right of api
RESTful API
或 GraphQL
end note
@enduml
此圖示展示了根據 startingOffset 的值決定是完全重新初始化還是部分還原的流程。
詳細解析
queryMetadataForKey方法:用於取得指定鍵的後設資料,包括主機和埠號對。standbyHosts方法:用於取得備用主機的資訊,在 Kafka Streams 2.5 及之後版本可用。- 檢查主機存活狀態:需要實作
isAlive方法來檢查主機是否存活,可以結合StateListener和 RPC 伺服器來實作。
使用範例
KeyQueryMetadata metadata = streams.queryMetadataForKey(storeName, key, Serdes.String().serializer());
if (isAlive(metadata.activeHost())) {
// 將查詢路由到活躍主機
} else {
// 將查詢路由到備用主機
Set<HostInfo> standbys = metadata.standbyHosts();
}
程式碼解析:
- 使用
queryMetadataForKey方法取得鍵的後設資料。 - 檢查活躍主機是否存活,如果是,則路由到活躍主機;否則,路由到備用主機。
- 取得備用主機的集合,用於查詢路由。