返回文章列表

Ray Train 分散式機器學習訓練實務

本文探討如何運用 Ray Train 框架實作高效的分散式機器學習訓練。文章涵蓋資料載入、預處理、特徵工程、模型定義、分散式訓練流程以及批次推斷等關鍵步驟,並以實際案例示範如何使用 Dask on Ray 進行資料處理,以及如何使用 PyTorch 建立和訓練模型。此外,文章還闡述了 Ray Train 的核心概念

機器學習 分散式系統

隨著機器學習模型日益複雜,龐大的資料量和計算需求使得分散式訓練成為必要。Ray Train 提供了一個簡潔易用的框架,方便開發者快速搭建分散式訓練環境。本文以計程車小費預測模型為例,示範如何使用 Ray Train 進行分散式訓練,並涵蓋資料處理、模型定義、訓練流程以及批次推斷等環節。利用 Dask on Ray 進行資料 ETL,確保資料處理效率,同時結合 PyTorch 建立深度學習模型,並透過 Ray Train 的 API 進行分散式訓練,有效提升模型訓練速度。最後,示範如何使用訓練好的模型進行批次推斷,完成整個機器學習流程。

分散式訓練實務:以 Ray Train 為例

在機器學習領域中,模型的訓練往往需要龐大的資料和計算資源。為了提高訓練效率,分散式訓練成為了一種常見的解決方案。本文將以 Ray Train 為例,介紹如何使用分散式訓練來加速模型的訓練過程。

資料載入、預處理與特徵工程

在訓練模型之前,我們需要對資料進行載入、預處理和特徵工程。這一步驟對於模型的效能至關重要。我們將使用 Dask on Ray 來完成這項工作。

import ray
from ray.util.dask import enable_dask_on_ray
import dask.dataframe as dd

LABEL_COLUMN = "is_big_tip"
FEATURE_COLUMNS = ["passenger_count", "trip_distance", "fare_amount", "trip_duration", "hour", "day_of_week"]

enable_dask_on_ray()

def load_dataset(path: str, *, include_label=True):
    columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "tip_amount", "passenger_count", "trip_distance", "fare_amount"]
    df = dd.read_parquet(path, columns=columns)
    df = df.dropna()
    df = df[(df["passenger_count"] <= 4) & (df["trip_distance"] < 100) & (df["fare_amount"] < 1000)]
    df["tpep_pickup_datetime"] = dd.to_datetime(df["tpep_pickup_datetime"])
    df["tpep_dropoff_datetime"] = dd.to_datetime(df["tpep_dropoff_datetime"])
    df["trip_duration"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.seconds
    df = df[df["trip_duration"] < 4 * 60 * 60]  # 4 hours.
    df["hour"] = df["tpep_pickup_datetime"].dt.hour
    df["day_of_week"] = df["tpep_pickup_datetime"].dt.weekday
    if include_label:
        df[LABEL_COLUMN] = df["tip_amount"] > 0.2 * df["fare_amount"]
    df = df.drop(columns=["tpep_pickup_datetime", "tpep_dropoff_datetime", "tip_amount"])
    return ray.data.from_dask(df).repartition(100)

內容解密:

  1. 使用 Dask on Ray 載入 Parquet 檔案中的資料,並進行初步的資料清理。
  2. 對資料進行預處理,包括轉換日期時間格式、計算行程時長等。
  3. 根據需要計算標籤欄位,並刪除未使用的欄位。
  4. 將處理後的 Dask DataFrame 轉換為 Ray Dataset,以便後續的訓練和推斷。

定義深度學習模型

接下來,我們需要定義一個深度學習模型。在本例中,我們使用 PyTorch 定義了一個名為 FarePredictor 的神經網路。

import torch
import torch.nn as nn
import torch.nn.functional as F

class FarePredictor(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(6, 256)
        self.fc2 = nn.Linear(256, 16)
        self.fc3 = nn.Linear(16, 1)
        self.bn1 = nn.BatchNorm1d(256)
        self.bn2 = nn.BatchNorm1d(16)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.bn1(x)
        x = F.relu(self.fc2(x))
        x = self.bn2(x)
        x = torch.sigmoid(self.fc3(x))
        return x

內容解密:

  1. 定義了一個具有三層全連線層的神經網路 FarePredictor。
  2. 使用 ReLU 作為啟用函式,並在前兩層後使用批次歸一化。
  3. 最後一層使用 Sigmoid 啟用函式輸出一個介於 0 和 1 之間的值。

使用 Ray Train 進行分散式訓練

有了資料和模型之後,我們可以使用 Ray Train 進行分散式訓練。

from ray import train

# 省略了部分程式碼,請參考原來的內容

內容解密:

  1. 使用 Ray Train 定義了一個可擴充套件的訓練過程。
  2. 在多台機器上平行訓練模型,每台機器都有模型的副本和資料的子集。
  3. 使用 PyTorch DataParallel 在底層實作資料平行訓練。

使用Ray Train進行分散式訓練的實務操作

在分散式訓練的過程中,每個工作節點(worker)在每個訓練週期(epoch)中都需要對本地端的資料分片(shard)進行訓練。這涉及將資料輸入本地的模型副本,進行反向傳播(backpropagation)以更新模型權重。完成每個訓練週期後,工作節點會利用Ray Train的工具來報告訓練結果並儲存當前的模型權重,以供後續使用。

定義每個工作節點的訓練迴圈

from ray.air import session
from ray.air.config import ScalingConfig
import ray.train as train
from ray.train.torch import TorchCheckpoint, TorchTrainer

def train_loop_per_worker(config: dict):
    batch_size = config.get("batch_size", 32)
    lr = config.get("lr", 1e-2)
    num_epochs = config.get("num_epochs", 3)
    dataset_shard = session.get_dataset_shard("train")
    model = FarePredictor()
    dist_model = train.torch.prepare_model(model)
    loss_function = nn.SmoothL1Loss()
    optimizer = torch.optim.Adam(dist_model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        loss = 0
        num_batches = 0
        for batch in dataset_shard.iter_torch_batches(batch_size=batch_size, dtypes=torch.float):
            labels = torch.unsqueeze(batch[LABEL_COLUMN], dim=1)
            inputs = torch.cat([torch.unsqueeze(batch[f], dim=1) for f in FEATURE_COLUMNS], dim=1)
            output = dist_model(inputs)
            batch_loss = loss_function(output, labels)
            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()
            num_batches += 1
            loss += batch_loss.item()
        session.report({"epoch": epoch, "loss": loss}, checkpoint=TorchCheckpoint.from_model(dist_model))

內容解密:

  1. train_loop_per_worker函式:定義了每個工作節點在每個訓練週期內執行的訓練邏輯。
  2. config字典:允許在執行時傳入引數,如批次大小(batch_size)、學習率(lr)和訓練週期數(num_epochs)。
  3. dataset_shard:透過session.get_dataset_shard("train")取得當前工作節點的資料分片。
  4. dist_model:使用train.torch.prepare_model準備模型以進行分散式訓練。
  5. 訓練迴圈:迭代資料批次,計算損失,反向傳播更新模型引數,並在每個訓練週期結束後報告損失和儲存模型檢查點。

載入訓練和驗證資料

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"lr": 1e-2, "num_epochs": 3, "batch_size": 64},
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": load_dataset("nyc_tlc_data/yellow_tripdata_2020-01.parquet")},
)
result = trainer.fit()
trained_model = result.checkpoint

內容解密:

  1. TorchTrainer:用於建立一個分散式訓練任務,需要指定train_loop_per_worker函式。
  2. train_loop_config:傳入訓練迴圈所需的組態引數。
  3. ScalingConfig:定義了訓練任務的擴充套件組態,如工作節點數量。
  4. datasets:指定訓練資料集,這裡使用load_dataset函式載入資料。
  5. .fit()方法:啟動訓練過程,並傳回訓練結果。

分散式批次推斷

完成模型訓練後,可以使用訓練好的模型進行分散式批次推斷。

from ray.train.torch import TorchPredictor
from ray.train.batch_predictor import BatchPredictor

batch_predictor = BatchPredictor(trained_model, TorchPredictor)
ds = load_dataset("nyc_tlc_data/yellow_tripdata_2021-01.parquet", include_label=False)
batch_predictor.predict_pipelined(ds, blocks_per_window=10)

內容解密:

  1. BatchPredictor:用於建立一個批次預測器,需要傳入訓練好的模型檢查點和預測器類別(如TorchPredictor)。
  2. load_dataset:載入需要進行預測的資料集。
  3. .predict_pipelined()方法:對資料集進行分散式批次預測,可以透過調整blocks_per_window引數來控制每次處理的資料量。

Ray Train 的核心:Trainer 抽象概念

Ray Train 提供了一個強大的抽象概念——Trainer,用於實作分散式訓練。本文將探討 Trainer 的細節,並透過 PyTorch 的例子來展示如何使用 Ray Train 進行分散式訓練。

什麼是 Trainer?

Trainer 是 Ray Train 中的一個框架特定的類別,用於以分散式方式執行模型訓練。所有的 Ray Trainer 類別分享一個共同的介面,主要包括兩個方面:

  • .fit() 方法:用於將給定的 Trainer 與資料集、組態和所需的擴充套件屬性進行擬合。
  • .checkpoint 屬性:傳回該 Trainer 的 Ray Checkpoint 物件。

使用 PyTorch 的例子

讓我們透過一個 PyTorch 的例子來瞭解如何使用 Ray Train 進行分散式訓練。首先,我們定義一個簡單的神經網路模型和訓練迴圈:

import torch
import torch.nn as nn
import torch.nn.functional as F
from ray.data import from_torch

# 定義神經網路模型
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 15)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(15, 5)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# 定義訓練迴圈
def train_one_epoch(model, loss_fn, optimizer, input_data, label_data):
    output = model(input_data)
    loss = loss_fn(output, label_data)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

def training_loop():
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    input_data = torch.randn(20, 10)
    label_data = torch.randn(20, 5)
    for epoch in range(3):
        train_one_epoch(model, loss_fn, optimizer, input_data, label_data)

內容解密:

此程式碼定義了一個簡單的神經網路模型 NeuralNetwork,包含兩個全連線層和一個 ReLU 啟用函式。train_one_epoch 函式負責進行單次迭代的訓練,計算損失、反向傳播和最佳化器更新。training_loop 函式初始化模型、損失函式和最佳化器,並執行多次迭代的訓練。

將 PyTorch 模型遷移到 Ray Train

要使用 Ray Train 進行分散式訓練,我們只需要對程式碼進行一個小小的修改,即呼叫 prepare_model 函式來準備模型:

from ray.train.torch import prepare_model

def distributed_training_loop():
    model = NeuralNetwork()
    model = prepare_model(model)  # 準備模型以進行分散式訓練
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    input_data = torch.randn(20, 10)
    label_data = torch.randn(20, 5)
    train_dataset = from_torch(input_data)
    for epoch in range(3):
        train_one_epoch(model, loss_fn, optimizer, input_data, label_data)

內容解密:

此程式碼與之前的 training_loop 相似,但呼叫了 prepare_model 函式來準備模型以進行分散式訓練。這是使用 Ray Train 進行分散式訓練所需的唯一修改。

建立 TorchTrainer

接下來,我們需要建立一個 TorchTrainer 物件,該物件需要三個必要的引數:

  • train_loop_per_worker:每個工作執行緒執行的訓練函式。
  • datasets:包含 Ray Datasets 的字典,用於提供訓練資料。
  • scaling_config:指定訓練的擴充套件組態,例如工作執行緒數量和 GPU 使用情況。
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_loop_per_worker=distributed_training_loop,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    datasets={"train": train_dataset}
)

result = trainer.fit()

內容解密:

此程式碼建立了一個 TorchTrainer 物件,並指定了訓練函式、scaling_config 和資料集。然後,呼叫 fit 方法來執行分散式訓練。