PySpark 提供了豐富的函式和工具,方便開發者進行大資料處理和分析。本文除了介紹基本的字串操作、SQL 查詢和視窗函式外,也探討了更進階的應用,例如使用視窗函式計算排名和收入差異。同時,文章也涵蓋了資料取樣技術,包括簡單隨機取樣和分層取樣,以及如何有效地儲存和合併資料,並說明瞭如何在 PySpark 和 Pandas 之間進行資料轉換,以滿足不同資料處理的需求。
深入理解PySpark中的實用函式與視覺化技術
PySpark為大資料處理提供了豐富的功能和靈活的API,涵蓋了資料處理、轉換、分析等眾多方面。在本章中,我們將重點探討PySpark中的實用函式,包括字串函式、SQL查詢、視窗函式等,並透過具體例項來展示如何有效地使用這些功能。
字串函式的應用
在資料處理過程中,經常需要對字串資料進行操作,如拼接、大小寫轉換、去除空白等。PySpark提供了多種字串函式來滿足這些需求。
# 使用withColumn函式新增ratings欄位
df_with_newcols = df_with_newcols.withColumn('ratings',
when(df['popularity'] < 3, 'Low')
.when(df['popularity'] < 5, 'Mid')
.otherwise('High'))
# 將budget_cat和ratings欄位合併為新的欄位BudgetRating_Category
df_with_newcols = df_with_newcols.withColumn('BudgetRating_Category',
concat(df_with_newcols.budget_cat, df_with_newcols.ratings))
# 將新的欄位轉換為小寫並去除空白
df_with_newcols = df_with_newcols.withColumn('BudgetRating_Category',
trim(lower(df_with_newcols.BudgetRating_Category)))
內容解密:
withColumn函式:用於在DataFrame中新增或替換一個欄位。when和otherwise函式:用於根據條件進行邏輯判斷,這裡根據popularity的值來決定ratings的值。concat函式:用於拼接兩個或多個字串欄位。lower和trim函式:分別用於將字串轉換為小寫和去除字串兩端的空白。
使用SQL查詢DataFrame
PySpark允許將DataFrame註冊為臨時表,並使用SQL查詢來運算元據。
# 將DataFrame註冊為臨時表
df_with_newcols.createOrReplaceTempView('temp_data')
# 使用SQL查詢來統計ratings的分佈
spark.sql('SELECT ratings, COUNT(ratings) FROM temp_data GROUP BY ratings').show()
內容解密:
createOrReplaceTempView函式:將DataFrame註冊為一個臨時表,可以使用SQL查詢來操作。spark.sql函式:執行SQL查詢,這裡用於統計ratings欄位的分佈情況。
視窗函式的應用
視窗函式可以對資料進行分組並計算相關的聚合值,如排名、累計值等。
# 匯入視窗函式相關的模組
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile
# 過濾掉popularity為空或NaN的記錄
df_with_newcols = df_with_newcols.filter((df_with_newcols['popularity'].isNotNull()) & (~isnan(df_with_newcols['popularity'])))
# 使用ntile函式計算popularity的十分位數
df_with_newcols = df_with_newcols.select("id", "budget", "popularity",
ntile(10).over(Window.orderBy(df_with_newcols['popularity'].desc())).alias("decile_rank"))
內容解密:
ntile函式:將資料分成指定的組數(這裡是10組),並傳回每筆資料所屬的組號。Window.orderBy函式:指定視窗內資料的排序方式,這裡根據popularity降序排序。
結合分組統計分析
進一步對分組後的資料進行統計分析,如計算每組的最小值、最大值和計數。
# 對decile_rank進行分組統計
df_with_newcols.groupby("decile_rank").agg(min('popularity').alias('min_popularity'),
max('popularity').alias('max_popularity'),
count('popularity')).show()
內容解密:
groupby函式:根據指定的欄位(這裡是decile_rank)對資料進行分組。agg函式:對分組後的資料進行聚合計算,這裡計算了每組的最小值、最大值和計數。
深入解析PySpark視窗函式與實用工具
視窗函式的應用與實踐
在處理大規模資料集時,PySpark的視窗函式(Window Functions)提供了一種強大的分析工具。讓我們透過一個具體的例子來瞭解如何使用視窗函式找出1970年第二受歡迎的電影。
程式碼解析
# 步驟1:匯入視窗函式相關模組
from pyspark.sql.window import Window
from pyspark.sql.functions import year, rank
# 步驟2:選擇需要的欄位子集
df_second_best = df.select('id', 'popularity', 'release_date')
# 步驟3:從發布日期建立年份欄位
df_second_best = df_second_best.withColumn('release_year', year('release_date')).drop('release_date')
# 步驟4:定義視窗函式
year_window = Window.partitionBy(df_second_best['release_year']).orderBy(df_second_best['popularity'].desc())
# 步驟5:應用視窗函式進行排名
df_second_best = df_second_best.select('id', 'popularity', 'release_year', rank().over(year_window).alias("rank"))
# 步驟6:找出1970年排名第二的電影
df_second_best.filter((df_second_best['release_year'] == 1970) & (df_second_best['rank'] == 2)).show()
內容解密:
- 選擇資料欄位:首先,我們從原始DataFrame中選擇了
id、popularity和release_date三個欄位。 - 建立年份欄位:使用
year函式從release_date中提取年份,並建立新的release_year欄位。 - 定義視窗:使用
Window.partitionBy按release_year進行分割槽,並按popularity降序排序。 - 排名計算:應用
rank().over(year_window)對每年的電影進行排名。 - 篩選結果:最後,篩選出1970年排名第二的電影。
進一步探討:計算年度最高票房與其他電影的收入差異
如果我們想要知道每年最高票房電影與其他電影的收入差異,可以使用以下程式碼:
# 步驟1:匯入必要的模組
from pyspark.sql.window import Window
from pyspark.sql.functions import year, max
# 步驟2:選擇需要的欄位
df_revenue = df.select('id', 'revenue', 'release_date')
# 步驟3:建立年份欄位
df_revenue = df_revenue.withColumn('release_year', year('release_date')).drop('release_date')
# 步驟4:定義包含範圍的視窗函式
windowRev = Window.partitionBy(df_revenue['release_year']).orderBy(df_revenue['revenue'].desc()).rangeBetween(-sys.maxsize, sys.maxsize)
# 步驟5:計算收入差異
revenue_difference = max(df_revenue['revenue']).over(windowRev) - df_revenue['revenue']
# 步驟6:顯示最終結果
df_revenue.select('id', 'revenue', 'release_year', revenue_difference.alias("revenue_difference")).show(10, False)
內容解密:
- 視窗定義的擴充套件:使用了
rangeBetween函式來定義視窗的範圍,確保包含分割槽內的所有行。 - 最高收入的計算:使用
max().over(windowRev)找出每年的最高收入。 - 差異計算:將最高收入減去每行的收入,得到收入差異。
其他實用函式介紹
Collect List
當我們需要收集特定條件下的資料集合時,可以使用collect_list函式。例如,找出"The Lost World"這部電影的所有發行年份:
# 步驟1:建立年份欄位
df = df.withColumn('release_year', year('release_date'))
# 步驟2:應用collect_list函式收集所有出現的年份
df.filter("title=='The Lost World'").groupby('title').agg(collect_list("release_year")).show(1, False)
內容解密:
- 篩選資料:首先篩選出標題為"The Lost World"的電影。
- 分組聚合:使用
groupby和collect_list收集這些電影的發行年份。
資料取樣技術在 PySpark 中的應用
在資料科學領域中,取樣是一項重要的技術,用於從大量資料中選取代表性的子集進行分析。PySpark 提供了多種取樣方法,包括簡單隨機取樣、分層取樣等。本文將探討這些取樣技術的原理及其在 PySpark 中的實作方法。
簡單隨機取樣
簡單隨機取樣是一種基本的取樣方法,每個觀察值被選取的機會均等。PySpark 中的 sample 方法可以用於實作簡單隨機取樣。
簡單隨機取樣(無放回)
df_sample = df.sample(False, 0.4, 11)
df_sample.count()
輸出:
17711
內容解密:
sample方法的第一個引數設為False,表示無放回取樣。- 第二個引數
0.4表示取樣的比例。 - 第三個引數
11是隨機種子,確保每次執行結果一致。 - 由於 Spark 內部使用 Bernoulli 取樣技術,實際取樣數量可能會與預期比例有所不同。
簡單隨機取樣(有放回)
df_sample = df.sample(True, 0.4, 11)
df_sample.count()
輸出:
17745
內容解密:
- 將
sample方法的第一個引數設為True,表示有放回取樣。 - 其他引數與無放回取樣相同。
- 有放回取樣允許同一個觀察值被多次選取。
分層取樣
分層取樣用於從可以劃分為子總體的總體中選取樣本。PySpark 中的 sampleBy 方法可以用於實作分層取樣。
df_strat = df.sampleBy("release_year", fractions={1959: 0.2, 1960: 0.4, 1961: 0.4}, seed=11)
df_strat.count()
輸出:
260
內容解密:
sampleBy方法的第一個引數是分層的欄位名稱,此例中為release_year。- 第二個引數
fractions是一個字典,指定每個分層的取樣比例。 - 第三個引數
seed是隨機種子,用於確保結果的可重現性。
快取與持久化
快取是 Spark 中的一個重要概念,用於暫存資料在記憶體中,以避免重複計算。PySpark 中的 cache 方法可以用於快取 DataFrame。
df.cache()
內容解密:
cache方法將 DataFrame 快取在記憶體中。- 快取可以顯著提高重複存取 DataFrame 的效能。
- 快取是懶惰執行的,意味著第一次操作仍需要正常時間。
持久化與快取類別似,但提供了更多的儲存級別選項,如 MEMORY_ONLY、MEMORY_ONLY_SER、MEMORY_AND_DISK 等。
PySpark 資料儲存與合併技術解析
在 PySpark 的資料處理過程中,資料儲存與合併是兩個至關重要的環節。本篇文章將探討 PySpark 中的資料儲存級別、資料儲存方法以及多種資料合併技術。
資料儲存級別
PySpark 提供多種資料儲存級別,以滿足不同場景下的效能與資源需求。這些儲存級別包括:
- MEMORY_ONLY:將資料儲存在記憶體中,提供最佳的存取速度,但可能導致記憶體不足的問題。
- MEMORY_ONLY_SER:將資料序列化後儲存在記憶體中,減少記憶體使用量,但增加 CPU 負擔。
- MEMORY_AND_DISK:將資料儲存在記憶體中,若記憶體不足,則將部分資料溢位至磁碟。
- MEMORY_AND_DISK_SER:將資料序列化後儲存在記憶體中,若記憶體不足,則將部分資料溢位至磁碟。
- DISK_ONLY:將資料儲存在磁碟上,適合大規模資料的儲存。
儲存級別比較
| 儲存級別 | 空間使用率 | CPU 時間 | 記憶體中 | 磁碟上 | 序列化 |
|---|---|---|---|---|---|
| MEMORY_ONLY | 高 | 低 | 是 | 否 | 否 |
| MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 是 |
| MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 部分 |
| MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 是 |
| DISK_ONLY | 低 | 高 | 否 | 是 | 是 |
資料儲存方法
PySpark 提供多種方法來儲存資料,包括:
將 DataFrame 儲存為文字檔
df.write.format('csv').option('delimiter', '|').save('output_df')
將 DataFrame 儲存為單一檔案
df.coalesce(1).write.format('csv').option('delimiter', '|').save('output_coalesced_df')
將 DataFrame 按照特定欄位分割儲存
df.write.partitionBy('release_year').format('csv').option('delimiter', '|').save('output_df_partitioned')
將 DataFrame 儲存為 Hive 表格
df.write.saveAsTable('film_ratings')
Pandas 支援
PySpark 提供 toPandas() 方法,將 PySpark DataFrame 轉換為 Pandas DataFrame。但需注意,此操作可能導致記憶體問題。
df_pandas = df.toPandas()
相反地,也可以使用 createDataFrame() 方法,將 Pandas DataFrame 轉換為 PySpark DataFrame。
df_py = spark.createDataFrame(df_pandas)
資料合併技術
PySpark 提供多種資料合併技術,包括:
Inner Join
df.join(df_p1, df['id'] == df_p1['id'], 'inner').printSchema()
Left/Left Outer Join
df.join(df_p1, df['id'] == df_p1['id'], 'left').count()
Right/Right Outer Join
df.join(df_p1, df['id'] == df_p1['id'], 'right').count()
Full Outer Join
df.join(df_p1, df['id'] == df_p1['id'], 'outer').count()