利用 Apache Airflow 協調 TensorFlow 深度學習模型,實作自動化的糖尿病預測流程。文章首先詳述資料預處理、模型訓練和評估三個核心模組,並以 Python 程式碼示範如何載入資料、建構神經網路、訓練模型以及計算評估指標。接著,將這些模組整合至 Airflow DAG,利用 PythonOperator 定義各個任務,並設定其依賴關係,實作工作流程的自動化排程和監控,提升機器學習管線的效率和可維護性。最後,討論如何將模型評估指標整合到 Airflow 中,以便追蹤模型效能並觸發後續動作。
大規模深度學習管線實作:以糖尿病預測為例
本章節將探討如何使用Apache Airflow建構可擴充套件的深度學習管線,以TensorFlow為基礎進行糖尿病預測。整個流程涵蓋資料預處理、模型訓練和評估等關鍵步驟,並透過日誌記錄來監控每個階段的完成情況。
資料預處理模組(preprocessing.py)
資料預處理是整個深度學習流程的基礎,主要負責載入原始資料、進行必要的資料清理和轉換,最終將處理好的資料儲存為Parquet格式以供後續使用。
程式碼解析
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import logging
def preprocess_data(input_path, train_output_path, test_output_path):
try:
# 載入資料集
data = pd.read_csv(input_path)
# 資料預處理步驟
# ...
# 分割訓練集和測試集
X_train, X_test, y_train, y_test = train_test_split(data.drop('target', axis=1), data['target'], test_size=0.2, random_state=42)
# 儲存預處理後的資料為Parquet格式
train_df = pd.concat([X_train, y_train], axis=1)
test_df = pd.concat([X_test, y_test], axis=1)
train_df.to_parquet(train_output_path, index=False)
test_df.to_parquet(test_output_path, index=False)
logging.info("資料預處理完成。")
except Exception as e:
logging.error(f"資料預處理過程中發生錯誤:{e}")
內容解密:
- 資料載入:使用
pd.read_csv函式載入原始CSV資料集。 - 資料預處理:進行必要的資料清理和特徵工程,例如處理缺失值、標準化或歸一化等。
- 訓練測試集分割:利用
train_test_split函式將資料集分割為訓練集和測試集。 - 結果儲存:將預處理後的訓練集和測試集分別儲存為Parquet檔案,以提高後續處理的效率。
模型訓練模組(model_training.py)
模型訓練階段利用TensorFlow框架建立神經網路模型,並使用預處理後的訓練資料進行訓練。
程式碼解析
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import logging
def train_tensorflow_model(train_data_path, model_output_path):
try:
# 載入訓練資料
train_df = pd.read_parquet(train_data_path)
# 定義TensorFlow模型
model = Sequential([
Dense(64, activation='relu', input_shape=(train_df.shape[1]-1,)),
Dense(32, activation='relu'),
Dense(1, activation='sigmoid')
])
# 編譯模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# 訓練模型
model.fit(train_df.drop('target', axis=1), train_df['target'], epochs=100, batch_size=32, verbose=2)
# 儲存訓練好的模型
model.save(model_output_path)
logging.info("模型訓練完成。")
except Exception as e:
logging.error(f"模型訓練過程中發生錯誤:{e}")
內容解密:
- 模型定義:使用TensorFlow的Keras API定義一個簡單的神經網路模型,包含多個全連線層。
- 模型編譯:設定模型的最佳化器、損失函式和評估指標。
- 模型訓練:使用預處理後的訓練資料對模型進行訓練,設定訓練的輪數(epochs)和批次大小(batch size)。
- 模型儲存:將訓練好的模型儲存為HDF5格式檔案。
模型評估模組(model_evaluation.py)
在模型評估階段,利用測試資料對訓練好的模型進行評估,計算多項評估指標以衡量模型的效能。
程式碼解析
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import tensorflow as tf
import logging
def evaluate_model(test_data_path, model_path):
try:
# 載入測試資料和模型
test_df = pd.read_parquet(test_data_path)
model = tf.keras.models.load_model(model_path)
# 使用模型進行預測
y_pred = (model.predict(test_df.drop('target', axis=1)) > 0.5).astype("int32")
# 計算評估指標
accuracy = accuracy_score(test_df['target'], y_pred)
precision = precision_score(test_df['target'], y_pred)
recall = recall_score(test_df['target'], y_pred)
f1 = f1_score(test_df['target'], y_pred)
# 日誌記錄評估結果
logging.info(f"準確率:{accuracy:.4f}, 精確度:{precision:.4f}, 召回率:{recall:.4f}, F1分數:{f1:.4f}")
except Exception as e:
logging.error(f"模型評估過程中發生錯誤:{e}")
內容解密:
- 載入測試資料和模型:分別載入預處理後的測試資料和訓練好的TensorFlow模型。
- 模型預測:使用載入的模型對測試資料進行預測,並將預測結果二值化。
- 評估指標計算:利用Scikit-learn函式庫計算準確率、精確度、召回率和F1分數等評估指標。
- 結果記錄:將計算出的評估指標透過日誌記錄下來,以便後續分析和參考。
主程式(main.py)
主程式負責協調整個工作流程,包括呼叫資料預處理、模型訓練和評估等模組。
程式碼解析
import logging
from preprocessing import preprocess_data
from model_training import train_tensorflow_model
from model_evaluation import evaluate_model
def main():
# 資料預處理
preprocess_data("diabetes.csv", "diabetes_train.parquet", "diabetes_test.parquet")
logging.info("資料預處理完成。")
# 模型訓練
train_tensorflow_model("diabetes_train.parquet", "diabetes_model.h5")
logging.info("模型訓練完成。")
# 模型評估
evaluate_model("diabetes_test.parquet", "diabetes_model.h5")
logging.info("模型評估完成。")
if __name__ == "__main__":
main()
內容解密:
- 工作流程協調:主程式按順序呼叫各個模組,先進行資料預處理,再進行模型訓練,最後進行模型評估。
- 日誌記錄:在每個階段完成後,透過日誌記錄相關資訊,以便監控整個流程的執行情況。
結果分析
執行主程式後,將得到模型的訓練過程損失值變化以及最終的評估指標結果。
訓練過程損失值變化
| Epoch | Loss | |
|
-| | 10 | 0.5857 | | 20 | 0.5705 | | … | … | | 100 | 0.5273 |
隨著訓練輪數的增加,模型的損失值逐漸下降,表明模型的預測能力在不斷提升。
模型評估結果
| Metric | Value | |
-|
-| | Accuracy | 0.7692 | | Precision | 0.7059 | | Recall | 0.5854 | | F1 Score | 0.6400 |
從評估結果來看,模型的準確率達到約76.92%,但仍有提升空間,特別是在召回率方面。
將糖尿病預測模型整合至Apache Airflow DAG
在前面的章節中,我們已經瞭解如何透過main()函式手動執行糖尿病預測的程式碼,並協調各個任務(資料預處理、模型訓練和模型評估)的執行。在本文中,我們將把這個main()函式轉換成Apache Airflow DAG。當將程式碼整合到DAG中時,不需要單獨的main()函式來協調任務,而是使用諸如PythonOperator或BashOperator之類別的運算元在Airflow DAG中明確定義任務。每個運算元對應一個特定的任務,並且使用DAG定義中的>>運算元指定任務之間的依賴關係。
建立糖尿病資料處理與訓練DAG
讓我們示範如何將與前一小節相同的程式碼執行為名為diabetes_processing_and_training的DAG。這個DAG取代了main()函式,而三個模組(preprocessing.py、model_training.py和model_evaluation.py)保持不變。
步驟1:匯入必要的模組
下面的匯入陳述式引入了在Apache Airflow DAG中定義和執行資料預處理、模型訓練和模型評估任務所需的類別和函式:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from preprocessing import preprocess_data
from model_training import train_tensorflow_model
from model_evaluation import evaluate_model
內容解密:
from datetime import datetime, timedelta:匯入處理日期和時間所需的類別。from airflow import DAG:匯入DAG類別,代表具有依賴關係的任務集合。from airflow.operators.python_operator import PythonOperator:匯入用於在Airflow DAG中執行Python函式的運算元。from preprocessing import preprocess_data、from model_training import train_tensorflow_model和from model_evaluation import evaluate_model:分別匯入用於資料預處理、模型訓練和模型評估的函式。
步驟2:定義預設引數
在此步驟中,我們定義了default_args字典,它指定了各種預設引數,例如所有者、開始日期、電子郵件設定、重試次數和重試延遲:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 2, 25),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
內容解密:
'owner': 'airflow':指定DAG的所有者。'depends_on_past': False:表示任務的執行不依賴於前一次執行的成功與否。'start_date': datetime(2024, 2, 25):設定DAG的開始日期。'email_on_failure': False和'email_on_retry': False:分別控制任務失敗或重試時是否傳送電子郵件通知。'retries': 1:定義任務失敗時的重試次數。'retry_delay': timedelta(minutes=5):設定重試之間的延遲時間。
DAG實作與任務協調
透過上述步驟,我們完成了將糖尿病預測模型的訓練流程轉換為Apache Airflow DAG的基本組態。這不僅提高了工作流程的可管理性和可擴充套件性,也使得整個過程更加自動化和規範化。接下來,我們可以進一步探討如何利用Airflow的強大功能來最佳化我們的機器學習工作流程。
將模型評估指標與Airflow整合
前面章節介紹了模型評估的指標,包括精確率(Precision)、召回率(Recall)和F1分數。這些指標對於理解模型的表現至關重要。在將模型訓練流程整合到Airflow DAG中後,我們也應該考慮如何有效地評估和監控模型的表現。
模型評估指標回顧
- 精確率(Precision):衡量模型正確識別為正例的樣本佔所有被預測為正例的樣本的比例。
- 召回率(Recall):衡量模型正確識別的正例佔所有實際正例的比例。
- F1分數:精確率和召回率的調和平均值,提供了一個平衡這兩個指標的單一評估標準。
結合Airflow進行模型評估
在Airflow DAG中,我們可以設計一個專門的任務來進行模型評估,並計算上述指標。這不僅能夠幫助我們及時瞭解模型的表現,也便於在模型效能下降時觸發相應的動作,如重新訓練模型等。
示例程式碼
# 在model_evaluation.py中定義evaluate_model函式
def evaluate_model(model, test_data):
# 進行模型評估,並計算精確率、召回率和F1分數
precision = calculate_precision(model, test_data)
recall = calculate_recall(model, test_data)
f1_score = calculate_f1_score(precision, recall)
# 可以將評估結果儲存或報告
report_evaluation_results(precision, recall, f1_score)
# 在DAG中加入模型評估任務
evaluate_task = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_model,
dag=dag,
)
內容解密:
evaluate_model函式負責對訓練好的模型進行評估,計算相關指標。- 在Airflow DAG中,使用
PythonOperator建立一個名為evaluate_model的任務,該任務呼叫evaluate_model函式。
透過這種方式,我們能夠在工作流程中自動進行模型評估,並根據評估結果採取相應措施,從而實作更高效、更智慧的工作流程管理。