隨著機器學習模型日益複雜,龐大的資料量和計算需求使得分散式訓練成為必要。Ray Train 提供了一個簡潔易用的框架,方便開發者快速搭建分散式訓練環境。本文以計程車小費預測模型為例,示範如何使用 Ray Train 進行分散式訓練,並涵蓋資料處理、模型定義、訓練流程以及批次推斷等環節。利用 Dask on Ray 進行資料 ETL,確保資料處理效率,同時結合 PyTorch 建立深度學習模型,並透過 Ray Train 的 API 進行分散式訓練,有效提升模型訓練速度。最後,示範如何使用訓練好的模型進行批次推斷,完成整個機器學習流程。
分散式訓練實務:以 Ray Train 為例
在機器學習領域中,模型的訓練往往需要龐大的資料和計算資源。為了提高訓練效率,分散式訓練成為了一種常見的解決方案。本文將以 Ray Train 為例,介紹如何使用分散式訓練來加速模型的訓練過程。
資料載入、預處理與特徵工程
在訓練模型之前,我們需要對資料進行載入、預處理和特徵工程。這一步驟對於模型的效能至關重要。我們將使用 Dask on Ray 來完成這項工作。
import ray
from ray.util.dask import enable_dask_on_ray
import dask.dataframe as dd
LABEL_COLUMN = "is_big_tip"
FEATURE_COLUMNS = ["passenger_count", "trip_distance", "fare_amount", "trip_duration", "hour", "day_of_week"]
enable_dask_on_ray()
def load_dataset(path: str, *, include_label=True):
columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "tip_amount", "passenger_count", "trip_distance", "fare_amount"]
df = dd.read_parquet(path, columns=columns)
df = df.dropna()
df = df[(df["passenger_count"] <= 4) & (df["trip_distance"] < 100) & (df["fare_amount"] < 1000)]
df["tpep_pickup_datetime"] = dd.to_datetime(df["tpep_pickup_datetime"])
df["tpep_dropoff_datetime"] = dd.to_datetime(df["tpep_dropoff_datetime"])
df["trip_duration"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.seconds
df = df[df["trip_duration"] < 4 * 60 * 60] # 4 hours.
df["hour"] = df["tpep_pickup_datetime"].dt.hour
df["day_of_week"] = df["tpep_pickup_datetime"].dt.weekday
if include_label:
df[LABEL_COLUMN] = df["tip_amount"] > 0.2 * df["fare_amount"]
df = df.drop(columns=["tpep_pickup_datetime", "tpep_dropoff_datetime", "tip_amount"])
return ray.data.from_dask(df).repartition(100)
內容解密:
- 使用 Dask on Ray 載入 Parquet 檔案中的資料,並進行初步的資料清理。
- 對資料進行預處理,包括轉換日期時間格式、計算行程時長等。
- 根據需要計算標籤欄位,並刪除未使用的欄位。
- 將處理後的 Dask DataFrame 轉換為 Ray Dataset,以便後續的訓練和推斷。
定義深度學習模型
接下來,我們需要定義一個深度學習模型。在本例中,我們使用 PyTorch 定義了一個名為 FarePredictor 的神經網路。
import torch
import torch.nn as nn
import torch.nn.functional as F
class FarePredictor(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(6, 256)
self.fc2 = nn.Linear(256, 16)
self.fc3 = nn.Linear(16, 1)
self.bn1 = nn.BatchNorm1d(256)
self.bn2 = nn.BatchNorm1d(16)
def forward(self, x):
x = F.relu(self.fc1(x))
x = self.bn1(x)
x = F.relu(self.fc2(x))
x = self.bn2(x)
x = torch.sigmoid(self.fc3(x))
return x
內容解密:
- 定義了一個具有三層全連線層的神經網路 FarePredictor。
- 使用 ReLU 作為啟用函式,並在前兩層後使用批次歸一化。
- 最後一層使用 Sigmoid 啟用函式輸出一個介於 0 和 1 之間的值。
使用 Ray Train 進行分散式訓練
有了資料和模型之後,我們可以使用 Ray Train 進行分散式訓練。
from ray import train
# 省略了部分程式碼,請參考原來的內容
內容解密:
- 使用 Ray Train 定義了一個可擴充套件的訓練過程。
- 在多台機器上平行訓練模型,每台機器都有模型的副本和資料的子集。
- 使用 PyTorch DataParallel 在底層實作資料平行訓練。
使用Ray Train進行分散式訓練的實務操作
在分散式訓練的過程中,每個工作節點(worker)在每個訓練週期(epoch)中都需要對本地端的資料分片(shard)進行訓練。這涉及將資料輸入本地的模型副本,進行反向傳播(backpropagation)以更新模型權重。完成每個訓練週期後,工作節點會利用Ray Train的工具來報告訓練結果並儲存當前的模型權重,以供後續使用。
定義每個工作節點的訓練迴圈
from ray.air import session
from ray.air.config import ScalingConfig
import ray.train as train
from ray.train.torch import TorchCheckpoint, TorchTrainer
def train_loop_per_worker(config: dict):
batch_size = config.get("batch_size", 32)
lr = config.get("lr", 1e-2)
num_epochs = config.get("num_epochs", 3)
dataset_shard = session.get_dataset_shard("train")
model = FarePredictor()
dist_model = train.torch.prepare_model(model)
loss_function = nn.SmoothL1Loss()
optimizer = torch.optim.Adam(dist_model.parameters(), lr=lr)
for epoch in range(num_epochs):
loss = 0
num_batches = 0
for batch in dataset_shard.iter_torch_batches(batch_size=batch_size, dtypes=torch.float):
labels = torch.unsqueeze(batch[LABEL_COLUMN], dim=1)
inputs = torch.cat([torch.unsqueeze(batch[f], dim=1) for f in FEATURE_COLUMNS], dim=1)
output = dist_model(inputs)
batch_loss = loss_function(output, labels)
optimizer.zero_grad()
batch_loss.backward()
optimizer.step()
num_batches += 1
loss += batch_loss.item()
session.report({"epoch": epoch, "loss": loss}, checkpoint=TorchCheckpoint.from_model(dist_model))
內容解密:
train_loop_per_worker函式:定義了每個工作節點在每個訓練週期內執行的訓練邏輯。config字典:允許在執行時傳入引數,如批次大小(batch_size)、學習率(lr)和訓練週期數(num_epochs)。dataset_shard:透過session.get_dataset_shard("train")取得當前工作節點的資料分片。dist_model:使用train.torch.prepare_model準備模型以進行分散式訓練。- 訓練迴圈:迭代資料批次,計算損失,反向傳播更新模型引數,並在每個訓練週期結束後報告損失和儲存模型檢查點。
載入訓練和驗證資料
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
train_loop_config={"lr": 1e-2, "num_epochs": 3, "batch_size": 64},
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": load_dataset("nyc_tlc_data/yellow_tripdata_2020-01.parquet")},
)
result = trainer.fit()
trained_model = result.checkpoint
內容解密:
TorchTrainer:用於建立一個分散式訓練任務,需要指定train_loop_per_worker函式。train_loop_config:傳入訓練迴圈所需的組態引數。ScalingConfig:定義了訓練任務的擴充套件組態,如工作節點數量。datasets:指定訓練資料集,這裡使用load_dataset函式載入資料。.fit()方法:啟動訓練過程,並傳回訓練結果。
分散式批次推斷
完成模型訓練後,可以使用訓練好的模型進行分散式批次推斷。
from ray.train.torch import TorchPredictor
from ray.train.batch_predictor import BatchPredictor
batch_predictor = BatchPredictor(trained_model, TorchPredictor)
ds = load_dataset("nyc_tlc_data/yellow_tripdata_2021-01.parquet", include_label=False)
batch_predictor.predict_pipelined(ds, blocks_per_window=10)
內容解密:
BatchPredictor:用於建立一個批次預測器,需要傳入訓練好的模型檢查點和預測器類別(如TorchPredictor)。load_dataset:載入需要進行預測的資料集。.predict_pipelined()方法:對資料集進行分散式批次預測,可以透過調整blocks_per_window引數來控制每次處理的資料量。
Ray Train 的核心:Trainer 抽象概念
Ray Train 提供了一個強大的抽象概念——Trainer,用於實作分散式訓練。本文將探討 Trainer 的細節,並透過 PyTorch 的例子來展示如何使用 Ray Train 進行分散式訓練。
什麼是 Trainer?
Trainer 是 Ray Train 中的一個框架特定的類別,用於以分散式方式執行模型訓練。所有的 Ray Trainer 類別分享一個共同的介面,主要包括兩個方面:
.fit()方法:用於將給定的 Trainer 與資料集、組態和所需的擴充套件屬性進行擬合。.checkpoint屬性:傳回該 Trainer 的 Ray Checkpoint 物件。
使用 PyTorch 的例子
讓我們透過一個 PyTorch 的例子來瞭解如何使用 Ray Train 進行分散式訓練。首先,我們定義一個簡單的神經網路模型和訓練迴圈:
import torch
import torch.nn as nn
import torch.nn.functional as F
from ray.data import from_torch
# 定義神經網路模型
class NeuralNetwork(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(10, 15)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(15, 5)
def forward(self, x):
x = F.relu(self.fc1(x))
x = self.fc2(x)
return x
# 定義訓練迴圈
def train_one_epoch(model, loss_fn, optimizer, input_data, label_data):
output = model(input_data)
loss = loss_fn(output, label_data)
optimizer.zero_grad()
loss.backward()
optimizer.step()
def training_loop():
model = NeuralNetwork()
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
input_data = torch.randn(20, 10)
label_data = torch.randn(20, 5)
for epoch in range(3):
train_one_epoch(model, loss_fn, optimizer, input_data, label_data)
內容解密:
此程式碼定義了一個簡單的神經網路模型 NeuralNetwork,包含兩個全連線層和一個 ReLU 啟用函式。train_one_epoch 函式負責進行單次迭代的訓練,計算損失、反向傳播和最佳化器更新。training_loop 函式初始化模型、損失函式和最佳化器,並執行多次迭代的訓練。
將 PyTorch 模型遷移到 Ray Train
要使用 Ray Train 進行分散式訓練,我們只需要對程式碼進行一個小小的修改,即呼叫 prepare_model 函式來準備模型:
from ray.train.torch import prepare_model
def distributed_training_loop():
model = NeuralNetwork()
model = prepare_model(model) # 準備模型以進行分散式訓練
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
input_data = torch.randn(20, 10)
label_data = torch.randn(20, 5)
train_dataset = from_torch(input_data)
for epoch in range(3):
train_one_epoch(model, loss_fn, optimizer, input_data, label_data)
內容解密:
此程式碼與之前的 training_loop 相似,但呼叫了 prepare_model 函式來準備模型以進行分散式訓練。這是使用 Ray Train 進行分散式訓練所需的唯一修改。
建立 TorchTrainer
接下來,我們需要建立一個 TorchTrainer 物件,該物件需要三個必要的引數:
train_loop_per_worker:每個工作執行緒執行的訓練函式。datasets:包含 Ray Datasets 的字典,用於提供訓練資料。scaling_config:指定訓練的擴充套件組態,例如工作執行緒數量和 GPU 使用情況。
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer
trainer = TorchTrainer(
train_loop_per_worker=distributed_training_loop,
scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
datasets={"train": train_dataset}
)
result = trainer.fit()
內容解密:
此程式碼建立了一個 TorchTrainer 物件,並指定了訓練函式、scaling_config 和資料集。然後,呼叫 fit 方法來執行分散式訓練。