返回文章列表

Kafka Streams ksqlDB 容器化佈署與應用

本文探討如何使用容器技術佈署和管理 Kafka Streams 和 ksqlDB,包含 Docker 容器化、Kubernetes 協調、Prometheus 監控以及應用程式重置和速率限制等關鍵技術。文章詳細說明瞭組態檔案的掛載、JMX 指標匯出、應用程式重置工具的使用、記錄快取的組態以及 Kafka

串流處理 資料工程

在資料串流應用日益普及的今天,Kafka Streams 和 ksqlDB 成為了建構高效能串流應用程式的熱門選擇。為了提升應用程式的可維護性和可擴充套件性,容器化佈署成為了主流趨勢。本文將探討如何利用 Docker 和 Kubernetes 佈署及管理 Kafka Streams 和 ksqlDB 應用程式,並涵蓋 Prometheus 監控、應用程式重置以及速率限制等重要議題,提供實務操作和程式碼範例,協助開發者建構更穩健的串流應用。

使用容器佈署 Kafka Streams 和 ksqlDB

在生產環境中佈署 Kafka Streams 和 ksqlDB 時,推薦使用容器化技術,如 Docker。容器化提供了輕量級、隔離的執行環境,使得程式碼的管理、分享和維護變得更加容易。

ksqlDB 容器

Confluent 已經發布了官方的 ksqlDB 伺服器和 CLI 容器映像。佈署查詢到生產環境時,可以使用官方的 ksqlDB 伺服器容器映像(confluentinc/ksqldb-server)或根據需要建立衍生映像。

組態 ksqlDB 容器

官方的 ksqlDB 伺服器映像允許透過環境變數指定大多數 ksqlDB 組態屬性。然而,這種方法有一些缺點,例如組態難以版本控制、執行容器的命令可能變得冗長等。因此,建議將組態檔案掛載到容器內部。

例如,將 ksqlDB 伺服器組態儲存到本地檔案 config/server.properties

bootstrap.servers=kafka:9092
ksql.service.id=greeter-prod

然後,可以將此組態檔案掛載到容器內部,並啟動 ksqlDB 伺服器:

docker run \
  --net=chapter-12_default \
  -v "$(pwd)/config":/config \
  -ti confluentinc/ksqldb-server:0.14.0 \
  ksql-server-start /config/server.properties

處理額外的組態檔案

如果組態中包含 ksql.connect.worker.configqueries.file 等屬性,則需要將相關檔案掛載到容器內部。

Kafka Streams 容器

與 ksqlDB 類別似,Kafka Streams 應用程式也可以容器化佈署。雖然文中沒有提供具體的 Kafka Streams 容器範例,但原理是相似的:將應用程式封裝成 Docker 映像,然後在容器中執行。

使用 Prometheus 監控 Kafka Streams 和 ksqlDB

在生產環境中,需要一個強大的監控解決方案來儲存歷史資料、執行查詢和與警示系統整合。Prometheus 是一個推薦的技術。要將指標匯出到 Prometheus,需要:

  1. 下載 Prometheus JMX 匯出器 JAR。
  2. 使用 -javaagent 引數啟動 Kafka Streams 或 ksqlDB。
  3. 組態 Prometheus 以抓取 HTTP 端點上的 JMX 指標。

組態 Prometheus 的範例程式碼

本章的原始碼中包含了一個使用 Prometheus 與 Kafka Streams 和 ksqlDB 的完整範例。

為何匯出 JMX 指標很重要

無論使用 Prometheus 或其他監控系統,將 Kafka Streams 和 ksqlDB 的內建 JMX 指標匯出到外部系統都可以提高應用程式的可觀測性。

程式碼與內容解說

文中提到的 Docker 命令和組態檔案是佈署 ksqlDB 的關鍵步驟。下面是對這些內容的詳細解說:

docker run \
  --net=chapter-12_default \
  -v "$(pwd)/config":/config \
  -ti confluentinc/ksqldb-server:0.14.0 \
  ksql-server-start /config/server.properties
  1. docker run: 這是 Docker 用於啟動新容器的命令。
  2. --net=chapter-12_default: 指定容器連線到名為 chapter-12_default 的 Docker 網路,這使得容器可以與其他在同一個網路中的容器進行通訊。
  3. -v "$(pwd)/config":/config: 將主機上的當前目錄下的 config 資料夾掛載到容器的 /config 目錄。這樣,容器就可以存取主機上的組態檔案。
  4. -ti confluentinc/ksqldb-server:0.14.0: 指定要執行的 Docker 映像名稱和版本,並分配一個偽TTY,以保持容器的互動性。
  5. ksql-server-start /config/server.properties: 這是在容器內執行的命令,用於啟動 ksqlDB 伺服器,並指定使用 /config/server.properties 組態檔案。

組態檔案的作用

組態檔案 server.properties 中包含了 ksqlDB 的組態屬性,例如:

bootstrap.servers=kafka:9092
ksql.service.id=greeter-prod
  1. bootstrap.servers: 指定 Kafka 叢集的引導伺服器地址。
  2. ksql.service.id: 為 ksqlDB 例項指定一個唯一的服務ID。

為何需要掛載組態檔案

掛載組態檔案可以避免將敏感資訊或特定環境的組態寫死在 Docker 映像中,提高了佈署的靈活性。

將Kafka Streams應用程式容器化與佈署

在現代化的軟體開發流程中,將應用程式容器化已經成為了一種標準做法。對於Kafka Streams應用程式而言,容器化不僅能夠簡化佈署流程,還能提高應用的可移植性和可擴充套件性。本篇文章將探討如何使用Jib將Kafka Streams應用程式容器化,以及如何透過容器協調工具(如Kubernetes)進行佈署和管理。

使用Jib容器化Kafka Streams應用程式

Jib是一個由Google開發的開源工具,它簡化了Java應用程式的容器化過程。透過將Jib外掛新增到我們的Gradle構建檔案中,我們可以輕鬆地將Kafka Streams應用程式封裝成Docker映像。

首先,在build.gradle檔案中新增Jib外掛:

plugins {
    id 'com.google.cloud.tools.jib' version '2.1.0'
}

jib {
    to {
        image = 'magicalpipelines/myapp:0.1.0'
    }
    container {
        jvmFlags = []
        mainClass = application.mainClassName
        format = 'OCI'
    }
}

接下來,執行以下命令來構建Docker映像:

./gradlew jibDockerBuild

這個命令會在本地Docker守護程式中構建映像,而不會將其推播到容器登入檔中。成功執行後,您應該會看到類別似以下的輸出:

Built image to Docker daemon as magicalpipelines/myapp:0.1.0

內容解密:

  1. Jib外掛組態:透過在build.gradle中新增Jib外掛,我們能夠利用Jib的功能來容器化我們的Java應用程式。
  2. jib區塊組態:在jib區塊中,我們指定了輸出映像的名稱和版本,以及容器的相關組態,如JVM引數和主類別。
  3. 構建Docker映像:執行./gradlew jibDockerBuild命令後,Jib會根據我們的組態構建Docker映像,並將其載入到本地Docker守護程式中。

容器協調與Kubernetes

對於生產環境中的Kafka Streams應用程式,建議使用容器協調系統(如Kubernetes)來管理和佈署容器。Kubernetes提供了多項好處,包括自動容錯移轉、簡易的擴充套件性、以及對基礎設施的抽象化。

Kubernetes的好處

  • 自動容錯移轉:當節點故障時,Kubernetes能夠自動將容器遷移到健康的節點上。
  • 簡易的擴充套件性:透過增加容器的副本數量,可以輕鬆地擴充套件工作負載。
  • 抽象化的基礎設施:Kubernetes提供了一個抽象層,讓您能夠在不同的基礎設施上執行應用程式。

重置Kafka Streams應用程式

在某些情況下,您可能需要重置Kafka Streams應用程式,例如當發現系統中的錯誤並需要重新處理Kafka主題中的資料時。Kafka提供了一個應用程式重置工具,可以用於此目的。

使用應用程式重置工具

該工具可以:

  • 更新源主題上的消費者偏移量到指定的位置。
  • 跳過中間主題的末尾。
  • 刪除內部的變更日誌和重新分割槽主題。
內容解密:
  1. 重置工具的功能:該工具提供了多種功能來重置Kafka Streams應用程式的狀態。
  2. 使用注意事項:在使用該工具時,需要特別小心,以確保正確識別應用程式的消費者群組。

Kafka Streams 應用程式重置與速率限制輸出

在使用 Kafka Streams 處理資料流時,我們可能會遇到需要重置應用程式狀態或限制輸出速率的情況。本文將介紹如何使用 Kafka Streams 的重置工具以及如何利用記錄快取來限制輸出速率。

重置 Kafka Streams 應用程式

在使用 Kafka Streams 處理有狀態的應用程式時,需要特別小心,因為重置工具不會重置應用程式的狀態。以下是重置 Kafka Streams 應用程式的步驟:

  1. 停止所有應用程式例項:在繼續下一步之前,請確保消費者群組已停止。您可以執行以下命令來檢查群組是否處於非活動狀態:

kafka-consumer-groups
–bootstrap-server kafka:9092
–describe
–group dev
–state

   更新 bootstrap 伺服器為其中一個代理伺服器的主機/埠對。群組由 Kafka Streams 中的 `application.id` 設定指定。

2. **執行應用程式重置工具**:執行以下命令以重置應用程式:
   ```bash
kafka-streams-application-reset \
--application-id dev \
--bootstrap-servers kafka:9092 \
--input-topics users \
--to-earliest

輸出結果將類別似於以下內容:

Reset-offsets for input topics [users]
Following input topics offsets will be reset to (for consumer group dev)
Topic: users Partition: 3 Offset: 0
Topic: users Partition: 2 Offset: 0
Topic: users Partition: 1 Offset: 0
Topic: users Partition: 0 Offset: 0
Done.
Deleting all internal/auto-created topics for application dev
Done.
  1. 清理應用程式狀態:如果您的應用程式是有狀態的,則需要在重新啟動之前重置應用程式的狀態。您可以手動刪除每個狀態目錄,或在程式碼中呼叫 KafkaStreams.cleanUp 方法(僅適用於 Kafka Streams)。

限制 Kafka Streams 應用程式的輸出速率

Kafka Streams 和 ksqlDB 能夠實作高吞吐量,但下游系統可能無法跟上輸出速率。在這種情況下,可以使用記錄快取來限制輸出速率。

記錄快取的工作原理

記錄快取可以減少寫入狀態儲存的輸出記錄數量,以及轉發到下游處理器的記錄數量。每個串流執行緒(由 num.stream.threads 引數控制)都會分配總快取大小的均等份額。

使用記錄快取限制輸出速率

假設我們有一個拓撲結構,需要計算每個鍵的訊息數量。如果我們有以下事件序列:

  • <key1, value1>
  • <key1, value2>
  • <key1, value3>

在未停用記錄快取的情況下(即 cache.max.bytes.buffering 設定為非零值),Kafka Streams 和 ksqlDB 可以將原始的聚合序列簡化為單一輸出:

  • <key1, 3>

這可以減少下游處理器需要處理的資料量。

升級 Kafka Streams

Apache Kafka(包括 Kafka Streams)採用根據時間的釋出計劃。因此,您應該預期每四個月就會發布一個新的 Kafka Streams 版本。版本策略如下所示:

major.minor.bug-fix

大多數情況下,四個月的釋出週期適用於次要版本。在極少數情況下,您可能會看到主要版本的更新,以適應更重大的變更或專案的重要里程碑。錯誤修復很常見,可以隨時發生。

重置工具與記錄快取的使用範例程式碼

// 使用 KafkaStreams.cleanUp 方法清理應用程式狀態
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();

// 設定記錄快取大小
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760); // 10 MB

此圖示說明瞭 Kafka Streams 應用程式重置和速率限制輸出的流程

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Kafka Streams ksqlDB 容器化佈署與應用

package "Kubernetes Cluster" {
    package "Control Plane" {
        component [API Server] as api
        component [Controller Manager] as cm
        component [Scheduler] as sched
        database [etcd] as etcd
    }

    package "Worker Nodes" {
        component [Kubelet] as kubelet
        component [Kube-proxy] as proxy
        package "Pods" {
            component [Container 1] as c1
            component [Container 2] as c2
        }
    }
}

api --> etcd : 儲存狀態
api --> cm : 控制迴圈
api --> sched : 調度決策
api --> kubelet : 指令下達
kubelet --> c1
kubelet --> c2
proxy --> c1 : 網路代理
proxy --> c2

note right of api
  核心 API 入口
  所有操作經由此處
end note

@enduml

此圖示展示了重置 Kafka Streams 應用程式和限制輸出速率的主要步驟。

Kafka Streams 與 ksqlDB 的升級與組態管理

在生產環境中升級 Kafka Streams 和 ksqlDB 需要謹慎規劃,因為新版本的升級可能會引入不相容的變更或需要特定的升級步驟。

升級 Kafka Streams

升級 Kafka Streams 時,務必遵循官方 Kafka 網站發布的升級。某些情況下,可能需要設定 upgrade.from 引數到舊版本,然後進行一系列的重啟,以確保升級的安全性。

取得新版本資訊

有多種方式可以讓你及時瞭解 Kafka 和 ksqlDB 的新版本資訊:

  • 訂閱官方郵件列表以接收版本發布公告。
  • 關注 Apache Kafka 的官方 GitHub 倉函式庫,並選擇「Releases Only」選項以接收版本更新通知。
  • 在 Twitter 上關注 @apachekafka。

升級 ksqlDB

ksqlDB 的升級路徑可能因版本而異,因為它在達到版本 1.0 之前可能會包含破壞性變更。官方檔案建議在升級前仔細閱讀升級,並考慮延遲升級直到新的版本帶來需要的功能或修復,或者直到 ksqlDB 達到版本 1.0 並承諾向後相容性。

組態管理

在開發 Kafka Streams 應用程式時,可以透過建立 Properties 例項並手動設定各種組態引數來進行組態。然而,在將應用程式佈署到生產環境時,建議從檔案中載入組態,以避免硬編碼值直接在應用程式中。這樣可以減少錯誤,並使得管理多個佈署變得更加容易。

Kafka Streams 組態屬性

組態 Kafka Streams 應用程式時,需要設定兩個必要的引數:

  • application.id:用於標識 Kafka Streams 應用程式的唯一識別碼。
  • bootstrap.servers:Kafka 代理的主機和埠對列表,用於建立與 Kafka 叢集的連線。

此外,還有多個可選引數,例如 acceptable.recovery.lag,用於指定 Kafka Streams 任務在輸入分割槽上的最大延遲,以便被視為“預熱”並準備就緒。

Kafka Streams 設定引數詳解

Kafka Streams 提供多種設定引數,用於控制應用程式的行為和效能。以下是一些重要的設定引數:

1. acceptable.recovery.lag

此引數用於控制 Kafka Streams 在分配任務給應用程式例項之前,允許的最大延遲。當一個有狀態的應用程式例項需要還原部分或全部狀態時,您可能不希望 Kafka Streams 在還原期間分配工作給它。如果是這種情況,您可以設定此引數,以允許應用程式例項僅在延遲低於此閾值時才接收任務分配。

2. cache.max.bytes.buffering

此引數控制 Kafka Streams 中的記錄快取大小。將此值設定為大於 0 的值將啟用記錄快取,並可以對應用程式的輸出進行速率限制。

3. default.deserialization.exception.handler

此引數指定用於處理反序列化錯誤的類別。內建的選項包括 LogAndContinueExceptionHandlerLogAndFailExceptionHandler。前者允許 Kafka Streams 在發生反序列化錯誤時繼續處理記錄,而後者將導致 Kafka Streams 記錄異常並停止處理。

4. default.production.exception.handler

此引數指定用於處理與生成資料到 Kafka 相關的錯誤的類別。例如,如果記錄太大,則底層的生產者將丟擲一個需要以某種方式處理的異常。預設情況下,使用內建的 DefaultProductionExceptionHandler 類別,它將導致 Kafka Streams 失敗並關閉。

5. default.timestamp.extractor

此引數指定用於將給定記錄與時間戳相關聯的類別。

6. default.key.serdedefault.value.serde

這些引數指定用於序列化和反序列化記錄鍵和值的預設類別。

程式碼範例

KStream<byte[], Tweet> stream = builder.stream(
    "tweets",
    Consumed.with(Serdes.ByteArray(), JsonSerdes.Tweet()));

在這個範例中,鍵的 Serdes(Serdes.ByteArray())和值的 Serdes(JsonSerdes.Tweet())被內聯定義。

內容解密:

  1. Serdes.ByteArray() 用於將鍵序列化為位元組陣列。
  2. JsonSerdes.Tweet() 用於將值序列化為 JSON 格式的 Tweet 物件。
  3. 使用 Consumed.with() 方法指定鍵和值的 Serdes。

7. max.task.idle.ms

此引數控制流任務等待所有分割區緩衝區包含資料的最大時間。較高的值可能會增加延遲,但可以防止任務從多個輸入分割區讀取資料時出現亂序資料。

8. max.warmup.replicas

此引數控制可用於預熱任務的副本數量(除了 num.standbys 之外)。

9. metrics.recording.level

此引數控制 Kafka Streams 捕捉的指標的粒度級別。預設為 INFO,但您可以覆寫它為 DEBUG 以獲得更多可見性。

10. num.standby.replicas

此引數控制為每個狀態儲存建立的副本數量。這有助於減少停機時間,因為如果一個有狀態的任務離線,Kafka Streams 可以將工作重新分配給具有複製狀態的其他應用程式例項。

11. num.stream.threads

此引數控制執行 Kafka Streams 任務的執行緒數量。增加執行緒數量可以幫助您充分利用機器上的可用 CPU 資源,並提高效能。

程式碼範例

// 設定 num.stream.threads 為 8
Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);

內容解密:

  1. 使用 Properties 物件設定 Kafka Streams 的組態。
  2. num.stream.threads 設定為 8,以允許在 8 個執行緒上平行執行任務。
  3. 這可以提高應用程式的效能和吞吐量。

12. processing.guarantee

此引數控制處理保證。有三個可選值:

  • at_least_once:記錄可能會在某些故障情況下被重新傳遞,但永遠不會丟失。
  • exactly_once:使用事務性生產者和嵌入式消費者,記錄被精確處理一次。
  • exactly_once_beta:與 exactly_once 相比,具有更好的擴放性和減少的開銷。