Kubernetes 叢集有效簡化了多伺服器管理和資源分配,本文介紹如何利用 Kubernetes 任務追蹤器構建分散式訓練服務。該服務支援 Docker 和 Kubernetes 後端,透過 Kubernetes Pod 執行訓練任務,並在記憶體中追蹤任務狀態。使用者可透過 gRPC 提交訓練請求,並指定 PARALLEL_INSTANCES 引數控制工作節點數量。訓練進度可透過 GetTrainingStatus API 或 kubectl 命令查詢。服務會建立 Kubernetes Service 物件,確保主節點和工作節點之間的網路連線。對於 PyTorch 分散式訓練,服務會設定必要的環境變數,例如 WORLD_SIZE、RANK、MASTER_ADDR 和 MASTER_PORT,並使用 Kubernetes DNS 作為永久位址,解決 Pod IP 位址變動的問題。程式碼實作部分展示瞭如何啟動訓練 Pod、設定環境變數以及處理工作狀態更新。最後,文章還簡述了大型模型訓練策略,例如梯度累積和記憶體交換等技術。
分散式訓練的服務設計與實作
在管理多台伺服器的叢集時,Kubernetes 提供了強大的協助,可以大幅簡化工作資源的分配和工作節點之間的通訊。因此,我們引入了一個新的元件——Kubernetes 任務追蹤器,用於管理 Kubernetes 中的訓練任務。圖 4.3 展示了更新後的服務設計圖和使用者工作流程。
圖 4.3:服務設計更新前後對比
圖 4.3(a) 重複了第 3.3 節中討論的訓練服務系統圖,該圖使用 Docker 任務追蹤器在 Docker 引擎中執行訓練任務。圖 4.3(b) 則展示了更新後的訓練服務,新增了對分散式訓練的支援,包括 Kubernetes 和 Docker 引擎後端。Kubernetes 任務追蹤器負責在 Kubernetes 叢集中執行訓練任務,透過啟動 Kubernetes Pod 來執行訓練任務,並監控和更新任務執行狀態在記憶體儲存中。
與服務互動
首先,我們使用 Kubernetes 後端執行訓練服務,相關命令如下(scripts/ts-001-start-server-kube.sh):
$ docker build -t orca3/services:latest -f services.dockerfile .
$ docker run --name training-service -v \
$HOME/.kube/config:/.kube/config --env \
APP_CONFIG=config-kube.properties \
--network orca3 --rm -d -p
"${TS_PORT}":51001
orca3/services:latest training-service.jar
提交分散式訓練請求時,只需在請求負載中新增 PARALLEL_INSTANCES=3 引數,即可告訴訓練服務建立一個包含三個工作節點的分散式訓練群組來訓練模型(scripts/ts-004-start-parallel-run.sh 1):
# 提交分散式訓練請求
$ grpcurl -plaintext -d "{ "metadata":
{ "algorithm":"intent-classification",
"dataset_id":"1",
"Name":"test1",
"train_data_version_hash":"hashBA==",
"Parameters":{
"LR":"4","EPOCHS":"15",
"PARALLEL_INSTANCES":"3",
"BATCH_SIZE":"64","FC_SIZE":"128"}}
}"
${TS_SERVER}:${TS_PORT}
training.TrainingService/Train
內容解密:
- 使用
grpcurl命令提交 gRPC 請求,呼叫Train方法。 - 請求負載中包含了訓練所需的引數,如學習率(LR)、迭代次數(EPOCHS)、平行例項數(PARALLEL_INSTANCES)等。
PARALLEL_INSTANCES=3表示建立三個工作節點進行分散式訓練。
檢查訓練進度
可使用 GetTrainingStatus API 查詢訓練執行進度:
grpcurl -plaintext -d "{"job_id": "$1"}"
${TS_SERVER}:"${TS_PORT}"
training.TrainingService/GetTrainingStatus
此外,也可透過 Kubernetes 命令 kubectl get all 檢視訓練進度,會看到三個工作節點 Pod 被建立,其中一個是主節點,其他兩個是普通工作節點。同時,會建立一個 Kubernetes 服務物件 intent-classification-1-master-service,用於主節點和工作節點之間的網路連線。
# 檢視 Kubernetes 資源狀態
$ kubectl get all -n orca3
NAME READY STATUS
pod/intent-classification-1-1-worker 0/1 Completed
pod/intent-classification-1-2-worker 0/1 Completed
pod/intent-classification-1-master 0/1 Completed
NAME TYPE .. ..
service/intent-classification-1-master-service ClusterIP
內容解密:
- 使用
kubectl get all命令檢視 Kubernetes 中的資源狀態。 - 三個 Pod 對應於分散式訓練的三個工作節點,一個 Service 物件用於支援主節點和工作節點之間的通訊。
發起訓練任務
當接收到訓練請求時,請求會被加入到任務佇列中。同時,Kubernetes 任務追蹤器會監控任務佇列,當發現有等待中的任務且系統有可用資源時,便會開始處理這些任務。
對於 PyTorch 分散式訓練任務,追蹤器首先會建立所需數量的 Kubernetes Pod,每個 Pod 執行一個訓練程式。追蹤器也會向每個 Pod 傳遞單獨的引數,然後將任務從任務佇列移到啟動列表中(圖 4.4)。
圖 4.4:Kubernetes 任務追蹤器工作流程
此流程中,Kubernetes 任務追蹤器可以處理單裝置訓練和分散式訓練。對於單裝置訓練,它建立一個 Kubernetes Pod;對於分散式訓練,則建立多個 Pod。
內容解密:
- Kubernetes 任務追蹤器根據請求建立 Pod,並傳遞環境變數中的使用者定義引數。
- 為支援 PyTorch 分散式訓練,服務還會建立一個 Kubernetes Service 物件,用於與主 Pod 通訊。
分散式訓練在 Kubernetes 中的實作範例
在 PyTorch 分散式訓練演算法章節(4.2.3)中,我們瞭解到每個 PyTorch 訓練程式都需要主程式(pod)的 IP 位址來初始化分散式訓練群組。例如,每個 PyTorch 程式碼需要在訓練邏輯開始之前具備以下程式碼片段:
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'xxx xxx xxx xxx'
os.environ['MASTER_PORT'] = '12356'
dist.init_process_group("gloo", rank=rank, world_size=world_size)
然而,在 Kubernetes 中,pod 是一種暫時性的資源,因此我們無法依賴 pod 的 IP 位址來定位 pod。取而代之的是,我們使用 Kubernetes 網域名稱服務(DNS)作為永久位址來定位 pod。即使 pod 被銷毀並在不同的節點上重新建立,且其 IP 位址發生變化,我們仍然可以使用相同的 DNS 來存取它。因此,為了啟用訓練群組的初始化,我們首先為主 pod 建立一個 Kubernetes 服務,然後將 DNS 傳遞給所有工作 pod 作為主 pod 位址。
設定分散式訓練環境變數
在 Kubernetes 中執行分散式訓練時,需要為每個 pod 設定四個環境變數:WORLD_SIZE、RANK、MASTER_ADDR 和 MASTER_PORT。
WORLD_SIZE表示訓練群組的 pod 總數,包括主 pod 和工作 pod。RANK是每個訓練程式的唯一 ID,主程式的 rank 必須為 0。MASTER_ADDR和MASTER_PORT定義了主程式的主機位址和連線埠號碼,以便每個工作 pod 可以使用它們來連線主 pod。
程式碼實作
以下是實作分散式訓練在 Kubernetes 中的程式碼範例:
protected List<String> launchTrainingPods(
int jobId, int worldSize, TrainingJobMetadata metadata, ...) {
...
// 判斷是否為分散式訓練
if (worldSize > 1) {
// 建立 Kubernetes 服務並指向主 pod
api.createNamespacedService(
config.kubeNamespace, serviceBody, null, null, null);
serviceTracker.add(masterServiceName);
logger.info(String.format("Launched master service %s", masterServiceName));
...
}
// 建立訓練 pod 定義
for (int rank = 0; rank < worldSize; rank++) {
envs.put("WORLD_SIZE", Integer.toString(worldSize));
// RANK 0 是主程式
envs.put("RANK", Integer.toString(rank));
envs.put("MASTER_ADDR", masterPodDnsName);
envs.put("MASTER_PORT", Integer.toString(masterPort));
V1PodSpec podSpec = new V1PodSpec()
.restartPolicy("Never")
.addContainersItem(new V1Container()
.image(algorithmToImage(metadata.getAlgorithm()))
.env(envVarsToList(envs)) ...);
String workerPodName = rank == 0 ? masterPodName :
String.format("job-%d-%d-%s-worker-%d", jobId, now, metadata.getName(), rank);
V1Pod workerPodBody = new V1Pod();
workerPodBody.apiVersion("v1");
...
// 建立實際的訓練 pod
api.createNamespacedPod(config.kubeNamespace, workerPodBody, null, null, null);
...
}
return podNames;
}
內容解密:
- 分散式訓練判斷:程式碼首先檢查
worldSize是否大於 1,以判斷是否為分散式訓練。如果是,則建立 Kubernetes 服務並指向主 pod。 - 環境變數設定:為每個 pod 設定
WORLD_SIZE、RANK、MASTER_ADDR和MASTER_PORT環境變數,以便進行分散式訓練。 - Pod 定義與建立:定義訓練 pod 的設定,包括容器映像檔、環境變數等,並建立實際的訓練 pod。
注意事項
- 上述範例程式碼是針對 PyTorch 分散式訓練函式庫進行客製化的,但同樣的概念也可以套用到其他框架,如 TensorFlow 2。
- 在 TensorFlow 中,需要收集所有工作 pod 的 IP 或 DNS,並將它們廣播回每個工作 pod,以啟動分散式訓練群組。
RANK值不一定與 pod 一一對應,因為一個 pod 可以執行多個訓練程式,如果它具備多個 GPU。
分散式訓練中的工作狀態更新與查詢
在建立訓練Pod後,Kubernetes工作追蹤器會持續查詢Pod的執行狀態,並在狀態變更時將工作轉移到其他工作列表中。例如,如果Pod成功建立並開始執行,追蹤器會將工作從啟動列表轉移到執行列表。如果Pod執行完成,追蹤器會將工作從執行列表轉移到已完成的工作列表。圖4.5展示了這一過程。
工作狀態查詢與更新流程
當使用者提交工作狀態查詢時,訓練服務會在記憶體儲存中的所有四個工作佇列中搜尋工作ID並傳回工作物件。有趣的是,儘管有多個訓練Pod,但我們只需要檢查主Pod的狀態即可追蹤分散式訓練的進度。這是因為對於同步資料平行訓練,所有工作節點都必須在每個訓練週期中相互同步,因此主Pod可以代表其他工作節點Pod。
程式碼實作
查詢和更新工作執行狀態的程式碼與我們在3.3.5節中看到的Docker工作追蹤器非常相似。唯一的差別是我們查詢的是Kubernetes叢集而不是Docker引擎來取得訓練狀態。
關鍵程式碼解析
def updateContainerStatus(self):
# 從Kubernetes叢集中取得Pod執行狀態
# 更新工作狀態並轉移到相應的工作列表中
pass
內容解密:
updateContainerStatus方法負責更新容器狀態。- 該方法需要從Kubernetes叢集中取得Pod的執行狀態。
- 根據Pod的執行狀態更新工作狀態,並將工作轉移到相應的工作列表中。
支援資料平行分散式訓練的範例服務
為了支援分散式訓練,我們對意圖分類別訓練程式碼(在前一章3.3.6節中介紹)進行了兩項修改。
第一項修改:初始化訓練群組
我們使用WORLD_SIZE環境變數來檢查訓練程式碼是否應該以分散式方式執行。如果WORLD_SIZE等於1,則使用與3.3.6節中相同的單一裝置訓練程式碼。
程式碼實作
def should_distribute():
return dist.is_available() and config.WORLD_SIZE > 1
def is_distributed():
return dist.is_available() and dist.is_initialized()
if should_distribute():
dist.init_process_group("gloo", rank=config.RANK, world_size=config.WORLD_SIZE)
if is_distributed():
model = DDP(model)
內容解密:
should_distribute函式檢查是否應該進行分散式訓練。is_distributed函式檢查是否已經初始化分散式訓練。- 如果需要分散式訓練,則初始化分散式處理群組。
- 如果已經初始化分散式訓練,則使用
DistributedDataParallel(DDP)包裝模型以啟用資料平行訓練。
第二項修改:只從主Pod上傳最終模型
在第二項修改中,我們只允許主Pod(rank = 0)上傳最終模型,以防止每個工作節點上傳相同的模型多次。
程式碼實作
if config.RANK == 0:
accu_test = evaluate(test_dataloader)
# 上傳模型到元資料儲存
artifact = orca3_utils.create_artifact(config.MODEL_BUCKET, config.MODEL_OBJECT_NAME)
內容解密:
- 檢查當前Pod的rank是否為0(即主Pod)。
- 如果是主Pod,則評估測試資料集並上傳最終模型。
大型模型的訓練策略
隨著神經網路規模的快速增長,我們需要討論常見的大型模型訓練策略。本文將介紹與資料平行策略不同的訓練大型模型的方法。
傳統方法:記憶體節省
當資料科學團隊希望訓練一個能夠載入到訓練叢集中最大GPU上的模型時,可以使用記憶體節省技術,例如梯度累積和記憶體交換。
關鍵概念
- 梯度累積:透過累積多個小批次的梯度來減少記憶體使用量。
- 記憶體交換:在GPU記憶體不足時,將部分資料交換到主機記憶體中。
工程視角
作為平台開發人員,瞭解這些技術有助於設計訓練服務和訓練程式碼之間的通訊協定,並提供故障排除或微調訓練效能的洞察。