返回文章列表

解析Argo與Databricks工作流程的資料管道編排策略

本文深入探討現代資料管道的兩大主流編排工具:Argo Workflows 與 Databricks Workflows。Argo Workflows 作為 Kubernetes 原生工作流程引擎,以其聲明式 YAML 配置與 CronWorkflow 排程功能,提供靈活且自動化的任務管理。另一方面,Databricks Workflows 作為完全託管的雲端服務,無縫整合 Databricks 生態系,簡化了從本地開發、打包 JAR 到雲端部署的 Spark 任務編排流程。文章比較了這兩種工具在架構、操作與應用場景上的核心差異,為資料工程師選擇合適的編排方案提供理論依據。

資料工程 平台架構

在資料密集型應用中,自動化且可靠的資料管道是確保數據流暢通的關鍵。工作流程編排工具在此扮演核心角色,負責調度、執行與監控複雜的數據處理任務。本文聚焦於兩種截然不同的編排哲學。首先是開源的 Argo Workflows,它深度整合 Kubernetes,透過聲明式組態檔定義有向無環圖(DAG)任務,實現了雲原生環境下的高度客製化與可擴展性。接著,我們將檢視 Databricks Workflows,這是一個整合在 Databricks 平台內的託管服務,它將編排、運算與儲存緊密結合,提供一站式解決方案,特別適用於以 Spark 為核心的數據分析與機器學習工作負載。透過對比這兩種工具的實現方式與架構設計,能更深入理解不同業務場景下的技術選型考量。

第十章:資料管道編排

使用Argo Workflows

使用Argo Workflows

211

namespace: spark-app
spec:
timeToLiveSeconds: 3600
type: Scala
mode: cluster
image: apache/spark:v3.3.1
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local://///opt/spark/examples/jars/spark-examples_2.12-3.3.1.jar"
sparkConf:
spark.kubernetes.authenticate.driver.serviceAccountName: spark
sparkVersion: 3.0.0
driver:
memory: 1G
executor:
instances: 1
cores: 1
memory: 1G
- name: print-termination-message
container:
image: busybox
imagePullPolicy: IfNotPresent
command: [echo]
args: ["Congratulations! Argo Workflow ran successfully!"]
- name: dag-seq
dag:
tasks:
- name: start-message #任務名稱
template: print-start-message #要使用的模板名稱
- name: launch-spark-job
depends: start-message #launch-spark-job 在 start-message 之後啟動
template: calculate-pi
- name: termination-message
depends: launch-spark-job
template: print-termination-message

資料管道編排

212

玄貓可以將上述內容保存到一個檔案中,然後使用argo submit啟動工作流程,如下所示:

argo submit my-first-workflow.yaml --serviceaccount=spark -n spark-app --watch

請注意,玄貓使用的是玄貓在上一節中創建的spark服務帳戶。如果沒有具有適當角色的服務帳戶Spark應用程式將無法運行。玄貓還使用了watch標誌,它將跟踪進度並將其列印在螢幕上。一旦工作流程完成,它應該看起來像圖10.4

圖10.4 – Argo工作流程狀態

Argo還提供了CronWorkflow,它提供了按預設排程運行工作流程的能力。例如,如果玄貓想將工作流程排程為每小時運行一次,玄貓可以創建一個cron工作流程,如下所示:

kind: CronWorkflow #k8s resource kind
metadata:
generateName: example-cron-workflow- #避免名稱衝突
namespace: spark-app #此工作流程將運行的k8s命名空間
spec:
schedule: "0 * * * *"
timeZone: Etc/UTC
workflowSpec: #workflow spec
entrypoint: dag-seq #工作流程將從dag-seq開始
templates:
- name: print-start-message
container:
image: busybox
imagePullPolicy: IfNotPresent
command: [echo]
args: ["Starting Argo Workflow!"]
#工作流程的其餘部分

使用Databricks Workflows

213

請注意,工作流程.spec部分在CronWorkflow中保持不變地進入.workflowSpec部分。

要創建CronWorkflow實例,請使用cron create而不是submit,如以下範例所示:

argo cron create my-first-cron-workflow.yaml --serviceaccount=spark -n spark-app

輸出將類似於:

Name: example-cron-workflow-n7wgx
Namespace: spark-app
Created: Fri Aug 18 22:31:18 +0530 (now)
Schedule: 0 * * * *
Suspended: false
Timezone: Etc/UTC
NextScheduledTime: Fri Aug 18 23:30:00 +0530 (58 minutes from now) (assumes workflow-controller is in UTC)

在本節中,玄貓研究了Argo Workflows,如何在本地安裝和配置它,它的各種組件,以及如何使用工作流程運行Spark應用程式。最後,玄貓研究了創建Cron工作流程,它允許玄貓按照指定的排程觸發工作流程Argo Workflows聲明性使其易於開發人員快速學習功能並實施更改,而無需編寫大量程式碼。

在下一節中,玄貓將研究Databricks Workflows,這不是一個開源工具,並且適用於在Databricks上運行的應用程式。鑑於Databricks在這個領域的巨大受歡迎程度,玄貓想快速介紹一下它的編排功能

使用Databricks Workflows

Databricks Workflows是一個完全託管的雲端編排服務,適用於所有Databricks客戶。它簡化了以下類型任務的管道編排創建:

此圖示:Argo Workflows 任務編排與排程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

actor "資料工程師" as DataEngineer

package "Argo Workflows 核心" as ArgoCore {
rectangle "Workflow 定義 (YAML)" as WorkflowDef
rectangle "CronWorkflow 定義 (YAML)" as CronWorkflowDef
rectangle "Argo CLI" as ArgoCLI
rectangle "Argo Server & Controller" as ArgoServer
}

package "任務執行與監控" as TaskExecution {
rectangle "啟動訊息 (Container)" as StartMessage
rectangle "Spark Pi 作業 (Resource)" as SparkPiJob
rectangle "終止訊息 (Container)" as EndMessage
rectangle "DAG 任務依賴" as DAGTasks
rectangle "Workflow 狀態監控" as WorkflowStatus
}

package "排程機制" as Scheduling {
rectangle "預設排程 (Cron)" as CronSchedule
}

DataEngineer --> WorkflowDef : 編寫 Workflow YAML
DataEngineer --> CronWorkflowDef : 編寫 CronWorkflow YAML
DataEngineer --> ArgoCLI : 使用 Argo CLI 提交/創建

ArgoCLI --> WorkflowDef : argo submit
ArgoCLI --> CronWorkflowDef : argo cron create

WorkflowDef --> ArgoServer : Workflow 提交至 Argo Server
CronWorkflowDef --> ArgoServer : CronWorkflow 提交至 Argo Server

ArgoServer --> StartMessage : 執行啟動訊息
StartMessage --> DAGTasks : 啟動訊息完成後
DAGTasks --> SparkPiJob : 根據 DAG 依賴執行 Spark Pi 作業
SparkPiJob --> EndMessage : Spark Pi 作業完成後
EndMessage --> WorkflowStatus : 終止訊息完成後更新 Workflow 狀態

CronSchedule --> ArgoServer : 定期觸發 CronWorkflow

note right of WorkflowDef
- 定義一系列有序任務
- 包含 Container, Resource 模板
- 使用 DAG 模板定義任務依賴
end note

note right of CronWorkflowDef
- 在 WorkflowDef 基礎上增加排程 (schedule, timeZone)
- 實現定期自動執行
end note

note right of ArgoCLI
- `argo submit` 執行一次性 Workflow
- `argo cron create` 創建排程 Workflow
end note

note right of WorkflowStatus
- 透過 `argo submit --watch` 實時監控
- 顯示 Workflow 執行進度與最終狀態
end note

note right of DAGTasks
- `start-message` -> `launch-spark-job` -> `termination-message`
- 確保任務按序執行
end note

@enduml

看圖說話:

此圖示清晰地展示了Argo Workflows任務編排排程方面的核心功能。整個流程圍繞著資料工程師如何定義、提交和監控工作流程。首先,資料工程師會編寫Workflow定義CronWorkflow定義,這些都是以YAML格式呈現的聲明式配置。這些定義包含了工作流程中的各個任務(例如啟動訊息、運行Spark作業、終止訊息),以及這些任務之間的依賴關係,通常透過DAG模板來實現。

Argo CLI資料工程師Argo Workflows互動的主要介面。透過argo submit命令,資料工程師可以提交一個一次性執行工作流程。而透過argo cron create命令,則可以創建一個排程工作流程(CronWorkflow),使其按照預設的排程機制(例如每小時執行一次)自動運行。

無論是一次性工作流程還是排程工作流程,它們都會被提交到Argo Server & Controller進行處理。Argo Server會根據工作流程定義中的DAG任務依賴,依次執行各個任務。例如,先執行啟動訊息(一個Container任務),然後根據依賴關係觸發Spark Pi作業(一個Resource任務),最後再執行終止訊息(另一個Container任務)。在整個執行過程中,資料工程師可以透過argo submit --watch等命令實時監控工作流程的狀態,了解每個任務的進度和最終結果。CronWorkflow則由排程機制定期觸發,實現了自動化的資料管道運行。這種聲明式自動化的特性使得Argo Workflows成為Kubernetes環境下強大的任務編排工具

第十章:資料管道編排

使用Databricks Workflows

  • Databricks筆記本
  • Python腳本/Wheel
  • JAR
  • Spark Submit
  • Databricks SQL – 儀表板、查詢、警報或檔案
  • Delta Live Table管道
  • dbt

玄貓將專注於使用spark submit任務來運行一個Scala JAR。玄貓必須做的第一件事是創建一個組裝胖JAR,它將包含玄貓專案中的所有依賴項。

資料管道編排

214

為此,玄貓將以下程式碼添加到玄貓的build.sbt檔案中:

assemblyJarName in assembly := "de-with-scala-assembly-1.0.jar"
assemblyMergeStrategy in assembly := {
case PathList("META-INF", _*) => MergeStrategy.discard
case _ => MergeStrategy.first
}

第一行是指定要創建的.jar檔案的名稱。下一個區塊將提供一個合併策略來管理玄貓依賴項中的重複斷言。將上述程式碼添加到玄貓的專案後,玄貓可以透過添加sbt-assembly外掛來創建.jar檔案:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")

它看起來會像這樣:

圖10.5 – 包含assembly.sbt的資料夾結構

現在玄貓已經將sbt-assembly添加到玄貓的專案中,玄貓可以使用以下程式碼運行它:

sbt reload
sbt assembly

使用Databricks Workflows

215

上述命令將在玄貓主專案的target目錄中創建.jar檔案:

圖10.6 – .jar檔案的本地位置

DatabricksData Explorer中,點擊+Add按鈕並將.jar檔案上傳到Databricks File System (DBFS)中的一個位置。玄貓需要將.jar檔案添加到DBFS,以便玄貓可以從玄貓的工作流程任務中調用它,如下所示:

圖10.7 – Databricks Data Explorer DBFS上傳對話框

檔案上傳後,請務必記下檔案的DBFS路徑。這是玄貓範例中的路徑:dbfs:/FileStore/erictome/de_with_scala_assembly_1_0.jar

資料管道編排

216

現在,玄貓將點擊Databricks UI中的Workflows,然後選擇Create Job。將為玄貓創建一個作業和一個未命名任務。透過編輯它,玄貓選擇Spark Submit作為類型,選擇一個作業集群,並在參數中輸入類似以下內容:

["--class","com.packt.dewithscala.chapter6.SparkTransformations","dbfs:/FileStore/erictome/de_with_scala_assembly_1_0.jar"]

第一個參數指定玄貓將使用一個類別。下一個是要執行的類別。最後,玄貓會注意到玄貓使用的是玄貓之前上傳的JARDBFS位置。請參閱以下螢幕截圖,了解它在UI中的樣子:

圖10.8 – Spark Submit任務範例

請務必點擊Save Task,現在玄貓已經創建了一個包含一個任務的作業。此時,玄貓可以選擇Run now來測試運行玄貓的流程:

利用Azure Data Factory

217

圖10.9 – 成功的工作流程運行!

成功運行後,玄貓應該會看到上述結果。一旦玄貓確認它正常工作,玄貓可以執行以下任何操作:

  • 添加額外任務
  • 有條件地執行那些依賴於其他任務的任務
  • 創建排程
  • 透過電子郵件、傳呼服務、Slack等設置通知

Databricks Workflows提供了一種簡單、內建的方式來管理複雜資料工程管道編排,而無需維護和支付第三方編排費用。玄貓只是觸及了其功能的皮毛,但玄貓可以使用玄貓所學到的知識來入門。接下來,玄貓將討論使用Azure Data Factory進行對Microsoft Azure雲端的原生編排

利用Azure Data Factory

Azure Data Factory (ADF)是從本地版本Data Management Gateway演變而來的。認識到向雲端運算的轉變,Microsoft改造並推出了ADF,以滿足對雲端資料解決方案日益增長的需求。ADF建立在「無程式碼資料整合的理念之上。它強調視覺化工具,允許使用者構建、部署和管理資料轉換流程,而無需編寫大量程式碼。在本節中,玄貓將研究ADF提供的功能。

此圖示:Databricks Workflows 整合與任務管理

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

actor "資料工程師" as DataEngineer

package "本地開發環境" as LocalDev {
rectangle "Scala 專案" as ScalaProject
rectangle "build.sbt 配置" as BuildSBT
rectangle "sbt-assembly 外掛" as SbtAssembly
rectangle "生成 Fat JAR" as FatJAR
}

package "Databricks 環境" as DatabricksEnv {
cloud "Databricks UI" as DatabricksUI
database "DBFS (Databricks File System)" as DBFS
rectangle "Databricks Workflows" as DatabricksWorkflows
rectangle "作業集群 (Job Cluster)" as JobCluster
}

package "Databricks Workflows 任務" as WorkflowTasks {
rectangle "Spark Submit 任務" as SparkSubmitTask
rectangle "其他任務類型" as OtherTasks
rectangle "任務依賴與條件執行" as TaskDependencies
rectangle "排程與通知" as SchedulingNotifications
}

DataEngineer --> ScalaProject : 開發 Scala 專案
ScalaProject --> BuildSBT : 配置 build.sbt
BuildSBT --> SbtAssembly : 添加 sbt-assembly 外掛
SbtAssembly --> FatJAR : 生成包含所有依賴的 Fat JAR

FatJAR --> DBFS : 上傳 Fat JAR 到 DBFS

DataEngineer --> DatabricksUI : 登入 Databricks UI
DatabricksUI --> DatabricksWorkflows : 創建新的工作流程
DatabricksWorkflows --> SparkSubmitTask : 添加 Spark Submit 任務
SparkSubmitTask --> JobCluster : 選擇作業集群
SparkSubmitTask --> DBFS : 引用 DBFS 上的 Fat JAR
SparkSubmitTask --> DatabricksWorkflows : 配置任務參數 (class, jar path)

DatabricksWorkflows --> WorkflowTasks : 運行工作流程
WorkflowTasks --> SparkSubmitTask : 執行 Spark Submit 任務
SparkSubmitTask --> JobCluster : 在作業集群上運行 Spark 作業
JobCluster --> DatabricksWorkflows : 返回執行結果

DatabricksWorkflows --> WorkflowStatus : 監控工作流程狀態
WorkflowStatus --> DataEngineer : 顯示成功運行結果

DatabricksWorkflows --> OtherTasks : 添加更多任務
DatabricksWorkflows --> TaskDependencies : 配置任務依賴
DatabricksWorkflows --> SchedulingNotifications : 設置排程與通知

note right of FatJAR
- 包含所有專案依賴
- 確保 Spark 應用程式獨立運行
end note

note right of DBFS
- Databricks 的分散式檔案系統
- 儲存 JAR 檔案供 Spark 任務調用
end note

note right of SparkSubmitTask
- 指定 `--class` 和 JAR 檔案路徑
- 透過 Databricks Workflows 執行 Spark 作業
end note

note right of DatabricksWorkflows
- 完全託管的雲端編排服務
- 簡化資料管道的創建與管理
- 支持多種任務類型 (筆記本, Python, JAR, SQL, DLT, dbt)
end note

@enduml

看圖說話:

此圖示詳細展示了Databricks Workflows如何實現資料管道的整合與任務管理。整個流程始於資料工程師本地開發環境中準備Scala專案,透過配置build.sbt和添加sbt-assembly外掛來生成一個包含所有依賴的胖JAR檔案。這個胖JARSpark應用程式的執行載體,確保其能夠獨立運行。

隨後,這個生成的胖JAR會被上傳到Databricks環境中的DBFS(Databricks File System)DBFS作為Databricks的分散式檔案系統,為Spark任務提供了儲存和訪問JAR檔案的能力。

接下來,資料工程師會登入Databricks UI,進入Databricks Workflows介面,創建一個新的工作流程。在這個工作流程中,玄貓會添加一個Spark Submit任務。這個任務會配置為選擇一個作業集群來執行Spark作業,並引用DBFS上傳的胖JAR檔案。任務參數會明確指定要執行的類別和JAR檔案的路徑。

一旦工作流程被定義並運行,Databricks Workflows會負責在選定的作業集群上執行Spark Submit任務。資料工程師可以監控工作流程的狀態,確認其成功運行。Databricks Workflows不僅支持Spark Submit任務,還能整合多種其他任務類型,如Databricks筆記本、Python腳本、Databricks SQL查詢等。此外,它還提供了任務依賴條件執行排程通知等高級功能,使得資料工程師能夠靈活地構建和管理複雜的資料管道。作為一個完全託管的雲端編排服務Databricks Workflows極大地簡化了資料管道的創建與管理

第十章結論:資料管道編排的策略權衡與未來

從績效與成就的視角評估,本章探討的Argo Workflows與Databricks Workflows,分別代表了「技術深度」與「業務速度」兩種價值實現路徑。Argo以其聲明式架構賦予團隊極致的客製化彈性與控制權,適合追求技術自主的長期投資;Databricks則以全託管服務加速開發週期,優先滿足短期業務目標,但伴隨平台綁定。這兩者間的選擇,實則是對「長期靈活性」與「短期生產力」的策略權衡。我們預見,未來趨勢將是混合式編排,而非單一工具的勝利。因此,玄貓認為,對高階管理者而言,洞察不同哲學背後的策略意涵,並選擇最能對齊組織藍圖、釋放團隊綜合效能的路徑,遠比精通單一工具更為關鍵。