返回文章列表

近零ETL技術革新與應用

近零ETL技術減少了傳統ETL流程的延遲和複雜性,透過直接連線資料來源和分析系統,實作近乎實時的資料交換與分析。本文探討了PeerDB、Proton、DuckDB和chDB等技術在近零ETL中的應用,以及如何利用資料映象、串流處理和嵌入式OLAP資料函式庫來簡化資料整合流程,並提供程式碼範例。

資料函式庫 資料工程

近零ETL架構的核心目標是減少資料在不同系統間移動的延遲和複雜度。傳統ETL流程需要將資料從操作型資料函式庫提取、轉換後再載入到分析型資料函式庫,這會造成時間延遲,並且需要額外的基礎設施來執行ETL流程。近零ETL則透過建立更直接的資料管道,讓分析系統可以直接存取操作型資料函式庫的資料,或是在操作型資料函式庫中嵌入分析能力,以減少資料移動的需求。這種架構可以大幅提升分析效率,並降低維護成本,對於需要即時分析的應用場景尤其重要。

零ETL架構的挑戰與解決方案

AWS的零ETL架構

AWS提供了一種受管理的零ETL整合方案,結合了其OLTP資料函式庫Aurora和資料倉儲Redshift,能夠在幾分鐘內實作近乎實時的分析。這種完全受管理的解決方案,使得交易資料在寫入Aurora後,能夠立即在Redshift中可用。

由於AWS擁有Aurora和Redshift這兩個資料函式庫產品,並且它們只存在於AWS的雲端平台上,因此AWS能夠緊密整合這兩個系統。然而,這種解決方案的缺點是,它們僅適用於AWS生態系統內部。

零ETL的挑戰

雖然零ETL的概念在敏捷性、降低延遲和成本文約方面具有優勢,但它可能不適合所有場景。大多陣列織,尤其是那些具有複雜資料整合需求或監管限制的組織,可能仍需要傳統ETL流程的元素。

在第二章中,我們提到在資料倉儲中轉換資料(也稱為ELT)會強制採用批次處理語義,這將為任何實時分析使用案例增加延遲。在圖8-2中,Aurora和Redshift之間的整合點不會轉換資料,這意味著轉換是在Redshift中完成的。HTAP資料函式庫和零ETL解決方案都存在這個問題。它們都需要觸發批次轉換流程,一旦資料到達資料倉儲。

零ETL的關鍵特點

零ETL具有以下幾個關鍵特點:

1. 實時資料整合

最小化或消除批次處理,以實作實時或近乎實時的資料整合。這對於需要及時洞察的場景尤為重要。

2. 結構描述隨讀

採用結構描述隨讀的方法,資料在ETL過程中不被轉換為預定義的結構描述,而是在分析時被解釋。這允許更靈活地處理多樣和變化的資料。

3. 資料虛擬化

利用資料虛擬化技術,提供跨多個來源的統一和虛擬化的資料檢視,而無需物理移動或轉換資料。這可以減少建立和維護單獨資料倉儲的需求。

4. 資料函式庫內處理

直接在資料函式庫系統中執行轉換和分析,避擴音取和移動大型資料集進行處理。

5. 事件驅動架構

採用事件驅動架構,資料變更會觸發即時更新,減少對定期批次流程的依賴。

6. 現代資料架構

採用現代資料架構,如資料湖和雲端解決方案,提供可擴充套件和成本效益高的選項來管理和分析資料,而無需傳統ETL的瓶頸。

近零ETL:靈活的替代方案

如果需要傳統ETL流程提供的更多靈活性,可以採用近零ETL的替代方案。近零ETL試圖限制ETL元件的基礎設施,同時保持支援複雜資料整合需求所需的靈活性。

使用嵌入式功能的OLTP資料函式庫

一種解決方案是利用具有嵌入式功能的OLTP資料函式庫,將資料傳送到其他系統,而無需自行管理聯結器執行在單獨的基礎設施上。

PeerDB:簡化ETL的開源解決方案

PeerDB是一種開源解決方案,用於將資料從Postgres串流到資料倉儲、佇列/主題和其他儲存引擎。其目標是透過提供資料函式庫體驗來簡化ETL,以建立與分析系統的整合。

設定Peer範例

CREATE PEER source FROM POSTGRES WITH
(
    host = 'catalog',
    port = '5432',
    user = 'postgres',
    password = 'postgres',
    database = 'source'
);

CREATE PEER sf_peer FROM SNOWFLAKE WITH
(
    account_id = '<snowflake_account_identifier>',
    username = '<user_name>',
    private_key = '<private_key>',
    password = '<password>' -- 僅當私鑰加密時提供
    database = '<database_name>',
    schema = '<schema>',
    warehouse = '<ware_house>',
    role = '<role>',
    query_timeout = '<query_timeout_in_seconds>'
);

-- 在Snowflake中查詢表格
SELECT * FROM sf_peer.MY_SCHEMA.MY_TABLE;

內容解密:

  1. PeerDB的基本概念:PeerDB是一種連線不同資料函式庫和儲存系統的工具,透過定義「Peer」來實作不同系統之間的資料交換。
  2. 建立Peer的語法:使用CREATE PEER命令,可以從不同的資料來源(如Postgres或Snowflake)建立連線。
  3. CREATE PEER命令的引數:根據不同的資料來源,需要提供不同的引數,如主機、埠、使用者名稱、密碼等。
  4. 查詢遠端表格:一旦建立了Peer,就可以像查詢本地表格一樣查詢遠端系統中的表格。

近零ETL技術的革新與應用

在現代資料架構中,實作不同系統之間的無縫整合一直是個挑戰。特別是在操作型資料函式庫和分析型資料函式庫之間,由於各自的設計目標和最佳化方向不同,資料交換和同步成為了一大難題。近零ETL(Extract, Transform, Load)技術的出現,為這個問題提供了新的解決方案。

PeerDB:實作跨資料函式庫的資料同步

PeerDB是一種創新工具,允許在不同的資料函式庫之間建立對等關係(peer),從而實作資料的同步和查詢。這種機制使得操作型資料函式庫(如Postgres)能夠直接查詢和分析型資料函式庫(如Snowflake)中的資料,無需複雜的ETL流程。

建立對等關係

要使用PeerDB,首先需要建立對等關係。例如,建立一個與另一個Postgres資料函式庫或Snowflake資料倉儲的對等關係:

CREATE PEER <peer_name> FROM POSTGRES WITH (
  host = '<host>',
  port = <port>,
  user = '<user>',
  password = '<password>',
  database = '<database>'
);

這樣就可以在本地資料函式庫中查詢遠端資料函式庫的資料。

資料映象(Mirroring)

PeerDB支援資料映象功能,可以非同步地將資料從來源對等體複製到目標對等體。以下是一個建立ETL流程的例子:

CREATE MIRROR <mirror_name> [IF NOT EXISTS] FROM
<source_peer> TO <target_peer> FOR
$$
SELECT * FROM <source_table_name> WHERE
<watermark_column> BETWEEN {{.start}} AND {{.end}}
$$
WITH (
  destination_table_name = '<schema_qualified_destination_table_name>',
  watermark_column = '<watermark_column>',
  mode = '<mode>',
  parallelism = <parallelism>,
  refresh_interval = <refresh_interval_in_seconds>
);

這個命令建立了一個映象,將來源對等體中的資料表同步到目標對等體。

內容解密:

  1. CREATE MIRROR:建立一個新的映象任務,用於同步資料。
  2. <source_peer><target_peer>:指定來源和目標對等體。
  3. SELECT 陳述式:定義從來源資料表中提取的資料。
  4. WITH 子句:組態映象任務的詳細引數,如目標表名、水印欄位、平行度等。

Proton:串流OLAP資料函式庫

Proton是一種下一代串流OLAP資料函式庫,支援即時資料攝入和複雜轉換。它可以與PeerDB結合使用,先將資料寫入串流平台(如Kafka),然後由Proton攝入並進行轉換,最終實作即時分析。

建立串流來源

首先,建立一個與Kafka串流平台的對等關係:

CREATE PEER <eh_peer_name> FROM KAFKA WITH (
  bootstrap_server = '<bootstrap-servers>'
);

然後,建立一個映象將資料寫入Kafka主題:

CREATE MIRROR <mirror_name> [IF NOT EXISTS] FROM
<source_peer> TO <target_peer> FOR
$$
SELECT * FROM <source_table_name> WHERE
<watermark_column> BETWEEN {{.start}} AND {{.end}}
$$
WITH (
  destination_table_name = '<topic>'
);

在Proton中建立串流

在Proton中,建立一個外部串流來讀取Kafka主題中的資料:

CREATE EXTERNAL STREAM frontend_events(raw string)
SETTINGS type='kafka',
brokers='<bootstrap-servers>',
topic='<topic>';

內容解密:

  1. CREATE EXTERNAL STREAM:在Proton中建立一個外部串流。
  2. SETTINGS:組態串流的來源為Kafka,並指定相關連線資訊。

嵌入式OLAP:將分析能力帶入操作平面

隨著技術的發展,將分析型工作負載直接嵌入操作平面變得越來越流行。嵌入式OLAP資料函式庫如DuckDB,可以在微服務中執行,提供即時分析能力。

使用DuckDB進行即時分析

DuckDB是一個專為分析型查詢設計的嵌入式OLAP資料函式庫。它可以在應用程式內執行,讓使用者能夠對資料進行切片和切塊分析。

安裝DuckDB:

pip install duckdb

以下是一個微服務的框架,用於訂閱Kafka主題並將記錄更新到DuckDB表中,然後透過REST API提供分析查詢服務:

import duckdb
from fastapi import FastAPI

app = FastAPI()

# 初始化DuckDB連線
con = duckdb.connect(database=':memory:')

# 訂閱Kafka主題並更新DuckDB表
def subscribe_to_kafka_topic():
    # Kafka訂閱邏輯
    pass

# 提供REST API服務
@app.get("/analytics/")
def read_analytics():
    result = con.execute("SELECT * FROM materialized_view").fetchall()
    return {"data": result}

內容解密:

  1. duckdb.connect:建立DuckDB的記憶體資料函式庫連線。
  2. subscribe_to_kafka_topic:這是一個虛擬函式,代表從Kafka訂閱資料的邏輯。
  3. read_analytics:透過FastAPI提供的REST API端點,用於執行分析查詢。

近零ETL技術在現代資料架構中的應用

現代資料架構中,資料的處理和分析變得越來越重要。近零ETL(Extract, Transform, Load)技術提供了一種高效的方式來整合和分析資料。本篇文章將探討近零ETL技術的原理、應用以及相關的技術實作。

近零ETL的基本概念

近零ETL是一種資料整合技術,旨在減少傳統ETL流程中的延遲和複雜度。它透過在資料來源和分析系統之間建立直接的連線,實作實時或近實時的資料交換和分析。

使用DuckDB實作近零ETL

DuckDB是一種嵌入式OLAP(Online Analytical Processing)資料函式庫,可以與Kafka等訊息佇列系統結合,實作近零ETL。以下是一個使用Python和DuckDB實作近零ETL的範例:

import duckdb
from threading import Thread
from fastapi import FastAPI
from confluent_kafka import Consumer

app = FastAPI()
duckdb_con = duckdb.connect('my_persistent_db.duckdb')

def upsert(msg):
    # 將msg反序列化以取得列值
    primary_key, col1_value, col2_value = deserialize_message(msg)
    duckdb_con.execute("""
        INSERT OR REPLACE INTO t1(id, col1, col2) VALUES(?, ?, ?)
    """, [primary_key, col1_value, col2_value])

def kafka2olap(conf):
    consumer = Consumer(conf)
    try:
        consumer.subscribe("my_data")
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None: 
                continue
            if msg.error():
                # 處理錯誤
                pass
            else:
                upsert(msg)
    finally:
        consumer.close()

@app.on_event("startup")
async def initialize():
    conf = {}  # Kafka組態
    thread = Thread(target=kafka2olap, args=(conf,))
    thread.start()

@app.get("/my_data/")
async def read_item(id: int):
    results = duckdb_con.execute("""
        SELECT id, count(*) AS row_counter, current_timestamp
        FROM my_data
        WHERE id = ?
    """, (id,)).fetchall()
    return results

內容解密:

  1. 建立DuckDB連線:使用duckdb.connect建立一個到DuckDB資料函式庫的連線。
  2. 定義upsert函式:實作將訊息插入或更新到DuckDB的功能。
  3. Kafka消費者:使用confluent_kafka函式庫建立一個Kafka消費者,訂閱特定主題並處理訊息。
  4. FastAPI應用:啟動一個FastAPI應用,在啟動事件中建立一個執行緒來執行Kafka消費者。
  5. 查詢介面:提供一個REST API介面來查詢DuckDB中的資料。

使用chDB實作近零ETL

chDB是另一個嵌入式OLAP資料函式庫,根據ClickHouse。以下是一個使用Flask和chDB實作近零ETL的範例:

from flask import Flask, request
import chdb

app = Flask(__name__)

@app.route('/', methods=["GET"])
def clickhouse():
    query = request.args.get('query', default="", type=str)
    format = request.args.get('default_format', default="JSONCompact", type=str)
    if not query:
        return "Query not found", 400
    res = chdb.query(query, format)
    return res.bytes()

@app.route('/', methods=["POST"])
def play():
    query = request.data
    format = request.args.get('default_format', default="JSONCompact", type=str)
    if not query:
        return "Query not found", 400
    res = chdb.query(query, format)
    return res.bytes()

內容解密:

  1. 建立Flask應用:建立一個Flask應用來提供REST API介面。
  2. 處理GET請求:根據查詢引數執行chDB查詢並傳回結果。
  3. 處理POST請求:根據請求體執行chDB查詢並傳回結果。

資料重力與複製

在許多情況下,分析系統只佈署在單一區域或資料中心,這導致了所謂的「資料重力」問題。為瞭解決這個問題,可以透過提供物化檢視的變更到營運系統,從而將實時分析的副本分發到使用者面對的應用程式佈署的所有區域。

分析資料縮減

透過在分析系統中建立物化檢視,並將其變更流式傳輸到營運系統,可以實作分析資料的縮減。這種方法可以透過使用Proton等技術來實作。

Lambda架構

Lambda架構是一種結合批處理和實時資料處理的資料處理架構。它由Nathan Marz在其2011年的書《Big Data: Principles and Best Practices of Scalable Realtime Data Systems》中提出。Lambda架構使用兩條處理路徑:一條用於批處理,另一條用於實時資料處理。