返回文章列表

Kafka 消費者群組管理技巧

本文探討 Kafka 消費者群組的管理技巧,包含如何使用 AdminClient 檢視群組資訊、計算消費者延遲、重設偏移量,以及不乾淨的長官者選舉和副本重新分配等進階操作。文章提供 Java 程式碼範例,詳細說明每個操作步驟和注意事項,幫助管理員有效管理 Kafka 叢集並確保其穩定性。

Kafka 系統管理

Kafka 消費者群組管理是確保 Kafka 叢集穩定執行的關鍵環節。透過 AdminClient,管理員可以有效地監控和控制消費者群組的行為。本文將介紹如何使用 AdminClient 檢視群組資訊、計算消費者延遲、重設偏移量,以及一些進階管理技巧,例如不乾淨的長官者選舉和副本重新分配。這些技巧可以幫助管理員更好地理解和管理 Kafka 叢集,進而提升系統的效能和穩定性。瞭解如何使用 AdminClient 的各種方法,例如 listConsumerGroups()describeConsumerGroups()listConsumerGroupOffsets() 等,對於有效管理 Kafka 消費者群組至關重要。此外,文章也涵蓋瞭如何計算消費者群組的延遲,這對於監控系統健康狀況和及時發現問題非常有幫助。

管理 Kafka 消費者群組

在 Kafka 中,消費者群組是用於管理消費者的邏輯單位。管理員可以透過 AdminClient 來檢視和修改消費者群組的資訊。

檢視消費者群組

首先,我們可以使用 admin.listConsumerGroups() 方法來列出所有的消費者群組。

admin.listConsumerGroups().valid().get().forEach(System.out::println);

內容解密:

  • admin.listConsumerGroups():列出所有消費者群組。
  • valid():只傳回有效的消費者群組,忽略錯誤。
  • get():取得結果。
  • forEach(System.out::println):列印每個有效的消費者群組。

注意,使用 valid() 方法會忽略錯誤,如果需要取得錯誤資訊,可以使用 errors() 方法。

描述消費者群組

如果需要更詳細的資訊,可以使用 describeConsumerGroups() 方法。

ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST).describedGroups().get(CONSUMER_GROUP).get();
System.out.println("Description of group " + CONSUMER_GROUP + ":" + groupDescription);

內容解密:

  • admin.describeConsumerGroups(CONSUMER_GRP_LIST):描述指定的消費者群組。
  • describedGroups().get(CONSUMER_GROUP).get():取得指定消費者群組的描述。
  • groupDescription:包含消費者群組的詳細資訊,如成員、分配的分割槽、協調器等。

取得消費者群組的偏移量

我們可以使用 listConsumerGroupOffsets() 方法來取得消費者群組的已提交偏移量。

Map<TopicPartition, OffsetAndMetadata> offsets = admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();

內容解密:

  • admin.listConsumerGroupOffsets(CONSUMER_GROUP):取得指定消費者群組的已提交偏移量。
  • partitionsToOffsetAndMetadata().get():取得主題分割槽和偏移量的對映。

計算消費者群組的延遲

要計算消費者群組的延遲,需要取得最新的偏移量並與已提交的偏移量進行比較。

Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for(TopicPartition tp: offsets.keySet()) {
    requestLatestOffsets.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets = admin.listOffsets(requestLatestOffsets).all().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
    String topic = e.getKey().topic();
    int partition = e.getKey().partition();
    long committedOffset = e.getValue().offset();
    long latestOffset = latestOffsets.get(e.getKey()).offset();
    System.out.println("Consumer group " + CONSUMER_GROUP + " has committed offset " + committedOffset + " to topic " + topic + " partition " + partition + ". The latest offset in the partition is " + latestOffset + " so consumer group is " + (latestOffset - committedOffset) + " records behind");
}

內容解密:

  1. 建立一個對映 requestLatestOffsets 以請求最新的偏移量。
  2. 使用 admin.listOffsets(requestLatestOffsets).all().get() 取得最新的偏移量。
  3. 遍歷已提交的偏移量,並計算與最新偏移量的差異,即延遲。

修改消費者群組

AdminClient 也提供了修改消費者群組的方法,如刪除群組、刪除已提交的偏移量和修改偏移量等。

重設消費者群組的偏移量

重設偏移量是一種常見的操作,可以讓消費者群組從頭開始處理訊息。

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets = admin.listOffsets(requestEarliestOffsets).all().get();
Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e: earliestOffsets.entrySet()) {
    resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));
}
try {
    admin.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get();
} catch (ExecutionException e) {
    System.out.println("Failed to update the offsets committed by group " + CONSUMER_GROUP + " with error " + e.getMessage());
    if (e.getCause() instanceof UnknownMemberIdException)

內容解密:

  1. 使用 admin.listOffsets(requestEarliestOffsets).all().get() 取得最早的偏移量。
  2. 建立一個新的對映 resetOffsets 以包含最早的偏移量。
  3. 使用 admin.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get() 重設消費者群組的偏移量。
  4. 捕捉可能的異常,如 UnknownMemberIdException

注意,在重設偏移量之前,需要確保消費者群組不活躍,以避免衝突。重設偏移量可能會影響到應用程式的狀態,因此需要謹慎操作。

Kafka 管理操作深度解析

在 Kafka 的日常管理與維護中,管理員經常需要執行一些進階操作以確保叢集的穩定性與效能。這些操作包括重設消費者群組、查詢叢集後設資料、新增主題分割區、刪除主題中的舊記錄以及手動觸發 Leader 選舉等。本文將探討這些操作的實作方法及其背後的技術原理。

重設消費者群組偏移量

當需要重設消費者群組的偏移量時,例如將偏移量重置到最早或最新的偏移量,我們需要使用 AdminClient 提供的 listOffsetsalterConsumerGroupOffsets 方法。以下是一個重置消費者群組偏移量到最早偏移量的範例:

// 取得最早的偏移量
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets = 
    admin.listOffsets(requestEarliestOffsets).all().get();

// 將 ListOffsetsResultInfo 轉換為 OffsetAndMetadata
Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e : 
     earliestOffsets.entrySet()) {
    newOffsets.put(e.getKey(), 
                   new OffsetAndMetadata(e.getValue().offset()));
}

// 重設消費者群組的偏移量
admin.alterConsumerGroupOffsets(CONSUMER_GROUP_ID, newOffsets).all().get();

內容解密:

  1. 取得最早偏移量:使用 listOffsets 方法取得指定主題分割區的最早偏移量。
  2. 轉換資料結構:將 ListOffsetsResult.ListOffsetsResultInfo 轉換為 OffsetAndMetadata,因為 alterConsumerGroupOffsets 方法需要後者的資料格式。
  3. 重設偏移量:呼叫 alterConsumerGroupOffsets 方法將消費者群組的偏移量重置為指定的偏移量。

查詢叢集後設資料

有時我們需要查詢 Kafka 叢集的後設資料,例如叢集 ID、Broker 列表和 Controller 資訊。以下是一個簡單的範例:

DescribeClusterResult cluster = admin.describeCluster();
System.out.println("Connected to cluster " + cluster.clusterId().get());
System.out.println("The brokers in the cluster are:");
cluster.nodes().get().forEach(node -> System.out.println(" * " + node));
System.out.println("The controller is: " + cluster.controller().get());

內容解密:

  1. 查詢叢集資訊:使用 describeCluster 方法取得叢集的詳細資訊。
  2. 列印叢集 ID:輸出叢集的唯一 ID。
  3. 列出所有 Broker:遍歷並輸出叢集中所有的 Broker 節點。
  4. 輸出 Controller 資訊:顯示當前叢集的 Controller 節點。

新增主題分割區

當主題的吞吐量需求增加時,可能需要擴充套件主題的分割區數量。以下是一個範例:

Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS + 2));
admin.createPartitions(newPartitions).all().get();

內容解密:

  1. 建立分割區變更請求:使用 NewPartitions.increaseTo 方法指定主題的新分割區數量。
  2. 執行分割區擴充套件:呼叫 createPartitions 方法對指定主題進行分割區擴充套件。

刪除主題中的舊記錄

為了滿足資料保留政策,可能需要刪除主題中舊的記錄。以下是一個範例:

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets = 
    admin.listOffsets(requestOlderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e : 
     olderOffsets.entrySet()) {
    recordsToDelete.put(e.getKey(), 
                        RecordsToDelete.beforeOffset(e.getValue().offset()));
}
admin.deleteRecords(recordsToDelete).all().get();

內容解密:

  1. 查詢舊記錄的偏移量:使用 listOffsets 方法查詢舊記錄的偏移量。
  2. 建立刪除請求:根據查詢結果建立刪除請求,指定要刪除的記錄範圍。
  3. 執行刪除操作:呼叫 deleteRecords 方法刪除指定的舊記錄。

手動觸發 Leader 選舉

在某些情況下,可能需要手動觸發 Leader 選舉,以確保分割區的 Leader 分佈更均衡。以下是一個範例:

// 手動觸發 Preferred Leader 選舉
admin.electLeaders(ElectionType.PREFERRED, topicPartitions);

內容解密:

  1. 選擇選舉型別:選擇 PREFERRED 型別的 Leader 選舉,以讓 Kafka 自動選擇最優的 Leader。
  2. 執行 Leader 選舉:呼叫 electLeaders 方法手動觸發 Leader 選舉。

Apache Kafka 管理操作

不乾淨的長官者選舉(Unclean Leader Election)

在 Apache Kafka 中,當某個分割槽的主副本(leader replica)不可用,而其他副本因資料缺失等原因無法成為新的長官者時,該分割槽將變得不可用。為瞭解決這個問題,可以觸發不乾淨的長官者選舉(unclean leader election),即選擇一個不符合長官者資格的副本作為新的長官者。這種做法將導致資料丟失,因為寫入舊長官者但尚未複製到新長官者的事件將會遺失。

使用 electLeaders() 方法

electLeaders() 方法可以用於觸發不乾淨的長官者選舉。該方法是非同步的,這意味著即使它成功傳回,也需要一段時間才能讓所有代理(broker)意識到新的狀態,並且對 describeTopics() 的呼叫可能會傳回不一致的結果。

Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition(TOPIC_NAME, 0));
try {
    admin.electLeaders(ElectionType.PREFERRED, electableTopics).all().get();
} catch (ExecutionException e) {
    if (e.getCause() instanceof ElectionNotNeededException) {
        System.out.println("All leaders are preferred already");
    }
}

內容解密:

  1. electLeaders() 方法的使用:該方法用於觸發長官者選舉,可以指定特定的分割槽或主題。
  2. 非同步操作的影響:由於 electLeaders() 是非同步的,呼叫後需要等待所有代理更新狀態,因此可能出現短暫的不一致。
  3. 選舉型別的選擇:範例中使用了 ElectionType.PREFERRED,這表示優先選擇首選副本(preferred replica)作為長官者。

重新分配副本(Reassigning Replicas)

在某些情況下,您可能需要調整副本的位置,例如為了平衡負載、隔離高流量主題或移除某個代理。alterPartitionReassignments 提供了對分割槽副本位置的精細控制。

使用 alterPartitionReassignments() 方法

Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = new HashMap<>();
reassignment.put(new TopicPartition(TOPIC_NAME, 0), Optional.of(new NewPartitionReassignment(Arrays.asList(0,1))));
reassignment.put(new TopicPartition(TOPIC_NAME, 1), Optional.of(new NewPartitionReassignment(Arrays.asList(1))));
reassignment.put(new TopicPartition(TOPIC_NAME, 2), Optional.of(new NewPartitionReassignment(Arrays.asList(1,0))));
reassignment.put(new TopicPartition(TOPIC_NAME, 3), Optional.empty());
admin.alterPartitionReassignments(reassignment).all().get();
System.out.println("currently reassigning: " + admin.listPartitionReassignments().reassignments().get());

內容解密:

  1. alterPartitionReassignments() 的使用:該方法允許您重新分配分割槽的副本,可以增加或移動副本。
  2. 副本分配策略:範例中展示了不同的分配策略,例如為分割槽 0 增加一個副本並將其放在新的代理上。
  3. 取消重新分配:對於分割槽 3,透過傳遞 Optional.empty(),可以取消正在進行的重新分配操作。

測試(Testing)

Apache Kafka 提供了 MockAdminClient 類別,用於在不實際執行 Kafka 叢集的情況下測試應用程式的行為。

使用 MockAdminClient

public TopicCreator(AdminClient admin) {
    this.admin = admin;
}

public void maybeCreateTopic(String topicName) throws ExecutionException, InterruptedException {
    Collection<NewTopic> topics = new ArrayList<>();
    topics.add(new NewTopic(topicName, 1, (short) 1));
    if (topicName.toLowerCase().startsWith("test")) {
        admin.createTopics(topics);
        // alter configs just to demonstrate a point
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        // ...
    }
}

內容解密:

  1. MockAdminClient 的作用:模擬 AdminClient 的行為,用於測試目的。
  2. 測試主題建立:範例展示瞭如何使用 MockAdminClient 測試建立主題的功能。
  3. 注意事項:雖然 MockAdminClient 很方便,但它不是 Kafka API 的一部分,因此可能會在未經通知的情況下更改。