返回文章列表

Dask 與 Ray 資料框架高階技巧與效能調校

本文探討 Dask 和 Ray 兩個 Python 分散式計算框架的資料框架高階操作技巧,包含自動分割槽、DataFrame 合併、效能最佳化以及實務案例。同時比較 Dask 與 Ray 的特性與適用場景,並提供程式碼範例和圖示說明,幫助讀者瞭解如何有效運用這些技術處理大規模資料集。

大資料 資料科學

Dask 和 Ray 作為 Python 分散式計算框架,在處理大規模資料方面各有千秋。Dask 的 map_partitions 函式能有效平行處理 DataFrame 分割槽,簡化複雜操作;repartition 函式則能根據目標大小自動調整分割槽,最佳化資料讀寫效能。理解 Dask 的 hash、broadcast、partitioned 和 stack_partitions 等合併技術,能進一步提升資料處理效率。Ray 則以其高效能和靈活性著稱,尤其在低延遲應用場景中表現出色。Ray Datasets 提供 map_batchesgroupby 和聚合等操作,方便進行資料轉換和分析。此外,Ray 能與 Pandas DataFrame 整合,並支援批次處理,讓使用者更彈性地操作資料。選擇 Dask 或 Ray 取決於具體需求,例如 Dask 適合已建立 Spark 環境的專案,而 Ray 則更注重速度和低延遲。

Dask 資料框架高階技巧

自動分割槽

在處理 Dask 資料框架時,您可能會遇到需要指定分割槽數量或特定分割的情況。然而,Dask 提供了 repartition 函式,可以根據目標大小自動選擇分割槽。這樣的操作雖然不是沒有成本,但確實能夠簡化工作流程。以下是如何讓 Dask 根據位元組數計算分割槽的範例:

reparted = indexed.repartition(partition_size="20kb")

需要注意的是,雖然 set_index 函式也有一個類別似的 partition_size 引數,但截至目前還無法正常執行。

檔案結構

當您寫入 DataFrame 時,每個分割槽都會被分配到不同的檔案中。這樣的結構有時候會導致檔案過大或過小。某些工具只能接受單一檔案作為輸入,或者資料儲存系統可能對特設定檔案大小有最佳化需求,例如 Hadoop 分散式檔案系統 (HDFS) 的預設區塊大小為 128 MB。幸運的是,您可以使用 repartitionset_index 函式來調整所需的輸出結構。

傻瓜式平行操作

Dask 的 map_partitions 函式可以將函式應用到根據 pandas DataFrame 的每個分割槽上,結果也是一個 pandas DataFrame。這些函式實作了傻瓜式平行操作,因為它們不需要任何工作之間的資料傳輸。在傻瓜式平行問題中,分散式計算和通訊的開銷非常低。

以下是如何使用 map_partitions 函式來填補缺失值的範例:

def fillna(df):
    return df.fillna(value={"PostCode": "UNKNOWN"}).fillna(value=0)

new_df = df.map_partitions(fillna)
new_df.clear_divisions()

您不僅限於呼叫 pandas 內建函式。只要您的函式接收並傳回 DataFrame,您就可以在 map_partitions 中做您想要做的任何事情。

內容解密:

這段程式碼展示瞭如何使用 Dask 的 map_partitions 函式來處理 DataFrame 中的缺失值。首先,我們定義了一個名為 fillna 的函式,這個函式接受一個 DataFrame 並傳回一個處理後的 DataFrame。具體來說,它會將 "PostCode" 欄位中的缺失值填充為 "UNKNOWN",並將其他欄位中的缺失值填充為 0。

接著,我們使用 df.map_partitions(fillna) 將這個函式應用到每個分割槽上。最後,我們呼叫 new_df.clear_divisions() 清除分割槽/分割資訊,因為在處理缺失值時可能會有 NA 值存在於索引中。

處理多個 DataFrame

pandas 和 Dask 提供了四種常見的結合 DataFrame 的方法:concatjoinmergeappend。這些方法在不同情況下有不同的效能考量。

  • concat:允許在任何軸上結合 DataFrame。
  • joinmerge:實作特定情況下的常見結合方式。
  • append:類別似於 SQL UNION 的行結合方式。

Dask 的 joinmerge 函式接受大部分標準 pandas 引數,此外還有一個額外的可選引數 npartitions,用於指定目標輸出分割槽數量(僅適用於 hash-based joins)。這些函式會自動重新分配輸入 DataFrame 以達成最佳效能。

小段落標題

當您需要結合多個 DataFrame 時,Dask 提供了多種方法來最佳化效能。例如,當您使用多個 DataFrame 進行 join 操作時,Dask 會自動重新分配輸入 DataFrame 以達到最佳效能。這張圖示展示瞭如何使用 Dask 的 join 或 concatenate 操作來結合多個 DataFrame。

處理行基礎結合與列基礎結合

當您使用 Dask 處理行基礎結合(類別似於 SQL UNION)時,效能取決於結合 DataFrames 的分割是否排序良好。Dask 在所有已知分割下處理行基礎結合作為一種元資料變更而不會進行任何洗牌操作。

列基礎結合(類別似於 SQL JOIN)也有相關限制和效能考量。Dask 支援內部和完整外部連線(inner 和 full outer join),但不支援左或右連線(left 或 right join)。

小段落標題

當您使用 Dask 處理行基礎結合時,分割是否排序良好會影響效能。如果所有分割都已知且排序良好(即沒有重疊),則 Dask 只進行元資料變更而不需要洗牌操作。反之則需要洗牌操作。此圖示展示了不同情況下的處理方式。

資料框架內部技術

Dask 使用四種技術來結合 DataFrames:hash、broadcast、partitioned 和 stack_partitions。每種技術都有不同的效能表現和適用條件:

  • Hash Joins:當沒有其他適合的 join 技術時使用。
  • Broadcast Joins:當其中一個 DataFrame 比其他都小時使用。
  • Partitioned Joins:當所有輸入都已知 partitioner 時使用。
  • Stack Partitions:用於快速處理行基礎組合。

理解這些技術及其適用條件對於最佳化 Dask 操作非常重要。

小段落標題

Dask 提供四種技術來結合 DataFrames:hash joins、broadcast joins、partitioned joins 和 stack partitions。每種技術都有不同的效能表現和適用條件。瞭解這些技術及其適用條件對於最佳化 Dask 操作非常重要。此圖示展示了各種技術及其效能表現之間的關係。

玄貓透過實際案例與深度解析,幫助讀者理解 Dask 在處理大型資料集時的高階技巧及其背後的技術原理。希望這些內容能夠幫助您更有效地利用 Dask 技術來解決實際問題。

資料框架合併與Dask的最佳實踐

在處理大規模資料時,資料框架的合併操作是一項關鍵技術。Dask作為一個分散式計算框架,提供了多種方式來進行資料框架的合併操作,這些操作的選擇會直接影響到計算效率和資源利用率。以下將探討Dask中不同型別的合併操作及其最佳實踐。

合併操作型別

Dask提供了多種合併操作,包括雜湊合併(hash join)、廣播合併(broadcast join)、分割槽合併(partitioned join)以及堆積疊分割槽(stack_partitions)。每種操作都有其特定的應用場景和優缺點。

雜湊合併(Hash Join)

雜湊合併是最常見且靈活的合併方式,適用於沒有明確分割槽邏輯的資料框架。它透過對鍵進行雜湊運算來實作資料的比對,但由於需要進行大量資料交換,效率相對較低。

廣播合併(Broadcast Join)

廣播合併適用於將大資料框架與小資料框架進行合併。Dask會將小資料框架分發到所有工作節點上,以減少資料交換的開銷。這種方式要求小資料框架能夠完全放入記憶體中。

分割槽合併(Partitioned Join)

當資料框架具有已知的分割槽邏輯時,分割槽合併是更高效的選擇。Dask能夠根據已知的分割槽資訊進行資料對齊,減少不必要的資料交換。

堆積疊分割槽(Stack Partitions)

堆積疊分割槽是一種不涉及資料移動的合併方式,結果是上游分割槽列表的聯集。這種方式適用於行基礎的組合操作,但需注意可能會產生過多的分割槽。

最佳實踐

在實際應用中,選擇合適的合併操作對於提升計算效率至關重要。以下是一些具體建議:

  1. 事先處理資料:在進行合併之前,盡量對資料進行預處理,例如設定索引和分割槽。這樣可以避免昂貴的雜湊合併。

  2. 優先使用廣播和分割槽合併:這兩種方法通常比雜湊合併更高效,應盡量優先考慮。

  3. 控制分割槽數量:使用堆積疊分割槽時,可能會產生過多的分割槽,應注意及時進行重新分割槽操作。

  4. 避免頻繁地修改索引:頻繁地重置索引會影響計算效率,應盡量保持索引的一致性。

非程式碼主題特殊處理

除了技術細節外,實務案例和具體資料支援也是必不可少的。例如:

  • 案例一:金融資料處理

在金融領域中,經常需要對大量交易記錄進行聚合計算和異常檢測。如果使用Dask進行交易記錄的合併操作時,可以選擇將較小的異常交易記錄廣播到所有工作節點上,從而提升查詢效率。

  • 案例二:電子商務推薦系統

在電子商務推薦系統中,通常需要對使用者行為日誌和商品目錄進行連線。如果這些日誌有明確的時間戳標記,可以將日誌根據時間戳進行分割槽處理,然後使用分割槽合併來提升連線速度。

圖示解說

以下是一個簡單的流程圖示說明瞭不同型別合併操作的選擇過程:

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Dask 與 Ray 資料框架高階技巧與效能調校

package "Pandas 資料處理" {
    package "資料結構" {
        component [Series
一維陣列] as series
        component [DataFrame
二維表格] as df
        component [Index
索引] as index
    }

    package "資料操作" {
        component [選取 Selection] as select
        component [篩選 Filtering] as filter
        component [分組 GroupBy] as group
        component [合併 Merge/Join] as merge
    }

    package "資料轉換" {
        component [重塑 Reshape] as reshape
        component [透視表 Pivot] as pivot
        component [聚合 Aggregation] as agg
    }
}

series --> df : 組成
index --> df : 索引
df --> select : loc/iloc
df --> filter : 布林索引
df --> group : 分組運算
group --> agg : 聚合函數
df --> merge : 合併資料
df --> reshape : melt/stack
reshape --> pivot : 重新組織

note right of df
  核心資料結構
  類似 Excel 表格
end note

@enduml

此圖示解說:

  • 當面對小規模資料時,選擇廣播合併。
  • 當面對大規模資料且具有已知分割槽資訊時,選擇分割槽合併。
  • 當面對大規模資料且沒有已知分割槽資訊時,選擇雜湊合佈。

使用Ray進行高效資料處理

在現代資料處理中,Ray是一個強大的分散式計算框架,能夠有效地處理大規模資料。Ray的設計目標是提供一個簡單且高效的API,讓開發者能夠輕鬆地進行分散式計算。本文將探討Ray的核心功能及其在資料處理中的應用。

Ray與Dask的比較

雖然Modin on Ray與Dask DataFrames有許多相似之處,但Ray的設計理念和實作方式使其在某些情況下更具優勢。以下是一些關鍵點:

  • 開發週期:從檔案上來看,Dask似乎在開發週期上更為保守,但這並不代表Dask的技術成熟度較低。事實上,Dask的檔案更為穩健,這對於大規模應用來說是一種優勢。
  • 資料處理速度:Ray的設計目標是提供極高的計算效率,特別是在需要快速回應和低延遲的應用場景中。

使用Spark進行大資料處理

如果你已經擁有一個成熟的大資料基礎設施(例如Apache Hive、Iceberg或HBase),Spark將是一個不錯的選擇。Spark具有許多最佳化技術,如filter push-down,能夠顯著提升處理效能。此外,Spark的DataFrame介面更符合傳統大資料處理的需求。

Spark的優勢

  • 生態系統:Spark作為一個Java基礎的工具,具有豐富的Python API,能夠無縫地融入到傳統的大資料生態系統中。
  • 格式支援:Spark支援最廣泛的格式和檔案系統,這使得它成為許多資料處理管道初期階段的首選。
  • 學習資源:有許多學習Spark的資源,包括O’Reilly出版的《Learning Spark》、《High Performance Spark》和《Spark: The Definitive Guide》。

本地工具的使用

有些工具並不適合分散式操作。如果你的資料集經過過濾後足夠小,可以將其轉換為本地記憶體格式。以下是一些常見的方法:

  • 記憶體中轉換:如果整個資料集可以放入記憶體中,使用to_pandasto_arrow是最簡單的轉換方式。
  • 批次處理:對於較大的物件,每個分割槽可能可以放入記憶體中但整個資料集可能不能,使用iter_batches可以一個分割槽一個分割槽地消耗。

批次處理示例

def process_batch(batch):
    # 假設batch是一個Pandas DataFrame
    # 在這裡進行資料處理
    return batch

# 使用iter_batches進行批次處理
for batch in ray_dataset.iter_batches(batch_format="pandas", batch_size=1000):
    processed_batch = process_batch(batch)
    # 進一步處理processed_batch

內容解密:

  • batch_format引數:指定批次格式(如pandas或pyarrow),以便在批次處理中使用適當的格式。
  • batch_size引數:指定每個批次的大小,以控制每次讀取和處理的資料量。
  • 逐行解說:這段程式碼展示瞭如何使用iter_batches來逐批次地處理資料集中的每個分割槽。首先定義了一個process_batch函式來處理每個批次。接著使用iter_batches來遍歷所有批次並逐一呼叫process_batch函式進行處理。

Ray內建資料集操作

除了支援行動資料之外,Ray還提供了一些內建操作。Ray Datasets並不試圖模仿任何現有API,而是提供基本建構模組以滿足特定需求。

基本操作

  • map_batches:這是最核心的一個操作。它允許你在每個批次上執行自定義函式並生成新資料集。
  • groupBys和聚合:Ray最近增加了這些功能來支援更複雜的資料操作。

map_batches示例

def tokenize_batch(batch):
    nested_tokens = map(lambda s: s.split(" "), batch)
    nr = []
    for r in nested_tokens:
        nr.extend(r)
    return nr

def pair_batch(batch):
    return list(map(lambda w: (w, 1), batch))

def filter_for_interesting(batch):
    return list(filter(lambda wc: wc[1] > 1, batch))

words = pages.map_batches(tokenize_batch).map_batches(pair_batch)
grouped_words = words.groupby(lambda wc: wc[0])
interesting_words = grouped_words.map_batches(filter_for_interesting)

內容解密:

  • tokenize_batch函式:將每個批次中的字串拆分成詞語並傳回拆分後的結果。
  • pair_batch函式:將每個詞語轉換成元組形式(詞語, 1),以便進行計數。
  • filter_for_interesting函式:過濾掉只出現一次的詞語。
  • 逐行解說:這段程式碼展示瞭如何使用map_batches來對每個批次進行詞語拆分、計數和過濾操作。首先定義了一系列函式來實作不同步驟的操作。接著使用map_batches來逐步應用這些函式到整個資料集上。

Ray與Pandas結合

Ray還支援將資料轉換為Pandas DataFrame進行進一步處理。以下是一個範例:

def update_empsize_to_median(df):
    def to_median(value):
        if " to " in value:
            f , t = value.replace(",", "").split(" to ")
            return (int(f) + int(t)) / 2.0
        elif "Less than" in value:
            return 100
        else:
            return 10000
    df["EmployerSize"] = df["EmployerSize"].apply(to_median)
    return df

ds_with_median = ds.map_batches(update_empsize_to_median, batch_format="pandas")

內容解密:

  • update_empsize_to_median函式:將員工規模欄位轉換為中位值。
  • to_median子函式:根據員工規模欄位中的字串值計算中位值。
  • 逐行解說:這段程式碼展示瞭如何使用map_batches將Pandas DataFrame中的員工規模欄位轉換為中位值。首先定義了一個更新員工規模欄位值的函式。接著使用map_batches來應用這個更新函式到整個資料集上。

安裝額外函式庫

Ray Datasets沒有內建支援安裝額外函式庫,但可以透過map_batches和任務來實作。以下是一個範例:

def extract_text_for_batch(sites):
    text_futures = map(lambda s: extract_text.remote(s), sites)
    result = ray.get(list(text_futures))
    if result is None:
        return []
    return result

def tokenize_batch(texts):
    token_futures = map(lambda s: tokenize.remote(s), texts)
    result = ray.get(list(token_futures))
    if result is None:
        return []
    nr = []
    for r in result:
        nr.extend(r)
    return nr

內容解密:

  • extract_text_for_batch函式:從網站中提取文字並傳回結果。
  • tokenize_batch函式:對提取到的文字進行詞語拆分並傳回結果。
  • 逐行解說:這段程式碼展示瞭如何使用map_batches和遠端任務來安裝和使用額外函式庫(如HTML解析函式庫)進行文字提取和詞語拆分。

Ray在實務中的應用

在實務中,Ray 的靈活性和高效性使其成為許多企業級應用中的首選工具。無論是大規模機器學習訓練、實時資料流處理還是複雜分析工作負載都能獲得良好的表現。

未來趨勢預測

隨著AI與機器學習技術不斷進步,Ray 作為一種高效、可擴充套件且易於使用 的分散式計算框架,未來必將在更多領域發揮重要作用。