返回文章列表

使用 Spark 與 Argo Workflow 建構批次資料管道黃金層

本文闡述批次資料管道的建構實務,聚焦於服務層(黃金層)的設計與實現。內容涵蓋如何運用 Spark 將白銀層的精煉資料進行分組與聚合,轉換為可直接支援商業分析的業務指標。此外,文章詳細介紹如何利用 Argo Workflow 在 Kubernetes 環境中編排包含青銅、白銀至黃金層的完整資料處理流程,並說明如何透過持久化儲存機制,確保各獨立容器化步驟間的資料共享與沿襲,最終建成一個可靠、自動化的資料湖倉一體架構。

資料工程 資料架構

在現代資料工程中,湖倉一體架構透過分層設計確保資料品質。本文接續白銀層的資料驗證,深入探討黃金層的建構方法。黃金層作為資料管道的最終服務層,其核心任務是將技術性事件資料,透過商業邏輯進行聚合與塑模,轉化為高價值、業務就緒的聚合指標。此過程是實現資料驅動決策的關鍵樞紐,為後續商業智慧分析提供穩定數據基礎。文章亦將展示如何透過工作流程編排工具,將這些分散的處理步驟整合成一個自動化、可維運的批次資料管道。

第十二章:批次資料管道建構實務

資料品質驗證

Array("Download Content", "Event Registration", "Survey Response"),
_ >= 0.5
)
.isComplete("country_name")
.hasCompleteness("product_group", _ >= .9)
)
.run()

然後玄貓如下調用runIfSuccess

DeequChecks(verificationResult, spark)
.runIfSuccess { writeDelta(true, silverConversionEvents) }

上述程式碼將在控制台列印以下內容,這證實了所有約束確實都已滿足:

圖12.9 – 作為DataFrame的檢查結果

現在玄貓的資料是可信的,並且玄貓已經將資料品質規則應用於玄貓的白銀層處理,讓玄貓來介紹準備和寫入服務層黃金層

構建服務層(黃金層)

玄貓管道的最後一步將是寫入玄貓的服務層的過程,在湖倉一體資料架構中稱為黃金層黃金層代表精煉高品質資料的最終目的地,這些資料被儲存並可用於各種分析報告商業智慧目的。在此層中,資料經過一系列關鍵轉換,以確保其可靠性可訪問性,並充分準備好支持組織內的資料驅動決策

玄貓將專注於建模玄貓的資料以適應本章前面討論的實體關係圖(ERD)。請參閱圖12.7以獲取完整模型。玄貓黃金層程式碼的主要任務是獲取玄貓白銀層轉換事件資料並將其轉換為轉換事件表。讓玄貓看看將執行此轉換的程式碼:

val silverConversionEvents: DataFrame =
spark.read.format("delta").load(silverCE)

批次資料管道建構實務:Spark與Scala應用

250

玄貓要做的第一件事是將玄貓的白銀層轉換事件資料載入到名為silverConversionEventsDataFrame中。接下來,玄貓將使用它將此程式碼轉換為玄貓的黃金層服務表

val goldConversionEvents: DataFrame =
silverConversionEvents
.groupBy(
col("country_id"),
col("campaign_id"),
col("conversion_event_id"),
col("date")
)
.agg(
count(col("conversion_event")).alias("event_count")
)

在上述程式碼中,silverConversionEvents DataFrame轉換聚合。它根據特定的列進行分組country_idcampaign_idconversion_event_iddategroupBy函數計算每個組中轉換事件的發生次數,結果被別名為event_count。這意味著生成的DataFramegoldConversionEvents,將包含指定列的每個唯一組合的事件計數,如下所示:

goldConversionEvents.write
.format("delta")
.mode("overwrite")
.save(s"${target}conversion_events")

最後,程式碼使用Delta格式將goldConversionEvents DataFrame寫入黃金層。它將寫入模式指定為overwrite,表示conversion_events Delta Lake表(在黃金層中)中的任何現有資料都應該被新資料替換。

save操作指定了黃金層的目標位置,即${target}conversion_events${target}表示儲存黃金層資料的路徑或目錄。

由於玄貓使用的是Delta,玄貓的資料湖可以用作湖倉一體。如果需要,資料可以複製到組織選擇的資料倉儲中。

接下來,玄貓將展示如何使用Argo來編排此管道

編排玄貓的批次處理

251

批次處理編排

現在玄貓已經編寫了玄貓的轉換,是時候構建玄貓的管道了。對於這個範例,玄貓將使用玄貓的minikube實例,該實例是作為**第10章「安裝Argo工作流程」**的一部分設置的。

玄貓的工作流程將首先列印一條起始訊息,然後是青銅層白銀層黃金層轉換,最後是一條管道完成訊息。這裡需要注意的一點是,所有這些步驟都將作為單獨的容器運行。這意味著由青銅層寫入的資料,玄貓需要使用持久化儲存

此圖示:黃金層資料聚合與寫入流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

database "白銀層 (Delta Lake)" as SilverLayer
rectangle "Spark 應用程式 (黃金層)" as GoldApp
database "黃金層 (Delta Lake)" as GoldLayer
actor "業務分析師/BI工具" as BusinessAnalyst

SilverLayer --> GoldApp : 讀取 silverConversionEvents DataFrame
note right of GoldApp
// val silverConversionEvents: DataFrame =
// spark.read.format("delta").load(silverCE)
end note

GoldApp --> GoldApp : 執行資料分組與聚合
note right of GoldApp
// val goldConversionEvents: DataFrame =
// silverConversionEvents
// .groupBy(
// col("country_id"),
// col("campaign_id"),
// col("conversion_event_id"),
// col("date")
// )
// .agg(
// count(col("conversion_event")).alias("event_count")
// )
end note

GoldApp --> GoldLayer : 寫入 goldConversionEvents DataFrame (覆蓋模式)
note right of GoldLayer
- 儲存聚合後的轉換事件計數
- 結構化、業務就緒
- 支援快速分析查詢
- 可作為數據倉儲的來源
end note

GoldLayer --> BusinessAnalyst : 提供數據用於分析、報告和BI
@enduml

看圖說話:

此圖示清晰地展示了批次資料管道黃金層資料處理流程,其核心任務是將白銀層精煉資料進一步聚合優化,以構建一個業務就緒服務層

整個流程如下:

  1. 從白銀層讀取資料
  • Spark應用程式首先從白銀層讀取silverConversionEvents DataFrame。這些資料已經過清洗轉換並與維度資料連接,並通過了資料品質檢查
  1. 資料分組與聚合
  • 這是黃金層的核心轉換步驟。Spark應用程式會對silverConversionEvents DataFrame進行groupBy操作,根據country_id(國家ID)、campaign_id(活動ID)、conversion_event_id(轉換事件ID)和date(日期)等關鍵維度進行分組。
  • 分組後,會使用agg函數對每個組內的conversion_event進行count聚合,計算出每個特定國家特定活動特定轉換事件類型特定日期發生的總次數,並將結果命名為event_count。這個步驟將細粒度的事件資料轉換為聚合的、可直接用於分析的指標。
  1. 寫入黃金層
  • 聚合後的DataFrame,即goldConversionEvents,隨後會被寫入黃金層Delta Lake表。
  • 寫入模式通常設定為overwrite,這表示每次運行時都會用最新的聚合結果替換舊資料,確保黃金層資料的即時性準確性
  • 黃金層的資料以Delta Lake格式儲存,繼承了其ACID特性,並可作為資料湖倉一體的一部分。
  1. 提供給業務分析師/BI工具
  • 黃金層的資料是高度結構化聚合業務就緒的。它可以直接提供給業務分析師商業智慧(BI)工具報告系統使用。
  • 透過黃金層業務用戶可以快速執行複雜的分析查詢、生成儀表板,並基於這些可靠的數據做出關鍵的業務決策,而無需關心底層的資料清洗轉換細節

總之,黃金層是整個資料管道終端服務層,它將精煉後的資料轉化為可直接消費的業務指標,極大地提升了資料的價值可用性

第十二章:批次資料管道建構實務

批次處理編排

持久化儲存。有幾種方法可以實現,但對於玄貓的範例,玄貓將使用hostPath,這是一種Kubernetes支持的持久卷類型。但玄貓的資料不是在minikube容器中,而是在主機機器上。因此,玄貓需要使所需的資料集可用,以便Spark在運行時可以找到它們。

要找到minikube容器,請運行docker ps -f name=minikube。使用此ID,您可以將主機機器上的任何檔案或目錄複製到minikube容器中。對於玄貓的範例,玄貓將chapter12/data中的檔案複製到容器內的/app目錄。為此,請在終端機中進入chapter12/data目錄並運行以下命令:

docker cp ./ <containerid>:/app/

檔案複製後,您可以將/app/掛載到驅動程式執行器Pod,如下面的工作流程所示:

kind: Workflow #k8s resource kind
metadata:
generateName: data-pipeline-
namespace: spark-app
spec:
entrypoint: dag-seq
templates:
- name: print-start-message
container:
image: busybox
imagePullPolicy: Always
command: [echo]
args: ["Starting Argo Workflow!"]

- name: bronze
resource:
action: create
successCondition: status.applicationState.state = COMPLETED
failureCondition: status.applicationState.state = FAILED
manifest: |

批次資料管道建構實務:Spark與Scala應用

252

kind: SparkApplication
metadata:
generateName: bronze-
namespace: spark-app
spec:
arguments:
- "true"
- ""
- ""
- "2023-08-01"
- "2023-08-02"
timeToLiveSeconds: 3600
type: Scala
mode: cluster
image: rupambhattacharjee/de-with-scala:latest
imagePullPolicy: Always
mainClass: com.packt.dewithscala.chapter12.Bronze
mainApplicationFile: "local://///app/de-with-scala-assembly-1.0.jar"
sparkConf:
spark.kubernetes.authenticate.driver.serviceAccountName: spark
sparkVersion: 3.1.1
driver:
memory: 1G
volumeMounts:
- name: data
mountPath: /tmp/data/
executor:
instances: 1
cores: 1
memory: 1G
volumeMounts:
- name: data
mountPath: /tmp/data/
volumes:
- name: data
hostPath:
path: /app
type: Directory

- name: silver
resource:
action: create
successCondition: status.applicationState.state = COMPLETED

編排玄貓的批次處理

253

failureCondition: status.applicationState.state = FAILED
manifest: |
kind: SparkApplication
metadata:
generateName: "silver-"
namespace: spark-app
spec:
arguments:
- "/tmp/data/"
timeToLiveSeconds: 3600
type: Scala
mode: cluster
image: rupambhattacharjee/de-with-scala:latest
imagePullPolicy: Always
mainClass: com.packt.dewithscala.chapter12.Silver
mainApplicationFile: "local://///app/de-with-scala-assembly-1.0.jar"
sparkConf:
spark.kubernetes.authenticate.driver.serviceAccountName: spark
sparkVersion: 3.1.1
driver:
memory: 1G
volumeMounts:
- name: data
mountPath: /tmp/data/
executor:
instances: 1
cores: 1
memory: 1G
volumeMounts:
- name: data
mountPath: /tmp/data/
volumes:
- name: data
hostPath:
path: /app
type: Directory

- name: gold
resource:
action: create
successCondition: status.applicationState.state = COMPLETED
failureCondition: status.applicationState.state = FAILED

批次資料管道建構實務:Spark與Scala應用

254

manifest: |
kind: SparkApplication
metadata:
generateName: "gold-"
namespace: spark-app
spec:
arguments:
- "/tmp/data/"
timeToLiveSeconds: 3600
type: Scala
mode: cluster
image: rupambhattacharjee/de-with-scala:latest
imagePullPolicy: Always
mainClass: com.packt.dewithscala.chapter12.Gold
mainApplicationFile: "local://///app/de-with-scala-assembly-1.0.jar"
sparkConf:
spark.kubernetes.authenticate.driver.serviceAccountName: spark
sparkVersion: 3.1.1
volumes:
- name: task-pv-storage
persistentVolumeClaim:
claimName: pvc01
driver:
memory: 1G
volumeMounts:
- name: data

此圖示:Argo Workflow批次資料管道編排

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "Argo Workflow (data-pipeline)" as ArgoWorkflow {
component "Step 1: 啟動訊息" as StartMessage
component "Step 2: 青銅層處理" as BronzeProcess
component "Step 3: 白銀層處理" as SilverProcess
component "Step 4: 黃金層處理" as GoldProcess
component "Step 5: 管道完成訊息" as EndMessage

StartMessage --> BronzeProcess
BronzeProcess --> SilverProcess
SilverProcess --> GoldProcess
GoldProcess --> EndMessage
}

cloud "Minikube 環境" as MinikubeEnv {
folder "主機 /app 目錄" as HostAppDir
folder "容器 /tmp/data/ 目錄" as ContainerDataDir
component "Spark Driver Pod" as DriverPod
component "Spark Executor Pod" as ExecutorPod
database "持久卷 (Persistent Volume)" as PV
database "持久卷聲明 (Persistent Volume Claim)" as PVC
}

HostAppDir --> ContainerDataDir : docker cp (資料複製)
ContainerDataDir --> DriverPod : hostPath 掛載 /tmp/data/
ContainerDataDir --> ExecutorPod : hostPath 掛載 /tmp/data/

BronzeProcess --> DriverPod : 啟動 SparkApplication (Bronze)
BronzeProcess --> ExecutorPod : 啟動 SparkApplication (Bronze)

SilverProcess --> DriverPod : 啟動 SparkApplication (Silver)
SilverProcess --> ExecutorPod : 啟動 SparkApplication (Silver)

GoldProcess --> DriverPod : 啟動 SparkApplication (Gold)
GoldProcess --> ExecutorPod : 啟動 SparkApplication (Gold)

PV <--> PVC : 儲存共享數據 (Gold層可能使用)

note bottom of ArgoWorkflow
- 每個步驟作為獨立容器運行
- 數據在各層間通過持久化儲存共享
- 確保數據沿襲和可追溯性
end note

@enduml

看圖說話:

此圖示展示了如何使用Argo Workflow來編排一個批次資料管道,該管道包含了青銅層白銀層黃金層的處理步驟。每個步驟都作為一個獨立的Kubernetes容器運行,並透過持久化儲存來共享資料,以確保資料的沿襲可追溯性

  1. Argo Workflow的整體流程
  • 整個管道由Argo Workflow定義,其入口點是dag-seq,表示這是一個有向無環圖(DAG)的順序執行。
  • 啟動訊息:管道首先會列印一條啟動訊息,標誌著工作流程的開始。
  • 青銅層處理:接著執行青銅層的Spark應用程式,負責原始資料的攝取初步儲存
  • 白銀層處理:青銅層完成後,白銀層的Spark應用程式啟動,進行資料清洗轉換品質驗證
  • 黃金層處理:白銀層處理完畢後,黃金層的Spark應用程式執行,將資料聚合優化業務就緒的格式
  • 管道完成訊息:最後,管道會列印一條完成訊息,標誌著整個批次處理的結束。
  1. Minikube環境與資料共享
  • 由於每個Spark應用程式(青銅、白銀、黃金層)都作為獨立的容器運行,它們之間需要一種機制來共享資料。
  • 資料複製:首先,透過docker cp命令將主機機器上的chapter12/data目錄中的資料複製到minikube容器內的/app目錄。
  • hostPath掛載:然後,KuberneteshostPath類型的PersistentVolume被用來將minikube容器內的/app目錄掛載到Spark Driver PodExecutor Pod/tmp/data/路徑。這使得所有Spark任務都能夠訪問相同的共享資料
  • 持久卷(PV)和持久卷聲明(PVC):在黃金層的配置中,可以看到PersistentVolumeClaimpvc01)的使用,這表示黃金層的輸出可能需要更持久可靠的儲存,而不是簡單的hostPath,這通常用於生產環境中的數據湖數據倉儲
  1. Spark應用程式的部署
  • 每個處理步驟(青銅、白銀、黃金)都配置為一個SparkApplication資源,這是一種Kubernetes自定義資源,用於在Kubernetes集群上運行Spark作業。
  • 每個SparkApplication都指定了主類com.packt.dewithscala.chapter12.BronzeSilverGold)、應用程式檔案路徑Spark配置以及驅動程式執行器的資源限制。
  • volumeMounts配置確保了Spark應用程式在運行時能夠訪問掛載的/tmp/data/目錄,從而讀取和寫入資料。

透過這種編排方式Argo Workflow提供了一個強大彈性可擴展的平台,用於管理和執行複雜的批次資料管道,確保了各個處理階段的順序性資料共享錯誤處理能力。

第十二章結論

檢視此批次資料管道從品質驗證到服務層建構的完整實踐後,其核心成就,在於將 Deequ 品質檢核、Spark 聚合處理、Delta Lake 可靠儲存及 Argo 容器化編排等獨立技術,整合成一個連貫且自動化的價值鏈。本章不僅展示了黃金層如何將精煉資料轉化為商業洞察,更透過 hostPathPVC 等具體配置,解決了容器化工作流程中最棘手的資料持久化與共享難題,清晰地展現了從架構藍圖到落地執行的完整路徑。

展望未來,這種以 Kubernetes 為基礎、以宣告式 YAML 定義的資料管道編排方式,將成為建構可擴展、高韌性資料平台的主流典範。掌握此整合技術堆疊,已不再是單純的工程技能,而是驅動企業實現資料資產化、提升決策品質的核心能力。

玄貓認為,本章所演示的從青銅到黃金的模組化、可驗證的實踐範例,正是當代資料工程師在追求高效能與高可靠性雙重目標時,所能依循的最佳實踐路徑。