返回文章列表

Amazon MWAA 佈署深度學習模型工作流程

本文介紹如何使用 Amazon MWAA(Amazon Managed Workflows for Apache Airflow)服務,自動化佈署和監控深度學習模型的工作流程。文章涵蓋了從 VPC 網路設定、安全群組組態到 DAG 開發、佈署和監控的完整流程,並以 Tesla 股票價格預測模型為例,示範如何使用

機器學習 雲端運算

在雲端環境中訓練和佈署深度學習模型需要複雜的工作流程管理。利用 Amazon MWAA,開發者可以簡化這個流程,並提升效率。本文將詳細說明如何設定 VPC、安全群組,並建立 MWAA 環境,接著示範如何開發一個包含資料預處理、模型訓練和評估的 DAG,最後說明如何封裝上傳 DAG 至 S3 並在 MWAA 環境中佈署和監控。

透過整合 PySpark 和 PyTorch,我們可以有效地進行資料預處理和模型訓練。PySpark 提供了分散式運算能力,可處理大規模資料集,而 PyTorch 則簡化了深度學習模型的構建和訓練過程。此架構讓開發者能更專注於模型開發,而不用煩惱底層基礎設施的管理。

在AWS上建立Amazon Managed Workflows for Apache Airflow (MWAA) 環境

在佈署和管理深度學習模型時,工作流程的自動化至關重要。Amazon Managed Workflows for Apache Airflow (MWAA) 提供了一個完全託管的服務,用於執行 Apache Airflow 工作流程。本章節將指導如何在 AWS 上建立 MWAA 環境,並開發一個簡單的 DAG。

步驟1:建立VPC和必要網路元件

首先,需要在 AWS 上建立一個 VPC 並組態相關的網路元件。

  1. 建立VPC:導航至 AWS VPC 儀錶板,點選「建立VPC」。
  2. 建立子網:在 VPC 中建立至少兩個私有子網,分別位於不同的可用區域。同時,建立公有子網以供 NAT閘道使用。
  3. 建立和附加網際網路閘道:建立一個網際網路閘道並將其附加到您的 VPC。
  4. 建立NAT閘道:在公有子網中建立一個 NAT閘道,並分配一個彈性 IP 地址。
  5. 更新路由表:更新私有子網的路由表,將其網際網路繫結的流量路由透過 NAT閘道。

VPC網路架構圖示

內容解密:

此圖示展示了 VPC 的基本網路架構。私有子網透過 NAT閘道存取網際網路,而公有子網則直接連線到網際網路閘道。正確組態路由表對於確保流量正確路由至關重要。

步驟2:組態安全群組

  1. 導航至安全群組:在 VPC 儀錶板中,導航至「安全群組」。
  2. 建立安全群組:為您的 MWAA 環境建立必要的安全群組,並組態輸入和輸出規則。

步驟3:建立MWAA環境

  1. 選擇環境類別:根據需求選擇適當的例項類別,這決定了分配給環境的計算資源。
  2. 其他組態:指定最大和最小工作節點數量、排程器數量、自定義 Airflow 組態、自定義外掛和 Python 依賴項。

開發DAG

在建立 MWAA 環境後,下一步是開發 DAG。下面是一個簡化的 DAG 範例(simple_mwaa_dag.py):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# 定義預設引數
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 定義DAG
dag = DAG(
    'simple_mwaa_dag',
    default_args=default_args,
    description='A simple DAG for MWAA',
    schedule_interval=timedelta(days=1),
)

# 定義Python函式
def print_current_date():
    print(f"Current date and time: {datetime.now()}")

def print_message():
    print("Hello, this is a message from your DAG!")

# 定義任務
t1 = PythonOperator(
    task_id='print_date',
    python_callable=print_current_date,
    dag=dag,
)

t2 = PythonOperator(
    task_id='print_message',
    python_callable=print_message,
    dag=dag,
)

# 設定任務依賴關係
t1 >> t2

內容解密:

此 DAG 定義了兩個任務:print_dateprint_messageprint_date 任務使用 PythonOperator 執行 print_current_date 函式,列印目前日期和時間。print_message 任務同樣使用 PythonOperator 執行 print_message 函式,列印一條訊息。任務之間的依賴關係確保了它們按照定義的順序執行。

在 Amazon MWAA 上佈署與監控深度學習模型

在前一章節中,我們開發了一個簡單的 DAG 來示範如何在 Amazon Managed Workflows for Apache Airflow(MWAA)上執行任務。現在,我們將探討如何佈署和監控一個更複雜的深度學習模型。

步驟 3:上傳至 S3

在開發完 DAG 之後,下一步是將其佈署到 Amazon MWAA。首先,需要將包含 DAG 的檔案上傳到 Amazon S3 儲存桶中,該儲存桶必須能夠被 Amazon MWAA 環境存取。以下命令將 simple_mwaa_dag.py DAG 檔案從 Amazon EC2 上的 /home/ubuntu/airflow/dags/ 目錄上傳到 instance1bucket S3 儲存桶中的 dags 資料夾:

aws s3 cp /home/ubuntu/airflow/dags/simple_mwaa_dag.py s3://instance1bucket/dags/

內容解密:

此命令使用 AWS CLI 將本地檔案上傳到 S3 儲存桶。其中,aws s3 cp 是用於複製檔案到 S3 的命令,/home/ubuntu/airflow/dags/simple_mwaa_dag.py 是本地檔案的路徑,s3://instance1bucket/dags/ 是 S3 儲存桶中的目標路徑。

DAG 檔案及其相依性的封裝

通常,DAG 檔案及其相依性會被封裝成一個 zip 檔案,然後再載入到 S3 儲存桶中。在這個簡單的例子中,DAG 檔案(simple_mwaa_dag.py)沒有外部相依性,因此不需要將其封裝成 zip 檔案進行佈署。然而,隨著 DAG 的複雜度增加,並且引入了外部相依性,例如自定義的 Python 模組或第三方函式庫,將 DAG 封裝成 zip 檔案就變得非常重要。

複雜 DAG 的範例:Tesla 股價預測

一個複雜 DAG 的範例是我們在第 8 章中開發的 tesla_stock_prediction DAG。在本章中,我們建立了一個 Airflow 管道,該管道包括使用 PySpark 對 Tesla 股價資料進行預處理,以及使用 PyTorch 建立、訓練和評估深度學習模型來預測 Tesla 股價。該 DAG 包含以下五個模組:

  • data_processing.py:對 Tesla 股價資料進行預處理
  • model_training.py:使用 PyTorch 函式庫建立和訓練神經網路迴歸模型,以預測 Tesla 股價
  • model_evaluation.py:評估訓練好的 PyTorch 模型在測試資料集上的表現
  • utils.py:工具模組,將 Spark DataFrame 列轉換為 PyTorch 張量
  • tesla_stock_prediction.py:協調整個工作流程的 DAG

以下是這些模組中包含的程式碼:

data_processing.py

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import DataFrame

def load_data(file_path: str) -> DataFrame:
    spark = SparkSession.builder.appName("StockPricePrediction").getOrCreate()
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    return df

def preprocess_data(df: DataFrame) -> DataFrame:
    assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')
    df = assembler.transform(df)
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    scaler_model = scaler.fit(df)
    df = scaler_model.transform(df)
    df = df.select('scaled_features', 'Close')
    return df

內容解密:

此程式碼定義了兩個函式:load_datapreprocess_dataload_data 使用 SparkSession 從 CSV 檔案載入股票價格資料。preprocess_data 對資料進行預處理,使用 VectorAssembler 組裝特徵向量,並使用 StandardScaler 對特徵進行縮放。

model_training.py

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

def create_data_loader(features, labels, batch_size=32) -> DataLoader:
    dataset = TensorDataset(features, labels)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    for epoch in range(num_epochs):
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels.unsqueeze(1))
            loss.backward()
            optimizer.step()
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")

內容解密:

此程式碼定義了兩個函式:create_data_loadertrain_modelcreate_data_loader 將預處理好的資料轉換為 PyTorch 張量,並建立 DataLoader 物件。train_model 對模型進行訓練,使用指定的損失函式和最佳化器,並迭代多個 epoch。

model_evaluation.py

import torch

def evaluate_model(model, test_loader, criterion):
    with torch.no_grad():
        model.eval()
        predictions = []
        targets = []
        test_loss = 0.0
        for inputs, labels in test_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels.unsqueeze(1))
            test_loss += loss.item() * inputs.size(0)
            predictions.extend(outputs.squeeze().tolist())
            targets.extend(labels.tolist())
        test_loss /= len(test_loader.dataset)
        predictions = torch.tensor(predictions)
        targets = torch.tensor(targets)
        ss_res = torch.sum((targets - predictions) ** 2)
        ss_tot = torch.sum((targets - torch.mean(targets)) ** 2)
        r_squared = 1 - ss_res / ss_tot
        print(f"Test Loss: {test_loss:.4f}")
        print(f"R-squared Score: {r_squared:.4f}")
        return test_loss, r_squared.item()

內容解密:

此程式碼定義了一個函式 evaluate_model,用於評估訓練好的模型在測試資料上的表現。它計算測試損失和 R-squared 評分。

utils.py

import numpy as np
import torch
from typing import Tuple
from pyspark.sql import DataFrame

def spark_df_to_tensor(df: DataFrame) -> Tuple[torch.Tensor, torch.Tensor]:
    features = torch.tensor(np.array(df.rdd.map(lambda x: x.scaled_features.toArray()).collect()), dtype=torch.float32)
    labels = torch.tensor(np.array(df.rdd.map(lambda x: x.Close).collect()), dtype=torch.float32)
    return features, labels

內容解密:

此程式碼定義了一個函式 spark_df_to_tensor,用於將 Spark DataFrame 列轉換為 PyTorch 張量。

tesla_stock_prediction.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import logging
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
from pyspark.sql import DataFrame
from typing import Tuple
from utils import spark_df_to_tensor
from data_processing import load_data, preprocess_data
from model_training import create_data_loader, train_model
from model_evaluation import evaluate_model

data_file_path = "/home/ubuntu/airflow/dags/TSLA_stock.csv"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 5, 1),
}

內容解密:

此程式碼定義了一個 DAG,用於協調整個工作流程。它載入資料、預處理資料、訓練模型、評估模型等任務。

利用 Apache Airflow 自動化特斯拉股票價格預測工作流程

簡介

本篇文章將介紹如何利用 Apache Airflow 自動化特斯拉股票價格預測的工作流程。我們將使用 Python 編寫 DAG(Directed Acyclic Graph)來定義工作流程,並使用各種運算元(Operator)來執行不同的任務。

DAG 定義

首先,我們需要定義 DAG 的基本屬性,例如 DAG 的名稱、描述、排程間隔等。在這個例子中,我們將建立一個名為 tesla_stock_prediction 的 DAG,用於預測特斯拉股票價格。

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'tesla_stock_prediction',
    default_args=default_args,
    description='DAG for Tesla stock price prediction',
    schedule_interval=None,
)

內容解密:

  • default_args:定義了 DAG 的預設引數,例如擁有者、是否依賴過去的任務、失敗或重試時的電子郵件通知等。
  • dag:建立了一個名為 tesla_stock_prediction 的 DAG 物件,並指定了預設引數、描述和排程間隔。

任務定義

接下來,我們需要定義三個任務:資料預處理、模型訓練和模型評估。每個任務都將使用 PythonOperator 來執行。

def preprocess_data_task():
    try:
        df = load_data(data_file_path)
        df = preprocess_data(df)
    except Exception as e:
        logger.error(f"Error in preprocessing data: {str(e)}")
        raise

preprocess_data_task = PythonOperator(
    task_id='preprocess_data_task',
    python_callable=preprocess_data_task,
    dag=dag,
)

內容解密:

  • preprocess_data_task:定義了一個名為 preprocess_data_task 的任務,用於預處理資料。
  • PythonOperator:建立了一個 PythonOperator 物件,用於執行 preprocess_data_task 函式。

任務依賴關係

最後,我們需要定義任務之間的依賴關係。在這個例子中,我們希望資料預處理任務完成後,才開始模型訓練任務;模型訓練任務完成後,才開始模型評估任務。

preprocess_data_task >> train_model_task >> evaluate_model_task

內容解密:

  • >>:定義了任務之間的依賴關係,例如 preprocess_data_task 完成後,才開始 train_model_task

將 DAG 檔案封裝並上傳到 Amazon S3

為了佈署 DAG,我們需要將相關的 Python 檔案封裝成一個 ZIP 檔案,並上傳到 Amazon S3。

import os
import zipfile
import boto3

# 定義包含 DAG 檔案的目錄
dags_directory = '/home/ubuntu/airflow/dags'

# 定義要包含在 ZIP 檔案中的檔名稱
files_to_zip = [
    'data_processing.py',
    'model_training.py',
    'model_evaluation.py',
    'utils.py',
    'tesla_stock_prediction.py',
    'requirements.txt'
]

# 定義 ZIP 檔案的名稱
zip_file_name = 'tesla_stock_prediction.zip'

# 建立 ZIP 檔案
with zipfile.ZipFile(zip_file_name, 'w') as zip_file:
    for file_name in files_to_zip:
        file_path = os.path.join(dags_directory, file_name)
        zip_file.write(file_path, arcname=file_name)

# 將 ZIP 檔案上傳到 S3 儲存桶
bucket_name = 'instance1bucket'
s3_client = boto3.client('s3')
s3_client.upload_file(zip_file_name, bucket_name, f'dags/{zip_file_name}')

# 清理:刪除 ZIP 檔案
os.remove(zip_file_name)
print("Files zipped and uploaded successfully.")

內容解密:

  • zipfile.ZipFile:建立了一個 ZIP 檔案物件,用於封裝相關的 Python 檔案。
  • boto3.client('s3'):建立了一個 S3 客戶端物件,用於上傳 ZIP 檔案到 S3 儲存桶。
  • s3_client.upload_file:將 ZIP 檔案上傳到 S3 儲存桶。

組態與佈署 Amazon MWAA 環境的完整

在將 DAG 檔案上傳至指定的 S3 儲存桶後,下一步驟是組態 Amazon MWAA 環境,以包含上傳的 DAG 並指定其他設定,如環境變數、連線和外掛。本步驟對於確保 MWAA 環境能夠根據需求有效地管理和執行 DAG 至關重要。

組態 Amazon MWAA 環境

  1. 存取 Amazon MWAA 控制檯:首先,進入 Amazon MWAA 控制檯,並導航至目標 MWAA 環境的環境設定頁面。
  2. 指定 DAG 檔案的 S3 路徑:在環境組態中,找到 DAG 的相關部分,並指定上傳的 DAG 檔案的 S3 路徑。這樣,MWAA 就能夠知道 DAG 檔案在 S3 儲存桶中的位置,並檢索該檔案以供執行。

設定環境變數和外掛

  • 環境變數:設定環境變數以提供 DAG 在執行期間所需的組態引數或機密資訊。這些變數可以包括資料函式庫連線字串、API 金鑰或其他敏感資訊。
  • 外掛:確保必要的外掛已安裝並組態在 MWAA 環境中,以支援 DAG 中的特定功能。這包括安裝任何自定義或第三方外掛,並在 MWAA 環境中指定其組態。

程式碼範例:設定環境變數

import os

# 設定環境變數
os.environ['DATABASE_URL'] = 'postgresql://user:password@host:port/dbname'
os.environ['API_KEY'] = 'your_api_key_here'

# 在 DAG 中使用環境變數
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

def print_env_variables(**kwargs):
    print(f"Database URL: {os.environ.get('DATABASE_URL')}")
    print(f"API Key: {os.environ.get('API_KEY')}")

print_env_task = PythonOperator(
    task_id='print_env_variables',
    python_callable=print_env_variables,
    dag=dag,
)

內容解密:

此程式碼範例展示瞭如何在 Airflow DAG 中設定和存取環境變數。首先,我們匯入必要的模組並設定環境變數 DATABASE_URLAPI_KEY。接著,我們定義了一個 DAG 及其預設引數。在 print_env_variables 函式中,我們列印了這些環境變數的值。最後,我們建立了一個 PythonOperator 任務來執行此函式,並將其加入到 DAG 中。

觸發 DAG 執行

成功佈署和組態 DAG 後,您可以手動觸發其執行或根據預定義的排程在 MWAA 環境中自動執行。

  • 手動執行:您可以從 MWAA 控制檯手動觸發 DAG 的執行。
  • 排程執行:您可以透過在 MWAA 環境組態中指定 schedule_interval 引數來排程 DAG 的自動執行。

監控執行

一旦觸發,Amazon MWAA 將根據定義的依賴關係和排程間隔開始執行 DAG 任務。您可以透過 MWAA 控制檯監控 DAG 執行的進度、檢視任務日誌並排除可能出現的問題。

DAG 執行流程

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Amazon MWAA 佈署深度學習模型工作流程

package "機器學習流程" {
    package "資料處理" {
        component [資料收集] as collect
        component [資料清洗] as clean
        component [特徵工程] as feature
    }

    package "模型訓練" {
        component [模型選擇] as select
        component [超參數調優] as tune
        component [交叉驗證] as cv
    }

    package "評估部署" {
        component [模型評估] as eval
        component [模型部署] as deploy
        component [監控維護] as monitor
    }
}

collect --> clean : 原始資料
clean --> feature : 乾淨資料
feature --> select : 特徵向量
select --> tune : 基礎模型
tune --> cv : 最佳參數
cv --> eval : 訓練模型
eval --> deploy : 驗證模型
deploy --> monitor : 生產模型

note right of feature
  特徵工程包含:
  - 特徵選擇
  - 特徵轉換
  - 降維處理
end note

note right of eval
  評估指標:
  - 準確率/召回率
  - F1 Score
  - AUC-ROC
end note

@enduml

此圖示展示了 DAG 的執行流程,從觸發 DAG 到完成所有任務。

在雲端佈署與監控深度學習模型

使用 Amazon MWAA 簡化工作流程管理

Amazon Managed Workflows for Apache Airflow(MWAA)提供了一個受管理的 Apache Airflow 環境,讓您可以輕鬆建立、管理和擴充套件工作流程。在 MWAA 的環境詳細資訊頁面,您可以找到開啟 Airflow UI 的選項。點選「開啟 Airflow UI」按鈕,即可進入 Apache Airflow 網頁介面,進一步監控和管理 DAG(Directed Acyclic Graph)。

監控 DAGs

在 Airflow UI 中,您可以透過不同的檢視來監控 DAG 的執行狀態和任務狀態:

  • DAGs 檢視:列出所有 DAG,並顯示其狀態、最後執行時間和排程資訊。
  • Graph 檢視:點選特定的 DAG 可檢視其結構和各任務的狀態。
  • Tree 檢視:提供 DAG 執行和任務狀態的階層式檢視,方便追蹤歷史執行記錄。
  • Gantt 圖表:用於視覺化顯示每個任務的執行時間,幫助分析任務執行的時間分配。
  • Code 檢視:允許您檢視 DAG 的程式碼,確保邏輯實作正確無誤。
  • 日誌:存取每個任務例項的日誌,可以檢視輸出和執行期間發生的錯誤,利於除錯和監控模型效能。

利用 Amazon CloudWatch 強化監控能力

Amazon CloudWatch 提供強大的監控功能,可以追蹤各種指標和日誌,建立儀錶板,並設定警示,以便在效能問題、失敗或其他關鍵事件發生時通知您。結合 MWAA 和 CloudWatch,可以確保深度學習模型持續提供價值,同時保持高效能、可靠性和合規性。

指標收集

  • MWAA 指標:MWAA 自動將指標釋出到 CloudWatch,包括環境健康狀態、排程器、Worker 和網頁伺服器的狀態,以及任務和 DAG 級別的指標。
  • 自定義指標:您可以使用 boto3 函式庫從 DAG 或任務中推播自定義指標到 CloudWatch,例如模型推斷時間、準確率或資源使用率。

日誌監控

  • 任務日誌:Airflow 任務日誌可以傳送到 CloudWatch 日誌,便於檢視和搜尋個別任務的日誌,有助於除錯和監控模型效能。
  • MWAA 環境日誌:MWAA 也可以將與環境相關的日誌傳送到 CloudWatch 日誌,包括排程器、網頁伺服器和其他元件的日誌。

警示和通知

  • CloudWatch 警示:您可以根據 CloudWatch 指標設定警示,以便在特定事件發生時收到通知,例如任務失敗率過高、延遲增加或其他效能問題。
  • Amazon SNS:使用 Amazon Simple Notification Service(SNS)在警示觸發時接收通知(電子郵件、簡訊等)。

儀錶板

  • CloudWatch 儀錶板:建立自定義儀錶板以視覺化呈現來自 MWAA 環境和模型的指標和日誌,包括顯示任務成功/失敗率、執行時間、資源使用率等相關指標的圖表。