返回文章列表

串流系統一致性探討與比較

本文探討串流系統中資料一致性的挑戰,並以銀行轉帳範例比較 Flink SQL、ksqlDB、Proton 和 RisingWave 等串流處理引擎的一致性模型與輸出結果。實驗模擬帳戶間持續轉帳,觀察各系統如何維持餘額總和為零,藉此分析它們在處理非視窗資料時的一致性表現。結果顯示 RisingWave

串流處理 資料函式庫

在資料函式庫系統中,資料一致性通常不成問題。然而,在串流處理系統中,由於資料的延遲和亂序到達,維持一致性變得更具挑戰性。尤其在需要低延遲和高吞吐量的場景下,許多串流處理器僅能提供最終一致性,這可能導致非預期的結果。本文以銀行帳戶間持續轉帳的範例,比較不同串流處理引擎的一致性表現,包括 Flink SQL、ksqlDB、Proton(Timeplus)和 RisingWave,觀察它們如何處理持續變化的資料並維持餘額總和的正確性。

一致性在串流系統中的重要性

在資料函式庫領域中,一致性是理所當然的。查詢結果總是與輸入資料保持一致。但當我們進入串流世界時,情況就變得複雜了。資料可能延遲到達或亂序,而且系統需要兼顧低延遲和高吞吐量。在這種情況下,傳統的串流處理器只能保證最終一致性。

最終一致性的問題

最終一致性在某些情況下是可接受的,例如在視窗資料上進行聚合運算。但是,當我們處理非視窗資料時,最終一致性可能會導致混淆和反直覺的結果。為了說明這個問題,我們使用了一個來自銀行領域的玩具範例。

玩具範例:銀行轉帳

假設有10個銀行帳戶,不斷地相互轉帳$1。我們希望串流處理器能夠正確地計算出所有帳戶的餘額總和,並且這個總和應該始終為0。任何時候總和不為0,都表示一致性出了問題。

設定玩具範例

我們使用Python程式碼來設定這個玩具範例,如下所示:

import datetime, json, random, time
from kafi.kafi import Cluster

c = Cluster("local")
c.create("transactions", partitions=1)
p = c.producer("transactions")

random.seed(42)
for id_int in range(0, 10000):
    row_str = json.dumps({
        "id": id_int,
        "from_account": random.randint(0, 9),
        "to_account": random.randint(0, 9),
        "amount": 1,
        "ts": datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
    })
    print(row_str)
    p.produce(row_str, key=str(id_int))
    time.sleep(0.01)

p.close()

內容解密:

這段程式碼建立了一個Kafka主題transactions,並產生10000筆交易資料,每筆交易代表一個帳戶轉帳$1給另一個帳戶。每筆交易資料包含交易ID、轉出帳戶、轉入帳戶、轉帳金額和時間戳。

分析交易資料

我們使用SQL來分析交易資料。首先,建立兩個檢視,分別計算每個帳戶的貸方和借方金額:

CREATE VIEW credits AS
SELECT
    to_account as account,
    SUM(amount) as credits
FROM transactions
GROUP BY to_account;

CREATE VIEW debits(account, debits) AS
SELECT
    from_account as account,
    SUM(amount) as debits
FROM transactions
GROUP BY from_account;

內容解密:

這兩個檢視分別計算每個帳戶的貸方和借方金額。貸方金額是轉入該帳戶的金額總和,借方金額是從該帳戶轉出的金額總和。

然後,計算每個帳戶的餘額:

CREATE VIEW balance AS
SELECT
    credits.account AS account,
    credits - debits AS balance
FROM credits
INNER JOIN debits ON credits.account = debits.account;

內容解密:

這個檢視計算每個帳戶的餘額,即貸方金額減去借方金額。

最後,計算所有帳戶的餘額總和:

CREATE VIEW total(total) AS
SELECT SUM(balance) FROM balance;

內容解密:

這個檢視計算所有帳戶的餘額總和。根據一致性原則,這個總和應該始終為0。

比較不同串流處理系統的一致性

我們比較了六種串流處理系統的一致性,包括Flink SQL、ksqlDB、Proton(Timeplus)、RisingWave、Materialize和Pathway。其中一些系統支援更強的一致性保證,例如內部一致性。

Flink是一個流行的串流處理系統。我們使用Flink SQL來測試其一致性。Flink SQL是一種根據SQL的API,用於在Flink上進行串流處理。

在探討串流處理系統的一致性時,Flink SQL 提供了一個有趣的案例。我們首先建立了一個連線到來源主題 transactions 的表格:

CREATE TABLE transactions (
  id BIGINT,
  from_account INT,
  to_account INT,
  amount DOUBLE,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'transactions_flink',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'true',
  'json.ignore-parse-errors' = 'false'
);

內容解密:

  • 此段程式碼建立了一個名為 transactions 的表格,用於連線到 Kafka 主題 transactions
  • 使用了 kafka 聯結器,並指定了相關的組態,如 bootstrap.serversgroup.id 等。
  • 設定了 JSON 格式的資料解析,並啟用了錯誤處理。

接下來,我們建立了多個檢視(views)來計算 creditsdebitsbalancetotal

CREATE VIEW credits(account, credits) AS
SELECT 
  to_account as account, 
  SUM(amount) as credits
FROM 
  transactions
GROUP BY 
  to_account;

CREATE VIEW debits(account, debits) AS
SELECT 
  from_account as account, 
  SUM(amount) as debits
FROM 
  transactions
GROUP BY 
  from_account;

CREATE VIEW balance(account, balance) AS
SELECT 
  credits.account, 
  credits - debits as balance
FROM 
  credits, 
  debits
WHERE 
  credits.account = debits.account;

CREATE VIEW total(total) AS
SELECT 
  SUM(balance)
FROM 
  balance;

內容解密:

  • creditsdebits 檢視分別計算每個帳戶的貸方和借方總額。
  • balance 檢視透過合併 creditsdebits 來計算每個帳戶的餘額。
  • total 檢視計算所有帳戶的餘額總和。

最後,我們將 total 檢視的結果寫入到一個名為 total_flinksql 的 Kafka 主題中:

CREATE TABLE total_sink (
  total DOUBLE,
  PRIMARY KEY (total) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'property-version' = 'universal',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'total_flink',
  'key.format' = 'json',
  'value.format' = 'json',
  'properties.group.id' = 'total_flink'
);

INSERT INTO total_sink SELECT * FROM total;

內容解密:

  • 建立了一個名為 total_sink 的表格,用於將結果寫入 Kafka 主題 total_flink
  • 使用了 upsert-kafka 聯結器來實作結果的寫入。

結果分析

在執行上述 Flink SQL 程式碼後,我們觀察到結果主題 total_flinksql 中出現了大量的訊息,且總和在 +400 和 -600 之間波動。然而,當輸入流停止後,Flink SQL 最終收斂到正確的總和 0。

此圖示展示了 Flink SQL 程式碼的資料流圖,可以看到交易資料如何被處理並最終計算出總和。

一致性模型的探討

Flink SQL 的這種行為是由其設計選擇所決定的,即支援最終一致性(eventual consistency)而非內部一致性(internal consistency)。這種模型在某些情況下可能導致不一致的結果,尤其是在無界、非視窗化的使用場景中。

ksqlDB 的比較

接下來,我們探討了 ksqlDB 的實作。ksqlDB 是 Confluent 提供的一個串流處理系統,我們使用其版本 7.6.0。首先,我們建立了一個連線到來源主題 transactions 的表格:

CREATE TABLE transactions (
  id VARCHAR PRIMARY KEY,
  from_account INT,
  to_account INT,
  amount DOUBLE,
  ts VARCHAR
) WITH (
  kafka_topic = 'transactions',
  value_format = 'json',
  partitions = 1,
  timestamp = 'ts',
  timestamp_format = 'yyyy-MM-dd HH:mm:ss.SSS'
);

內容解密:

  • ksqlDB 使用了與 Flink SQL 相似的方式來建立表格,但使用了不同的語法和組態。

然後,我們建立了多個表格(tables)來計算 creditsdebitsbalancetotal

CREATE TABLE credits WITH (
  kafka_topic = 'credits',
  value_format = 'json'
) AS
SELECT 
  to_account AS account, 
  SUM(amount) AS credits
FROM 
  transactions
GROUP BY 
  to_account EMIT CHANGES;

-- 省略其他表格的建立

此圖示說明瞭 ksqlDB 的資料流圖

此圖示與 Flink SQL 的資料流圖相似,展示了 ksqlDB 如何處理交易資料並計算出總和。

一致性在串流處理系統中的比較

在串流處理系統中,一致性是一個重要的議題。本章節將比較多個串流處理系統的一致性,包括 Flink SQL、ksqlDB、Proton(Timeplus)和 RisingWave。

首先,我們使用 Flink SQL 來處理交易資料。我們建立了一個表來接收輸入的 Kafka 主題交易資料,並計算每個帳戶的餘額。結果顯示在 sink Kafka 主題 total_flink 中。

CREATE TABLE transactions (
    id INT,
    from_account INT,
    to_account INT,
    amount INT,
    ts TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'broker:29092',
    'format' = 'json'
);

CREATE VIEW credits AS
SELECT 
    to_account AS account,
    SUM(amount) AS credits
FROM 
    transactions
GROUP BY 
    to_account;

CREATE VIEW debits AS
SELECT 
    from_account AS account,
    SUM(amount) AS debits
FROM 
    transactions
GROUP BY 
    from_account;

CREATE VIEW balance AS
SELECT 
    c.account,
    c.credits - d.debits AS balance
FROM 
    credits c
JOIN 
    debits d ON c.account = d.account;

CREATE TABLE total_flink (
    total INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'total_flink',
    'properties.bootstrap.servers' = 'broker:29092',
    'format' = 'json'
);

INSERT INTO total_flink
SELECT 
    SUM(balance) 
FROM 
    balance;

內容解密:

  1. 建立 transactions 表來接收 Kafka 主題的交易資料。
  2. 使用 creditsdebits 檢視來計算每個帳戶的貸方和借方總額。
  3. 使用 balance 檢視來計算每個帳戶的餘額。
  4. balance 的總和插入到 total_flink 表中。

結果顯示,Flink SQL 產生了大約 40,000 條訊息,且結果在正確值 0 周圍波動。當輸入串流停止後,Flink SQL 最終收斂到正確的結果 0。

ksqlDB

接下來,我們使用 ksqlDB 來處理交易資料。我們建立了與 Flink SQL 類別似的表和檢視,並將結果輸出到 sink Kafka 主題 total_ksqldb

程式碼範例:ksqlDB 設定

CREATE STREAM transactions (
    id INT,
    from_account INT,
    to_account INT,
    amount INT,
    ts TIMESTAMP
) WITH (
    kafka_topic = 'transactions',
    value_format = 'json'
);

CREATE TABLE credits AS
SELECT 
    to_account AS account,
    SUM(amount) AS credits
FROM 
    transactions
GROUP BY 
    to_account;

CREATE TABLE debits AS
SELECT 
    from_account AS account,
    SUM(amount) AS debits
FROM 
    transactions
GROUP BY 
    from_account;

CREATE TABLE balance AS
SELECT 
    c.account,
    c.credits - d.debits AS balance
FROM 
    credits c
JOIN 
    debits d ON c.account = d.account;

CREATE SINK total_ksqldb WITH (
    kafka_topic = 'total_ksqldb',
    value_format = 'json'
) AS
SELECT 
    SUM(balance) 
FROM 
    balance;

內容解密:

  1. 建立 transactions 串流來接收 Kafka 主題的交易資料。
  2. 使用 creditsdebits 表來計算每個帳戶的貸方和借方總額。
  3. 使用 balance 表來計算每個帳戶的餘額。
  4. balance 的總和輸出到 total_ksqldb sink Kafka 主題。

結果顯示,ksqlDB 也產生了大約 40,000 條訊息,且結果在正確值 0 周圍波動。當輸入串流停止後,ksqlDB 最終收斂到正確的結果 0。

Proton(Timeplus)

然後,我們使用 Proton(Timeplus)來處理交易資料。我們建立了一個外部串流來接收輸入的 Kafka 主題交易資料,並計算每個帳戶的餘額。

程式碼範例:Proton 設定

CREATE EXTERNAL STREAM transactions (
    id INT,
    from_account INT,
    to_account INT,
    amount INT,
    ts DATETIME64
) SETTINGS 
    type = 'kafka',
    brokers = 'broker:29092',
    topic = 'transactions',
    data_format = 'JSONEachRow';

CREATE VIEW credits AS
SELECT 
    now64() AS ts,
    to_account AS account,
    SUM(amount) AS credits
FROM 
    transactions
GROUP BY 
    to_account EMIT PERIODIC 100ms;

CREATE VIEW debits AS
SELECT 
    now64() AS ts,
    from_account AS account,
    SUM(amount) AS debits
FROM 
    transactions
GROUP BY 
    from_account EMIT PERIODIC 100ms;

CREATE VIEW balance AS
SELECT 
    c.account,
    c.credits - d.debits AS balance
FROM 
    changelog(credits, account, ts, true) AS c
JOIN 
    changelog(debits, account, ts, true) AS d ON c.account = d.account;

CREATE MATERIALIZED VIEW total INTO total_s AS
SELECT 
    SUM(balance) AS total
FROM 
    balance;

內容解密:

  1. 建立 transactions 外部串流來接收 Kafka 主題的交易資料。
  2. 使用 creditsdebits 檢視來計算每個帳戶的貸方和借方總額。
  3. 使用 balance 檢視來計算每個帳戶的餘額。
  4. balance 的總和物化到 total 檢視中。

結果顯示,Proton 只產生了 56 條訊息,且最終收斂到正確的結果 0。

RisingWave

最後,我們使用 RisingWave 來處理交易資料。我們建立了一個表來接收輸入的 Kafka 主題交易資料,並計算每個帳戶的餘額。

程式碼範例:RisingWave 設定

CREATE TABLE transactions (
    id INT,
    from_account INT,
    to_account INT,
    amount INT,
    ts TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'broker:29092',
    scan.startup.mode = 'earliest',
    scan.startup.timestamp_millis = '140000000'
) ROW FORMAT JSON;

CREATE VIEW credits AS
SELECT 
    to_account AS account,
    SUM(amount) AS credits
FROM 
    transactions
GROUP BY 
    to_account;

CREATE VIEW debits AS
SELECT 
    from_account AS account,
   SUM(amount) AS debits
FROM 
   transactions
GROUP BY 
   from_account;

CREATE VIEW balance AS
SELECT 
   credits.account,
   credits.credits - debits.debits AS balance
FROM 
   credits
INNER JOIN 
   debits ON credits.account = debits.account;

CREATE VIEW total AS
SELECT 
   SUM(balance)
FROM 
   balance;

CREATE SINK total_sink
FROM 
   total WITH (
       connector = 'kafka',
       properties.bootstrap.server = 'broker:29092',
       topic = 'total_risingwave',
       type = 'append-only',
       force_append_only = 'true'
   );

內容解密:

  1. 建立 transactions 表來接收 Kafka 主題的交易資料。
  2. 使用 creditsdebits 檢視來計算每個帳戶的貸方和借方總額。
  3. 使用 balance 檢視來計算每個帳戶的餘額。
  4. balance 的總和輸出到 total_risingwave sink Kafka 主題。

結果顯示,RisingWave 產生了 105 條訊息,且每條訊息都給出了正確的結果:0。