在資料串流應用日益普及的今天,Kafka Streams 和 ksqlDB 成為了建構高效能串流應用程式的熱門選擇。為了提升應用程式的可維護性和可擴充套件性,容器化佈署成為了主流趨勢。本文將探討如何利用 Docker 和 Kubernetes 佈署及管理 Kafka Streams 和 ksqlDB 應用程式,並涵蓋 Prometheus 監控、應用程式重置以及速率限制等重要議題,提供實務操作和程式碼範例,協助開發者建構更穩健的串流應用。
使用容器佈署 Kafka Streams 和 ksqlDB
在生產環境中佈署 Kafka Streams 和 ksqlDB 時,推薦使用容器化技術,如 Docker。容器化提供了輕量級、隔離的執行環境,使得程式碼的管理、分享和維護變得更加容易。
ksqlDB 容器
Confluent 已經發布了官方的 ksqlDB 伺服器和 CLI 容器映像。佈署查詢到生產環境時,可以使用官方的 ksqlDB 伺服器容器映像(confluentinc/ksqldb-server)或根據需要建立衍生映像。
組態 ksqlDB 容器
官方的 ksqlDB 伺服器映像允許透過環境變數指定大多數 ksqlDB 組態屬性。然而,這種方法有一些缺點,例如組態難以版本控制、執行容器的命令可能變得冗長等。因此,建議將組態檔案掛載到容器內部。
例如,將 ksqlDB 伺服器組態儲存到本地檔案 config/server.properties:
bootstrap.servers=kafka:9092
ksql.service.id=greeter-prod
然後,可以將此組態檔案掛載到容器內部,並啟動 ksqlDB 伺服器:
docker run \
--net=chapter-12_default \
-v "$(pwd)/config":/config \
-ti confluentinc/ksqldb-server:0.14.0 \
ksql-server-start /config/server.properties
處理額外的組態檔案
如果組態中包含 ksql.connect.worker.config 或 queries.file 等屬性,則需要將相關檔案掛載到容器內部。
Kafka Streams 容器
與 ksqlDB 類別似,Kafka Streams 應用程式也可以容器化佈署。雖然文中沒有提供具體的 Kafka Streams 容器範例,但原理是相似的:將應用程式封裝成 Docker 映像,然後在容器中執行。
使用 Prometheus 監控 Kafka Streams 和 ksqlDB
在生產環境中,需要一個強大的監控解決方案來儲存歷史資料、執行查詢和與警示系統整合。Prometheus 是一個推薦的技術。要將指標匯出到 Prometheus,需要:
- 下載 Prometheus JMX 匯出器 JAR。
- 使用
-javaagent引數啟動 Kafka Streams 或 ksqlDB。 - 組態 Prometheus 以抓取 HTTP 端點上的 JMX 指標。
組態 Prometheus 的範例程式碼
本章的原始碼中包含了一個使用 Prometheus 與 Kafka Streams 和 ksqlDB 的完整範例。
為何匯出 JMX 指標很重要
無論使用 Prometheus 或其他監控系統,將 Kafka Streams 和 ksqlDB 的內建 JMX 指標匯出到外部系統都可以提高應用程式的可觀測性。
程式碼與內容解說
文中提到的 Docker 命令和組態檔案是佈署 ksqlDB 的關鍵步驟。下面是對這些內容的詳細解說:
docker run \
--net=chapter-12_default \
-v "$(pwd)/config":/config \
-ti confluentinc/ksqldb-server:0.14.0 \
ksql-server-start /config/server.properties
docker run: 這是 Docker 用於啟動新容器的命令。--net=chapter-12_default: 指定容器連線到名為chapter-12_default的 Docker 網路,這使得容器可以與其他在同一個網路中的容器進行通訊。-v "$(pwd)/config":/config: 將主機上的當前目錄下的config資料夾掛載到容器的/config目錄。這樣,容器就可以存取主機上的組態檔案。-ti confluentinc/ksqldb-server:0.14.0: 指定要執行的 Docker 映像名稱和版本,並分配一個偽TTY,以保持容器的互動性。ksql-server-start /config/server.properties: 這是在容器內執行的命令,用於啟動 ksqlDB 伺服器,並指定使用/config/server.properties組態檔案。
組態檔案的作用
組態檔案 server.properties 中包含了 ksqlDB 的組態屬性,例如:
bootstrap.servers=kafka:9092
ksql.service.id=greeter-prod
bootstrap.servers: 指定 Kafka 叢集的引導伺服器地址。ksql.service.id: 為 ksqlDB 例項指定一個唯一的服務ID。
為何需要掛載組態檔案
掛載組態檔案可以避免將敏感資訊或特定環境的組態寫死在 Docker 映像中,提高了佈署的靈活性。
將Kafka Streams應用程式容器化與佈署
在現代化的軟體開發流程中,將應用程式容器化已經成為了一種標準做法。對於Kafka Streams應用程式而言,容器化不僅能夠簡化佈署流程,還能提高應用的可移植性和可擴充套件性。本篇文章將探討如何使用Jib將Kafka Streams應用程式容器化,以及如何透過容器協調工具(如Kubernetes)進行佈署和管理。
使用Jib容器化Kafka Streams應用程式
Jib是一個由Google開發的開源工具,它簡化了Java應用程式的容器化過程。透過將Jib外掛新增到我們的Gradle構建檔案中,我們可以輕鬆地將Kafka Streams應用程式封裝成Docker映像。
首先,在build.gradle檔案中新增Jib外掛:
plugins {
id 'com.google.cloud.tools.jib' version '2.1.0'
}
jib {
to {
image = 'magicalpipelines/myapp:0.1.0'
}
container {
jvmFlags = []
mainClass = application.mainClassName
format = 'OCI'
}
}
接下來,執行以下命令來構建Docker映像:
./gradlew jibDockerBuild
這個命令會在本地Docker守護程式中構建映像,而不會將其推播到容器登入檔中。成功執行後,您應該會看到類別似以下的輸出:
Built image to Docker daemon as magicalpipelines/myapp:0.1.0
內容解密:
- Jib外掛組態:透過在
build.gradle中新增Jib外掛,我們能夠利用Jib的功能來容器化我們的Java應用程式。 jib區塊組態:在jib區塊中,我們指定了輸出映像的名稱和版本,以及容器的相關組態,如JVM引數和主類別。- 構建Docker映像:執行
./gradlew jibDockerBuild命令後,Jib會根據我們的組態構建Docker映像,並將其載入到本地Docker守護程式中。
容器協調與Kubernetes
對於生產環境中的Kafka Streams應用程式,建議使用容器協調系統(如Kubernetes)來管理和佈署容器。Kubernetes提供了多項好處,包括自動容錯移轉、簡易的擴充套件性、以及對基礎設施的抽象化。
Kubernetes的好處
- 自動容錯移轉:當節點故障時,Kubernetes能夠自動將容器遷移到健康的節點上。
- 簡易的擴充套件性:透過增加容器的副本數量,可以輕鬆地擴充套件工作負載。
- 抽象化的基礎設施:Kubernetes提供了一個抽象層,讓您能夠在不同的基礎設施上執行應用程式。
重置Kafka Streams應用程式
在某些情況下,您可能需要重置Kafka Streams應用程式,例如當發現系統中的錯誤並需要重新處理Kafka主題中的資料時。Kafka提供了一個應用程式重置工具,可以用於此目的。
使用應用程式重置工具
該工具可以:
- 更新源主題上的消費者偏移量到指定的位置。
- 跳過中間主題的末尾。
- 刪除內部的變更日誌和重新分割槽主題。
內容解密:
- 重置工具的功能:該工具提供了多種功能來重置Kafka Streams應用程式的狀態。
- 使用注意事項:在使用該工具時,需要特別小心,以確保正確識別應用程式的消費者群組。
Kafka Streams 應用程式重置與速率限制輸出
在使用 Kafka Streams 處理資料流時,我們可能會遇到需要重置應用程式狀態或限制輸出速率的情況。本文將介紹如何使用 Kafka Streams 的重置工具以及如何利用記錄快取來限制輸出速率。
重置 Kafka Streams 應用程式
在使用 Kafka Streams 處理有狀態的應用程式時,需要特別小心,因為重置工具不會重置應用程式的狀態。以下是重置 Kafka Streams 應用程式的步驟:
- 停止所有應用程式例項:在繼續下一步之前,請確保消費者群組已停止。您可以執行以下命令來檢查群組是否處於非活動狀態:
kafka-consumer-groups
–bootstrap-server kafka:9092
–describe
–group dev
–state
更新 bootstrap 伺服器為其中一個代理伺服器的主機/埠對。群組由 Kafka Streams 中的 `application.id` 設定指定。
2. **執行應用程式重置工具**:執行以下命令以重置應用程式:
```bash
kafka-streams-application-reset \
--application-id dev \
--bootstrap-servers kafka:9092 \
--input-topics users \
--to-earliest
輸出結果將類別似於以下內容:
Reset-offsets for input topics [users]
Following input topics offsets will be reset to (for consumer group dev)
Topic: users Partition: 3 Offset: 0
Topic: users Partition: 2 Offset: 0
Topic: users Partition: 1 Offset: 0
Topic: users Partition: 0 Offset: 0
Done.
Deleting all internal/auto-created topics for application dev
Done.
- 清理應用程式狀態:如果您的應用程式是有狀態的,則需要在重新啟動之前重置應用程式的狀態。您可以手動刪除每個狀態目錄,或在程式碼中呼叫
KafkaStreams.cleanUp方法(僅適用於 Kafka Streams)。
限制 Kafka Streams 應用程式的輸出速率
Kafka Streams 和 ksqlDB 能夠實作高吞吐量,但下游系統可能無法跟上輸出速率。在這種情況下,可以使用記錄快取來限制輸出速率。
記錄快取的工作原理
記錄快取可以減少寫入狀態儲存的輸出記錄數量,以及轉發到下游處理器的記錄數量。每個串流執行緒(由 num.stream.threads 引數控制)都會分配總快取大小的均等份額。
使用記錄快取限制輸出速率
假設我們有一個拓撲結構,需要計算每個鍵的訊息數量。如果我們有以下事件序列:
<key1, value1><key1, value2><key1, value3>
在未停用記錄快取的情況下(即 cache.max.bytes.buffering 設定為非零值),Kafka Streams 和 ksqlDB 可以將原始的聚合序列簡化為單一輸出:
<key1, 3>
這可以減少下游處理器需要處理的資料量。
升級 Kafka Streams
Apache Kafka(包括 Kafka Streams)採用根據時間的釋出計劃。因此,您應該預期每四個月就會發布一個新的 Kafka Streams 版本。版本策略如下所示:
major.minor.bug-fix
大多數情況下,四個月的釋出週期適用於次要版本。在極少數情況下,您可能會看到主要版本的更新,以適應更重大的變更或專案的重要里程碑。錯誤修復很常見,可以隨時發生。
重置工具與記錄快取的使用範例程式碼
// 使用 KafkaStreams.cleanUp 方法清理應用程式狀態
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();
// 設定記錄快取大小
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760); // 10 MB
此圖示說明瞭 Kafka Streams 應用程式重置和速率限制輸出的流程
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title Kafka Streams ksqlDB 容器化佈署與應用
package "Kubernetes Cluster" {
package "Control Plane" {
component [API Server] as api
component [Controller Manager] as cm
component [Scheduler] as sched
database [etcd] as etcd
}
package "Worker Nodes" {
component [Kubelet] as kubelet
component [Kube-proxy] as proxy
package "Pods" {
component [Container 1] as c1
component [Container 2] as c2
}
}
}
api --> etcd : 儲存狀態
api --> cm : 控制迴圈
api --> sched : 調度決策
api --> kubelet : 指令下達
kubelet --> c1
kubelet --> c2
proxy --> c1 : 網路代理
proxy --> c2
note right of api
核心 API 入口
所有操作經由此處
end note
@enduml
此圖示展示了重置 Kafka Streams 應用程式和限制輸出速率的主要步驟。
Kafka Streams 與 ksqlDB 的升級與組態管理
在生產環境中升級 Kafka Streams 和 ksqlDB 需要謹慎規劃,因為新版本的升級可能會引入不相容的變更或需要特定的升級步驟。
升級 Kafka Streams
升級 Kafka Streams 時,務必遵循官方 Kafka 網站發布的升級。某些情況下,可能需要設定 upgrade.from 引數到舊版本,然後進行一系列的重啟,以確保升級的安全性。
取得新版本資訊
有多種方式可以讓你及時瞭解 Kafka 和 ksqlDB 的新版本資訊:
- 訂閱官方郵件列表以接收版本發布公告。
- 關注 Apache Kafka 的官方 GitHub 倉函式庫,並選擇「Releases Only」選項以接收版本更新通知。
- 在 Twitter 上關注 @apachekafka。
升級 ksqlDB
ksqlDB 的升級路徑可能因版本而異,因為它在達到版本 1.0 之前可能會包含破壞性變更。官方檔案建議在升級前仔細閱讀升級,並考慮延遲升級直到新的版本帶來需要的功能或修復,或者直到 ksqlDB 達到版本 1.0 並承諾向後相容性。
組態管理
在開發 Kafka Streams 應用程式時,可以透過建立 Properties 例項並手動設定各種組態引數來進行組態。然而,在將應用程式佈署到生產環境時,建議從檔案中載入組態,以避免硬編碼值直接在應用程式中。這樣可以減少錯誤,並使得管理多個佈署變得更加容易。
Kafka Streams 組態屬性
組態 Kafka Streams 應用程式時,需要設定兩個必要的引數:
application.id:用於標識 Kafka Streams 應用程式的唯一識別碼。bootstrap.servers:Kafka 代理的主機和埠對列表,用於建立與 Kafka 叢集的連線。
此外,還有多個可選引數,例如 acceptable.recovery.lag,用於指定 Kafka Streams 任務在輸入分割槽上的最大延遲,以便被視為“預熱”並準備就緒。
Kafka Streams 設定引數詳解
Kafka Streams 提供多種設定引數,用於控制應用程式的行為和效能。以下是一些重要的設定引數:
1. acceptable.recovery.lag
此引數用於控制 Kafka Streams 在分配任務給應用程式例項之前,允許的最大延遲。當一個有狀態的應用程式例項需要還原部分或全部狀態時,您可能不希望 Kafka Streams 在還原期間分配工作給它。如果是這種情況,您可以設定此引數,以允許應用程式例項僅在延遲低於此閾值時才接收任務分配。
2. cache.max.bytes.buffering
此引數控制 Kafka Streams 中的記錄快取大小。將此值設定為大於 0 的值將啟用記錄快取,並可以對應用程式的輸出進行速率限制。
3. default.deserialization.exception.handler
此引數指定用於處理反序列化錯誤的類別。內建的選項包括 LogAndContinueExceptionHandler 和 LogAndFailExceptionHandler。前者允許 Kafka Streams 在發生反序列化錯誤時繼續處理記錄,而後者將導致 Kafka Streams 記錄異常並停止處理。
4. default.production.exception.handler
此引數指定用於處理與生成資料到 Kafka 相關的錯誤的類別。例如,如果記錄太大,則底層的生產者將丟擲一個需要以某種方式處理的異常。預設情況下,使用內建的 DefaultProductionExceptionHandler 類別,它將導致 Kafka Streams 失敗並關閉。
5. default.timestamp.extractor
此引數指定用於將給定記錄與時間戳相關聯的類別。
6. default.key.serde 和 default.value.serde
這些引數指定用於序列化和反序列化記錄鍵和值的預設類別。
程式碼範例
KStream<byte[], Tweet> stream = builder.stream(
"tweets",
Consumed.with(Serdes.ByteArray(), JsonSerdes.Tweet()));
在這個範例中,鍵的 Serdes(Serdes.ByteArray())和值的 Serdes(JsonSerdes.Tweet())被內聯定義。
內容解密:
Serdes.ByteArray()用於將鍵序列化為位元組陣列。JsonSerdes.Tweet()用於將值序列化為 JSON 格式的 Tweet 物件。- 使用
Consumed.with()方法指定鍵和值的 Serdes。
7. max.task.idle.ms
此引數控制流任務等待所有分割區緩衝區包含資料的最大時間。較高的值可能會增加延遲,但可以防止任務從多個輸入分割區讀取資料時出現亂序資料。
8. max.warmup.replicas
此引數控制可用於預熱任務的副本數量(除了 num.standbys 之外)。
9. metrics.recording.level
此引數控制 Kafka Streams 捕捉的指標的粒度級別。預設為 INFO,但您可以覆寫它為 DEBUG 以獲得更多可見性。
10. num.standby.replicas
此引數控制為每個狀態儲存建立的副本數量。這有助於減少停機時間,因為如果一個有狀態的任務離線,Kafka Streams 可以將工作重新分配給具有複製狀態的其他應用程式例項。
11. num.stream.threads
此引數控制執行 Kafka Streams 任務的執行緒數量。增加執行緒數量可以幫助您充分利用機器上的可用 CPU 資源,並提高效能。
程式碼範例
// 設定 num.stream.threads 為 8
Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
內容解密:
- 使用
Properties物件設定 Kafka Streams 的組態。 - 將
num.stream.threads設定為 8,以允許在 8 個執行緒上平行執行任務。 - 這可以提高應用程式的效能和吞吐量。
12. processing.guarantee
此引數控制處理保證。有三個可選值:
at_least_once:記錄可能會在某些故障情況下被重新傳遞,但永遠不會丟失。exactly_once:使用事務性生產者和嵌入式消費者,記錄被精確處理一次。exactly_once_beta:與exactly_once相比,具有更好的擴放性和減少的開銷。