現代資料工程中,協調和管理資料管道至關重要。本文將探討如何利用 Dagster 和 DuckDB 建構穩健且可擴充套件的資料管道。首先,我們會利用 Dagster 的 asset 定義資料處理流程,並結合 DuckDB 資源進行資料的載入和轉換。接著,我們將探討如何管理資產間的依賴關係,確保資料處理的正確順序。此外,文章還會示範如何使用 Pandas 進行更進階的資料處理,以及如何將處理後的資料上傳至 MotherDuck 雲端資料函式庫,以利後續應用程式使用。最後,我們將整合所有步驟,並提供一個完整的資料管道範例,讓讀者可以快速上手並應用於實際專案中。
使用Dagster協調資料管道
本章節中,我們探討瞭如何利用DuckDB從外部來源載入資料或在不同格式之間轉換資料。這些是重要的任務,但我們仍然缺少資料管道拼圖中的一塊:如何觸發或協調轉換或攝取程式碼?
在沒有協調工具的世界中,我們需要編寫自己的手動排程和執行程式碼。我們需要設定cron作業來執行dbt命令,並編寫自定義腳原本處理dbt任務的排序和依賴關係。
幸運的是,像Airflow、Luigi、Kestra、Prefect和Dagster(本章使用的工具)這樣的工具確實存在。這些工具控制資料管道的協調,這是我們將在本文中探討的內容。
Dagster簡介
Dagster是一個雲原生的工具,旨在管理和組織模組化管道中的資料流。其核心概念之一是(軟體定義的)資產,它是持久儲存中的一個物件,例如表格、檔案或機器學習模型。軟體定義的資產是對應該存在的資產及其生成和更新方式的程式碼描述。資產構成了資料處理作業的一部分,然後可以排程這些作業。
與dlt和dbt一樣,Dagster是用Python編寫的。它旨在使開發人員在不同階段(例如建立、佈署和監控資料資產)中使用資料變得更加容易,透過使用Python函式來描述資料資產。這些函式告訴Dagster要建立或更新哪些資料資產,以及資產的依賴關係。用Python函式描述資產使我們能夠將任何依賴關係和互動描述為可驗證的程式碼,這對於開發人員來說是一個很大的優勢。
定義資產
首先,我們需要安裝使用Dagster和DuckDB所需的主要依賴項:
pip install dagster dagster-duckdb
我們還將安裝dagster-webserver,它是用於執行Dagster UI並可用於視覺化管道的:
pip install dagster-webserver
接下來,我們建立一個名為atp的目錄,並新增以下檔案:
atp/__init__.py:協調程式碼將放在這裡。atp/assets.py:資產定義程式碼將放在這裡。
我們首先在assets.py中建立一個Python函式來定義名為atp_matches_dataset的資產,該資產將atp_matches_*.csv檔案載入到DuckDB的matches表格中。
from dagster_duckdb import DuckDBResource
from dagster import asset
@asset
def atp_matches_dataset(duckdb_resource: DuckDBResource) -> None:
base = "https://raw.githubusercontent.com/JeffSackmann/tennis_atp/master"
csv_files = [
f"{base}/atp_matches_{year}.csv"
for year in range(1968, 2024)
]
create_query = """
CREATE OR REPLACE TABLE matches AS
SELECT * REPLACE(
cast(strptime(tourney_date, '%Y%m%d') AS date) as tourney_date
)
FROM read_csv_auto($1, types={
'winner_seed': 'VARCHAR',
'loser_seed': 'VARCHAR',
'tourney_date': 'STRING'
})
"""
with duckdb_resource.get_connection() as conn:
conn.execute(create_query, [csv_files])
設定資產和作業
接下來,我們將更新__init__.py以組態atp_matches_dataset作為資產,並指定應該建立DuckDB資料函式庫的位置。該檔案基本上控制了可用的函式庫,決定了何時應該執行作業,並在必要時使用來自環境的資訊。我們還將建立一個包含我們的資產的作業和一個將每小時執行一次該作業的排程。
from dagster_duckdb import DuckDBResource
from dagster import (
AssetSelection,
ScheduleDefinition,
Definitions,
define_asset_job,
load_assets_from_modules,
)
from . import assets
atp_job = define_asset_job("atp_job", selection=AssetSelection.all())
內容解密:
atp_matches_dataset函式:這個函式使用Dagster的@asset裝飾器定義了一個名為atp_matches_dataset的資產。它從GitHub載入CSV檔案並將它們匯入到DuckDB中的matches表格。create_query:這是一個SQL查詢,用於建立或替換matches表格。它使用DuckDB的read_csv_auto函式從提供的URL列表中讀取CSV檔案,並在載入過程中將tourney_date欄位轉換為日期型別。__init__.py中的組態:這部分程式碼設定了Dagster作業和排程。它定義了一個名為atp_job的作業,該作業包含所有資產,並將其排程為每小時執行一次。
透過這種方式,我們成功地使用Dagster協調了資料管道,將CSV檔案載入到DuckDB中,並對資料進行了必要的轉換。這展示瞭如何利用Dagster管理和組織資料流,以實作更高效的資料處理和分析。
使用 Dagster 協調資料管線
建立資料管線
首先,我們需要定義一個 Dagster 的工作(job)和排程(schedule)。以下是一個範例程式碼,用於建立一個名為 atp_job 的工作和一個名為 atp_schedule 的排程:
atp_schedule = ScheduleDefinition(
job=atp_job,
cron_schedule="0 * * * *",
)
程式碼解密:
ScheduleDefinition是 Dagster 中用於定義排程的函式。job=atp_job指定了該排程對應的工作。cron_schedule="0 * * * *"定義了排程的執行時間,這裡表示每小時執行一次。
接下來,我們需要載入資產(assets)並定義 Dagster 的組態:
all_assets = load_assets_from_modules([assets])
defs = Definitions(
assets=all_assets,
jobs=[atp_job],
resources={"duckdb": DuckDBResource(database="atp.duckdb")},
schedules=[atp_schedule],
)
程式碼解密:
load_assets_from_modules([assets])載入來自assets模組的所有資產。Definitions是 Dagster 中用於整合所有元件的函式,包括資產、工作、資源和排程。resources={"duckdb": DuckDBResource(database="atp.duckdb")}定義了一個名為duckdb的資源,指向一個 DuckDB 資料函式庫檔案。
執行資料管線
要執行 Dagster UI,需要使用 dagster dev 命令,並指定包含定義的目錄:
dagster dev -m atp
然後,在網頁瀏覽器中導航到 http://localhost:3000,即可看到 Dagster UI 中的工作、排程和資產圖表。
管理管線中的依賴關係
為了新增選手資料,我們需要在 assets.py 中新增兩個資產:atp_players_dataset 和 atp_players_name_dataset。
首先,atp_players_dataset 資產用於載入選手資料並進行日期格式轉換:
@asset
def atp_players_dataset(duckdb: DuckDBResource) -> None:
base = "https://raw.githubusercontent.com/JeffSackmann/tennis_atp/master"
csv_file = f"{base}/atp_players.csv"
with duckdb.get_connection() as conn:
conn.execute("""
CREATE OR REPLACE TABLE players AS
SELECT * REPLACE(
CASE
WHEN dob IS NULL THEN NULL
WHEN SUBSTRING(CAST(dob AS VARCHAR), 5, 4) = '0000' THEN
CAST(strptime(
CONCAT(SUBSTRING(CAST(dob AS VARCHAR), 1, 4), '0101'),
'%Y%m%d'
) AS date)
ELSE
CAST(strptime(dob, '%Y%m%d') AS date)
END AS dob
)
FROM read_csv_auto($1, types = {'dob': 'STRING'});
""", [csv_file])
程式碼解密:
- 該資產從指定的 CSV 檔案中載入選手資料。
- 對
dob欄位進行處理,將月份和日期為00的值轉換為有效的日期。
接著,atp_players_name_dataset 資產用於在 players 表中新增 name_full 欄位:
@asset(deps=[atp_players_dataset])
def atp_players_name_dataset(duckdb: DuckDBResource) -> None:
concatenate_query = """
ALTER TABLE players ADD COLUMN name_full VARCHAR;
UPDATE players
SET name_full = name_first || ' ' || name_last
"""
with duckdb.get_connection() as conn:
conn.execute(concatenate_query, [])
程式碼解密:
- 該資產依賴於
atp_players_dataset,確保在執行前選手資料已經被載入。 - 在
players表中新增name_full欄位,並將name_first和name_last合併後填入該欄位。
使用 Dagster 與 DuckDB 建立資料管線
資產依賴與資料處理
在建立資料管線的過程中,資產之間的依賴關係是至關重要的。Dagster 提供了資產(asset)的概念,允許開發者定義資料處理的依賴關係。在本例中,我們定義了多個資產,包括 atp_matches_dataset、atp_players_dataset 和 atp_players_name_dataset。其中,atp_players_name_dataset 依賴於 atp_players_dataset,這意味著只有當 atp_players_dataset 完成後,atp_players_name_dataset 才會開始執行。
程式碼範例:資產定義
@asset
def atp_players_name_dataset(duckdb: DuckDBResource) -> None:
with duckdb.get_connection() as conn:
conn.execute("""
ALTER TABLE players
ADD COLUMN name_full VARCHAR;
UPDATE players
SET name_full = name_first || ' ' || name_last;
""")
內容解密:
@asset修飾符:用於定義 Dagster 中的資產,使其能夠被 Dagster 識別和管理。duckdb: DuckDBResource引數:提供與 DuckDB 資料函式庫的連線資源,使得資產能夠對資料函式庫進行操作。conn.execute()方法:執行 SQL 陳述式,在此範例中用於修改players表結構並更新資料。- SQL 陳述式解析:
ALTER TABLE players ADD COLUMN name_full VARCHAR;:為players表新增一個名為name_full的欄位。UPDATE players SET name_full = name_first || ' ' || name_last;:將name_first和name_last合併為name_full。
資產執行與依賴關係
當執行 Dagster 任務時,資產之間的依賴關係確保了資料處理的正確順序。例如,當執行 atp_job 時,Dagster 會根據資產定義的依賴關係,先執行 atp_matches_dataset 和 atp_players_dataset,然後才執行依賴於 atp_players_dataset 的 atp_players_name_dataset。
命令列執行範例
dagster job execute -m atp --job atp_job
內容解密:
dagster job execute命令:用於執行 Dagster 中的任務。-m atp引數:指定要執行的模組名稱。--job atp_job引數:指定要執行的任務名稱。
高階計算與 Pandas 整合
Dagster 不僅支援基本的資料處理,還允許開發者利用 Python 的強大功能進行複雜的資料計算。例如,可以使用 Pandas 進行資料處理,並將結果存入 DuckDB。
程式碼範例:使用 Pandas DataFrame
import pandas as pd
@asset
def atp_levels_dataset(duckdb: DuckDBResource) -> None:
levels_df = pd.DataFrame({
"short_name": ["G", "M", "A", "C", "S", "F"],
"name": ["Grand Slam", "Tour Finals", "Masters 1000s", "Other Tour Level", "Challengers", "ITFs"],
"rank": [5, 4, 3, 2, 1, 0]
})
with duckdb.get_connection() as conn:
conn.execute("CREATE TABLE IF NOT EXISTS levels AS SELECT * FROM levels_df")
內容解密:
import pandas as pd:匯入 Pandas 函式庫,用於資料處理。levels_df = pd.DataFrame({...}):建立一個包含網球比賽級別資訊的 Pandas DataFrame。conn.execute()方法:將 DataFrame 中的資料存入 DuckDB 的levels表中。
將資料上傳至 MotherDuck
完成本地資料處理後,可以將資料上傳至 MotherDuck 以便於應用程式存取。首先,需要在 Dagster 的設定中修改資料函式庫連線字串,使其指向 MotherDuck。
程式碼範例:修改資料函式庫連線
import dotenv
import os
dotenv.load_dotenv()
mduck_token = os.getenv("motherduck_token")
defs = Definitions(
assets=all_assets,
jobs=[atp_job],
resources={"duckdb": DuckDBResource(
database=f"md:md_atp_db?motherduck_token={mduck_token}",
schema="main"
)},
schedules=[atp_schedule],
)
內容解密:
dotenv.load_dotenv():載入環境變數設定。mduck_token = os.getenv("motherduck_token"):取得 MotherDuck 的 token。database=f"md:md_atp_db?motherduck_token={mduck_token}":設定連線 MotherDuck 資料函式庫的字串。