Kafka 消費者群組管理是確保 Kafka 叢集穩定執行的關鍵環節。透過 AdminClient,管理員可以有效地監控和控制消費者群組的行為。本文將介紹如何使用 AdminClient 檢視群組資訊、計算消費者延遲、重設偏移量,以及一些進階管理技巧,例如不乾淨的長官者選舉和副本重新分配。這些技巧可以幫助管理員更好地理解和管理 Kafka 叢集,進而提升系統的效能和穩定性。瞭解如何使用 AdminClient 的各種方法,例如 listConsumerGroups()、describeConsumerGroups()、listConsumerGroupOffsets() 等,對於有效管理 Kafka 消費者群組至關重要。此外,文章也涵蓋瞭如何計算消費者群組的延遲,這對於監控系統健康狀況和及時發現問題非常有幫助。
管理 Kafka 消費者群組
在 Kafka 中,消費者群組是用於管理消費者的邏輯單位。管理員可以透過 AdminClient 來檢視和修改消費者群組的資訊。
檢視消費者群組
首先,我們可以使用 admin.listConsumerGroups() 方法來列出所有的消費者群組。
admin.listConsumerGroups().valid().get().forEach(System.out::println);
內容解密:
admin.listConsumerGroups():列出所有消費者群組。valid():只傳回有效的消費者群組,忽略錯誤。get():取得結果。forEach(System.out::println):列印每個有效的消費者群組。
注意,使用 valid() 方法會忽略錯誤,如果需要取得錯誤資訊,可以使用 errors() 方法。
描述消費者群組
如果需要更詳細的資訊,可以使用 describeConsumerGroups() 方法。
ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST).describedGroups().get(CONSUMER_GROUP).get();
System.out.println("Description of group " + CONSUMER_GROUP + ":" + groupDescription);
內容解密:
admin.describeConsumerGroups(CONSUMER_GRP_LIST):描述指定的消費者群組。describedGroups().get(CONSUMER_GROUP).get():取得指定消費者群組的描述。groupDescription:包含消費者群組的詳細資訊,如成員、分配的分割槽、協調器等。
取得消費者群組的偏移量
我們可以使用 listConsumerGroupOffsets() 方法來取得消費者群組的已提交偏移量。
Map<TopicPartition, OffsetAndMetadata> offsets = admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();
內容解密:
admin.listConsumerGroupOffsets(CONSUMER_GROUP):取得指定消費者群組的已提交偏移量。partitionsToOffsetAndMetadata().get():取得主題分割槽和偏移量的對映。
計算消費者群組的延遲
要計算消費者群組的延遲,需要取得最新的偏移量並與已提交的偏移量進行比較。
Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for(TopicPartition tp: offsets.keySet()) {
requestLatestOffsets.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets = admin.listOffsets(requestLatestOffsets).all().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
String topic = e.getKey().topic();
int partition = e.getKey().partition();
long committedOffset = e.getValue().offset();
long latestOffset = latestOffsets.get(e.getKey()).offset();
System.out.println("Consumer group " + CONSUMER_GROUP + " has committed offset " + committedOffset + " to topic " + topic + " partition " + partition + ". The latest offset in the partition is " + latestOffset + " so consumer group is " + (latestOffset - committedOffset) + " records behind");
}
內容解密:
- 建立一個對映
requestLatestOffsets以請求最新的偏移量。 - 使用
admin.listOffsets(requestLatestOffsets).all().get()取得最新的偏移量。 - 遍歷已提交的偏移量,並計算與最新偏移量的差異,即延遲。
修改消費者群組
AdminClient 也提供了修改消費者群組的方法,如刪除群組、刪除已提交的偏移量和修改偏移量等。
重設消費者群組的偏移量
重設偏移量是一種常見的操作,可以讓消費者群組從頭開始處理訊息。
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets = admin.listOffsets(requestEarliestOffsets).all().get();
Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e: earliestOffsets.entrySet()) {
resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));
}
try {
admin.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get();
} catch (ExecutionException e) {
System.out.println("Failed to update the offsets committed by group " + CONSUMER_GROUP + " with error " + e.getMessage());
if (e.getCause() instanceof UnknownMemberIdException)
內容解密:
- 使用
admin.listOffsets(requestEarliestOffsets).all().get()取得最早的偏移量。 - 建立一個新的對映
resetOffsets以包含最早的偏移量。 - 使用
admin.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get()重設消費者群組的偏移量。 - 捕捉可能的異常,如
UnknownMemberIdException。
注意,在重設偏移量之前,需要確保消費者群組不活躍,以避免衝突。重設偏移量可能會影響到應用程式的狀態,因此需要謹慎操作。
Kafka 管理操作深度解析
在 Kafka 的日常管理與維護中,管理員經常需要執行一些進階操作以確保叢集的穩定性與效能。這些操作包括重設消費者群組、查詢叢集後設資料、新增主題分割區、刪除主題中的舊記錄以及手動觸發 Leader 選舉等。本文將探討這些操作的實作方法及其背後的技術原理。
重設消費者群組偏移量
當需要重設消費者群組的偏移量時,例如將偏移量重置到最早或最新的偏移量,我們需要使用 AdminClient 提供的 listOffsets 和 alterConsumerGroupOffsets 方法。以下是一個重置消費者群組偏移量到最早偏移量的範例:
// 取得最早的偏移量
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =
admin.listOffsets(requestEarliestOffsets).all().get();
// 將 ListOffsetsResultInfo 轉換為 OffsetAndMetadata
Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e :
earliestOffsets.entrySet()) {
newOffsets.put(e.getKey(),
new OffsetAndMetadata(e.getValue().offset()));
}
// 重設消費者群組的偏移量
admin.alterConsumerGroupOffsets(CONSUMER_GROUP_ID, newOffsets).all().get();
內容解密:
- 取得最早偏移量:使用
listOffsets方法取得指定主題分割區的最早偏移量。 - 轉換資料結構:將
ListOffsetsResult.ListOffsetsResultInfo轉換為OffsetAndMetadata,因為alterConsumerGroupOffsets方法需要後者的資料格式。 - 重設偏移量:呼叫
alterConsumerGroupOffsets方法將消費者群組的偏移量重置為指定的偏移量。
查詢叢集後設資料
有時我們需要查詢 Kafka 叢集的後設資料,例如叢集 ID、Broker 列表和 Controller 資訊。以下是一個簡單的範例:
DescribeClusterResult cluster = admin.describeCluster();
System.out.println("Connected to cluster " + cluster.clusterId().get());
System.out.println("The brokers in the cluster are:");
cluster.nodes().get().forEach(node -> System.out.println(" * " + node));
System.out.println("The controller is: " + cluster.controller().get());
內容解密:
- 查詢叢集資訊:使用
describeCluster方法取得叢集的詳細資訊。 - 列印叢集 ID:輸出叢集的唯一 ID。
- 列出所有 Broker:遍歷並輸出叢集中所有的 Broker 節點。
- 輸出 Controller 資訊:顯示當前叢集的 Controller 節點。
新增主題分割區
當主題的吞吐量需求增加時,可能需要擴充套件主題的分割區數量。以下是一個範例:
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS + 2));
admin.createPartitions(newPartitions).all().get();
內容解密:
- 建立分割區變更請求:使用
NewPartitions.increaseTo方法指定主題的新分割區數量。 - 執行分割區擴充套件:呼叫
createPartitions方法對指定主題進行分割區擴充套件。
刪除主題中的舊記錄
為了滿足資料保留政策,可能需要刪除主題中舊的記錄。以下是一個範例:
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets =
admin.listOffsets(requestOlderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e :
olderOffsets.entrySet()) {
recordsToDelete.put(e.getKey(),
RecordsToDelete.beforeOffset(e.getValue().offset()));
}
admin.deleteRecords(recordsToDelete).all().get();
內容解密:
- 查詢舊記錄的偏移量:使用
listOffsets方法查詢舊記錄的偏移量。 - 建立刪除請求:根據查詢結果建立刪除請求,指定要刪除的記錄範圍。
- 執行刪除操作:呼叫
deleteRecords方法刪除指定的舊記錄。
手動觸發 Leader 選舉
在某些情況下,可能需要手動觸發 Leader 選舉,以確保分割區的 Leader 分佈更均衡。以下是一個範例:
// 手動觸發 Preferred Leader 選舉
admin.electLeaders(ElectionType.PREFERRED, topicPartitions);
內容解密:
- 選擇選舉型別:選擇
PREFERRED型別的 Leader 選舉,以讓 Kafka 自動選擇最優的 Leader。 - 執行 Leader 選舉:呼叫
electLeaders方法手動觸發 Leader 選舉。
Apache Kafka 管理操作
不乾淨的長官者選舉(Unclean Leader Election)
在 Apache Kafka 中,當某個分割槽的主副本(leader replica)不可用,而其他副本因資料缺失等原因無法成為新的長官者時,該分割槽將變得不可用。為瞭解決這個問題,可以觸發不乾淨的長官者選舉(unclean leader election),即選擇一個不符合長官者資格的副本作為新的長官者。這種做法將導致資料丟失,因為寫入舊長官者但尚未複製到新長官者的事件將會遺失。
使用 electLeaders() 方法
electLeaders() 方法可以用於觸發不乾淨的長官者選舉。該方法是非同步的,這意味著即使它成功傳回,也需要一段時間才能讓所有代理(broker)意識到新的狀態,並且對 describeTopics() 的呼叫可能會傳回不一致的結果。
Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition(TOPIC_NAME, 0));
try {
admin.electLeaders(ElectionType.PREFERRED, electableTopics).all().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof ElectionNotNeededException) {
System.out.println("All leaders are preferred already");
}
}
內容解密:
electLeaders()方法的使用:該方法用於觸發長官者選舉,可以指定特定的分割槽或主題。- 非同步操作的影響:由於
electLeaders()是非同步的,呼叫後需要等待所有代理更新狀態,因此可能出現短暫的不一致。 - 選舉型別的選擇:範例中使用了
ElectionType.PREFERRED,這表示優先選擇首選副本(preferred replica)作為長官者。
重新分配副本(Reassigning Replicas)
在某些情況下,您可能需要調整副本的位置,例如為了平衡負載、隔離高流量主題或移除某個代理。alterPartitionReassignments 提供了對分割槽副本位置的精細控制。
使用 alterPartitionReassignments() 方法
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = new HashMap<>();
reassignment.put(new TopicPartition(TOPIC_NAME, 0), Optional.of(new NewPartitionReassignment(Arrays.asList(0,1))));
reassignment.put(new TopicPartition(TOPIC_NAME, 1), Optional.of(new NewPartitionReassignment(Arrays.asList(1))));
reassignment.put(new TopicPartition(TOPIC_NAME, 2), Optional.of(new NewPartitionReassignment(Arrays.asList(1,0))));
reassignment.put(new TopicPartition(TOPIC_NAME, 3), Optional.empty());
admin.alterPartitionReassignments(reassignment).all().get();
System.out.println("currently reassigning: " + admin.listPartitionReassignments().reassignments().get());
內容解密:
alterPartitionReassignments()的使用:該方法允許您重新分配分割槽的副本,可以增加或移動副本。- 副本分配策略:範例中展示了不同的分配策略,例如為分割槽 0 增加一個副本並將其放在新的代理上。
- 取消重新分配:對於分割槽 3,透過傳遞
Optional.empty(),可以取消正在進行的重新分配操作。
測試(Testing)
Apache Kafka 提供了 MockAdminClient 類別,用於在不實際執行 Kafka 叢集的情況下測試應用程式的行為。
使用 MockAdminClient
public TopicCreator(AdminClient admin) {
this.admin = admin;
}
public void maybeCreateTopic(String topicName) throws ExecutionException, InterruptedException {
Collection<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(topicName, 1, (short) 1));
if (topicName.toLowerCase().startsWith("test")) {
admin.createTopics(topics);
// alter configs just to demonstrate a point
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
// ...
}
}
內容解密:
MockAdminClient的作用:模擬AdminClient的行為,用於測試目的。- 測試主題建立:範例展示瞭如何使用
MockAdminClient測試建立主題的功能。 - 注意事項:雖然
MockAdminClient很方便,但它不是 Kafka API 的一部分,因此可能會在未經通知的情況下更改。