在數據工程領域,使用 Scala 開發 Spark 應用程式是處理巨量資料的主流方法。然而,僅僅編寫程式碼並不足以確保效能,理解其底層的執行原理至關重要。Spark 透過將複雜的計算任務拆解為一系列的工作、階段與任務,並以有向無環圖(DAG)進行排程。其中,跨執行器的資料移動,即「資料洗牌」(Shuffle),是影響整體運算效率的核心環節。掌握這些理論有助於開發者撰寫更高效、更穩健的數據管線,並為後續的效能調優奠定穩固基礎。
使用Scala建立Spark應用程式
要在Scala中建立數據工程管線,玄貓需要利用Spark框架並建立一個Spark應用程式。Spark提供了各種不同類型的API來處理數據,每種API都有其優缺點。無論玄貓使用哪種API,玄貓都需要將它們封裝在一個Spark應用程式中。現在,讓玄貓來建立一個。
數據工程高科技養成:從理論到實踐的玄貓指引
數據工程核心理論與實踐
第三章:Apache Spark與其核心API:DataFrame、Dataset與Spark SQL
使用Scala建立Spark應用程式
每個以Scala編寫的Spark應用程式都需要一個SparkSession。SparkSession是一個提供Spark API入口點的物件。
為了使用SparkSession,玄貓需要創建一個Scala物件。這個物件是單例模式的實現。玄貓使用物件是因為每個Spark應用程式都需要一個Spark的單一實例,這可以透過物件來保證。讓玄貓為第一個Spark應用程式創建一個包含常用匯入的Scala物件:
package com.blackcat.descala.scalaplayground
import org.apache.spark.sql.{
DataFrame,
Dataset,
Row,
SparkSession
}
import org.apache.spark.sql.functions.{avg, sum}
object FirstSparkApplication extends App {
// 程式碼將在此處
}
現在,在FirstSparkApplication物件內部,使用以下程式碼實例化一個SparkSession:
val spark: SparkSession = SparkSession
.builder()
.master("local[1]")
.appName("SparkPlayground")
.getOrCreate()
import spark.implicits._
現在玄貓有了一個起點,可以在接下來的幾個部分中探索Spark的架構和API。玄貓將從Spark的DataFrame、Dataset和Spark SQL開始。
Spark階段
讓玄貓深入探討Spark的階段 (stages) 以及它們如何運作。考慮以下程式碼片段:
val source_storage_location = "my_data_path" // 假設為數據儲存路徑
val df: DataFrame = spark
.read
.format("parquet")
.load(source_storage_location)
玄貓將df定義為一個Spark DataFrame的實例,使用SparkSession從指定來源位置讀取一些Parquet檔案。當這段程式碼運行時,會創建一個包含一個階段的工作 (job)。透過Spark UI(更多內容將在性能調優部分討論),玄貓可以視覺化這個過程:
此圖示:Spark UI顯示階段處理
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
rectangle "Spark UI" as SparkUI {
rectangle "工作 (Job 0)" as Job0 {
rectangle "階段 (Stage 0)" as Stage0 {
component "任務 (Task 0.0)" as Task0_0
component "任務 (Task 0.1)" as Task0_1
component "..." as Task0_N
}
}
component "事件時間軸" as Timeline
component "DAG 可視化" as DAGViz
component "日誌" as Logs
}
Job0 --> Stage0
Stage0 --> Task0_0
Stage0 --> Task0_1
Stage0 --> Task0_N
SparkUI --> Job0 : 顯示工作資訊
SparkUI --> Stage0 : 顯示階段詳情
@enduml
看圖說話:
此圖示模擬了Spark UI中顯示一個簡單數據讀取操作的階段處理情況。當Spark應用程式執行spark.read.format("parquet").load(source_storage_location)這樣的程式碼時,Spark會創建一個工作 (Job 0),該工作包含一個階段 (Stage 0)。這個階段的核心任務是從指定的儲存位置讀取Parquet檔案,並將其轉換為DataFrame。在UI中,使用者可以看到這個階段被分解為多個任務 (Task),每個任務負責處理數據的一個分區。Spark UI還會提供事件時間軸、DAG可視化和日誌等功能,幫助使用者監控和理解這個數據讀取過程的執行細節。
玄貓的一個Scala命令創建了這個有界階段,其唯一的工作是將Parquet檔案讀取到DataFrame中。目前這並不是很實用,因為玄貓沒有對數據做任何處理,所以讓玄貓添加另一個Scala命令:
println(df.count())
DataFrame物件上的count函數將計算DataFrame中的所有記錄並返回一個Long數據類型。println函數將在標準輸出中顯示該值。
當玄貓運行此命令時,玄貓可以從Spark UI中看到這個有向無環圖 (DAG):
此圖示:DAG中階段的詳細資訊
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
rectangle "Spark 應用程式" as SparkApp {
rectangle "工作 (Job)" as Job {
rectangle "階段 22 (Stage 22)" as Stage22 {
component "任務 (Task 22.1)" as Task22_1
component "任務 (Task 22.2)" as Task22_2
component "..." as Task22_N
}
rectangle "階段 23 (Stage 23)" as Stage23 {
component "任務 (Task 23.1)" as Task23_1
}
}
}
Stage22 -down-> Stage23 : 數據洗牌 (Shuffle)
Task22_1 -right-> Task22_2
Task22_2 -right-> Task22_N
Task23_1
@enduml
看圖說話:
此圖示展示了Spark UI中一個簡單count函數所涉及的DAG階段細節。當執行df.count()操作時,Spark會創建一個包含兩個主要階段的工作。階段22 (Stage 22) 負責在每個執行器上進行數據的局部計數。由於數據被劃分為多個分區並分發到不同的執行器上,每個執行器會計算其所擁有的分區中的記錄數量,這表現為多個任務(例如Task 22.1、Task 22.2等)。一旦所有執行器完成局部計數,數據就需要被重新組織以進行全局匯總。這會觸發一個數據洗牌 (Shuffle) 操作,將局部計數結果匯聚到一個或幾個執行器上。這個洗牌操作標誌著從階段22到階段23的轉換。階段23 (Stage 23) 則負責將這些局部計數結果合併成最終的全局總數,這通常只涉及一個任務 (Task 23.1)。這種階段劃分和洗牌機制是Spark處理分佈式聚合操作的關鍵。
僅僅一個簡單的count函數就發生了許多事情。階段22正在各個執行器上進行記錄的局部計數。如果讀者回想一下,數據是以數據分區的形式移動到執行器上的。如果階段22已完成所有局部執行器上的計數,那麼階段23將生成一個洗牌查詢 (shuffle query),該查詢將指定一個執行器讀取局部計數並進行全局計數,以生成數據集中記錄的總數。所有Spark階段之間都有一個邏輯邊界,它們不會在前一個階段完成之前運行進程。一個階段需要其所有任務都完成才能被標記為完成。Spark具有強大的容錯能力,會重試任務,但最終可能會使某些任務失敗。如果一個或多個任務失敗,該階段的處理就會失敗。
讓玄貓仔細看看洗牌 (shuffles) 以及它們對Spark應用程式的意義。
洗牌
在Spark中,某些操作被認為是昂貴的。導致執行器之間數據移動的操作稱為洗牌 (shuffle),這是一種在執行器之間操作的寬轉換 (wide transformation)。有時讀者會需要並希望使用這些寬轉換。例如,玄貓的count函數在開發過程中很有用,當讀者想要驗證記錄計數時,但count函數應避免在生產過程中使用,因為它會增加額外的計算時間,從而減慢讀者的管線。
導致洗牌的一個有效用例是根據列中的值重新分區數據。預設情況下,Spark使用200個分區,因此它將從攝取的數據中創建200個獨立的分區。讓玄貓舉一個例子,讀者有一個名為Groups的列,其中包含值A、B、C、X、Y和Z。Spark將讀取數據檔案並將這些記錄放入200個不同的分區中,而不會考慮其內容。
從效能評估視角切入,深入剖析Spark應用程式的建構與執行機制後,我們清晰地看見,程式碼的簡潔性與底層運作的複雜性之間存在著顯著的權衡。SparkSession的建立僅是旅程的起點,真正的挑戰在於能否駕馭其後由「工作」、「階段」與「任務」構成的分散式運算邏輯。
本章節透過一個簡單的count函式,揭示了從單純的API呼叫到觸發「洗牌 (Shuffle)」這類昂貴「寬轉換」的隱藏成本,這正是初階與資深數據工程師的關鍵分野。許多開發者滿足於功能實現,卻忽略了其背後DAG的生成邏輯與數據移動的巨大代價,這將導致數據管線在規模化時遭遇難以預期的效能瓶頸,限制了專案的最終成就。
展望未來2-3年,隨著數據規模持續指數級增長,數據工程的價值將不僅體現在功能交付,更取決於資源利用效率。對「洗牌」機制的掌控能力,將成為區分數據工匠與數據架構師的分水嶺,它預示著從「被動接受框架預設」到「主動設計高效數據流」的思維躍遷。
玄貓認為,將Spark的執行細節內化為開發直覺,是從「能用」邁向「精通」的必經之路。唯有如此,才能在看似簡單的API呼叫中預見其對整體系統效能的深遠影響,進而打造出兼具彈性與韌性的高階數據解決方案。