返回文章列表

MotherDuck最佳實踐與資料管道建置

本文探討 MotherDuck 的最佳實踐,包含提升查詢效能、自動修復 SQL 錯誤以及資料管道的建置。文章將深入解析如何使用 DuckDB CLI 和 Python 等語言與 MotherDuck 整合,並示範如何使用 dlt 載入資料,以及如何利用 DuckDB

資料函式庫 資料工程

MotherDuck 作為無伺服器資料分析平台,讓使用者能直接在瀏覽器中進行資料查詢與分析。透過 md: 協定,可以方便地與 DuckDB CLI 和 Python 等語言整合,並載入 MotherDuck 擴充功能。最佳化查詢效能方面,可以透過避免不必要的表格連線,例如直接從 posts 表格擷取標題和評論數量,有效縮短查詢時間。MotherDuck 也提供自動修復 SQL 錯誤的功能,例如自動新增遺漏的 GROUP BY 子句或修正錯誤的連線欄位名稱,簡化開發流程。MotherDuck 支援多種資料傳輸、商業智慧和資料視覺化工具,並能與現有的 DuckDB 整合和驅動程式相容,只需修改連線字串即可。資料管道建置方面,DuckDB 在資料擷取、轉換和協調中扮演重要角色,並支援批次處理和串流資料兩種處理方式。

利用MotherDuck的最佳實踐

MotherDuck是一個無伺服器的資料分析平台,讓使用者能夠在瀏覽器中查詢和分析雲端儲存中的資料。透過與DuckDB CLI、Python等語言整合的md:協定,能夠無縫地載入MotherDuck擴充功能。

提升查詢效能

由於評論數量是posts表格中的一欄,因此不需要與評論表格進行連線。讓我們看看是否能夠透過調整提示來生成僅使用posts表格的查詢:

call prompt_sql("System: No joins! User: Which 5 questions have the most comments, what is the post title and comment count");

查詢結果:

query = SELECT Title, CommentCount
FROM posts
WHERE PostTypeId = 1
ORDER BY CommentCount DESC
LIMIT 5;

執行時間(秒):實際3.587,使用者0.001733,系統0.000865

內容解密:

  1. SELECT Title, CommentCount:選擇要檢索的欄位,分別是標題和評論數量。
  2. FROM posts:指定要查詢的表格為posts。
  3. WHERE PostTypeId = 1:篩選出PostTypeId為1的資料,即問題型別的帖子。
  4. ORDER BY CommentCount DESC:根據評論數量進行降序排序。
  5. LIMIT 5:限制輸出結果為前5筆資料。

自動修復SQL錯誤

你也可以使用call prompt_fixup()來修復SQL陳述式,例如忘記新增GROUP BY子句:

call prompt_fixup("select postTypeId, count(*) from posts");

查詢結果:

query = SELECT postTypeId, COUNT(*) FROM posts GROUP BY postTypeId

執行時間(秒):實際12.006,使用者0.004266,系統0.002980

內容解密:

  1. SELECT postTypeId, COUNT(*):選擇要檢索的欄位,分別是帖子型別ID和計數。
  2. FROM posts:指定要查詢的表格為posts。
  3. GROUP BY postTypeId:根據postTypeId進行分組。

修復錯誤的連線欄位名稱

或者,你可以使用它來修復錯誤的連線欄位名稱:

call prompt_fixup("select count(*) from posts join users on posts.userId = users.userId");

查詢結果:

query = SELECT COUNT(*) FROM posts JOIN users ON posts.OwnerUserId = users.Id

執行時間(秒):實際2.378,使用者0.001770,系統0.001067

內容解密:

  1. SELECT COUNT(*):計算連線後的資料筆數。
  2. FROM posts JOIN users:將posts表格與users表格進行連線。
  3. ON posts.OwnerUserId = users.Id:指定連線條件,即posts表格中的OwnerUserId與users表格中的Id。

MotherDuck的整合能力

MotherDuck支援多種資料傳輸、商業智慧和資料視覺化工具,如圖7.6所示。它可以位於資料管道的中間,從來源直接或透過其他服務擷取資料,然後儲存在MotherDuck中,供商業智慧應用或特定資料科學工具查詢。

此外,任何現有的DuckDB整合和驅動程式也適用於MotherDuck,只需在資料函式庫連線字串中插入md:字首並附加?motherduck_token=<token>引數即可。

資料管道建置

本章將探討DuckDB在更廣泛的資料生態系統中的角色,強調其在建立強健資料管道和增強工作流程方面的重要性。首先,我們將討論資料管道的意義和相關性,然後評估一些有用的工具,包括擷取、轉換和協調。

資料管道與DuckDB的角色

資料管道通常設定為從各種來源擷取和擷取資料到資料儲存中,例如資料函式庫、雲端中的平面檔案或資料倉函式庫。在儲存之前,資料通常會經過多種處理和轉換,包括連線資料集、過濾、聚合或遮罩,以實作適當的整合和標準化。

圖8.1展示了一個潛在的資料管道,展示了對資料採取的一些操作以及它可以採取的一些方向。

圖8.1 資料管道中的流程

此圖示展示了從資料來源到儲存和產品的流程,包括擷取、轉換和儲存等步驟。

此圖示說明瞭資料管道的主要組成部分,包括:

  1. 資料來源:串流和資料函式庫
  2. 擷取:從來源載入資料
  3. 轉換:對資料進行處理和轉換
  4. 儲存:將處理後的資料儲存在資料函式庫、資料倉函式庫或資料湖中
  5. 產品:將資料用於建立儀錶板、API和機器學習模型等產品

本章涵蓋內容

 資料管道的意義和相關性  DuckDB在管道中的角色  DuckDB如何與Python為基礎的資料載入工具和dbt Labs的資料建置工具整合  使用Dagster協調管道

使用dlt進行資料匯入

在處理資料管線時,通常會遇到兩種主要的資料處理方式:批次處理和串流資料。本章節主要關註批次處理資料的管線。批次處理通常形成一系列指令的工作流程,其中每個指令的輸出成為下一個指令的輸入。當最後的轉換完成並將資料儲存在所需的儲存函式庫中後,處理才算完成。批次處理適用於不需要立即分析每個變更或對即時變更做出反應的情況。

值得注意的是,提取、轉換和載入(ETL)的流程是資料管線的一個子類別。並非所有管線都遵循那個確切的順序。雖然在大多數情況下,提取是第一步,但資料可以先載入到所需的儲存中,然後再進行轉換。這個順序被稱為ELT。在使用像MotherDuck這樣的雲端服務時,構建ETL或ELT管線的選擇非常重要,因為這樣可以充分利用資源。有時,使用本地資源轉換資料更好;而有時,轉換已經儲存在雲端的資料更有效。

那麼DuckDB在這方面扮演什麼角色呢?雖然DuckDB可以作為管線中的儲存系統,但由於其簡單而強大的執行模型,它通常位於管線中的轉換和處理部分:單一的二進位制檔案能夠處理大型資料集,使用多種來源和儲存格式作為輸入,提供完整的SQL引擎以多種不同的方式轉換資料。

SQL的廣泛支援提供了第一個分享語言,以便與相關的處理工具(如本章後面介紹的dbt)整合。第二個分享語言通常——尤其是在將資料儲存到資料湖時——使用Parquet作為輸出格式。

讓我們來看看使用DuckDB可能的資料管線。首先,我們從匯入一些資料開始。

使用dlt進行資料匯入

資料載入工具(dlt;參見https://dlthub.com/)是一個開源的Python函式庫,它允許我們將資料從各種通常比較雜亂的資料來源載入到多種目的地。為什麼要使用dlt而不是執行自己的Python腳原本構建匯入管線?dlt的主要入口——pipeline函式,可以從來源資料推斷schema並將資料載入到目的地,在那裡建立合適的schema。我們可以使用這個管線處理JSON資料、DataFrames或其他可迭代物件,如生成器函式,而無需更改後續的任何處理。該引擎還負責版本控制,以便資料團隊可以專注於使用資料和創造價值,同時透過及時通知任何變更來確保有效的治理。

dlt提供了一組預定義的來源和目的地,包括SQL資料函式庫、GitHub和其他有趣的API。其中一個開箱即用的目的地是DuckDB。也可以定義自定義來源和目的地,但本文不涵蓋這個主題。

一個有趣的內建dlt來源是http://chess.com。他們的API提供了關於玩家和遊戲的資訊。在本文中,我們將使用該來源構建一個小型國際象棋資料函式庫,並使用DuckDB,如圖8.2所示。

dlt是用Python編寫的,我們假設您有一個可用的Python環境和一個可用的pip命令。pip是Python推薦的套件管理器,我們使用它來安裝dlt,方法是執行以下命令:

pip install dlt

如果您跳過了第6章,您可能沒有安裝DuckDB Python擴充套件。為了使管線正常工作,必須安裝它:

pip install duckdb

我們即將以互動方式構建的管線的完整原始碼也可在我們的GitHub範例倉函式庫中找到,網址為https://mng.bz/d6pv。

安裝支援的來源

接下來,我們將初始化我們的Chess.com管線(https://mng.bz/rVle)。這是透過dlt init完成的,它需要兩個引數:來源和目的地。這裡的來源將是內建的chess.com dlt來源,而目的地是duckdb。由於我們的所需來源和目的地都被該工具官方支援,我們可以在shell中執行以下命令來建立我們的第一個管線所需的所有檔案和定義:

dlt init chess duckdb

這個命令將在本地建立一些檔案,包括可執行的指令碼,因此我們需要確認是否願意這樣做:

Looking up the init scripts in https://github.com/dlt-hub/verified-sources.git...
Cloning and configuring a verified source chess (A source loading player profiles and games from chess.com api)
Do you want to proceed? [Y/n]:

一旦完成,我們將看到以下輸出:

Verified source chess was added to your project!
* See the usage examples and code snippets to copy from chess_pipeline.py
* Add credentials for duckdb and other secrets in ./.dlt/secrets.toml

內容解密:

  1. dlt init chess duckdb 命令用於初始化一個新的dlt管線,將chess.com作為資料來源,DuckDB作為目的地。
  2. 這個命令會從GitHub倉函式庫中克隆並組態一個已驗證的chess.com來源。
  3. 初始化過程中會建立必要的檔案和定義,包括可執行的指令碼,用於載入chess.com的玩家資料和遊戲資料到DuckDB中。
  4. 初始化完成後,使用者需要根據指示新增DuckDB的憑證和其他秘密到指定的設定檔中,以便管線能夠正確執行。

圖表說明:從chess.com匯入資料到DuckDB

此圖示展示瞭如何使用dlt將chess.com的資料匯入到DuckDB中。首先,透過dlt提供的API從chess.com提取玩家資料和遊戲資料。然後,這些資料被載入到DuckDB中進行進一步的處理和分析。

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 圖表說明:從chess.com匯入資料到DuckDB

rectangle "提取玩家資料和遊戲資料" as node1
rectangle "載入資料" as node2
rectangle "處理和分析" as node3

node1 --> node2
node2 --> node3

@enduml

內容解密:

  1. 此圖示展示了使用dlt從chess.com提取資料並載入到DuckDB的流程。
  2. 資料從chess.com的API中提取,包括玩家資料和遊戲資料。
  3. 提取的資料透過dlt載入到DuckDB中。
  4. 在DuckDB中對載入的資料進行進一步的處理和分析,以產生所需的結果。

使用dlt進行資料擷取與管道建立

本章節主要介紹如何使用dlt工具來建立資料管道,並將Chess.com的資料擷取到DuckDB資料函式庫中。

初始化dlt專案

首先,我們需要初始化一個dlt專案。執行初始化指令後,會產生一個名為chess的目錄,其中包含了一些輔助函式和一個名為chess_pipeline.py的範例管道檔案。

安裝必要的套件

requirements.txt檔案中,列出了所需的Python套件。我們可以使用以下指令安裝這些套件:

pip install -r requirements.txt

建立dlt管道

dlt管道是用Python撰寫的。我們可以在新的Python檔案、Python REPL或Jupyter Notebook中輸入以下Python程式碼片段。

匯入必要的函式庫

import dlt
from chess import source

初始化管道

pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="main"
)

這裡,我們建立了一個名為chess_pipeline的管道,並指定了DuckDB作為資料儲存的目的地。

定義資料來源

data = source(
    players=[
        "magnuscarlsen", "vincentkeymer",
        "dommarajugukesh", "rpragchess"
    ],
    start_month="2022/11",
    end_month="2022/11",
)

這個資料來源包含了四位知名棋手的資料,包括他們的個人資料、對戰紀錄和線上狀態。

選擇特定的資料資源

players_profiles = data.with_resources("players_profiles")

這裡,我們選擇了棋手的個人資料作為要擷取的資料。

執行管道

info = pipeline.run(players_profiles)
print(info)

執行管道後,我們可以看到類別似以下的輸出:

Pipeline chess_pipeline completed in 0.62 seconds
1 load package(s) were loaded to destination duckdb and into dataset main
The duckdb destination used duckdb:////path/to/code/ch08/dlt_example/chess_pipeline.duckdb location to store data
Load package 1696519035.883884 is LOADED and contains no failed jobs

驗證資料是否成功匯入DuckDB

開啟另一個終端機,並使用以下指令載入DuckDB:

duckdb chess_pipeline.duckdb

然後,執行以下指令檢查資料表:

SHOW TABLES;

輸出結果如下:

┌─────────────────────┐
│ name │
│ varchar │
├─────────────────────┤
│ _dlt_loads │
│ _dlt_pipeline_state │
│ _dlt_version │
│ players_profiles │
└─────────────────────┘

我們可以看到,dlt建立了一些用於儲存中繼資料的資料表,以及我們指定的players_profiles資料表。

查詢匯入的資料

SELECT * FROM players_profiles LIMIT 1;

輸出結果如下:

avatar = https://images.chesscomfiles.com/uploads/v1/user/138850604.80351cd5.200x200o.3129ed9b015d.jpeg
player_id = 138850604
aid = https://api.chess.com/pub/player/dommarajugukesh
url = https://www.chess.com/member/DommarajuGukesh
name = Gukesh Dommaraju
username = dommarajugukesh
followers = 3
country = https://api.chess.com/pub/country/IN
location = Chennai
last_online = 2022-07-16 19:18:02+01
joined = 2021-05-05 10:27:46+01
status = basic
is_streamer = false
verified = false
league = Wood
_dlt_load_id = 1696519035.883884
_dlt_id = kldRaeRA40OGBA
title =

這是Gukesh Dommaraju的個人資料,證明管道運作正常。

載入額外的資料

如果我們想要載入更多的資料,例如棋手的對戰紀錄,可以使用以下指令:

info = pipeline.run(data.with_resources("players_profiles", "players_games"))
print(info)

輸出結果如下:

Getting archive from https://api.chess.com/pub/player/magnuscarlsen/games/2022/11
Getting archive from https://api.chess.com/pub/player/vincentkeymer/games/2022/11
Getting archive from https://api.chess.com/pub/player/rpragchess/games/2022/11
Pipeline chess_pipeline completed in 1.89 seconds
1 load package(s) were loaded to destination duckdb and into dataset main
The duckdb destination used duckdb:////path/to/code/ch08/dlt_example/chess_pipeline.duckdb location to store data
Load package 1696519484.186974 is LOADED and contains no failed jobs

詳細解說:

  1. dlt.pipeline()函式:用於建立一個新的管道,引數包括管道名稱、目的地和資料集名稱。
  2. source()函式:用於定義資料來源,這裡指定了四位棋手的名稱和日期範圍。
  3. with_resources()方法:用於選擇特定的資料資源,例如棋手的個人資料或對戰紀錄。
  4. pipeline.run()方法:用於執行管道,將資料匯入DuckDB。
  5. SHOW TABLES;指令:用於檢查DuckDB中的資料表。
  6. SELECT * FROM players_profiles LIMIT 1;指令:用於查詢匯入的資料。