在資料函式庫系統中,資料一致性通常不成問題。然而,在串流處理系統中,由於資料的延遲和亂序到達,維持一致性變得更具挑戰性。尤其在需要低延遲和高吞吐量的場景下,許多串流處理器僅能提供最終一致性,這可能導致非預期的結果。本文以銀行帳戶間持續轉帳的範例,比較不同串流處理引擎的一致性表現,包括 Flink SQL、ksqlDB、Proton(Timeplus)和 RisingWave,觀察它們如何處理持續變化的資料並維持餘額總和的正確性。
一致性在串流系統中的重要性
在資料函式庫領域中,一致性是理所當然的。查詢結果總是與輸入資料保持一致。但當我們進入串流世界時,情況就變得複雜了。資料可能延遲到達或亂序,而且系統需要兼顧低延遲和高吞吐量。在這種情況下,傳統的串流處理器只能保證最終一致性。
最終一致性的問題
最終一致性在某些情況下是可接受的,例如在視窗資料上進行聚合運算。但是,當我們處理非視窗資料時,最終一致性可能會導致混淆和反直覺的結果。為了說明這個問題,我們使用了一個來自銀行領域的玩具範例。
玩具範例:銀行轉帳
假設有10個銀行帳戶,不斷地相互轉帳$1。我們希望串流處理器能夠正確地計算出所有帳戶的餘額總和,並且這個總和應該始終為0。任何時候總和不為0,都表示一致性出了問題。
設定玩具範例
我們使用Python程式碼來設定這個玩具範例,如下所示:
import datetime, json, random, time
from kafi.kafi import Cluster
c = Cluster("local")
c.create("transactions", partitions=1)
p = c.producer("transactions")
random.seed(42)
for id_int in range(0, 10000):
row_str = json.dumps({
"id": id_int,
"from_account": random.randint(0, 9),
"to_account": random.randint(0, 9),
"amount": 1,
"ts": datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
})
print(row_str)
p.produce(row_str, key=str(id_int))
time.sleep(0.01)
p.close()
內容解密:
這段程式碼建立了一個Kafka主題transactions,並產生10000筆交易資料,每筆交易代表一個帳戶轉帳$1給另一個帳戶。每筆交易資料包含交易ID、轉出帳戶、轉入帳戶、轉帳金額和時間戳。
分析交易資料
我們使用SQL來分析交易資料。首先,建立兩個檢視,分別計算每個帳戶的貸方和借方金額:
CREATE VIEW credits AS
SELECT
to_account as account,
SUM(amount) as credits
FROM transactions
GROUP BY to_account;
CREATE VIEW debits(account, debits) AS
SELECT
from_account as account,
SUM(amount) as debits
FROM transactions
GROUP BY from_account;
內容解密:
這兩個檢視分別計算每個帳戶的貸方和借方金額。貸方金額是轉入該帳戶的金額總和,借方金額是從該帳戶轉出的金額總和。
然後,計算每個帳戶的餘額:
CREATE VIEW balance AS
SELECT
credits.account AS account,
credits - debits AS balance
FROM credits
INNER JOIN debits ON credits.account = debits.account;
內容解密:
這個檢視計算每個帳戶的餘額,即貸方金額減去借方金額。
最後,計算所有帳戶的餘額總和:
CREATE VIEW total(total) AS
SELECT SUM(balance) FROM balance;
內容解密:
這個檢視計算所有帳戶的餘額總和。根據一致性原則,這個總和應該始終為0。
比較不同串流處理系統的一致性
我們比較了六種串流處理系統的一致性,包括Flink SQL、ksqlDB、Proton(Timeplus)、RisingWave、Materialize和Pathway。其中一些系統支援更強的一致性保證,例如內部一致性。
Flink SQL
Flink是一個流行的串流處理系統。我們使用Flink SQL來測試其一致性。Flink SQL是一種根據SQL的API,用於在Flink上進行串流處理。
Flink SQL 中的一致性探討
在探討串流處理系統的一致性時,Flink SQL 提供了一個有趣的案例。我們首先建立了一個連線到來源主題 transactions 的表格:
CREATE TABLE transactions (
id BIGINT,
from_account INT,
to_account INT,
amount DOUBLE,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'transactions_flink',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
內容解密:
- 此段程式碼建立了一個名為
transactions的表格,用於連線到 Kafka 主題transactions。 - 使用了
kafka聯結器,並指定了相關的組態,如bootstrap.servers、group.id等。 - 設定了 JSON 格式的資料解析,並啟用了錯誤處理。
接下來,我們建立了多個檢視(views)來計算 credits、debits、balance 和 total:
CREATE VIEW credits(account, credits) AS
SELECT
to_account as account,
SUM(amount) as credits
FROM
transactions
GROUP BY
to_account;
CREATE VIEW debits(account, debits) AS
SELECT
from_account as account,
SUM(amount) as debits
FROM
transactions
GROUP BY
from_account;
CREATE VIEW balance(account, balance) AS
SELECT
credits.account,
credits - debits as balance
FROM
credits,
debits
WHERE
credits.account = debits.account;
CREATE VIEW total(total) AS
SELECT
SUM(balance)
FROM
balance;
內容解密:
credits和debits檢視分別計算每個帳戶的貸方和借方總額。balance檢視透過合併credits和debits來計算每個帳戶的餘額。total檢視計算所有帳戶的餘額總和。
最後,我們將 total 檢視的結果寫入到一個名為 total_flinksql 的 Kafka 主題中:
CREATE TABLE total_sink (
total DOUBLE,
PRIMARY KEY (total) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'property-version' = 'universal',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'total_flink',
'key.format' = 'json',
'value.format' = 'json',
'properties.group.id' = 'total_flink'
);
INSERT INTO total_sink SELECT * FROM total;
內容解密:
- 建立了一個名為
total_sink的表格,用於將結果寫入 Kafka 主題total_flink。 - 使用了
upsert-kafka聯結器來實作結果的寫入。
結果分析
在執行上述 Flink SQL 程式碼後,我們觀察到結果主題 total_flinksql 中出現了大量的訊息,且總和在 +400 和 -600 之間波動。然而,當輸入流停止後,Flink SQL 最終收斂到正確的總和 0。
此圖示說明瞭 Flink SQL 的資料流圖
此圖示展示了 Flink SQL 程式碼的資料流圖,可以看到交易資料如何被處理並最終計算出總和。
一致性模型的探討
Flink SQL 的這種行為是由其設計選擇所決定的,即支援最終一致性(eventual consistency)而非內部一致性(internal consistency)。這種模型在某些情況下可能導致不一致的結果,尤其是在無界、非視窗化的使用場景中。
ksqlDB 的比較
接下來,我們探討了 ksqlDB 的實作。ksqlDB 是 Confluent 提供的一個串流處理系統,我們使用其版本 7.6.0。首先,我們建立了一個連線到來源主題 transactions 的表格:
CREATE TABLE transactions (
id VARCHAR PRIMARY KEY,
from_account INT,
to_account INT,
amount DOUBLE,
ts VARCHAR
) WITH (
kafka_topic = 'transactions',
value_format = 'json',
partitions = 1,
timestamp = 'ts',
timestamp_format = 'yyyy-MM-dd HH:mm:ss.SSS'
);
內容解密:
- ksqlDB 使用了與 Flink SQL 相似的方式來建立表格,但使用了不同的語法和組態。
然後,我們建立了多個表格(tables)來計算 credits、debits、balance 和 total:
CREATE TABLE credits WITH (
kafka_topic = 'credits',
value_format = 'json'
) AS
SELECT
to_account AS account,
SUM(amount) AS credits
FROM
transactions
GROUP BY
to_account EMIT CHANGES;
-- 省略其他表格的建立
此圖示說明瞭 ksqlDB 的資料流圖
此圖示與 Flink SQL 的資料流圖相似,展示了 ksqlDB 如何處理交易資料並計算出總和。
一致性在串流處理系統中的比較
在串流處理系統中,一致性是一個重要的議題。本章節將比較多個串流處理系統的一致性,包括 Flink SQL、ksqlDB、Proton(Timeplus)和 RisingWave。
Flink SQL
首先,我們使用 Flink SQL 來處理交易資料。我們建立了一個表來接收輸入的 Kafka 主題交易資料,並計算每個帳戶的餘額。結果顯示在 sink Kafka 主題 total_flink 中。
程式碼範例:Flink SQL 設定
CREATE TABLE transactions (
id INT,
from_account INT,
to_account INT,
amount INT,
ts TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'broker:29092',
'format' = 'json'
);
CREATE VIEW credits AS
SELECT
to_account AS account,
SUM(amount) AS credits
FROM
transactions
GROUP BY
to_account;
CREATE VIEW debits AS
SELECT
from_account AS account,
SUM(amount) AS debits
FROM
transactions
GROUP BY
from_account;
CREATE VIEW balance AS
SELECT
c.account,
c.credits - d.debits AS balance
FROM
credits c
JOIN
debits d ON c.account = d.account;
CREATE TABLE total_flink (
total INT
) WITH (
'connector' = 'kafka',
'topic' = 'total_flink',
'properties.bootstrap.servers' = 'broker:29092',
'format' = 'json'
);
INSERT INTO total_flink
SELECT
SUM(balance)
FROM
balance;
內容解密:
- 建立
transactions表來接收 Kafka 主題的交易資料。 - 使用
credits和debits檢視來計算每個帳戶的貸方和借方總額。 - 使用
balance檢視來計算每個帳戶的餘額。 - 將
balance的總和插入到total_flink表中。
結果顯示,Flink SQL 產生了大約 40,000 條訊息,且結果在正確值 0 周圍波動。當輸入串流停止後,Flink SQL 最終收斂到正確的結果 0。
ksqlDB
接下來,我們使用 ksqlDB 來處理交易資料。我們建立了與 Flink SQL 類別似的表和檢視,並將結果輸出到 sink Kafka 主題 total_ksqldb。
程式碼範例:ksqlDB 設定
CREATE STREAM transactions (
id INT,
from_account INT,
to_account INT,
amount INT,
ts TIMESTAMP
) WITH (
kafka_topic = 'transactions',
value_format = 'json'
);
CREATE TABLE credits AS
SELECT
to_account AS account,
SUM(amount) AS credits
FROM
transactions
GROUP BY
to_account;
CREATE TABLE debits AS
SELECT
from_account AS account,
SUM(amount) AS debits
FROM
transactions
GROUP BY
from_account;
CREATE TABLE balance AS
SELECT
c.account,
c.credits - d.debits AS balance
FROM
credits c
JOIN
debits d ON c.account = d.account;
CREATE SINK total_ksqldb WITH (
kafka_topic = 'total_ksqldb',
value_format = 'json'
) AS
SELECT
SUM(balance)
FROM
balance;
內容解密:
- 建立
transactions串流來接收 Kafka 主題的交易資料。 - 使用
credits和debits表來計算每個帳戶的貸方和借方總額。 - 使用
balance表來計算每個帳戶的餘額。 - 將
balance的總和輸出到total_ksqldbsink Kafka 主題。
結果顯示,ksqlDB 也產生了大約 40,000 條訊息,且結果在正確值 0 周圍波動。當輸入串流停止後,ksqlDB 最終收斂到正確的結果 0。
Proton(Timeplus)
然後,我們使用 Proton(Timeplus)來處理交易資料。我們建立了一個外部串流來接收輸入的 Kafka 主題交易資料,並計算每個帳戶的餘額。
程式碼範例:Proton 設定
CREATE EXTERNAL STREAM transactions (
id INT,
from_account INT,
to_account INT,
amount INT,
ts DATETIME64
) SETTINGS
type = 'kafka',
brokers = 'broker:29092',
topic = 'transactions',
data_format = 'JSONEachRow';
CREATE VIEW credits AS
SELECT
now64() AS ts,
to_account AS account,
SUM(amount) AS credits
FROM
transactions
GROUP BY
to_account EMIT PERIODIC 100ms;
CREATE VIEW debits AS
SELECT
now64() AS ts,
from_account AS account,
SUM(amount) AS debits
FROM
transactions
GROUP BY
from_account EMIT PERIODIC 100ms;
CREATE VIEW balance AS
SELECT
c.account,
c.credits - d.debits AS balance
FROM
changelog(credits, account, ts, true) AS c
JOIN
changelog(debits, account, ts, true) AS d ON c.account = d.account;
CREATE MATERIALIZED VIEW total INTO total_s AS
SELECT
SUM(balance) AS total
FROM
balance;
內容解密:
- 建立
transactions外部串流來接收 Kafka 主題的交易資料。 - 使用
credits和debits檢視來計算每個帳戶的貸方和借方總額。 - 使用
balance檢視來計算每個帳戶的餘額。 - 將
balance的總和物化到total檢視中。
結果顯示,Proton 只產生了 56 條訊息,且最終收斂到正確的結果 0。
RisingWave
最後,我們使用 RisingWave 來處理交易資料。我們建立了一個表來接收輸入的 Kafka 主題交易資料,並計算每個帳戶的餘額。
程式碼範例:RisingWave 設定
CREATE TABLE transactions (
id INT,
from_account INT,
to_account INT,
amount INT,
ts TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'transactions',
properties.bootstrap.server = 'broker:29092',
scan.startup.mode = 'earliest',
scan.startup.timestamp_millis = '140000000'
) ROW FORMAT JSON;
CREATE VIEW credits AS
SELECT
to_account AS account,
SUM(amount) AS credits
FROM
transactions
GROUP BY
to_account;
CREATE VIEW debits AS
SELECT
from_account AS account,
SUM(amount) AS debits
FROM
transactions
GROUP BY
from_account;
CREATE VIEW balance AS
SELECT
credits.account,
credits.credits - debits.debits AS balance
FROM
credits
INNER JOIN
debits ON credits.account = debits.account;
CREATE VIEW total AS
SELECT
SUM(balance)
FROM
balance;
CREATE SINK total_sink
FROM
total WITH (
connector = 'kafka',
properties.bootstrap.server = 'broker:29092',
topic = 'total_risingwave',
type = 'append-only',
force_append_only = 'true'
);
內容解密:
- 建立
transactions表來接收 Kafka 主題的交易資料。 - 使用
credits和debits檢視來計算每個帳戶的貸方和借方總額。 - 使用
balance檢視來計算每個帳戶的餘額。 - 將
balance的總和輸出到total_risingwavesink Kafka 主題。
結果顯示,RisingWave 產生了 105 條訊息,且每條訊息都給出了正確的結果:0。