返回文章列表

DuckDB建構資料管道與Dbt轉換

本文介紹如何使用 DuckDB 建構資料管道,並結合 dlt 進行資料擷取與轉換,最後使用 dbt 進行資料建模和測試。文章涵蓋了從 CSV 檔案讀取資料、過濾欄位、移除空值、重新格式化日期到輸出 Parquet 檔案的完整流程,並示範瞭如何使用 dbt 進行資料測試和 YAML 語法設定。

資料工程 資料函式庫

DuckDB 作為一款高效能的嵌入式分析型資料函式庫,非常適合用於建構資料管道。搭配 dlt 進行資料擷取和 DuckDB 的資料處理能力,可以快速地從多個來源取得資料並進行初步的清洗和轉換。dbt 則進一步增強了資料管道的建模和測試能力,透過 SQL 語法和 YAML 設定,可以定義複雜的資料轉換邏輯,並確保資料的品質和一致性。此外,dbt 還支援資料測試,可以有效地驗證資料轉換的正確性,並及早發現潛在的錯誤。透過 DuckDB、dlt 和 dbt 的協同工作,可以開發一個高效、可靠且易於維護的現代資料管道。

使用DuckDB建構資料管道

本章節將探討如何利用DuckDB建構資料管道,並結合dlt工具進行資料的擷取與轉換,最後使用dbt進行資料的轉換與建模。

使用dlt與DuckDB進行資料擷取

dlt是一個Python函式庫,能夠簡化資料管道的建構過程。透過dlt,我們可以輕鬆地從不同的來源擷取資料並儲存到DuckDB中。以下是一個使用dlt與DuckDB進行資料擷取的例子:

duckdb chess_pipeline.duckdb 'SELECT count(*) FROM players_games'

輸出結果:

┌──────────────┐
│ count_star() │
│ int64 │
├──────────────┤
│ 589 │
└──────────────┘

這表明我們已經成功地將資料擷取到DuckDB中。當我們再次執行相同的指令時,dlt會發現資料已經被擷取過,因此不會再次擷取相同的資料。

探索管道後設資料

dlt提供了一個名為info的指令,可以用來檢查管道的定義和狀態。執行以下指令:

dlt pipeline chess_pipeline info

輸出結果:

Found pipeline chess_pipeline in /Home/.dlt/pipelines
Synchronized state:
_state_version: 2
_state_engine_version: 2
schema_names: ['chess']
pipeline_name: chess_pipeline
destination: dlt.destinations.duckdb
default_schema_name: chess
staging: None
dataset_name: main
sources:
Add -v option to see sources state. Note that it could be large.
Local state:
first_run: False
_last_extracted_at: 2023-11-04T19:16:35.873231+00:00
Resources in schema: chess
players_profiles with 1 table(s) and 0 resource state slot(s)
players_games with 1 table(s) and 1 resource state slot(s)
Working dir content:
Has 3 completed load packages with following load ids:
1699125395.876516
1699125399.292224
1699125402.854308
Pipeline has last run trace. Use 'dlt pipeline chess_pipeline trace' to inspect

使用dbt進行資料轉換與建模

dbt是一個SQL為中心的轉換工具,能夠幫助我們對資料進行轉換和建模。透過dbt-duckdb函式庫,我們可以將dbt與DuckDB結合使用,實作資料的轉換和建模。

首先,我們需要安裝dbt-duckdb和dbt:

pip install dbt-duckdb dbt

接下來,我們可以使用dbt init指令建立一個新的dbt專案:

dbt init dbt_transformations
cd dbt_transformations

設定dbt專案

在dbt專案中,我們需要建立一個名為profiles.yml的檔案,用於設定dbt與DuckDB的連線:

dbt_transformations:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: '/tmp/atp.db'
      schema: 'main'

這個設定檔案告訴dbt使用DuckDB作為輸出目標,並指定了DuckDB的儲存路徑和schema名稱。

資料轉換流程

使用dbt和DuckDB,我們可以實作一個簡單的資料轉換流程:讀取CSV檔案、過濾欄位、移除空值、重新格式化日期,最後輸出Parquet檔案。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title DuckDB建構資料管道與Dbt轉換

package "軟體測試架構" {
    package "測試層級" {
        component [單元測試
Unit Test] as unit
        component [整合測試
Integration Test] as integration
        component [端對端測試
E2E Test] as e2e
    }

    package "測試類型" {
        component [功能測試] as functional
        component [效能測試] as performance
        component [安全測試] as security
    }

    package "工具框架" {
        component [pytest] as pytest
        component [unittest] as unittest
        component [Selenium] as selenium
        component [JMeter] as jmeter
    }
}

unit --> pytest : 撰寫測試
unit --> integration : 組合模組
integration --> e2e : 完整流程
functional --> selenium : UI 自動化
performance --> jmeter : 負載測試

note right of unit
  測試金字塔基礎
  快速回饋
  高覆蓋率
end note

@enduml

內容解密:

  1. 讀取CSV:使用DuckDB讀取CSV檔案。
  2. 過濾欄位:選擇需要的欄位,過濾掉不需要的欄位。
  3. 移除空值:移除包含空值的列。
  4. 重新格式化日期:將日期欄位格式化為需要的格式。
  5. 輸出Parquet:將處理後的資料輸出為Parquet檔案。

這個流程展示瞭如何使用dbt和DuckDB實作一個簡單的資料轉換流程。透過這個流程,我們可以輕鬆地對資料進行轉換和建模,以滿足不同的業務需求。

使用dbt進行資料轉換與建模

瞭解YAML語法

YAML(YAML Ain’t Markup Language)是一種設計用於人類可讀性以及與指令碼語言互動的資料序列化語言。其語法相對簡單,利用縮排來表示層次結構,並且只使用空格而非跳格符(tab)。

重新命名範例目錄並定義新的資料來源與模型

首先,將範例目錄重新命名為atp,並刪除現有的模型檔案:

mv models/example models/atp
rm models/atp/*.sql

接下來,需要定義新的資料來源和模型。

定義資料來源

dbt的資料來源(sources)提供了一種標準化的方法來參照和記錄外部資料函式庫、資料倉儲或其他來源中的原始資料。透過定義資料來源,使用者可以確保原始資料被一致地存取,同時指定相關資料集的中繼資料和品品檢查。可以在models目錄下的sources.yml檔案中定義資料來源。

這裡使用的資料來自Jeff Sackmann的網球資料集(https://github.com/JeffSackmann/tennis_atp)。首先,建立一個名為models/atp/sources.yml的檔案,以建立一個指向GitHub上atp_matches_2023.csv檔案的資料來源。

version: 2
sources:
  - name: github
    meta:
      external_location: 'https://raw.githubusercontent.com/JeffSackmann/tennis_atp/master/atp_matches_2023.csv'
    tables:
      - name: matches_file

這個資料來源可以透過source('github', 'matches_file')在模型中被參照。

使用模型描述轉換

dbt模型是一組SQL查詢或Python指令碼,用於將原始資料轉換為所需的結構。透過定義這些模型,資料分析師和工程師可以以一致和版本控制的方式建立、測試和記錄他們的資料轉換工作流程。

現在,建立一個名為models/atp/matches.sql的檔案,以定義第一個轉換。這個轉換將從CSV檔案中提取所有比賽記錄,但排除以w_l_開頭的欄位。然後,將查詢結果寫入Parquet檔案中。

{{ config(
    materialized='external',
    location='output/matches.parquet',
    format='parquet'
) }}

WITH noWinLoss AS (
  SELECT COLUMNS(col ->
    NOT regexp_matches(col, 'w_.*') AND
    NOT regexp_matches(col, 'l_.*')
  )
  FROM {{ source('github', 'matches_file') }}
)
SELECT * REPLACE (
  cast(strptime(tourney_date, '%Y%m%d') AS date) as tourney_date
)
FROM noWinLoss

內容解密:

  1. {{ config(...) }}:這是dbt的組態宏,用於指定模型的物化方式、儲存位置和格式。這裡將模型物化為外部的Parquet檔案。
  2. WITH noWinLoss AS (...):這是一個公用表表達式(CTE),用於篩選掉以w_l_開頭的欄位。
    • COLUMNS(col -> NOT regexp_matches(col, 'w_.*') AND NOT regexp_matches(col, 'l_.*')):這裡使用了DuckDB的COLUMNS函式和regexp_matches函式來動態篩選欄位。
  3. SELECT * REPLACE (...):這裡將tourney_date欄位從字串轉換為日期型別。
  4. FROM {{ source('github', 'matches_file') }}:這裡參照了前面定義的資料來源。

接下來,建立輸出目錄以便dbt可以將Parquet檔案寫入其中:

mkdir output

然後,執行dbt命令以執行管道:

dbt run

這個過程將從GitHub取得CSV資料,透過DuckDB執行SQL轉換,並將結果儲存為Parquet檔案。執行成功後,可以在輸出目錄中找到生成的Parquet檔案,並使用DuckDB CLI檢查其內容。

檢查輸出結果

執行以下命令檢查Parquet檔案的大小和內容:

du -h output/*

輸出結果應類別似於:

120K output/matches.parquet

然後,開啟DuckDB CLI會話檢查Parquet檔案的內容:

SELECT count(*) FROM 'output/matches.parquet';

輸出結果應顯示記錄數,例如2986。進一步,可以查詢Parquet檔案的內容以驗證資料是否正確轉換。

.mode line
SELECT * FROM 'output/matches.parquet' LIMIT 1;

輸出結果將顯示第一筆記錄的詳細資訊,包括比賽ID、名稱、日期等欄位。

使用dbt進行資料轉換與測試

在建立資料管道的過程中,資料轉換和測試是至關重要的步驟。dbt(Data Build Tool)是一個強大的工具,它允許我們以SQL為基礎來定義資料轉換,並且提供了豐富的測試功能,以確保資料的品質和一致性。

8.3.4 測試資料轉換和管道

dbt測試是對資料模型進行的斷言,用於確保資料品質和一致性。透過定義這些測試,我們可以驗證我們的轉換,捕捉諸如NULL值、重複值或參照完整性違規等問題。

建立測試

我們可以在schema.yml檔案中定義測試,該檔案位於models/atp/schema.yml,與我們的模型檔案相鄰。我們將為幾個欄位建立測試,但在生產管道中,您需要為所有欄位建立測試,以確保轉換按預期工作。

version: 2
models:
  - name: matches
    description: "ATP網球比賽架構"
    columns:
      - name: tourney_id
        description: "比賽的ID"
        tests:
          - not_null
      - name: winner_id
        description: "獲勝選手的ID"
        tests:
          - not_null
      - name: loser_id
        description: "失敗選手的ID"
        tests:
          - not_null
      - name: surface
        description: "球場的表面"
        tests:
          - not_null
          - accepted_values:
              values: ['Grass', 'Hard', 'Clay']

執行測試

要執行測試,可以執行以下命令:

dbt test

執行結果將顯示每個測試的透過或失敗狀態。

擴充套件測試功能

雖然dbt提供了多個內建斷言,但有時我們需要進行更細粒度的測試。例如,我們可能想要測試tourney_date欄位的值範圍。

安裝dbt_expectations套件

為了實作更複雜的測試,我們需要安裝dbt_expectations套件。在專案的頂層,建立packages.yml檔案,並新增以下內容:

packages:
  - package: calogica/dbt_expectations
    version: 0.10.1

然後執行以下命令安裝套件:

dbt deps

新增測試斷言

更新models/atp/schema.yml,新增一個斷言,檢查tourney_date的值是否在2023年1月1日至2023年12月31日之間。

models:
  - name: matches
    # 保留原有的斷言
    - name: tourney_date
      description: "驗證比賽是否在2023年開始"
      tests:
        - dbt_expectations.expect_column_values_to_be_of_type:
            column_type: date
        - dbt_expectations.expect_column_min_to_be_between:
            min_value: "CAST('2023-01-01' AS DATE)"
            max_value: "CAST('2023-12-31' AS DATE)"

重新執行dbt test,新的斷言將被納入測試輸出中。

8.3.5 處理所有CSV檔案

到目前為止,我們只處理了2023年的比賽資料,但還有從1968年開始的其他CSV檔案需要處理。我們從一個檔案開始,以保持等待時間較短,從而能夠立即獲得有關我們的架構和模型是否有效的反饋。

未來擴充套件方向

未來,我們可以擴充套件我們的資料管道,以處理所有可用的CSV檔案,並進一步最佳化我們的資料轉換和測試流程,以確保資料管道的穩定性和可靠性。

使用dbt進行資料轉換與建模

在資料處理的過程中,我們首先需要更新資料來源的設定。具體來說,我們將修改models/atp/sources.yml檔案,以動態生成1968年至2023年間所有CSV檔案的URL,並使用read_csv_auto函式讀取這些檔案。這是DuckDB作為處理工具而非儲存工具的典型應用範例。

version: 2
sources:
  - name: github
    meta:
      external_location: >
        FROM read_csv_auto(
          list_transform(
            range(1968, 2023),
            y -> 'https://raw.githubusercontent.com/JeffSackmann/tennis_atp/master/atp_matches_' || y || '.csv'
          ),
          types={'winner_seed': 'VARCHAR', 'loser_seed': 'VARCHAR'}
        )
    formatter: oldstyle
    tables:
      - name: matches_file

內容解密:

  1. version: 2:指定dbt組態的版本。
  2. sources:定義資料來源。
  3. external_location:使用read_csv_auto函式從指定的URL讀取CSV檔案。list_transform函式生成1968至2023年間的CSV檔案URL。
  4. types:指定某些欄位的資料型別,以確保正確解析CSV檔案。
  5. formatter: oldstyle:允許在查詢中使用{}字元。

更新完sources.yml後,執行dbt run以生成新的Parquet檔案。隨後,我們可以查詢新生成的Parquet檔案以驗證資料是否正確載入。

SELECT count(*) FROM 'output/matches.parquet';

查詢結果顯示資料列數遠多於之前,表明資料已成功擴充。

測試與除錯

執行dbt test後,我們發現有三個測試失敗:

  • not_null_matches_surface:表示存在NULL的surface值。
  • accepted_values_matches_surface__Grass__Hard__Clay:表示存在非Grass、Hard或Clay的surface值。
  • dbt_expectations_expect_column_min_to_be_between_matches_tourney_date:表示存在不符合預期範圍的tourney_date值。

問題分析與解決

首先,我們來分析surface欄位的問題。查詢結果顯示,除了Grass、Hard和Clay外,還存在Carpet型別的球場表面,以及2937個NULL值。

SELECT surface, count(*) FROM 'output/matches.parquet' GROUP BY ALL;

結果如下:

┌─────────┬──────────────┐
│ surface │ count_star() │
│ varchar │ int64       │
├─────────┼──────────────┤
│ Clay    │ 67537       │
│ Carpet  │ 20900       │
│ Hard    │ 74814       │
│ Grass   │ 22746       │
│         │ 2937        │
└─────────┴──────────────┘

針對這些問題,我們需要更新schema.yml以允許Carpet作為有效的surface值,並在matches.sql中新增過濾條件以排除NULL的surface值。

更新後的schema.yml如下:

version: 2
models:
  - name: matches
    description: "ATP tennis matches schema"
    columns:
      - name: surface
        description: "The surface of the court."
        tests:
          - not_null
          - accepted_values:
              values: ['Grass', 'Hard', 'Clay', 'Carpet']
      - name: tourney_date
        description: "The date when the tournament started"
        tests:
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: date
          - dbt_expectations.expect_column_min_to_be_between:
              min_value: "CAST('1967-12-01' AS DATE)"
              max_value: "CAST('2023-12-31' AS DATE)"

內容解密:

  1. accepted_values:更新測試以允許Carpet作為有效的surface值。
  2. dbt_expectations.expect_column_min_to_be_between:調整日期範圍測試,以涵蓋1967年12月至2023年12月之間的日期。

同時,我們在matches.sql中新增WHERE子句以過濾掉NULL的surface值。

{{ config(
  materialized='external',
  location='output/matches.parquet',
  format='parquet'
) }}

WITH noWinLoss AS (
  SELECT COLUMNS(col ->
    NOT regexp_matches(col, 'w_.*')
    AND NOT regexp_matches(col, 'l_.*')
  )
  FROM {{ source('github', 'matches_file') }}
  WHERE surface IS NOT NULL
)

內容解密:

  1. WHERE surface IS NOT NULL:新增條件以排除NULL的surface值。

透過上述調整,我們能夠更好地處理資料中的異常值,並確保資料品質符合預期。