在機器學習專案中,資料處理、模型訓練、評估和佈署等步驟通常需要重複執行。Kubeflow Pipelines 提供了一個框架,可以將這些步驟組織成一個自動化的管線,提高效率和可靠性。本文將逐步介紹如何使用 Kubeflow Pipelines 建置和自動化機器學習管線,並探討最佳實踐。首先,我們需要將程式碼容器化,以便在 Kubernetes 叢集中執行。接著,我們會定義管線的各個元件,例如資料預處理、模型訓練和評估,並使用 Kubeflow Pipelines 的 DSL 將這些元件連線起來,形成一個完整的管線。最後,我們將探討如何設定自動化觸發器,例如 Cloud Scheduler 或 Cloud Storage 事件,以便在滿足特定條件時自動執行管線。
機器學習管線(Machine Learning Pipelines)
機器學習管線的整體檢視
圖 10-1 展示了機器學習管線的高層檢視。為了建立一個能夠接收影像檔案並識別其中花卉的網路服務,如同本文所述,我們需要執行以下步驟:
- 將 JPEG 影像轉換為 TensorFlow Records,並將資料分割為訓練、驗證和測試資料集,以建立我們的資料集。
- 訓練一個機器學習模型來分類別花卉(我們已經進行了超引數調優以選擇最佳模型,但假設我們可以預先確定引數)。
- 佈署模型以供服務。
圖 10-1. 端對端的機器學習管線
正如您在本文中將看到的,為了在機器學習管線中完成這些步驟,我們必須:
- 設定一個叢集來執行管線。
- 將我們的程式碼容器化,因為管線執行的是容器。
- 編寫對應於管線每個步驟的管線元件。
- 連線管線元件,以便一次性執行整個管線。
- 自動化管線,使其能夠回應諸如新資料到達等事件而執行。
為何需要管線
在我們使用原始資料集訓練模型之後,如果我們獲得了幾百個額外的檔案來訓練,我們需要執行相同的操作來處理這些檔案,將它們新增到我們的資料集中,並重新訓練我們的模型。對於一個嚴重依賴新鮮資料的模型(例如用於產品識別而非花卉分類別的模型),我們可能需要每天執行這些步驟。
隨著新資料的到達,模型的效能可能會因為資料漂移(data drift)而開始下降——也就是說,新資料可能與訓練資料不同。也許新的影像解析度更高,或者來自我們訓練資料集中沒有的季節或地點。我們也可以預期,一個月後,我們將會有更多的想法想要嘗試。也許我們的同事將會設計出更好的增強濾波器,或者將會發布新版本的 MobileNet(我們正在進行遷移學習的架構)。改變模型程式碼的實驗將會相當常見,並且必須為此做好規劃。
理想情況下,我們希望有一個框架能夠幫助我們排程和實施機器學習管線,並允許不斷的實驗。Kubeflow Pipelines 提供了一個軟體框架,可以用其特定領域語言(DSL)表示我們選擇的任何機器學習管線。它執行在 Kubeflow 上,這是一個針對執行 TensorFlow 模型的 Kubernetes 框架(見圖 10-2)。Google Cloud 上的受控 Kubeflow Pipelines 執行器稱為 Vertex Pipelines。管線本身可以在 Kubernetes 叢集上執行步驟(用於本地佈署),或者在 Google Cloud 上呼叫 Vertex Training、Vertex Prediction 和 Cloud Dataflow。關於實驗和步驟的中繼資料可以儲存在叢集本身中,也可以儲存在 Cloud Storage 和 Cloud SQL 中。
圖 10-2. Kubeflow Pipelines API 執行在 TensorFlow 和 Kubernetes 上
大多數機器學習管線都遵循一套相當標準的步驟:資料驗證、資料轉換、模型訓練、模型評估、模型佈署和模型監控。如果您的管線遵循這些步驟,您可以利用 TensorFlow Extended(TFX)提供的高層抽象,以 Python API 的形式提供。這樣,您就不需要在 DSL 和容器化步驟的層級上工作。TFX 超出了本文的範圍。
Kubeflow Pipelines 叢集
要執行 Kubeflow 管線,我們需要一個叢集。我們可以在 Google Cloud 上透過導航到 AI Platform Pipelines 控制檯並建立一個新例項來設定一個。一旦啟動,我們將獲得一個連結來開啟 Pipelines 控制檯,以及一個提供主機 URL 的設定圖示(見圖 10-3)。
圖 10-3. AI Platform Pipelines 提供了一個受控的 Kubeflow Pipelines 執行環境
我們可以在 Jupyter notebook 中開發管線,然後將它們佈署到叢集。請跟隨 GitHub 上的 07e_mlpipeline.ipynb 中的完整程式碼。
程式碼容器化
一旦我們有了叢集,我們的管線的第一步就需要將 JPEG 檔案轉換為 TensorFlow Records。回想一下,我們在第 5 章編寫了一個名為 jpeg_to_tfrecord.py 的 Apache Beam 程式來處理這個任務。為了使這項工作可重複,我們需要將其製作成一個容器,其中捕捉了所有的依賴項。
我們在 Jupyter notebook 中開發了它,幸運的是,Vertex AI 上的 Notebooks 服務會為每個 Notebook 例項型別發布一個容器映像。
程式碼範例:
# 在這裡插入範例程式碼
內容解密:
這段程式碼展示瞭如何使用 Apache Beam 將 JPEG 影像轉換為 TensorFlow Records。關鍵在於利用 Beam 的平行處理能力高效地處理大量影像檔案。首先,我們定義了一個管道(pipeline),然後使用 ReadFromText 從文字檔案中讀取影像檔案路徑,接著透過一系列轉換(transformations)將這些路徑轉換為 TensorFlow Records,最後寫入到指定的輸出路徑。在這個過程中,我們利用了 Beam 的容錯機制和自動擴縮能力,確保了資料處理的高效性和可靠性。
下一步驟
在接下來的章節中,我們將繼續探討如何編寫和連線管線元件,以實作端對端的機器學習管線。敬請期待!
機器學習管線的建置與執行
在現代化的機器學習開發流程中,建置一個自動化、可擴充套件且具備高效能的管線至關重要。透過容器化技術與 Kubeflow Pipelines 的結合,我們能夠實作這一目標。
容器化的重要性
首先,我們需要為我們的機器學習程式建置一個適當的容器。這涉及以下步驟:
- 取得對應的容器映像:選擇適合的基礎映像,例如 TensorFlow 2 的 GPU 版本。
- 安裝額外的軟體依賴:根據需求安裝必要的 Python 套件,例如
apache-beam[gcp]和cloudml-hypertune。 - 複製必要的程式碼:將整個儲存函式庫複製到容器中,以便於後續的操作。
以下是一個範例 Dockerfile,用於執行上述步驟:
FROM gcr.io/deeplearning-platform-release/tf2-gpu
RUN python3 -m pip install --upgrade apache-beam[gcp] cloudml-hypertune
RUN mkdir -p /src/practical-ml-vision-book
COPY . /src/practical-ml-vision-book/
內容解密:
FROM gcr.io/deeplearning-platform-release/tf2-gpu:使用 TensorFlow 2 的 GPU 版本作為基礎映像。RUN python3 -m pip install --upgrade apache-beam[gcp] cloudml-hypertune:安裝必要的 Python 套件,包括 Apache Beam 和 CloudML Hypertune。RUN mkdir -p /src/practical-ml-vision-book:在容器中建立一個目錄,用於存放我們的程式碼。COPY . /src/practical-ml-vision-book/:將當前目錄下的所有檔案複製到容器中的指定目錄。
建置並推播 Docker 映像的命令如下:
full_image_name=gcr.io/${PROJECT_ID}/practical-ml-vision-book:latest
docker build -t "${full_image_name}" .
docker push "$full_image_name"
編寫 Kubeflow Pipelines 元件
每個元件都需要從 YAML 檔案中載入定義,然後用於建立實際的元件。以下是建立資料集元件的範例:
inputs:
- {name: runner, type: str, default: 'DirectRunner', description: '執行模式'}
- {name: project_id, type: str, description: '專案 ID'}
- {name: region, type: str, description: '執行區域'}
- {name: input_csv, type: GCSPath, description: '輸入 CSV 檔案路徑'}
- {name: output_dir, type: GCSPath, description: '輸出目錄路徑'}
- {name: labels_dict, type: GCSPath, description: '標籤字典檔案路徑'}
implementation:
container:
image: gcr.io/[PROJECT-ID]/practical-ml-vision-book:latest
command: [
"bash",
"/src/practical-ml-vision-book/.../create_dataset.sh"
]
args: [
{inputValue: output_dir},
{outputPath: tfrecords_topdir},
"--all_data", {inputValue: input_csv},
"--labels_file", {inputValue: labels_dict},
"--project_id", {inputValue: project_id},
"--output_dir", {inputValue: output_dir},
"--runner", {inputValue: runner},
"--region", {inputValue: region},
]
內容解密:
inputs:定義了元件所需的輸入引數,包括執行模式、專案 ID、執行區域等。implementation:定義了元件的實作細節,包括使用的 Docker 映像、執行的命令和引數。create_dataset.sh:一個 shell 指令碼,用於執行實際的資料集建立工作。
在 Python 程式碼中,我們可以載入並使用這個元件:
create_dataset_op = kfp.components.load_component_from_file('components/create_dataset.yaml')
create_dataset = create_dataset_op(
runner='DataflowRunner',
project_id=project_id,
region=region,
input_csv='gs://cloud-ml-data/img/flower_photos/all_data.csv',
output_dir='gs://{}/data/flower_tfrecords'.format(bucket),
labels_dict='gs://cloud-ml-data/img/flower_photos/dict.txt'
)
定義與執行管線
我們可以定義一個管線,包含多個元件,並指定它們之間的依賴關係。以下是一個範例管線定義:
@dsl.pipeline(
name='Flowers Transfer Learning Pipeline',
description='端對端的管線'
)
def flowerstxf_pipeline(
project_id=PROJECT,
bucket=BUCKET,
region=REGION
):
# 步驟 1:建立資料集
create_dataset = create_dataset_op(
runner='DataflowRunner',
project_id=project_id,
region=region,
input_csv='gs://cloud-ml-data/img/flower_photos/all_data.csv',
output_dir='gs://{}/data/flower_tfrecords'.format(bucket),
labels_dict='gs://cloud-ml-data/img/flower_photos/dict.txt'
)
內容解密:
@dsl.pipeline:定義了一個 Kubeflow Pipelines 管線。flowerstxf_pipeline:管線函式,包含多個元件和它們之間的依賴關係。create_dataset_op:建立資料集的元件。
編譯並提交管線的程式碼如下:
pipeline_func = flowerstxf_pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
import kfp
client = kfp.Client(host=KFPHOST)
experiment = client.create_experiment('from_notebook')
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(
experiment.id,
run_name,
pipeline_filename,
{
'project_id': PROJECT,
'bucket': BUCKET,
'region': REGION
}
)
內容解密:
compiler.Compiler().compile:將管線函式編譯成一個 ZIP 檔案。kfp.Client:建立一個 Kubeflow Pipelines 的客戶端,用於提交管線。client.run_pipeline:提交管線並執行。
Kubeflow 管道與機器學習工作流程自動化
在機器學習(ML)領域,將工作流程自動化是提高效率和可靠性的關鍵。Kubeflow 管道提供了一個強大的工具,用於協調和管理 ML 工作流程。本章將探討如何使用 Kubeflow 管道自動化 ML 工作流程,並介紹相關的最佳實踐。
連線元件與建立管道
建立 ML 管道的第一步是定義管道中的各個元件。在前面的範例中,我們建立了一個名為 create_dataset 的元件,用於將影像資料轉換為 TensorFlow Records。接下來,我們需要建立一個名為 train_model 的元件,用於在 TensorFlow Records 上訓練 ML 模型。
create_dataset = create_dataset_op(...)
train_model = train_model_op(
input_topdir=create_dataset.outputs['tfrecords_topdir'],
region=region,
job_dir='gs://{}/trained_model'.format(bucket)
)
內容解密:
create_dataset_op和train_model_op的定義:這兩個操作(operation)分別代表了建立資料集和訓練模型的元件。它們透過 Kubeflow Pipelines 的 API 被定義和例項化。train_model_op的輸入依賴於create_dataset的輸出:這裡的input_topdir引數依賴於create_dataset輸出的tfrecords_topdir。這種依賴關係確保了train_model步驟在create_dataset完成後才開始執行。create_dataset步驟寫出輸出目錄名稱到檔案:在create_dataset.sh指令碼中,輸出目錄的名稱被寫入到一個檔案中,該檔案的名稱由 Kubeflow Pipelines 自動生成。
自動化執行與快取
為了自動化管道的執行,我們可以使用 Python API 提交新的執行任務。這可以被整合到 Cloud Function 或 Cloud Run 容器中,並根據 Cloud Scheduler 的觸發或儲存桶中新檔案的新增來呼叫。
Kubeflow Pipelines 支援快取機制,可以重用之前執行的結果。但是,由於它不檢查 Google Cloud Storage 目錄的內容,因此當輸入引數保持不變但目錄內容改變時,快取可能會失效。因此,建議明確設定快取的過時標準。
create_dataset.execution_options.caching_strategy.max_cache_staleness = "P7D"
內容解密:
max_cache_staleness的設定:這裡設定了快取的最大過時時間為 7 天(“P7D”)。這意味著如果相同的輸入在 7 天內再次出現,管道將重用之前的輸出。- 使用時間戳記命名輸入和輸出目錄:為了有效地使用快取,建議在輸入和輸出目錄的名稱中加入時間戳記,以避免因目錄內容變更而導致快取失效。
可解釋性與模型信任
當模型做出預測時,瞭解其決策過程是非常重要的。可解釋性方法可以幫助我們理解模型是如何工作的,並找出潛在的問題,如偏差或錯誤。
內容解密:
- 可解釋性的重要性:可解釋性對於建立對模型的信任、除錯模型以及發現偏差至關重要。
- 全域性和例項級別的可解釋性:全域性可解釋性提供了對整個模型的洞察,而例項級別的可解釋性則關注於單個預測的解釋。