返回文章列表

AWS Glue 與 Airflow 資料管線協調比較

本文比較 AWS Glue Workflows、Amazon MWAA 和 AWS Step Functions 在資料管線協調上的優缺點,並提供 Python 程式碼範例說明如何使用 Boto3 與 Glue Workflows 互動,以及如何使用 AWS Step Functions

資料工程 雲端運算

在資料密集型應用盛行的今日,高效的資料管線協調至關重要。AWS 提供了多種工具,如 AWS Glue Workflows、Amazon Managed Workflows for Apache Airflow (MWAA) 以及 AWS Step Functions,各有其優劣。Glue Workflows 與 AWS Glue 服務緊密整合,適合以 AWS Glue 為中心的資料處理流程。MWAA 提供託管的 Apache Airflow 服務,適合需要高度客製化和與多種服務整合的場景,但需要 Python 程式設計能力。Step Functions 則以其低程式碼、無伺服器的特性,提供易於使用的視覺化介面,適合快速構建和管理 AWS 原生服務的資料管線。選擇合適的工具取決於專案需求和團隊技術堆疊。

資料管線的協調:AWS Glue Workflows 與 Apache Airflow 的比較

在建構複雜的資料處理管線時,選擇適當的協調工具至關重要。AWS 提供了一系列服務來支援資料處理工作流程的協調,其中包括 AWS Glue Workflows 和 Amazon Managed Workflows for Apache Airflow(MWAA)。本篇文章將探討這些工具的功能、優缺點,以及如何根據特定的使用案例選擇最合適的解決方案。

使用 AWS Glue Workflows 協調資料管線

AWS Glue Workflows 是一種視覺化的工作流程協調工具,允許使用者建立和管理複雜的資料處理管線。雖然 Glue Workflows 主要設計用於與 AWS Glue 服務整合,但也可以透過 Glue Python Shell 任務與其他 AWS 服務(如 EMR 和 SQS)互動。

監控與錯誤處理

Glue Workflows 提供了一個圖形化的 UI,用於監控任務的進度。當管線中的某個步驟失敗時,使用者可以輕鬆地識別問題並在修復後從特定的步驟重新啟動工作流程。雖然 Glue Workflows 不直接支援在工作流程定義中設定重試機制,但可以為個別的 Glue 任務指定重試次數。

觸發 Glue Workflows

Glue Workflows 可以透過三種方式觸發:按需、手動或透過 Glue API/CLI;排程執行,例如每小時、每天或每週;以及事件驅動,例如當 S3 儲存桶接收到新的物件時。

事件驅動的觸發機制

Glue Workflows 支援事件驅動的觸發機制,允許使用者根據 EventBridge 事件啟動工作流程。這種方法使得管線能夠對外部事件做出反應,例如 S3 物件的上傳。透過設定觸發條件,使用者可以指定在接收到一定數量的事件後啟動工作流程,或在指定的時間延遲後啟動。

範例:批次處理事件

假設業務夥伴在一天中傳送多個小型 CSV 檔案到 S3 儲存桶,使用者可能希望批次處理這些檔案,而不是個別處理。使用者可以設定 Glue Workflow 在接收到 100 個事件後觸發,並指定 1 小時的時間延遲。如果在 1 小時內未接收到足夠的事件,工作流程仍將啟動並處理已接收的事件。

Apache Airflow:開源的協調解決方案

Apache Airflow 是一個開源的工作流程協調工具,提供了一系列功能,包括狀態排程、豐富的使用者介面、日誌記錄、監控和警示等。Airflow 的一個主要優點是其程式碼優先的方法,使得定義和管理複雜的工作流程變得更加容易。

Amazon Managed Workflows for Apache Airflow(MWAA)

AWS 提供了一個託管版本的 Airflow,稱為 Amazon MWAA。該服務簡化了 Airflow 的佈署和維護過程,因為底層基礎設施由 AWS 管理。這使得使用者能夠專注於定義和管理工作流程,而無需擔心基礎設施的管理。

比較與選擇

Glue Workflows 和 Apache Airflow(透過 MWAA)都提供了強大的工作流程協調功能,但它們在設計目標和適用場景上有所不同。Glue Workflows 更適合於主要使用 AWS Glue 服務的資料管線,而 Airflow(透過 MWAA)則提供了一個更全面的解決方案,可以協調多個 AWS 服務和本地工具。

程式碼範例:使用 Boto3 與 Glue Workflows 互動

import boto3

# 初始化 Glue 使用者端
glue_client = boto3.client('glue')

# 定義工作流程名稱
workflow_name = 'my_glue_workflow'

# 啟動工作流程
response = glue_client.start_workflow_run(
    Name=workflow_name
)

# 列印工作流程執行 ID
print(response['RunId'])

內容解密:

  1. 匯入 Boto3 函式庫:使用 import boto3 將 AWS SDK for Python (Boto3) 匯入指令碼,使其能夠與 AWS 服務進行互動。
  2. 初始化 Glue 使用者端:透過 boto3.client('glue') 初始化一個 Glue 使用者端,用於與 AWS Glue 服務進行互動。
  3. 定義工作流程名稱:指定要啟動的工作流程名稱,這裡使用 my_glue_workflow 作為範例。
  4. 啟動工作流程:呼叫 start_workflow_run 方法並傳入工作流程名稱,以非同步方式啟動指定的 Glue Workflow。
  5. 列印工作流程執行 ID:輸出啟動的工作流程執行的唯一 ID,用於追蹤和管理該次執行的狀態。

在AWS中協調資料管道的選項

在AWS中佈署受管的MWAA服務時,您可以從多個支援的Apache Airflow版本中進行選擇。在撰寫本文時,受管服務中支援Airflow v1.10.12和Airflow v2.0.2。

建立Apache Airflow管道的核心概念

Apache Airflow使用根據程式碼(Python)的方式來創作管道。這意味著要使用Airflow,您需要具備一定的Python程式設計技能。然而,將管道作為程式碼儲存在原始碼控制系統中是很自然的,而且也有助於為管道建立自動化測試。

以下是用於建立Airflow管道的一些核心概念:

有向無環圖(DAG)

我們在本章前面介紹了有向無環圖(DAG)的概念。在Airflow的上下文中,資料管道被建立為DAG(使用Python定義DAG),而DAG提供了管道中的任務以及任務之間的依賴關係。

在Airflow使用者介面中,您還可以檢視DAG的圖形表示——管道任務及其依賴關係,任務以節點表示,箭頭表示任務之間的依賴關係。

Airflow連線和鉤子

Airflow鉤子定義瞭如何連線到遠端來源和目標系統,例如資料函式庫或Zendesk等系統。該鉤子包含控制與遠端系統連線的程式碼,雖然Airflow包含幾個內建的鉤子,但它也允許您定義自定義鉤子。內建鉤子包括Amazon S3、HTTP系統、各種資料函式庫(如Oracle、MySQL和Postgres)以及Slack、Presto和Hive等系統的鉤子。

開源貢獻者還可以建立和分享鉤子,這包括AWS服務(如Athena、DynamoDB、Firehose和Glue)以及非AWS服務(如Google BigQuery、DataBricks、Jenkins等)的鉤子。

一個相關的概念是Airflow連線,它定義了用於連線到遠端系統的URL/主機名、使用者名稱和密碼。

鉤子和連線包含了連線到遠端系統並進行身份驗證的程式碼,將該程式碼與管道定義分開。

Airflow任務

Airflow任務定義了DAG執行的一個基本工作單元。每個任務都在DAG中定義了上游和下游依賴關係,這決定了任務執行的順序。

當DAG執行時,DAG中的任務會經歷各種狀態,從None到Scheduled、Queued、Running,最後到Success或Failed。

Airflow運算元

Airflow運算元提供了預定義的任務範本,為執行特定任務提供了預先構建的介面。Airflow包含幾個內建的核心運算元(如BashOperator和PythonOperator,分別執行bash命令或Python函式)。還有大量額外的運算元與Airflow Core分開發布(如JdbcOperator、S3FileTransformOperator、S3toRedshiftTransfer和DockerOperator)。

Airflow感測器

Airflow感測器提供了一種特殊的Airflow運算元,旨在等待特定操作的發生。感測器會定期檢查它正在等待的操作是否已完成,並且可以組態為在一定時間後超時。

使用Airflow感測器可以建立事件驅動的管道。例如,您可以使用S3KeySensor等待特定鍵出現在S3路徑中,一旦出現,就觸發特定的DAG執行。

使用MWAA的優缺點

Airflow的一個關鍵區別因素是來自開源社群的活躍開發支援,擁有超過1,500名貢獻者。由於這個活躍的社群,Airflow支援與許多不同服務的廣泛整合,包括來自AWS、Google和Microsoft Azure雲的服務。如果您的管道需要與多個提供商的多種服務整合,那麼Airflow中支援的整合數量是您使用Airflow最顯著的好處之一。

Airflow也是一個成熟的服務,具有內建的重試任務、故障警示和擴充套件以處理大型和複雜工作流程的功能。它具有完善的UI,用於監控和管理管道。Airflow在許多大型企業(如Airbnb)中得到廣泛使用和驗證。

AWS提供的受管版本的Airflow大大簡化了佈署Airflow環境的時間和精力。AWS還提供了內建的功能,用於擴充套件Airflow工作者,根據需求自動新增或刪除額外的工作者。

然而,您需要具備一定的Python技能才能使用Airflow,因此使用Airflow的學習曲線可能高於使用提供圖形使用者介面來建立管道的協調工具。Airflow還具有一定數量的固定基礎設施,用於交付服務,這伴隨著相關的固定成本。因此,無論您的Airflow環境是否正在積極執行管道,還是管道執行之間的幾個小時內處於閒置狀態,環境都存在持續的成本。

AWS Step Function:無伺服器協調解決方案

AWS Step Function是一個全面的無伺服器協調服務,使用低程式碼方法開發資料管道和無伺服器應用程式。Step Function提供了一個強大的視覺化設計工具,允許您使用簡單的拖曳方法建立管道。或者,如果您願意,可以直接使用JSON定義Amazon States Language(ASL)的管道。

AWS已經在許多不同的AWS服務和Step Function之間構建了最佳化的、易於使用的整合。例如,您可以輕鬆新增一個執行Lambda函式的步驟,並從下拉列表中選擇要執行的Lambda函式名稱。

Step Function還使指定如何處理具有自定義重試策略的狀態失敗變得容易,讓您指定捕捉塊以捕捉特定錯誤,並根據錯誤採取自定義操作。

對於AWS尚未構建最佳化整合的服務,您仍然可以使用Step Function中內建的AWS SDK整合執行該服務。例如,沒有直接的Step Function整合來執行Glue Crawlers,但您可以新增一個呼叫Glue StartCrawler API的狀態,並指定該API呼叫所需的引數。

Step Function還包括對錯誤處理的強大支援,並具有用於監控Step Function狀態機執行的視覺化介面。但是,Step Function目前不支援從特定步驟重新啟動狀態機的功能。

示例Step Function狀態機

使用Step Function,您可以建立一個狀態機,定義構成資料管道的各個任務。每個任務都被視為狀態機中的一個狀態,您還可以具有控制管道流程的狀態,例如執行管道分支的選擇狀態,或暫停管道一段時間的等待狀態。

當執行Step Function狀態機時,您可以傳入一個有效載荷,該有效載荷可以被每個狀態存取。每個狀態還可以向有效載荷新增額外的資料,例如指示任務是否成功或失敗的狀態程式碼。

以下圖表顯示了Step Function中的示例狀態機: 此圖示顯示了一個簡單的Step Function狀態機,具有多個狀態和轉換條件,用於控制資料管道流程。

內容解密:

此圖示展示了一個典型的Step Function狀態機結構,包括初始狀態、任務狀態、選擇狀態和最終狀態。每個狀態代表資料處理流程中的一個步驟,而轉換條件則根據前一步驟的結果決定下一步驟。這個結構使得資料處理流程能夠根據不同的條件進行動態調整,從而實作更靈活和強大的資料處理能力。

使用 AWS Step Function 協調資料管線

AWS Step Function 提供了一種原生且易於使用的 AWS 服務,用於定義和協調資料管線。該服務具備強大的視覺化設計工具,且為無伺服器服務,因此只需在使用時付費,無需管理基礎設施或做出相關決策。

Step Function 的狀態機制定義

在狀態機制定義中,我們可以看到以下狀態:

  1. 任務狀態:執行 Lambda 函式以驗證接收到的 manifest 檔案(確保 manifest 中列出的所有檔案都存在)。
  2. 任務狀態:執行 Glue Job 將接收到的 CSV 檔案轉換為 Parquet 格式。
  3. 任務狀態:執行 Glue crawler 更新資料目錄中的新 Parquet 資料集。
  4. 平行狀態:用於在狀態機制中建立平行的執行分支。在此例中,執行 Lambda 函式(總結 Parquet 檔案中的資料並將結果儲存在 DynamoDB 表中),並觸發 Glue Job 以豐富新的 Parquet 資料集。
  5. 選擇狀態:根據評估規則決定下一步。如果 Lambda 和 Glue Job 成功,則以成功狀態結束狀態機制;如果失敗,則執行 Lambda 函式傳送失敗通知,並以失敗狀態結束。

使用 AWS Step Function 的優缺點

AWS Step Function 提供了一種強大且易於使用的服務,用於協調資料管線。然而,它也有一些限制,例如無法從失敗點還原管線,而像 Apache Airflow 等工具則提供此功能。

比較 AWS Step Function 和 Amazon MWAA

特性AWS Step FunctionAmazon MWAA
易用性易於使用,具備視覺化設計工具需要更多設定和管理
與 AWS 服務的整合度高度整合高度整合
對非 AWS 第三方服務的支援有限較強

實作:使用 AWS Step Function 協調資料管線

在本文中,我們將使用 AWS Step Function 來協調一個簡單的資料管線。該管線將使用 Lambda 函式來處理資料。

建立新的 Lambda 函式

在建立 Step Function 之前,我們需要建立將被協調的 Lambda 函式。

  1. 登入 AWS Management Console,導航至 AWS Lambda 服務。
  2. 點選「建立函式」,選擇「從頭開始撰寫」,並輸入函式名稱 dataeng-check-file-ext
  3. 將執行環境設為 Python 3.9,並保留預設的架構和許可權設定。
  4. 在程式碼來源區塊中,替換現有的程式碼:
import urllib.parse
import json
import os

print('Loading function')

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    # 從事件中取得物件並顯示其內容型別
    bucket = event['detail']['requestParameters']['bucketName']
    key = urllib.parse.unquote_plus(event['detail']['requestParameters']['key'], encoding='utf-8')
    filename, file_extension = os.path.splitext(key)
    print(f'File extension is: {file_extension}')
    payload = {
        "file_extension": file_extension,
        "bucket": bucket,
        "key": key
    }
    return payload

程式碼解析:

  • 使用 urllib.parsejson 模組處理事件中的 S3 物件資訊。
  • 從事件中提取 bucket 名稱和金鑰,並解析出檔案副檔名。
  • 將包含檔案副檔名、bucket 名稱和金鑰的 payload 傳回給狀態機制。
  1. 點選「佈署」按鈕以儲存和佈署 Lambda 函式。

建立第二個 Lambda 函式

建立第二個 Lambda 函式來處理接收到的檔案。在本練習中,該函式的程式碼將隨機產生失敗。

使用 AWS Step Function 協調資料管道的實作練習

建立用於隨機產生失敗的 Lambda 函式

為了測試 Step Function 的錯誤處理能力,我們將建立一個 Lambda 函式,該函式會隨機產生錯誤。這個函式將使用亂數產生器來決定是否產生錯誤。

  1. 按照前一節的步驟 1 到 5,建立一個新的 Lambda 函式,並將其命名為 dataeng-random-failure-generator
  2. 在程式碼區塊中,替換現有的程式碼如下:

from random import randint

def lambda_handler(event, context): print(‘Processing’) # 在此處放置處理檔案的 ETL 程式碼 value = randint(0, 2) # 將 10 除以亂數,若亂數為 0,則會產生除以零的錯誤 newval = 10 / value print(f’New Value is: {newval}’) return newval

3. 點選程式碼區塊上方的「Deploy」按鈕,佈署 Lambda 函式。

#### 程式碼解析:

*   首先,從 `random` 模組匯入 `randint` 函式,用於產生亂數。
*   定義 `lambda_handler` 函式,這是 Lambda 函式的入口點。
*   在函式中,產生一個介於 0 和 2 之間的亂數,並將其儲存在 `value` 變數中。
*   將 10 除以 `value`,若 `value` 為 0,則會產生除以零的錯誤。
*   將計算結果儲存在 `newval` 變數中,並列印出來。
*   最後,傳回 `newval` 的值。

### 建立 SNS 主題並訂閱電子郵件地址

為了在 Step Function 執行失敗時接收通知,我們需要建立一個 SNS 主題並訂閱電子郵件地址。

1. 前往 Amazon SNS 控制檯。
2. 確保您位於正確的區域。
3. 在左側選單中,點選「Topics」,然後點選「Create topic」。
4. 選擇「Standard」主題型別,並將主題命名為 `dataeng-failure-notification`。
5. 建立主題後,點選「Create subscription」。
6. 選擇「Email」作為協定,並輸入您的電子郵件地址。
7. 前往您的電子郵件信箱,查詢來自 `[email protected]` 的郵件,並點選確認訂閱連結。

### 建立 Step Function 狀態機

現在,我們可以建立 Step Function 狀態機,以協調 Lambda 函式和 SNS 主題。

1. 前往 Amazon Step Function 控制檯。
2. 點選「Create state machine」。
3. 選擇「Design your Workflow visually」並設定狀態機型別為「Standard」。
4. 在視覺化編輯器中,將「AWS Lambda Invoke」區塊拖曳到「Start」和「End」區塊之間。
5. 設定狀態名稱為「Check File Extension」,並選擇正確的 Lambda 函式。
6. 在「Output」標籤中,設定輸出路徑為 `$.Payload`。

### 設定狀態機流程

1. 在左側選單中,點選「Flow」標籤,然後將「Choice」狀態拖曳到 Lambda Invoke 函式和 End 狀態之間。
2. 設定 Choice 狀態的規則,以根據 Lambda 函式的輸出決定下一步驟。
3. 將「AWS Lambda Invoke」狀態拖曳到 Rule #1 方塊中,並設定狀態名稱為「Process CSV」。
4. 將「Pass」狀態拖曳到 Default rule 方塊中,並設定狀態名稱為「Pass – Invalid File Ext」。
5. 在 Pass 狀態的輸出中,輸入錯誤訊息 `{ "Error": "InvalidFileFormat" }`。

#### 狀態機組態解析:

*   使用 Choice 狀態根據 Lambda 函式的輸出決定下一步驟。
*   若檔案副檔名為 `.csv`,則呼叫 Process CSV Lambda 函式。
*   若檔案副檔名不是 `.csv`,則執行 Pass – Invalid File Ext 狀態,並傳遞錯誤訊息。

### 釋出 SNS 通知

1. 將「Amazon SNS Publish」狀態拖曳到 Pass - Invalid File Ext 狀態下方。
2. 設定 SNS Publish 狀態的主題為之前建立的 `dataeng-failure-notification` 主題。

#### SNS 釋出解析:

*   當 Pass – Invalid File Ext 狀態執行時,會觸發 SNS Publish 狀態。
*   SNS Publish 狀態會將通知釋出到指定的 SNS 主題,從而觸發電子郵件通知。