返回文章列表

Kafka資料整合最佳實踐與深度解析

本文探討Kafka在資料整合中的角色,超越單純的資料管道終點,強調更廣泛的架構思維。文章涵蓋建構資料管道時的關鍵考量,例如時效性、可靠性、吞吐量和資料格式,並闡述Kafka如何有效解決這些挑戰。同時,也深入比較Kafka Connect API與傳統生產者和消費者的差異,提供實務上的應用,並探討Kafka

資料工程 串流處理

Kafka不僅僅是資料管道的終點,更是一個強大的串流平台,能有效整合不同系統的資料。在設計資料架構時,應考量資料的時效性、可靠性、吞吐量以及格式轉換等問題。Kafka的彈性架構能滿足近乎即時到每日批次的各種需求,其高吞吐量和容錯機制確保資料管道的穩定性。Kafka Connect API 則簡化了與外部系統的整合,提供更便捷的資料複製方式,並支援精確一次傳遞保證,減少資料遺失或重複的風險。此外,Schema Registry 等工具有助於管理資料格式演進,提升系統的敏捷性。選擇Kafka Connect 或傳統生產者/消費者取決於應用場景,Connect 適用於與無法修改程式碼的外部系統整合,而客戶端則適用於可控的應用程式內部資料傳輸。

資料整合的脈絡

許多組織將Kafka視為資料管道的終點,關注諸如「如何將資料從Kafka傳輸到Elasticsearch?」等問題。雖然這是一個合理且實際的問題,本文將探討如何實作這一點,但更重要的是,我們需要將Kafka置於一個更大的脈絡中,這個脈絡至少包含兩個(甚至更多)非Kafka的終點。面對資料整合問題時,我們鼓勵大家考慮更廣泛的架構,而非僅僅聚焦於眼前的終點。過度關注短期整合,很容易導致資料整合變得複雜且難以維護。

在本章中,我們將討論建構資料管道時需要考慮的一些常見問題。這些挑戰並非Kafka所獨有,而是一般的資料整合問題。儘管如此,我們將闡述為什麼Kafka非常適合用於資料整合,並說明它如何解決許多相關挑戰。我們還會討論Kafka Connect API與傳統生產者和消費者客戶端的差異,以及何時使用每種型別的客戶端。接著,我們將探討Kafka Connect的細節。雖然本章無法全面涵蓋Kafka Connect,但我們將提供基本用法的範例,並指出進一步學習的資源。最後,我們將討論其他資料整合系統以及它們如何與Kafka整合。

建構資料管道時的考量

在設計軟體架構以整合多個系統時,有幾個重要的因素需要考慮。

時效性

有些系統期望資料以大批次的形式每日抵達,而其他系統則要求資料在產生後幾毫秒內到達。大多數資料管道的運作模式介於這兩個極端之間。良好的資料整合系統能夠支援不同管道的不同時效性需求,並且在業務需求變更時,能夠更輕鬆地在不同時間表之間進行遷移。Kafka作為一個具有可擴充套件和可靠儲存功能的串流資料平台,可以支援從近乎實時的管道到每日批次的多種需求。生產者可以根據需要頻繁或不頻繁地寫入Kafka,而消費者既可以即時讀取和傳遞最新事件,也可以以批次方式運作:例如,每小時執行一次,連線到Kafka,並讀取前一小時累積的事件。

將Kafka視為一個巨大的緩衝區,用於解耦生產者和消費者之間的時效性需求,是非常有用的。生產者可以即時寫入事件,而消費者可以批次處理事件,反之亦然。這也使得應用反壓變得簡單——當需要時,Kafka本身透過延遲確認對生產者施加反壓,因為消費速率完全由消費者驅動。

可靠性

我們希望避免單點故障,並允許從各種故障事件中快速自動還原。資料管道通常是資料到達業務關鍵系統的途徑;超過幾秒鐘的故障可能會造成巨大的破壞,尤其是當時效性需求接近毫秒級時。另一個重要的可靠性考慮因素是傳遞保證——有些系統可以承受資料丟失,但大多數情況下,至少需要一次傳遞保證,這意味著來自源系統的每個事件都將到達其目的地,但有時重試可能會導致重複。通常,甚至需要精確一次傳遞保證——來自源系統的每個事件都將到達目的地,不會有丟失或重複的可能性。

我們在第7章中深入討論了Kafka的可用性和可靠性保證。正如我們所討論的,Kafka本身可以提供至少一次傳遞保證,而當與具有事務模型或唯一鍵的外部資料儲存結合使用時,可以實作精確一次傳遞保證。由於許多終點是提供正確語義以實作精確一次傳遞的資料儲存,因此根據Kafka的管道通常可以實作為精確一次傳遞。值得強調的是,Kafka的Connect API透過提供與外部系統整合的API,以便在處理偏移量時,使聯結器更容易建立端對端的精確一次管道。事實上,許多可用的開源聯結器都支援精確一次傳遞。

高吞吐量與變化吞吐量

我們正在建構的資料管道應該能夠擴充套件到非常高的吞吐量,就像現代資料系統通常需要的那樣。更重要的是,它們應該能夠適應吞吐量突然增加的情況。

由於Kafka充當生產者和消費者之間的緩衝區,我們不再需要將消費者的吞吐量與生產者的吞吐量耦合。如果生產者的吞吐量超過了消費者的吞吐量,資料將在Kafka中累積,直到消費者趕上來。透過獨立新增消費者或生產者,Kafka能夠動態且獨立地擴充套件管道的任一側,以滿足不斷變化的需求。

Kafka是一個高吞吐量的分散式系統——即使在適中的叢集上也能處理每秒數百兆位元組——因此無需擔心我們的管道無法隨著需求增長而擴充套件。此外,Kafka Connect API專注於平行化工作,不僅可以在單個節點上執行,還可以透過擴充套件來實作,具體取決於系統需求。在下面的章節中,我們將描述該平台如何允許資料來源和接收器在多個執行緒之間分割工作,並利用可用的CPU資源,即使在單台機器上執行。

Kafka還支援多種壓縮型別,允許使用者和系統管理員在吞吐量需求增加時控制網路和儲存資源的使用。

資料格式

資料管道中最重要的考慮因素之一是協調不同的資料格式和資料型別。不同的資料函式庫和其他儲存系統支援的資料型別各不相同。你可能會將XML和關聯式資料載入Kafka,在Kafka內部使用Avro,然後在寫入Elasticsearch時將資料轉換為JSON,在寫入HDFS時轉換為Parquet,在寫入S3時轉換為CSV。

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 資料格式

rectangle "Load Data" as node1
rectangle "Convert to Avro" as node2
rectangle "Write to Elasticsearch" as node3
rectangle "Write to HDFS" as node4
rectangle "Write to S3" as node5

node1 --> node2
node2 --> node3
node3 --> node4
node4 --> node5

@enduml

此圖示說明瞭不同格式之間的轉換過程,從原始資料載入到 Kafka,然後根據不同的目標系統進行相應的格式轉換。

內容解密:

  1. XML/關聯式資料載入:首先,各種格式的資料(如XML或關聯式資料)被載入到 Kafka 中。
  2. 轉換為 Avro 格式:一旦進入 Kafka,通常會被轉換為 Avro 格式進行內部處理。
  3. 根據目標系統進行格式轉換:根據最終目標的不同,資料會被轉換為 JSON(用於 Elasticsearch)、Parquet(用於 HDFS)或 CSV(用於 S3)等格式。
  4. 輸出至目標系統:最終,不同格式的資料被寫入相應的目標系統,如 Elasticsearch、HDFS 或 S3。

這種靈活性使得 Kafka 成為一個強大的資料整合工具,能夠支援多種資料格式和目標系統的需求。

資料整合框架的關鍵需求

在建立資料整合框架時,需要考慮多個重要因素,包括資料格式的靈活性、處理不同來源和目標系統的差異、資料轉換、以及安全性等。Apache Kafka 和 Kafka Connect 為建立資料整合框架提供了強大的基礎。

資料格式的靈活性

Kafka 和 Kafka Connect 對資料格式保持中立,允許使用任何序列化器來表示資料。這意味著無論使用何種資料格式,都不會限制對聯結器的選擇。許多來源和目標系統具有架構,可以讀取來源的架構並將其儲存在 Kafka Connect 中,以便進行相容性驗證或更新目標系統的架構。

程式碼範例:資料格式轉換

// 使用Kafka Connect的轉換器將資料從一種格式轉換為另一種格式
Properties props = new Properties();
props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("key.converter.schemas.enable", "false");
props.put("value.converter.schemas.enable", "false");

// 建立Kafka Connect的組態
KafkaConnectConfig config = new KafkaConnectConfig(props);

// 使用組態建立聯結器
Connector connector = new MyConnector(config);

內容解密:

  1. 設定轉換器:在 Kafka Connect 中,可以透過設定 key.convertervalue.converter 屬性來選擇合適的轉換器,以處理不同格式的資料。
  2. 啟用或停用 Schema:透過設定 schemas.enable 屬性,可以控制是否啟用或停用 Schema 的支援,以適應不同的資料格式需求。
  3. 建立聯結器:利用設定的組態,建立自定義的聯結器以實作特定的資料整合任務。

處理不同來源和目標系統的差異

不同的來源和目標系統具有不同的行為特徵,例如 Syslog 是推播資料的來源,而關聯式資料函式庫則需要框架提取資料。HDFS 是隻能追加資料的系統,而大多數系統則允許追加和更新現有記錄。Kafka Connect 的設計允許開發人員編寫聯結器以處理這些差異。

資料轉換

在建立資料管道時,資料轉換是一個重要的考慮因素。有兩種主要的方法:ETL(提取-轉換-載入)和 ELT(提取-載入-轉換)。ETL 方法在資料透過管道時進行轉換,而 ELT 方法則是在目標系統中進行轉換。Kafka Connect 提供了單一訊息轉換(Single Message Transformation)功能,允許在將記錄從來源複製到 Kafka 或從 Kafka 複製到目標系統時進行轉換。

安全性

安全性是資料整合框架中的一個重要考慮因素。Kafka 提供了加密、身份驗證和授權功能,以確保資料的安全性。此外,Kafka Connect 也需要能夠連線到外部資料系統並進行身份驗證,因此需要安全地處理憑證。

程式碼範例:使用外部秘密管理系統

# 使用HashiCorp Vault作為外部秘密管理系統
config.providers= vault
config.providers.vault.class=io.confluent.connect.secretmanager.VaultConfigProvider
config.providers.vault.param.url=https://vault.example.com:8200
config.providers.vault.param.vault.token=my_vault_token

內容解密:

  1. 設定外部秘密管理系統:透過設定 config.providers 屬性,可以指定使用外部秘密管理系統,如 HashiCorp Vault。
  2. 組態 Vault 連線引數:需要提供 Vault 的 URL 和 token,以便 Kafka Connect 可以連線到 Vault 並檢索所需的憑證。
  3. 安全地管理憑證:使用外部秘密管理系統可以避免在組態檔案中直接儲存憑證,從而提高安全性。

資料管道的挑戰與解決方案

在建立資料管道時,我們經常會面臨到各種挑戰,例如故障處理、耦合度以及敏捷性等問題。這些挑戰如果不妥善處理,將會對資料管道的穩定性和可維護性造成嚴重的影響。

故障處理

假設所有資料都是完美的,這是一種危險的想法。我們需要提前規劃故障處理,以防止錯誤的記錄進入管道。如果記錄無法被解析,我們是否能夠從中還原?如果壞掉的事件看起來與正常事件完全相同,我們是否能夠在幾天後發現問題?

由於 Kafka 可以被組態為長期儲存所有事件,因此我們可以在需要時回溯時間並從錯誤中還原。這也允許我們將儲存在 Kafka 中的事件重新播放到目標系統,如果它們丟失了。

耦合與敏捷性

資料管道實作的一個理想特性是將資料來源和資料目標解耦。有多種方式可能會意外地導致耦合:

特設管道

一些公司最終為每一對他們想要連線的應用程式建立一個自定義的管道。例如,他們使用 Logstash 將日誌轉儲到 Elasticsearch,使用 Flume 將日誌轉儲到 HDFS,使用 Oracle GoldenGate 從 Oracle 取得資料到 HDFS,使用 Informatica 從 MySQL 和 XML 取得資料到 Oracle,等等。這緊密地將資料管道與特定的端點耦合在一起,並建立了一個需要大量精力來佈署、維護和監控的整合點混亂。這也意味著公司採用的每一個新系統都需要建立額外的管道,從而增加了採用新技術的成本,並抑制了創新。

失去元資料

如果資料管道沒有保留架構元資料並且不允許架構演進,那麼最終會將生產資料的軟體與使用資料的軟體緊密地耦合在一起。沒有架構資訊,兩個軟體產品都需要包含有關如何解析和解釋資料的資訊。如果資料從 Oracle 流向 HDFS,而 DBA 在 Oracle 中新增了一個欄位,卻沒有保留架構資訊並允許架構演進,那麼要麼每個從 HDFS 讀取資料的應用程式都會中斷,要麼所有開發人員都需要同時升級他們的應用程式。兩種選擇都不是敏捷的。

極端處理

正如我們在討論資料轉換時提到的,資料管道中固有的某些資料處理是必要的。畢竟,我們正在不同的系統之間行動資料,不同的資料格式是有意義的,不同的使用案例也是支援的。然而,過多的處理會將所有下游系統與建立管道時的決策繫結在一起,例如保留哪些欄位、如何聚合資料等。這通常會導致管道的不斷變更,因為下游應用程式的需求發生了變化,這不是敏捷、高效或安全的。

何時使用 Kafka Connect 與 Producer 和 Consumer

當寫入 Kafka 或從 Kafka 讀取時,您可以選擇使用傳統的 producer 和 consumer 客戶端,或使用 Kafka Connect API 和聯結器。在深入瞭解 Kafka Connect 的細節之前,您可能已經想知道“何時使用哪一個?”

正如我們所見,Kafka 客戶端是嵌入在您自己的應用程式中的客戶端。它允許您的應用程式將資料寫入 Kafka 或從 Kafka 讀取資料。當您可以修改您想要連線的應用程式的程式碼,並且當您想要將資料推播到 Kafka 或從 Kafka 提取資料時,請使用 Kafka 客戶端。

當您需要連線 Kafka 到您沒有寫過的資料儲存,或者其程式碼或 API 無法或不願意修改時,您將使用 Connect。Connect 將用於從外部資料儲存提取資料到 Kafka,或將資料從 Kafka 推到外部儲存。要使用 Kafka Connect,您需要一個針對您想要連線的資料儲存的聯結器,而現在這些聯結器非常豐富。這意味著在實踐中,Kafka Connect 的使用者只需要編寫組態檔案。

Kafka Connect

Kafka Connect 是 Apache Kafka 的一部分,提供了一種可擴充套件和可靠的方式來在 Kafka 和其他資料儲存之間複製資料。它提供了 API 和執行時來開發和執行聯結器外掛——Kafka Connect 執行的函式庫,負責行動資料。

Kafka Connect 以一組 worker 程式的形式執行。您在 worker 上安裝聯結器外掛,然後使用 REST API 組態和管理聯結器,聯結器以特定的組態執行。聯結器啟動額外的任務,以平行移動大量資料,並更有效地利用 worker 節點上的可用資源。

來源聯結器任務只需要從來源系統讀取資料並向 worker 程式提供 Connect 資料物件。接收聯結器任務從 worker 取得聯結器資料物件,並負責將它們寫入目標資料系統。Kafka Connect 使用轉換器來支援以不同的格式將這些資料物件儲存在 Kafka 中——JSON 格式支援是 Apache Kafka 的一部分,而 Confluent Schema Registry 提供了 Avro 支援。

程式碼範例:Kafka Connect 組態

{
  "name": "my-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "table.whitelist": "mytable"
  }
}

內容解密:

  1. "name":定義了聯結器的名稱,用於識別和管理。
  2. "config":包含了聯結器的組態引數。
  3. "connector.class":指定了聯結器的類別,這裡使用的是 JDBC 來源聯結器。
  4. "tasks.max":定義了最大任務數,用於控制平行度。
  5. "connection.url""connection.user""connection.password":用於連線到 MySQL 資料函式庫的憑證。
  6. "table.whitelist":指定了要複製的表格名稱。

這個組態範例展示瞭如何設定一個 Kafka Connect 聯結器,用於從 MySQL 資料函式庫中讀取資料並將其寫入 Kafka 主題中。

Kafka Connect 深度解析與應用

Kafka Connect 是 Apache Kafka 生態系統中的重要元件,負責將 Kafka 與外部資料系統進行整合。透過 Kafka Connect,使用者可以輕鬆地將資料從 Kafka 輸出到外部系統,或是將外部系統的資料匯入 Kafka 中。

啟動 Kafka Connect

Kafka Connect 隨 Apache Kafka 一起發布,因此無需單獨安裝。在生產環境中,建議將 Kafka Connect 佈署在獨立的伺服器上,以避免與 Kafka Broker 混用。

要啟動 Kafka Connect,可以使用以下指令:

bin/connect-distributed.sh config/connect-distributed.properties

內容解密:

此指令用於啟動 Kafka Connect 的分散式模式。其中,config/connect-distributed.properties 是設定檔,用於指定 Kafka Connect 的相關引數。

Kafka Connect 設定

Kafka Connect 有幾個重要的設定引數:

  • bootstrap.servers:指定 Kafka Broker 的列表,用於連線 Kafka 叢集。
  • group.id:指定 Kafka Connect 叢集的 ID,所有具有相同 ID 的 Worker 都屬於同一叢集。
  • plugin.path:指定 Connector 和其依賴項的存放路徑。

內容解密:

  • bootstrap.servers 設定允許使用者指定多個 Kafka Broker,以提高用性。
  • group.id 設定確保了 Connector 和其任務可以在叢集中的任何 Worker 上執行。
  • plugin.path 設定允許使用者將 Connector 和其依賴項放置在指定的目錄中,以便 Kafka Connect 載入。

資料格式轉換

Kafka Connect 支援多種資料格式,包括 JSON、Avro、Protobuf 等。使用者可以透過 key.convertervalue.converter 設定來指定 Key 和 Value 的轉換器。

例如,若要使用 JSON 轉換器,可以設定:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

內容解密:

此設定指定了 Key 和 Value 都使用 JSON 轉換器。若要使用其他轉換器,例如 Avro 或 Protobuf,可以設定相應的轉換器類別。

REST API

Kafka Connect 提供 REST API,用於組態和管理 Connector。使用者可以透過 REST API 來檢查 Kafka Connect 的狀態、列出可用的 Connector 等。

例如,若要檢查 Kafka Connect 的版本,可以使用以下指令:

curl http://localhost:8083/

內容解密:

此指令向 Kafka Connect 的 REST API 傳送 GET 請求,以取得 Kafka Connect 的版本資訊。

Connector 設定與使用

Kafka Connect 提供多種 Connector,用於與不同的外部資料系統進行整合。使用者可以透過設定 Connector 來指定資料來源或目標。

例如,若要使用 FileStreamSourceConnector,可以設定:

{
  "name": "file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/path/to/file.txt",
    "topic": "my-topic"
  }
}

內容解密:

此設定指定了 Connector 的名稱、類別、檔案路徑和主題名稱。Kafka Connect 將會從指定的檔案中讀取資料,並將其傳送到指定的主題中。