返回文章列表

PySpark資料處理與視覺化技術

本文探討PySpark的實用函式與視覺化技術,涵蓋字串函式、SQL查詢、視窗函式等,並以實際案例示範如何應用這些技術進行資料分析。此外,文章也介紹了資料取樣方法、資料儲存與合併技術,以及PySpark與Pandas之間的資料轉換,提供全面的PySpark資料處理。

大資料 資料分析

PySpark 提供了豐富的函式和工具,方便開發者進行大資料處理和分析。本文除了介紹基本的字串操作、SQL 查詢和視窗函式外,也探討了更進階的應用,例如使用視窗函式計算排名和收入差異。同時,文章也涵蓋了資料取樣技術,包括簡單隨機取樣和分層取樣,以及如何有效地儲存和合併資料,並說明瞭如何在 PySpark 和 Pandas 之間進行資料轉換,以滿足不同資料處理的需求。

深入理解PySpark中的實用函式與視覺化技術

PySpark為大資料處理提供了豐富的功能和靈活的API,涵蓋了資料處理、轉換、分析等眾多方面。在本章中,我們將重點探討PySpark中的實用函式,包括字串函式、SQL查詢、視窗函式等,並透過具體例項來展示如何有效地使用這些功能。

字串函式的應用

在資料處理過程中,經常需要對字串資料進行操作,如拼接、大小寫轉換、去除空白等。PySpark提供了多種字串函式來滿足這些需求。

# 使用withColumn函式新增ratings欄位
df_with_newcols = df_with_newcols.withColumn('ratings', 
                                            when(df['popularity'] < 3, 'Low')
                                            .when(df['popularity'] < 5, 'Mid')
                                            .otherwise('High'))

# 將budget_cat和ratings欄位合併為新的欄位BudgetRating_Category
df_with_newcols = df_with_newcols.withColumn('BudgetRating_Category', 
                                            concat(df_with_newcols.budget_cat, df_with_newcols.ratings))

# 將新的欄位轉換為小寫並去除空白
df_with_newcols = df_with_newcols.withColumn('BudgetRating_Category', 
                                            trim(lower(df_with_newcols.BudgetRating_Category)))

內容解密:

  1. withColumn函式:用於在DataFrame中新增或替換一個欄位。
  2. whenotherwise函式:用於根據條件進行邏輯判斷,這裡根據popularity的值來決定ratings的值。
  3. concat函式:用於拼接兩個或多個字串欄位。
  4. lowertrim函式:分別用於將字串轉換為小寫和去除字串兩端的空白。

使用SQL查詢DataFrame

PySpark允許將DataFrame註冊為臨時表,並使用SQL查詢來運算元據。

# 將DataFrame註冊為臨時表
df_with_newcols.createOrReplaceTempView('temp_data')

# 使用SQL查詢來統計ratings的分佈
spark.sql('SELECT ratings, COUNT(ratings) FROM temp_data GROUP BY ratings').show()

內容解密:

  1. createOrReplaceTempView函式:將DataFrame註冊為一個臨時表,可以使用SQL查詢來操作。
  2. spark.sql函式:執行SQL查詢,這裡用於統計ratings欄位的分佈情況。

視窗函式的應用

視窗函式可以對資料進行分組並計算相關的聚合值,如排名、累計值等。

# 匯入視窗函式相關的模組
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile

# 過濾掉popularity為空或NaN的記錄
df_with_newcols = df_with_newcols.filter((df_with_newcols['popularity'].isNotNull()) & (~isnan(df_with_newcols['popularity'])))

# 使用ntile函式計算popularity的十分位數
df_with_newcols = df_with_newcols.select("id", "budget", "popularity",
                                        ntile(10).over(Window.orderBy(df_with_newcols['popularity'].desc())).alias("decile_rank"))

內容解密:

  1. ntile函式:將資料分成指定的組數(這裡是10組),並傳回每筆資料所屬的組號。
  2. Window.orderBy函式:指定視窗內資料的排序方式,這裡根據popularity降序排序。

結合分組統計分析

進一步對分組後的資料進行統計分析,如計算每組的最小值、最大值和計數。

# 對decile_rank進行分組統計
df_with_newcols.groupby("decile_rank").agg(min('popularity').alias('min_popularity'),
                                          max('popularity').alias('max_popularity'),
                                          count('popularity')).show()

內容解密:

  1. groupby函式:根據指定的欄位(這裡是decile_rank)對資料進行分組。
  2. agg函式:對分組後的資料進行聚合計算,這裡計算了每組的最小值、最大值和計數。

深入解析PySpark視窗函式與實用工具

視窗函式的應用與實踐

在處理大規模資料集時,PySpark的視窗函式(Window Functions)提供了一種強大的分析工具。讓我們透過一個具體的例子來瞭解如何使用視窗函式找出1970年第二受歡迎的電影。

程式碼解析

# 步驟1:匯入視窗函式相關模組
from pyspark.sql.window import Window
from pyspark.sql.functions import year, rank

# 步驟2:選擇需要的欄位子集
df_second_best = df.select('id', 'popularity', 'release_date')

# 步驟3:從發布日期建立年份欄位
df_second_best = df_second_best.withColumn('release_year', year('release_date')).drop('release_date')

# 步驟4:定義視窗函式
year_window = Window.partitionBy(df_second_best['release_year']).orderBy(df_second_best['popularity'].desc())

# 步驟5:應用視窗函式進行排名
df_second_best = df_second_best.select('id', 'popularity', 'release_year', rank().over(year_window).alias("rank"))

# 步驟6:找出1970年排名第二的電影
df_second_best.filter((df_second_best['release_year'] == 1970) & (df_second_best['rank'] == 2)).show()

內容解密:

  1. 選擇資料欄位:首先,我們從原始DataFrame中選擇了idpopularityrelease_date三個欄位。
  2. 建立年份欄位:使用year函式從release_date中提取年份,並建立新的release_year欄位。
  3. 定義視窗:使用Window.partitionByrelease_year進行分割槽,並按popularity降序排序。
  4. 排名計算:應用rank().over(year_window)對每年的電影進行排名。
  5. 篩選結果:最後,篩選出1970年排名第二的電影。

進一步探討:計算年度最高票房與其他電影的收入差異

如果我們想要知道每年最高票房電影與其他電影的收入差異,可以使用以下程式碼:

# 步驟1:匯入必要的模組
from pyspark.sql.window import Window
from pyspark.sql.functions import year, max

# 步驟2:選擇需要的欄位
df_revenue = df.select('id', 'revenue', 'release_date')

# 步驟3:建立年份欄位
df_revenue = df_revenue.withColumn('release_year', year('release_date')).drop('release_date')

# 步驟4:定義包含範圍的視窗函式
windowRev = Window.partitionBy(df_revenue['release_year']).orderBy(df_revenue['revenue'].desc()).rangeBetween(-sys.maxsize, sys.maxsize)

# 步驟5:計算收入差異
revenue_difference = max(df_revenue['revenue']).over(windowRev) - df_revenue['revenue']

# 步驟6:顯示最終結果
df_revenue.select('id', 'revenue', 'release_year', revenue_difference.alias("revenue_difference")).show(10, False)

內容解密:

  1. 視窗定義的擴充套件:使用了rangeBetween函式來定義視窗的範圍,確保包含分割槽內的所有行。
  2. 最高收入的計算:使用max().over(windowRev)找出每年的最高收入。
  3. 差異計算:將最高收入減去每行的收入,得到收入差異。

其他實用函式介紹

Collect List

當我們需要收集特定條件下的資料集合時,可以使用collect_list函式。例如,找出"The Lost World"這部電影的所有發行年份:

# 步驟1:建立年份欄位
df = df.withColumn('release_year', year('release_date'))

# 步驟2:應用collect_list函式收集所有出現的年份
df.filter("title=='The Lost World'").groupby('title').agg(collect_list("release_year")).show(1, False)

內容解密:

  1. 篩選資料:首先篩選出標題為"The Lost World"的電影。
  2. 分組聚合:使用groupbycollect_list收集這些電影的發行年份。

資料取樣技術在 PySpark 中的應用

在資料科學領域中,取樣是一項重要的技術,用於從大量資料中選取代表性的子集進行分析。PySpark 提供了多種取樣方法,包括簡單隨機取樣、分層取樣等。本文將探討這些取樣技術的原理及其在 PySpark 中的實作方法。

簡單隨機取樣

簡單隨機取樣是一種基本的取樣方法,每個觀察值被選取的機會均等。PySpark 中的 sample 方法可以用於實作簡單隨機取樣。

簡單隨機取樣(無放回)

df_sample = df.sample(False, 0.4, 11)
df_sample.count()

輸出:

17711

內容解密:

  1. sample 方法的第一個引數設為 False,表示無放回取樣。
  2. 第二個引數 0.4 表示取樣的比例。
  3. 第三個引數 11 是隨機種子,確保每次執行結果一致。
  4. 由於 Spark 內部使用 Bernoulli 取樣技術,實際取樣數量可能會與預期比例有所不同。

簡單隨機取樣(有放回)

df_sample = df.sample(True, 0.4, 11)
df_sample.count()

輸出:

17745

內容解密:

  1. sample 方法的第一個引數設為 True,表示有放回取樣。
  2. 其他引數與無放回取樣相同。
  3. 有放回取樣允許同一個觀察值被多次選取。

分層取樣

分層取樣用於從可以劃分為子總體的總體中選取樣本。PySpark 中的 sampleBy 方法可以用於實作分層取樣。

df_strat = df.sampleBy("release_year", fractions={1959: 0.2, 1960: 0.4, 1961: 0.4}, seed=11)
df_strat.count()

輸出:

260

內容解密:

  1. sampleBy 方法的第一個引數是分層的欄位名稱,此例中為 release_year
  2. 第二個引數 fractions 是一個字典,指定每個分層的取樣比例。
  3. 第三個引數 seed 是隨機種子,用於確保結果的可重現性。

快取與持久化

快取是 Spark 中的一個重要概念,用於暫存資料在記憶體中,以避免重複計算。PySpark 中的 cache 方法可以用於快取 DataFrame。

df.cache()

內容解密:

  1. cache 方法將 DataFrame 快取在記憶體中。
  2. 快取可以顯著提高重複存取 DataFrame 的效能。
  3. 快取是懶惰執行的,意味著第一次操作仍需要正常時間。

持久化與快取類別似,但提供了更多的儲存級別選項,如 MEMORY_ONLYMEMORY_ONLY_SERMEMORY_AND_DISK 等。

PySpark 資料儲存與合併技術解析

在 PySpark 的資料處理過程中,資料儲存與合併是兩個至關重要的環節。本篇文章將探討 PySpark 中的資料儲存級別、資料儲存方法以及多種資料合併技術。

資料儲存級別

PySpark 提供多種資料儲存級別,以滿足不同場景下的效能與資源需求。這些儲存級別包括:

  • MEMORY_ONLY:將資料儲存在記憶體中,提供最佳的存取速度,但可能導致記憶體不足的問題。
  • MEMORY_ONLY_SER:將資料序列化後儲存在記憶體中,減少記憶體使用量,但增加 CPU 負擔。
  • MEMORY_AND_DISK:將資料儲存在記憶體中,若記憶體不足,則將部分資料溢位至磁碟。
  • MEMORY_AND_DISK_SER:將資料序列化後儲存在記憶體中,若記憶體不足,則將部分資料溢位至磁碟。
  • DISK_ONLY:將資料儲存在磁碟上,適合大規模資料的儲存。

儲存級別比較

儲存級別空間使用率CPU 時間記憶體中磁碟上序列化
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK中等部分部分部分
MEMORY_AND_DISK_SER部分部分
DISK_ONLY

資料儲存方法

PySpark 提供多種方法來儲存資料,包括:

將 DataFrame 儲存為文字檔

df.write.format('csv').option('delimiter', '|').save('output_df')

將 DataFrame 儲存為單一檔案

df.coalesce(1).write.format('csv').option('delimiter', '|').save('output_coalesced_df')

將 DataFrame 按照特定欄位分割儲存

df.write.partitionBy('release_year').format('csv').option('delimiter', '|').save('output_df_partitioned')

將 DataFrame 儲存為 Hive 表格

df.write.saveAsTable('film_ratings')

Pandas 支援

PySpark 提供 toPandas() 方法,將 PySpark DataFrame 轉換為 Pandas DataFrame。但需注意,此操作可能導致記憶體問題。

df_pandas = df.toPandas()

相反地,也可以使用 createDataFrame() 方法,將 Pandas DataFrame 轉換為 PySpark DataFrame。

df_py = spark.createDataFrame(df_pandas)

資料合併技術

PySpark 提供多種資料合併技術,包括:

Inner Join

df.join(df_p1, df['id'] == df_p1['id'], 'inner').printSchema()

Left/Left Outer Join

df.join(df_p1, df['id'] == df_p1['id'], 'left').count()

Right/Right Outer Join

df.join(df_p1, df['id'] == df_p1['id'], 'right').count()

Full Outer Join

df.join(df_p1, df['id'] == df_p1['id'], 'outer').count()