在資料工程領域,許多任務都涉及多個相互依賴的處理步驟。手動依序執行這些腳本不僅效率低下,而且難以管理和監控。Apache Airflow 作為業界領先的工作流程管理平台,允許我們以程式化的方式定義、排程和監控這些複雜的資料管線。本文將以一個簡單的 PySpark 資料處理任務為例,從一個單體腳本開始,逐步引導您將其重構成一個由 Airflow 調度的、更穩健、更模組化的自動化工作流程。
第一部分:問題的起點 - 本地 PySpark 腳本
假設我們有一個簡單的任務:讀取一些關於墨西哥玉米卷 (Taco) 的資料,計算其平均價格。一個直觀的作法是編寫一個單一的 Python 腳本來完成所有事情。
# local_script.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
def main():
spark = SparkSession.builder.appName("TacoPrices").getOrCreate()
# 步驟一:建立 DataFrame
data = [("Chicken Taco", 2.50), ("Beef Taco", 3.00), ("Fish Taco", 3.50)]
schema = ["Taco", "Price"]
taco_df = spark.createDataFrame(data, schema)
taco_df.show()
# 步驟二:計算平均價格
avg_price = taco_df.select(avg("Price")).first()[0]
print(f"Average Price of Tacos: {avg_price:.2f}")
spark.stop()
if __name__ == "__main__":
main()
程式碼解析:
此程式碼展示了一個典型的本地 PySpark 資料處理腳本。程式使用 SparkSession 建立 Spark 應用程式,然後建立包含墨西哥玉米卷名稱和價格的 DataFrame。透過 createDataFrame 方法將 Python 列表轉換為 Spark DataFrame,並使用定義好的欄位結構 schema。接著使用 PySpark 的聚合函式 avg 計算價格欄位的平均值,透過 select 和 first 方法提取計算結果。最後正確關閉 SparkSession 以釋放資源。雖然功能完整,但所有邏輯都集中在一個函式中,缺乏模組化設計。
這個腳本雖然能完成工作,但存在明顯的缺點:
- 高耦合: 資料建立和計算邏輯緊密耦合在同一個函式中。
- 缺乏彈性: 如果計算步驟失敗,我們必須重新執行整個腳本,包括資料建立的部分。
- 難以監控與排程: 無法方便地設定定時執行或在失敗時自動重試。
第二部分:Airflow 的解決方案 - 任務拆分與 DAG
Airflow 的核心思想是將一個大的工作流程,拆解成一系列獨立的、可重試的任務 (Tasks),並定義它們之間的依賴關係,形成一個有向無環圖 (DAG)。
為了將我們的 PySpark 腳本遷移到 Airflow,我們首先將其拆分為兩個獨立的函式:
create_and_save_dataframe: 負責建立 DataFrame 並將其儲存為中間產物(例如 Parquet 檔案)。calculate_average_from_parquet: 負責讀取 Parquet 檔案,並執行計算。
這種解耦的設計,使得每個任務都可以獨立執行和重試。
第三部分:編寫 Airflow DAG
現在,我們可以編寫一個 DAG 檔案,使用 Airflow 的 PythonOperator 來分別執行這兩個函式。
# dags/taco_prices_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
# --- 任務一的 Python 函式
---
def create_and_save_dataframe(output_path: str):
"""建立 Taco DataFrame 並儲存為 Parquet 檔案。"""
spark = SparkSession.builder.appName("CreateTacoDF").getOrCreate()
data = [("Chicken Taco", 2.50), ("Beef Taco", 3.00), ("Fish Taco", 3.50)]
schema = ["Taco", "Price"]
taco_df = spark.createDataFrame(data, schema)
taco_df.write.parquet(output_path, mode="overwrite")
print(f"Taco DataFrame 已儲存至 {output_path}")
spark.stop()
# --- 任務二的 Python 函式
---
def calculate_average_from_parquet(input_path: str):
"""從 Parquet 檔案讀取資料並計算平均價格。"""
spark = SparkSession.builder.appName("CalculateAvgPrice").getOrCreate()
taco_df = spark.read.parquet(input_path)
avg_price = taco_df.select(avg("Price")).first()[0]
print(f"計算出的平均價格為: {avg_price:.2f}")
spark.stop()
# --- DAG 定義
---
with DAG(
dag_id='taco_price_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval=None,
catchup=False,
tags=['pyspark', 'example'],
) as dag:
task_create_df = PythonOperator(
task_id='create_taco_dataframe',
python_callable=create_and_save_dataframe,
op_kwargs={'output_path': '/tmp/taco_prices.parquet'}
)
task_calculate_avg = PythonOperator(
task_id='calculate_average_price',
python_callable=calculate_average_from_parquet,
op_kwargs={'input_path': '/tmp/taco_prices.parquet'}
)
# 設定任務依賴關係
task_create_df >> task_calculate_avg
程式碼解析:
此 Airflow DAG 程式碼展示了如何將單一 PySpark 腳本重構為模組化的工作流程管線。程式定義了兩個獨立的 Python 函式:create_and_save_dataframe 負責資料建立和儲存,calculate_average_from_parquet 負責資料讀取和計算。DAG 使用 PythonOperator 將這些函式包裝成 Airflow 任務,透過 op_kwargs 參數傳遞檔案路徑等設定。最關鍵的是任務依賴關係設定 task_create_df >> task_calculate_avg,確保資料處理的正確順序。這種設計實現了任務解耦,使每個步驟都可以獨立執行、監控和重試,大幅提升了資料管線的穩健性和可維護性。
第四部分:執行與監控
- 將上述 DAG 檔案放置在您的 Airflow
dags資料夾中。 - 啟動 Airflow Web 伺服器和排程器:
airflow webserver & airflow scheduler & - 打開瀏覽器訪問 Airflow UI (通常是
http://localhost:8080)。 - 在 DAG 列表中找到
taco_price_pipeline,啟用它,並手動觸發一次執行。 - 您可以在「Graph View」中視覺化地看到任務的執行流程和狀態。
圖表一:資料處理管線活動圖:此活動圖展示了我們的資料處理流程,以及每個步驟如何對應到 Airflow DAG 中的一個任務。
@startuml
!theme _none_
skinparam dpi auto
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam minClassWidth 100
skinparam defaultFontSize 14
title Taco 價格分析資料管線
start
:<b>Task 1: 建立並儲存 DataFrame</b>\n(create_and_save_dataframe);
note right
<b>輸出</b>: /tmp/taco_prices.parquet
end note
:<b>Task 2: 讀取並計算平均值</b>\n(calculate_average_from_parquet);
note right
<b>輸入</b>: /tmp/taco_prices.parquet
end note
stop
@enduml
透過 Airflow,我們成功地將一個單體的資料處理腳本,轉化為一個模組化、可監控、可重跑的自動化工作流程,這正是現代資料工程的核心實踐。