返回文章列表

Kafka消費者偏移量與退出機制管理

深入探討 Kafka 消費者的偏移量管理策略、Rebalance 監聽機制、安全退出流程與反序列化技術,涵蓋 ConsumerRebalanceListener 介面實作、精確偏移量控制、時間戳定位消費、優雅關閉機制以及 AdminClient 程式設計介面的完整應用

串流處理 分散式系統

Apache Kafka 作為現代分散式串流處理平台的核心元件,其消費者端的偏移量管理與退出機制設計直接影響著訊息處理的可靠性與系統的穩定性。在實際的生產環境中,消費者需要精確地追蹤已處理訊息的位置,以便在發生故障時能夠從正確的位置恢復處理,同時也需要能夠優雅地退出以避免訊息遺失或重複處理。本文將深入探討 Kafka 消費者的偏移量管理策略、Rebalance 事件監聽機制、安全退出流程以及反序列化技術,並透過完整的程式碼範例展示如何在實務中實現這些關鍵功能。

Kafka 消費者偏移量管理架構

在深入探討具體的實作細節之前,我們需要先理解 Kafka 消費者偏移量管理的整體架構。偏移量是 Kafka 用來追蹤消費者在每個分割區中已讀取訊息位置的機制,這個概念對於實現訊息的精確處理至關重要。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "Kafka 消費者偏移量管理架構" {
    rectangle "消費者群組" as CG {
        rectangle "消費者 A" as CA
        rectangle "消費者 B" as CB
        rectangle "消費者 C" as CC
    }

    rectangle "群組協調器" as GC {
        rectangle "成員管理" as MM
        rectangle "偏移量儲存" as OS
        rectangle "Rebalance 協調" as RC
    }

    rectangle "主題分割區" as TP {
        rectangle "分割區 0" as P0
        rectangle "分割區 1" as P1
        rectangle "分割區 2" as P2
    }

    rectangle "__consumer_offsets" as CO {
        rectangle "群組偏移量記錄" as GOR
        rectangle "提交歷史" as CH
    }

    rectangle "偏移量管理策略" as OMS {
        rectangle "自動提交" as AC
        rectangle "同步提交" as SC
        rectangle "非同步提交" as ASC
        rectangle "精確提交" as PC
    }
}

CA --> GC : 心跳與偏移量提交
CB --> GC : 心跳與偏移量提交
CC --> GC : 心跳與偏移量提交
GC --> CO : 持久化偏移量
GC --> TP : 分配分割區
MM --> RC : 觸發重新平衡
OS --> GOR : 記錄偏移量
OMS --> GC : 提交策略選擇

@enduml

Kafka 消費者的偏移量管理涉及多個核心元件的協同運作。消費者群組中的每個消費者都會與群組協調器保持通訊,定期發送心跳訊號以表明其存活狀態,並在適當的時機提交已處理訊息的偏移量。群組協調器負責管理消費者成員關係、儲存偏移量資訊以及協調 Rebalance 過程。當消費者加入或離開群組時,群組協調器會觸發 Rebalance 機制,重新分配分割區給存活的消費者。

偏移量資訊被儲存在一個名為 __consumer_offsets 的內部主題中,這個主題使用壓縮日誌策略來保留每個消費者群組對每個分割區的最新偏移量。這種設計使得偏移量管理具有高可用性和持久性,即使群組協調器發生故障,偏移量資訊也不會遺失。

Rebalance 事件監聽與處理

Rebalance 是 Kafka 消費者群組中最重要的機制之一,它確保分割區能夠在消費者之間公平地分配。當消費者群組的成員發生變化時,例如新消費者加入、現有消費者離開或消費者被判定為失效時,群組協調器就會觸發 Rebalance 過程。在這個過程中,所有消費者都會暫時停止消費,等待新的分割區分配方案。

ConsumerRebalanceListener 介面提供了三個回調方法,讓應用程式能夠在 Rebalance 過程中執行必要的操作。onPartitionsRevoked 方法在消費者即將失去分割區所有權時被呼叫,這是提交偏移量的最後機會。onPartitionsAssigned 方法在消費者獲得新分割區分配後被呼叫,應用程式可以在此時進行初始化操作。onPartitionsLost 方法在分割區所有權因異常情況而丟失時被呼叫,這通常發生在使用 Cooperative Rebalance 協議時。

以下的 Java 程式碼展示了如何實作一個完整的 Rebalance 監聽器,在分割區被撤銷時提交偏移量,並在分割區被分配時進行必要的初始化:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebalanceAwareConsumer {
    // 建立日誌記錄器用於輸出 Rebalance 事件資訊
    private static final Logger logger = LoggerFactory.getLogger(RebalanceAwareConsumer.class);

    // 儲存當前處理進度的偏移量映射表
    // 鍵為主題分割區,值為該分割區的偏移量和元資料
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    // Kafka 消費者實例
    private KafkaConsumer<String, String> consumer;

    // 輪詢逾時時間設定為 100 毫秒
    private Duration pollTimeout = Duration.ofMillis(100);

    // Rebalance 監聽器內部類別實作
    // 此類別負責處理分割區分配和撤銷事件
    private class OffsetCommittingRebalanceListener implements ConsumerRebalanceListener {

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // 當新的分割區被分配給此消費者時呼叫
            // 可以在此進行分割區特定的初始化操作
            logger.info("分割區已分配: {}", partitions);

            // 針對每個新分配的分割區執行初始化
            for (TopicPartition partition : partitions) {
                // 記錄分割區的當前位置
                long position = consumer.position(partition);
                logger.info("分割區 {} 的起始位置: {}", partition, position);

                // 初始化該分割區的偏移量追蹤
                // 這確保我們從正確的位置開始追蹤
                currentOffsets.put(partition, new OffsetAndMetadata(position, "初始化"));
            }
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // 當分割區即將從此消費者撤銷時呼叫
            // 這是提交偏移量的最後機會,必須確保已處理的訊息偏移量被保存
            logger.info("分割區即將撤銷: {}", partitions);
            logger.info("在 Rebalance 前提交偏移量: {}", currentOffsets);

            // 同步提交當前偏移量以確保資料不會遺失
            // 使用同步提交是因為這是撤銷前的最後機會
            try {
                consumer.commitSync(currentOffsets);
                logger.info("偏移量提交成功");
            } catch (CommitFailedException e) {
                logger.error("偏移量提交失敗: {}", e.getMessage());
            }

            // 清理被撤銷分割區的偏移量追蹤
            for (TopicPartition partition : partitions) {
                currentOffsets.remove(partition);
            }
        }

        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // 當分割區所有權因異常情況丟失時呼叫
            // 這種情況在 Cooperative Rebalance 中可能發生
            logger.warn("分割區所有權丟失: {}", partitions);

            // 在此情況下,偏移量可能已經過期
            // 因為其他消費者可能已經開始處理這些分割區
            // 只需清理本地狀態,不需要提交偏移量
            for (TopicPartition partition : partitions) {
                currentOffsets.remove(partition);
            }
        }
    }

    public void consumeMessages(List<String> topics) {
        // 建立消費者配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092,kafka-broker2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-aware-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        // 關閉自動提交以實現精確的偏移量控制
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 設定自動偏移量重設策略
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 建立消費者實例
        consumer = new KafkaConsumer<>(props);

        try {
            // 訂閱主題並註冊 Rebalance 監聽器
            consumer.subscribe(topics, new OffsetCommittingRebalanceListener());

            // 開始消費迴圈
            while (true) {
                // 輪詢取得訊息
                ConsumerRecords<String, String> records = consumer.poll(pollTimeout);

                // 處理每筆訊息
                for (ConsumerRecord<String, String> record : records) {
                    // 執行業務邏輯處理
                    processRecord(record);

                    // 更新該分割區的偏移量
                    // 注意:偏移量應該是下一筆要讀取的位置,所以要加 1
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    currentOffsets.put(partition,
                        new OffsetAndMetadata(record.offset() + 1, "已處理"));
                }

                // 非同步提交偏移量以提高效能
                // 在正常運作期間使用非同步提交可以減少延遲
                if (!records.isEmpty()) {
                    consumer.commitAsync(currentOffsets, (offsets, exception) -> {
                        if (exception != null) {
                            logger.error("非同步提交失敗: {}", exception.getMessage());
                        }
                    });
                }
            }
        } catch (WakeupException e) {
            // 忽略此例外,因為這是正常的關閉流程
            logger.info("收到喚醒訊號,準備關閉消費者");
        } catch (Exception e) {
            logger.error("消費過程發生錯誤: {}", e.getMessage(), e);
        } finally {
            try {
                // 關閉前使用同步提交確保最後的偏移量被保存
                consumer.commitSync(currentOffsets);
                logger.info("最終偏移量提交成功");
            } catch (Exception e) {
                logger.error("最終偏移量提交失敗: {}", e.getMessage());
            } finally {
                consumer.close();
                logger.info("消費者已關閉");
            }
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // 實作業務邏輯處理
        logger.info("處理訊息 - 主題: {}, 分割區: {}, 偏移量: {}, 鍵: {}, 值: {}",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
}

這個實作展示了 Rebalance 監聽器的完整使用方式。在 onPartitionsRevoked 方法中,我們同步提交當前的偏移量,確保在分割區被重新分配給其他消費者之前,所有已處理的訊息都被正確記錄。在 onPartitionsAssigned 方法中,我們記錄新分配的分割區資訊並初始化偏移量追蹤。這種設計模式能夠有效地處理 Rebalance 過程中的狀態轉換,避免訊息遺失或重複處理。

精確偏移量控制與定位消費

在某些應用場景中,消費者需要從特定的偏移量或時間點開始消費訊息。例如,當需要重新處理歷史資料時,消費者可能需要回溯到某個時間點;當需要跳過已知有問題的訊息時,消費者可能需要直接跳轉到特定的偏移量。Kafka 提供了多種方法來實現這種精確的偏移量控制。

seekToBeginning 方法將消費者的位置重設到分割區的最早可用偏移量,這對於需要重新處理所有歷史訊息的場景非常有用。seekToEnd 方法將消費者的位置設定到分割區的最新偏移量,這適用於只關心新訊息的場景。seek 方法允許將消費者的位置設定到任意指定的偏移量,提供了最大的靈活性。

以下的 Java 程式碼展示了如何使用這些方法來實現時間戳定位消費,讓消費者能夠從指定的時間點開始讀取訊息:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.*;
import java.util.*;
import java.util.stream.Collectors;

public class TimestampSeekingConsumer {

    private KafkaConsumer<String, String> consumer;

    public void seekToTimestamp(long targetTimestamp) {
        // 取得當前分配給此消費者的所有分割區
        Set<TopicPartition> assignment = consumer.assignment();

        if (assignment.isEmpty()) {
            // 如果尚未分配分割區,需要先進行一次輪詢以觸發分配
            consumer.poll(Duration.ofMillis(0));
            assignment = consumer.assignment();
        }

        // 建立分割區到時間戳的映射
        // 每個分割區都需要查詢對應時間戳的偏移量
        Map<TopicPartition, Long> partitionTimestampMap = assignment.stream()
            .collect(Collectors.toMap(
                partition -> partition,
                partition -> targetTimestamp
            ));

        // 查詢每個分割區在指定時間戳的偏移量
        // offsetsForTimes 方法會返回每個分割區中時間戳大於或等於指定值的第一筆訊息的偏移量
        Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(partitionTimestampMap);

        // 將消費者的位置設定到查詢到的偏移量
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
            TopicPartition partition = entry.getKey();
            OffsetAndTimestamp offsetAndTimestamp = entry.getValue();

            if (offsetAndTimestamp != null) {
                // 找到了對應的偏移量,設定消費者位置
                consumer.seek(partition, offsetAndTimestamp.offset());
                System.out.printf("分割區 %s 定位到偏移量 %d (時間戳: %d)%n",
                    partition, offsetAndTimestamp.offset(), offsetAndTimestamp.timestamp());
            } else {
                // 指定的時間戳超出了分割區的範圍
                // 將消費者定位到分割區的末尾
                consumer.seekToEnd(Collections.singletonList(partition));
                System.out.printf("分割區 %s 沒有找到對應的偏移量,定位到末尾%n", partition);
            }
        }
    }

    public void seekToOneHourAgo() {
        // 計算一小時前的時間戳(毫秒)
        long oneHourAgo = Instant.now()
            .atZone(ZoneId.systemDefault())
            .minusHours(1)
            .toInstant()
            .toEpochMilli();

        seekToTimestamp(oneHourAgo);
    }

    public void seekToSpecificDateTime(LocalDateTime dateTime) {
        // 將 LocalDateTime 轉換為時間戳
        long timestamp = dateTime
            .atZone(ZoneId.systemDefault())
            .toInstant()
            .toEpochMilli();

        seekToTimestamp(timestamp);
    }

    public void seekToBeginningForAllPartitions() {
        // 取得當前分配的所有分割區
        Set<TopicPartition> assignment = consumer.assignment();

        // 將所有分割區的消費位置重設到最早的偏移量
        consumer.seekToBeginning(assignment);

        // 記錄每個分割區的新位置
        for (TopicPartition partition : assignment) {
            long position = consumer.position(partition);
            System.out.printf("分割區 %s 已重設到起始位置: %d%n", partition, position);
        }
    }

    public void seekToEndForAllPartitions() {
        // 取得當前分配的所有分割區
        Set<TopicPartition> assignment = consumer.assignment();

        // 將所有分割區的消費位置設定到最新的偏移量
        consumer.seekToEnd(assignment);

        // 記錄每個分割區的新位置
        for (TopicPartition partition : assignment) {
            long position = consumer.position(partition);
            System.out.printf("分割區 %s 已定位到末尾位置: %d%n", partition, position);
        }
    }

    public void seekToSpecificOffset(String topic, int partitionNumber, long offset) {
        // 建立特定的主題分割區物件
        TopicPartition partition = new TopicPartition(topic, partitionNumber);

        // 確保此分割區已分配給當前消費者
        if (consumer.assignment().contains(partition)) {
            // 將消費者定位到指定的偏移量
            consumer.seek(partition, offset);
            System.out.printf("分割區 %s 已定位到偏移量: %d%n", partition, offset);
        } else {
            System.out.printf("分割區 %s 未分配給此消費者%n", partition);
        }
    }
}

這個實作展示了多種偏移量定位策略。seekToTimestamp 方法使用 offsetsForTimes API 來查詢指定時間戳對應的偏移量,這對於需要回溯處理歷史資料的場景非常實用。seekToBeginningForAllPartitions 和 seekToEndForAllPartitions 方法分別將消費者定位到分割區的開頭和結尾,適用於不同的業務需求。seekToSpecificOffset 方法允許精確地控制消費者在特定分割區中的位置,提供了最大的靈活性。

安全退出機制與優雅關閉

在生產環境中,消費者的安全退出機制對於維護系統穩定性至關重要。當需要停止消費者時,必須確保所有已處理的訊息偏移量都被正確提交,並且消費者能夠正常地離開消費者群組,讓群組協調器能夠及時觸發 Rebalance 將分割區重新分配給其他消費者。

Kafka 消費者的 wakeup 方法提供了一種安全的方式來中斷 poll 呼叫。當另一個執行緒呼叫 wakeup 時,當前或下一次的 poll 呼叫將拋出 WakeupException,應用程式可以捕捉這個例外並執行清理操作。這種設計模式避免了使用不安全的執行緒中斷方法,確保消費者能夠優雅地退出。

以下的 Java 程式碼展示了如何實作一個支援優雅關閉的消費者,使用 ShutdownHook 來處理 JVM 關閉訊號:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GracefulShutdownConsumer {

    private static final Logger logger = LoggerFactory.getLogger(GracefulShutdownConsumer.class);

    private KafkaConsumer<String, String> consumer;
    private Thread mainThread;
    private CountDownLatch shutdownLatch;

    public void startConsuming(List<String> topics) {
        // 記錄主執行緒的參考,用於在關閉時等待
        mainThread = Thread.currentThread();

        // 建立關閉倒數鎖,用於協調關閉流程
        shutdownLatch = new CountDownLatch(1);

        // 建立消費者配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "graceful-shutdown-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 設定較短的 session timeout 以加快 Rebalance
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);

        // 建立消費者實例
        consumer = new KafkaConsumer<>(props);

        // 註冊 JVM 關閉鉤子
        // 當 JVM 收到終止訊號(如 SIGTERM)時,會執行這個鉤子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("收到關閉訊號,開始優雅關閉流程...");

            // 呼叫 wakeup 來中斷 poll 呼叫
            consumer.wakeup();

            try {
                // 等待主執行緒完成清理工作
                // 設定超時時間避免無限等待
                shutdownLatch.await();
                logger.info("主執行緒已完成清理");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("等待關閉時被中斷");
            }
        }, "shutdown-hook-thread"));

        // 儲存當前偏移量的映射表
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

        try {
            // 訂閱主題
            consumer.subscribe(topics);
            logger.info("開始消費主題: {}", topics);

            // 主消費迴圈
            while (true) {
                // 輪詢取得訊息
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                if (records.isEmpty()) {
                    logger.debug("本次輪詢沒有取得訊息");
                    continue;
                }

                // 處理取得的訊息
                for (ConsumerRecord<String, String> record : records) {
                    // 執行業務邏輯
                    processRecord(record);

                    // 更新偏移量
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    currentOffsets.put(partition,
                        new OffsetAndMetadata(record.offset() + 1, null));
                }

                // 輸出當前處理進度
                for (TopicPartition partition : consumer.assignment()) {
                    logger.info("分割區 {} 的當前位置: {}",
                        partition, consumer.position(partition));
                }

                // 同步提交偏移量
                consumer.commitSync(currentOffsets);
                logger.info("偏移量已提交: {}", currentOffsets);
            }

        } catch (WakeupException e) {
            // WakeupException 是正常的關閉流程,不需要處理
            logger.info("消費者被喚醒,準備關閉");

        } catch (Exception e) {
            // 記錄其他未預期的錯誤
            logger.error("消費過程發生錯誤: {}", e.getMessage(), e);

        } finally {
            try {
                // 最後一次提交偏移量
                consumer.commitSync(currentOffsets);
                logger.info("最終偏移量提交成功");
            } catch (Exception e) {
                logger.error("最終偏移量提交失敗: {}", e.getMessage());
            }

            // 關閉消費者
            // close 方法會:
            // 1. 提交偏移量(如果啟用自動提交)
            // 2. 發送 LeaveGroup 請求給群組協調器
            // 3. 關閉網路連線
            consumer.close();
            logger.info("消費者已關閉");

            // 通知關閉鉤子執行緒關閉完成
            shutdownLatch.countDown();
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // 模擬業務邏輯處理
        logger.info("處理訊息 - 主題: {}, 分割區: {}, 偏移量: {}, 鍵: {}, 值: {}",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());

        // 模擬處理時間
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        GracefulShutdownConsumer consumer = new GracefulShutdownConsumer();
        consumer.startConsuming(Arrays.asList("orders", "payments"));
    }
}

這個實作展示了生產環境中推薦的消費者關閉模式。ShutdownHook 會在 JVM 收到終止訊號時被觸發,它呼叫 wakeup 方法來中斷主執行緒的 poll 呼叫。主執行緒捕捉到 WakeupException 後,會在 finally 區塊中執行清理操作,包括提交最後的偏移量和關閉消費者。CountDownLatch 用於確保關閉鉤子執行緒等待主執行緒完成所有清理工作後再結束,避免資料遺失。

反序列化技術與自定義反序列化器

Kafka 消費者從叢集接收到的是位元組陣列形式的訊息,需要透過反序列化器將其轉換為應用程式能夠處理的 Java 物件。Kafka 提供了多種內建的反序列化器,如 StringDeserializer、ByteArrayDeserializer、IntegerDeserializer 等。對於複雜的業務物件,開發者需要實作自定義的反序列化器。

以下的 Java 程式碼展示了如何實作一個自定義反序列化器,用於將位元組陣列轉換為 Customer 物件:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 此方法可用於讀取配置參數
        // 例如讀取字元編碼設定
        // 在此範例中不需要額外配置
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        // 處理空值情況
        if (data == null) {
            return null;
        }

        try {
            // 將位元組陣列包裝為 ByteBuffer 以便於讀取
            ByteBuffer buffer = ByteBuffer.wrap(data);

            // 讀取客戶 ID(4 位元組的整數)
            int customerId = buffer.getInt();

            // 讀取名稱長度(4 位元組的整數)
            int nameLength = buffer.getInt();

            // 根據長度讀取名稱的位元組陣列
            byte[] nameBytes = new byte[nameLength];
            buffer.get(nameBytes);

            // 將位元組陣列轉換為字串
            String customerName = new String(nameBytes, StandardCharsets.UTF_8);

            // 讀取電子郵件長度
            int emailLength = buffer.getInt();

            // 讀取電子郵件位元組陣列
            byte[] emailBytes = new byte[emailLength];
            buffer.get(emailBytes);

            // 將位元組陣列轉換為字串
            String email = new String(emailBytes, StandardCharsets.UTF_8);

            // 建立並返回 Customer 物件
            return new Customer(customerId, customerName, email);

        } catch (Exception e) {
            // 反序列化失敗時拋出 SerializationException
            throw new SerializationException(
                "反序列化 Customer 物件時發生錯誤: " + e.getMessage(), e);
        }
    }

    @Override
    public void close() {
        // 清理資源
        // 在此範例中不需要清理
    }
}

// Customer 實體類別
class Customer {
    private int id;
    private String name;
    private String email;

    public Customer(int id, String name, String email) {
        this.id = id;
        this.name = name;
        this.email = email;
    }

    // Getter 方法
    public int getId() { return id; }
    public String getName() { return name; }
    public String getEmail() { return email; }

    @Override
    public String toString() {
        return String.format("Customer{id=%d, name='%s', email='%s'}", id, name, email);
    }
}

雖然自定義反序列化器提供了完全的控制權,但在實際專案中,更推薦使用 Avro 搭配 Schema Registry 來處理序列化和反序列化。Avro 是一種資料序列化系統,它使用 JSON 定義的 Schema 來描述資料結構,並以緊湊的二進位格式儲存資料。Schema Registry 是一個獨立的服務,用於集中管理 Avro Schema,確保生產者和消費者使用相容的 Schema 版本。

以下的 Java 程式碼展示了如何配置消費者使用 Avro 反序列化器:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

public class AvroConsumerConfiguration {

    public static KafkaConsumer<String, GenericRecord> createAvroConsumer() {
        Properties props = new Properties();

        // 基本消費者配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");

        // 配置 Avro 反序列化器
        // 鍵使用字串反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        // 值使用 Avro 反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            KafkaAvroDeserializer.class.getName());

        // 指定 Schema Registry 的位址
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://schema-registry:8081");

        // 配置是否使用特定的記錄類型
        // 設為 false 表示使用 GenericRecord
        // 設為 true 表示使用特定的 Avro 生成類別
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);

        return new KafkaConsumer<>(props);
    }
}

使用 Avro 和 Schema Registry 的主要優勢包括自動處理 Schema 演進、減少序列化相關的錯誤、以及提供更好的跨語言相容性。當 Schema 發生變化時,Schema Registry 會自動檢查相容性,確保新版本的生產者和舊版本的消費者能夠正常運作。

AdminClient 程式設計介面

在某些情況下,應用程式需要在執行時動態地管理 Kafka 資源,例如建立主題、查詢主題配置或管理消費者群組。Kafka 的 AdminClient 提供了一個程式設計介面來執行這些管理操作,無需依賴命令列工具。

以下的 Java 程式碼展示了 AdminClient 的常見用法:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class KafkaAdminOperations {

    private AdminClient adminClient;

    public KafkaAdminOperations(String bootstrapServers) {
        // 建立 AdminClient 配置
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // 設定請求逾時時間
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // 建立 AdminClient 實例
        adminClient = AdminClient.create(props);
    }

    public void createTopic(String topicName, int numPartitions, short replicationFactor)
            throws ExecutionException, InterruptedException {

        // 定義新主題的配置
        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);

        // 設定主題級別的配置
        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "delete");
        configs.put("retention.ms", String.valueOf(7 * 24 * 60 * 60 * 1000)); // 7 天
        configs.put("min.insync.replicas", "2");
        newTopic.configs(configs);

        // 建立主題
        CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));

        // 等待操作完成
        result.all().get();
        System.out.printf("主題 %s 已成功建立%n", topicName);
    }

    public Set<String> listTopics() throws ExecutionException, InterruptedException {
        // 列出所有主題
        ListTopicsResult result = adminClient.listTopics();
        return result.names().get();
    }

    public void describeTopic(String topicName) throws ExecutionException, InterruptedException {
        // 取得主題的詳細資訊
        DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topicName));
        Map<String, TopicDescription> descriptions = result.all().get();

        TopicDescription description = descriptions.get(topicName);
        System.out.printf("主題名稱: %s%n", description.name());
        System.out.printf("分割區數量: %d%n", description.partitions().size());

        // 輸出每個分割區的詳細資訊
        for (TopicPartitionInfo partitionInfo : description.partitions()) {
            System.out.printf("  分割區 %d: 領導者=%s, 副本=%s, ISR=%s%n",
                partitionInfo.partition(),
                partitionInfo.leader(),
                partitionInfo.replicas(),
                partitionInfo.isr());
        }
    }

    public void deleteConsumerGroupOffsets(String groupId, String topic, int partition)
            throws ExecutionException, InterruptedException {

        // 建立要刪除偏移量的分割區集合
        Set<TopicPartition> partitions = Collections.singleton(
            new TopicPartition(topic, partition));

        // 刪除消費者群組的偏移量
        DeleteConsumerGroupOffsetsResult result =
            adminClient.deleteConsumerGroupOffsets(groupId, partitions);

        result.all().get();
        System.out.printf("已刪除群組 %s 在主題 %s 分割區 %d 的偏移量%n",
            groupId, topic, partition);
    }

    public void describeConsumerGroup(String groupId)
            throws ExecutionException, InterruptedException {

        // 取得消費者群組的詳細資訊
        DescribeConsumerGroupsResult result =
            adminClient.describeConsumerGroups(Collections.singletonList(groupId));

        Map<String, ConsumerGroupDescription> descriptions = result.all().get();
        ConsumerGroupDescription description = descriptions.get(groupId);

        System.out.printf("群組 ID: %s%n", description.groupId());
        System.out.printf("狀態: %s%n", description.state());
        System.out.printf("協調器: %s%n", description.coordinator());
        System.out.printf("成員數量: %d%n", description.members().size());

        // 輸出每個成員的資訊
        for (MemberDescription member : description.members()) {
            System.out.printf("  成員 ID: %s, Client ID: %s, Host: %s%n",
                member.consumerId(), member.clientId(), member.host());
            System.out.printf("    分配的分割區: %s%n", member.assignment().topicPartitions());
        }
    }

    public void close() {
        // 關閉 AdminClient
        if (adminClient != null) {
            adminClient.close();
        }
    }
}

AdminClient 提供了豐富的 API 來管理 Kafka 叢集的各種資源。createTopic 方法用於動態建立新主題,可以指定分割區數量、複製因子以及各種主題級別的配置。listTopics 和 describeTopic 方法用於查詢主題資訊。describeConsumerGroup 方法可以取得消費者群組的詳細狀態,包括成員資訊和分割區分配情況。這些功能在開發自動化管理工具或需要在應用程式中動態管理 Kafka 資源時非常有用。

Python 消費者監控與偏移量管理

除了 Java 之外,Python 也是開發 Kafka 應用程式的熱門選擇。以下的 Python 程式碼展示了如何使用 kafka-python 函式庫來實現消費者監控和偏移量管理功能:

from kafka import KafkaConsumer, KafkaAdminClient, TopicPartition
from kafka.admin import NewTopic
from kafka.errors import KafkaError
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import threading
import logging

# 配置日誌記錄器
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KafkaOffsetManager:
    """
    Kafka 偏移量管理器
    提供偏移量查詢、重設和監控功能
    """

    def __init__(self, bootstrap_servers: str, group_id: str):
        # 儲存連線配置
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id

        # 建立消費者實例
        # 設定 enable_auto_commit 為 False 以實現手動偏移量控制
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False,
            auto_offset_reset='earliest',
            key_deserializer=lambda x: x.decode('utf-8') if x else None,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')) if x else None
        )

        # 建立 AdminClient 用於管理操作
        self.admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

        # 關閉標誌
        self.running = False

        logger.info(f"偏移量管理器已初始化,群組: {group_id}")

    def get_consumer_group_offsets(self) -> Dict[str, Dict[int, int]]:
        """
        取得消費者群組在各分割區的當前偏移量
        返回格式: {topic: {partition: offset}}
        """
        # 取得群組的已提交偏移量
        offsets = {}

        # 取得訂閱的主題
        for topic in self.consumer.subscription():
            # 取得主題的分割區資訊
            partitions = self.consumer.partitions_for_topic(topic)

            if partitions:
                topic_offsets = {}
                for partition in partitions:
                    tp = TopicPartition(topic, partition)
                    # 取得已提交的偏移量
                    committed = self.consumer.committed(tp)
                    if committed is not None:
                        topic_offsets[partition] = committed
                    else:
                        topic_offsets[partition] = -1  # 表示尚未提交

                offsets[topic] = topic_offsets

        return offsets

    def get_topic_end_offsets(self, topic: str) -> Dict[int, int]:
        """
        取得主題各分割區的最新偏移量(log end offset)
        """
        # 取得主題的所有分割區
        partitions = self.consumer.partitions_for_topic(topic)

        if not partitions:
            logger.warning(f"主題 {topic} 不存在或沒有分割區")
            return {}

        # 建立 TopicPartition 物件列表
        topic_partitions = [TopicPartition(topic, p) for p in partitions]

        # 取得各分割區的最新偏移量
        end_offsets = self.consumer.end_offsets(topic_partitions)

        # 轉換為簡單的字典格式
        return {tp.partition: offset for tp, offset in end_offsets.items()}

    def calculate_consumer_lag(self, topic: str) -> Dict[int, int]:
        """
        計算消費者在各分割區的延遲(lag)
        延遲 = 最新偏移量 - 已提交偏移量
        """
        # 取得最新偏移量
        end_offsets = self.get_topic_end_offsets(topic)

        # 取得已提交偏移量
        committed_offsets = self.get_consumer_group_offsets().get(topic, {})

        # 計算延遲
        lag = {}
        for partition, end_offset in end_offsets.items():
            committed = committed_offsets.get(partition, 0)
            if committed == -1:
                committed = 0
            lag[partition] = end_offset - committed

        return lag

    def seek_to_timestamp(self, topic: str, timestamp_ms: int):
        """
        將消費者定位到指定時間戳

        Args:
            topic: 主題名稱
            timestamp_ms: 目標時間戳(毫秒)
        """
        # 確保已訂閱主題
        if topic not in self.consumer.subscription():
            self.consumer.subscribe([topic])
            # 執行一次輪詢以觸發分割區分配
            self.consumer.poll(timeout_ms=1000)

        # 取得分配的分割區
        assignment = self.consumer.assignment()

        if not assignment:
            logger.warning("沒有分配到分割區")
            return

        # 建立分割區到時間戳的映射
        partition_timestamps = {tp: timestamp_ms for tp in assignment if tp.topic == topic}

        # 查詢對應時間戳的偏移量
        offsets_for_times = self.consumer.offsets_for_times(partition_timestamps)

        # 定位到查詢到的偏移量
        for tp, offset_and_timestamp in offsets_for_times.items():
            if offset_and_timestamp:
                self.consumer.seek(tp, offset_and_timestamp.offset)
                logger.info(f"分割區 {tp.partition} 已定位到偏移量 {offset_and_timestamp.offset} "
                           f"(時間戳: {offset_and_timestamp.timestamp})")
            else:
                logger.warning(f"分割區 {tp.partition} 找不到對應時間戳的偏移量")

    def seek_to_hours_ago(self, topic: str, hours: int):
        """
        將消費者定位到指定小時數之前
        """
        # 計算目標時間戳
        target_time = datetime.now() - timedelta(hours=hours)
        timestamp_ms = int(target_time.timestamp() * 1000)

        logger.info(f"定位到 {hours} 小時前: {target_time}")
        self.seek_to_timestamp(topic, timestamp_ms)

    def reset_to_beginning(self, topic: str):
        """
        將消費者重設到主題的開頭
        """
        # 取得分配的分割區
        assignment = [tp for tp in self.consumer.assignment() if tp.topic == topic]

        if assignment:
            self.consumer.seek_to_beginning(assignment)
            for tp in assignment:
                position = self.consumer.position(tp)
                logger.info(f"分割區 {tp.partition} 已重設到開頭,位置: {position}")

    def reset_to_end(self, topic: str):
        """
        將消費者定位到主題的末尾
        """
        # 取得分配的分割區
        assignment = [tp for tp in self.consumer.assignment() if tp.topic == topic]

        if assignment:
            self.consumer.seek_to_end(assignment)
            for tp in assignment:
                position = self.consumer.position(tp)
                logger.info(f"分割區 {tp.partition} 已定位到末尾,位置: {position}")

    def consume_with_manual_commit(self, topics: List[str],
                                   process_func,
                                   commit_interval: int = 100):
        """
        使用手動提交模式消費訊息

        Args:
            topics: 要訂閱的主題列表
            process_func: 訊息處理函式
            commit_interval: 每處理多少筆訊息提交一次偏移量
        """
        # 訂閱主題
        self.consumer.subscribe(topics)

        # 設定運行標誌
        self.running = True

        # 計數器
        message_count = 0

        # 儲存待提交的偏移量
        pending_offsets = {}

        logger.info(f"開始消費主題: {topics}")

        try:
            while self.running:
                # 輪詢取得訊息
                records = self.consumer.poll(timeout_ms=1000)

                for topic_partition, messages in records.items():
                    for message in messages:
                        # 處理訊息
                        try:
                            process_func(message)
                            message_count += 1

                            # 更新待提交的偏移量
                            pending_offsets[topic_partition] = message.offset + 1

                        except Exception as e:
                            logger.error(f"處理訊息時發生錯誤: {e}")
                            # 根據需求決定是否繼續處理

                # 達到提交間隔時提交偏移量
                if message_count >= commit_interval and pending_offsets:
                    # 轉換偏移量格式
                    offsets_to_commit = {
                        tp: offset for tp, offset in pending_offsets.items()
                    }

                    # 同步提交偏移量
                    self.consumer.commit(offsets_to_commit)
                    logger.info(f"已提交 {message_count} 筆訊息的偏移量")

                    # 重設計數器和待提交偏移量
                    message_count = 0
                    pending_offsets.clear()

        except KeyboardInterrupt:
            logger.info("收到中斷訊號")

        finally:
            # 提交最後的偏移量
            if pending_offsets:
                self.consumer.commit(pending_offsets)
                logger.info(f"最終提交 {message_count} 筆訊息的偏移量")

            self.consumer.close()
            logger.info("消費者已關閉")

    def stop(self):
        """
        停止消費者
        """
        self.running = False
        logger.info("已發送停止訊號")

    def close(self):
        """
        關閉所有資源
        """
        if self.consumer:
            self.consumer.close()
        if self.admin_client:
            self.admin_client.close()
        logger.info("偏移量管理器已關閉")

def print_consumer_lag_report(manager: KafkaOffsetManager, topic: str):
    """
    輸出消費者延遲報告
    """
    print(f"\n{'='*60}")
    print(f"消費者延遲報告 - 主題: {topic}")
    print(f"{'='*60}")

    # 取得延遲資訊
    lag = manager.calculate_consumer_lag(topic)

    # 取得最新偏移量
    end_offsets = manager.get_topic_end_offsets(topic)

    # 取得已提交偏移量
    committed = manager.get_consumer_group_offsets().get(topic, {})

    total_lag = 0

    for partition in sorted(lag.keys()):
        partition_lag = lag[partition]
        total_lag += partition_lag

        end_offset = end_offsets.get(partition, 0)
        committed_offset = committed.get(partition, 0)

        print(f"分割區 {partition}: "
              f"已提交={committed_offset}, "
              f"最新={end_offset}, "
              f"延遲={partition_lag}")

    print(f"{'='*60}")
    print(f"總延遲: {total_lag} 筆訊息")
    print(f"{'='*60}\n")

# 使用範例
if __name__ == "__main__":
    # 建立偏移量管理器
    manager = KafkaOffsetManager(
        bootstrap_servers="kafka-broker1:9092,kafka-broker2:9092",
        group_id="offset-manager-demo"
    )

    # 訂閱主題
    manager.consumer.subscribe(["orders"])

    # 執行一次輪詢以觸發分割區分配
    manager.consumer.poll(timeout_ms=1000)

    # 輸出延遲報告
    print_consumer_lag_report(manager, "orders")

    # 示範定位到 1 小時前
    manager.seek_to_hours_ago("orders", 1)

    # 定義訊息處理函式
    def process_message(message):
        logger.info(f"處理訊息: 主題={message.topic}, "
                   f"分割區={message.partition}, "
                   f"偏移量={message.offset}, "
                   f"鍵={message.key}, "
                   f"值={message.value}")

    # 開始消費
    try:
        manager.consume_with_manual_commit(
            topics=["orders"],
            process_func=process_message,
            commit_interval=50
        )
    finally:
        manager.close()

這個 Python 實作提供了完整的偏移量管理功能,包括查詢消費者群組的偏移量、計算消費者延遲、定位到特定時間戳以及手動提交偏移量。calculate_consumer_lag 方法可以幫助監控消費者的處理進度,識別可能的處理瓶頸。seek_to_timestamp 和 seek_to_hours_ago 方法提供了靈活的偏移量定位功能,適用於需要重新處理歷史資料的場景。

總結

Kafka 消費者的偏移量管理與退出機制是構建可靠串流處理應用程式的基礎。透過正確地實作 ConsumerRebalanceListener,應用程式可以在 Rebalance 過程中安全地保存處理進度,避免訊息遺失或重複處理。精確的偏移量控制功能讓應用程式能夠靈活地從特定位置恢復處理,滿足各種業務需求。優雅的退出機制確保消費者能夠正常地離開群組並提交最後的偏移量,維護系統的穩定性。

在實際的生產環境中,開發者應該根據應用程式的特性選擇適當的偏移量提交策略。對於對資料一致性要求較高的場景,應該使用手動提交並在處理完成後立即提交偏移量。對於對效能要求較高的場景,可以使用非同步提交並搭配批次處理來減少提交次數。無論選擇哪種策略,都應該確保在消費者關閉前提交最後的偏移量,以避免訊息重複處理。

反序列化技術的選擇也會影響應用程式的可維護性和效能。雖然自定義反序列化器提供了最大的靈活性,但使用 Avro 搭配 Schema Registry 可以大幅簡化開發工作並確保資料的相容性。AdminClient 提供了程式設計方式來管理 Kafka 資源,讓應用程式能夠動態地建立主題、查詢配置和管理消費者群組,這在開發自動化管理工具時非常有用。

透過本文介紹的技術和最佳實踐,開發者可以構建出健壯、可靠且高效能的 Kafka 消費者應用程式,有效地處理大規模的串流資料。