返回文章列表

串流處理系統一致性比較與最佳實踐

本文比較了不同串流處理系統(Flink SQL、ksqlDB、Proton、RisingWave、Materialize 和 Pathway)的一致性表現,特別關注在處理亂序訊息和同步資料流合併時的差異。文章探討了最終一致性系統在處理 JOIN

資料工程 串流處理

在串流處理領域,資料一致性是系統設計和開發的關鍵考量。不同的串流處理系統採用不同的策略來確保一致性,從而影響系統的效能、可靠性和適用場景。本文以一個模擬銀行交易的案例,比較了 Flink SQL、ksqlDB、Proton、RisingWave、Materialize 和 Pathway 等系統的一致性表現,並探討瞭如何解決最終一致性系統在處理資料同步時遇到的挑戰。透過分析不同系統的實作方式,我們可以更深入地理解一致性模型的 trade-off,並選擇最適合特定應用需求的解決方案。尤其在涉及 JOIN、UNION 等非單調運算元時,資料同步的挑戰更為突出,需要仔細考量系統的設計與最佳化。

比較不同串流處理系統的一致性

Materialize實作範例

Materialize是第五個串流處理系統,是一種提供與PostgreSQL相容API的串流資料函式庫。首先,我們建立一個由輸入Kafka主題transactions餵養的表格。

CREATE CONNECTION kafka_connection TO kafka (broker 'broker:29092');
CREATE SOURCE transactions_source
FROM kafka connection kafka_connection (TOPIC 'transactions', START OFFSET (0)) KEY FORMAT TEXT VALUE FORMAT TEXT INCLUDE KEY ENVELOPE UPSERT WITH (SIZE = '1');
CREATE VIEW transactions AS
SELECT 
    ((text :: jsonb) ->> 'id') :: string AS id,
    ((text :: jsonb) ->> 'from_account') :: int AS from_account,
    ((text :: jsonb) ->> 'to_account') :: int AS to_account,
    ((text :: jsonb) ->> 'amount') :: int AS amount,
    ((text :: jsonb) ->> 'ts') :: timestamp AS ts,
    key
FROM transactions_source;

內容解密:

  1. CREATE CONNECTION:建立與Kafka叢集的連線,指定broker的位置。
  2. CREATE SOURCE:定義一個來源,從指定的Kafka主題讀取資料,並設定資料的格式和處理方式。
  3. CREATE VIEW transactions:建立一個檢視,將來源資料轉換成結構化的交易資料,包含交易ID、轉出帳戶、轉入帳戶、金額和時間戳等欄位。

接著,我們設定如範例6-9所示的檢視。

CREATE VIEW accounts AS
SELECT from_account AS account FROM transactions
UNION
SELECT to_account FROM transactions;

CREATE VIEW credits AS
SELECT transactions.to_account AS account, SUM(transactions.amount) AS credits
FROM transactions
LEFT JOIN accounts ON transactions.to_account = accounts.account
GROUP BY to_account;

CREATE VIEW debits AS
SELECT transactions.from_account AS account, SUM(transactions.amount) AS debits
FROM transactions
LEFT JOIN accounts ON transactions.from_account = accounts.account
GROUP BY from_account;

CREATE VIEW balance AS
SELECT credits.account AS account, credits.credits - debits.debits AS balance
FROM credits
INNER JOIN debits ON credits.account = debits.account;

CREATE VIEW total AS
SELECT SUM(balance) FROM balance;

內容解密:

  1. CREATE VIEW accounts:建立一個包含所有帳戶的檢視,透過UNION操作合並轉出和轉入帳戶。
  2. CREATE VIEW creditsdebits:分別計算每個帳戶的貸方和借方總額。
  3. CREATE VIEW balance:計算每個帳戶的餘額,透過將貸方總額減去借方總額。
  4. CREATE VIEW total:計算所有帳戶的總餘額。

最後,我們將檢視total匯入到主題total_materialize中。

CREATE SINK total_sink
FROM total INTO kafka connection kafka_connection (TOPIC 'total_materialize')
FORMAT JSON ENVELOPE DEBEZIUM WITH (SIZE = '1');

內容解密:

  1. CREATE SINK:將total檢視的結果輸出到Kafka主題total_materialize中,資料格式為JSON。

Pathway實作範例

Pathway是一個Python串流處理函式庫。範例6-10展示瞭如何使用Pathway設定範例。

#!/bin/python
import pathway as pw

rdkafka_settings = {
    "bootstrap.servers": "localhost:56512",
    "group.id": "pw",
    "session.timeout.ms": "6000"
}

class InputSchema(pw.Schema):
    id: int
    from_account: int
    to_account: int
    amount: int
    ts: str

t = pw.io.kafka.read(
    rdkafka_settings,
    topic="transactions",
    schema=InputSchema,
    format="json",
    autocommit_duration_ms=1000
)

credits = pw.sql("""
    SELECT to_account, sum(amount) as credits
    FROM T GROUP BY to_account
""", T=t)

debits = pw.sql("""
    SELECT from_account, sum(amount) as debits
    FROM T GROUP BY from_account
""", T=t)

balance = pw.sql("""
    SELECT CC.to_account, credits - debits as balance
    FROM CC join DD on CC.to_account = DD.from_account
""", CC=credits, DD=debits)

total = pw.sql("""
    SELECT sum(balance) as total FROM BB
""", BB=balance)

pw.io.kafka.write(
    total,
    rdkafka_settings=rdkafka_settings,
    topic_name='total_pathway',
    format='json'
)

pw.run()

內容解密:

  1. 設定Kafka連線資訊和輸入資料的schema。
  2. 從Kafka主題transactions讀取資料,並轉換成結構化的資料流t
  3. 使用SQL查詢計算貸方、借方、餘額和總餘額,分別存放在creditsdebitsbalancetotal中。
  4. 將最終結果輸出到Kafka主題total_pathway中。

結果比較

Materialize和Pathway都正確地輸出了結果0到對應的Kafka主題中,無論執行多少次。圖6-6和圖6-7展示了這兩個系統的視覺化結果。

亂序訊息處理

為了模擬更真實的情況,我們修改了Python程式碼,使其產生約1/10的亂序訊息。實驗結果表明,Flink SQL、ksqlDB和Proton在處理亂序訊息時出現了問題,而RisingWave、Materialize和Pathway則表現正常。

超越最終一致性

本章探討了五個串流處理系統在一致性方面的表現,並提出了一些問題,例如為什麼最終一致性的系統會失敗,以及如何實作內部一致性。這些問題對於理解串流處理系統的設計和最佳化具有重要意義。

事件時間處理中的同步問題與最終一致性串流處理系統

在最終一致性串流處理系統(如 Flink SQL、ksqlDB 和 Proton)中,當處理需要同步的多個資料流時,可能會出現中間結果不正確的問題。本章節將探討此類別問題的成因及可能的解決方案。

問題的起源:JOIN 操作與資料同步

在處理銀行交易的例子中,我們定義了一個 balance 檢視,該檢視透過 JOIN 操作結合了 creditsdebits 兩個檢視。然而,由於 creditsdebits 檢視的輸入資料流並未同步,導致 balance 檢視可能錯誤地組合了輸入資料。

交易範例

考慮以下四筆交易:

  1. 從帳戶 0 轉 $1 到帳戶 1。
  2. 從帳戶 0 轉 $1 到帳戶 2。
  3. 從帳戶 1 轉 $1 到帳戶 2。
  4. 從帳戶 2 轉 $1 到帳戶 0。

錯誤的組合結果

由於 creditsdebits 檢視的輸出並不同步,可能出現以下情況:

  • credits 檢視輸出快於 debits 檢視時,balance 檢視會錯誤地將 credits 的最新結果與 debits 的舊結果結合,導致餘額總和錯誤地變為正數(1、2、3)。
  • debits 檢視輸出快於 credits 檢視時,balance 檢視會錯誤地將 debits 的最新結果與 credits 的舊結果結合,導致餘額總和錯誤地變為負數(-1、-2、-3)。

圖表說明:錯誤的中間結果

此圖示展示了當 credits 檢視輸出快於 debits 檢視時的錯誤組合結果。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title 串流處理系統一致性比較與最佳實踐

package "資料庫架構" {
    package "應用層" {
        component [連線池] as pool
        component [ORM 框架] as orm
    }

    package "資料庫引擎" {
        component [查詢解析器] as parser
        component [優化器] as optimizer
        component [執行引擎] as executor
    }

    package "儲存層" {
        database [主資料庫] as master
        database [讀取副本] as replica
        database [快取層] as cache
    }
}

pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中

master --> replica : 資料同步

note right of cache
  Redis/Memcached
  減少資料庫負載
end note

@enduml

非單調運算元的早期發射問題

最終一致性串流處理系統為了降低延遲,通常會盡早發射結果。然而,這對於非單調運算元(如 JOIN 和 UNION)可能會造成問題,因為它們無法保證輸入資料的同步。

MiniBatch 與快取的作用

使用 MiniBatch(如 Proton 和 Flink 1.19+)或快取(如 ksqlDB)可以減少早期發射問題的發生,從而提高結果的正確性。

無同步的資料流合併

最終一致性串流處理系統在處理無同步的多個資料流時,可能會因為早期發射和缺乏同步機制而導致錯誤的結果。雖然使用全域鎖可以實作同步,但這會影響系統的可擴充套件性。未來章節將探討如何在保持系統平行性和可擴充套件性的同時實作同步。

內部一致性串流處理系統如何透過玩具示例?

正如我們所見,正確處理玩具示例的關鍵在於能夠同步組合資料流。直觀地說,我們必須確保只有那些屬於同一交易的 credits 和 debits 結果才會被組合在一起。換句話說,它們對應於 transactions 源 Kafka 主題中的同一交易。

顯示 total 主題結果的圖表表明,RisingWave(圖 6-5)、Materialize(圖 6-6)和 Pathway(圖 6-7)可能已經找到了一種實作此目的方法。但它們是如何做到的呢?

RisingWave

RisingWave 利用了障礙(barriers)的概念,其靈感來自 Flink 的檢查點障礙(checkpoint barriers)。本質上,障礙是包含紀元(時間戳)的控制記錄,它們會定期(例如,每秒)自動注入所有源中。在 RisingWave 中,障礙被用作資料的版本號。只有當操作員從所有輸入中接收到相同的版本/障礙時,才允許發出與特定版本相關的結果。

Flink 使用障礙進行一致的檢查點。另一方面,RisingWave 也充分利用了它是一個串流資料函式庫而非僅僅是一個串流處理器的事實,因此它對其持久層(包括檢查點)具有完全的控制權。因此,RisingWave 可以超越 Flink,不僅可以使用檢查點障礙進行檢查點,還可以使用它來進行版本控制,這是資料函式庫世界中快照隔離(snapshot isolation)概念的改編。

讓我們來看看 RisingWave 如何處理我們的四個交易。在圖 6-10 中,我們在每個交易之後注入障礙,用垂直線表示。每個障礙都有自己的版本,用下標(1、2、3、4)表示。在 RisingWave 的處理過程中,交易障礙被轉發到下一個操作員。現在,當計算餘額檢視時,障礙用於確保只有當 credits 和 debits 檢視的輸入都先於相同的障礙(即具有相同的版本)時才會被組合在一起。

圖 6-10. 使用交易障礙確保一致性

原則上,障礙也可以較不頻繁地注入。考慮圖 6-11 中的圖表,其中我們只在 transactions 中的每兩個訊息中注入一個障礙。

圖 6-11. 使用(較不頻繁的)交易障礙確保一致性

在 RisingWave 中注入這些紀元障礙的頻率對延遲和記憶體消耗有直接影響。增加註入頻率可以減少端對端延遲,但會增加記憶體消耗,因為維護更多版本的資料會消耗更多記憶體。

乍一看,障礙類別似於來自串流處理器(如 Flink)的水印(watermarks),因為它們也是資料流圖中的控制記錄。然而,障礙和水印具有稍微不同的語義。對於障礙,操作員只有在達到所有輸入的相同障礙時才能發出結果。對於水印,操作員只有在遇到水印時才能繼續進行。水印表示直到某個時間戳的所有事件都應該已經到達。

Materialize

Materialize 根據差分資料流(Differential Dataflow, DD)。DD 中的資料始終是有版本的,所有 DD 的操作員都尊重這些版本。因此,圖 6-12 中 Materialize 的圖表與圖 6-10 非常相似,唯一的差別是資料是開箱即用的版本控制——DD 不需要像障礙這樣的額外概念。

圖 6-12. DD/Materialize 如何透過資料版本確保一致性

DD 中的操作員同步是透過以下方式實作的:

  1. 每個資料都伴隨著一個版本(在圖 6-12 中,它們被稱為“v1”、“v2”、“v3”和“v4”)。
  2. 操作員只能組合相同版本的資料。

透過這種方式,DD 也實作了一種快照隔離的形式,並且可以透過我們的玩具示例。

Pathway

與 Materialize 類別似,Pathway 也根據 DD,因此也使用版本控制來實作操作員輸入同步,以透過玩具示例挑戰。我們包含了 Pathway 以表明這種形式的一致性不僅可以透過串流資料函式庫實作,也可以透過具有內部一致的底層引擎(如 DD)的串流處理函式庫實作。

如何修復最終一致性串流處理系統以透過玩具示例?

我們已經看到,內部一致性串流處理系統的關鍵特徵之一是它們能夠同步其二元、非單調操作員(如 UNION 和 JOIN)的輸入,無論是透過障礙(RisingWave)還是版本(DD、Materialize、Pathway)。更抽象地說,“全域無鎖同步王國”的關鍵是有效的語義上有意義的時間戳系統,允許在串流處理拓撲中發生解耦進展。

我們能否利用這一洞察力推匯出一個修復方案,使 Flink SQL 也能透過我們的玩具示例?

對於 Flink SQL,確實有一些方法可以透過我們的玩具示例。其中一種方法是明確使用餘額檢視的 WHERE 子句中的時間戳欄位 ts,以便僅當 credits 和 debits 的 ts 欄位匹配時才 JOIN 它們。

我們在範例 6-12 中顯示了對 Flink SQL 程式碼的更改。

CREATE VIEW credits(account, credits, ts) AS
SELECT
    to_account as account,
    sum(amount) as credits,
    ts
FROM
    transactions
GROUP BY
    to_account,
    ts;

CREATE VIEW debits(account, debits, ts) AS
SELECT
    from_account as account,
    sum(amount) as debits,
    ts
FROM
    transactions
GROUP BY
    from_account,
    ts;

內容解密:

  1. 建立 credits 檢視:此 SQL 陳述式建立了一個名為 credits 的檢視,該檢視匯總了 transactions 表中每個 to_accountamount,並且保留了 ts 時間戳。
  2. 建立 debits 檢視:同樣地,此陳述式建立了一個名為 debits 的檢視,它匯總了 transactions 表中每個 from_accountamount,並保留了 ts 時間戳。
  3. 使用 ts 進行同步:透過在 creditsdebits 檢視中包含 ts 欄位,我們可以根據 ts 是否匹配來同步這兩個檢視的結果,從而確保只有屬於同一交易的記錄才會被合併。

這種方法有效地利用了時間戳來確保操作的正確同步,從而使 Flink SQL 能夠透過玩具示例。