返回文章列表

Spark 資料轉換:DataFrame 類型化與欄位操作實務

本文闡述 Apache Spark 的核心資料轉換流程,從讀取 CSV 檔案生成 DataFrame,到執行欄位清理與類型轉換。文章核心在於展示如何結合 Scala 的 `case class` 將 DataFrame 轉換為強型別 Dataset,以確保資料結構的嚴謹性。此外,內容深入比較 `select` 與 `selectExpr` 兩種方法,並搭配 `explode` 及 `split` 函數對欄位進行選擇與拆分,突顯 DataFrame API 與 Spark SQL 表達式在實務應用上的差異。

大數據 資料工程

在 Apache Spark 資料處理中,DataFrame 與 Dataset 是核心的資料抽象。DataFrame 提供優化的執行計畫與彈性操作,但在大型專案中,缺乏編譯時期的類型檢查可能引發執行期錯誤。因此,將 DataFrame 轉換為強型別的 Dataset,是確保資料處理流程穩健性的關鍵步驟。本文將探討此轉換過程,從原始資料讀取、清理到生成符合 case class 結構的 Dataset。同時,我們也將比較 Spark 提供的 DataFrame DSL 與 Spark SQL 表達式兩種欄位操作 API,解析其在不同場景下的應用優勢與風格差異,為資料工程實務提供清晰的技術指引。

第六章:深入理解資料轉換

轉換與動作:Spark 執行機制的核心差異

  • org.apache.spark.sql.types:包含各種Spark資料類型,例如DateTypeIntegerType
  • org.apache.apache.spark.sql.functions:包含各種用於轉換的函數,例如groupByaggcolwithColumn

在本章中,玄貓將使用上述物件來執行程式碼。請注意import spark.implicits._這行程式碼,它沒有包含在物件頂部的導入中。這是因為它需要一個已實例化的SparkSession,所以它位於spark宣告之後。spark.implicits._導入語句為玄貓提供了語法糖(syntactic sugar)語法糖是一種使程式碼更易於閱讀或表達的方式,作為編寫完整表達式的捷徑。

例如,在玄貓的大多數範例中,玄貓不會將欄位指定為col("columnName")。相反,玄貓將使用一些語法糖來表達相同的語句為$"columnName"。兩者都將起作用,但在使用SparkScala時,預計會看到兩者都被使用。

現在,要使用Dataset API,玄貓要做的第一件事是創建一個case class

case class NetflixTitle(
show_id: String,
stype: String, // 將 type 替換為 stype 以避免關鍵字衝突------
title: String,
director: String,
cast: String,
country: String,
date_added: Date,
release_year: Int,
rating: String,
duration: String,
listed_in: String,
description: String
)

上述case class為玄貓的Dataset物件提供了一個類型。玄貓將使用Netflix的電影和電視節目資料集。

下一步,玄貓將讀取一個位於專案中的檔案作為DataFrame。讓玄貓將其讀入DataFrame,如下所示:

val df: DataFrame = spark.read
.option("header", true) // 將檔案的第一行作為欄位名稱
.csv("src/main/scala/com/packt/dewithscala/chapter6/data/netflix_titles.csv")
.na.fill("") // 將所有 null 值替換為空字串

請注意,在讀取CSV檔案時,玄貓使用了header選項。這將把檔案中的第一條記錄作為生成的DataFrame的欄位名稱。玄貓還使用了.na.fill("")來將任何null值替換為空字串。

接下來,玄貓將DataFrame轉換為Dataset,透過一系列的轉換步驟:

val dsNetflixTitles: Dataset[NetflixTitle] =
df
.withColumnRenamed("type", "stype") // 重新命名 "type" 欄位為 "stype"
.withColumn("date_added", to_date($"date_added", "MMMM dd, yyyy")) // 將日期字串轉換為日期類型
.withColumn("release_year", $"release_year".cast(IntegerType)) // 將發行年份轉換為整數類型
.as[NetflixTitle] // 將 DataFrame 轉換為 Dataset[NetflixTitle]

在上述範例中,玄貓將DataFrame轉換為Dataset並修復了一些問題。Spark不喜歡名為Type的欄位,因為它是保留字。因此,玄貓使用withColumnRenamed函數將其更改為stype。玄貓可以使用反引號引用type,但玄貓強烈建議除非絕對必要,否則不要這樣做。玄貓還需要將date_added欄位轉換為日期類型,因為它從CSV檔案中讀取時是字串,並且在映射到NetflixTitle case class中的目標欄位date_added(其類型為java.sql.Date)時會引發錯誤。玄貓將使用to_date函數並傳遞一個日期格式作為第二個參數,該格式與玄貓源檔案中的格式匹配。最後,玄貓將release_year欄位轉換為整數,並使用.as[NetflixTitle]將整個DataFrame類型化為Dataset[NetflixTitle]as[T]方法由spark.implicits._提供,玄貓已將其導入到玄貓的原始碼中。

現在,運行printSchema()將為玄貓提供DatasetSchema,以便玄貓可以檢查玄貓的資料類型是否正確:

dsNetflixTitles.printSchema()

輸出結果將會是:

root
|-- show_id: string (nullable = false)
|-- stype: string (nullable = false)
|-- title: string (nullable = false)
|-- director: string (nullable = false)
|-- cast: string (nullable = false)
|-- country: string (nullable = false)
|-- date_added: date (nullable = true)
|-- release_year: integer (nullable = true)
|-- rating: string (nullable = false)
|-- duration: string (nullable = false)
|-- listed_in: string (nullable = false)
|-- description: string (nullable = false)

玄貓的資料類型看起來一切正常!還要請注意,這些類型不再是Spark資料類型(例如IntegerTypeStringType),而是Scala類型

此圖示:資料轉換流程:從原始資料到類型化 Dataset

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "原始資料輸入" as RawInput {
file "netflix_titles.csv" as CsvFile
}

package "Spark 讀取與初步處理" as SparkRead {
component "spark.read.csv()" as ReadCsv
component "option(\"header\", true)" as HeaderOption
component "na.fill(\"\")" as FillNulls
rectangle "DataFrame (推斷 Schema)" as InitialDF
}

package "資料清洗與類型化" as DataCleaning {
component "withColumnRenamed(\"type\", \"stype\")" as RenameColumn
component "withColumn(\"date_added\", to_date())" as CastDate
component "withColumn(\"release_year\", cast(IntegerType))" as CastInt
rectangle "DataFrame (清洗後)" as CleanedDF
}

package "Dataset 轉換" as DatasetConversion {
class "case class NetflixTitle" as CaseClass
component ".as[NetflixTitle]" as AsDataset
rectangle "Dataset[NetflixTitle]" as FinalDataset
}

package "Schema 驗證" as SchemaValidation {
component "printSchema()" as PrintSchema
note right of PrintSchema
驗證資料類型是否符合預期
end note
}

CsvFile --> ReadCsv
ReadCsv --> HeaderOption
HeaderOption --> FillNulls
FillNulls --> InitialDF

InitialDF --> RenameColumn
RenameColumn --> CastDate
CastDate --> CastInt
CastInt --> CleanedDF

CleanedDF --> AsDataset
AsDataset --> FinalDataset
CaseClass --> AsDataset : 提供類型定義

FinalDataset --> PrintSchema

@enduml

看圖說話:

此圖示清晰地描繪了從原始CSV檔案類型化Spark Dataset的整個資料轉換流程。首先,netflix_titles.csv檔案作為原始資料輸入,透過spark.read.csv()讀取,並利用option("header", true)將第一行視為欄位名稱,再透過na.fill("")處理空值,生成一個帶有推斷Schema的初始DataFrame。接著進入資料清洗與類型化階段,進行關鍵的轉換操作:withColumnRenamed將保留字type改為stypewithColumn函數則用於將date_added字串轉換為日期類型,並將release_year轉換為整數類型,最終得到一個清洗後的DataFrame。最後,透過as[NetflixTitle]方法,結合預先定義的case class NetflixTitle,將清洗後的DataFrame轉換為強類型化的Dataset[NetflixTitle]。整個流程的最後一步是透過printSchema()進行Schema驗證,確保所有資料類型都已正確轉換並符合預期,為後續的資料分析和處理奠定堅實的基礎。

第六章:深入理解資料轉換

選擇資料欄位

Spark DataFramesDatasets不可變的(immutable),這意味著每當玄貓轉換一個Dataset時,玄貓並不是在修改現有資料;而是在基於先前的資料集創建一個新的資料集。

到目前為止,玄貓已經了解了如何將檔案讀取到Spark中作為DataFrame並將其轉換為Dataset。現在,讓玄貓開始探索從Dataset中選擇欄位的各種方式:

import org.apache.spark.sql.functions._ // 確保導入了 functions 函式庫

val dfCastMembersSelect: DataFrame =
dsNetflixTitles.select(
$"show_id",
explode(split($"cast", ",")).alias("cast_member") // 將 cast 欄位拆分並展開
)
dfCastMembersSelect.show(10)

考慮上述程式碼。玄貓dsNetflixTitles資料集中的cast欄位是一個字串,它可能為空或包含一個或多個由逗號分隔的演員名稱,這正是explode(split($"cast", ","))所做的事情。

玄貓的程式碼返回一個名為dfCastMembersSelect的新DataFrame,使用select函數,並利用explodesplit函數將演員成員拆分到各自的行中。split函數將字串轉換為字串陣列。然後,explode函數為陣列中的每個元素創建一行。玄貓還包含了show_id值,以便玄貓可以將演員追溯到節目。呼叫show(10)會將資料集的前10條記錄列印到控制台:

+-------+------------------+
|show_id| cast_member|
+-------+------------------+
| s1| |
| s2| Ama Qamata|
| s2| Khosi Ngema|
| s2| Gail Mabalane|
| s2| Thabang Molaba|
| s2| Dillon Windvogel|
| s2| Natasha Thahane|
| s2| Arno Greeff|
| s2| Xolile Tshabalala|
| s2| Getmore Sithole|
+-------+------------------+

現在玄貓已經了解了select,讓玄貓將注意力轉向它的變體之一——selectExpr。顧名思義,它接受表達式而不是欄位名稱,如下面的範例所示:

val dfCastMembersSelectExpr: DataFrame = dsNetflixTitles
.selectExpr(
"show_id",
"explode(split(cast, ',')) as cast_member" // 使用 SQL 表達式進行拆分和展開
)
.selectExpr("show_id", "trim(cast_member) as cast_member") // 再次使用 selectExpr 進行 trim
dfCastMembersSelectExpr.show(10, 0) // 顯示前 10 條記錄,不截斷

這是它的輸出:

+-------+-----------------+
|show_id| cast_member|
+-------+-----------------+
| s1| |
| s2| Ama Qamata|
| s2| Khosi Ngema|
| s2| Gail Mabalane|
| s2| Thabang Molaba|
| s2| Dillon Windvogel|
| s2| Natasha Thahane|
| s2| Arno Greeff|
| s2|Xolile Tshabalala|
| s2| Getmore Sithole|
+-------+-----------------+

玄貓得到了相同的結果,但現在請注意,玄貓正在傳遞一個SQL表達式,它返回玄貓想要的值。selectselectExpr之間的區別在於前者使用內建於DataFrame API的方法,而後者使用Spark SQL。最後要注意的是,玄貓還為每個演員成員添加了trim以刪除演員名稱周圍的空白。

上述兩種方法都有效,並且對於任何有SQL背景的人來說都很容易上手。在繼續之前,讓玄貓執行另一個類似的操作:

val dfDirectorByShowSelectExpr: DataFrame = dsNetflixTitles
.selectExpr(
"show_id",
"explode(split(director, ',')) as director" // 拆分 director 欄位
)
.selectExpr("show_id", "trim(director) as director") // 去除空白
dfDirectorByShowSelectExpr.show(10)

這是導演資料:

+-------+---------------+
|show_id| director|
+-------+---------------+
| s1|Kirsten Johnson|
| s2| |
| s3|Julien Leclercq|
| s4| |
| s5| |
| s6| Mike Flanagan|
| s7| Robert Cullen|
| s7| José Luis Ucha|
| s8| Haile Gerima|
| s9|Andy Devonshire|
+-------+---------------+

為了以更具Scala風格的方式執行相同的選擇,讓玄貓定義兩個case classCastMemberCastMembers,並遍歷dsNetflixTitles資料集的元素:

case class CastMember(show_id: String, cast_member: String)
case class CastMembers(show_id: String, cast: Seq[String]) // 注意這裡使用 Seq[String]

此圖示:資料欄位選擇與轉換流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "初始 Dataset" as InitialDataset {
rectangle "dsNetflixTitles: Dataset[NetflixTitle]" as NetflixDS
}

package "演員資料提取 (DataFrame API)" as CastExtractionDF {
component "select($"show_id", explode(split($"cast", \",\")).alias(\"cast_member\"))" as SelectExplodeSplit
rectangle "dfCastMembersSelect: DataFrame" as CastDF
}

package "演員資料提取 (Spark SQL)" as CastExtractionSQL {
component "selectExpr(\"show_id\", \"explode(split(cast, ',')) as cast_member\")" as SelectExprExplodeSplit
component "selectExpr(\"show_id\", \"trim(cast_member) as cast_member\")" as SelectExprTrim
rectangle "dfCastMembersSelectExpr: DataFrame" as CastDFExpr
}

package "導演資料提取 (Spark SQL)" as DirectorExtractionSQL {
component "selectExpr(\"show_id\", \"explode(split(director, ',')) as director\")" as SelectExprDirectorSplit
component "selectExpr(\"show_id\", \"trim(director) as director\")" as SelectExprDirectorTrim
rectangle "dfDirectorByShowSelectExpr: DataFrame" as DirectorDFExpr
}

NetflixDS --> SelectExplodeSplit
SelectExplodeSplit --> CastDF : 輸出每個演員一行

NetflixDS --> SelectExprExplodeSplit
SelectExprExplodeSplit --> SelectExprTrim
SelectExprTrim --> CastDFExpr : 輸出每個演員一行 (含去空白)

NetflixDS --> SelectExprDirectorSplit
SelectExprDirectorSplit --> SelectExprDirectorTrim
SelectExprDirectorTrim --> DirectorDFExpr : 輸出每個導演一行 (含去空白)

note right of CastDF
- 使用 DataFrame DSL
- 適用於程式化操作
end note

note right of CastDFExpr
- 使用 Spark SQL 表達式
- 適用於熟悉 SQL 的開發者
end note

note right of DirectorDFExpr
- 示範相同邏輯應用於不同欄位
end note

@enduml

看圖說話:

此圖示展示了從初始Netflix Dataset中提取和轉換特定欄位的不同方法。首先,dsNetflixTitles作為原始輸入,其中包含多個欄位,包括以逗號分隔的演員和導演列表。圖示分為三個主要流程:

  1. 演員資料提取(DataFrame API):此路徑展示了如何使用Spark DataFrame DSL來處理cast欄位。透過select函數結合explodesplit,將原始的逗號分隔字串轉換為每個演員一行的結構,生成dfCastMembersSelect這個DataFrame
  2. 演員資料提取(Spark SQL):此路徑則展示了如何使用selectExpr函數,它允許直接嵌入SQL表達式。首先,selectExpr用於拆分cast欄位並展開,然後再次使用selectExpr結合trim函數來去除演員名稱周圍的空白,最終生成dfCastMembersSelectExpr
  3. 導演資料提取(Spark SQL):此路徑與第二個路徑類似,但應用於director欄位,同樣使用selectExpr進行拆分、展開和去空白,生成dfDirectorByShowSelectExpr

透過這三個流程,圖示不僅說明了如何從複雜的字串欄位中提取多個值並將其轉換為獨立的行,還比較了DataFrame DSLSpark SQL表達式兩種不同的編程風格,並強調了資料不可變的特性,每次轉換都會產生新的DataFrame

第六章結論:從資料塑模到價值提煉的思維躍遷

解構本章闡述的資料轉換方法,其關鍵元素可以發現,這不僅是技術的實踐,更是一種將原始資訊提煉為結構化洞見的思維修煉。從無類型的 DataFrame 透過 case class 轉化為強類型的 Dataset,其核心價值在於建立了一份清晰的「資料契約」。這一步驟如同為專案設定明確的規格與目標,大幅降低了後續開發中的模糊性與潛在錯誤,是從混亂邁向精準的第一個關鍵突破。

在欄位選擇與重塑的過程中,select 的程式化建構與 selectExpr 的 SQL 表達式,呈現了兩種解決問題的路徑。前者提供嚴謹的編譯期檢查與結構化思維,後者則賦予熟悉 SQL 的開發者極高的靈活性與表達效率。真正的挑戰並非比較兩者優劣,而是辨識在特定情境下,哪種工具能更高效、更清晰地表達業務意圖。這種對工具的選擇性駕馭能力,正是區分資深專家與初階執行者的瓶頸所在。

展望未來,資料處理工具將持續深化程式化與宣告式兩種範式的融合。一個卓越的資料架構師,其價值不再僅限於單一技能的深度,而在於能根據團隊能力、維護成本與開發速度,在這兩種思維模式間流暢切換。

玄貓認為,資料轉換的精髓不在於記憶繁複的函式,而在於培養一種「結構化思維」。對於追求卓越的資料專家而言,熟練地在不同轉換策略間權衡取捨,並將資料的不可變性視為一種可追溯、可複現的策略資產,才是從資料工匠晉升為價值創造者的核心關鍵。