返回文章列表

建構批次與串流資料管道:Argo、Spark 與 Kafka 實戰

本文探討建構現代資料管道的兩種核心模式:批次與串流處理。在批次處理方面,文章詳述如何運用 Argo Workflow 編排 Spark 任務,在 Kubernetes 環境中實現青銅、白銀、黃金三層架構。接著,文章轉向串流處理,以電信業的物聯網(IoT)場景為例,展示如何利用 Spark 結構化串流與 Apache Kafka,建構一個能即時攝取、處理設備狀態數據的端到端管道,以滿足臨時分析、狀態追蹤與業務監控等多樣化需求。

資料工程 巨量資料

在數據驅動的商業環境中,建立高效能且可靠的資料管道是企業數位轉型的基石。本文從資料工程的實務角度出發,深入剖析兩種關鍵的處理模式。首先,探討如何透過 Argo Workflow 這類雲端原生工具,在 Kubernetes 上精準編排以 Spark 為核心的批次處理作業,並遵循青銅、白銀至黃金的經典分層架構,確保資料品質與治理。接著,文章延伸至即時數據領域,以物聯網(IoT)應用為背景,闡述如何整合 Spark 結構化串流與 Apache Kafka 事件匯流排,建構一個能應對高吞吐量、低延遲需求的串流管道。這兩種模式的結合,不僅體現了現代資料架構的彈性與擴展性,也為企業從歷史數據分析到即時營運監控提供了完整的技術藍圖。

第十二章:批次資料管道建構實務

批次處理編排

mountPath: /tmp/data/
executor:
instances: 1
cores: 1
memory: 1G
volumeMounts:
- name: data
mountPath: /tmp/data/
volumes:
- name: data
hostPath:
path: /app
type: Directory

- name: print-termination-message
container:

編排玄貓的批次處理

255

image: busybox
imagePullPolicy: Always
command: [echo]
args: ["Congratulations! Argo Workflow ran sucessfully!"]

- name: dag-seq
dag:
tasks:
- name: start-message
template: print-start-message
- name: bronze
depends: start-message
template: bronze
- name: silver
depends: bronze
template: silver
- name: gold
depends: silver
template: gold
- name: termination-message
depends: gold
template: print-termination-message

您可以使用以下命令運行上述工作流程:

argo submit pipeline.yaml --serviceaccount=spark -n spark-app --watch

一旦您啟動工作流程,您就可以在螢幕上監控進度。它應該在幾分鐘內完成,您應該會看到以下內容:

圖12.10 – 端到端管道

批次資料管道建構實務:Spark與Scala應用

256

在本節中,玄貓探討了如何使用Argo創建一個端到端資料管道。儘管玄貓將此管道配置為在minikube集群上本地運行,但將其部署到您的生產系統將只需要非常少的更改。

第十三章:構建串流管道:Spark與Scala的即時資料處理

串流處理的基礎與挑戰

本書的最後一章是玄貓所學知識的又一次綜合應用,但這次,玄貓將構建一個串流管道。您可以將串流視為將資料連續或**「即時」攝取到您的分析系統中。實現這一目標的方法有很多種,但通常這涉及事件匯流排訊息佇列系統**。玄貓將使用Azure Event Hubs作為玄貓的串流攝取來源,因為它可以配置為顯示為Apache Kafka,而Spark由於其開源連接器可以輕鬆使用它。作為一名資料工程師,您需要了解如何高效可靠地即時處理資料。玄貓將再次利用Spark,使用其結構化串流功能,以及Scala作為一種多功能且富有表現力程式語言。這次,玄貓將引入Apache Kafka,為玄貓的串流管道提供事件匯流排

在本章中,玄貓將涵蓋以下主要主題:

此圖示:Argo Workflow批次管道完整執行流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "Argo Workflow (data-pipeline)" as ArgoWorkflow {
component "啟動訊息 (print-start-message)" as StartMsg
component "青銅層處理 (bronze)" as Bronze
component "白銀層處理 (silver)" as Silver
component "黃金層處理 (gold)" as Gold
component "完成訊息 (print-termination-message)" as EndMsg

StartMsg --> Bronze : 啟動 Spark Job (Bronze.scala)
Bronze --> Silver : 依賴 Bronze 完成後啟動 Spark Job (Silver.scala)
Silver --> Gold : 依賴 Silver 完成後啟動 Spark Job (Gold.scala)
Gold --> EndMsg : 依賴 Gold 完成後啟動
}

cloud "Kubernetes Cluster (Minikube)" as K8sCluster {
rectangle "Spark Driver Pod (Bronze)" as BronzeDriver
rectangle "Spark Executor Pod (Bronze)" as BronzeExecutor
rectangle "Spark Driver Pod (Silver)" as SilverDriver
rectangle "Spark Executor Pod (Silver)" as SilverExecutor
rectangle "Spark Driver Pod (Gold)" as GoldDriver
rectangle "Spark Executor Pod (Gold)" as GoldExecutor
folder "共享資料儲存 (/app -> /tmp/data)" as SharedData
}

Bronze -[hidden]-> BronzeDriver
Bronze -[hidden]-> BronzeExecutor
Silver -[hidden]-> SilverDriver
Silver -[hidden]-> SilverExecutor
Gold -[hidden]-> GoldDriver
Gold -[hidden]-> GoldExecutor

BronzeDriver -- SharedData : 讀取/寫入原始資料
BronzeExecutor -- SharedData : 讀取/寫入原始資料

SilverDriver -- SharedData : 讀取青銅層資料/寫入白銀層資料
SilverExecutor -- SharedData : 讀取青銅層資料/寫入白銀層資料

GoldDriver -- SharedData : 讀取白銀層資料/寫入黃金層資料
GoldExecutor -- SharedData : 讀取白銀層資料/寫入黃金層資料

note bottom of K8sCluster
- 所有 Spark 任務共享 /tmp/data/ 目錄
- 確保資料在各層之間傳遞
- 每個 Spark 任務在獨立的 Pod 中運行
end note

@enduml

看圖說話:

此圖示展示了使用Argo Workflow編排的端到端批次資料管道的完整執行流程,從啟動訊息青銅層白銀層黃金層的處理,最終以完成訊息結束。它強調了Argo Workflow如何協調各個獨立的Spark任務,以及這些任務如何在Kubernetes集群中透過共享儲存進行資料交換。

  1. Argo Workflow的任務序列
  • 啟動訊息(start-message):作為管道的起點,列印一條啟動訊息,標誌著工作流程的開始。
  • 青銅層處理(bronze)start-message完成後,bronze任務啟動。它執行SparkApplication來處理原始資料攝取
  • 白銀層處理(silver)bronze任務成功完成後,silver任務啟動。它執行SparkApplication來進行資料清洗轉換品質驗證
  • 黃金層處理(gold)silver任務成功完成後,gold任務啟動。它執行SparkApplication來對資料進行聚合優化,形成業務就緒的服務層
  • 完成訊息(termination-message):所有資料處理任務成功完成後,最後的termination-message任務啟動,列印一條成功訊息,標誌著整個管道的圓滿結束。
  1. Kubernetes集群中的執行環境
  • 每個Argo Workflow任務(bronzesilvergold)都對應一個在Kubernetes集群中運行的SparkApplication
  • 每個SparkApplication都會啟動一個Spark Driver Pod和一個或多個Spark Executor Pod。這些Pod在集群中獨立運行,負責執行各自層次的資料處理邏輯。
  1. 共享資料儲存的關鍵作用
  • Kubernetes集群中,所有Spark Driver PodExecutor Pod都透過共享資料儲存(圖中的SharedData,對應於minikube中的/app目錄,並掛載到Pod/tmp/data)進行資料交換。
  • 青銅層Spark任務會將攝取到的原始資料寫入這個共享儲存。
  • 白銀層Spark任務會從共享儲存中讀取青銅層的資料,處理後將白銀層的資料寫回共享儲存。
  • 黃金層Spark任務則從共享儲存中讀取白銀層的資料,處理後將黃金層的資料寫回共享儲存。

這種編排模式確保了各個處理階段的資料依賴性順序性,同時利用了Kubernetes容器化優勢Spark分散式處理能力。透過Argo Workflow,玄貓能夠清晰地定義、監控和管理這個複雜的批次資料管道,使其在生產環境中具有高可靠性可擴展性

第十三章:構建串流管道:Spark與Scala的即時資料處理

串流處理的基礎與挑戰

  • 理解玄貓的業務用例
  • 玄貓的物聯網(IoT)用例是什麼?
  • 攝取資料
  • 轉換資料
  • 創建服務層
  • 編排玄貓的串流處理

透過本章的學習與實踐,您將學會如何全面規劃開發實施一個串流管道。這將使您能夠在您的組織中獨立完成類似的任務

構建串流管道:Spark與Scala應用

258

深入理解業務用例:物聯網的戰略價值

玄貓的串流情境基於物聯網(IoT)設備IoT用例在組織中變得越來越普遍,因為它們透過提供對其營運的即時洞察而具有變革潛力IoT感測器和設備透過提供即時數據來實現這一點,從而使各行各業能夠更有效地監控和優化流程。例如,在製造業中,啟用IoT的機械可以檢測和報告問題,減少停機時間並提高生產產出。這種效率提升不僅節省了時間和資源,而且對底線產生了直接影響。

這種效率和成本降低的一個例子是預測性維護。透過分析即時數據,公司可以預測機械何時可能發生故障主動安排維護。這種預防性方法最大限度地減少了計畫外停機時間,降低了維護成本,並延長了資產的使用壽命。另一個例子是能源和公用事業部門IoT設備在優化資源消耗減少浪費方面發揮著至關重要的作用,從而帶來環境效益可持續發展收益。這些應用強調了IoT促進更可持續和環保的工業格局方面的重要性。

IoT設備產生的海量數據提供了前所未有的機會來深入了解客戶行為市場趨勢。這種數據驅動的決策使企業能夠做出明智的選擇完善其戰略,並在市場中保持競爭力IoT設備還可以實現各種流程的自動化,這不僅降低了人為錯誤的風險,而且使員工能夠專注於更具戰略性的任務。本質上,IoT在行業中的採用不僅僅是一種技術趨勢,更是企業在當今快節奏的商業環境中保持高效競爭力環境責任戰略必然

玄貓將在IoT的一個通用用例中工作,玄貓希望追蹤設備的狀態。這對於在設備故障時通知客戶以及追蹤製造商模型的整體可靠性玄貓設備群的整體健康狀況非常有用。

物聯網(IoT)用例情境

玄貓為一家電信公司工作,該公司在客戶的家庭和企業中擁有多種類型的設備。玄貓的設備都連接到玄貓的網路,並且每分鐘,玄貓都會收到每個設備的狀態更新。該設備返回以下狀態:

  • 啟用(Activation)
  • 停用(Deactivation)
  • 方案變更(Plan change)
  • 電信活動(Telecoms activity)
  • 網路活動(Internet activity)
  • 設備錯誤(Device error)

物聯網(IoT)用例情境

259

玄貓的營運團隊將從玄貓的設備收集這些數據,並將每個狀態更新作為事件載入到Azure Event Hubs中。他們已啟用Azure Event Hubs作為Apache Kafka介面,以便玄貓可以使用SparkKafka連接器將這些數據讀取到玄貓的資料平台中進行分析

這些數據將以三種不同的方式使用。第一種是針對玄貓的白銀層數據進行臨時分析,這些數據是結構化去重的。第二種是識別每天設備狀態的總數。最後一種情況是識別任何設備的當前狀態

請參閱圖13.1以獲取玄貓將從IoT設備攝取的數據範例:

圖13.1 – 來自玄貓IoT設備的範例數據

要求是持續攝取數據每15分鐘以批次處理記錄組。現在,讓玄貓繼續理解玄貓的數據,以便玄貓知道如何處理它。

此圖示:物聯網設備資料流與處理需求

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

actor "電信客戶" as Customer
entity "IoT 設備" as IoTDevice
cloud "Azure Event Hubs (Kafka 兼容)" as EventHubs
component "營運團隊" as OpsTeam
rectangle "Spark 串流應用程式" as SparkStreamingApp
database "資料平台 (白銀層)" as SilverLayer
database "資料平台 (黃金層)" as GoldLayer
actor "業務分析師" as BusinessAnalyst
actor "客戶服務" as CustomerService

IoTDevice --> EventHubs : 每分鐘發送狀態更新 (啟用、停用、錯誤等)
OpsTeam --> EventHubs : 收集並載入狀態更新

EventHubs --> SparkStreamingApp : 持續攝取數據 (每 15 分鐘批次處理)

SparkStreamingApp --> SilverLayer : 寫入結構化、去重數據 (用於臨時分析)
SparkStreamingApp --> GoldLayer : 寫入聚合數據 (每天設備狀態總數)
SparkStreamingApp --> GoldLayer : 寫入最新狀態數據 (任何設備的當前狀態)

SilverLayer --> BusinessAnalyst : 進行臨時分析
GoldLayer --> BusinessAnalyst : 查詢每日設備狀態
GoldLayer --> CustomerService : 查詢設備當前狀態 (例如,通知故障)

Customer --> IoTDevice : 使用設備

note right of IoTDevice
- 狀態類型:
- Activation
- Deactivation
- Plan change
- Telecoms activity
- Internet activity
- Device error
end note

note right of EventHubs
- 作為 Apache Kafka 介面
- 確保高吞吐量和低延遲
end note

note right of SparkStreamingApp
- 使用 Spark 的 Kafka 連接器
- 實現結構化串流
- 處理邏輯: 清洗、轉換、聚合
end note

note right of SilverLayer
- 結構化、去重數據
- 支援 ad hoc 查詢
end note

note right of GoldLayer
- 聚合數據 (例如,每日總數)
- 最新狀態數據
- 支援快速查詢和報告
end note

@enduml

看圖說話:

此圖示詳細描繪了電信公司物聯網(IoT)設備資料產生最終應用串流處理管道。它清晰地展示了各個組件如何協同工作,以實現即時數據攝取處理多種分析用途

  1. IoT設備資料產生
  • 電信客戶使用IoT設備,這些設備每分鐘會自動發送狀態更新。這些狀態包括啟用停用方案變更電信活動網路活動設備錯誤等。這些是即時數據流的源頭。
  1. 資料攝取至Azure Event Hubs
  • 營運團隊負責收集這些設備狀態更新,並將其作為事件載入到Azure Event Hubs中。
  • Azure Event Hubs在這裡被配置為兼容Apache Kafka的介面,這使得它能夠處理高吞吐量低延遲事件流,並允許Spark使用其標準的Kafka連接器進行數據讀取。
  1. Spark串流應用程式處理
  • Spark串流應用程式持續從Azure Event Hubs攝取數據。雖然是持續攝取,但處理邏輯是每15分鐘以批次(微批次)的方式處理記錄組。
  • 此應用程式負責清洗轉換聚合數據,將其從原始事件流轉化為有價值的分析洞察
  1. 資料分發至資料平台層
  • 白銀層Spark串流應用程式將結構化去重後的數據寫入資料平台白銀層。這些數據主要用於業務分析師進行臨時(ad hoc)分析,因為它們提供了較為細粒度經過清洗的事件記錄。
  • 黃金層:同時,應用程式也會將聚合數據(例如,每天設備狀態的總數)和最新狀態數據(例如,任何設備的當前狀態)寫入資料平台黃金層黃金層的數據是業務就緒的,專為快速查詢報告而設計。
  1. 最終數據應用
  • 業務分析師可以從白銀層獲取數據進行探索性分析,也可以從黃金層查詢每日設備狀態以追蹤趨勢。
  • 客戶服務團隊可以從黃金層查詢設備的當前狀態,這對於在設備發生錯誤時即時通知客戶或提供技術支援至關重要。

這個端到端流程展示了如何利用串流技術來實現即時監控預測性維護響應式客戶服務,從而提升營運效率客戶滿意度

縱觀現代管理者的多元挑戰,本章所展示的批次資料管道不僅是一次技術實踐,更是對資料工程效能與價值的深度檢視。此架構巧妙地整合了 Argo Workflow 的編排能力、Spark 的分散式運算威力以及 Kubernetes 的容器化彈性,形成一個高效協作的系統。Argo Workflow 如同中樞神經,精準調度各個處理階段;而 Spark 則作為強大的執行引擎,在隔離的容器環境中完成從青銅層到黃金層的資料精煉。這種模式不僅解決了傳統資料管道中任務依賴管理與資源調度的複雜性,更關鍵的是,其從 minikube 本地環境到生產系統的低摩擦遷移特性,大幅縮短了從開發到價值實現的週期。

展望未來,穩固、可擴展的批次處理能力,是企業邁向更複雜資料應用的基石。熟練掌握本章的架構,意味著您已具備應對大規模、週期性資料任務的實力,為接下來迎接串流處理等即時數據挑戰奠定了不可或缺的基礎。

玄貓認為,對於追求卓越的資料工程師與技術領導者而言,將這種聲明式、容器化的工作流編排範式內化為核心能力,已是構建現代化、高可靠性資料平台的必然路徑。