Kafka 的 AdminClient 提供了管理和監控 Kafka 叢集的有效途徑,讓開發者能以程式化的方式操作 Kafka。其非同步且最終一致的 API 設計,兼顧了效率和一致性。理解 AdminClient 的核心概念,包含非同步操作和最終一致性,對於正確使用至關重要。透過 AdminClient,開發者可以執行各種管理任務,例如建立和刪除主題、管理消費者群組以及調整叢集組態,從而提升 Kafka 應用程式的可靠性和可維護性。深入瞭解 AdminClient 的生命週期管理、組態引數設定,以及各項功能的應用,能有效提升 Kafka 叢集的管理效率。
Apache Kafka AdminClient 深度解析與應用
Apache Kafka 的 AdminClient 提供了一個強大的工具,用於管理和監控 Kafka 叢集。透過 AdminClient,開發者可以以程式設計的方式執行各種管理任務,例如建立和管理主題、消費者群組,以及組態實體等。在本章中,我們將探討 AdminClient 的核心設計原則、使用方法,以及如何在應用程式中有效地利用它。
AdminClient 概覽
在開始使用 Kafka AdminClient 之前,瞭解其核心設計原則是非常重要的。AdminClient 的設計旨在提供一個非同步且最終一致的 API,這使得它能夠高效地管理和監控 Kafka 叢集。
非同步且最終一致的 API
Kafka 的 AdminClient 是一個非同步 API。每次方法呼叫都會立即傳回,並帶有一個或多個 Future 物件。這些 Future 物件代表了非同步操作的結果,並提供了檢查操作狀態、取消操作、等待操作完成等方法。AdminClient 將這些 Future 物件封裝在 Result 物件中,後者提供了等待操作完成和執行後續操作的輔助方法。
例如,KafkaAdminClient.createTopics 方法傳回一個 CreateTopicsResult 物件,該物件允許你等待所有主題建立完成、檢查每個主題的狀態,並在建立後檢索特定主題的組態。
由於 Kafka 的元資料從控制器傳播到代理是非同步的,因此當控制器狀態完全更新時,AdminClient API 傳回的 Futures 被視為已完成。此時,並非所有代理都可能知道新的狀態,因此 listTopics 請求可能會被尚未更新的代理處理,從而導致傳回的主題列表不包含最近建立的主題。這種特性也被稱為最終一致性:最終每個代理都會知道每個主題,但無法保證何時會發生。
AdminClient 的使用方法
組態和管理主題
AdminClient 提供了豐富的方法來管理 Kafka 主題。例如,你可以使用 createTopics 方法來建立新主題,或使用 deleteTopics 方法來刪除現有主題。
// 建立新主題
NewTopic newTopic = new NewTopic("my_topic", 1, (short) 1);
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
// 等待主題建立完成
result.all().get();
#### 內容解密:
NewTopic物件用於定義新主題的屬性,例如主題名稱、分割區數量和複製因子。createTopics方法接受一個NewTopic物件的集合,並非同步地建立這些主題。CreateTopicsResult物件提供了等待所有主題建立完成的方法。
管理消費者群組
除了主題管理之外,AdminClient 還允許你管理消費者群組。你可以使用 listConsumerGroups 方法來列出所有消費者群組,或使用 describeConsumerGroups 方法來檢索特定群組的詳細資訊。
// 列出所有消費者群組
ListConsumerGroupsResult result = admin.listConsumerGroups();
// 取得消費者群組列表
List<ConsumerGroupListing> groups = result.all().get();
#### 內容解密:
listConsumerGroups方法用於列出叢集中的所有消費者群組。ListConsumerGroupsResult物件提供了取得消費者群組列表的方法。
AdminClient 的生命週期管理
要使用 Kafka 的 AdminClient,首先需要建立一個 AdminClient 例項。這可以透過呼叫 AdminClient.create 方法並傳入組態屬性來實作。組態屬性中必須包含 Kafka 叢集的 URI,即用於連線的代理列表。
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// 使用 AdminClient 執行管理任務
// 關閉 AdminClient
admin.close(Duration.ofSeconds(30));
#### 內容解密:
AdminClient.create方法用於建立一個新的AdminClient例項。- 組態屬性中的
BOOTSTRAP_SERVERS_CONFIG指定了 Kafka 叢集的 URI。 - 使用完畢後,應呼叫
close方法來關閉AdminClient,並指定一個超時時間以確保資源被正確釋放。
管理 Kafka 的 AdminClient 應用
在 Kafka 的管理中,AdminClient 是一個非常重要的工具,它允許開發者對 Kafka 叢集進行多種管理操作,包括主題(Topic)的管理、組態的變更等。為了確保 AdminClient 的高效運作和安全性,需要對其進行適當的組態。
組態 AdminClient
AdminClient 的組態相較於 KafkaProducer 和 KafkaConsumer 來說要簡單得多,但仍有一些重要的組態引數需要了解。
client.dns.lookup
在 Apache Kafka 2.1.0 版本中引入了 client.dns.lookup 組態。預設情況下,Kafka 會根據提供的 bootstrap server 組態中的主機名來驗證、解析和建立連線。這種簡單的模式在大多數情況下都能正常運作,但在某些特定的使用場景下可能會遇到問題,例如使用 DNS 別名或單一 DNS 對應到多個 IP 地址的情況。
使用 DNS 別名
假設有多個 broker,它們的命名規則是 broker1.hostname.com、broker2.hostname.com 等。為了簡化組態,可以建立一個 DNS 別名 all-brokers.hostname.com 對應到所有的 broker。如果使用 SASL 認證,由於客戶端會嘗試認證 all-brokers.hostname.com,而伺服器端的主體是 broker2.hostname.com,如果名稱不比對,SASL 將拒絕認證,導致連線失敗。在這種情況下,可以設定 client.dns.lookup=resolve_canonical_bootstrap_servers_only,使客戶端能夠“展開”DNS 別名,實作與直接列出所有 broker 相同的效果。
DNS 名稱對應到多個 IP 地址
在現代網路架構中,常見將所有 broker 置於代理或負載平衡器後面,尤其是在使用 Kubernetes 的環境中。這種情況下,為了避免負載平衡器成為單點故障,通常會將 broker1.hostname.com 對應到多個 IP 地址,這些 IP 地址可能會隨時間變化。預設情況下,Kafka 客戶端只會嘗試連線到主機名解析出的第一個 IP 地址,這意味著如果該 IP 地址不可用,客戶端將無法連線,即使 broker 是可用的。建議設定 client.dns.lookup=use_all_dns_ips,以確保客戶端能夠充分利用高可用性的負載平衡層。
request.timeout.ms
此組態限制了應用程式等待 AdminClient 回應的時間,包括重試的時間。預設值為 120 秒,對於某些操作(如消費者群組管理命令)來說可能需要較長時間才能回應。如果某個 AdminClient 操作對應用程式至關重要,可以使用較低的超時值,並以不同的方式處理 Kafka 未能及時回應的情況。
主題管理
AdminClient 最常見的用途之一是主題管理,包括列出主題、描述主題、建立主題和刪除主題。
列出所有主題
ListTopicsResult topics = admin.listTopics();
topics.names().get().forEach(System.out::println);
檢查主題是否存在並建立主題
檢查特定主題是否存在的一種方法是取得所有主題的列表,然後檢查所需主題是否在列表中。但在大型叢集中,這種方法可能效率較低。另一種方法是直接檢查主題是否存在,並確保它具有正確的分割槽數和副本數。
// 檢查主題是否存在,建立主題的邏輯
內容解密:
ListTopicsResult topics = admin.listTopics();:此行程式碼呼叫 AdminClient 的listTopics()方法來取得叢集中所有主題的列表,並將結果儲存在topics物件中。topics.names().get().forEach(System.out::println);:這行程式碼首先呼叫names()方法取得主題名稱的 Future 物件,然後呼叫get()方法等待伺服器回應,最後使用forEach遍歷並列印每個主題的名稱。client.dns.lookup組態的作用:此組態用於解決使用 DNS 別名或單一 DNS 對應到多個 IP 地址時的連線問題,能夠提高連線的靈活性和可靠性。request.timeout.ms組態的重要性:此組態允許開發者控制應用程式等待 AdminClient 操作回應的時間,能夠根據具體需求調整超時值,以最佳化應用程式的行為。
Kafka 主題管理:建立、描述與刪除
在 Kafka 中,AdminClient 是一個強大的工具,用於管理 Kafka 叢集中的主題、組態和其他資源。本章節將探討如何使用 AdminClient 來建立、描述和刪除 Kafka 主題。
描述主題
首先,我們來看看如何描述一個已存在的主題。以下是一個簡單的例子:
DescribeTopicsResult demoTopic = admin.describeTopics(TOPIC_LIST);
try {
TopicDescription topicDescription = demoTopic.values().get(TOPIC_NAME).get();
System.out.println("Description of demo topic: " + topicDescription);
if (topicDescription.partitions().size() != NUM_PARTITIONS) {
System.out.println("Topic has wrong number of partitions. Exiting.");
System.exit(-1);
}
} catch (ExecutionException e) {
// exit early for almost all exceptions
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
e.printStackTrace();
throw e;
}
// 如果我們在這裡,主題不存在
System.out.println("Topic " + TOPIC_NAME + " does not exist. Going to create it now");
// 建立新主題
}
內容解密:
DescribeTopicsResult:這個類別用於描述一個或多個主題的詳細資訊。demoTopic.values().get(TOPIC_NAME).get():這行程式碼等待Future完成並取得主題描述。如果主題不存在,會丟出ExecutionException。topicDescription.partitions().size():檢查主題的分割槽數量是否符合預期。
建立主題
如果主題不存在,我們可以建立一個新的主題:
CreateTopicsResult newTopic = admin.createTopics(Collections.singletonList(
new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR)));
// 檢查主題是否正確建立
if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) {
System.out.println("Topic has wrong number of partitions.");
System.exit(-1);
}
內容解密:
NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR):建立一個新的主題,指定名稱、分割槽數量和副本因子。admin.createTopics():非同步建立主題,傳回CreateTopicsResult物件。newTopic.numPartitions(TOPIC_NAME).get():檢查建立的主題分割槽數量是否正確。
刪除主題
刪除主題的操作如下:
admin.deleteTopics(TOPIC_LIST).all().get();
// 檢查主題是否已刪除
try {
topicDescription = demoTopic.values().get(TOPIC_NAME).get();
System.out.println("Topic " + TOPIC_NAME + " is still around");
} catch (ExecutionException e) {
System.out.println("Topic " + TOPIC_NAME + " is gone");
}
內容解密:
admin.deleteTopics(TOPIC_LIST):刪除指定的主題列表。.all().get():等待刪除操作完成。- 檢查主題是否存在:透過嘗試描述主題來驗證是否已刪除。
使用 KafkaFuture 處理非同步操作
在處理大量管理請求時,阻塞式的 get() 呼叫可能不是最佳選擇。這時,可以利用 KafkaFuture 的非同步特性:
vertx.createHttpServer().requestHandler(request -> {
String topic = request.getParam("topic");
String timeout = request.getParam("timeout");
int timeoutMs = NumberUtils.toInt(timeout, 1000);
DescribeTopicsResult demoTopic = admin.describeTopics(
Collections.singletonList(topic),
new DescribeTopicsOptions().timeoutMs(timeoutMs));
demoTopic.values().get(topic).whenComplete(
new KafkaFuture.BiConsumer<TopicDescription, Throwable>() {
@Override
public void accept(final TopicDescription topicDescription,
final Throwable throwable) {
if (throwable != null) {
request.response().end("Error trying to describe topic "
+ topic + " due to " + throwable.getMessage());
} else {
request.response().end(topicDescription.toString());
}
}
});
}).listen(8080);
內容解密:
- 使用 Vert.x 建立 HTTP 伺服器:當收到請求時,呼叫
AdminClient描述主題。 whenComplete():非同步處理Future結果,當結果可用或出現異常時觸發回撥。
本章節展示瞭如何使用 AdminClient 進行 Kafka 主題的管理,包括建立、描述和刪除主題。這些操作對於維護和管理 Kafka 叢集至關重要。透過瞭解這些 API 的使用,可以更有效地管理和監控 Kafka 環境。
使用AdminClient進行Kafka管理
在前面的章節中,我們討論瞭如何使用Kafka的AdminClient來建立和管理主題。在本章中,我們將繼續探討AdminClient的其他功能,包括組態管理和消費者組管理。
組態管理
組態管理是透過描述和更新ConfigResource的集合來完成的。ConfigResource可以是代理、代理日誌和主題。檢查和修改代理和代理日誌組態通常是使用像kafka-config.sh這樣的工具或其他Kafka管理工具來完成的,但是從使用它們的應用程式中檢查和更新主題組態是非常常見的。
檢查和更新主題組態
例如,許多應用程式依賴於壓縮主題來正確執行。定期(比預設保留期限更頻繁,只是為了安全)檢查主題是否確實被壓縮,如果不是,則採取措施糾正主題組態是有意義的。
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
DescribeConfigsResult configsResult = admin.describeConfigs(Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);
// 列印非預設組態
configs.entries().stream().filter(entry -> !entry.isDefault()).forEach(System.out::println);
// 檢查主題是否被壓縮
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
if (!configs.entries().contains(compaction)) {
// 如果主題未被壓縮,則壓縮它
Collection<AlterConfigOp> configOp = new ArrayList<>();
configOp.add(new AlterConfigOp(compensation, AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();
alterConf.put(configResource, configOp);
admin.incrementalAlterConfigs(alterConf).all().get();
} else {
System.out.println("主題 " + TOPIC_NAME + " 是壓縮主題");
}
組態修改操作
要修改組態,需要指定要修改的ConfigResource的對映和操作集合。每個組態修改操作由組態條目(組態的名稱和值)和操作型別組成。Kafka中有四種型別的操作可以修改組態:SET,用於設定組態值;DELETE,用於刪除值並重置為預設值;APPEND;和SUBTRACT。最後兩個操作僅適用於具有List型別的組態,允許在不將整個列表傳送到Kafka的情況下新增和刪除列表中的值。
消費者組管理
與大多數訊息佇列不同,Kafka允許您以與之前消費和處理的順序完全相同的順序重新處理資料。在第4章中,我們討論瞭如何使用Consumer API傳回並重新讀取主題中的舊訊息。但是,使用這些API意味著您事先將重新處理資料的功能程式設計到應用程式中。您的應用程式本身必須公開“重新處理”功能。
探索消費者組
如果要探索和修改消費者組,第一步是列出它們:
列出消費者組
// 此處應有列出消費者組的程式碼範例
檢查和修改消費者組偏移量
使用AdminClient,您可以以程式設計方式探索和修改消費者組及其提交的偏移量。在第10章中,我們將研究可用的外部工具來執行相同的操作。
修改消費者組偏移量的範例
// 此處應有修改消費者組偏移量的程式碼範例
#### 內容解密:
這段程式碼展示瞭如何使用AdminClient來修改消費者組的偏移量。首先,我們需要取得消費者組的當前偏移量,然後根據需要進行修改,最後將新的偏移量提交給Kafka。