模型訓練服務是機器學習流程自動化的核心。本文介紹的模型訓練服務採用記憶體儲存管理訓練任務,並使用 Docker 容器化技術執行訓練程式。服務提供 gRPC API 供使用者提交訓練請求和查詢任務狀態。Docker 作業追蹤器監控任務佇列,並在 Docker 引擎有可用容量時啟動容器。服務透過環境變數將訓練引數傳遞給容器,並持續監控容器狀態,更新任務狀態。文章也說明瞭使用 MinIO 作為資料集和模型儲存的方案,並以 Python 程式碼示範了意圖分類別模型的訓練流程,包含資料下載、模型訓練、模型儲存和上傳等步驟。
模型訓練服務的設計與實作
模型訓練服務是機器學習工作流程中的關鍵元件,負責管理和執行模型的訓練任務。本章節將探討模型訓練服務的設計原理、核心元件以及其實作細節。
模型訓練工作流程
在介紹模型訓練服務的具體實作之前,我們首先需要了解其整體工作流程。資料科學家(如Alex)負責提交訓練請求,而開發者(如Tang)則維護訓練服務的系統。整個訓練過程是自助式的,Alex可以自行管理模型的訓練,而Tang則專注於確保系統的可用性和效率。
核心元件:記憶體儲存與Docker作業追蹤器
模型訓練服務的兩個核心元件分別是記憶體儲存(Memory Store)和Docker作業追蹤器(Docker Job Tracker)。記憶體儲存使用四種不同的資料結構(對映)來組織請求(作業):作業佇列、啟動列表、執行列表和已完成列表。每個對映代表不同執行狀態下的作業。
記憶體儲存的資料結構
public class MemoryStore {
// 等待執行的作業佇列
public SortedMap<Integer, TrainingJobMetadata> jobQueue = new TreeMap<>();
// 正在啟動的作業列表
public Map<Integer, ExecutedTrainingJob> launchingList = new HashMap<>();
// 正在執行的作業列表
public Map<Integer, ExecutedTrainingJob> runningList = new HashMap<>();
// 已完成的作業列表
public Map<Integer, ExecutedTrainingJob> finalizedJobs = new HashMap<>();
// ...
}
Docker作業追蹤器的功能
Docker作業追蹤器負責在Docker引擎中實際執行作業。它會週期性地監控記憶體儲存中的作業佇列。當Docker引擎有可用容量時,追蹤器會從佇列中取出作業並啟動相應的Docker容器,同時持續監控容器的執行狀態。
模型訓練服務的API
模型訓練服務提供了兩個主要的gRPC API:Train和GetTrainingStatus。Train API用於提交訓練請求,而GetTrainingStatus API則用於取得訓練任務的執行狀態。
API定義
service TrainingService {
rpc Train(TrainRequest) returns (TrainResponse);
rpc GetTrainingStatus(GetTrainingStatusRequest) returns (GetTrainingStatusResponse);
}
message TrainingJobMetadata {
string algorithm = 1;
string dataset_id = 2;
string name = 3;
string train_data_version_hash = 4;
map<string, string> parameters = 5;
}
message GetTrainingStatusResponse {
TrainingStatus status = 1;
int32 job_id = 2;
string message = 3;
TrainingJobMetadata metadata = 4;
int32 positionInQueue = 5;
}
提交訓練請求的實作
當使用者呼叫Train API時,訓練請求會被新增到記憶體儲存的作業佇列中,並傳回一個作業ID供呼叫者參考。具體實作如下:
public void train(TrainRequest request, StreamObserver<TrainResponse> responseObserver) {
int jobId = store.offer(request);
responseObserver.onNext(TrainResponse.newBuilder().setJobId(jobId).build());
responseObserver.onCompleted();
}
內容解密:
train方法接收一個TrainRequest物件和一個StreamObserver<TrainResponse>物件,用於處理訓練請求並傳回回應。store.offer(request)將訓練請求新增到作業佇列中,並傳回分配給該請求的作業ID。- 使用
responseObserver.onNext將包含作業ID的TrainResponse傳送給客戶端。 responseObserver.onCompleted()表示RPC呼叫已完成。
模型訓練服務的實作細節
在前一章節中,我們探討了模型訓練服務的基本架構與設計理念。本章節將探討模型訓練服務的具體實作方式,涵蓋從接收訓練請求到執行訓練任務的整個流程。
3.3 模型訓練服務範例
接收訓練請求
模型訓練服務的核心功能之一是接收並處理訓練請求。以下程式碼展示瞭如何實作這一功能:
public int offer(TrainRequest request) {
int jobId = jobIdSeed.incrementAndGet();
jobQueue.put(jobId, request.getMetadata());
return jobId;
}
啟動訓練任務
當訓練請求被接收後,Docker 任務追蹤器(Docker Job Tracker)會監控任務佇列,並在系統資源足夠時啟動訓練任務。圖 3.6 展示了這一流程。
public boolean hasCapacity() {
return store.launchingList.size() + store.runningList.size() == 0;
}
public String launch(int jobId, TrainingJobMetadata metadata, VersionedSnapshot versionedSnapshot) {
Map<String, String> envs = new HashMap<>();
envs.put("TRAINING_DATA_PATH", versionedSnapshot.getRoot());
envs.putAll(metadata.getParametersMap());
List<String> envStrings = envs.entrySet().stream()
.map(kvp -> String.format("%s=%s", kvp.getKey(), kvp.getValue()))
.collect(Collectors.toList());
String containerId = dockerClient.createContainerCmd(metadata.getAlgorithm())
.withCmd("server", "/data")
.withEnv(envStrings)
.withHostConfig(HostConfig.newHostConfig().withNetworkMode(config.network))
.exec().getId();
dockerClient.startContainerCmd(containerId).exec();
jobIdTracker.put(jobId, containerId);
return containerId;
}
內容解密:
hasCapacity()方法檢查系統是否有足夠的容量來啟動新的訓練任務。launch()方法負責啟動訓練容器,並將訓練引數以環境變數的形式傳遞給容器。- Docker 客戶端用於建立和啟動容器,容器的組態包括演算法名稱、命令、環境變數和網路模式等。
追蹤訓練進度
Docker 任務追蹤器會持續監控每個訓練任務的執行狀態,並根據容器的狀態變化更新任務狀態。
public void updateContainerStatus() {
Set<Integer> launchingJobs = store.launchingList.keySet();
Set<Integer> runningJobs = store.runningList.keySet();
for (Integer jobId : launchingJobs) {
String containerId = jobIdTracker.get(jobId);
InspectContainerResponse.ContainerState state = dockerClient.inspectContainerCmd(containerId).exec().getState();
String containerStatus = state.getStatus();
// 根據容器狀態更新任務狀態
}
for (Integer jobId : runningJobs) {
// 檢查執行中的任務是否已完成
}
}
內容解密:
updateContainerStatus()方法負責更新容器的狀態。- 它遍歷所有正在啟動和執行的任務,查詢容器的狀態,並根據狀態變化更新任務列表。
- 當容器開始執行時,任務會從啟動列表轉移到執行列表;當容器完成時,任務會被轉移到已完成列表。
更新和取得任務狀態
使用者可以透過查詢 GetTrainingStatus API 取得訓練任務的狀態。
public void getTrainingStatus(GetTrainingStatusRequest request) {
int jobId = request.getJobId();
if (store.finalizedJobs.containsKey(jobId)) {
job = store.finalizedJobs.get(jobId);
status = job.isSuccess() ? TrainingStatus.succeed : TrainingStatus.failure;
} else if (store.launchingList.containsKey(jobId)) {
job = store.launchingList.get(jobId);
status = TrainingStatus.launch;
} else if (store.runningList.containsKey(jobId)) {
job = store.runningList.get(jobId);
status = TrainingStatus.running;
} else {
// 處理其他情況
}
}
內容解密:
getTrainingStatus()方法根據任務 ID 查詢任務狀態。- 它檢查任務是否存在於不同的列表中(已完成、啟動中、執行中),並傳回相應的狀態。
- 狀態的判斷根據任務在記憶體儲存中的位置。
綜上所述,模型訓練服務透過 Docker 容器化技術實作了訓練任務的管理和執行。透過對訓練請求的接收、訓練任務的啟動和追蹤,以及任務狀態的更新和查詢,服務提供了完整的模型訓練功能。這些功能的實作細節對於理解模型訓練服務的工作原理至關重要。
3.3 樣本模型訓練服務詳解
在探討模型訓練服務的實作細節後,我們現在來檢視一個具體的範例,展示如何設計和實作一個功能完備的模型訓練服務。
3.3.6 意圖分類別模型訓練程式碼解析
在前面的章節中,我們詳細討論了訓練服務的程式碼實作。現在,讓我們來看看最後一個關鍵部分:模型訓練程式碼。請不要被深度學習演算法所嚇倒;本程式碼範例的主要目的是展示訓練服務如何與模型訓練程式碼互動的具體例項。圖 3.9 繪製了樣本意圖分類別訓練程式碼的工作流程。
我們的樣本訓練程式碼訓練了一個三層神經網路來執行意圖分類別。首先,它從環境變數中取得所有輸入引數,這些引數是由我們的訓練服務(Docker 作業追蹤器)傳遞的。輸入引數包括超引數(迭代次數、學習率等)、資料集下載設定(MinIO 伺服器位址、資料集 ID、版本雜湊)和模型上傳設定。接下來,訓練程式碼下載並解析資料集,然後開始迭代學習過程。最後,程式碼將生成的模型和訓練指標上傳到後設資料儲存函式庫。以下程式碼清單重點介紹了前面提到的主要步驟(train-service/text-classification/train.py 和 train-service/text-classification/Dockerfile)。
程式碼清單 3.7 意圖分類別模型訓練程式碼和 Docker 檔案
# 1. 從環境變數中讀取所有輸入引數
EPOCHS = int_or_default(os.getenv('EPOCHS'), 20)
TRAINING_DATA_PATH = os.getenv('TRAINING_DATA_PATH')
# 2. 從資料集管理系統下載訓練資料
client.fget_object(TRAINING_DATA_BUCKET, TRAINING_DATA_PATH + "/examples.csv", "examples.csv")
client.fget_object(TRAINING_DATA_BUCKET, TRAINING_DATA_PATH + "/labels.csv", "labels.csv")
# 3. 準備資料集
train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(split_test_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
# 4. 開始模型訓練
for epoch in range(1, EPOCHS + 1):
epoch_start_time = time.time()
train(train_dataloader)
print('Checking the results of test dataset.')
accu_test = evaluate(test_dataloader)
print('test accuracy {:8.3f}'.format(accu_test))
# 5. 儲存模型並上傳到後設資料儲存函式庫
client.fput_object(config.MODEL_BUCKET, config.MODEL_OBJECT_NAME, model_local_path)
artifact = orca3_utils.create_artifact(config.MODEL_BUCKET, config.MODEL_OBJECT_NAME)
內容解密:
- 環境變數讀取:從環境變數中取得訓練所需的引數,如迭代次數(
EPOCHS)和訓練資料路徑(TRAINING_DATA_PATH)。 - 資料下載:使用 MinIO 客戶端從指定的儲存桶下載訓練資料(
examples.csv和labels.csv)。 - 資料準備:將下載的資料集分割為訓練、驗證和測試三個部分,並使用
DataLoader對每個部分進行批次處理。 - 模型訓練:在指定的迭代次數內進行模型訓練,每次迭代後評估模型在測試資料集上的準確率。
- 模型儲存與上傳:將訓練好的模型儲存到本地,並上傳到指定的儲存桶,同時建立對應的 artifact 物件。
3.3.7 訓練作業管理
在第 3.1.2 節中,我們提到了一個好的訓練服務應該解決計算隔離問題並提供按需的計算資源(原則 4)。這種隔離有兩層含義:訓練程式執行的隔離和資源消耗的隔離。由於我們將訓練程式 Docker 化,因此程式執行的隔離由 Docker 引擎保證。但是,我們仍然需要自行處理資源消耗的隔離。
資源競爭問題
假設來自不同團隊的三個使用者(A、B 和 C)向我們的訓練服務提交訓練請求。如果使用者 A 提交了 100 個訓練請求,而使用者 B 和 C 各提交了一個請求,那麼在使用者 A 的所有訓練請求完成之前,使用者 B 和 C 的請求將會在等待作業佇列中等待。這就是當我們將訓練叢集視為所有人分享的場地時發生的情況:一個重度使用的案例將主導作業排程和資源消耗。
資源隔離解決方案
為瞭解決這個資源競爭問題,我們需要在訓練叢集內為不同的團隊和使用者設定邊界。我們可以在訓練叢集內建立機器池來實作資源消耗的隔離。每個團隊或使用者可以被分配到一個專用的機器池,每個池都有自己的 GPU 和機器,池的大小取決於專案需求和訓練使用情況。此外,每個機器池可以有一個專用的作業佇列,這樣重度使用的使用者就不會影響其他使用者。圖 3.10 說明瞭這種方法的運作方式。
Kubernetes 的理想實作
實作圖 3.10 所示方法的理想方法是使用 Kubernetes。Kubernetes 允許您建立多個虛擬叢集,並由相同的物理叢集支援,這被稱為名稱空間。Kubernetes 名稱空間是一種輕量級的機器池,它消耗非常少的系統資源。
如果您正在使用 Kubernetes 管理您的服務環境和計算叢集,那麼設定這種隔離是相當容易的。首先,您建立一個具有資源配額的名稱空間,例如 CPU 數量、記憶體大小和 GPU 數量;然後,在訓練服務中定義使用者及其名稱空間對映。
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title 模型訓練服務設計與實作
package "Kubernetes Cluster" {
package "Control Plane" {
component [API Server] as api
component [Controller Manager] as cm
component [Scheduler] as sched
database [etcd] as etcd
}
package "Worker Nodes" {
component [Kubelet] as kubelet
component [Kube-proxy] as proxy
package "Pods" {
component [Container 1] as c1
component [Container 2] as c2
}
}
}
api --> etcd : 儲存狀態
api --> cm : 控制迴圈
api --> sched : 調度決策
api --> kubelet : 指令下達
kubelet --> c1
kubelet --> c2
proxy --> c1 : 網路代理
proxy --> c2
note right of api
核心 API 入口
所有操作經由此處
end note
@enduml
圖示說明:
此圖示展示瞭如何在訓練叢集內建立機器池,以為不同的使用者設定資源消耗邊界。每個使用者被分配到一個專用的機器池,擁有自己的作業佇列,從而避免了資源競爭問題。
透過這種方式,我們可以有效地管理和隔離不同使用者之間的資源消耗,確保每個使用者的訓練請求都能得到公平的處理,從而提高整個訓練叢集的利用率和效率。