返回文章列表

PySpark欄位操作與PandasAPI整合應用

本文探討使用 PySpark 操作欄位,包含新增、刪除、重新命名,以及如何結合 Pandas API 進行更簡潔的資料處理。同時,文章也涵蓋了自訂函式、StructType、when 函式等技巧,並提供資料驗證、轉換和最佳實務的建議,讓開發者更有效率地運用 PySpark 處理大規模資料集。

大資料 資料工程

PySpark 提供強大的欄位操作功能,讓開發者能靈活處理資料。除了基本的 withColumn 和 udf 之外,利用 StructType 和自訂函式可以一次建立多個欄位,提升程式碼的簡潔性。搭配 when 函式,更能簡化條件判斷的程式碼撰寫。此外,善用 drop 和 withColumnRenamed 方法,輕鬆管理欄位結構。整合 Pandas API on Spark 後,更方便熟悉 Pandas 語法的開發者無縫轉移到分散式運算環境,大幅提升開發效率。

在實際應用中,理解資料驗證和轉換的機制,能避免不必要的效能損耗。同時,掌握最佳實務,例如使用執行計畫、檢查點機制、避免單一分割槽計算和保留重複欄位名稱等,能有效最佳化 Spark 程式碼,提升大規模資料處理的效能。這些技巧對於處理大型資料集至關重要,能幫助開發者建構更穩健且高效的資料處理流程。

使用正規表示式過濾匹配

前面的表示式也可以重寫如下:

df.filter(df.title.contains('ove')).show()

在某些情況下,你會遇到數千個欄位,並希望根據特定的字首或字尾識別或子集欄位。這可以透過 colRegex 函式實作。首先,識別以“re”開頭的變數(參見圖 2-20)。

df.select(df.colRegex("`re\w*`")).printSchema()

圖表翻譯:

此圖示呈現瞭如何使用正規表示式過濾欄位,識別出以特定字首“re”開頭的變數。

那麼,如何識別以特定字尾結尾的變數呢?在這裡,你有三個以“e”結尾的變數;讓我們透過調整前面的正規表示式來識別它們(參見圖 2-21)。

df.select(df.colRegex("`\w*e`")).printSchema()

圖表翻譯:

此圖示說明瞭如何使用正規表示式根據字尾過濾欄位,成功識別出以“e”結尾的變數。

正規表示式可能會令人困惑,因此讓我們簡要複習一下。一般來說,你可以使用元字元來識別所有字元和數字。正規表示式使用這些元字元來匹配任何文字或字母數字表達式。以下是一些最常用的元字元列表。

單個字元:

  • \d – 識別0到9之間的數字
  • \w – 識別所有大寫和小寫字母以及0到9之間的數字 [A–Z a–z 0–9]
  • \s – 空白字元
  • . – 任意字元

你還有一組量詞,用於指導搜尋多少個字元,如下所示。 量詞:

  • * – 0或更多字元
  • + – 1或更多字元
  • ? – 0或1個字元
  • {min,max} – 指定範圍,包括最小值和最大值在內的所有字元
  • {n} – 正好n個字元

如果你回顧前面的兩個表示式,你會看到我們使用了 \w,表示我們想要匹配所有字母數字字元,忽略大小寫。在我們想要識別具有“re”字首的變數的表示式中,\w 後面跟著量詞 *,表示我們想要包含“re”之後的所有字元。

建立新欄位

建立新特徵(欄位)在許多分析應用中扮演著關鍵角色。Spark 提供了多種方法來建立新欄位。最簡單的方法是透過 withColumn 函式。假設你想要計算流行度的變異數。變異數是一種衡量數字與平均值之間的差異程度的指標,其公式如下:

方差計算公式:

[ S^2 = \frac{\sum (x_i - \bar{x})^2}{n-1} ] 其中:

  • ( x_i ) – 個別觀察值
  • ( \bar{x} ) – 平均值
  • ( n ) – 總觀察次數

首先,讓我們使用以下命令計算平均值。這裡使用的 agg 函式比使用 describe 更好,當你正在尋找特定的統計資料時:

# 計算流行度的平均值
mean_pop = df.agg({'popularity': 'mean'}).collect()[0]['avg(popularity)']
count_obs = df.count()

讓我們使用 withColumnlit 函式將其新增到所有行中,因為我們需要這個值以及個別觀察值。lit 函式是一種與列文字互動的方式。當你想要建立一個具有直接值的列時,它非常有用。

df = df.withColumn('mean_popularity', lit(mean_pop))

內容解密:

  1. df.agg({'popularity': 'mean'}):計算 popularity 欄位的平均值。
  2. .collect()[0]['avg(popularity)']:從聚合結果中提取平均值。
  3. df.withColumn('mean_popularity', lit(mean_pop)):建立一個新的欄位 mean_popularity,並將平均值填充到每一行。

現在,mean_popularity 列對所有行具有相同的值。下面的命令中的 pow 函式幫助你將數字提升到某個冪次。這裡,你需要對數字求平方,因此傳遞2作為引數。

# 計算 'popularity' 欄位的變異數
df = df.withColumn('variance', pow((df['popularity'] - df['mean_popularity']), 2))
variance_sum = df.agg({'variance': 'sum'}).collect()[0]['sum(variance)']

內容解密:

  1. pow((df['popularity'] - df['mean_popularity']), 2):計算每個 popularity 值與平均值之間的差異的平方。
  2. df.agg({'variance': 'sum'}):對所有行的 variance 欄位求和。
  3. .collect()[0]['sum(variance)']:提取方差的總和。

最後一步,將總和除以觀察次數,即可得出結果。

variance_population = variance_sum / (count_obs - 1)
print(variance_population)

輸出結果為:

37.85868805766277

雖然這個變異數計算涉及多個步驟,但其目的是讓你瞭解建立新特徵時可能涉及的操作。一旦你有了變異數的值,就可以根據它建立自定義變數。

使用自定義函式建立多個新欄位

如果你有多個變數需要建立,如何一次性完成並保持良好的結構?首先,定義高、中、低閾值。以下程式碼定義了一個簡單的 Python 函式。它將兩個變數傳入函式,並根據閾值決定傳回新的變數,這兩個新的變數都是字串。

def new_cols(budget, popularity):
    if budget < 10000000:
        budget_cat = 'Small'
    elif budget < 100000000:
        budget_cat = 'Medium'
    else:
        budget_cat = 'Big'
    if popularity < 3:
        ratings = 'Low'
    elif popularity < 5:
        ratings = 'Mid'
    else:
        ratings = 'High'
    return budget_cat, ratings

內容解密:

  1. 此函式根據預算和流行度的閾值傳回兩個分類別結果,分別是預算規模和評級。
  2. 分類別邏輯清晰簡潔,便於理解。

現在,讓我們使用自定義函式。自定義函式允許開發者使用 DataFrame 中的行值作為輸入,並可以將其對映到整個 DataFrame。一個關鍵的事實是,你必須指定輸出資料型別。不幸的是,這是無法避免的。當你有多個列需要寫出時,這可能會很麻煩。我們在之後會討論另一種替代方法。

在命令中,你定義了自定義函式。注意,我們建立了 StructType 來儲存新的變數。我們還明確定義了將傳回的資料型別。我們知道兩個傳回型別都是字串,因此使用 StringType()

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("budget_cat", StringType(), True),
    StructField("ratings", StringType(), True)
])

udf_new_cols = udf(new_cols, schema)
df = df.withColumn("new_cols", udf_new_cols("budget", "popularity"))

內容解密:

  1. 定義了一個包含兩個欄位的 StructType,用於儲存自定義函式的傳回結果。
  2. 使用 udf 將 Python 函式轉換為 Spark 的自定義函式,並應用於 DataFrame。

這種方法可以有效地建立多個新欄位,並保持程式碼結構清晰。

使用 PySpark 進行欄位操作與 Pandas API 整合應用

在處理大規模資料集時,PySpark 提供了豐富的 API 來進行資料操作與轉換。本篇文章將探討如何使用 PySpark 進行欄位的新增、刪除與重新命名,並介紹最新的 Pandas API on Spark 功能。

自訂函式與 StructType 應用

在某些情況下,我們需要對多個欄位進行複雜的邏輯判斷,並將結果儲存在新的欄位中。PySpark 允許我們定義自訂函式(UDF)來實作這一點。以下是一個範例:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType

# 定義自訂函式
def new_cols(budget, popularity):
    # 邏輯判斷
    budget_cat = 'Small' if budget < 10000000 else 'Medium' if budget < 100000000 else 'Big'
    ratings = 'Low' if popularity < 3 else 'Mid' if popularity < 5 else 'High'
    return (budget_cat, ratings)

# 建立 UDF
udfB = udf(new_cols, StructType([
    StructField("budget_cat", StringType(), True),
    StructField("ratings", StringType(), True)
]))

# 應用 UDF
temp_df = df.select('id', 'budget', 'popularity').withColumn("newcat", udfB("budget", "popularity"))

# 將 StructType 欄位拆分為獨立欄位
df_with_newcols = temp_df.select(
    'id', 'budget', 'popularity',
    temp_df.newcat.getItem('budget_cat').alias('budget_cat'),
    temp_df.newcat.getItem('ratings').alias('ratings')
).drop('newcat')

df_with_newcols.show(15, False)

內容解密:

  1. 定義自訂函式new_cols 函式根據預算和熱度進行分類別判斷,傳回兩個值。
  2. 建立 UDF:使用 udfnew_cols 函式轉換為 PySpark 可用的 UDF,並指定傳回的資料結構為 StructType
  3. 應用 UDF:將 UDF 應用於 DataFrame 的特定欄位,生成新的 StructType 欄位 newcat
  4. 拆分 StructType:使用 getItem 方法將 newcat 中的欄位拆分為獨立的欄位。

使用 when 函式簡化欄位操作

除了使用 UDF,我們還可以利用 when 函式來簡化條件判斷:

from pyspark.sql.functions import when

df_with_newcols = df.select('id', 'budget', 'popularity') \
    .withColumn('budget_cat', when(df['budget'] < 10000000, 'Small')
                .when(df['budget'] < 100000000, 'Medium')
                .otherwise('Big')) \
    .withColumn('ratings', when(df['popularity'] < 3, 'Low')
                .when(df['popularity'] < 5, 'Mid')
                .otherwise('High'))

內容解密:

  1. when 函式的使用:逐步進行條件判斷並指定,語法簡潔且無需指定輸出資料型別。
  2. 多條件鏈式呼叫:透過鏈式呼叫 whenotherwise 實作多條件判斷。

刪除與重新命名欄位

PySpark 提供了 dropwithColumnRenamed 方法來進行欄位的刪除與重新命名:

# 刪除欄位
columns_to_drop = ['budget_cat']
df_with_newcols = df_with_newcols.drop(*columns_to_drop)

# 重新命名欄位
df_with_newcols = df_with_newcols.withColumnRenamed('id', 'film_id') \
                                  .withColumnRenamed('ratings', 'film_ratings')

內容解密:

  1. drop 方法:用於刪除指定的欄位,可傳入多個欄位名稱。
  2. withColumnRenamed 方法:用於重新命名欄位名稱,提升資料的可讀性。

Pandas API on Spark 的應用

PySpark 近期整合了 Pandas API,讓熟悉 Pandas 的使用者能夠無縫轉移到分散式運算環境:

import pyspark.pandas as ps

# 使用 Pandas API 讀取 CSV
df_pd_distributed = ps.read_csv("movie_data_part1.csv", sep="|")
df_pd_distributed[['budget', 'original_title', 'popularity']].head()

圖表翻譯:

此範例展示瞭如何使用 Pandas API on Spark 讀取 CSV 檔案並進行資料檢視。Pandas API on Spark 結合了 Pandas 的易用性和 Spark 的分散式運算能力。

進階資料操作與視覺化

本章節將探討PySpark中的進階函式應用。為了加強理解,建議複習前一章節的內容,並在任意資料集上嘗試下列操作。我們的重點將放在視窗函式及其他對於開發和應用Spark程式至大型資料集至關重要的主題。隨著章節的結束,我們還將介紹視覺化和機器學習流程,為無縫過渡到後續的機器學習章節做好準備。

本章涵蓋以下主題:

  • 額外資料操作
  • 資料視覺化
  • 機器學習概述

額外資料操作

首先,讓我們回顧一些未在前一章節中涵蓋的資料操作。

字串函式

在本文中,我們重新執行第二章中執行的命令。

# 重新執行以下命令以重新建立budget_cat變數
df_with_newcols = df.select('id', 'budget',' popularity').\
withColumn('budget_cat', when(df['budget']<10000000,'Small').when(df['budget']<100000000,'Medium').otherwise('Big')).\

內容解密:

此段程式碼首先選擇了資料框中的idbudgetpopularity三個欄位。然後,使用withColumn方法新增了一個名為budget_cat的欄位。該欄位根據budget的值被分為三類別:小(‘Small’)、中(‘Medium’)和大(‘Big’)。具體分類別規則為:預算小於1000萬被歸類別為’Small’,預算在1000萬至1億之間被歸類別為’Medium’,而預算超過1億則被歸類別為’Big’。

資料驗證與轉換

在大多數場景中,當呼叫API時,PySpark和Spark上的Pandas API之間的型別轉換是自動的。您可以比較轉換前後的DataFrame,使用dtypes方法進行比較。同樣地,當將Spark上的Pandas API轉換為PySpark時,也是如此。

Spark上的Pandas API與pandas之間的型別轉換也非常相似。但需要注意的是,某些資料型別是pandas所獨有的,例如:

  • pd.SparseDtype
  • pd.DatetimeTZDtype
  • pd.UInt*Dtype
  • pd.BooleanDtype
  • pd.StringDtype

這些型別尚未計劃納入Spark。

資料操作最佳實踐

為了避免無意的低效率,使用者必須意識到一些最佳實踐:

  • 使用執行計畫來瞭解資料將如何被處理;您可以透過呼叫.spark.explain()方法來檢視執行計畫。
  • 在對Spark上的Pandas API物件進行多次操作後,由於龐大而複雜的計畫,底層的Spark規劃器可能會變慢。如果Spark計畫變得龐大或需要很長時間,可以使用DataFrame.spark.checkpoint()DataFrame.spark.local_checkpoint()來幫助最佳化。
  • 由於資料交換,像排序值這樣的操作在平行或分散式環境中比在單一記憶體機器上更具挑戰性。
  • 避免在單一分割槽上進行計算。如果您正在使用排名(rank)而不指定分割槽,則可能會將所有資料放入單一分割槽中的單一機器上。
  • 避免使用保留或重複的欄位名稱。
  • Spark上的Pandas API預設禁止對不同的DataFrame(或Series)進行操作,以防止昂貴的操作。減少對不同DataFrame或Series的操作。