返回文章列表

DuckDB Polars 高效能資料處理

本文探討如何結合 DuckDB 和 Polars 進行高效能資料處理,並整合 FugueSQL 進行分散式計算。DuckDB 的內嵌式設計和向量化引擎使其具備出色的效能,Polars 則以其高效能的 DataFrame 函式庫和與 DuckDB 的無縫整合,提供更全面的資料分析和操作生態系統。文章涵蓋了 DuckDB

資料函式庫 資料科學

DuckDB 作為一款內嵌式分析型資料函式庫,能與 Pandas DataFrame 緊密整合,大幅提升資料處理效率。其向量化執行引擎有效平行化查詢操作,尤其在 OLAP 場景下展現優異效能。搭配 Polars 這個高效能 DataFrame 函式庫,更能簡化資料操作流程。Polars 利用 Apache Arrow 陣列進行內部資料表示,在載入時間、記憶體使用和計算方面都優於 Pandas,同時也支援與 DuckDB 無縫整合,提供更全面的資料分析和操作生態系統。此外,文章也介紹瞭如何利用 FugueSQL 進行分散式資料處理,讓使用者能以熟悉的 SQL 語法操作 pandas、Spark 和 Dask DataFrames,進一步提升資料處理效率和可擴充套件性。

使用DuckDB進行分散式資料處理

DuckDB是一種內嵌式資料函式庫,能夠直接在應用程式中執行SQL查詢,無需額外的伺服器或客戶端/伺服器架構。這種設計使其非常適合用於分析型工作負載(OLAP),並且具備出色的效能和可擴充套件性。

將Pandas DataFrame載入DuckDB

首先,我們需要將Pandas DataFrame載入DuckDB中。以下是一個簡單的例子:

import pandas as pd
import duckdb

# 建立一個Pandas DataFrame
mydf = pd.DataFrame({'a': [1, 2, 3]})

# 將DataFrame註冊到DuckDB中
con = duckdb.connect()
con.register('mydf', mydf)

# 執行SQL查詢
query = "SELECT SUM(a) FROM mydf"
result = con.execute(query).fetchall()[0][0]

print(result)

內容解密:

  1. 首先,我們匯入了pandasduckdb函式庫。
  2. 建立了一個名為mydf的Pandas DataFrame,包含一個名為a的欄位和三行資料。
  3. mydf註冊到DuckDB中,並命名為mydf
  4. 執行了一個SQL查詢,計算a欄位的總和。
  5. 使用fetchall()方法取得查詢結果,並列印出來。

使用DuckDB進行分析查詢

import pandas as pd
import duckdb

# 建立書籍銷售資料的DataFrame
data = [
    {'Title': 'Python for Data Analysis', 'Author': 'Wes McKinney', 'Publisher': "TechBooks", 'Price': 39.99, 'UnitsSold': 1000},
    {'Title': 'Hands-On Machine Learning', 'Author': 'Aurélien Géron', 'Publisher': "TechBooks", 'Price': 49.99, 'UnitsSold': 800},
    {'Title': 'Deep Learning', 'Author': 'Ian Goodfellow', 'Publisher': "TechBooks", 'Price': 59.99, 'UnitsSold': 1200},
    {'Title': 'Data Science from Scratch', 'Author': 'Joel Grus', 'Publisher': "TechBooks", 'Price': 29.99, 'UnitsSold': 600}
]
df = pd.DataFrame(data)

# 將DataFrame註冊到DuckDB中
con = duckdb.connect()
con.register('sales', df)

# 執行分析查詢
query_total_revenue = """
SELECT SUM(Price * UnitsSold) AS total_revenue
FROM sales
WHERE Publisher = "TechBooks"
"""
total_revenue = con.execute(query_total_revenue).fetchall()[0][0]

print(total_revenue)

內容解密:

  1. 首先,我們建立了一個書籍銷售資料的DataFrame,包含書名、作者、出版社、價格和銷售數量等欄位。
  2. 將DataFrame註冊到DuckDB中,並命名為sales
  3. 執行了一個SQL查詢,計算TechBooks出版社的總收入。
  4. 使用fetchall()方法取得查詢結果,並列印出來。

DuckDB的優勢

DuckDB具有許多優勢,使其成為進行分散式資料處理的理想選擇。以下是一些主要的優勢:

  • 內嵌式設計:DuckDB是一種內嵌式資料函式庫,無需額外的伺服器或客戶端/伺服器架構。
  • 高效能:DuckDB具備出色的效能和可擴充套件性,使其非常適合用於分析型工作負載(OLAP)。
  • 易於使用:DuckDB提供了簡單易用的API,使開發者能夠輕鬆地將其整合到自己的應用程式中。

資料處理的最佳實踐:從DuckDB到Polars的高效能資料分析

在現代資料分析的世界中,高效能的資料處理工具變得越來越重要。從DuckDB的SQL介面到pandas的無縫轉換,再到Polars的高效能DataFrame函式庫,我們將探討如何利用這些工具來提升資料分析的效率。

DuckDB與Pandas的協同工作

DuckDB是一種專為線上分析處理(OLAP)設計的資料函式庫系統,它採用列向量化的方法來提高查詢效能。透過DuckDB,我們可以輕鬆地在SQL介面和pandas之間進行切換,如範例3-58所示。

範例3-58:呼叫df()函式

query_total_revenue = """
SELECT SUM(price * unitsSold) AS total_revenue
FROM sales
WHERE publisher = "TechBooks"
"""
df_total_revenue = con.execute(query_total_revenue).df()

內容解密:

  • 這個SQL查詢計算了TechBooks出版社的總銷售額。
  • SUM(price * unitsSold)計算了每筆交易的總金額並加總。
  • WHERE publisher = "TechBooks"篩選出只屬於TechBooks出版社的銷售記錄。

接著,我們可以使用任何Python中的資料視覺化函式庫來呈現結果,如範例3-59所示。

範例3-59:資料視覺化

# 建立一個長條圖
plt.bar("TechBooks", total_revenue)
# 設定圖表標題和軸標籤
plt.title("Total Revenue for TechBooks Books")
plt.xlabel("Publisher")
plt.ylabel("Total Revenue")

內容解密:

  • 使用plt.bar建立一個長條圖,顯示TechBooks出版社的總銷售額。
  • 設定圖表標題和軸標籤,以便更好地理解圖表內容。

為什麼選擇DuckDB?

傳統的關聯式資料函式倉管理系統(RDBMS)如Postgres和MySQL,由於其逐行處理的特性,在處理大量資料時會遇到效能瓶頸。DuckDB則透過其列向量化的設計,能夠有效地平行化磁碟I/O和查詢執行,從而獲得顯著的效能提升。

Polars:高效能的DataFrame函式庫

Polars是一個完全用Rust編寫的高效能DataFrame函式庫,它不使用索引來表示DataFrame,這使得DataFrame的操作更加直觀和高效。Polars利用Apache Arrow陣列進行內部資料表示,這相比於pandas使用的NumPy陣列,在載入時間、記憶體使用和計算上都有顯著優勢。

安裝Polars

要安裝Polars,可以使用pip套件管理器,如範例3-60所示。

範例3-60:安裝Polars

pip install polars

內容解密:

  • 這個命令會立即使Polars函式庫在我們的Python環境中可用。

使用Polars進行資料操作

範例3-62:Polars DataFrame—最暢銷書籍

# 根據UnitsSold欄位以降序排序DataFrame
top_selling_books = df.sort(by="UnitsSold", reverse=True)
# 取得最暢銷書籍的標題和銷量
top_books_data = top_selling_books.select(["Title", "UnitsSold"]).limit(5).to_pandas()
print("Top-selling TechBooks Books:")
print(top_books_data)

內容解密:

  • 使用sort方法根據UnitsSold欄位以降序排序DataFrame。
  • 使用select方法選擇最暢銷書籍的標題和銷量,並使用limit方法限制結果為前5名。
  • 最後,使用to_pandas()將結果轉換為pandas DataFrame,以便更容易地列印和顯示。

Polars 與 DuckDB 的無縫整合應用

Polars 能夠與外部函式庫(如 DuckDB)無縫整合,充分利用 SQL 功能。使用者可以將資料匯入 Polars 或從 DuckDB 匯入 pandas 資料框架,執行 SQL 查詢,並將 SQL 操作與 Polars 資料框架操作結合。這種整合提供了全面性的資料分析和操作生態系統,結合了 SQL 和 Polars 的優勢。

與 DuckDB 結合使用範例

程式碼範例

import polars as pl
import duckdb

# 建立 DuckDB 連線
con = duckdb.connect()

df = pl.DataFrame({
    'Title': ['Python for Data Analysis', 'Hands-On Machine Learning', 'Deep Learning', 'Data Science from Scratch'],
    'Author': ['Wes McKinney', 'Aurélien Géron', 'Ian Goodfellow', 'Joel Grus'],
    'Publisher': ["TechBooks", "TechBooks", "TechBooks", "TechBooks"],
    'Price': [39.99, 49.99, 59.99, 29.99],
    'UnitsSold': [1000, 800, 1200, 600]
})

# 將資料框架註冊為 DuckDB 中的表格
con.register('books', df)

# 執行 SQL 查詢
result = con.execute("SELECT Title, UnitsSold FROM books WHERE Publisher = 'O''Reilly'")

# 將結果轉換為 Polars 資料框架
result_df = pl.DataFrame(result, columns=['Title', 'UnitsSold'])

# 列印結果
print(result_df)

# 關閉 DuckDB 連線
con.close()

內容解密:

  1. 建立 DuckDB 連線:使用 duckdb.connect() 方法建立與 DuckDB 的連線,這是使用 DuckDB 功能的前提。
  2. 建立 Polars 資料框架:建立一個包含書籍資料的 Polars 資料框架,包括標題、作者、出版商、價格和銷售數量等資訊。
  3. 註冊資料框架至 DuckDB:使用 con.register() 方法將 Polars 資料框架註冊為 DuckDB 中的一個表格,使其能夠被 SQL 查詢存取。
  4. 執行 SQL 查詢:透過 con.execute() 方法執行 SQL 查詢,從註冊的表格中選取特定欄位並根據條件進行過濾。
  5. 結果轉換:將 SQL 查詢的結果轉換為 Polars 資料框架,便於進一步的操作和分析。
  6. 關閉連線:完成操作後,使用 con.close() 方法關閉與 DuckDB 的連線,釋放資源。

Polars 的原生 SQL 支援

Polars 提供原生支援以執行 SQL 查詢,無需依賴外部函式庫。使用者可以直接在程式碼中撰寫 SQL 查詢,利用 SQL 語法進行資料轉換、聚合和過濾操作。這使得使用者能夠在 Polars 框架內利用 SQL 的強大功能,提供一種方便且高效的結構化資料處理方法。

使用 SQLContext

首先,建立一個 SQL 上下文(SQLContext),為執行 SQL 查詢設定環境,如範例 3-64 所示。

df = pl.DataFrame({
    'Title': ['Python for Data Analysis', 'Hands-On Machine Learning', 'Deep Learning', 'Data Science from Scratch'],
    'Author': ['Wes McKinney', 'Aurélien Géron', 'Ian Goodfellow', 'Joel Grus'],
    'Publisher': ["TechBooks", "TechBooks", "TechBooks", "TechBooks"],
    'Price': [39.99, 49.99, 59.99, 29.99],
    'UnitsSold': [1000, 800, 1200, 600]
})

# 建立 SQL 上下文
sql = pl.SQLContext()

註冊資料框架

接著,將要查詢的資料框架註冊到 SQL 上下文中,如範例 3-65 所示。

# 將資料框架註冊到 SQL 上下文中
sql.register('df', df)

執行 SQL 查詢

註冊後,使用 Polars 提供的 execute() 方法執行 SQL 查詢,如範例 3-66 所示。

# 執行 SQL 查詢
result_df = sql.execute("""
    SELECT *
    FROM df
    WHERE Title = 'Python for Data Analysis'
""").collect()

圖表翻譯:

此流程展示瞭如何在 Polars 中使用 SQLContext 建立和執行 SQL 查詢。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title DuckDB Polars 高效能資料處理

package "資料庫架構" {
    package "應用層" {
        component [連線池] as pool
        component [ORM 框架] as orm
    }

    package "資料庫引擎" {
        component [查詢解析器] as parser
        component [優化器] as optimizer
        component [執行引擎] as executor
    }

    package "儲存層" {
        database [主資料庫] as master
        database [讀取副本] as replica
        database [快取層] as cache
    }
}

pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中

master --> replica : 資料同步

note right of cache
  Redis/Memcached
  減少資料庫負載
end note

@enduml

圖表翻譯: 此圖表展示了使用 Polars 和 SQLContext 處理資料的步驟,從建立資料框架到執行 SQL 查詢並取得結果的全過程。

結合 FugueSQL 的分散式資料處理

Fugue 提供了一個統一的介面,用於分散式計算,能夠讓使用者無縫地在 Spark、Dask 和 Ray 等分散式框架上執行 Python、pandas 和 SQL 程式碼。Fugue 的主要用途是平行化和擴充套件現有的 Python 和 pandas 程式碼,使其能夠在分散式框架上無縫執行,從而利用這些系統的可擴充套件性和效能優勢,而無需重寫大量程式碼。

FugueSQL:分散式資料處理的強大工具

Fugue 提供了一個獨特的功能,稱為 FugueSQL,讓使用者能夠透過先進的 SQL 介面,在 pandas、Spark 和 Dask DataFrames 上定義端對端的流程。它結合了熟悉的 SQL 語法和呼叫 Python 程式碼的能力,為使用者提供了一個強大的工具來簡化和自動化資料處理流程。

FugueSQL 的優勢

FugueSQL 提供了多種好處,可以在多種場景中發揮作用,包括作為 Fugue 專案整體目標的一部分進行平行程式碼執行,或是在單機上進行獨立查詢。無論是在分散式系統上工作還是在本地機器上進行資料分析,它都能讓我們高效地查詢 DataFrames。

安裝 Fugue 和 FugueSQL

我們有多種方式來安裝 Fugue,取決於作業系統和安裝型別。可以使用 pip install 來安裝 Fugue。

安裝 Fugue

pip install fugue

Fugue 提供了多種安裝擴充功能,以增強其功能並支援不同的執行引擎和資料處理函式庫。這些擴充功能包括:

  • sql:啟用 FugueSQL 支援。若要安裝此擴充功能,請執行以下指令:

    pip install "fugue[sql]"
    
  • spark:新增對 Spark 作為 Fugue 中的 ExecutionEngine 的支援。安裝此擴充功能後,使用者可以利用 Spark 的功能來執行 Fugue 工作流程。

    pip install "fugue[spark]"
    
  • daskrayduckdbpolarsibiscpp_sql_parser 等其他擴充功能,分別為 Fugue 新增了對不同執行引擎和資料處理函式庫的支援。

同時安裝多個擴充功能

我們可以在一個 pip install 命令中安裝多個擴充功能。例如,要安裝 duckdbpolarsspark 擴充功能,可以執行以下指令:

pip install "fugue[duckdb,spark,polars]"

安裝 Jupyter Notebook 擴充功能

FugueSQL 有一個針對 Jupyter Notebooks 和 JupyterLab 的筆記本擴充功能,提供了語法高亮顯示。要安裝此擴充功能,可以執行以下指令:

pip install fugue-jupyter
fugue-jupyter install startup

使用 FugueSQL 執行 SQL 查詢

FugueSQL 是專為想要使用 Python DataFrames(如 pandas、Spark 和 Dask)的 SQL 使用者設計的。它提供了一個 SQL 介面,可以解析並在所選的底層引擎上執行。這對於喜歡專注於定義邏輯和資料轉換,而不是處理執行複雜度的資料科學家和分析師來說尤其有益。

使用 FugueSQL 的好處

  • 簡化資料處理:FugueSQL 讓 SQL 使用者能夠在流行的資料處理引擎(如 pandas、Spark 和 Dask)上定義端對端的流程,從而簡化了複雜資料管道的協調。
  • 相容標準 SQL 語法:雖然 FugueSQL 支援非標準 SQL 命令,但它與標準 SQL 語法完全相容,確保 SQL 使用者可以無縫地切換到 Fugue,利用現有的 SQL 知識和技能。

#### 內容解密:

此段落主要介紹了FugueSQL的使用方式與好處。FugueSQL是一種能夠讓使用者使用SQL介面來操作Python DataFrame的工具,例如pandas、Spark和Dask。它能夠簡化資料處理流程,並且與標準SQL語法相容。