PySpark 提供兩種主要的資料結構:RDD 和 DataFrame。RDD 適合處理半結構化和非結構化資料,而 DataFrame 更適用於結構化資料,並受益於 Catalyst 最佳化器帶來效能提升。在 Python 中,Dataset API 的優勢已融入 DataFrame。本文將使用 TMDB 電影資料集示範如何使用 PySpark 進行資料操作,包含從 CSV 檔案和 Hive 表格讀取資料、檢視 Schema、計算記錄數、選擇特定欄位、處理缺失值、進行單變數頻率分析、排序、篩選資料以及進行資料型別轉換,最後示範如何使用 PySpark 進行基本統計分析,例如計算中位數、不同值計數以及使用 like 和 rlike 方法進行資料篩選。
PySpark 基礎:RDD 與 DataFrame 的比較及應用
在前一章中,我們介紹了安裝和使用 Spark 的不同方法。現在,讓我們進入有趣的部分,看看如何執行資料操作。在本文中,我們使用 Docker 版本的 PySpark,在單一機器上執行。如果您在分散式系統上安裝了 PySpark,我們鼓勵您使用它來釋放平行計算的能力。
PySpark 中的 Resilient Distributed Datasets (RDD) 與 DataFrames
RDD 是一種不可變的分散式資料元素集合。它跨叢集中的節點進行分割槽,能夠與低階 API 進行互動以實作平行處理。RDD 在處理結構化、半結構化和非結構化資料時非常靈活。它們最適合用於半結構化和非結構化資料。RDD 是 Spark 核心函式庫的一部分,只需要 Spark 上下文即可進行操作。
然而,RDD 也有其挑戰。使用 DataFrames 可獲得的最佳化與效能提升在 RDD 中並不存在。如果您正在處理結構化資料,您的首選應該始終是 DataFrames。只要能夠定義 schema,您就可以將 RDD 轉換為 DataFrame。
Dataset 是分散式資料集合。它結合了 RDD 的優勢(強型別,能夠使用強大的 lambda 函式)與 Spark SQL 最佳化執行引擎的優勢。Dataset 可以從 JVM 物件構建,然後使用功能轉換(map、flatMap、filter 等)進行操作。Dataset API 僅在 Scala 和 Java 中可用,Python 不支援 Dataset API。但由於 Python 的動態特性,Dataset API 的許多優勢已經可用(即,您可以自然地按名稱存取行的欄位:row.columnName)。R 語言的情況類別似。
DataFrame 是一種按欄位排序的資料集。它以欄位和行的格式儲存資料,就像一個表格。您可以將欄位視為變數,將行視為相關的資料點。它在概念上等同於關聯式資料函式庫中的表格或 R/Python 中的 dataframe,但具有更豐富的最佳化機制。DataFrames 可以從多種來源構建,例如結構化資料檔案、Hive 中的表格、外部資料函式庫或現有的 RDD。DataFrame API 在 Scala、Java、Python 和 R 中可用。需要 SQLContext 來程式設計 DataFrames,因為它們位於 Spark 生態系統的 SparkSQL 區域。您可以根據需要將 DataFrame 轉換為 RDD。
表 2-1:RDD、DataFrame 和 Dataset 的特性比較
| 特性 | RDD | DataFrame | Dataset |
|---|---|---|---|
| 不可變性 | 是 | 是 | 是 |
| 容錯性 | 是 | 是 | 是 |
| 型別安全 | 是 | 否 | 是 |
| Schema | 否 | 是 | 是 |
| 最佳化引擎 | 無 | Catalyst | Catalyst |
| 執行最佳化 | 否 | 是 | 是 |
| API 操作層級 | 低階 | 高階 | 高階 |
| 語言支援 | Java、Scala、Python、R | Java、Scala、Python、R | Java、Scala |
資料型別
Spark SQL 的 DataType 類別是所有資料型別的基礎類別,它們主要用於所有 DataFrames(見表 2-2)。
表 2-2:資料型別
| 資料型別 | 大類別 |
|---|---|
| BooleanType | 原子型別:用於表示非空、陣列、結構或對映以外的所有內容 |
| BinaryType | |
| DateType | |
| StringType | |
| TimestampType | |
| ArrayType | 非原子型別 |
| MapType | |
| StructType | |
| CalendarIntervalType | |
| NumericType | |
| ShortType | |
| IntegerType | |
| LongType | |
| FloatType | |
| DoubleType | |
| DecimalType | |
| ByteType | |
| HiveStringType | |
| ObjectType | |
| NullType |
程式碼範例:建立 DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# 建立 SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
# 定義 schema
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
# 建立 DataFrame
data = [("John", 25), ("Mary", 31)]
df = spark.createDataFrame(data, schema)
# 顯示 DataFrame
df.show()
#### 內容解密:
1. `SparkSession.builder.appName("example").getOrCreate()`:建立一個名為 "example" 的 SparkSession。如果已經存在,則取得現有的 SparkSession。
2. `StructType` 和 `StructField`:用於定義 DataFrame 的 schema。`StructType` 用於定義結構,而 `StructField` 用於定義欄位。
3. `createDataFrame(data, schema)`:根據提供的資料和 schema 建立 DataFrame。
4. `df.show()`:顯示 DataFrame 的內容。
使用 PySpark 進行資料操作
在單機與叢集環境中執行資料操作的差異在於處理速度。當使用單機時,將失去分散式處理的優勢。
例項資料集介紹
本章節使用來自 The Movie Database (TMDB) 的資料集進行資料操作示範。TMDB 是一個社群驅動的電影與電視節目資料函式庫。我們透過 API 提取了一個包含 44,000 部國際發行的電影資訊的樣本資料集。
資料集欄位說明
資料集包含了以下欄位:
belongs_to_collection:電影是否屬於某個系列budget:電影預算id:電影唯一識別碼original_language:電影原始語言original_title:電影原始標題overview:電影簡介popularity:電影熱度指數production_companies:製作公司列表production_countries:電影製作國家release_date:電影上映日期revenue:電影票房收入(0 表示缺失)runtime:電影播放時間(分鐘)status:電影是否已上映tagline:電影標語title:電影英文別名標題vote_average:觀眾平均評分
初始化 PySpark 工作階段
在使用 PySpark 之前,需要初始化 SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data_Wrangling").getOrCreate()
內容解密:
此程式碼段用於建立一個 PySpark 的 SparkSession 物件,這是與 Spark 叢集互動的入口點。appName 方法用於為應用程式命名,而 getOrCreate() 方法則用於取得現有的 SparkSession 或建立一個新的。
讀取資料
從檔案讀取資料
首先,指設定檔案位置和型別:
file_location = "movie_data_part1.csv"
file_type = "csv"
infer_schema = "False"
first_row_is_header = "True"
delimiter = "|"
df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)
內容解密:
此段程式碼負責從 CSV 檔案中讀取資料。format() 方法指定了檔案型別,而 option() 方法則用於設定諸如是否推斷 schema、是否有標頭列以及分隔符等引數。最後,load() 方法載入指定的檔案。
從 Hive 表格讀取資料
若資料儲存在 Hive 表格中,可以使用以下命令讀取:
df = spark.sql("select * from database.table_name")
內容解密:
此命令直接從 Hive 資料函式庫中的指定表格讀取資料。Spark 會在讀取時對映資料源的結構,但不會立即將資料載入記憶體。
檢視 Metadata
使用以下命令檢視 DataFrame 的 schema:
df.printSchema()
或者,使用 dtypes 取得相同資訊:
df.dtypes
內容解密:
printSchema() 方法用於列印 DataFrame 的 schema,包括欄位名稱和資料型別。dtypes 屬性則傳回一個包含欄位名稱和資料型別的列表。
統計記錄數
計算 DataFrame 中的記錄數:
df.count()
並可使用 print 陳述式包裝結果以獲得更具描述性的輸出:
print('The total number of records in the movie dataset is ' + str(df.count()))
內容解密:
count() 方法傳回 DataFrame 中的記錄數。結合 Python 的 print 功能,可以輸出更易讀的結果。
篩選欄位與檢視資料
選擇特定的欄位並檢視資料:
select_columns = ['id', 'budget', 'popularity', 'release_date', 'revenue', 'title']
df = df.select(*select_columns)
df.show()
內容解密:
此段程式碼首先定義了一個包含所需欄位的列表,然後使用 select() 方法篩選出這些欄位。最後,show() 方法用於顯示 DataFrame 中的前 20 行資料。
透過以上步驟,我們展示瞭如何使用 PySpark 進行基本的資料操作,包括讀取資料、檢視 metadata、統計記錄數以及篩選欄位等。這些操作為後續的資料分析和處理奠定了基礎。
資料操作與分析基礎
在進行資料分析時,瞭解如何有效地操作和轉換資料是至關重要的。PySpark 提供了一系列強大的工具來處理結構化和半結構化資料。
資料選擇與顯示
首先,我們需要了解如何從 DataFrame 中選擇特定的欄位並顯示結果。PySpark 提供了 select 方法來實作這一點。
df.select('id', 'budget', 'popularity', 'release_date', 'revenue', 'title').show()
或者,你也可以透過欄位索引來選擇欄位。
df.select(df[2], df[1], df[6], df[9], df[10], df[14]).show()
內容解密:
df.select():用於選擇 DataFrame 中的特定欄位。show():用於顯示結果,預設顯示前 20 行。- 透過索引選擇欄位時需注意,新 DataFrame 的欄位順序和索引可能會有所不同。
此外,你還可以透過 show() 方法的引數來控制顯示的行數。
df.show(25, False)
內容解密:
show(25, False):顯示前 25 行,且不截斷內容。
處理缺失值
在實際資料中,缺失值是常見的問題。PySpark 提供了多種方法來檢測和處理缺失值。
from pyspark.sql.functions import *
df.filter((df['popularity'] == '') | df['popularity'].isNull() | isnan(df['popularity'])).count()
內容解密:
filter():用於根據條件篩選資料。isNull():檢查值是否為 null。isnan():檢查值是否為 NaN(Not a Number)。
若要計算 DataFrame 中所有欄位的缺失值,可以使用以下命令:
df.select([count(when((col(c) == '') | col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]).show()
內容解密:
count(when()):用於計算滿足條件的列數。alias(c):為計算出的缺失值列重新命名。
單變數頻率分析
在處理類別型變數時,瞭解每個類別的頻率是非常有用的。PySpark 的 groupBy 和 count 方法可以幫助我們實作這一點。
df.groupBy(df['title']).count().show()
內容解密:
groupBy():根據指定的欄位分組。count():計算每個分組的資料筆數。
若要檢視排序後的結果,可以結合 sort 方法。
df.groupby(df['title']).count().sort(desc("count")).show(10, False)
內容解密:
sort(desc("count")):根據計數以降序排列結果。
篩選與排序
在進行頻率分析時,我們可能需要篩選出特定的資料並進行排序。
df_temp = df.filter((df['title'] != '') & (df['title'].isNotNull()) & (~isnan(df['title'])))
df_temp.groupby(df_temp['title']).count().filter("`count` > 4").sort(col("count").desc()).show(10, False)
內容解密:
filter():根據多個條件篩選資料。groupby()和count():計算每個標題出現的次數。filter("count> 4"):篩選出出現次數大於 4 的標題。sort(col("count").desc()):以降序排列結果。
資料型別轉換
在進行資料分析之前,瞭解並正確轉換資料型別是非常重要的。
df = df.withColumn('budget', df['budget'].cast("float"))
內容解密:
withColumn():用於更新或建立新的欄位。cast("float"):將指定欄位轉換為浮點數型別。
在轉換資料型別之前,建議先檢查目前的資料型別。
df.dtypes
你可以批次轉換多個欄位的資料型別。
from pyspark.sql.types import *
int_vars = ['id']
float_vars = ['budget', 'popularity', 'revenue']
date_vars = ['release_date']
for column in int_vars:
df = df.withColumn(column, df[column].cast(IntegerType()))
內容解密:
cast(IntegerType()):將指定欄位轉換為整數型別。- 使用迴圈批次轉換多個欄位的資料型別。
資料型別轉換與基本統計分析
在進行資料分析前,確保資料型別的正確性是至關重要的步驟。以下展示如何使用 PySpark 進行資料型別轉換以及基本的統計分析。
資料型別轉換
首先,我們需要將某些欄位的資料型別轉換為適當的型別,例如將浮點數變數轉換為 FloatType,將日期變數轉換為 DateType。
from pyspark.sql.types import FloatType, DateType
# 定義需要轉換的浮點數變數和日期變數
float_vars = ['budget', 'revenue'] # 假設範例
date_vars = ['release_date'] # 假設範例
# 轉換浮點數變數
for column in float_vars:
df = df.withColumn(column, df[column].cast(FloatType()))
# 轉換日期變數
for column in date_vars:
df = df.withColumn(column, df[column].cast(DateType()))
# 檢查資料型別
df.dtypes
內容解密:
- 匯入必要的模組:從
pyspark.sql.types匯入FloatType和DateType,以便進行資料型別轉換。 - 定義變數列表:指定需要轉換的浮點數變數和日期變數。
- 使用
withColumn方法轉換資料型別:遍歷變數列表,利用cast方法將欄位的資料型別轉換為指定的型別。 - 檢查轉換結果:使用
dtypes屬性檢視 DataFrame 中各欄位的資料型別。
基本統計分析
PySpark 提供了豐富的內建函式來進行基本統計分析,例如使用 describe 方法來計算各欄位的基本統計量。
# 使用 describe 方法計算基本統計量
df.describe().show()
內容解密:
describe方法:計算 DataFrame 中各數值欄位的計數、平均值、標準差、最小值和最大值。- 結果展示:使用
show方法展示計算結果。
中位數計算
中位數是一個重要的統計量,對 outlier 較為穩健。PySpark 使用 approxQuantile 方法來近似計算中位數。
# 過濾掉 budget 欄位中不合法的值(例如 0 或空值)
df_temp = df.filter((df['budget'] != 0) & (df['budget'].isNotNull()) & (~isnan(df['budget'])))
# 計算 budget 欄位的中位數
median = df_temp.approxQuantile('budget', [0.5], 0.1)
# 列印中位數結果
print('The median of budget is ' + str(median))
內容解密:
- 資料過濾:排除
budget欄位中的不合法值,以確保計算結果的準確性。 approxQuantile方法:計算指定欄位的中位數,第二個引數[0.5]表示計算中位數,第三個引數0.1表示相對誤差。- 結果輸出:列印計算得到的中位數。
不同值計數與篩選
瞭解欄位中不同值的數量對於資料分析十分重要。PySpark 提供了 countDistinct 和 distinct 方法來實作這一功能。
# 使用 countDistinct 方法計算不同值的數量
df.agg(countDistinct(col("title")).alias("count")).show()
# 使用 distinct 方法檢視不同值
df.select('title').distinct().show(10, False)
內容解密:
countDistinct方法:計算指定欄位中不同值的數量。distinct方法:傳回指定欄位中的不同值。
資料篩選
PySpark 中的 filter 和 where 方法可用於資料篩選。
# 篩選 title 以 "Meet" 開頭的記錄
df.filter(df['title'].like('Meet%')).show(10, False)
# 篩選 title 不以 "s" 結尾的記錄
df.filter(~df['title'].like('%s')).show(10, False)
# 使用 rlike 方法篩選 title 中包含 "ove" 的記錄
df.filter(df['title'].rlike('\w*ove')).show(10, False)
內容解密:
like方法:用於模糊匹配,與 SQL 中的LIKE功能相似。rlike方法:支援正規表示式的模糊匹配,能夠進行更複雜的篩選條件設定。