返回文章列表

Kafka 工具平台與生態系統深度解析

深入探討 Apache Kafka 生態系統中的各種工具和平台,涵蓋託管服務、叢集管理、監控工具、客戶端程式庫與串流處理框架,提供完整的 Kafka 工具選型指南與實務應用範例

大資料 串流處理

Apache Kafka 作為現代分散式串流處理的核心基礎設施,已經從最初的訊息佇列系統演進成為一個完整的事件串流平台。隨著 Kafka 在各種規模企業中的廣泛採用,圍繞著它發展出了一個豐富而多樣的生態系統,包含了託管服務、管理工具、監控解決方案以及各種語言的客戶端程式庫。這個生態系統的蓬勃發展不僅反映了 Kafka 本身的成熟度,也展現了社群對於降低使用門檻與提升開發體驗的持續努力。

對於剛接觸 Kafka 的團隊而言,面對如此豐富的工具選擇可能會感到無所適從。每種工具都有其特定的使用場景與優缺點,選擇不當可能導致額外的維運負擔或功能限制。因此,深入了解各種工具的特性、適用情境與整合方式,對於建構穩健可靠的串流處理系統至關重要。本文將系統性地介紹 Kafka 生態系統中的主要工具與平台,從託管服務到自建叢集管理,從效能監控到資料探索,協助讀者根據自身需求做出明智的技術選型決策。

Kafka 生態系統架構概覽

在深入探討個別工具之前,我們需要先理解 Kafka 生態系統的整體架構與各組件之間的關係。一個完整的 Kafka 部署通常包含多個相互協作的元件,每個元件都有對應的工具與服務來支援其運作。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "Kafka 生態系統架構" {

    package "託管服務層" {
        [Confluent Cloud] as CC
        [Amazon MSK] as MSK
        [Azure HDInsight] as HDI
        [Aiven] as AV
    }

    package "核心基礎設施" {
        [Kafka Broker 叢集] as KB
        [ZooKeeper / KRaft] as ZK
        [Schema Registry] as SR
    }

    package "資料整合層" {
        [Kafka Connect] as KC
        [REST Proxy] as RP
        [Source Connectors] as SC
        [Sink Connectors] as SK
    }

    package "串流處理層" {
        [Kafka Streams] as KS
        [Apache Flink] as FL
        [Apache Spark] as SP
        [Apache Samza] as SM
    }

    package "管理監控層" {
        [AKHQ] as AK
        [Cruise Control] as CRC
        [Burrow] as BU
        [Xinfra Monitor] as XM
    }

    package "客戶端層" {
        [Java Client] as JC
        [librdkafka] as LR
        [Sarama Go] as SG
        [kafka-python] as KP
    }
}

CC --> KB
MSK --> KB
HDI --> KB
AV --> KB

KB --> ZK
KB --> SR

KC --> KB
RP --> KB
SC --> KC
SK --> KC

KS --> KB
FL --> KB
SP --> KB
SM --> KB

AK --> KB
CRC --> KB
BU --> KB
XM --> KB

JC --> KB
LR --> KB
SG --> KB
KP --> KB

@enduml

這個架構圖展示了 Kafka 生態系統的分層結構。最底層是核心基礎設施,包含 Kafka Broker 叢集與協調服務。在此之上是資料整合層,負責連接外部系統與 Kafka。串流處理層提供了各種框架來處理流經 Kafka 的資料。管理監控層則確保整個系統的健康運作。而客戶端層則是應用程式與 Kafka 互動的介面。

理解這個分層架構有助於我們在選擇工具時考慮各層之間的相容性與整合需求。例如,選擇了特定的託管服務可能會影響可用的監控工具選項,而選擇的串流處理框架則會決定客戶端程式庫的選擇。

託管 Kafka 平台深度比較

對於許多組織而言,自行管理 Kafka 叢集需要投入大量的人力與專業知識。託管 Kafka 平台提供了一個更為便捷的選擇,讓團隊能夠專注於應用程式開發而非基礎設施維運。然而,各家託管服務在功能、整合性與定價上都有顯著差異,選擇時需要仔細評估。

Confluent Cloud 全方位解決方案

Confluent Cloud 是由 Kafka 原始開發團隊創辦的公司所提供的託管服務,因此在功能完整性與技術領先性上具有明顯優勢。這個平台不僅提供基本的 Kafka 叢集託管,還整合了 Schema Registry、ksqlDB、Kafka Connect 以及完整的監控解決方案,形成一個端到端的串流處理平台。

Confluent Cloud 的一大特色是其跨雲端的支援能力,可以在 AWS、Azure 與 Google Cloud Platform 三大公有雲上部署,並且提供跨區域與跨雲端的資料複製功能。這對於需要多雲策略或災難復原的企業特別有價值。

在安全性方面,Confluent Cloud 提供了完整的 RBAC(Role-Based Access Control)支援,可以精細控制使用者對 Topic、Consumer Group 與 Cluster 的存取權限。此外,它還支援 Private Link 連接,確保資料傳輸不經過公共網際網路。

以下是使用 Confluent Cloud Java Client 連接的範例程式碼:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ConfluentCloudProducer {

    public static void main(String[] args) {
        // 建立 Producer 組態屬性物件
        // 這個物件將包含所有連接 Confluent Cloud 所需的設定
        Properties props = new Properties();

        // 設定 Confluent Cloud 叢集的 Bootstrap Server 位址
        // 此位址可從 Confluent Cloud Console 取得
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "pkc-xxxxx.region.provider.confluent.cloud:9092");

        // 設定安全協定為 SASL_SSL
        // 這確保連接同時使用 SASL 認證與 SSL/TLS 加密
        props.put("security.protocol", "SASL_SSL");

        // 設定 SASL 機制為 PLAIN
        // Confluent Cloud 使用 API Key 作為認證方式
        props.put("sasl.mechanism", "PLAIN");

        // 設定 SASL JAAS 組態
        // 包含 API Key 與 API Secret 作為使用者名稱與密碼
        String jaasConfig = String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username='%s' password='%s';",
            System.getenv("CONFLUENT_API_KEY"),      // 從環境變數讀取 API Key
            System.getenv("CONFLUENT_API_SECRET")   // 從環境變數讀取 API Secret
        );
        props.put("sasl.jaas.config", jaasConfig);

        // 設定 Key 序列化器
        // 使用 StringSerializer 將訊息 Key 轉換為位元組陣列
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        // 設定 Value 序列化器
        // 使用 StringSerializer 將訊息 Value 轉換為位元組陣列
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        // 設定確認機制為 all
        // 這表示訊息必須被所有同步副本確認後才算發送成功
        // 這是最高級別的持久性保證
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // 啟用冪等性 Producer
        // 確保即使發生重試也不會產生重複訊息
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // 設定重試次數
        // 當發送失敗時最多重試的次數
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        // 設定批次大小為 16KB
        // Producer 會將多個小訊息批次處理以提高吞吐量
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // 設定延遲時間為 5 毫秒
        // Producer 會等待這段時間來累積更多訊息進行批次發送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);

        // 使用 try-with-resources 建立 Producer 實例
        // 確保 Producer 在使用完畢後正確關閉
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            // 定義要發送的 Topic 名稱
            String topic = "user-events";

            // 發送 10 筆測試訊息
            for (int i = 0; i < 10; i++) {
                // 建立訊息 Key,使用使用者 ID 作為分區依據
                String key = "user-" + (i % 3);

                // 建立訊息 Value,包含事件資料
                String value = String.format(
                    "{\"userId\": \"%s\", \"action\": \"click\", \"timestamp\": %d}",
                    key, System.currentTimeMillis()
                );

                // 建立 ProducerRecord 物件
                // 指定 Topic、Key 與 Value
                ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, key, value);

                // 同步發送訊息並等待結果
                // get() 方法會阻塞直到收到 Broker 的確認
                RecordMetadata metadata = producer.send(record).get();

                // 輸出發送結果
                System.out.printf("訊息已發送至 Topic: %s, Partition: %d, Offset: %d%n",
                    metadata.topic(),
                    metadata.partition(),
                    metadata.offset());
            }

            // 確保所有緩衝的訊息都已發送
            producer.flush();

            System.out.println("所有訊息發送完成");

        } catch (InterruptedException | ExecutionException e) {
            // 處理發送過程中可能發生的例外
            System.err.println("訊息發送失敗: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

Amazon MSK 與 AWS 生態整合

Amazon Managed Streaming for Apache Kafka(Amazon MSK)是 AWS 提供的託管 Kafka 服務,其最大優勢在於與 AWS 生態系統的深度整合。如果您的基礎設施主要建構在 AWS 上,MSK 提供了無縫的整合體驗。

MSK 支援與 AWS Glue Schema Registry 的整合,提供 Schema 管理功能。它也可以與 Amazon CloudWatch 整合進行監控,並支援 AWS IAM 進行身份驗證與授權。此外,MSK 還提供了 MSK Connect 功能,這是 Kafka Connect 的託管版本,可以輕鬆部署 Connector。

值得注意的是,MSK 提供了兩種部署選項:MSK Provisioned 與 MSK Serverless。前者提供更多的控制權限與組態彈性,適合有特定效能需求的工作負載;後者則是完全無伺服器的選項,自動擴展且按使用量計費,適合變動負載或剛開始使用 Kafka 的團隊。

以下是使用 Python 連接 Amazon MSK 的範例:

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import ssl
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

class MSKClient:
    """
    Amazon MSK 客戶端封裝類別

    這個類別封裝了連接 Amazon MSK 所需的組態,
    並提供簡化的 Producer 與 Consumer 介面
    """

    def __init__(self, bootstrap_servers: str, region: str):
        """
        初始化 MSK 客戶端

        Args:
            bootstrap_servers: MSK 叢集的 Bootstrap Server 位址
            region: AWS 區域,例如 'ap-northeast-1'
        """
        # 儲存 Bootstrap Server 位址
        self.bootstrap_servers = bootstrap_servers

        # 儲存 AWS 區域
        self.region = region

        # 建立 SSL Context 用於加密連接
        # 這確保客戶端與 MSK 之間的通訊是加密的
        self.ssl_context = ssl.create_default_context()

    def _get_auth_token(self) -> tuple:
        """
        取得 IAM 認證 Token

        使用 MSK IAM SASL Signer 產生認證 Token
        這個 Token 用於 SASL/OAUTHBEARER 認證

        Returns:
            tuple: (token, expiry_time)
        """
        # 建立 Token Provider
        # 這會使用 AWS 認證鏈取得憑證
        auth_token_provider = MSKAuthTokenProvider(
            region=self.region,
            # 可選:指定 AWS Profile
            # aws_profile='your-profile'
        )

        # 產生 Token
        return auth_token_provider.generate_auth_token()

    def create_producer(self) -> KafkaProducer:
        """
        建立 Kafka Producer 實例

        Returns:
            KafkaProducer: 已組態的 Producer 實例
        """
        # 定義 OAuth Token 回呼函式
        # Kafka 客戶端會在需要認證時呼叫這個函式
        def oauth_cb(oauth_config):
            token, _ = self._get_auth_token()
            return token, token

        # 建立並返回 Producer 實例
        return KafkaProducer(
            # Bootstrap Server 位址
            bootstrap_servers=self.bootstrap_servers,

            # 安全協定:SASL + SSL
            security_protocol='SASL_SSL',

            # SASL 機制:使用 OAuth Bearer Token
            sasl_mechanism='OAUTHBEARER',

            # OAuth Token 回呼函式
            sasl_oauth_token_provider=type(
                'TokenProvider', (), {
                    'token': lambda self: oauth_cb(None)
                }
            )(),

            # SSL Context
            ssl_context=self.ssl_context,

            # Value 序列化器:將 Python 物件轉換為 JSON 字串再編碼為 bytes
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),

            # Key 序列化器:將字串編碼為 bytes
            key_serializer=lambda k: k.encode('utf-8') if k else None,

            # 確認機制:等待所有同步副本確認
            acks='all',

            # 重試次數
            retries=5,

            # 重試間隔時間(毫秒)
            retry_backoff_ms=500,

            # 請求逾時時間(毫秒)
            request_timeout_ms=30000,

            # 批次大小(bytes)
            batch_size=32768,

            # 延遲時間(毫秒)
            linger_ms=10
        )

    def create_consumer(self, group_id: str, topics: list) -> KafkaConsumer:
        """
        建立 Kafka Consumer 實例

        Args:
            group_id: Consumer Group ID
            topics: 要訂閱的 Topic 列表

        Returns:
            KafkaConsumer: 已組態的 Consumer 實例
        """
        # 定義 OAuth Token 回呼函式
        def oauth_cb(oauth_config):
            token, _ = self._get_auth_token()
            return token, token

        # 建立 Consumer 實例
        consumer = KafkaConsumer(
            # Bootstrap Server 位址
            bootstrap_servers=self.bootstrap_servers,

            # 安全協定
            security_protocol='SASL_SSL',

            # SASL 機制
            sasl_mechanism='OAUTHBEARER',

            # OAuth Token Provider
            sasl_oauth_token_provider=type(
                'TokenProvider', (), {
                    'token': lambda self: oauth_cb(None)
                }
            )(),

            # SSL Context
            ssl_context=self.ssl_context,

            # Consumer Group ID
            group_id=group_id,

            # Value 反序列化器
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),

            # Key 反序列化器
            key_deserializer=lambda k: k.decode('utf-8') if k else None,

            # 自動提交 Offset
            enable_auto_commit=True,

            # 自動提交間隔(毫秒)
            auto_commit_interval_ms=5000,

            # 當沒有初始 Offset 時從最早的訊息開始讀取
            auto_offset_reset='earliest',

            # 每次 poll 最多取得的訊息數量
            max_poll_records=500,

            # Session 逾時時間(毫秒)
            session_timeout_ms=45000,

            # 心跳間隔時間(毫秒)
            heartbeat_interval_ms=15000
        )

        # 訂閱指定的 Topics
        consumer.subscribe(topics)

        return consumer

def main():
    """
    主程式:示範如何使用 MSK Client
    """
    # MSK 叢集的 Bootstrap Server 位址
    # 這個位址可從 AWS Console 或 AWS CLI 取得
    bootstrap_servers = 'b-1.msk-cluster.xxxxx.kafka.ap-northeast-1.amazonaws.com:9098'

    # AWS 區域
    region = 'ap-northeast-1'

    # 建立 MSK Client 實例
    msk_client = MSKClient(bootstrap_servers, region)

    # 建立 Producer
    producer = msk_client.create_producer()

    # 發送測試訊息
    topic = 'test-topic'
    for i in range(5):
        # 建立訊息
        message = {
            'event_id': f'evt-{i}',
            'event_type': 'test',
            'payload': {'index': i}
        }

        # 發送訊息
        future = producer.send(topic, key=f'key-{i}', value=message)

        try:
            # 等待發送完成
            record_metadata = future.get(timeout=10)
            print(f'訊息已發送至 {record_metadata.topic} '
                  f'partition {record_metadata.partition} '
                  f'offset {record_metadata.offset}')
        except KafkaError as e:
            print(f'發送失敗: {e}')

    # 關閉 Producer
    producer.close()

    # 建立 Consumer
    consumer = msk_client.create_consumer(
        group_id='test-consumer-group',
        topics=[topic]
    )

    # 消費訊息
    print('開始消費訊息...')
    try:
        for message in consumer:
            print(f'收到訊息: topic={message.topic}, '
                  f'partition={message.partition}, '
                  f'offset={message.offset}, '
                  f'key={message.key}, '
                  f'value={message.value}')
    except KeyboardInterrupt:
        print('停止消費')
    finally:
        consumer.close()

if __name__ == '__main__':
    main()

Azure HDInsight 與 Cloudera CDP

Azure HDInsight 是微軟提供的託管大數據平台,除了 Kafka 之外還支援 Hadoop、Spark、HBase 等多種元件。這對於需要完整大數據解決方案的企業是一個吸引人的選項,因為可以在同一個平台上管理多種工作負載。

HDInsight 的 Kafka 服務與 Azure 生態系統有良好的整合,包括 Azure Active Directory 用於身份驗證、Azure Monitor 用於監控,以及 Azure Storage 用於長期資料保存。然而,HDInsight 主要專注於核心 Kafka 功能,Schema Registry 與 REST Proxy 等進階元件需要使用者自行部署。

Cloudera 則提供了另一種選擇,其 Cloudera Data Platform(CDP)包含了完整的資料管理功能。CDP 不僅提供 Kafka 託管服務,還整合了資料目錄、資料治理與機器學習平台。對於需要完整資料生命週期管理的企業,Cloudera 提供了一站式解決方案。

Aiven 開源友善的選擇

Aiven 是一家專注於開源資料庫與串流平台託管的公司,除了 Kafka 之外還提供 PostgreSQL、MySQL、Redis、Elasticsearch 等多種服務的託管。Aiven 的一大特色是其對開源的承諾,其開發的 Karapace 專案提供了 Schema Registry 與 REST Proxy 功能,並採用 Apache 2.0 授權,沒有使用限制。

Aiven 支援多個雲端平台與區域,並提供跨區域的資料複製功能。其定價模式相對透明,按照節點規格與使用時間計費,沒有隱藏的資料傳輸費用。對於重視成本可預測性的團隊,Aiven 是一個值得考慮的選項。

叢集部署與管理工具

對於選擇自行管理 Kafka 叢集的團隊,有多種工具可以協助簡化部署與維運工作。這些工具涵蓋了從初始部署到日常管理的各個層面。

Strimzi Kubernetes 原生部署

Strimzi 是一個 Kubernetes Operator,專門用於在 Kubernetes 環境中部署與管理 Kafka 叢集。它採用宣告式組態方式,讓使用者可以透過 Kubernetes Custom Resources 來定義 Kafka 叢集的規格。

使用 Strimzi 的最大好處是可以充分利用 Kubernetes 的自動化能力,包括自動擴展、滾動更新、自我修復等。這大幅降低了管理 Kafka 叢集的複雜度,特別是在需要管理多個叢集的情況下。

以下是 Strimzi Kafka 叢集的 Custom Resource 定義範例:

# Strimzi Kafka 叢集組態
# 這個 Custom Resource 定義了一個完整的 Kafka 叢集
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  # 叢集名稱
  name: production-cluster
  # 部署的命名空間
  namespace: kafka
spec:
  kafka:
    # Kafka 版本
    version: 3.6.0
    # Broker 副本數量
    replicas: 3
    # 監聽器組態
    listeners:
      # 內部明文監聽器
      - name: plain
        port: 9092
        type: internal
        tls: false
      # 內部 TLS 監聽器
      - name: tls
        port: 9093
        type: internal
        tls: true
      # 外部 LoadBalancer 監聽器
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
        # 外部監聽器的認證組態
        authentication:
          type: scram-sha-512
    # Broker 組態
    config:
      # Offset Topic 的副本因子
      offsets.topic.replication.factor: 3
      # Transaction State Log 的副本因子
      transaction.state.log.replication.factor: 3
      # Transaction State Log 的最小 ISR
      transaction.state.log.min.isr: 2
      # 預設的副本因子
      default.replication.factor: 3
      # 最小 ISR
      min.insync.replicas: 2
      # 允許自動建立 Topic
      auto.create.topics.enable: false
      # Log 保留時間(小時)
      log.retention.hours: 168
      # Log 保留大小(bytes)
      log.retention.bytes: 107374182400
    # 儲存組態
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 500Gi
          deleteClaim: false
          class: premium-ssd
    # 資源限制
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 8Gi
        cpu: "4"
    # JVM 選項
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    # 機架感知組態
    rack:
      topologyKey: topology.kubernetes.io/zone
    # Metrics 組態
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml

  # ZooKeeper 組態
  zookeeper:
    # ZooKeeper 副本數量
    replicas: 3
    # 儲存組態
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
      class: premium-ssd
    # 資源限制
    resources:
      requests:
        memory: 2Gi
        cpu: "1"
      limits:
        memory: 2Gi
        cpu: "2"
    # Metrics 組態
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: zookeeper-metrics
          key: zookeeper-metrics-config.yml

  # Entity Operator 組態
  # 用於管理 Topic 與 User
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: 256Mi
          cpu: "0.1"
        limits:
          memory: 512Mi
          cpu: "0.5"
    userOperator:
      resources:
        requests:
          memory: 256Mi
          cpu: "0.1"
        limits:
          memory: 512Mi
          cpu: "0.5"

  # Kafka Exporter 組態
  # 用於匯出 Consumer Group Lag 等指標
  kafkaExporter:
    topicRegex: ".*"
    groupRegex: ".*"
    resources:
      requests:
        memory: 128Mi
        cpu: "0.1"
      limits:
        memory: 256Mi
        cpu: "0.5"

Strimzi 還提供了 Strimzi Kafka Bridge,這是一個 REST Proxy 實作,讓不使用 Kafka 原生協定的應用程式也能與 Kafka 互動。以下是 Kafka Bridge 的部署組態:

# Strimzi Kafka Bridge 組態
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: kafka-bridge
  namespace: kafka
spec:
  # Bridge 副本數量
  replicas: 2
  # Bootstrap Server 位址
  bootstrapServers: production-cluster-kafka-bootstrap:9092
  # HTTP 組態
  http:
    port: 8080
    cors:
      allowedOrigins: "*"
      allowedMethods: "GET,POST,PUT,DELETE,OPTIONS"
  # Producer 組態
  producer:
    config:
      acks: all
      delivery.timeout.ms: 300000
  # Consumer 組態
  consumer:
    config:
      auto.offset.reset: earliest
  # 資源限制
  resources:
    requests:
      memory: 512Mi
      cpu: "0.5"
    limits:
      memory: 1Gi
      cpu: "1"

AKHQ 圖形化管理介面

AKHQ(原名 KafkaHQ)是一個功能強大的 Kafka 叢集管理 Web 介面。它提供了直覺的圖形化介面,讓使用者可以瀏覽 Topic、查看訊息內容、管理 Consumer Group、以及設定 ACL。

AKHQ 支援多叢集管理,可以在同一個介面中切換不同的 Kafka 叢集。它也支援 Schema Registry,可以瀏覽與管理 Schema。此外,AKHQ 還提供了 LDAP 與 OAuth 整合,適合企業環境使用。

以下是 AKHQ 的 Docker Compose 部署組態:

# AKHQ Docker Compose 組態檔
version: '3'
services:
  akhq:
    image: tchiotludo/akhq:latest
    container_name: akhq
    environment:
      # AKHQ 組態
      # 使用 YAML 格式定義
      AKHQ_CONFIGURATION: |
        akhq:
          # 連接組態
          connections:
            # 第一個叢集:開發環境
            dev-cluster:
              properties:
                # Bootstrap Server 位址
                bootstrap.servers: "kafka-dev:9092"
              # Schema Registry 組態
              schema-registry:
                url: "http://schema-registry-dev:8081"
              # Kafka Connect 組態
              connect:
                - name: "connect-dev"
                  url: "http://connect-dev:8083"

            # 第二個叢集:生產環境
            prod-cluster:
              properties:
                bootstrap.servers: "kafka-prod:9092"
                # 安全協定
                security.protocol: SASL_SSL
                sasl.mechanism: SCRAM-SHA-512
                sasl.jaas.config: >-
                  org.apache.kafka.common.security.scram.ScramLoginModule
                  required username="admin" password="${KAFKA_PASSWORD}";
              schema-registry:
                url: "http://schema-registry-prod:8081"
                properties:
                  basic.auth.credentials.source: USER_INFO
                  basic.auth.user.info: "admin:${SR_PASSWORD}"
              connect:
                - name: "connect-prod"
                  url: "http://connect-prod:8083"

          # 安全組態
          security:
            # 預設群組
            default-group: reader
            # 群組定義
            groups:
              admin:
                # 管理員角色
                roles:
                  - topic/read
                  - topic/insert
                  - topic/delete
                  - topic/config/update
                  - node/read
                  - node/config/update
                  - acls/read
                  - acls/insert
                  - acls/delete
                  - connect/read
                  - connect/insert
                  - connect/update
                  - connect/delete
                  - connect/state/update
                  - registry/read
                  - registry/insert
                  - registry/update
                  - registry/delete
                  - registry/version/delete
                  - group/read
                  - group/delete
                  - group/offsets/update
              reader:
                # 唯讀角色
                roles:
                  - topic/read
                  - node/read
                  - acls/read
                  - connect/read
                  - registry/read
                  - group/read

            # 基本認證
            basic-auth:
              - username: admin
                password: "${AKHQ_ADMIN_PASSWORD}"
                passwordHash: null
                groups:
                  - admin
              - username: reader
                password: "${AKHQ_READER_PASSWORD}"
                passwordHash: null
                groups:
                  - reader

    ports:
      # Web 介面連接埠
      - "8080:8080"

    # 環境變數檔案
    env_file:
      - .env

    # 重啟策略
    restart: unless-stopped

    # 資源限制
    deploy:
      resources:
        limits:
          memory: 1G
        reservations:
          memory: 512M

Cruise Control 自動化叢集管理

Cruise Control 是 LinkedIn 開源的 Kafka 叢集管理工具,專門用於處理大規模 Kafka 叢集的自動化管理任務。它最重要的功能是自動重新平衡叢集中的資料,確保各個 Broker 的負載均衡。

當叢集中新增或移除 Broker 時,Cruise Control 可以自動計算最佳的分區分配方案,並執行資料遷移。這個過程考慮了多個因素,包括 Broker 的磁碟使用率、網路流量、以及副本分布等。

Cruise Control 還提供了異常檢測功能,可以識別效能不佳的 Broker 或有問題的 Topic。它也支援設定目標(Goals),例如保持所有 Broker 的 CPU 使用率在 70% 以下,並自動採取行動來達成這些目標。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

title Cruise Control 工作流程

participant "使用者" as User
participant "Cruise Control\nREST API" as CC
participant "Analyzer" as AN
participant "Executor" as EX
participant "Kafka Cluster" as KC

User -> CC: 請求重新平衡
activate CC

CC -> AN: 分析當前狀態
activate AN

AN -> KC: 收集指標資料
KC --> AN: 返回 Broker 負載

AN -> AN: 計算最佳方案
AN --> CC: 返回提案
deactivate AN

CC --> User: 顯示提案預覽
User -> CC: 確認執行

CC -> EX: 執行提案
activate EX

loop 每個分區遷移
    EX -> KC: 遷移分區
    KC --> EX: 遷移完成
    EX -> EX: 節流控制
end

EX --> CC: 執行完成
deactivate EX

CC --> User: 返回結果
deactivate CC

@enduml

JulieOps GitOps 風格管理

JulieOps(前身為 Kafka Topology Builder)採用 GitOps 模式來管理 Kafka 的 Topic 與 ACL。使用者透過 YAML 檔案定義所需的狀態,JulieOps 會自動將實際狀態調整為與定義一致。

這種方式的好處是所有組態都版本控制在 Git 中,可以追蹤變更歷史、進行 Code Review、以及實作 CI/CD 流程。這對於需要嚴格變更控制的企業環境特別有價值。

以下是 JulieOps 的組態範例:

# JulieOps Topology 組態
# 定義 Topic、ACL 與 Schema 的期望狀態

# 專案定義
context: "production"
company: "example-corp"

# 專案列表
projects:
  # 第一個專案:使用者服務
  - name: "user-service"

    # Topic 定義
    topics:
      # 使用者事件 Topic
      - name: "user-events"
        config:
          # 分區數量
          num.partitions: "12"
          # 副本因子
          replication.factor: "3"
          # 保留時間:7 天
          retention.ms: "604800000"
          # 清理策略
          cleanup.policy: "delete"
          # 最小 ISR
          min.insync.replicas: "2"
          # 壓縮類型
          compression.type: "lz4"

        # 資料契約定義
        dataType: "avro"
        schemas:
          # Schema 主體
          value.schema.file: "schemas/user-event.avsc"
          # 相容性級別
          value.compatibility: "BACKWARD"

      # 使用者設定檔 Topic(壓縮)
      - name: "user-profiles"
        config:
          num.partitions: "6"
          replication.factor: "3"
          cleanup.policy: "compact"
          min.cleanable.dirty.ratio: "0.3"
          delete.retention.ms: "86400000"

    # Consumer 定義
    consumers:
      # Consumer 應用程式
      - principal: "User:user-service-consumer"
        group: "user-service-consumer-group"
        topics:
          - "user-events"
          - "user-profiles"

    # Producer 定義
    producers:
      # Producer 應用程式
      - principal: "User:user-service-producer"
        topics:
          - "user-events"
          - "user-profiles"
        # 交易 ID 前綴(用於 Exactly-Once)
        transactionIdPrefix: "user-service-tx"

    # Stream 應用程式定義
    streams:
      - principal: "User:user-aggregator"
        topics:
          read:
            - "user-events"
          write:
            - "user-profiles"
        applicationId: "user-aggregator-app"

  # 第二個專案:訂單服務
  - name: "order-service"

    topics:
      - name: "orders"
        config:
          num.partitions: "24"
          replication.factor: "3"
          retention.ms: "2592000000"  # 30 天
          min.insync.replicas: "2"

      - name: "order-status"
        config:
          num.partitions: "12"
          replication.factor: "3"
          cleanup.policy: "compact"

    consumers:
      - principal: "User:order-processor"
        group: "order-processor-group"
        topics:
          - "orders"

    producers:
      - principal: "User:order-api"
        topics:
          - "orders"
          - "order-status"

# 平台範圍的組態
platform:
  # Schema Registry 組態
  schemaRegistry:
    url: "http://schema-registry:8081"

  # Kafka Connect 組態
  kafkaConnect:
    - name: "connect-cluster"
      url: "http://connect:8083"

監控與資料探索工具

監控是 Kafka 維運中最重要的環節之一。由於 Kafka 處理的是即時資料流,任何效能問題或故障都可能導致嚴重的業務影響。因此,建立完善的監控體系對於確保系統穩定運作至關重要。

Xinfra Monitor 端到端監控

Xinfra Monitor(前身為 Kafka Monitor)是 LinkedIn 開發的端到端監控工具。它透過產生合成資料來測量 Kafka 叢集的延遲、可用性與資料完整性。這種方式可以模擬實際的 Producer 與 Consumer 行為,提供最真實的效能指標。

Xinfra Monitor 會持續發送測試訊息到指定的 Topic,然後消費這些訊息並計算端到端延遲。如果訊息遺失或延遲過高,系統會發出警報。這對於 SLA 監控特別有用,可以確保 Kafka 叢集始終符合服務等級協議。

Burrow Consumer Lag 監控

Burrow 是另一個 LinkedIn 開源的監控工具,專注於 Consumer Group 的 Lag 監控。Consumer Lag 是指 Consumer 落後於 Producer 的訊息數量,是衡量系統健康度的重要指標。

傳統的 Lag 監控通常只看絕對數值,但這往往不夠準確。例如,在高吞吐量時期,一定程度的 Lag 是正常的;而在低吞吐量時期,即使是很小的 Lag 也可能表示有問題。Burrow 採用了更智慧的演算法,考慮 Lag 的變化趨勢來判斷 Consumer 的健康狀態。

Burrow 將 Consumer 狀態分為三種:OK、WARNING 與 ERROR。只有當 Lag 持續增加時才會觸發警報,避免了誤報的問題。

以下是使用 Burrow API 查詢 Consumer 狀態的範例:

import requests
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class ConsumerStatus(Enum):
    """
    Consumer 狀態列舉

    Burrow 定義的三種狀態
    """
    OK = "OK"
    WARNING = "WARN"
    ERROR = "ERR"
    STOPPED = "STOP"
    STALLED = "STALL"

@dataclass
class PartitionStatus:
    """
    分區狀態資料類別
    """
    topic: str
    partition: int
    current_offset: int
    end_offset: int
    lag: int
    status: ConsumerStatus

@dataclass
class ConsumerGroupStatus:
    """
    Consumer Group 狀態資料類別
    """
    cluster: str
    group: str
    status: ConsumerStatus
    total_lag: int
    partitions: List[PartitionStatus]
    max_lag: Optional[PartitionStatus]

class BurrowClient:
    """
    Burrow API 客戶端

    用於查詢 Kafka Consumer Group 的 Lag 狀態
    """

    def __init__(self, burrow_url: str):
        """
        初始化 Burrow 客戶端

        Args:
            burrow_url: Burrow 服務的 URL,例如 'http://burrow:8000'
        """
        # 儲存 Burrow URL,移除尾部斜線
        self.base_url = burrow_url.rstrip('/')

        # 建立 Session 以重用連接
        self.session = requests.Session()

        # 設定預設逾時時間
        self.timeout = 10

    def get_clusters(self) -> List[str]:
        """
        取得所有已註冊的 Kafka 叢集

        Returns:
            叢集名稱列表
        """
        # 發送 GET 請求到 /v3/kafka 端點
        response = self.session.get(
            f'{self.base_url}/v3/kafka',
            timeout=self.timeout
        )

        # 檢查回應狀態
        response.raise_for_status()

        # 解析回應
        data = response.json()

        # 返回叢集列表
        return data.get('clusters', [])

    def get_consumer_groups(self, cluster: str) -> List[str]:
        """
        取得指定叢集中的所有 Consumer Group

        Args:
            cluster: 叢集名稱

        Returns:
            Consumer Group 名稱列表
        """
        response = self.session.get(
            f'{self.base_url}/v3/kafka/{cluster}/consumer',
            timeout=self.timeout
        )

        response.raise_for_status()

        data = response.json()

        return data.get('consumers', [])

    def get_consumer_status(self, cluster: str, group: str) -> ConsumerGroupStatus:
        """
        取得 Consumer Group 的詳細狀態

        Args:
            cluster: 叢集名稱
            group: Consumer Group 名稱

        Returns:
            ConsumerGroupStatus 物件
        """
        # 發送請求到 lag 端點
        response = self.session.get(
            f'{self.base_url}/v3/kafka/{cluster}/consumer/{group}/lag',
            timeout=self.timeout
        )

        response.raise_for_status()

        data = response.json()

        # 取得狀態資訊
        status_data = data.get('status', {})

        # 解析分區狀態
        partitions = []
        for p in status_data.get('partitions', []):
            partition_status = PartitionStatus(
                topic=p.get('topic', ''),
                partition=p.get('partition', 0),
                current_offset=p.get('end', {}).get('offset', 0),
                end_offset=p.get('end', {}).get('lag', 0),
                lag=p.get('end', {}).get('lag', 0),
                status=ConsumerStatus(p.get('status', 'OK'))
            )
            partitions.append(partition_status)

        # 計算總 Lag
        total_lag = sum(p.lag for p in partitions)

        # 找出最大 Lag 的分區
        max_lag_partition = max(partitions, key=lambda p: p.lag) if partitions else None

        # 建立並返回 ConsumerGroupStatus
        return ConsumerGroupStatus(
            cluster=cluster,
            group=group,
            status=ConsumerStatus(status_data.get('status', 'OK')),
            total_lag=total_lag,
            partitions=partitions,
            max_lag=max_lag_partition
        )

    def get_all_consumer_statuses(self, cluster: str) -> Dict[str, ConsumerGroupStatus]:
        """
        取得叢集中所有 Consumer Group 的狀態

        Args:
            cluster: 叢集名稱

        Returns:
            以 Group 名稱為鍵的狀態字典
        """
        # 取得所有 Consumer Group
        groups = self.get_consumer_groups(cluster)

        # 查詢每個 Group 的狀態
        statuses = {}
        for group in groups:
            try:
                status = self.get_consumer_status(cluster, group)
                statuses[group] = status
            except Exception as e:
                # 記錄錯誤但繼續處理其他 Group
                print(f'無法取得 {group} 的狀態: {e}')

        return statuses

    def check_health(self) -> bool:
        """
        檢查 Burrow 服務的健康狀態

        Returns:
            True 表示健康,False 表示不健康
        """
        try:
            response = self.session.get(
                f'{self.base_url}/burrow/admin',
                timeout=self.timeout
            )
            return response.status_code == 200
        except Exception:
            return False

def monitor_consumer_lag(burrow_url: str, cluster: str,
                         lag_threshold: int = 10000,
                         alert_callback=None):
    """
    監控 Consumer Lag 並在超過閾值時發出警報

    Args:
        burrow_url: Burrow 服務 URL
        cluster: 叢集名稱
        lag_threshold: Lag 閾值
        alert_callback: 警報回呼函式
    """
    # 建立 Burrow 客戶端
    client = BurrowClient(burrow_url)

    # 取得所有 Consumer Group 的狀態
    statuses = client.get_all_consumer_statuses(cluster)

    # 檢查每個 Group
    for group_name, status in statuses.items():
        # 檢查狀態
        if status.status in [ConsumerStatus.ERROR, ConsumerStatus.STALLED]:
            message = (
                f'Consumer Group {group_name} 狀態異常: {status.status.value}\n'
                f'Total Lag: {status.total_lag}'
            )
            print(f'[警報] {message}')

            if alert_callback:
                alert_callback(group_name, status)

        # 檢查 Lag 閾值
        elif status.total_lag > lag_threshold:
            message = (
                f'Consumer Group {group_name} Lag 超過閾值\n'
                f'Total Lag: {status.total_lag} (閾值: {lag_threshold})'
            )
            print(f'[警告] {message}')

            if alert_callback:
                alert_callback(group_name, status)

        else:
            print(f'[正常] Consumer Group {group_name}: '
                  f'Status={status.status.value}, Lag={status.total_lag}')

if __name__ == '__main__':
    # 範例使用
    burrow_url = 'http://localhost:8000'
    cluster = 'production'

    # 執行監控
    monitor_consumer_lag(
        burrow_url=burrow_url,
        cluster=cluster,
        lag_threshold=5000
    )

Prometheus 與 Grafana 整合

對於已經使用 Prometheus 與 Grafana 進行監控的團隊,Kafka 可以很好地整合到現有的監控體系中。Kafka Broker 透過 JMX 暴露指標,可以使用 JMX Exporter 將這些指標轉換為 Prometheus 格式。

以下是 JMX Exporter 的組態範例,定義了要收集的 Kafka 指標:

# JMX Exporter 組態
# 用於將 Kafka JMX 指標轉換為 Prometheus 格式

# 開始時是否小寫輸出
lowercaseOutputName: true
lowercaseOutputLabelNames: true

# 規則定義
rules:
  # Broker 指標
  # 訊息輸入速率
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
    name: kafka_server_brokertopicmetrics_messagesin_total
    type: COUNTER
    labels:
      topic: "$1"
    help: "每個 Topic 的訊息輸入總數"

  # 位元組輸入速率
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
    name: kafka_server_brokertopicmetrics_bytesin_total
    type: COUNTER
    labels:
      topic: "$1"
    help: "每個 Topic 的位元組輸入總數"

  # 位元組輸出速率
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
    name: kafka_server_brokertopicmetrics_bytesout_total
    type: COUNTER
    labels:
      topic: "$1"
    help: "每個 Topic 的位元組輸出總數"

  # 請求指標
  # 請求總數
  - pattern: 'kafka.network<type=RequestMetrics, name=RequestsPerSec, request=(.+), version=(.+)><>Count'
    name: kafka_network_requestmetrics_requests_total
    type: COUNTER
    labels:
      request: "$1"
      version: "$2"
    help: "請求總數"

  # 請求延遲
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(.+)><>(\w+)'
    name: kafka_network_requestmetrics_totaltime_ms
    type: GAUGE
    labels:
      request: "$1"
      aggregate: "$2"
    help: "請求總處理時間"

  # 分區指標
  # Under Replicated Partitions
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
    name: kafka_server_replicamanager_underreplicatedpartitions
    type: GAUGE
    help: "未完全複製的分區數量"

  # Under Min ISR Partitions
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
    name: kafka_server_replicamanager_underminisrpartitioncount
    type: GAUGE
    help: "ISR 小於最小值的分區數量"

  # Offline Partitions
  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_controller_offlinepartitionscount
    type: GAUGE
    help: "離線分區數量"

  # Leader 選舉
  - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
    name: kafka_controller_leaderelection_total
    type: COUNTER
    help: "Leader 選舉總次數"

  # Log 大小
  - pattern: 'kafka.log<type=Log, name=Size, topic=(.+), partition=(.+)><>Value'
    name: kafka_log_size_bytes
    type: GAUGE
    labels:
      topic: "$1"
      partition: "$2"
    help: "分區 Log 大小(位元組)"

  # Consumer Group 指標
  # Group Lag
  - pattern: 'kafka.server<type=FetcherLagMetrics, name=ConsumerLag, clientId=(.+), topic=(.+), partition=(.+)><>Value'
    name: kafka_server_fetcherlagmetrics_consumerlag
    type: GAUGE
    labels:
      clientId: "$1"
      topic: "$2"
      partition: "$3"
    help: "Consumer Lag"

  # ZooKeeper 連接狀態
  - pattern: 'kafka.server<type=SessionExpireListener, name=ZooKeeperDisconnectsPerSec><>Count'
    name: kafka_server_zookeeperdisconnects_total
    type: COUNTER
    help: "ZooKeeper 斷開連接總次數"

  # JVM 指標
  # GC 時間
  - pattern: 'java.lang<type=GarbageCollector, name=(.+)><>CollectionTime'
    name: jvm_gc_collection_seconds_total
    type: COUNTER
    labels:
      gc: "$1"
    help: "GC 總時間"

  # Heap 使用量
  - pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
    name: jvm_memory_heap_used_bytes
    type: GAUGE
    help: "Heap 記憶體使用量"

Streams Explorer 串流應用視覺化

Streams Explorer 是專門用於視覺化 Kafka Streams 與 Kafka Connect 應用程式的工具。它可以自動發現在 Kubernetes 中部署的串流應用程式,並繪製出資料流向圖,讓開發者可以直觀地了解資料是如何在各個應用程式之間流動的。

這個工具特別適合複雜的串流處理架構,其中有多個應用程式串聯處理資料。透過視覺化,可以快速識別瓶頸、了解依賴關係,以及追蹤資料的來源與去向。

客戶端程式庫詳解

雖然 Apache Kafka 官方提供了 Java 客戶端,但在實際專案中,我們經常需要使用其他程式語言。Kafka 生態系統提供了多種語言的客戶端程式庫,各有其特色與適用場景。

librdkafka 高效能 C 實作

librdkafka 是用 C 語言實作的 Kafka 客戶端,以其卓越的效能著稱。許多其他語言的 Kafka 客戶端,包括 Python 的 confluent-kafka、Go 的 confluent-kafka-go、以及 .NET 的 Confluent.Kafka,都是基於 librdkafka 開發的。

使用基於 librdkafka 的客戶端可以獲得接近原生 Java 客戶端的效能,同時享有 librdkafka 優秀的穩定性與活躍的社群支援。librdkafka 採用寬鬆的 BSD 授權,可以在各種商業專案中使用。

以下是使用 confluent-kafka-python(基於 librdkafka)的進階範例:

from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.serialization import StringSerializer, StringDeserializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
import json
import socket
from typing import Optional, Callable
import logging

# 設定日誌
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KafkaClientConfig:
    """
    Kafka 客戶端組態類別

    集中管理所有 Kafka 相關的組態設定
    """

    def __init__(self, bootstrap_servers: str,
                 security_protocol: str = 'PLAINTEXT',
                 sasl_mechanism: Optional[str] = None,
                 sasl_username: Optional[str] = None,
                 sasl_password: Optional[str] = None):
        """
        初始化組態

        Args:
            bootstrap_servers: Kafka Broker 位址
            security_protocol: 安全協定(PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
            sasl_mechanism: SASL 機制(PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)
            sasl_username: SASL 使用者名稱
            sasl_password: SASL 密碼
        """
        # 基本組態
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': f'{socket.gethostname()}-{__name__}',
        }

        # 安全組態
        self.config['security.protocol'] = security_protocol

        if sasl_mechanism:
            self.config['sasl.mechanism'] = sasl_mechanism
            self.config['sasl.username'] = sasl_username
            self.config['sasl.password'] = sasl_password

    def get_producer_config(self, **kwargs) -> dict:
        """
        取得 Producer 組態

        Args:
            **kwargs: 額外的 Producer 組態

        Returns:
            完整的 Producer 組態字典
        """
        # 複製基本組態
        config = self.config.copy()

        # Producer 特定組態
        producer_defaults = {
            # 確認機制:等待所有 ISR 確認
            'acks': 'all',

            # 啟用冪等性
            'enable.idempotence': True,

            # 批次大小(bytes)
            'batch.size': 32768,

            # 延遲時間(ms)
            'linger.ms': 10,

            # 壓縮類型
            'compression.type': 'lz4',

            # 緩衝區記憶體(bytes)
            'buffer.memory': 67108864,

            # 重試次數
            'retries': 2147483647,  # MAX_INT

            # 飛行中請求數(冪等性要求 <= 5)
            'max.in.flight.requests.per.connection': 5,

            # 傳遞逾時(ms)
            'delivery.timeout.ms': 120000,

            # 錯誤回報回呼
            'error_cb': self._error_callback,

            # 統計回呼
            'stats_cb': self._stats_callback,

            # 統計間隔(ms)
            'statistics.interval.ms': 60000,
        }

        # 合併組態
        config.update(producer_defaults)
        config.update(kwargs)

        return config

    def get_consumer_config(self, group_id: str, **kwargs) -> dict:
        """
        取得 Consumer 組態

        Args:
            group_id: Consumer Group ID
            **kwargs: 額外的 Consumer 組態

        Returns:
            完整的 Consumer 組態字典
        """
        config = self.config.copy()

        # Consumer 特定組態
        consumer_defaults = {
            # Consumer Group ID
            'group.id': group_id,

            # 自動 Offset 重設
            'auto.offset.reset': 'earliest',

            # 停用自動提交(建議手動管理)
            'enable.auto.commit': False,

            # Session 逾時(ms)
            'session.timeout.ms': 45000,

            # 心跳間隔(ms)
            'heartbeat.interval.ms': 15000,

            # 最大 Poll 間隔(ms)
            'max.poll.interval.ms': 300000,

            # 每次 Fetch 最小 bytes
            'fetch.min.bytes': 1,

            # Fetch 最大等待時間(ms)
            'fetch.wait.max.ms': 500,

            # 每個分區 Fetch 最大 bytes
            'max.partition.fetch.bytes': 1048576,

            # 錯誤回報回呼
            'error_cb': self._error_callback,

            # 統計回呼
            'stats_cb': self._stats_callback,

            # 統計間隔(ms)
            'statistics.interval.ms': 60000,
        }

        config.update(consumer_defaults)
        config.update(kwargs)

        return config

    def _error_callback(self, err):
        """
        錯誤回呼函式

        處理 Kafka 客戶端的錯誤事件
        """
        logger.error(f'Kafka 錯誤: {err}')

    def _stats_callback(self, stats_json_str):
        """
        統計回呼函式

        處理 Kafka 客戶端的統計資訊
        """
        stats = json.loads(stats_json_str)
        # 這裡可以將統計資訊發送到監控系統
        logger.debug(f'Kafka 統計: tx_msgs={stats.get("txmsgs", 0)}, '
                    f'rx_msgs={stats.get("rxmsgs", 0)}')

class HighThroughputProducer:
    """
    高吞吐量 Producer

    針對批次處理場景最佳化的 Producer 實作
    """

    def __init__(self, config: KafkaClientConfig):
        """
        初始化 Producer

        Args:
            config: Kafka 客戶端組態
        """
        # 取得 Producer 組態
        producer_config = config.get_producer_config(
            # 增加批次大小以提高吞吐量
            batch_size=65536,
            # 增加延遲時間以累積更多訊息
            linger_ms=50,
        )

        # 建立 Producer 實例
        self.producer = Producer(producer_config)

        # 追蹤已發送與已確認的訊息數
        self.sent_count = 0
        self.delivered_count = 0
        self.failed_count = 0

    def _delivery_callback(self, err, msg):
        """
        傳遞結果回呼函式

        當訊息傳遞完成(成功或失敗)時被呼叫

        Args:
            err: 錯誤物件(如果有)
            msg: 訊息物件
        """
        if err:
            self.failed_count += 1
            logger.error(f'訊息傳遞失敗: {err}')
        else:
            self.delivered_count += 1
            logger.debug(f'訊息已傳遞至 {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

    def produce(self, topic: str, value: bytes,
                key: Optional[bytes] = None,
                headers: Optional[dict] = None):
        """
        發送訊息

        Args:
            topic: Topic 名稱
            value: 訊息內容
            key: 訊息 Key
            headers: 訊息標頭
        """
        try:
            # 轉換 headers 格式
            kafka_headers = [(k, v.encode('utf-8') if isinstance(v, str) else v)
                           for k, v in (headers or {}).items()]

            # 發送訊息
            self.producer.produce(
                topic=topic,
                value=value,
                key=key,
                headers=kafka_headers,
                callback=self._delivery_callback
            )

            self.sent_count += 1

            # 定期呼叫 poll 以處理回呼
            # 這是非阻塞的,只處理已完成的傳遞
            self.producer.poll(0)

        except BufferError:
            # 緩衝區已滿,等待並重試
            logger.warning('緩衝區已滿,等待...')
            self.producer.poll(1.0)
            self.producer.produce(
                topic=topic,
                value=value,
                key=key,
                headers=kafka_headers,
                callback=self._delivery_callback
            )
            self.sent_count += 1

    def flush(self, timeout: float = 30.0):
        """
        確保所有訊息都已發送

        Args:
            timeout: 逾時時間(秒)
        """
        remaining = self.producer.flush(timeout)
        if remaining > 0:
            logger.warning(f'{remaining} 個訊息未能在逾時前發送')

    def get_stats(self) -> dict:
        """
        取得發送統計

        Returns:
            統計資料字典
        """
        return {
            'sent': self.sent_count,
            'delivered': self.delivered_count,
            'failed': self.failed_count,
            'pending': self.sent_count - self.delivered_count - self.failed_count
        }

    def close(self):
        """
        關閉 Producer
        """
        self.flush()
        logger.info(f'Producer 已關閉。統計: {self.get_stats()}')

class ReliableConsumer:
    """
    可靠 Consumer

    實作手動 Offset 管理與錯誤處理的 Consumer
    """

    def __init__(self, config: KafkaClientConfig, group_id: str):
        """
        初始化 Consumer

        Args:
            config: Kafka 客戶端組態
            group_id: Consumer Group ID
        """
        # 取得 Consumer 組態
        consumer_config = config.get_consumer_config(group_id)

        # 建立 Consumer 實例
        self.consumer = Consumer(consumer_config)

        # 處理訊息計數
        self.processed_count = 0

    def subscribe(self, topics: list, on_assign: Optional[Callable] = None,
                  on_revoke: Optional[Callable] = None):
        """
        訂閱 Topics

        Args:
            topics: Topic 列表
            on_assign: 分區分配回呼
            on_revoke: 分區撤銷回呼
        """
        def default_on_assign(consumer, partitions):
            logger.info(f'分區已分配: {partitions}')
            if on_assign:
                on_assign(consumer, partitions)

        def default_on_revoke(consumer, partitions):
            logger.info(f'分區已撤銷: {partitions}')
            # 在撤銷前提交 Offset
            consumer.commit(asynchronous=False)
            if on_revoke:
                on_revoke(consumer, partitions)

        self.consumer.subscribe(
            topics,
            on_assign=default_on_assign,
            on_revoke=default_on_revoke
        )

        logger.info(f'已訂閱 Topics: {topics}')

    def consume(self, process_message: Callable,
                batch_size: int = 100,
                poll_timeout: float = 1.0):
        """
        消費訊息

        Args:
            process_message: 訊息處理函式
            batch_size: 批次提交大小
            poll_timeout: Poll 逾時時間
        """
        try:
            while True:
                # Poll 訊息
                msg = self.consumer.poll(poll_timeout)

                if msg is None:
                    # 沒有訊息
                    continue

                if msg.error():
                    # 處理錯誤
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # 分區結尾,正常情況
                        logger.debug(f'到達分區結尾: {msg.topic()} [{msg.partition()}]')
                    else:
                        raise KafkaException(msg.error())
                else:
                    # 處理訊息
                    try:
                        process_message(msg)
                        self.processed_count += 1

                        # 批次提交
                        if self.processed_count % batch_size == 0:
                            self.consumer.commit(asynchronous=False)
                            logger.debug(f'已提交 Offset,處理訊息數: {self.processed_count}')

                    except Exception as e:
                        # 處理訊息時發生錯誤
                        logger.error(f'處理訊息失敗: {e}')
                        # 可以選擇繼續或中斷
                        # 這裡選擇繼續處理下一個訊息

        except KeyboardInterrupt:
            logger.info('收到中斷信號')
        finally:
            self.close()

    def close(self):
        """
        關閉 Consumer
        """
        # 提交最後的 Offset
        self.consumer.commit(asynchronous=False)
        # 關閉 Consumer
        self.consumer.close()
        logger.info(f'Consumer 已關閉。總處理訊息數: {self.processed_count}')

def main():
    """
    主程式:示範高效能 Producer 與 Consumer
    """
    # 建立組態
    config = KafkaClientConfig(
        bootstrap_servers='localhost:9092',
        security_protocol='PLAINTEXT'
    )

    # 示範 Producer
    producer = HighThroughputProducer(config)

    topic = 'test-topic'

    # 發送 1000 個訊息
    for i in range(1000):
        message = json.dumps({
            'id': i,
            'data': f'Message {i}'
        }).encode('utf-8')

        producer.produce(
            topic=topic,
            value=message,
            key=f'key-{i % 10}'.encode('utf-8')
        )

    # 確保所有訊息都已發送
    producer.close()

    # 示範 Consumer
    def process_message(msg):
        """處理訊息"""
        data = json.loads(msg.value().decode('utf-8'))
        logger.info(f'收到訊息: {data}')

    consumer = ReliableConsumer(config, 'test-group')
    consumer.subscribe([topic])

    # 開始消費(這會阻塞直到中斷)
    consumer.consume(process_message, batch_size=50)

if __name__ == '__main__':
    main()

Sarama Go 原生實作

Sarama 是 Shopify 開發的純 Go 語言 Kafka 客戶端。由於是原生 Go 實作,不需要 CGO 依賴,部署更為簡單。Sarama 支援完整的 Kafka 協定,包括 Consumer Group、交易、以及 Admin API。

對於 Go 語言專案,Sarama 是最受歡迎的選擇之一。它的 API 設計符合 Go 的慣例,程式碼風格清晰易讀。以下是使用 Sarama 的範例:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/IBM/sarama"
)

// Message 訊息結構
type Message struct {
	ID        int       `json:"id"`
	Data      string    `json:"data"`
	Timestamp time.Time `json:"timestamp"`
}

// KafkaConfig Kafka 組態結構
type KafkaConfig struct {
	Brokers  []string
	Topic    string
	GroupID  string
	Username string
	Password string
}

// createProducerConfig 建立 Producer 組態
func createProducerConfig(kafkaConfig *KafkaConfig) *sarama.Config {
	// 建立 Sarama 組態物件
	config := sarama.NewConfig()

	// 設定 Kafka 版本
	// 確保與叢集版本相容
	config.Version = sarama.V3_0_0_0

	// Producer 組態
	// 等待所有 ISR 確認寫入
	config.Producer.RequiredAcks = sarama.WaitForAll

	// 啟用成功回傳
	// 這樣才能取得傳送結果
	config.Producer.Return.Successes = true

	// 啟用錯誤回傳
	config.Producer.Return.Errors = true

	// 設定重試次數
	config.Producer.Retry.Max = 5

	// 設定重試間隔
	config.Producer.Retry.Backoff = 100 * time.Millisecond

	// 啟用冪等性
	// 確保不會產生重複訊息
	config.Producer.Idempotent = true

	// 冪等性要求的組態
	config.Net.MaxOpenRequests = 1

	// 設定壓縮
	config.Producer.Compression = sarama.CompressionLZ4

	// 設定批次大小
	config.Producer.Flush.Bytes = 32768

	// 設定批次延遲
	config.Producer.Flush.Frequency = 10 * time.Millisecond

	// 如果有 SASL 認證
	if kafkaConfig.Username != "" {
		config.Net.SASL.Enable = true
		config.Net.SASL.User = kafkaConfig.Username
		config.Net.SASL.Password = kafkaConfig.Password
		config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
		config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
		}
	}

	return config
}

// AsyncProducer 非同步 Producer 封裝
type AsyncProducer struct {
	producer sarama.AsyncProducer
	topic    string
	wg       sync.WaitGroup

	// 統計
	sentCount      int64
	deliveredCount int64
	failedCount    int64
	mu             sync.Mutex
}

// NewAsyncProducer 建立非同步 Producer
func NewAsyncProducer(kafkaConfig *KafkaConfig) (*AsyncProducer, error) {
	// 建立 Sarama 組態
	config := createProducerConfig(kafkaConfig)

	// 建立 AsyncProducer
	producer, err := sarama.NewAsyncProducer(kafkaConfig.Brokers, config)
	if err != nil {
		return nil, fmt.Errorf("建立 Producer 失敗: %w", err)
	}

	ap := &AsyncProducer{
		producer: producer,
		topic:    kafkaConfig.Topic,
	}

	// 啟動結果處理 Goroutine
	ap.wg.Add(2)
	go ap.handleSuccesses()
	go ap.handleErrors()

	return ap, nil
}

// handleSuccesses 處理成功的傳送
func (ap *AsyncProducer) handleSuccesses() {
	defer ap.wg.Done()

	for msg := range ap.producer.Successes() {
		ap.mu.Lock()
		ap.deliveredCount++
		ap.mu.Unlock()

		log.Printf("訊息已傳送至 %s [%d] @ %d",
			msg.Topic, msg.Partition, msg.Offset)
	}
}

// handleErrors 處理傳送錯誤
func (ap *AsyncProducer) handleErrors() {
	defer ap.wg.Done()

	for err := range ap.producer.Errors() {
		ap.mu.Lock()
		ap.failedCount++
		ap.mu.Unlock()

		log.Printf("訊息傳送失敗: %v", err.Err)
	}
}

// Send 發送訊息
func (ap *AsyncProducer) Send(key string, value interface{}) error {
	// 序列化訊息
	valueBytes, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("序列化失敗: %w", err)
	}

	// 建立 ProducerMessage
	msg := &sarama.ProducerMessage{
		Topic: ap.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(valueBytes),
		// 可以加入自訂標頭
		Headers: []sarama.RecordHeader{
			{
				Key:   []byte("content-type"),
				Value: []byte("application/json"),
			},
		},
	}

	// 發送到 Input Channel
	ap.producer.Input() <- msg

	ap.mu.Lock()
	ap.sentCount++
	ap.mu.Unlock()

	return nil
}

// Close 關閉 Producer
func (ap *AsyncProducer) Close() error {
	// 關閉 Producer Input Channel
	err := ap.producer.Close()

	// 等待結果處理 Goroutine 完成
	ap.wg.Wait()

	log.Printf("Producer 已關閉。sent=%d, delivered=%d, failed=%d",
		ap.sentCount, ap.deliveredCount, ap.failedCount)

	return err
}

// createConsumerConfig 建立 Consumer 組態
func createConsumerConfig(kafkaConfig *KafkaConfig) *sarama.Config {
	config := sarama.NewConfig()

	// 設定 Kafka 版本
	config.Version = sarama.V3_0_0_0

	// Consumer 組態
	// 從最早的 Offset 開始讀取
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// 自動提交間隔
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 5 * time.Second

	// Fetch 組態
	config.Consumer.Fetch.Min = 1
	config.Consumer.Fetch.Default = 1024 * 1024

	// 重新平衡策略
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.NewBalanceStrategyRoundRobin(),
	}

	// Session 逾時
	config.Consumer.Group.Session.Timeout = 45 * time.Second

	// 心跳間隔
	config.Consumer.Group.Heartbeat.Interval = 15 * time.Second

	// 如果有 SASL 認證
	if kafkaConfig.Username != "" {
		config.Net.SASL.Enable = true
		config.Net.SASL.User = kafkaConfig.Username
		config.Net.SASL.Password = kafkaConfig.Password
		config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
		config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
		}
	}

	return config
}

// ConsumerGroupHandler Consumer Group 處理器
type ConsumerGroupHandler struct {
	// 訊息處理函式
	ProcessMessage func(*sarama.ConsumerMessage) error
}

// Setup 在 Consumer Group Session 開始時呼叫
func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	log.Printf("Consumer Group Session 開始,成員 ID: %s", session.MemberID())
	return nil
}

// Cleanup 在 Consumer Group Session 結束時呼叫
func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	log.Println("Consumer Group Session 結束")
	return nil
}

// ConsumeClaim 處理分配到的分區
func (h *ConsumerGroupHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {

	log.Printf("開始消費分區 %d", claim.Partition())

	// 遍歷訊息
	for msg := range claim.Messages() {
		// 呼叫處理函式
		if err := h.ProcessMessage(msg); err != nil {
			log.Printf("處理訊息失敗: %v", err)
			// 可以選擇繼續或返回錯誤
			continue
		}

		// 標記訊息為已處理
		// Sarama 會在適當時機提交 Offset
		session.MarkMessage(msg, "")
	}

	return nil
}

// ConsumerGroup Consumer Group 封裝
type ConsumerGroup struct {
	client  sarama.ConsumerGroup
	handler *ConsumerGroupHandler
	topics  []string
}

// NewConsumerGroup 建立 Consumer Group
func NewConsumerGroup(kafkaConfig *KafkaConfig,
	processFunc func(*sarama.ConsumerMessage) error) (*ConsumerGroup, error) {

	// 建立組態
	config := createConsumerConfig(kafkaConfig)

	// 建立 Consumer Group
	client, err := sarama.NewConsumerGroup(
		kafkaConfig.Brokers,
		kafkaConfig.GroupID,
		config)
	if err != nil {
		return nil, fmt.Errorf("建立 Consumer Group 失敗: %w", err)
	}

	return &ConsumerGroup{
		client: client,
		handler: &ConsumerGroupHandler{
			ProcessMessage: processFunc,
		},
		topics: []string{kafkaConfig.Topic},
	}, nil
}

// Start 開始消費
func (cg *ConsumerGroup) Start(ctx context.Context) error {
	// 在迴圈中消費
	// 當發生重新平衡時會重新進入迴圈
	for {
		// Consume 會阻塞直到 session 結束
		err := cg.client.Consume(ctx, cg.topics, cg.handler)
		if err != nil {
			return fmt.Errorf("消費錯誤: %w", err)
		}

		// 檢查 Context 是否已取消
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}

// Close 關閉 Consumer Group
func (cg *ConsumerGroup) Close() error {
	return cg.client.Close()
}

func main() {
	// Kafka 組態
	kafkaConfig := &KafkaConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "test-topic",
		GroupID: "test-consumer-group",
	}

	// 建立 Producer
	producer, err := NewAsyncProducer(kafkaConfig)
	if err != nil {
		log.Fatalf("建立 Producer 失敗: %v", err)
	}

	// 發送測試訊息
	for i := 0; i < 100; i++ {
		msg := Message{
			ID:        i,
			Data:      fmt.Sprintf("Message %d", i),
			Timestamp: time.Now(),
		}

		key := fmt.Sprintf("key-%d", i%10)

		if err := producer.Send(key, msg); err != nil {
			log.Printf("發送訊息失敗: %v", err)
		}
	}

	// 關閉 Producer
	if err := producer.Close(); err != nil {
		log.Printf("關閉 Producer 失敗: %v", err)
	}

	// 建立 Consumer
	processFunc := func(msg *sarama.ConsumerMessage) error {
		var data Message
		if err := json.Unmarshal(msg.Value, &data); err != nil {
			return fmt.Errorf("反序列化失敗: %w", err)
		}

		log.Printf("收到訊息: topic=%s, partition=%d, offset=%d, key=%s, data=%+v",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Key), data)

		return nil
	}

	consumer, err := NewConsumerGroup(kafkaConfig, processFunc)
	if err != nil {
		log.Fatalf("建立 Consumer 失敗: %v", err)
	}

	// 設定信號處理
	ctx, cancel := context.WithCancel(context.Background())

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-sigChan
		log.Println("收到終止信號")
		cancel()
	}()

	// 開始消費
	log.Println("開始消費訊息...")
	if err := consumer.Start(ctx); err != nil && err != context.Canceled {
		log.Printf("消費錯誤: %v", err)
	}

	// 關閉 Consumer
	if err := consumer.Close(); err != nil {
		log.Printf("關閉 Consumer 失敗: %v", err)
	}

	log.Println("程式結束")
}

串流處理框架比較

Kafka 不僅是一個訊息傳遞系統,更是一個串流處理平台。Kafka 生態系統提供了多種串流處理框架,各有其特色與適用場景。

Kafka Streams 輕量級處理

Kafka Streams 是 Kafka 內建的串流處理程式庫,最大的特色是不需要額外的叢集基礎設施。它是一個 Java 程式庫,可以直接嵌入到應用程式中,透過 Kafka 本身進行協調與容錯。

Kafka Streams 支援 Exactly-Once 語義,可以保證每筆資料只被處理一次。它也支援有狀態的處理,包括聚合、Join、以及 Windowing 操作。狀態會自動備份到 Kafka Topic,確保故障恢復時可以還原狀態。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

title Kafka Streams 處理拓撲

rectangle "Input Topics" {
    [orders] as OT
    [users] as UT
}

rectangle "Stream Processing" {
    [Filter] as F
    [Map] as M
    [Join] as J
    [Aggregate] as A
    [Window] as W
}

rectangle "State Stores" {
    database "User Cache" as UC
    database "Order Counts" as OC
}

rectangle "Output Topics" {
    [enriched-orders] as EO
    [order-stats] as OS
}

OT --> F : orders stream
F --> M : filtered orders
UT --> J : users table
M --> J : transformed orders
J --> EO : enriched orders
J --> A : join result
A --> W : aggregated
W --> OS : windowed stats
UC --> J : lookup
A --> OC : update

@enduml

Apache Flink 是一個專注於串流處理的分散式計算框架,以其極低的延遲著稱。Flink 使用事件時間處理,可以正確處理亂序與延遲的事件。它支援複雜的 Windowing 操作,包括滾動視窗、滑動視窗、與 Session 視窗。

Flink 需要獨立的叢集來運行,可以部署在 Kubernetes、YARN、或獨立模式。雖然需要額外的基礎設施,但 Flink 提供了更強大的功能與更好的效能,適合大規模的串流處理工作負載。

Apache Spark Structured Streaming

Apache Spark 原本是批次處理框架,但透過 Structured Streaming 也支援串流處理。Spark 將串流視為不斷增長的表格,使用 SQL 風格的 API 進行處理。這對於熟悉 Spark 或需要同時處理批次與串流資料的團隊是一個吸引人的選項。

然而,Spark Structured Streaming 的延遲通常比 Flink 或 Kafka Streams 高,因為它是基於微批次的架構。如果對延遲要求不高,且團隊已經熟悉 Spark,這可以是一個合理的選擇。

Apache Beam 統一程式模型

Apache Beam 提供了一個統一的程式模型,可以在不同的執行引擎(Runner)上運行,包括 Flink、Spark、以及 Google Cloud Dataflow。這意味著你可以用同一套程式碼在不同的環境中執行,提供了最大的彈性。

然而,使用抽象層也有其代價,可能無法充分利用各個引擎的特有功能。Beam 最適合需要在多個環境中運行相同邏輯,或正在評估不同引擎的團隊。

Kafka Connect 資料整合

Kafka Connect 是 Kafka 生態系統中用於資料整合的框架。它提供了一個標準化的方式來連接 Kafka 與外部系統,包括資料庫、檔案系統、搜尋引擎、以及各種 SaaS 應用程式。

Source Connector 與 Sink Connector

Kafka Connect 有兩種類型的 Connector:Source Connector 用於從外部系統讀取資料並寫入 Kafka,Sink Connector 用於從 Kafka 讀取資料並寫入外部系統。

社群與各家廠商提供了大量的 Connector,涵蓋了幾乎所有常見的資料系統。例如,Debezium 提供了各種資料庫的 CDC(Change Data Capture)Connector,可以即時捕獲資料庫的變更並寫入 Kafka。

以下是 Debezium PostgreSQL Source Connector 的組態範例:

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",

    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/secrets/postgres-password}",
    "database.dbname": "ecommerce",
    "database.server.name": "ecommerce-db",

    "plugin.name": "pgoutput",
    "publication.name": "dbz_publication",
    "slot.name": "debezium_slot",

    "table.include.list": "public.orders,public.customers,public.products",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "ecommerce-db.public.(.*)",
    "transforms.route.replacement": "db.$1",

    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "12",

    "heartbeat.interval.ms": "10000",
    "heartbeat.topics.prefix": "__debezium-heartbeat",

    "snapshot.mode": "initial",
    "snapshot.fetch.size": "10240",

    "decimal.handling.mode": "double",
    "time.precision.mode": "adaptive_time_microseconds",

    "include.schema.changes": "true",
    "provide.transaction.metadata": "true"
  }
}

Single Message Transform

Kafka Connect 支援 Single Message Transform(SMT),可以在訊息被寫入目標之前進行轉換。常見的轉換包括欄位重新命名、資料型別轉換、路由調整、以及加密解密等。

以下是使用多個 SMT 的 Sink Connector 範例:

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",

    "topics": "db.orders,db.customers",

    "connection.url": "https://elasticsearch.example.com:9200",
    "connection.username": "elastic",
    "connection.password": "${file:/secrets/es-password}",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "transforms": "route,timestamp,flatten,mask",

    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "db\\.(.*)",
    "transforms.route.replacement": "$1-index",

    "transforms.timestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.timestamp.timestamp.field": "indexed_at",

    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_",

    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask.fields": "credit_card_number,ssn",
    "transforms.mask.replacement": "****",

    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "false",

    "behavior.on.null.values": "delete",
    "behavior.on.malformed.documents": "warn",

    "batch.size": "2000",
    "max.buffered.records": "20000",
    "flush.timeout.ms": "180000",

    "max.retries": "5",
    "retry.backoff.ms": "1000"
  }
}

安全性組態實務

Kafka 的安全性組態對於生產環境至關重要。一個完整的安全組態應該涵蓋認證、授權、與加密三個層面。

SASL 認證機制

Kafka 支援多種 SASL 認證機制,包括 PLAIN、SCRAM-SHA-256/512、GSSAPI(Kerberos)、以及 OAUTHBEARER。在選擇認證機制時,需要考慮安全性需求與現有基礎設施。

SCRAM-SHA-512 是一個好的折衷選擇,它比 PLAIN 更安全(密碼不以明文傳輸),又比 Kerberos 更容易設定。以下是 Broker 的 SASL/SCRAM 組態:

# Broker SASL/SCRAM 組態

# 監聽器組態
listeners=SASL_SSL://0.0.0.0:9093
advertised.listeners=SASL_SSL://broker1.example.com:9093

# 安全協定對應
listener.security.protocol.map=SASL_SSL:SASL_SSL

# 啟用的 SASL 機制
sasl.enabled.mechanisms=SCRAM-SHA-512

# Inter-Broker 通訊的 SASL 機制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

# SSL/TLS 組態
ssl.keystore.location=/etc/kafka/ssl/broker1.keystore.jks
ssl.keystore.password=${KEYSTORE_PASSWORD}
ssl.key.password=${KEY_PASSWORD}
ssl.truststore.location=/etc/kafka/ssl/truststore.jks
ssl.truststore.password=${TRUSTSTORE_PASSWORD}

# 要求客戶端認證
ssl.client.auth=required

# TLS 版本
ssl.enabled.protocols=TLSv1.2,TLSv1.3

# 加密套件
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256

ACL 授權組態

Kafka 的 ACL(Access Control List)提供了細粒度的授權控制。管理員可以定義哪些使用者可以對哪些資源執行哪些操作。以下是使用 kafka-acls 命令設定 ACL 的範例:

#!/bin/bash

# ACL 組態腳本
# 此腳本示範如何設定 Kafka ACL 以實現最小權限原則

# Kafka 叢集的 Bootstrap Server
BOOTSTRAP_SERVER="broker1.example.com:9093"

# 命令基礎組態
# 包含 SASL 認證資訊
KAFKA_ACLS_CMD="kafka-acls --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config /etc/kafka/admin.properties"

# 移除所有現有的 ACL(小心使用!)
# $KAFKA_ACLS_CMD --remove --force

# 建立管理員群組的完全存取權限
# 管理員可以管理所有 Topic、Group 與 Cluster
echo "設定管理員權限..."
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:admin" \
  --operation All \
  --topic '*' \
  --cluster

# 為 Producer 應用程式設定權限
# order-service-producer 可以寫入 orders Topic
echo "設定 order-service-producer 權限..."
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-service-producer" \
  --operation Write \
  --operation Describe \
  --topic "orders"

# 如果使用交易,需要額外的權限
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-service-producer" \
  --operation Write \
  --operation Describe \
  --transactional-id "order-service-tx-*"

$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-service-producer" \
  --operation IdempotentWrite \
  --cluster

# 為 Consumer 應用程式設定權限
# order-processor 可以讀取 orders Topic
echo "設定 order-processor 權限..."
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-processor" \
  --operation Read \
  --operation Describe \
  --topic "orders"

# Consumer 需要 Group 權限
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-processor" \
  --operation Read \
  --group "order-processor-group"

# 為 Kafka Streams 應用程式設定權限
# Streams 應用程式需要讀取輸入、寫入輸出、以及管理內部 Topic
echo "設定 order-aggregator 權限..."

# 讀取輸入 Topic
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-aggregator" \
  --operation Read \
  --topic "orders"

# 寫入輸出 Topic
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-aggregator" \
  --operation Write \
  --topic "order-aggregates"

# 管理內部 Topic(以 application.id 為前綴)
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-aggregator" \
  --operation All \
  --topic "order-aggregator-*" \
  --resource-pattern-type prefixed

# Consumer Group 權限
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:order-aggregator" \
  --operation Read \
  --group "order-aggregator"

# 為 Kafka Connect 設定權限
echo "設定 Kafka Connect 權限..."

# Connect 的內部 Topic
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:connect-worker" \
  --operation All \
  --topic "connect-configs"

$KAFKA_ACLS_CMD --add \
  --allow-principal "User:connect-worker" \
  --operation All \
  --topic "connect-offsets"

$KAFKA_ACLS_CMD --add \
  --allow-principal "User:connect-worker" \
  --operation All \
  --topic "connect-status"

# Connect Worker 的 Consumer Group
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:connect-worker" \
  --operation Read \
  --group "connect-cluster"

# Connector 的資料 Topic 權限(視 Connector 而定)
$KAFKA_ACLS_CMD --add \
  --allow-principal "User:connect-worker" \
  --operation Write \
  --topic "db.*" \
  --resource-pattern-type prefixed

# 列出所有 ACL
echo "列出所有 ACL..."
$KAFKA_ACLS_CMD --list

echo "ACL 組態完成"

效能調優最佳實踐

Kafka 的效能調優是一個複雜的主題,需要根據具體的工作負載特性進行調整。以下是一些通用的調優建議。

Broker 調優

Broker 的調優主要集中在記憶體管理、磁碟 I/O、以及網路設定。JVM 堆積大小通常設定為 6-8GB 就足夠了,因為 Kafka 主要依賴作業系統的 Page Cache 而非 JVM 堆積。

磁碟 I/O 是 Kafka 效能的關鍵因素。建議使用 SSD 並確保有足夠的磁碟頻寬。可以透過增加 num.io.threadsnum.network.threads 來提高並行處理能力。

Producer 調優

Producer 的調優主要在於平衡延遲與吞吐量。增加 batch.sizelinger.ms 可以提高吞吐量,但會增加延遲。啟用壓縮可以減少網路傳輸量,但會增加 CPU 使用。

對於需要高吞吐量的場景,建議設定 acks=1acks=0,但這會犧牲一些持久性保證。對於需要強一致性的場景,則必須使用 acks=all

Consumer 調優

Consumer 的調優主要在於 Fetch 設定與 Consumer Group 管理。增加 fetch.min.bytes 可以減少 Fetch 請求的次數,提高吞吐量。調整 max.poll.records 可以控制每次 poll 返回的訊息數量。

對於需要手動管理 Offset 的場景,要注意 max.poll.interval.ms 的設定。如果處理時間超過這個值,Consumer 會被認為已經死亡,觸發重新平衡。

總結

Apache Kafka 生態系統的豐富程度反映了它在現代資料架構中的重要地位。從托管服務到自建管理工具,從效能監控到安全組態,每個層面都有多種成熟的解決方案可供選擇。

選擇合適的工具需要考慮多個因素,包括團隊的技術能力、基礎設施現況、預算限制、以及業務需求。對於剛開始使用 Kafka 的團隊,托管服務可以大幅降低入門門檻;對於有豐富維運經驗的團隊,自建叢集搭配開源工具可能更具成本效益。

無論選擇哪種方案,深入理解 Kafka 的核心概念與運作原理都是必要的。只有在理解原理的基礎上,才能正確使用這些工具,並在遇到問題時有效地進行排錯。

隨著 Kafka 3.x 版本逐漸移除對 ZooKeeper 的依賴,以及 KRaft 協定的成熟,Kafka 的部署與管理將變得更加簡單。同時,隨著更多企業採用事件驅動架構,Kafka 生態系統也將持續發展,提供更多創新的工具與服務。

對於正在評估 Kafka 工具的團隊,建議先明確自己的需求,然後根據本文的介紹進行初步篩選,最後透過 POC(Proof of Concept)來驗證選擇的正確性。選擇正確的工具可以大幅提升開發效率與系統穩定性,是值得投入時間仔細評估的重要決策。