返回文章列表

串流資料整合處理模式與平台應用

本文探討串流資料整合的兩種主要模式:ETL 和 ELT,並分析它們在串流處理中的應用、優缺點和限制。同時,文章也介紹瞭如何在串流處理平台中利用物化檢視最佳化即時分析,以及如何選擇合適的分析資料儲存以滿足不同的服務水準協定(SLA)需求,涵蓋記憶體內資料函式庫、即時 OLAP

資料工程 串流處理

串流資料處理架構的核心在於如何有效地收集、轉換和載入資料以供後續分析。ETL 和 ELT 作為兩種主要的資料整合模式,各有其適用場景和限制。在串流處理的背景下,理解它們之間的差異以及如何與現代串流處理平台整合至關重要。此外,物化檢視的應用和正確的資料儲存選型也是構建高效能即時分析系統的關鍵因素。隨著資料量的增長和即時性需求的提高,選擇合適的技術和策略對於確保資料處理效率和分析效能至關重要。

資料整合與處理模式

資料管道遵循兩種常見的模式:提取、轉換、載入(ETL)和提取、載入、轉換(ELT)。這兩種模式在操作順序上有所不同,並且在串流能力上存在限制。

ETL與ELT的主要區別

  • ETL先提取資料,然後進行轉換,最後載入到目標系統。
  • ELT先提取資料,然後載入到目標系統,最後進行轉換。

資料管道在串流資料處理中的關鍵角色

在串流資料處理的世界中,資料管道扮演著至關重要的角色。資料管道負責收集來自各種來源的資料,將其轉換為可分析的格式,並及時將其交付給分析工具和應用程式。

資料管道的運作機制

延續第一章介紹的水流比喻,串流資料管道就像您家中的水管。如圖2-2所示,水管中的水可能含有礦物質,需要過濾或加熱後才能供使用者使用。就像水管一樣,串流資料管道將事件資料透過一系列轉換後,再傳送給分析消費者。

資料管道在運算元據平面和分析資料平面之間的作用

資料管道的一個重要角色是將資料從運算元據平面移動到分析資料平面。運算元據平面是應用程式(包括微服務和OLTP資料函式庫)所在的區域,而分析資料平面則是分析系統(如資料倉儲、資料湖、湖倉一體等各種OLAP資料儲存)所在的區域。資料管道位於操作和分析資料平面之間,分別對資料進行轉換和整合。如圖2-3所示。

ETL流程詳解

在ETL(提取、轉換、載入)流程中,資料首先被提取出來,然後經過轉換任務,將其轉換為適合目標系統或資料倉儲的格式(如圖2-4所示)。轉換後的資料通常會暫時儲存在中間儲存中。最後,載入操作將轉換後的資料載入到目標系統(如資料倉儲)中,這涉及到將資料插入目標系統中的適當表格或結構中。

簡單的使用案例

讓我們回到一個簡單的使用案例:客戶在應用程式上點選一件綠色T恤。如圖2-5所示,在左側,我們捕捉來自手機應用程式的資料。點選事件被傳送到微服務,該微服務將事件轉發到串流平台中的主題(「點選事件主題」)。產品和客戶資訊被傳送到OLTP資料函式庫。從手機到OLTP資料函式庫的箭頭代表了對客戶或產品進行的交易。這些交易要麼是插入、更新,要麼是刪除。這些交易與CDC(變更資料捕捉)相關聯,我們在第一章中已經討論過。OLTP資料函式庫中的WAL(寫入日誌)中的交易被聯結器消費,並被釋出到對應的主題(「產品主題」、「客戶主題」)。

所有三個主題——點選事件、產品和客戶——都被實作了狀態轉換的串流處理器所消費。串流處理器的結果被釋出到接收主題,然後被OLAP資料儲存所消費。OLAP資料儲存然後將分析資料提供給儀錶板,供資料分析師使用。

ELT的限制

在ELT(提取、載入、轉換)中,轉換和載入任務的順序被顛倒,這使得這種方法更加靈活,但也帶來了一些限制。轉換是在目標系統中執行的,而不是在資料管道中進行的。由於不需要保持轉換的狀態,ELT資料管道更加簡單,不使用狀態化的串流處理。透過將資料載入到目標系統(很可能是可以執行轉換的資料函式庫),資料已經從串流中移除並靜止儲存。由於轉換需要被排程或觸發以在特定時間間隔執行,因此資料管道不再被視為實時處理。這種方法被視為批次處理。

ELT流程詳解

圖2-6展示了ELT流程:

  1. 資料從OLTP資料函式庫中提取出來。
  2. 資料被載入到OLAP資料儲存中。
  3. 當載入完成時,用於轉換資料的SQL被執行。

在目標資料儲存中執行SQL陳述句只會在「載入」完成後被觸發。這意味著流經資料管道的資料是批次處理的,因為批次有結束。如果資料是串流,它永遠不會結束,因此很難確定何時應該執行SQL。如圖2-7所示。

串流處理平台與ELT的整合應用

在即時串流使用案例中,資料管道應該延遲將資料儲存到目標系統,直到轉換完成且資料準備好供使用。乍看之下,ELT似乎並不適合串流處理。

ELT在串流即時案例中的應用

然而,透過利用現代串流平台和即時資料處理框架的能力,ELT可以支援串流即時使用案例。以下是ELT如何應用於串流即時案例的步驟:

提取(Extract)

在ELT中,提取步驟涉及從串流來源檢索資料,例如訊息佇列、事件串流或即時資料饋送。這些來源不斷地即時產生資料,ELT可以提取這個資料串流進行進一步處理。

載入(Load)

提取的資料被載入到支援串流資料擷取和儲存的目標系統中。這可能是一個串流平台或設計用於處理高速度和高容量資料串流的資料儲存系統。目標系統應該能夠有效地擷取串流資料並以允許即時處理的方式儲存它。

轉換(Transform)

在ELT中,轉換步驟發生在資料被載入到目標系統之後,例如像Apache Flink、Apache Kafka Streams或任何有狀態的串流處理平台這樣的即時資料處理框架。

Figure 2-8顯示了ELT如何使用串流處理平台作為其目標。在這種情況下,串流可以是從OLTP資料函式庫提取資料的聯結器,也可以是像Kafka這樣的串流平台。然而,由於許多串流處理平台(如Flink)沒有持久層(不計入其狀態儲存),轉換步驟的結果將不得不被帶到,例如,一個資料函式庫以啟用查詢。串流資料函式庫具有持久層,也可以是啟用這種架構模式的目標系統。

串流處理器

串流處理器是軟體平台或工具,能夠即時處理連續的資料串流。最重要的是,它們可以執行有狀態的轉換,因為它們具有內建的狀態儲存。

這些對事件資料的轉換對於消費者得出所需的分析洞察是必要的。

熱門的串流處理器

以下是一些熱門的串流處理器:

  • Apache Kafka Streams是一個用於根據JVM的程式語言的串流處理函式庫,是Apache Kafka專案的一部分。它允許開發人員建立即時應用程式和微服務,從Kafka消費、處理和產生資料串流。
  • Apache Flink是一個支援批次和串流處理的串流處理器,可以連線到各種來源和接收器,包括Kafka、Pulsar和Kinesis,以及像MongoDB和Elasticsearch這樣的資料函式庫。Flink將批次視為串流的一種特殊情況(具有邊界的資料的串流)。與Kafka Streams不同,Flink不是一個函式庫,而是在自己的叢集上執行。
  • Spark Structured Streaming是Apache Spark的一個元件,能夠進行串流處理。它支援多種聯結器,也是根據叢集的。它與其他串流處理器(如Kafka Streams、Flink和Samza)不同,因為它使用微批次(mini-batching)而不是原生串流處理——對於Spark來說,串流是批次的一種特殊情況,而不是像Flink那樣相反。
  • Apache Samza是由LinkedIn開發的一個串流處理器。它支援Kafka、Azure Event Hubs、Kinesis和HDFS,也是根據叢集的,就像Flink一樣。
  • Apache Beam不是一個串流處理器本身,而是一個統一的程式設計模型和一套用於建立資料處理管道的軟體開發工具包(SDK)。它提供了一個抽象層,允許開發人員編寫可以在各種分散式處理引擎上執行的資料處理作業,例如Apache Flink、Apache Spark、Apache Samza和Google Cloud Dataflow。

新興的串流處理器

以下是一些新興的串流處理器:

  • Quix Streams是一個根據C#/Python的串流處理函式庫,可與Kafka Streams相媲美,用於JVM。Quix Streams也支援串流資料框架,其行為類別似於Pandas或Spark中的資料框架,同時在底層進行增量更新。
  • Bytewax是一個根據Python的串流處理函式庫——類別似於Kafka Streams——利用及時資料流程作為底層的串流處理引擎。
  • Pathway是一個根據Python的串流處理函式庫。它根據差分資料流程(DD)作為底層的串流處理引擎。
  • Estuary Flow是一個支援多種聯結器的串流處理器。它是根據叢集的。

這些串流處理框架提供了諸如事件時間處理、視窗化、狀態管理、容錯性、可擴充套件性和與各種資料來源和接收器的整合等功能。它們使開發人員能夠建立即時資料處理管道,並使應用程式能夠對到達的資料做出反應,使它們成為處理各種使用案例中的串流資料的有價值工具。

內容解密:

  1. 事件時間處理:事件時間處理是指根據事件實際發生的時間進行處理,而不是根據事件到達處理系統的時間。這對於需要根據事件的實際發生順序進行處理的應用程式非常重要。
  2. 視窗化:視窗化是指將連續的資料串流劃分為有限的時間範圍或事件數量的子集,以便進行聚合或其他操作。這對於需要對一段時間內的資料進行分析或匯總的應用程式非常有用。
  3. 狀態管理:狀態管理是指在串流處理過程中維護和管理狀態的能力。有狀態的串流處理器可以在內部儲存和管理狀態,從而支援更複雜的操作,如聚合、連線和事件計數等。
  4. 容錯性:容錯性是指系統在發生故障或錯誤時能夠繼續正常執行的能力。具備容錯能力的串流處理器可以確保即使在部分元件發生故障時,整個系統仍然能夠繼續執行。
  5. 可擴充套件性:可擴充套件性是指系統能夠隨著負載或需求的增加而擴充套件的能力。可擴充套件的串流處理器可以透過增加更多的資源(如計算節點或記憶體)來提高效能,以滿足日益增長的需求。

此圖示說明瞭ELT在串流即時案例中的應用步驟,包括提取、載入和轉換。

流處理平台中的物化檢視與串流型別

儘管流處理器非常強大,但並非所有流處理器都支援物化檢視。物化檢視以類別似資料函式庫的方式表示資料,這對於實作即時分析使用案例非常有用。目前,Kafka Streams、Samza和Pathway等流處理器支援物化檢視。

在Apache Spark中模擬物化檢視

Apache Spark目前尚未內建支援建立物化檢視的功能。然而,Apache Spark提供了多種功能和最佳化,可以幫助實作類別似物化檢視的效益。例如:

  • 利用Spark的快取機制快取中間或最終結果,以提高效能。
  • 使用DataFrames或Datasets定義可重複使用的檢視或轉換,以提供資料抽象和最佳化。
  • 與外部資料儲存和系統(如Apache Hive、Apache HBase或其他資料函式庫)整合,利用其原生支援的物化檢視功能。

值得注意的是,Apache Spark生態系統正在不斷演進,未來版本可能會引入新的功能和增強功能,以擴充套件其功能。

兩種型別的串流

要更好地理解物化檢視,需要了解流處理器中的串流型別。有兩種型別的即時串流資料流經流處理器:變更串流和僅附加串流。變更串流通常來自CDC(變更資料擷取)事件,而僅附加串流則包含離散、不同的事件,例如點選事件。

僅附加串流

僅附加串流,例如點選事件串流,包含離散、不同的事件。每個點選事件都是不同的,即使是由同一客戶對同一T恤進行點選。僅附加串流通常不來自資料函式庫,因為每個點選事件都是唯一的,且會快速增長。

CREATE SOURCE click_events (
  id integer,
  ts long,
  url varchar,
  ipAddress varchar,
  ...
);

變更串流

變更串流通常來自CDC事件,例如對OLTP資料函式庫中的表格進行的交易(插入、更新和刪除)。這些記錄的增長速度不如點選串流資料快,因為有些事件會包含對現有記錄的更新或刪除。變更串流通常代表維度資料的變更,而維度資料在資料函式庫中變化緩慢。

使用SQL進行豐富化

在流處理平台中,變更串流和僅附加串流都將駐留在具有不同名稱的結構中,具體取決於所使用的流處理器。我們可以使用SQL對點選串流進行豐富化。

SELECT 
  click_events.id,
  click_events.ts,
  click_events.url,
  customer.name,
  product.description
FROM 
  click_events
JOIN 
  customer ON click_events.customer_id = customer.id
JOIN 
  product ON click_events.product_id = product.id;

內容解密:

  1. CREATE SOURCE 陳述式:用於從Kafka主題中擷取點選串流資料,並建立一個名為click_events的源表格,該表格是一個僅附加串流。
  2. SELECT 陳述式:用於對點選串流進行豐富化,透過與客戶和產品資料進行JOIN操作,以取得相關的客戶和產品資訊。
  3. JOIN 操作:用於將點選串流資料與客戶和產品資料進行關聯,以提供更豐富的資訊。

資料串流處理中的資料轉換與實時分析

在現代資料處理架構中,資料串流扮演著越來越重要的角色。無論是即時點選流分析、變更資料捕捉(CDC),還是其他型別的串流資料,都需要有效的處理和分析。本篇文章將探討如何在串流處理平台中進行資料轉換,以及如何利用物化檢視(Materialized Views)來最佳化資料查詢和分析。

資料來源的建立與串流資料的擷取

在串流處理中,第一步通常是從不同的來源擷取資料。例如,我們可以從 Kafka 主題中擷取點選流事件、產品資訊和客戶資料。以下是一個建立點選流事件表的範例:

CREATE SOURCE click_events (
  ipAddress varchar,
  sessionId varchar,
  referrer varchar,
  browser varchar
)
WITH (
  connector='kafka',
  topic='clicks',
  properties.bootstrap.server='kafka:9092',
  scan.startup.mode='earliest'
)
ROW FORMAT JSON;

內容解密:

  • CREATE SOURCE 陳述式用於從 Kafka 主題建立一個來源表。
  • WITH 子句指定了聯結器型別、主題名稱和 Kafka 伺服器的組態。
  • ROW FORMAT JSON 表示資料是以 JSON 格式儲存的。

Debezium CDC 資料的擷取與處理

Debezium 是一種流行的 CDC 聯結器,能夠捕捉資料函式庫中的變更並將其釋出到 Kafka 主題。以下是如何擷取產品和客戶資訊的範例:

CREATE SOURCE products (
  before ROW<id long, name varchar, color varchar, barcode long>,
  after ROW<id long, name varchar, color varchar, barcode long>,
  op varchar,
  source <...>
)
WITH (
  connector='kafka',
  topic='products',
  properties.bootstrap.server='kafka:9092',
  scan.startup.mode='earliest'
)
ROW FORMAT JSON;

內容解密:

  • beforeafter 列包含了變更前後的資料狀態。
  • op 列表示執行的操作型別(例如插入、更新或刪除)。
  • 這種結構使得我們能夠追蹤資料變更歷史。

物化檢視的最佳化作用

由於 CDC 資料可能包含多個版本的同一記錄,我們需要使用物化檢視來保留最新的記錄。以下是一個建立物化檢視的範例:

CREATE MATERIALIZED VIEW mv_products AS 
SELECT * FROM products;

內容解密:

  • 物化檢視 mv_products 只保留每個產品 ID 的最新版本。
  • 這種方式減少了資料量,使得查詢更加高效。

資料的 Join 與輸出到 Sink 主題

在完成資料轉換後,我們可以將點選流事件與產品和客戶資訊進行 Join,並將結果輸出到另一個 Kafka 主題:

CREATE SINK http_enrich AS
SELECT
  E.*,
  C.*,
  P.*
FROM click_events E
JOIN customers C ON C.ip = E.ip
JOIN products P ON P.product_id = E.product_id
WITH (
  connector='kafka',
  topic='click_customer_product',
  properties.bootstrap.server='kafka:9092',
  type='upsert',
  primary_key='id'
);

內容解密:

  • JOIN 操作將點選流事件與客戶和產品資訊結合起來。
  • 結果被寫入到名為 click_customer_product 的 Kafka 主題中。
  • type='upsert' 表示結果是根據主鍵進行更新插入操作。

圖示說明:串流處理中的資料轉換流程

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title 串流資料整合處理模式與平台應用

package "系統架構" {
    package "前端層" {
        component [使用者介面] as ui
        component [API 客戶端] as client
    }

    package "後端層" {
        component [API 服務] as api
        component [業務邏輯] as logic
        component [資料存取] as dao
    }

    package "資料層" {
        database [主資料庫] as db
        database [快取] as cache
    }
}

ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取

note right of api
  RESTful API
  或 GraphQL
end note

@enduml

此圖示展示了從 Kafka 資料來源到最終輸出到 Sink 主題的整個資料處理流程。透過物化檢視和 Join 操作,我們能夠有效地整合和分析不同的資料源。

即時資料服務的挑戰與技術選型

在前一章中,我們已經將經過串流處理平台轉換的資料儲存於接收主題(sink topic)中。現在,這些預處理後的資料已經準備好供使用者進行即時分析查詢。本章將重點討論如何將這些即時資料有效地提供給最終使用者。

即時分析的期望與服務水準協定(SLA)

為了滿足使用者對於即時分析的需求,我們需要考慮一系列服務水準協定(SLA)指標,包括:

  • 延遲(Latency):衡量分析查詢或計算完成並傳回結果所需時間。
  • 吞吐量與並發性(Throughput and Concurrency):衡量在給定時間內可以處理的分析查詢或計算數量。
  • 資料新鮮度(Data Freshness):衡量分析結果相對於底層資料流的更新程度。
  • 準確性(Accuracy):衡量分析結果的正確性和精確度。

此外,還有其他 SLA 指標,如:

  • 可用性(Availability):衡量即時分析系統的可操作時間百分比。
  • 一致性(Consistency):確保不同查詢執行和系統副本之間的結果一致性。
  • 可擴充套件性(Scalability):衡量系統處理日益增長的資料量、使用者請求和計算複雜度的能力。
  • 安全性和隱私性(Security and Privacy):確保資料保護、存取控制、加密和法規遵從性。

選擇合適的分析資料儲存

有多種型別的資料儲存可以滿足即時分析的 SLA 需求,包括:

記憶體內資料函式庫

  • Redis
  • SingleStore(前身為 MemSQL)
  • Hazelcast
  • Apache Ignite

這些資料函式庫將資料儲存在記憶體中,以實作快速存取和處理,提供極低的延遲和高吞吐量。

即時 OLAP 資料儲存

  • Apache Pinot
  • Apache Druid
  • ClickHouse
  • StarRocks
  • Apache Doris

這些資料儲存通常是列導向的分散式資料儲存,能夠高效地處理大量資料,並提供卓越的可擴充套件性和高用性。

混合交易/分析處理(HTAP)資料儲存

  • TiDB
  • SingleStore(前身為 MemSQL)

這些資料函式庫支援即時交易處理和分析,能夠在單一系統中提供兩種功能。

程式碼例項與解說

以下是一個使用 Apache Kafka 進行即時資料處理的範例程式碼:

// Kafka 生產者設定
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 建立 Kafka 生產者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 傳送資料到 Kafka 主題
String topic = "my_topic";
String key = "my_key";
String value = "my_value";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record);

內容解密:

  1. 設定 Kafka 生產者的屬性,包括 bootstrap.serverskey.serializervalue.serializer
  2. 建立 Kafka 生產者例項。
  3. 定義要傳送的 Kafka 主題、鍵和值。
  4. 建立 ProducerRecord 物件並傳送到 Kafka 主題。

這個範例展示瞭如何使用 Kafka 生產者將資料傳送到指定的主題中。