返回文章列表

Kafka AdminClient 深度解析與應用

本文深入解析 Apache Kafka 的 AdminClient,探討其核心設計原則、使用方法以及在應用程式中的有效運用。AdminClient 提供了管理和監控 Kafka 叢集的強大工具,允許開發者以程式設計方式執行建立主題、管理消費者群組和組態 Kafka 等任務。文章涵蓋了 AdminClient

Kafka Java

Kafka 的 AdminClient 提供了管理和監控 Kafka 叢集的有效途徑,讓開發者能以程式化的方式操作 Kafka。其非同步且最終一致的 API 設計,兼顧了效率和一致性。理解 AdminClient 的核心概念,包含非同步操作和最終一致性,對於正確使用至關重要。透過 AdminClient,開發者可以執行各種管理任務,例如建立和刪除主題、管理消費者群組以及調整叢集組態,從而提升 Kafka 應用程式的可靠性和可維護性。深入瞭解 AdminClient 的生命週期管理、組態引數設定,以及各項功能的應用,能有效提升 Kafka 叢集的管理效率。

Apache Kafka AdminClient 深度解析與應用

Apache Kafka 的 AdminClient 提供了一個強大的工具,用於管理和監控 Kafka 叢集。透過 AdminClient,開發者可以以程式設計的方式執行各種管理任務,例如建立和管理主題、消費者群組,以及組態實體等。在本章中,我們將探討 AdminClient 的核心設計原則、使用方法,以及如何在應用程式中有效地利用它。

AdminClient 概覽

在開始使用 Kafka AdminClient 之前,瞭解其核心設計原則是非常重要的。AdminClient 的設計旨在提供一個非同步且最終一致的 API,這使得它能夠高效地管理和監控 Kafka 叢集。

非同步且最終一致的 API

Kafka 的 AdminClient 是一個非同步 API。每次方法呼叫都會立即傳回,並帶有一個或多個 Future 物件。這些 Future 物件代表了非同步操作的結果,並提供了檢查操作狀態、取消操作、等待操作完成等方法。AdminClient 將這些 Future 物件封裝在 Result 物件中,後者提供了等待操作完成和執行後續操作的輔助方法。

例如,KafkaAdminClient.createTopics 方法傳回一個 CreateTopicsResult 物件,該物件允許你等待所有主題建立完成、檢查每個主題的狀態,並在建立後檢索特定主題的組態。

由於 Kafka 的元資料從控制器傳播到代理是非同步的,因此當控制器狀態完全更新時,AdminClient API 傳回的 Futures 被視為已完成。此時,並非所有代理都可能知道新的狀態,因此 listTopics 請求可能會被尚未更新的代理處理,從而導致傳回的主題列表不包含最近建立的主題。這種特性也被稱為最終一致性:最終每個代理都會知道每個主題,但無法保證何時會發生。

AdminClient 的使用方法

組態和管理主題

AdminClient 提供了豐富的方法來管理 Kafka 主題。例如,你可以使用 createTopics 方法來建立新主題,或使用 deleteTopics 方法來刪除現有主題。

// 建立新主題
NewTopic newTopic = new NewTopic("my_topic", 1, (short) 1);
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));

// 等待主題建立完成
result.all().get();

#### 內容解密:

  • NewTopic 物件用於定義新主題的屬性,例如主題名稱、分割區數量和複製因子。
  • createTopics 方法接受一個 NewTopic 物件的集合,並非同步地建立這些主題。
  • CreateTopicsResult 物件提供了等待所有主題建立完成的方法。

管理消費者群組

除了主題管理之外,AdminClient 還允許你管理消費者群組。你可以使用 listConsumerGroups 方法來列出所有消費者群組,或使用 describeConsumerGroups 方法來檢索特定群組的詳細資訊。

// 列出所有消費者群組
ListConsumerGroupsResult result = admin.listConsumerGroups();

// 取得消費者群組列表
List<ConsumerGroupListing> groups = result.all().get();

#### 內容解密:

  • listConsumerGroups 方法用於列出叢集中的所有消費者群組。
  • ListConsumerGroupsResult 物件提供了取得消費者群組列表的方法。

AdminClient 的生命週期管理

要使用 Kafka 的 AdminClient,首先需要建立一個 AdminClient 例項。這可以透過呼叫 AdminClient.create 方法並傳入組態屬性來實作。組態屬性中必須包含 Kafka 叢集的 URI,即用於連線的代理列表。

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);

// 使用 AdminClient 執行管理任務

// 關閉 AdminClient
admin.close(Duration.ofSeconds(30));

#### 內容解密:

  • AdminClient.create 方法用於建立一個新的 AdminClient 例項。
  • 組態屬性中的 BOOTSTRAP_SERVERS_CONFIG 指定了 Kafka 叢集的 URI。
  • 使用完畢後,應呼叫 close 方法來關閉 AdminClient,並指定一個超時時間以確保資源被正確釋放。

管理 Kafka 的 AdminClient 應用

在 Kafka 的管理中,AdminClient 是一個非常重要的工具,它允許開發者對 Kafka 叢集進行多種管理操作,包括主題(Topic)的管理、組態的變更等。為了確保 AdminClient 的高效運作和安全性,需要對其進行適當的組態。

組態 AdminClient

AdminClient 的組態相較於 KafkaProducer 和 KafkaConsumer 來說要簡單得多,但仍有一些重要的組態引數需要了解。

client.dns.lookup

在 Apache Kafka 2.1.0 版本中引入了 client.dns.lookup 組態。預設情況下,Kafka 會根據提供的 bootstrap server 組態中的主機名來驗證、解析和建立連線。這種簡單的模式在大多數情況下都能正常運作,但在某些特定的使用場景下可能會遇到問題,例如使用 DNS 別名或單一 DNS 對應到多個 IP 地址的情況。

使用 DNS 別名

假設有多個 broker,它們的命名規則是 broker1.hostname.combroker2.hostname.com 等。為了簡化組態,可以建立一個 DNS 別名 all-brokers.hostname.com 對應到所有的 broker。如果使用 SASL 認證,由於客戶端會嘗試認證 all-brokers.hostname.com,而伺服器端的主體是 broker2.hostname.com,如果名稱不比對,SASL 將拒絕認證,導致連線失敗。在這種情況下,可以設定 client.dns.lookup=resolve_canonical_bootstrap_servers_only,使客戶端能夠“展開”DNS 別名,實作與直接列出所有 broker 相同的效果。

DNS 名稱對應到多個 IP 地址

在現代網路架構中,常見將所有 broker 置於代理或負載平衡器後面,尤其是在使用 Kubernetes 的環境中。這種情況下,為了避免負載平衡器成為單點故障,通常會將 broker1.hostname.com 對應到多個 IP 地址,這些 IP 地址可能會隨時間變化。預設情況下,Kafka 客戶端只會嘗試連線到主機名解析出的第一個 IP 地址,這意味著如果該 IP 地址不可用,客戶端將無法連線,即使 broker 是可用的。建議設定 client.dns.lookup=use_all_dns_ips,以確保客戶端能夠充分利用高可用性的負載平衡層。

request.timeout.ms

此組態限制了應用程式等待 AdminClient 回應的時間,包括重試的時間。預設值為 120 秒,對於某些操作(如消費者群組管理命令)來說可能需要較長時間才能回應。如果某個 AdminClient 操作對應用程式至關重要,可以使用較低的超時值,並以不同的方式處理 Kafka 未能及時回應的情況。

主題管理

AdminClient 最常見的用途之一是主題管理,包括列出主題、描述主題、建立主題和刪除主題。

列出所有主題

ListTopicsResult topics = admin.listTopics();
topics.names().get().forEach(System.out::println);

檢查主題是否存在並建立主題

檢查特定主題是否存在的一種方法是取得所有主題的列表,然後檢查所需主題是否在列表中。但在大型叢集中,這種方法可能效率較低。另一種方法是直接檢查主題是否存在,並確保它具有正確的分割槽數和副本數。

// 檢查主題是否存在,建立主題的邏輯

內容解密:

  1. ListTopicsResult topics = admin.listTopics();:此行程式碼呼叫 AdminClient 的 listTopics() 方法來取得叢集中所有主題的列表,並將結果儲存在 topics 物件中。
  2. topics.names().get().forEach(System.out::println);:這行程式碼首先呼叫 names() 方法取得主題名稱的 Future 物件,然後呼叫 get() 方法等待伺服器回應,最後使用 forEach 遍歷並列印每個主題的名稱。
  3. client.dns.lookup 組態的作用:此組態用於解決使用 DNS 別名或單一 DNS 對應到多個 IP 地址時的連線問題,能夠提高連線的靈活性和可靠性。
  4. request.timeout.ms 組態的重要性:此組態允許開發者控制應用程式等待 AdminClient 操作回應的時間,能夠根據具體需求調整超時值,以最佳化應用程式的行為。

Kafka 主題管理:建立、描述與刪除

在 Kafka 中,AdminClient 是一個強大的工具,用於管理 Kafka 叢集中的主題、組態和其他資源。本章節將探討如何使用 AdminClient 來建立、描述和刪除 Kafka 主題。

描述主題

首先,我們來看看如何描述一個已存在的主題。以下是一個簡單的例子:

DescribeTopicsResult demoTopic = admin.describeTopics(TOPIC_LIST);
try {
    TopicDescription topicDescription = demoTopic.values().get(TOPIC_NAME).get();
    System.out.println("Description of demo topic: " + topicDescription);
    if (topicDescription.partitions().size() != NUM_PARTITIONS) {
        System.out.println("Topic has wrong number of partitions. Exiting.");
        System.exit(-1);
    }
} catch (ExecutionException e) {
    // exit early for almost all exceptions
    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
        e.printStackTrace();
        throw e;
    }
    // 如果我們在這裡,主題不存在
    System.out.println("Topic " + TOPIC_NAME + " does not exist. Going to create it now");
    // 建立新主題
}

內容解密:

  1. DescribeTopicsResult:這個類別用於描述一個或多個主題的詳細資訊。
  2. demoTopic.values().get(TOPIC_NAME).get():這行程式碼等待 Future 完成並取得主題描述。如果主題不存在,會丟出 ExecutionException
  3. topicDescription.partitions().size():檢查主題的分割槽數量是否符合預期。

建立主題

如果主題不存在,我們可以建立一個新的主題:

CreateTopicsResult newTopic = admin.createTopics(Collections.singletonList(
    new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR)));
// 檢查主題是否正確建立
if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) {
    System.out.println("Topic has wrong number of partitions.");
    System.exit(-1);
}

內容解密:

  1. NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR):建立一個新的主題,指定名稱、分割槽數量和副本因子。
  2. admin.createTopics():非同步建立主題,傳回 CreateTopicsResult 物件。
  3. newTopic.numPartitions(TOPIC_NAME).get():檢查建立的主題分割槽數量是否正確。

刪除主題

刪除主題的操作如下:

admin.deleteTopics(TOPIC_LIST).all().get();
// 檢查主題是否已刪除
try {
    topicDescription = demoTopic.values().get(TOPIC_NAME).get();
    System.out.println("Topic " + TOPIC_NAME + " is still around");
} catch (ExecutionException e) {
    System.out.println("Topic " + TOPIC_NAME + " is gone");
}

內容解密:

  1. admin.deleteTopics(TOPIC_LIST):刪除指定的主題列表。
  2. .all().get():等待刪除操作完成。
  3. 檢查主題是否存在:透過嘗試描述主題來驗證是否已刪除。

使用 KafkaFuture 處理非同步操作

在處理大量管理請求時,阻塞式的 get() 呼叫可能不是最佳選擇。這時,可以利用 KafkaFuture 的非同步特性:

vertx.createHttpServer().requestHandler(request -> {
    String topic = request.getParam("topic");
    String timeout = request.getParam("timeout");
    int timeoutMs = NumberUtils.toInt(timeout, 1000);
    DescribeTopicsResult demoTopic = admin.describeTopics(
        Collections.singletonList(topic),
        new DescribeTopicsOptions().timeoutMs(timeoutMs));
    demoTopic.values().get(topic).whenComplete(
        new KafkaFuture.BiConsumer<TopicDescription, Throwable>() {
            @Override
            public void accept(final TopicDescription topicDescription,
                               final Throwable throwable) {
                if (throwable != null) {
                    request.response().end("Error trying to describe topic "
                        + topic + " due to " + throwable.getMessage());
                } else {
                    request.response().end(topicDescription.toString());
                }
            }
        });
}).listen(8080);

內容解密:

  1. 使用 Vert.x 建立 HTTP 伺服器:當收到請求時,呼叫 AdminClient 描述主題。
  2. whenComplete():非同步處理 Future 結果,當結果可用或出現異常時觸發回撥。

本章節展示瞭如何使用 AdminClient 進行 Kafka 主題的管理,包括建立、描述和刪除主題。這些操作對於維護和管理 Kafka 叢集至關重要。透過瞭解這些 API 的使用,可以更有效地管理和監控 Kafka 環境。

使用AdminClient進行Kafka管理

在前面的章節中,我們討論瞭如何使用Kafka的AdminClient來建立和管理主題。在本章中,我們將繼續探討AdminClient的其他功能,包括組態管理和消費者組管理。

組態管理

組態管理是透過描述和更新ConfigResource的集合來完成的。ConfigResource可以是代理、代理日誌和主題。檢查和修改代理和代理日誌組態通常是使用像kafka-config.sh這樣的工具或其他Kafka管理工具來完成的,但是從使用它們的應用程式中檢查和更新主題組態是非常常見的。

檢查和更新主題組態

例如,許多應用程式依賴於壓縮主題來正確執行。定期(比預設保留期限更頻繁,只是為了安全)檢查主題是否確實被壓縮,如果不是,則採取措施糾正主題組態是有意義的。

ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
DescribeConfigsResult configsResult = admin.describeConfigs(Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);

// 列印非預設組態
configs.entries().stream().filter(entry -> !entry.isDefault()).forEach(System.out::println);

// 檢查主題是否被壓縮
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
if (!configs.entries().contains(compaction)) {
    // 如果主題未被壓縮,則壓縮它
    Collection<AlterConfigOp> configOp = new ArrayList<>();
    configOp.add(new AlterConfigOp(compensation, AlterConfigOp.OpType.SET));
    Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();
    alterConf.put(configResource, configOp);
    admin.incrementalAlterConfigs(alterConf).all().get();
} else {
    System.out.println("主題 " + TOPIC_NAME + " 是壓縮主題");
}

組態修改操作

要修改組態,需要指定要修改的ConfigResource的對映和操作集合。每個組態修改操作由組態條目(組態的名稱和值)和操作型別組成。Kafka中有四種型別的操作可以修改組態:SET,用於設定組態值;DELETE,用於刪除值並重置為預設值;APPEND;和SUBTRACT。最後兩個操作僅適用於具有List型別的組態,允許在不將整個列表傳送到Kafka的情況下新增和刪除列表中的值。

消費者組管理

與大多數訊息佇列不同,Kafka允許您以與之前消費和處理的順序完全相同的順序重新處理資料。在第4章中,我們討論瞭如何使用Consumer API傳回並重新讀取主題中的舊訊息。但是,使用這些API意味著您事先將重新處理資料的功能程式設計到應用程式中。您的應用程式本身必須公開“重新處理”功能。

探索消費者組

如果要探索和修改消費者組,第一步是列出它們:

列出消費者組
// 此處應有列出消費者組的程式碼範例

檢查和修改消費者組偏移量

使用AdminClient,您可以以程式設計方式探索和修改消費者組及其提交的偏移量。在第10章中,我們將研究可用的外部工具來執行相同的操作。

修改消費者組偏移量的範例
// 此處應有修改消費者組偏移量的程式碼範例

#### 內容解密
這段程式碼展示瞭如何使用AdminClient來修改消費者組的偏移量首先我們需要取得消費者組的當前偏移量然後根據需要進行修改最後將新的偏移量提交給Kafka