在建構即時分析系統時,選擇合適的查詢方式至關重要。推播查詢能主動推播資料更新,適合即時性要求高的場景;而提取查詢則由客戶端主動請求資料,提供更高的彈性。然而,單獨使用任一種方式都難以兼顧低延遲和高彈性。透過物化檢視,我們可以將查詢結果預先計算並儲存,客戶端只需訂閱變更,即可兼顧兩者的優點。此外,串流資料函式庫的出現,整合了串流處理和資料函式庫功能,更能簡化系統架構,有效提升即時分析的效率和效能。
推播查詢與提取查詢的平衡:實作即時分析的最佳實踐
在即時分析的領域中,推播查詢(push query)與提取查詢(pull query)是兩種不同的查詢方式,它們各自具有不同的優勢和侷限性。推播查詢允許系統主動向客戶端推播資料更新,而提取查詢則需要客戶端主動向伺服器請求資料。在本章中,我們將探討這兩種查詢方式的差異,並介紹如何利用物化檢視(materialized views)和串流資料函式庫(streaming databases)來實作兩者的平衡,從而達到低延遲和高彈性的即時分析。
推播查詢與提取查詢的對比
推播查詢和提取查詢的主要區別在於資料的主動性。推播查詢由系統主動推播資料更新給客戶端,而提取查詢則需要客戶端定期向伺服器請求資料。推播查詢適合於即時資料更新的場景,而提取查詢則適合於客戶端需要靈活控制查詢邏輯的場景。
圖 4-7:當查詢延遲接近零時,推播查詢更受青睞
圖 4-8:推播查詢和提取查詢共同工作以平衡延遲和彈性
然而,是否有辦法同時實作高彈性和低延遲,而不需要使用兩種不同的 SQL 查詢?答案是肯定的。我們可以透過使用物化檢視來實作這一目標。物化檢視可以將查詢結果物化,並將變更通知給客戶端。
利用物化檢視實作推播查詢和提取查詢的平衡
- 客戶端提交推播查詢:客戶端提交一個推播查詢,該查詢建立一個物化檢視。
- 訂閱物化檢視的變更:客戶端訂閱物化檢視的變更,就像訂閱一個 WAL(Write-Ahead Logging)一樣。
透過這種方式,客戶端提交了一個推播查詢,而不是提取查詢。透過允許客戶端修改推播查詢,可以實作臨時查詢的彈性。同時,透過訂閱物化檢視的變更,查詢延遲不再是問題,因為增量變更會被推播到客戶端。
串流資料函式庫的作用
要實作上述架構,需要一個具備以下功能的系統:
- 串流處理能力,如構建物化檢視。
- 能夠將物化檢視暴露給串流平台中的主題(topic),類別似於 WAL。
- 能夠以最佳方式儲存資料以供查詢。
- 能夠提供同步和非同步服務方法。
這些功能只有串流資料函式庫才能提供。串流資料函式庫能夠將串流處理平台和資料函式庫結合在一起,使用相同的 SQL 引擎來處理流動中的資料和靜態資料。
常見的即時分析解決方案
目前最常見的即時分析解決方案是執行一個串流處理平台(如 Apache Flink)和一個 RTOLAP 資料儲存(如 Apache Pinot)。這種架構如圖 4-9所示。
圖 4-9:常見的即時分析解決方案
在這種架構中,OLTP 資料函式庫中的資料透過 CDC(Change Data Capture)聯結器被讀取並寫入串流平台的主題中。串流處理器(如 Apache Flink)讀取主題中的資料並構建物化檢視,然後將物化檢視的變更輸出到另一個主題中。RTOLAP 資料儲存(如 Apache Pinot)讀取該主題中的資料並最佳化儲存以供分析查詢。
CDC 和 Upsert 操作
Upsert 是一種邏輯操作,用於描述應用程式在插入和/或更新資料函式庫表時的行為。它檢查記錄是否存在,如果存在則更新,否則插入新記錄。CDC 資料包含增量變更,如插入、更新和刪除。Upsert 操作可以間接提高選擇查詢的效能和準確性。
圖 4-12:CDC 情境下的 Upsert 操作
Upsert 操作可以簡化提取查詢,並提高資料的完整性和準確性。透過使用物化檢視和串流資料函式庫,可以實作推播查詢和提取查詢的平衡,從而達到低延遲和高彈性的即時分析。
串流資料處理與即時分析系統的整合挑戰
在現代的資料架構中,串流資料處理扮演著越來越重要的角色。企業需要即時處理和分析大量的資料,以做出快速的商業決策。本篇文章將探討串流資料處理中的變更資料擷取(Change Data Capture, CDC)、更新插入(Upsert)邏輯,以及如何在即時OLAP(RTOLAP)資料儲存中實作這些功能。
CDC流程與RTOLAP資料儲存的複製
變更資料擷取(CDC)是一種技術,能夠捕捉資料函式庫中的變更並將這些變更傳遞到下游系統。以下是CDC資料複製到RTOLAP資料儲存的步驟:
應用程式向OLTP資料函式庫傳送交易以插入、更新或刪除記錄。假設我們要更新綠色T恤的庫存,這將涉及更新
Products表格。更新操作首先被寫入OLTP資料函式庫的預寫日誌(Write-Ahead Log, WAL)中。
CDC聯結器讀取WAL以捕捉變更。如果聯結器是新啟動的,它需要對
Products表格進行快照,以取得當前狀態。這樣做的目的是為了建立種子事件,這些事件在邏輯上等同於對Products表格中每條記錄的插入操作。當串流處理器啟動時,如果是第一次消費該主題,它會從頭開始讀取。否則,它會從儲存的偏移量開始讀取。透過從頭讀取主題,串流處理器能夠構建
Products表格的副本。在串流處理器中可以實施複雜的轉換操作,這些操作需要在代表
Products表格副本的物化檢視上進行。如果RTOLAP資料儲存直接從主題讀取,它需要自行處理更新插入(Upsert)邏輯,包括識別插入、更新和刪除操作。
另一種方法是串流處理器直接將資料傳送到RTOLAP資料儲存。對於不支援Upsert的RTOLAP系統,串流處理器需要執行Upsert邏輯。
程式碼範例:串流處理器中的Upsert邏輯
// 假設使用Kafka Streams進行串流處理
KStream<String, String> stream = builder.stream("input_topic");
// 定義Upsert邏輯
stream.map((key, value) -> {
// 解析輸入的變更事件
ChangeEvent event = parseChangeEvent(value);
// 根據事件型別執行Upsert操作
if (event.getType().equals("INSERT") || event.getType().equals("UPDATE")) {
// 更新或插入記錄
return new KeyValue<>(event.getKey(), event.getValue());
} else if (event.getType().equals("DELETE")) {
// 刪除記錄
return new KeyValue<>(event.getKey(), null);
} else {
// 不支援的事件型別
return null;
}
})
.to("output_topic");
內容解密:
- 串流處理器的輸入:首先,我們從名為
input_topic的主題中讀取串流資料。 - 變更事件解析:對於每條輸入記錄,我們解析其內容以確定變更事件的型別(插入、更新或刪除)。
- Upsert邏輯實作:根據變更事件的型別,我們執行相應的操作。如果是插入或更新,我們將新的值寫入輸出主題;如果是刪除操作,我們寫入一個空值以表示刪除。
- 輸出結果:最終結果被寫入名為
output_topic的主題中,供下游系統使用。
壓縮主題與歷史資料儲存
在CDC流程中,使用壓縮主題(Compacted Topic)至關重要。這類別主題保留每個主鍵的最新記錄,使得歷史資料得以儲存,並能夠用於構建下游表格副本,包括歷史記錄。
Upsert邏輯的實作挑戰
Upsert邏輯可以在RTOLAP系統或串流處理器中實作。較簡單且較佳的方法是讓RTOLAP直接從輸出主題讀取並自行應用Upsert邏輯。這種方法還提供了緩衝區,以應對串流處理器產生資料速度快於RTOLAP消費速度的情況。
多系統整合的挑戰
CDC涉及多個系統和複雜邏輯,包括OLTP資料函式庫的WAL、串流平台中的壓縮主題、Upsert邏輯以及物化檢視。這些複雜性會導致資料工程師和分析使用者之間的爭議。整合這些系統並減少冗餘和複雜性是當務之急。串流資料函式庫提供了一種可能的解決方案。
串流的Join操作
在串流處理中,轉換操作需要在代表變更串流的物化檢視和追加串流之間進行。追加串流是指只允許插入操作的串流,例如應用程式產生的點選串流資料。
追加表格與變更表格
在串流處理器中,我們區分兩種型別的表格:
- 追加表格(Append Tables):代表追加串流,不由狀態儲存支援,用於表示透過串流處理器的資料。
- 變更表格(Change Tables):代表物化檢視,由狀態儲存支援,用於儲存變更串流的結果。
同樣,在串流平台中,我們也區分兩種型別的主題:
- 追加主題(Append Topics):包含追加資料的主題。
- 變更主題(Change Topics):包含變更事件或CDC事件的主題,也被稱為“表格主題”。
Join操作的實作
Join操作可以在變更表格和追加表格之間進行。例如,將產品資訊的變更串流與客戶點選行為的追加串流結合,以獲得更豐富的分析結果。
程式碼範例:Join操作
// 假設有兩個KStream,分別代表產品資訊變更串流和客戶點選行為追加串流
KStream<String, Product> productStream = builder.stream("products_topic");
KStream<String, ClickEvent> clickStream = builder.stream("clicks_topic");
// 對兩個串流進行Join操作
productStream.join(clickStream, (product, click) -> {
// 結合產品資訊和點選行為
EnrichedClick enrichedClick = new EnrichedClick(product, click);
return enrichedClick;
})
.to("enriched_clicks_topic");
內容解密:
- 定義輸入串流:我們定義了兩個輸入串流,分別代表產品資訊和客戶點選行為。
- Join操作的實作:我們根據某個共同的鍵對兩個串流進行Join操作,結合產品資訊和點選行為以獲得豐富的分析結果。
- 輸出豐富化的點選事件:最終結果被寫入名為
enriched_clicks_topic的主題中,供下游分析使用。
資料串流的結合:以SQL為基礎的轉換與操作
在處理資料串流時,結合多個串流的邏輯可能會變得複雜。使用SQL作為定義結合和轉換的語言至關重要,因為SQL是操縱資料的通用語言,而SQL引擎需要結合串流和資料函式庫。分享一個SQL引擎來操縱動態資料和靜態資料,最終實作了串流資料函式庫的概念。
Apache Calcite:建構資料函式庫的基礎框架
Apache Calcite是一個用於建構資料函式庫的資料管理框架,它根據關聯代數(relational algebra)。關聯代數是一種正式且數學化的描述方式,用於描述對關聯資料函式庫進行的操作。它是一套規則和符號,幫助我們操縱和查詢儲存在表格中的資料,也就是所謂的關聯。
Apache Calcite的核心特性
Apache Calcite包含了許多構成數學運算的元件,但省略了一些關鍵功能:資料儲存、資料處理演算法以及用於儲存元資料的倉函式庫。如果你想從頭開始建構一個資料函式庫,Apache Calcite是一個可用的建構區塊。許多現有的即時系統都使用了Calcite,例如Apache Flink、Apache Pinot、Apache Kylin、Apache Druid、Apache Beam和Apache Hive等。
使用SQL結合串流
例項分析:將點選事件與客戶、產品資訊結合
CREATE SINK clickstream_enriched AS
SELECT
E.*,
C.*,
P.*
FROM CLICK_EVENTS E
JOIN CUSTOMERS C ON C.ip = E.ip
JOIN PRODUCTS P ON P.product_id = E.product_id
WITH (
connector = 'kafka',
topic = 'click_customer_product',
properties.bootstrap.server = 'kafka:9092',
type = 'upsert',
primary_key = 'id'
);
內容解密:
CREATE SINK clickstream_enriched AS: 建立一個名為clickstream_enriched的輸出結果集,並將其定義為後續的SQL查詢結果。SELECT E.*, C.*, P.* FROM CLICK_EVENTS E JOIN CUSTOMERS C ON C.ip = E.ip JOIN PRODUCTS P ON P.product_id = E.product_id: 從CLICK_EVENTS、CUSTOMERS和PRODUCTS三個表中選擇所有欄位,並根據ip和product_id進行內連線(inner-join)。CLICK_EVENTS是一個附加表(append table),來自於一個附加主題(append topic),包含來自使用者端應用程式的點選事件。CUSTOMERS和PRODUCTS是變更表(change table),分別來自於不同的變更主題(change topic),包含來自OLTP資料函式庫的變更事件,這些事件透過CDC(Change Data Capture)聯結器捕捉。
WITH (...): 定義了輸出結果集的屬性,包括聯結器型別(Kafka)、主題名稱、Kafka伺服器的組態、輸出型別(upsert)以及主鍵。
不同型別表格結合的特性
- 附加表與附加表結合:總是需要視窗(windowed),否則狀態儲存(state store)會耗盡空間。
- 變更表與變更表結合:不需要視窗,只要狀態儲存的大小適當,可以容納結合結果。
- 變更表與附加表結合:同樣需要視窗,否則狀態儲存會耗盡空間。
左連線(Left Join)在串流處理中的應用
在串流處理中,當你對一個附加表串流和一個變更表串流進行左連線操作時,結果是由附加表串流驅動的。
SELECT k.product_id, c.customer_name
FROM click k
LEFT JOIN customers c ON k.customer_id = c.customer_id;
內容解密:
SELECT k.product_id, c.customer_name FROM click k LEFT JOIN customers c ON k.customer_id = c.customer_id: 從click串流和customers串流中選擇欄位,並根據customer_id進行左連線。click串流是左串流,驅動著連線結果。對於click串流中的每個事件,它會根據連線條件包含來自customers串流的匹配事件。
點選流使用案例分析
讓我們逐步分析圖4-13,以全面理解整個流程。
圖4-13:CDC與僅附加事件從應用程式到RTOLAP的路徑
客戶更新其資訊
- a. 資訊被儲存到OLTP資料函式庫中。
- b. CDC(變更資料擷取)程式在OLTP資料函式庫上執行,捕捉
CUSTOMERS表的變更,並將其寫入CDC主題。該主題是一個壓縮主題,可以視為CUSTOMERS表的副本。這允許其他系統建立自己的CUSTOMERS表副本。
同一客戶在電子商務應用程式上點選某產品
- 點選事件被寫入主題。我們不將點選事件寫入OLTP資料函式庫,因為點選事件僅為插入操作。在OLTP資料函式庫中捕捉這些事件可能會導致資料函式庫儲存空間耗盡。
流處理器從CDC和點選事件主題讀取資料
- a. 來自
CUSTOMERS變更表主題的訊息儲存在狀態儲存中,其大小取決於視窗大小(或在Kafka Streams或ksqlDB等情況下,完全儲存在KTable中)。 - b. 來自
CLICK_EVENTS附加表主題的訊息在流處理器中被處理。 - c. 在
CLICK_EVENTS附加表訊息和CUSTOMERS變更表訊息之間執行左連線。連線的結果是CLICK_EVENTS豐富了對應的客戶資訊(如果存在)。
- a. 來自
流處理器將輸出寫入以下主題
- a. 這是一個變更主題,包含CDC客戶變更。由於1b中的主題包含相同的資料,這個主題是多餘的,但為了保持圖表的平衡而保留。
- b. 這是一個附加主題,包含原始的
CLICK_EVENT資料,豐富了客戶資料。
主題被拉入RTOLAP資料儲存以進行即時服務
- a. 這是原始
CUSTOMERS表在OLTP資料函式庫中的副本,從變更主題建立。 - b. 這包含豐富的
CLICK_EVENTS資料。
- a. 這是原始
使用者對RTOLAP資料儲存發出查詢
- a. 使用者可以直接查詢
CUSTOMERS表。 - b. 使用者可以查詢豐富的
CLICK_EVENTS資料,而無需自行連線資料,因為連線已經在流處理器中完成。
- a. 使用者可以直接查詢
內容解密:
此案例展示瞭如何透過流處理器實作即時資料分析和查詢最佳化。流處理器預先將CLICK_EVENTS和CUSTOMER資料連線起來,以提高查詢效能。RTOLAP系統則專注於快速、低延遲的查詢。
物化檢視的優勢
- 高效能查詢:物化檢視儲存預先計算的結果,允許更快、更靈敏的查詢。
- 流批處理整合:物化檢視可以儲存流處理管道的中間結果,提供流資料和批次導向分析之間的橋樑。
- 統一處理模型:透過結合流處理和資料函式庫的優點,物化檢視簡化了資料密集型系統的整體架構。
串流資料函式庫簡介
在試算表中,您可以在一個儲存格中放入公式(例如,另一欄中儲存格的總和),並且每當公式的輸入發生變化時,公式的結果就會自動重新計算。這正是我們在資料系統層面想要的:當資料函式庫中的記錄發生變化時,我們希望該記錄的任何索引都能自動更新,並且任何依賴於該記錄的快取檢視或聚合都能自動重新整理。您不應該擔心這次重新整理的技術細節,而是能夠簡單地相信它能正確運作。 —Martin Kleppmann,《設計資料密集型應用》
在前一章中,我們學習瞭如何像Martin Kleppmann所說的那樣「將資料函式庫翻轉出來」。這涉及將資料函式庫的WAL(寫入前日誌)外部化為輸入變更串流,在其上建立物化檢視,並將處理後的資料寫回輸出變更串流。與傳統資料函式庫(如Oracle或Postgres)中的物化檢視不同,在Flink、Kafka Streams、ksqlDB或Samza等串流處理平台中,物化檢視可以持續重新整理——隨著每個新變更的到來。
「將資料函式庫翻轉出來」的想法使我們能夠建立提供比以往更快速資料的物化檢視。然而,與簡單的傳統資料函式庫安裝相比,它也需要我們處理許多額外的複雜性:為了真正理解由Flink、Kafka Streams、ksqlDB或Samza建立的持續更新的物化檢視,輸出變更串流必須被攝入額外的外部資料函式庫(例如,像Druid、Pinot、ClickHouse或Rockset這樣的RTOLAP資料函式庫)。因此,從架構上講,「將資料函式庫翻轉出來」迫使我們啟動和操作三個系統(串流平台、串流處理器和外部資料函式庫),而不是簡單地擁有一個傳統資料函式庫。
識別串流資料函式庫
到目前為止提供的大多數圖表主要包含以下元件,如圖5-1所示。現在,讓我們忽略讀取和寫入系統所需的聯結器。
圖示:串流處理元件
此圖示顯示了串流處理的不同元件,包括資料函式庫、串流處理器和內部狀態儲存。
從左到右,圖5-1中的元件如下:
- 資料函式庫可以是我們到目前為止討論的三種型別的資料儲存之一:OLTP、RTOLAP和串流處理器中的內部狀態儲存。它們之間的差異決定了資料如何被儲存和查詢。
-- 示範基本的SQL查詢
SELECT * FROM 資料表 WHERE 條件 = '滿足條件';
內容解密:
上述SQL查詢用於從指定的資料表中檢索所有滿足特定條件的記錄。這裡展示了基本的SQL語法結構,用於對資料函式庫進行查詢操作。
走向串流資料函式庫的新時代
我們相信,「將資料函式庫翻轉出來」的想法是建立串流和資料函式庫世界之間橋樑的關鍵第一步,並且它已經為串流處理的重大進展鋪平了道路。但它並沒有走完全程。 在本文的核心章節中,我們將帶您踏上一段旅程,從有狀態的串流處理、根據串流的物化檢視和狀態儲存,到重新根據其原始表述的新物化檢視概念。我們將向您展示,像ksqlDB、Materialize、RisingWave和Timeplus這樣的新的串流資料函式庫即將開始在串流和資料函式庫世界之間的橋樑上放置最後的磚塊,並在「將資料函式庫翻轉出來」之後邁出下一步邏輯步驟:再次將其「翻轉回來」。
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title 即時分析推播與提取查詢最佳實踐
package "資料庫架構" {
package "應用層" {
component [連線池] as pool
component [ORM 框架] as orm
}
package "資料庫引擎" {
component [查詢解析器] as parser
component [優化器] as optimizer
component [執行引擎] as executor
}
package "儲存層" {
database [主資料庫] as master
database [讀取副本] as replica
database [快取層] as cache
}
}
pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中
master --> replica : 資料同步
note right of cache
Redis/Memcached
減少資料庫負載
end note
@enduml
圖示說明:
此圖示呈現了從OLTP資料函式庫到外部資料函式庫的資料流程,涉及WAL、串流平台、物化檢視和輸出變更串流等關鍵元件。
內容解密:
- OLTP資料函式庫首先透過WAL捕捉變更。
- 這些變更被發布到串流平台。
- 在串流平台上建立物化檢視以進行即時資料處理。
- 處理後的資料被寫入輸出變更串流。
- 最終,輸出變更串流被攝入外部資料函式庫,以供進一步查詢或分析使用。