深度學習模型的效能極度仰賴超引數的設定。Keras Tuner 提供了便捷的超引數搜尋方法,有效提升模型效能。本文以糖尿病預測為例,示範如何利用 Keras Tuner 的 RandomSearch 搜尋最佳超引陣列合。首先,定義包含模型架構和超引數搜尋空間的 HyperModel 類別,並使用 hp.Int 和 hp.Float 設定待調引數的範圍與型別。接著,將 Keras Tuner 整合至訓練流程,使用驗證集的準確率作為目標函式,並透過 tuner.search 方法進行搜尋。最後,使用 tuner.get_best_models 取得最佳模型。除了超引數調優,本文也涵蓋 PySpark 資料預處理、TensorFlow 模型建立與評估等環節,提供完整的模型開發流程參考。
使用Keras Tuner進行超引數調優以提升模型效能
在深度學習模型的開發過程中,超引數的選擇對於模型的效能有著至關重要的影響。Keras Tuner是一個強大的工具,能夠幫助我們系統地探索超引數空間,從而找到最佳的模型組態。
步驟1:定義超引數模型
首先,我們需要定義一個超引數模型類別DiabetesHyperModel,該類別繼承自kt.HyperModel。在這個類別中,我們定義了模型的架構以及需要調優的超引數。
class DiabetesHyperModel(kt.HyperModel):
def __init__(self, input_shape):
self.input_shape = input_shape
def build(self, hp):
model = Sequential()
model.add(Dense(units=hp.Int('units1', 32, 128, step=32), input_dim=self.input_shape))
model.add(ReLU())
model.add(Dense(units=hp.Int('units2', 16, 64, step=16)))
model.add(ReLU())
model.add(Dense(1, activation='sigmoid'))
optimizer = SGD(learning_rate=hp.Float('lr', 1e-4, 1e-2, sampling='log'))
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
return model
內容解密:
DiabetesHyperModel類別定義了模型的架構和超引數搜尋空間。build方法中,我們使用hp.Int和hp.Float來定義需要調優的超引數,例如神經網路層的單元數和學習率。- 模型的編譯使用了二元交叉熵損失函式和SGD最佳化器,並監控準確率指標。
步驟2:整合Keras Tuner到模型訓練流程
接下來,我們將Keras Tuner整合到DiabetesModelTrainer類別中,使用RandomSearch調優器來搜尋最佳的超引陣列合。
class DiabetesModelTrainer:
@staticmethod
def train_tensorflow_model(X_train, y_train, X_test, y_test, epochs=50):
tuner = RandomSearch(
DiabetesHyperModel(X_train.shape[1]),
objective='val_accuracy',
max_trials=5,
executions_per_trial=3,
directory='tuner_dir',
project_name='diabetes_tuning'
)
tuner.search(X_train, y_train, epochs=epochs, validation_data=(X_test, y_test), verbose=1)
best_model = tuner.get_best_models(num_models=1)[0]
logger.info("Best model training completed.")
return best_model
內容解密:
RandomSearch調優器用於搜尋最佳的超引陣列合,最大試驗次數為5,每次試驗執行3次。tuner.search方法執行超引數搜尋,使用訓練資料和驗證資料。- 最佳模型透過
tuner.get_best_models方法取得。
超引數調優的效果
透過比較調優後的模型和固定超引數模型的效能,我們可以看到調優後的模型在多個指標上都有所提升。
| 指標 | 調優後模型 | 固定超引數模型 |
|---|---|---|
| 準確率 | 0.7863 | 0.7692 |
| 精確度 | 0.7500 | 0.7059 |
| 回召率 | 0.5854 | 0.5854 |
| F1分數 | 0.6575 | 0.6400 |
結合完整的程式碼片段
最後,我們結合了完整的程式碼片段,包括資料預處理、模型訓練和評估。
# 資料預處理
class DiabetesProcessor:
@staticmethod
def preprocess_data(spark, data_file_path, train_parquet_path, test_parquet_path):
# ...
# 模型訓練
class DiabetesModelTrainer:
@staticmethod
def train_tensorflow_model(X_train, y_train, epochs=100, lr=0.01):
# ...
# 模型評估
class DiabetesModelEvaluator:
@staticmethod
def evaluate_model(model, X_test, y_test):
# ...
內容解密:
- 資料預處理步驟包括讀取資料、過濾缺失值、特徵工程和資料分割。
- 模型訓練使用了Keras Tuner進行超引數調優。
- 模型評估計算了準確率、精確度、回召率和F1分數等指標。
透過這些步驟,我們成功地使用Keras Tuner對深度學習模型進行了超引數調優,從而提升了模型的效能。
使用TensorFlow進行糖尿病分類別的深度學習模型最佳化
本章節將探討如何利用TensorFlow和Keras進行糖尿病分類別的深度學習模型開發,並透過超引數調優來最佳化模型效能。
資料預處理
首先,我們需要對資料進行預處理。以下程式碼展示瞭如何使用PySpark進行資料清理和特徵縮放:
class DiabetesProcessor:
@staticmethod
def preprocess_data(spark, data_file_path, train_parquet_path, test_parquet_path):
try:
diabetes_df = spark.read.csv(data_file_path, header=True, inferSchema=True)
diabetes_df = diabetes_df.filter((col("Glucose") != 0) & (col("BloodPressure") != 0) & (col("BMI") != 0))
feature_cols = ["Pregnancies", "Glucose", "BloodPressure", "BMI", "DiabetesPedigreeFunction", "Age"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
diabetes_df = assembler.transform(diabetes_df)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(diabetes_df)
diabetes_df = scaler_model.transform(diabetes_df)
train_df, test_df = diabetes_df.randomSplit([0.8, 0.2], seed=42)
train_df.write.parquet(train_parquet_path, mode="overwrite")
test_df.write.parquet(test_parquet_path, mode="overwrite")
logger.info("資料預處理完成。")
except Exception as e:
logger.error(f"資料預處理過程中發生錯誤:{str(e)}")
程式碼解析:
- 資料讀取與清理:使用Spark讀取CSV檔案並過濾掉包含零值的列,確保資料品質。
- 特徵組合:使用
VectorAssembler將多個特徵欄位組合成一個向量欄位。 - 特徵縮放:使用
StandardScaler對特徵進行標準化縮放,使模型訓練更穩定。 - 資料分割:將資料集按8:2比例分割為訓練集和測試集。
模型訓練與超引數調優
接下來,我們使用Keras和keras_tuner進行模型訓練和超引數調優。
class DiabetesHyperModel(HyperModel):
def build(self, hp):
model = Sequential()
model.add(Dense(units=hp.Int('units_1', min_value=32, max_value=128, step=32), input_dim=X_train.shape[1]))
model.add(ReLU())
model.add(Dense(units=hp.Int('units_2', min_value=16, max_value=64, step=16)))
model.add(ReLU())
model.add(Dense(units=hp.Int('units_3', min_value=8, max_value=32, step=8)))
model.add(ReLU())
model.add(Dense(1, activation='sigmoid'))
optimizer = SGD(learning_rate=hp.Float('lr', 1e-4, 1e-2, sampling='log'))
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
return model
程式碼解析:
- 超引數定義:使用
keras_tuner的HyperModel類別定義模型的超引數空間,包括神經元數量和學習率。 - 模型結構:建立一個包含多層Dense層的Sequential模型,並使用ReLU作為啟用函式,最後一層使用sigmoid進行二元分類別。
- 最佳化器組態:使用隨機梯度下降(SGD)作為最佳化器,並對學習率進行調優。
模型評估
模型訓練完成後,我們需要對其進行評估。
class DiabetesModelEvaluator:
@staticmethod
def evaluate_model(model, X_test, y_test):
try:
y_pred = (model.predict(X_test) > 0.5).astype(int)
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
logger.info(f"準確率:{accuracy:.4f}")
logger.info(f"精確率:{precision:.4f}")
logger.info(f"召回率:{recall:.4f}")
logger.info(f"F1分數:{f1:.4f}")
except Exception as e:
logger.error(f"模型評估過程中發生錯誤:{str(e)}")
程式碼解析:
- 預測結果:使用訓練好的模型對測試集進行預測,並將預測結果二值化。
- 評估指標:計算準確率、精確率、召回率和F1分數,以全面評估模型效能。
使用Apache Airflow建立可擴充套件的深度學習管線
在前面的章節中,我們已經在AWS EC2例項上手動執行了可擴充套件的深度學習管線。雖然這種非自動化的方法在業界很常見,但它可能並不是最有效率的。一方面,它缺乏對工作流程狀態和進度的可視性,需要單獨實作監控和日誌記錄。另一方面,它也可能導致在修改或擴充套件工作流程時出現挑戰。事實上,如果沒有專門的工作流程管理框架,擴充套件工作流程以管理大量資料或計算資源可能會非常困難。此外,如果沒有標準化的方法,很難確保實驗的可重現性,這嚴重依賴於版本控制和檔案。
採用Apache Airflow來執行工作流程可以有效地解決這些問題。它根據預定的時間表或觸發器自動執行工作流程,最大限度地減少了手動干預的需要。此外,它還提供了全面的監控和日誌記錄功能,讓使用者可以輕鬆地跟蹤工作流程的進度和排查出現的問題。在Airflow中執行DAG(有向無環圖)時,您可以確保任務按照其依賴關係的正確順序執行。Airflow的設計目的是隨著工作流程的大小和複雜性而擴充套件,能夠有效地管理大量資料和計算資源。此外,它透過將工作流程封裝為程式碼來促進可重現性,便於版本控制,並使團隊成員之間能夠分享,以重現實驗和跟蹤時間變化。
在本章中,我們將展示如何使用Apache Airflow建立可擴充套件的資料處理管線。我們將建立整個AWS工作流程,包括使用PySpark進行資料預處理、使用PyTorch和TensorFlow開發深度學習模型,並使用Airflow執行程式碼。更具體地說,我們將使用第4章和第7章中的Tesla股票和糖尿病資料集,以及這些章節中的獨立迴歸和分類別指令碼。這將使我們能夠展示如何正常執行Python指令碼,並將其與作為Apache Airflow DAG的更有效執行進行比較。
使用Apache Airflow進行Tesla股票價格預測的管線
在本文中,我們將展示如何使用Apache Airflow建立可擴充套件的資料處理工作流程。我們將建立一個AWS管線,包括使用PySpark進行資料預處理、使用PyTorch開發深度學習模型,並使用Airflow執行程式碼。
為了實作這一目標,我們將使用與第4章相同的Tesla資料集。程式碼也將與該章節保持一致,主要區別在於本章節採用了更模組化的方法。
Spark組態與資料預處理
spark = SparkSession.builder \
.appName("Tesla Stock Price Prediction") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", "2") \
.config("spark.driver.memory", "2g") \
.getOrCreate()
內容解密:
.appName("Tesla Stock Price Prediction"):設定Spark應用程式的名稱,用於在Spark UI中識別該應用。.config("spark.executor.memory", "4g"):為每個執行器組態4GB的記憶體,以處理資料和執行任務。.config("spark.executor.cores", "2"):為每個執行器分配2個核心,以提高平行處理能力。.config("spark.driver.memory", "2g"):為驅動程式組態2GB的記憶體,用於協調任務和管理後設資料。.getOrCreate():如果已經存在具有相同組態的SparkSession,則傳回現有的例項;否則,建立一個新的例項。
data_file_path = "/home/ubuntu/airflow/dags/tesla_stock_data.csv"
train_parquet_path = "/home/ubuntu/airflow/dags/tesla_stock_train.parquet"
test_parquet_path = "/home/ubuntu/airflow/dags/tesla_stock_test.parquet"
TeslaDataProcessor.preprocess_data(spark, data_file_path, train_parquet_path, test_parquet_path)
內容解密:
data_file_path:指定Tesla股票資料CSV檔案的路徑。train_parquet_path和test_parquet_path:指定預處理後的訓練和測試資料將被儲存為Parquet檔案的路徑。TeslaDataProcessor.preprocess_data():呼叫自定義的資料預處理函式,使用Spark對資料進行處理,並將結果儲存到指定的Parquet檔案中。
訓練與評估模型
train_tesla_df = spark.read.parquet(train_parquet_path)
X_train = np.array(train_tesla_df.select("scaled_features").rdd.flatMap(lambda x: x).collect())
y_train = np.array(train_tesla_df.select("label").rdd.flatMap(lambda x: x).collect())
test_tesla_df = spark.read.parquet(test_parquet_path)
X_test = np.array(test_tesla_df.select("scaled_features").rdd.flatMap(lambda x: x).collect())
y_test = np.array(test_tesla_df.select("label").rdd.flatMap(lambda x: x).collect())
best_model = TeslaModelTrainer.train_pytorch_model(X_train, y_train, X_test, y_test)
TeslaModelEvaluator.evaluate_model(best_model, X_test, y_test)
內容解密:
- 從Parquet檔案中讀取訓練和測試資料,並將其轉換為NumPy陣列格式,以便於後續的模型訓練和評估。
TeslaModelTrainer.train_pytorch_model():呼叫自定義的PyTorch模型訓練函式,使用訓練資料訓練模型,並傳回最佳模型。TeslaModelEvaluator.evaluate_model():呼叫自定義的模型評估函式,使用測試資料評估最佳模型的效能。
本章展示瞭如何使用Apache Airflow建立可擴充套件的深度學習管線,從而自動化和簡化工作流程,提高效率並減少手動干預的需求。透過結合PySpark進行資料預處理和PyTorch/TensorFlow進行模型訓練,我們能夠建立一個完整的AWS工作流程,實作了資料處理、模型開發和執行的自動化。