ksqlDB 聯結器是串流處理應用程式與外部系統整合的關鍵環節。透過聯結器,ksqlDB 可以讀取來自各種資料來源的資料,例如資料函式庫、訊息佇列等,並將處理後的資料寫入到目標系統,例如 Elasticsearch、其他 Kafka 主題等。Confluent Hub 提供了預先建置的聯結器,簡化了安裝流程。開發者可以使用 confluent-hub install 命令安裝所需的聯結器,並指定安裝目錄和工作節點組態。在 ksqlDB 中,使用 CREATE SOURCE CONNECTOR 和 CREATE SINK CONNECTOR 命令分別建立源聯結器和接收器聯結器,並設定聯結器的屬性,例如連線 URL、資料格式等。ksqlDB 提供了 SHOW CONNECTORS 和 DESCRIBE CONNECTOR 命令來監控聯結器的狀態,並使用 DROP CONNECTOR 命令刪除聯結器。
在ksqlDB中安裝與管理聯結器
在ksqlDB中,聯結器(Connectors)扮演著至關重要的角色,它們負責將外部資料來源與Kafka叢集進行整合。無論是從資料函式庫讀取資料還是將資料寫入到Elasticsearch等系統,聯結器都提供了靈活且強大的資料整合能力。本章節將詳細介紹如何在ksqlDB中安裝和管理聯結器。
安裝聯結器
在開始使用聯結器之前,首先需要安裝它們。Confluent Hub提供了一個方便的方式來安裝和管理聯結器。安裝聯結器的命令語法如下:
confluent-hub install <owner>/<component>:<version> [options]
舉例來說,要安裝Elasticsearch接收器聯結器,可以執行以下命令:
confluent-hub install confluentinc/kafka-connect-elasticsearch:10.0.2 \
--component-dir /home/appuser \
--worker-configs /etc/ksqldb-server/connect.properties \
--no-prompt
內容解密:
confluent-hub install:用於安裝聯結器的命令。<owner>/<component>:<version>:指定要安裝的聯結器的擁有者、名稱和版本。--component-dir:指定聯結器的安裝目錄。--worker-configs:指定工作節點的組態檔案位置。--no-prompt:自動接受所有提示和許可協定,適用於指令碼化安裝。
同樣地,可以使用類別似的命令安裝PostgreSQL源聯結器:
confluent-hub install confluentinc/kafka-connect-jdbc:10.0.0 \
--component-dir /home/appuser/ \
--worker-configs /etc/ksqldb-server/connect.properties \
--no-prompt
使用ksqlDB建立聯結器
在ksqlDB中建立聯結器的語法如下:
CREATE { SOURCE | SINK } CONNECTOR [ IF NOT EXISTS ] <identifier> WITH(
property_name = expression [, ...]);
假設有一個執行中的PostgreSQL例項位於postgres:5432,可以透過以下命令建立一個源聯結器來讀取名為titles的表:
CREATE SOURCE CONNECTOR `postgres-source` WITH(
"connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
"connection.url"='jdbc:postgresql://postgres:5432/root?user=root&password=secret',
"mode"='incrementing',
"incrementing.column.name"='id',
"topic.prefix"='',
"table.whitelist"='titles',
"key"='id');
內容解密:
CREATE SOURCE CONNECTOR:建立一個源聯結器的陳述式。connector.class:指定聯結器的Java類別。connection.url:用於連線到資料儲存的URL(在本例中為PostgreSQL資料函式庫)。mode:指定聯結器的執行模式(例如,增量模式用於捕捉新新增的記錄)。incrementing.column.name:指定用於跟蹤已處理記錄的自動增量列的名稱。topic.prefix:可選的主題名稱字首。table.whitelist:指定要串流到Kafka中的表列表。key:指定用作記錄鍵的值。
建立接收器聯結器將應用程式的輸出寫入到Elasticsearch的過程類別似:
CREATE SINK CONNECTOR `elasticsearch-sink` WITH(
"connector.class"='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
"connection.url"='http://elasticsearch:9200',
"connection.username"='',
"connection.password"='',
"batch.size"='1',
"write.method"='insert',
"topics"='titles',
"type.name"='changes',
"key"='title_id');
顯示和管理聯結器
在互動模式下,可以列出所有正在執行的聯結器及其狀態。列出聯結器的語法如下:
{ LIST | SHOW } [ { SOURCE | SINK } ] CONNECTORS
例如,要列出所有聯結器,可以執行:
SHOW CONNECTORS;
這將顯示類別似以下的輸出:
Connector Name | Type | Class | Status
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--
postgres-source | SOURCE | ... | RUNNING (1/1 tasks RUNNING)
elasticsearch-sink | SINK | ... | RUNNING (1/1 tasks RUNNING)
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--
綜上所述,本章節介紹瞭如何在ksqlDB中安裝和管理聯結器,包括使用Confluent Hub安裝聯結器、在ksqlDB中建立和管理源和接收器聯結器,以及如何列出和檢查聯結器的狀態。這些功能使得在ksqlDB中整合和管理外部資料來源變得更加方便和靈活。
ksqlDB 中的聯結器管理
ksqlDB 提供了強大的聯結器管理功能,使得與外部資料來源的整合變得更加容易。在本章中,我們將探討如何使用 ksqlDB 管理聯結器,包括建立、描述和刪除聯結器。
顯示聯結器狀態
使用 SHOW CONNECTORS 命令可以列印預出活躍聯結器的有用資訊,包括其狀態。例如:
SHOW CONNECTORS;
這將輸出類別似於以下內容:
Connector Name | Type | Class | Status
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
---
postgres-source | SOURCE | ... | RUNNING
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
---
如果聯結器出現故障,例如與組態的 PostgreSQL 資料函式庫失去連線,則會看到類別似於以下內容:
Connector Name | Type | Class | Status
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
---
postgres-source | SOURCE | ... | FAILED
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
---
描述聯結器
使用 DESCRIBE CONNECTOR 命令可以檢索聯結器的狀態。例如:
DESCRIBE CONNECTOR `postgres-source`;
這將輸出類別似於以下內容:
Name : postgres-source
Class : io.confluent.connect.jdbc.JdbcSourceConnector
Type : source
State : FAILED
WorkerId : 192.168.65.3:8083
Trace : org.apache.kafka.connect.errors.ConnectException
Task ID | State | Error Trace
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--
0 | FAILED | org.apache.kafka.connect.errors.ConnectException
或者,如果聯結器執行正常,則會看到類別似於以下內容:
Name : postgres-source
Class : io.confluent.connect.jdbc.JdbcSourceConnector
Type : source
State : RUNNING
WorkerId : 192.168.65.3:8083
Task ID | State | Error Trace
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--
0 | RUNNING |
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
刪除聯結器
使用 DROP CONNECTOR 命令可以刪除聯結器。例如:
DROP CONNECTOR `postgres-source`;
這將輸出類別似於以下內容:
Message
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--
Dropped connector "postgres-source"
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
-
---
--
驗證源聯結器
可以使用 PRINT 陳述式驗證 PostgreSQL 源聯結器是否正常工作。例如:
PRINT `titles` FROM BEGINNING;
這將輸出類別似於以下內容:
Key format: JSON or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2020/10/28 ..., key: 1, value: {"id": 1, "title": "Stranger Things"}
rowtime: 2020/10/28 ..., key: 2, value: {"id": 2, "title": "Black Mirror"}
rowtime: 2020/10/28 ..., key: 3, value: {"id": 3, "title": "The Office"}
聯結器管理流程圖
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title KsqlDB聯結器安裝與管理
package "資料庫架構" {
package "應用層" {
component [連線池] as pool
component [ORM 框架] as orm
}
package "資料庫引擎" {
component [查詢解析器] as parser
component [優化器] as optimizer
component [執行引擎] as executor
}
package "儲存層" {
database [主資料庫] as master
database [讀取副本] as replica
database [快取層] as cache
}
}
pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中
master --> replica : 資料同步
note right of cache
Redis/Memcached
減少資料庫負載
end note
@enduml
此圖示說明瞭 ksqlDB 中聯結器管理的流程。
與 Kafka Connect 簇直接互動
在某些情況下,您可能需要直接與 Kafka Connect 簇互動,例如重啟失敗的任務。以下是一些示例查詢:
- 列出聯結器:
curl -XGET localhost:8083/connectors - 描述聯結器:
curl -XGET localhost:8083/connectors/elasticsearch-sink - 列出任務:
curl -XGET -s localhost:8083/connectors/elasticsearch-sink/tasks
內容解密:
- 本章介紹了 ksqlDB 中的聯結器管理功能,包括建立、描述和刪除聯結器。
- 使用
SHOW CONNECTORS命令可以顯示活躍聯結器的狀態。 - 使用
DESCRIBE CONNECTOR命令可以檢索聯結器的狀態。 - 使用
DROP CONNECTOR命令可以刪除聯結器。 - 可以使用
PRINT陳述式驗證源聯結器是否正常工作。 - 在某些情況下,需要直接與 Kafka Connect 簇互動,例如重啟失敗的任務。
使用 ksqlDB 進行基礎串流處理
本章節將學習如何使用 ksqlDB 執行常見的串流處理任務。涵蓋的主題包括:
- 建立串流和表格
- 利用 ksqlDB 資料型別
- 使用簡單布林條件、萬用字元和範圍篩選器篩選資料
- 透過展平複雜或巢狀結構來重塑資料
- 使用投影選取可用欄位子集
- 使用條件運算式處理 NULL 值
- 建立衍生串流和表格,並將結果寫回 Kafka
在本章結束時,您將具備使用 ksqlDB 的 SQL 方言處理基本資料預處理和轉換任務的能力。此外,雖然前一章節中探討的 SQL 結構利用了 ksqlDB 的 Kafka Connect 整合,但本章中將使用的所有 SQL 陳述式都將利用 ksqlDB 的 Kafka Streams 整合。這是我們真正開始看到 ksqlDB 強大功能的地方,因為只需幾行 SQL,就可以建立完整的串流處理應用程式。
教學:監控 Netflix 的變更
Netflix 每年投資數十億美元於影片內容。由於同時有多部電影和電視劇正在製作中,當製作變更發生時(例如,發行日期變更、財務更新、人才排程等),向各個系統傳達更新對於保持運作順暢至關重要。歷史上,Netflix 使用 Apache Flink 處理其串流處理案例。但與 Netflix 工程師 Nitin Sharma 合作,我們決定使用 ksqlDB 來解決這個問題。
應用程式目標
該應用程式的目標很簡單:我們需要消費製作變更串流,篩選和轉換資料以進行處理,豐富和聚合資料以供報告,並最終使處理後的資料可供下游系統使用。
架構圖
我們的應用程式將從兩個主題讀取:
titles主題是一個壓縮主題,包含 Netflix 服務上託管的電影和電視劇(在本教學中統稱為標題)的元資料(標題名稱、發行日期等)。- 每當目前正在製作中的標題的人才排程、預算、發行日期或季度長度發生變更時,就會寫入
production_changes主題。
處理步驟
- 基本預處理:篩選和轉換
production_changes資料,以準備進行豐富處理。預處理後的串流將只包含季度長度變更,並寫入名為season_length_changes的 Kafka 主題。 - 資料豐富:將
season_length_changes串流與titles資料聯接,建立來自多個來源和維度的組合記錄。 - 聚合:執行視窗化和非視窗化聚合,以計算五分鐘內的變更次數。產生的表格將被物化,並可供查詢。
- 資料輸出:使豐富和聚合後的資料可供兩種不同型別的客戶使用。第一種客戶將透過推播查詢接收連續更新,第二種客戶將使用類別似於傳統資料函式庫查詢的提取查詢執行點查詢。
本章重點
本章將重點介紹建立串流和表格,以及基本資料預處理和轉換步驟(即步驟 1-2)。下一章將重點介紹資料豐富、聚合和推播/提取查詢。
-- 建立 titles 表格
CREATE TABLE titles (
title_id VARCHAR PRIMARY KEY,
title_name VARCHAR,
release_date DATE
) WITH (
KAFKA_TOPIC = 'titles',
VALUE_FORMAT = 'JSON'
);
-- 建立 production_changes 串流
CREATE STREAM production_changes (
title_id VARCHAR,
change_type VARCHAR,
change_time TIMESTAMP,
season_length INT
) WITH (
KAFKA_TOPIC = 'production_changes',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'change_time'
);
-- 篩選和轉換 production_changes 資料
CREATE STREAM season_length_changes AS
SELECT
title_id,
season_length
FROM
production_changes
WHERE
change_type = 'season_length';
-- 將 season_length_changes 串流與 titles 表格聯接
CREATE STREAM enriched_season_length_changes AS
SELECT
slc.title_id,
t.title_name,
slc.season_length
FROM
season_length_changes slc
LEFT JOIN
titles t ON slc.title_id = t.title_id;
內容解密:
- 建立表格和串流:使用
CREATE TABLE和CREATE STREAM陳述式建立titles表格和production_changes串流,分別用於儲存標題元資料和製作變更資料。 - 篩選和轉換:使用
CREATE STREAM AS SELECT陳述式篩選和轉換production_changes資料,建立season_length_changes串流。 - 聯接:使用
CREATE STREAM AS SELECT陳述式將season_length_changes串流與titles表格聯接,建立豐富的enriched_season_length_changes串流。
本教學示範瞭如何使用 ksqlDB 建立串流和表格,執行基本資料預處理和轉換,以及豐富資料。下一章將繼續探討資料聚合和推播/提取查詢。