邁向生產級機器學習的自動化旅程
當企業的機器學習專案從實驗階段邁向生產環境時,自動化流程成為成功的關鍵因素。傳統的手動模型訓練與部署方式不僅耗時耗力,更難以應對持續變動的資料環境與快速演進的業務需求。在台灣許多企業的數位轉型過程中,建立穩定可靠的機器學習自動化管道已成為資料團隊的核心任務。Databricks 作為統一分析平台,整合了 Apache Spark 的分散式運算能力與完整的機器學習生態系統,為組織提供了建構端對端自動化流程的理想環境。
現代企業面臨的資料量呈現爆炸性成長,模型必須定期重新訓練以維持預測準確度。以電商產業為例,消費者行為模式隨著季節、促銷活動與市場趨勢不斷變化,推薦系統若長期使用靜態模型,預測品質會逐漸下降。透過 Databricks 的排程功能結合 AWS Lambda 的事件驅動機制,資料團隊可以建立完全自動化的訓練管道。當新的交易資料匯入資料湖時,系統自動觸發訓練流程,並將訓練完成的模型無縫部署至生產環境,確保推薦結果始終基於最新的消費者行為模式。
MLflow 是 Databricks 平台內建的開源機器學習生命週期管理工具,提供了實驗追蹤、模型封裝、版本控制與部署等完整功能。透過 MLflow Model Registry,團隊能夠集中管理所有模型版本,追蹤每個版本的效能指標與訓練參數,並在開發、測試與生產環境間順暢地推進模型。結合 Databricks 的模型服務功能,已註冊的模型可以一鍵部署為高可用的 REST API 端點,讓下游應用程式透過標準 HTTP 協定即時取得預測結果。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100
package "Databricks 自動化 ML 架構" {
[資料來源] --> [資料湖儲存]
[資料湖儲存] --> [Databricks Workspace]
package "自動化觸發機制" {
[排程任務] --> [訓練 Notebook]
[AWS Lambda] --> [訓練 Notebook]
[S3 事件] --> [AWS Lambda]
}
[訓練 Notebook] --> [MLflow 追蹤]
[MLflow 追蹤] --> [Model Registry]
[Model Registry] --> [模型服務]
[模型服務] --> [REST API]
[REST API] --> [下游應用]
}
note right of [排程任務]
定時觸發訓練流程
支援 Cron 表達式
end note
note right of [Model Registry]
版本控制與階段管理
Staging / Production
end note
@enduml
上方架構圖呈現了 Databricks 自動化機器學習流程的完整生態系統。資料從各種來源匯入 S3 資料湖並轉換為 Delta Lake 格式後,透過排程任務或 S3 事件觸發訓練 Notebook 執行。訓練過程經過特徵工程與模型訓練階段,產生的指標與模型成品由 MLflow Tracking 記錄並註冊至 Model Registry。經過審核的模型透過 Model Serving 部署為 REST API,供網頁、行動與批次處理等不同類型的應用程式呼叫。這種端對端的自動化架構確保從資料到預測服務的每個環節都能高效運作。
排程任務的精細化配置策略
Databricks Jobs 提供了強大的任務排程服務,支援以固定時間間隔或特定時間點執行 Notebook、Python 腳本或 JAR 檔案。對於需要定期重新訓練模型的應用場景,排程任務提供了簡單且可靠的解決方案。透過合理的任務配置與叢集選擇,資料團隊不僅能確保模型始終基於最新資料進行訓練,更能有效控制雲端運算資源的使用成本。
建立排程任務的流程從 Databricks Workspace 的 Jobs 介面開始。每個任務需要指定執行的 Notebook 路徑、運算叢集配置與執行排程設定。叢集配置是影響成本與效能的關鍵決策點,Databricks 提供了 Job Cluster 與 All-Purpose Cluster 兩種選項。Job Cluster 專為任務執行而設計,在任務完成後自動終止釋放資源,每 DBU 費用僅需 0.07 美元。相較之下,All-Purpose Cluster 設計為互動式開發使用,持續運行期間每 DBU 費用高達 0.40 美元。對於批次訓練任務而言,採用 Job Cluster 能夠在維持相同運算效能的前提下,大幅降低營運成本。
執行排程的設定需要考慮業務需求與資料更新頻率。以零售業的需求預測模型為例,若銷售資料每日凌晨完成匯入,可將訓練任務排程在凌晨兩點執行,確保使用最新的完整日資料。Databricks Jobs 支援標準的 Cron 表達式,提供了極高的排程彈性。週一至週五的每日凌晨兩點執行可設定為 “0 2 * * 1-5”,而每月第一天執行則設定為 “0 2 1 * *"。這種精細的時間控制讓團隊能夠根據實際需求安排訓練時程。
# Databricks Notebook: 自動化新聞分類模型訓練流程
# 設計目的: 由排程任務定期執行,實現模型持續更新
# 執行環境: Databricks Runtime ML 版本,包含常用機器學習套件
# 匯入資料處理與機器學習相關套件
import pandas as pd # 用於結構化資料處理與分析
import numpy as np # 提供數值運算與陣列操作功能
from sklearn.model_selection import train_test_split # 資料集分割工具
from sklearn.ensemble import RandomForestClassifier # 隨機森林分類器
from sklearn.feature_extraction.text import TfidfVectorizer # 文字向量化工具
from sklearn.metrics import ( # 多項評估指標
accuracy_score,
f1_score,
precision_score,
recall_score,
classification_report
)
# MLflow 相關套件,用於實驗追蹤與模型管理
import mlflow
import mlflow.sklearn
from datetime import datetime
import json
# 設定 MLflow 實驗名稱,用於組織相關的訓練執行記錄
# 使用共享路徑確保團隊成員都能存取實驗結果
EXPERIMENT_NAME = "/Shared/automated_training/news_classifier"
mlflow.set_experiment(EXPERIMENT_NAME)
def load_and_validate_data(data_path):
"""
載入訓練資料並執行基本驗證
從 DBFS 路徑讀取 Parquet 格式資料,Parquet 提供高效的欄式儲存
與壓縮機制,適合大規模資料處理。函式會執行基本的資料品質檢查,
確保資料符合訓練需求。
參數:
data_path (str): 訓練資料的 DBFS 完整路徑
回傳:
pd.DataFrame: 經過驗證的訓練資料
拋出:
ValueError: 當資料不符合品質標準時
"""
# 使用 Spark 讀取 Parquet 檔案,利用分散式處理能力
# toPandas() 將 Spark DataFrame 轉換為 Pandas DataFrame
# 注意: 大型資料集應考慮直接使用 Spark ML 進行訓練
df = spark.read.parquet(data_path).toPandas()
# 記錄資料載入資訊供後續追蹤
print(f"成功載入 {len(df)} 筆訓練資料")
print(f"資料欄位: {df.columns.tolist()}")
print(f"記憶體使用: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
# 執行資料品質檢查
required_columns = ['description', 'category']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"資料缺少必要欄位: {missing_columns}")
# 檢查空值比例,若超過 10% 則發出警告
null_ratio = df['description'].isnull().sum() / len(df)
if null_ratio > 0.1:
print(f"警告: 描述欄位空值比例達 {null_ratio:.2%}")
# 移除空值記錄,確保訓練資料完整性
df = df.dropna(subset=['description', 'category'])
print(f"移除空值後剩餘 {len(df)} 筆資料")
return df
def engineer_text_features(df, text_column, label_column, max_features=10000):
"""
將文字資料轉換為數值特徵矩陣
使用 TF-IDF (詞頻-逆文件頻率) 向量化技術將文字轉換為數值特徵。
TF-IDF 能夠識別對分類具有區別性的詞彙,同時降低常見詞的權重。
參數:
df (pd.DataFrame): 包含文字與標籤的資料框
text_column (str): 文字內容的欄位名稱
label_column (str): 分類標籤的欄位名稱
max_features (int): 保留的最大特徵數量,控制特徵維度
回傳:
tuple: (特徵矩陣, 標籤陣列, 向量化器物件)
"""
# 建立 TF-IDF 向量化器並設定參數
# max_features 限制特徵數量以控制模型複雜度與記憶體使用
# ngram_range=(1,2) 同時考慮單詞與雙詞組合,捕捉詞序資訊
# stop_words='english' 移除常見的英文停用詞
# min_df=2 忽略只在一份文件中出現的詞,減少雜訊
vectorizer = TfidfVectorizer(
max_features=max_features,
ngram_range=(1, 2),
stop_words='english',
min_df=2,
max_df=0.95, # 忽略在 95% 文件中都出現的詞
sublinear_tf=True # 使用對數尺度的詞頻,減少高頻詞的支配性
)
# 執行向量化轉換,fit_transform 同時學習詞彙表與轉換資料
# 回傳的 X 是稀疏矩陣,節省記憶體空間
X = vectorizer.fit_transform(df[text_column])
y = df[label_column].values
# 輸出特徵工程結果資訊
print(f"特徵矩陣維度: {X.shape}")
print(f"稀疏度: {(1 - X.nnz / (X.shape[0] * X.shape[1])):.2%}")
print(f"詞彙表大小: {len(vectorizer.vocabulary_)}")
# 輸出類別分布,檢查是否存在類別不平衡問題
label_counts = pd.Series(y).value_counts()
print(f"類別分布:\n{label_counts}")
return X, y, vectorizer
def train_random_forest_model(X_train, y_train, model_params):
"""
訓練隨機森林分類模型
Random Forest 是集成學習演算法,透過組合多棵決策樹的預測結果
來提升準確度與穩定性。對於文字分類任務具有良好的效能表現。
參數:
X_train (sparse matrix): 訓練特徵矩陣
y_train (array): 訓練標籤陣列
model_params (dict): 模型超參數配置字典
回傳:
RandomForestClassifier: 訓練完成的模型物件
"""
# 建立 Random Forest 分類器並設定超參數
# n_estimators: 決策樹的數量,越多通常越準確但訓練時間更長
# max_depth: 樹的最大深度,限制可防止過擬合
# min_samples_split: 分割節點所需的最小樣本數
# min_samples_leaf: 葉節點所需的最小樣本數
# class_weight='balanced': 自動調整類別權重以處理不平衡資料
# n_jobs=-1: 使用所有可用 CPU 核心進行平行訓練
model = RandomForestClassifier(
n_estimators=model_params.get('n_estimators', 200),
max_depth=model_params.get('max_depth', 25),
min_samples_split=model_params.get('min_samples_split', 5),
min_samples_leaf=model_params.get('min_samples_leaf', 2),
class_weight='balanced',
n_jobs=-1,
random_state=42, # 固定隨機種子確保結果可重現
verbose=1 # 輸出訓練進度資訊
)
# 執行模型訓練
print("開始訓練隨機森林模型...")
start_time = datetime.now()
model.fit(X_train, y_train)
training_time = (datetime.now() - start_time).total_seconds()
print(f"模型訓練完成,耗時 {training_time:.2f} 秒")
# 輸出特徵重要性資訊
feature_importance = model.feature_importances_
print(f"平均特徵重要性: {feature_importance.mean():.6f}")
return model
def evaluate_model_performance(model, X_test, y_test, label_names=None):
"""
全面評估模型預測效能
計算多項分類效能指標,包含準確率、精確率、召回率與 F1 分數。
同時產生詳細的分類報告,提供每個類別的個別效能分析。
參數:
model: 訓練完成的分類模型
X_test (sparse matrix): 測試特徵矩陣
y_test (array): 測試標籤陣列
label_names (list): 類別名稱列表,用於報告輸出
回傳:
dict: 包含各項評估指標的字典
"""
# 產生預測結果
print("執行模型預測...")
y_pred = model.predict(X_test)
# 計算整體效能指標
# accuracy: 正確預測的比例
# precision: 預測為正類中實際為正類的比例
# recall: 實際為正類中被正確預測的比例
# f1: precision 與 recall 的調和平均數
# weighted: 根據各類別樣本數加權平均
metrics = {
'accuracy': accuracy_score(y_test, y_pred),
'precision_weighted': precision_score(y_test, y_pred, average='weighted', zero_division=0),
'recall_weighted': recall_score(y_test, y_pred, average='weighted', zero_division=0),
'f1_weighted': f1_score(y_test, y_pred, average='weighted', zero_division=0)
}
# 輸出整體評估結果
print("\n模型整體效能指標:")
for metric_name, metric_value in metrics.items():
print(f" {metric_name}: {metric_value:.4f}")
# 產生詳細的分類報告,顯示每個類別的個別效能
print("\n詳細分類報告:")
report = classification_report(
y_test,
y_pred,
target_names=label_names,
zero_division=0
)
print(report)
return metrics
def register_model_to_mlflow(model, vectorizer, metrics, model_params):
"""
將訓練完成的模型與相關成品註冊至 MLflow
MLflow 提供完整的模型管理功能,包含版本控制、階段管理與部署。
此函式將模型、向量化器與評估指標一併記錄,確保完整的可追溯性。
參數:
model: 訓練完成的分類模型
vectorizer: 文字向量化器物件
metrics (dict): 評估指標字典
model_params (dict): 模型超參數字典
回傳:
str: MLflow 執行 ID
"""
# 建立 MLflow 執行並記錄所有相關資訊
# run_name 包含時間戳記以區分不同執行
run_name = f"scheduled_training_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
with mlflow.start_run(run_name=run_name) as run:
# 記錄模型超參數,用於後續實驗比較
mlflow.log_params(model_params)
# 記錄評估指標,追蹤模型效能變化
for metric_name, metric_value in metrics.items():
mlflow.log_metric(metric_name, metric_value)
# 記錄額外的訓練資訊
mlflow.log_param("training_timestamp", datetime.now().isoformat())
mlflow.log_metric("n_features", vectorizer.max_features)
# 儲存向量化器以供推論時使用
# 推論時必須使用相同的向量化器確保特徵一致性
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name="news_classifier",
signature=mlflow.models.infer_signature(
vectorizer.transform(["sample text"]).toarray(),
model.predict(vectorizer.transform(["sample text"]))
)
)
# 額外儲存向量化器物件
import joblib
import tempfile
import os
with tempfile.TemporaryDirectory() as tmp_dir:
vectorizer_path = os.path.join(tmp_dir, "vectorizer.pkl")
joblib.dump(vectorizer, vectorizer_path)
mlflow.log_artifact(vectorizer_path, "preprocessing")
print(f"模型已註冊至 MLflow,執行 ID: {run.info.run_id}")
return run.info.run_id
def main():
"""
主執行函式 - 協調完整的訓練流程
整合資料載入、特徵工程、模型訓練、效能評估與模型註冊等步驟,
實現端對端的自動化訓練管道。所有步驟都包含錯誤處理與日誌記錄。
"""
try:
# 定義訓練配置參數
# DBFS 路徑格式: dbfs:/mnt/<mount_name>/<path>
DATA_PATH = "dbfs:/mnt/data/news_articles/training_data.parquet"
TEXT_COLUMN = "description"
LABEL_COLUMN = "category"
# 模型超參數配置
# 這些參數可以透過外部配置檔案或 Databricks Widget 動態調整
MODEL_PARAMS = {
'n_estimators': 200,
'max_depth': 25,
'min_samples_split': 5,
'min_samples_leaf': 2
}
print("="* 60)
print("開始執行自動化訓練流程")
print("="* 60)
# 步驟 1: 載入並驗證訓練資料
print("\n[步驟 1/5] 載入訓練資料...")
df = load_and_validate_data(DATA_PATH)
# 步驟 2: 特徵工程
print("\n[步驟 2/5] 執行特徵工程...")
X, y, vectorizer = engineer_text_features(
df, TEXT_COLUMN, LABEL_COLUMN, max_features=10000
)
# 步驟 3: 分割訓練與測試資料集
# test_size=0.2: 保留 20% 資料作為測試集
# stratify=y: 保持類別分布一致性
# random_state: 固定隨機種子確保可重現性
print("\n[步驟 3/5] 分割資料集...")
X_train, X_test, y_train, y_test = train_test_split(
X, y,
test_size=0.2,
random_state=42,
stratify=y
)
print(f"訓練集大小: {X_train.shape[0]} 筆")
print(f"測試集大小: {X_test.shape[0]} 筆")
# 步驟 4: 訓練模型
print("\n[步驟 4/5] 訓練模型...")
model = train_random_forest_model(X_train, y_train, MODEL_PARAMS)
# 步驟 5: 評估模型效能
print("\n[步驟 5/5] 評估模型效能...")
label_names = sorted(df[LABEL_COLUMN].unique())
metrics = evaluate_model_performance(
model, X_test, y_test, label_names
)
# 註冊模型至 MLflow
print("\n註冊模型至 MLflow...")
run_id = register_model_to_mlflow(
model, vectorizer, metrics, MODEL_PARAMS
)
print("\n" + "="* 60)
print("訓練流程成功完成")
print(f"MLflow 執行 ID: {run_id}")
print("="* 60)
except Exception as e:
# 捕捉所有異常並記錄詳細錯誤資訊
print(f"\n訓練流程發生錯誤: {str(e)}")
import traceback
print(traceback.format_exc())
# 在生產環境中應整合告警系統
# 例如發送電子郵件或 Slack 通知給維運團隊
raise
# Notebook 執行入口點
if __name__ == "__main__":
main()
上述程式碼展示了完整的自動化訓練 Notebook 架構。整個流程從資料載入與驗證開始,經過特徵工程、資料分割、模型訓練、效能評估,最後將模型與相關成品註冊至 MLflow Model Registry。每個函式都有明確的職責劃分與詳細的文件說明,便於團隊成員理解與維護。MLflow 的深度整合確保每次訓練執行都有完整的記錄,包含使用的超參數、產生的評估指標與模型成品,建立起完善的實驗追蹤機制。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100
start
:排程時間到達;
:啟動 Job Cluster;
note right
專屬運算資源
任務結束後自動終止
end note
:載入訓練資料;
if (資料品質檢查) then (通過)
:執行特徵工程;
:分割訓練測試集;
:訓練隨機森林模型;
note right
使用全部 CPU 核心
平行訓練加速
end note
:計算評估指標;
if (效能達到閾值?) then (是)
:註冊模型至 Registry;
:標記為 Staging 階段;
:發送成功通知;
else (否)
:記錄效能警告;
:不進行模型註冊;
:發送效能不足警報;
endif
else (失敗)
:記錄資料品質錯誤;
:發送資料異常警報;
endif
:儲存執行日誌;
:終止 Job Cluster;
:釋放運算資源;
stop
@enduml
上方活動圖詳細呈現了排程任務從觸發到完成的完整執行流程。系統在排程時間到達時啟動專屬的 Job Cluster,接著依序執行資料載入、品質檢查、特徵工程、模型訓練與效能評估等步驟。流程中包含兩個關鍵的決策檢查點,第一個檢查資料品質是否符合訓練要求,第二個檢查模型效能是否達到上線標準。只有通過所有檢查的模型才會被註冊至 Model Registry 並標記為 Staging 階段。任何異常狀況都會觸發相應的警報通知,讓維運團隊能夠及時介入處理。任務完成後 Job Cluster 自動終止,避免不必要的資源浪費。
事件驅動架構的整合實踐
排程任務適合處理週期性的訓練需求,但某些業務場景需要在特定事件發生時立即啟動處理流程。以金融科技產業為例,當偵測到異常交易模式時,系統應該立即觸發模型重新訓練以適應新的詐欺手法。AWS Lambda 與 Databricks Jobs API 的整合提供了實現事件驅動架構的理想途徑,讓系統能夠即時回應資料變化。
AWS Lambda 是無伺服器運算服務,能夠自動回應 AWS 生態系統中各種服務產生的事件。透過配置 S3 事件通知,當新的訓練資料檔案上傳至指定儲存桶時,Lambda 函式會自動被調用執行。Lambda 函式接著透過 Databricks REST API 觸發預先定義的訓練任務,實現從資料到達到模型訓練的全自動化流程。整個過程無需人工介入,大幅縮短了從資料收集到模型更新的時間差距。
/**
* AWS Lambda 函式: 觸發 Databricks 自動化訓練任務
*
* 功能說明:
* 此函式設計為回應 S3 物件建立事件,當新的訓練資料上傳至
* 指定儲存桶時自動觸發。函式會解析 S3 事件資訊並透過
* Databricks Jobs API 啟動預先配置的訓練任務。
*
* 觸發條件:
* - S3 事件類型: s3:ObjectCreated:*
* - 儲存桶路徑前綴: training_data/
* - 檔案副檔名: .parquet
*
* 環境變數需求:
* - DATABRICKS_HOST: Databricks workspace 主機名稱
* - DATABRICKS_TOKEN: Personal Access Token 或 Service Principal
* - DATABRICKS_JOB_ID: 要觸發的 Job ID
*
* @param {Object} event - S3 事件物件,包含上傳檔案的詳細資訊
* @param {Object} context - Lambda 執行環境的上下文資訊
* @param {Function} callback - 回呼函式,用於回傳執行結果
*/
// 引入 Node.js 內建的 https 模組,用於發送 HTTPS 請求
const https = require('https');
// Lambda 主處理函式
exports.handler = (event, context, callback) => {
// 從環境變數取得 Databricks 連線配置
// 使用環境變數而非硬編碼是重要的安全實踐
// 敏感資訊應該透過 AWS Secrets Manager 或 Parameter Store 管理
const DATABRICKS_HOST = process.env.DATABRICKS_HOST;
const DATABRICKS_TOKEN = process.env.DATABRICKS_TOKEN;
const JOB_ID = process.env.DATABRICKS_JOB_ID;
// 驗證必要的環境變數是否存在
if (!DATABRICKS_HOST || !DATABRICKS_TOKEN || !JOB_ID) {
const error = new Error('缺少必要的環境變數配置');
console.error('環境變數檢查失敗:', {
hasHost: !!DATABRICKS_HOST,
hasToken: !!DATABRICKS_TOKEN,
hasJobId: !!JOB_ID
});
return callback(error);
}
// 解析 S3 事件以取得上傳檔案資訊
// S3 事件的 Records 陣列可能包含多筆記錄
// 在大多數情況下只會有一筆,但程式碼應該能處理多筆記錄
if (!event.Records || event.Records.length === 0) {
return callback(new Error('S3 事件不包含任何記錄'));
}
const s3Event = event.Records[0].s3;
const bucketName = s3Event.bucket.name;
// 解碼物件鍵值,處理特殊字元與空格
// S3 會將空格編碼為 +,需要先替換再解碼
const objectKey = decodeURIComponent(
s3Event.object.key.replace(/\+/g, ' ')
);
// 取得檔案大小資訊,用於日誌記錄
const objectSize = s3Event.object.size;
// 記錄詳細的事件資訊
console.log('收到 S3 事件通知:', {
bucket: bucketName,
key: objectKey,
size: `${(objectSize / 1024 / 1024).toFixed(2)} MB`,
eventTime: event.Records[0].eventTime
});
// 建構 Databricks Jobs API 請求主體
// notebook_params 允許將 S3 事件資訊傳遞給 Notebook
// Notebook 可以根據這些參數動態調整行為
const requestBody = JSON.stringify({
job_id: parseInt(JOB_ID),
notebook_params: {
// 傳遞 S3 檔案位置資訊
input_bucket: bucketName,
input_key: objectKey,
input_size_mb: (objectSize / 1024 / 1024).toFixed(2),
// 記錄觸發來源與時間
triggered_by: 's3_event',
trigger_timestamp: new Date().toISOString(),
// 建構完整的 S3 URI
s3_uri: `s3://${bucketName}/${objectKey}`
}
});
// 設定 HTTPS 請求選項
// Databricks API 要求使用 HTTPS 協定並進行 Bearer Token 認證
const requestOptions = {
hostname: DATABRICKS_HOST,
port: 443,
path: '/api/2.0/jobs/run-now', // Databricks Jobs API 端點
method: 'POST',
headers: {
// 使用 Bearer Token 進行 API 認證
'Authorization': `Bearer ${DATABRICKS_TOKEN}`,
'Content-Type': 'application/json',
// 設定正確的內容長度標頭
'Content-Length': Buffer.byteLength(requestBody),
// 加入自訂標頭以識別請求來源
'User-Agent': 'AWS-Lambda-S3-Trigger/1.0'
}
};
// 建立並發送 HTTPS 請求
const request = https.request(requestOptions, (response) => {
let responseData = '';
// 收集回應資料片段
// HTTP 回應可能分多個資料塊傳輸
response.on('data', (chunk) => {
responseData += chunk;
});
// 處理回應完成事件
response.on('end', () => {
console.log('Databricks API 回應狀態:', response.statusCode);
console.log('完整回應內容:', responseData);
try {
// 解析 JSON 回應
const result = JSON.parse(responseData);
// 檢查 API 呼叫是否成功
if (response.statusCode === 200 && result.run_id) {
// 成功觸發訓練任務
console.log(`成功觸發 Databricks 訓練任務`, {
run_id: result.run_id,
job_id: JOB_ID,
triggered_file: objectKey
});
// 回傳成功結果
callback(null, {
statusCode: 200,
body: JSON.stringify({
message: '訓練任務已成功觸發',
run_id: result.run_id,
job_id: JOB_ID,
input_file: `s3://${bucketName}/${objectKey}`
})
});
} else {
// API 回傳錯誤狀態
const errorMessage = `Databricks API 錯誤 (狀態碼 ${response.statusCode})`;
console.error(errorMessage, responseData);
callback(new Error(`${errorMessage}: ${responseData}`));
}
} catch (parseError) {
// JSON 解析失敗
console.error('無法解析 API 回應:', parseError);
callback(parseError);
}
});
});
// 處理請求層級的錯誤
// 例如網路連線失敗、DNS 解析錯誤等
request.on('error', (error) => {
console.error('HTTPS 請求發生錯誤:', {
message: error.message,
code: error.code,
host: DATABRICKS_HOST
});
callback(error);
});
// 設定請求逾時時間為 30 秒
// 避免 Lambda 函式無限期等待
request.setTimeout(30000, () => {
request.destroy();
callback(new Error('Databricks API 請求逾時'));
});
// 發送請求主體並結束請求
request.write(requestBody);
request.end();
};
Lambda 函式的程式碼結構清晰且健壯,包含完整的錯誤處理與日誌記錄機制。函式首先驗證必要的環境變數是否存在,接著解析 S3 事件取得上傳檔案的詳細資訊,最後呼叫 Databricks Jobs API 觸發訓練任務。將 S3 事件資訊透過 notebook_params 傳遞給 Notebook,使訓練流程能夠動態載入新上傳的資料檔案。這種設計實現了資料與處理邏輯的解耦,提升了系統的彈性與可維護性。
配置 S3 事件通知需要在儲存桶的屬性設定中進行。選擇要監控的事件類型,常見的選項包含 ObjectCreated、ObjectRemoved 與 ObjectRestore 等。對於訓練資料上傳場景,應選擇 ObjectCreated:Put 或 ObjectCreated:* 以涵蓋所有建立操作。設定前綴過濾條件可以限制觸發範圍,例如只監控 training_data/ 路徑下的檔案。後綴過濾則用於限制檔案類型,例如只處理 .parquet 或 .csv 副檔名的檔案。這些過濾機制確保只有符合條件的檔案上傳才會觸發 Lambda 函式,避免不必要的運算資源消耗。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100
participant "資料工程師" as engineer
participant "S3 儲存桶" as s3
participant "S3 事件通知" as event
participant "AWS Lambda" as lambda
participant "Databricks API" as api
participant "Job Cluster" as cluster
participant "訓練 Notebook" as notebook
participant "MLflow Registry" as registry
engineer -> s3: 上傳訓練資料檔案
activate s3
s3 -> event: 觸發 ObjectCreated 事件
activate event
event -> lambda: 調用 Lambda 函式
activate lambda
lambda -> lambda: 驗證環境變數
lambda -> lambda: 解析 S3 事件資訊
lambda -> api: POST /api/2.0/jobs/run-now
activate api
api -> cluster: 配置並啟動 Job Cluster
activate cluster
api --> lambda: 回傳 run_id
deactivate api
lambda -> lambda: 記錄成功日誌
lambda --> event: 回傳執行成功
deactivate lambda
deactivate event
deactivate s3
cluster -> notebook: 執行訓練 Notebook
activate notebook
notebook -> notebook: 載入 S3 新資料
notebook -> notebook: 執行特徵工程
notebook -> notebook: 訓練與評估模型
notebook -> registry: 註冊模型版本
activate registry
registry --> notebook: 確認註冊成功
deactivate registry
notebook --> cluster: 任務執行完成
deactivate notebook
cluster -> cluster: 自動終止釋放資源
deactivate cluster
@enduml
上方時序圖清楚呈現了事件驅動架構中各元件的互動時序與資料流動。從資料工程師上傳檔案觸發 S3 事件開始,經過 Lambda 函式處理與 Databricks API 呼叫,到最後的模型訓練與註冊,整個流程完全自動化執行。Lambda 函式在收到 S3 事件後立即回應,避免長時間等待訓練完成。訓練任務在獨立的 Job Cluster 上執行,完成後自動終止釋放資源。這種非同步架構設計提供了極佳的擴展性,即使同時上傳多個檔案也能順暢處理。
MLflow 模型生命週期的完整管理
MLflow 提供了管理機器學習模型完整生命週期的強大工具組,涵蓋實驗追蹤、模型封裝、版本控制與階段管理等功能。在 Databricks 環境中,MLflow 已經深度整合至平台核心,開發者可以無縫使用各項功能而無需額外的配置工作。Model Registry 是 MLflow 的核心元件之一,提供了集中式的模型儲存庫,讓團隊能夠有效管理模型的不同版本與階段轉換流程。
每個註冊至 Model Registry 的模型都擁有獨立的版本歷史記錄。當新的訓練執行將模型註冊至相同名稱時,系統會自動分配新的版本號,而所有舊版本會完整保留並可隨時存取。這種設計讓資料科學團隊能夠輕鬆比較不同版本的效能表現,追蹤模型演進的軌跡,並在發現新版本出現問題時快速回滾至先前的穩定版本。Model Registry 還提供了階段管理功能,模型可以處於 None、Staging、Production 或 Archived 等不同階段。透過明確的階段轉換機制,團隊可以建立正式的模型審核與發布流程,確保只有經過充分驗證的模型才會部署至生產環境服務真實用戶流量。
"""
MLflow Model Registry 完整操作範例
展示模型版本管理、效能比較、階段轉換與回滾等核心功能
"""
import mlflow
from mlflow.tracking import MlflowClient
from datetime import datetime
import pandas as pd
# 建立 MLflow 客戶端物件
# 客戶端提供對 MLflow Tracking Server 與 Model Registry 的程式化存取
# 在 Databricks 環境中會自動連接至內建的 MLflow 服務
client = MlflowClient()
def list_all_model_versions(model_name):
"""
列出指定模型的所有版本資訊
查詢 Model Registry 取得模型的完整版本歷史,包含每個版本的
建立時間、當前階段、來源執行與效能指標等詳細資訊。
參數:
model_name (str): 已註冊的模型名稱
回傳:
list: 模型版本物件列表,按版本號排序
"""
# 使用搜尋語法查詢特定模型的所有版本
# search_model_versions 支援靈活的查詢條件
versions = client.search_model_versions(f"name='{model_name}'")
# 按版本號排序,從舊到新
versions_sorted = sorted(versions, key=lambda v: int(v.version))
print(f"模型 '{model_name}' 的版本歷史 (共 {len(versions_sorted)} 個版本):\n")
for version in versions_sorted:
# 取得版本的詳細資訊
creation_time = datetime.fromtimestamp(
int(version.creation_timestamp) / 1000
).strftime('%Y-%m-%d %H:%M:%S')
print(f"版本 {version.version}:")
print(f" 當前階段: {version.current_stage}")
print(f" 建立時間: {creation_time}")
print(f" 來源執行: {version.run_id}")
print(f" 描述: {version.description or '無'}")
# 顯示版本標籤
if version.tags:
print(f" 標籤: {version.tags}")
print()
return versions_sorted
def compare_model_versions(model_name, version_1, version_2, metrics_to_compare=None):
"""
比較兩個模型版本的效能指標
從 MLflow Tracking 取得兩個版本對應執行的完整指標資料,
進行並列比較分析,協助團隊做出模型升級決策。
參數:
model_name (str): 模型名稱
version_1 (int): 第一個版本號
version_2 (int): 第二個版本號
metrics_to_compare (list): 要比較的指標名稱列表,None 表示全部
回傳:
dict: 詳細的比較結果字典
"""
# 取得兩個版本的詳細資訊
v1_info = client.get_model_version(model_name, str(version_1))
v2_info = client.get_model_version(model_name, str(version_2))
# 取得對應訓練執行的完整資料
run_1 = client.get_run(v1_info.run_id)
run_2 = client.get_run(v2_info.run_id)
# 提取指標資料
metrics_1 = run_1.data.metrics
metrics_2 = run_2.data.metrics
print(f"\n版本比較: v{version_1} vs v{version_2}")
print("=" * 80)
# 如果未指定要比較的指標,則比較所有共同指標
if metrics_to_compare is None:
metrics_to_compare = list(set(metrics_1.keys()) & set(metrics_2.keys()))
comparison_results = {}
for metric_name in metrics_to_compare:
if metric_name in metrics_1 and metric_name in metrics_2:
value_1 = metrics_1[metric_name]
value_2 = metrics_2[metric_name]
diff = value_2 - value_1
pct_change = (diff / value_1 * 100) if value_1 != 0 else float('inf')
comparison_results[metric_name] = {
f'v{version_1}': value_1,
f'v{version_2}': value_2,
'difference': diff,
'pct_change': pct_change
}
# 格式化輸出比較結果
print(f"\n{metric_name}:")
print(f" 版本 {version_1}: {value_1:.6f}")
print(f" 版本 {version_2}: {value_2:.6f}")
print(f" 差異: {diff:+.6f} ({pct_change:+.2f}%)")
# 標示效能變化方向
if diff > 0:
print(f" ↑ 版本 {version_2} 表現較佳")
elif diff < 0:
print(f" ↓ 版本 {version_1} 表現較佳")
else:
print(f" = 兩版本表現相同")
print("\n" + "=" * 80)
return comparison_results
def promote_to_staging(model_name, version, description=None):
"""
將模型版本升級至 Staging 階段
Staging 階段用於整合測試與效能驗證,模型在此階段會接受
更嚴格的評估與測試,確認符合上線標準後才會升級至 Production。
參數:
model_name (str): 模型名稱
version (int): 要升級的版本號
description (str): 升級原因描述
"""
# 執行階段轉換
# archive_existing_versions=False 表示保留現有 Staging 版本
client.transition_model_version_stage(
name=model_name,
version=str(version),
stage="Staging",
archive_existing_versions=False
)
print(f"模型 '{model_name}' 版本 {version} 已升級至 Staging 階段")
# 更新版本描述
if description:
client.update_model_version(
name=model_name,
version=str(version),
description=description
)
else:
# 使用預設描述並加入時間戳記
default_description = f"升級至 Staging 階段進行驗證測試 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
client.update_model_version(
name=model_name,
version=str(version),
description=default_description
)
# 加入階段轉換標籤
client.set_model_version_tag(
name=model_name,
version=str(version),
key="promoted_to_staging_at",
value=datetime.now().isoformat()
)
def promote_to_production(model_name, version, archive_existing=True):
"""
將模型版本升級至 Production 階段
Production 階段代表模型已通過所有測試並準備好服務線上流量。
通常會將現有的 Production 版本歸檔以維持單一生產版本的清晰性。
參數:
model_name (str): 模型名稱
version (int): 要升級的版本號
archive_existing (bool): 是否自動歸檔現有 Production 版本
"""
# 執行階段轉換至 Production
client.transition_model_version_stage(
name=model_name,
version=str(version),
stage="Production",
archive_existing_versions=archive_existing
)
print(f"模型 '{model_name}' 版本 {version} 已升級至 Production 階段")
# 記錄部署時間戳記
client.set_model_version_tag(
name=model_name,
version=str(version),
key="deployed_to_production_at",
value=datetime.now().isoformat()
)
# 記錄部署者資訊
import getpass
try:
deployer = getpass.getuser()
except:
deployer = "unknown"
client.set_model_version_tag(
name=model_name,
version=str(version),
key="deployed_by",
value=deployer
)
# 更新版本描述
production_description = f"已部署至生產環境 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
client.update_model_version(
name=model_name,
version=str(version),
description=production_description
)
def rollback_to_version(model_name, target_version):
"""
回滾生產模型至指定版本
當新部署的 Production 版本發現嚴重問題時,快速回滾至
先前已知穩定的版本以恢復服務品質。
參數:
model_name (str): 模型名稱
target_version (int): 要回滾至的目標版本號
"""
# 查詢當前 Production 版本
current_production = None
versions = client.search_model_versions(f"name='{model_name}'")
for version in versions:
if version.current_stage == "Production":
current_production = version.version
break
if current_production:
print(f"當前 Production 版本: {current_production}")
# 將當前 Production 版本降級至 Staging
client.transition_model_version_stage(
name=model_name,
version=current_production,
stage="Staging"
)
print(f"版本 {current_production} 已從 Production 降級至 Staging")
# 將目標版本升級至 Production
client.transition_model_version_stage(
name=model_name,
version=str(target_version),
stage="Production"
)
# 記錄回滾操作
client.set_model_version_tag(
name=model_name,
version=str(target_version),
key="rollback_from_version",
value=str(current_production) if current_production else "unknown"
)
client.set_model_version_tag(
name=model_name,
version=str(target_version),
key="rollback_timestamp",
value=datetime.now().isoformat()
)
print(f"\n已成功回滾至版本 {target_version}")
print(f"請確認服務恢復正常運作")
def get_production_model_info(model_name):
"""
取得當前生產環境模型的詳細資訊
查詢處於 Production 階段的模型版本並顯示關鍵資訊,
包含版本號、部署時間、效能指標等。
參數:
model_name (str): 模型名稱
回傳:
dict: 生產模型的詳細資訊
"""
versions = client.search_model_versions(f"name='{model_name}'")
production_version = None
for version in versions:
if version.current_stage == "Production":
production_version = version
break
if not production_version:
print(f"模型 '{model_name}' 目前沒有 Production 版本")
return None
# 取得對應執行的指標
run = client.get_run(production_version.run_id)
metrics = run.data.metrics
params = run.data.params
info = {
'version': production_version.version,
'run_id': production_version.run_id,
'creation_time': datetime.fromtimestamp(
int(production_version.creation_timestamp) / 1000
),
'metrics': metrics,
'params': params,
'tags': production_version.tags,
'description': production_version.description
}
print(f"\n當前生產環境模型資訊:")
print(f" 模型名稱: {model_name}")
print(f" 版本號: {info['version']}")
print(f" 建立時間: {info['creation_time']}")
print(f"\n效能指標:")
for metric_name, metric_value in sorted(metrics.items()):
print(f" {metric_name}: {metric_value:.6f}")
return info
# 使用範例
if __name__ == "__main__":
MODEL_NAME = "news_classifier"
# 列出所有版本
print("步驟 1: 查詢模型版本歷史")
versions = list_all_model_versions(MODEL_NAME)
# 比較最新兩個版本
if len(versions) >= 2:
print("\n步驟 2: 比較最新兩個版本")
latest_version = int(versions[-1].version)
previous_version = int(versions[-2].version)
comparison = compare_model_versions(
MODEL_NAME,
previous_version,
latest_version,
metrics_to_compare=['accuracy', 'f1_weighted']
)
# 升級最新版本至 Staging
if len(versions) > 0:
print("\n步驟 3: 升級最新版本至 Staging")
latest_version = int(versions[-1].version)
promote_to_staging(
MODEL_NAME,
latest_version,
description="新版本效能提升,進入測試階段"
)
# 查看當前生產環境模型
print("\n步驟 4: 查詢生產環境模型資訊")
production_info = get_production_model_info(MODEL_NAME)
這段完整的程式碼展示了 MLflow Model Registry 的核心操作功能。list_all_model_versions 函式查詢並顯示指定模型的完整版本歷史,compare_model_versions 則提供詳細的版本間效能比較分析。promote_to_staging 與 promote_to_production 函式實現了模型的階段升級流程,而 rollback_to_version 則提供了快速回滾的能力。這些操作共同構成了完整的模型發布與管理流程,確保只有經過充分驗證的高品質模型才會部署至生產環境服務真實用戶。
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100
state "None" as none
state "Staging" as staging
state "Production" as production
state "Archived" as archived
[*] --> none: 模型註冊
none --> staging: 升級至測試
staging --> production: 通過驗證
staging --> none: 驗證失敗
production --> staging: 效能下降
production --> archived: 新版本上線
archived --> staging: 重新評估
staging --> archived: 棄用
note right of staging
執行整合測試
驗證效能指標
確認相容性
end note
note right of production
服務線上流量
監控效能變化
記錄預測日誌
end note
@enduml
上方狀態圖清楚呈現了模型在 MLflow Model Registry 中的完整生命週期流轉。新註冊的模型起始於 None 階段,經過初步驗證後升級至 Staging 進行更嚴格的整合測試。只有通過完整測試流程且效能符合標準的模型才會升級至 Production 階段服務線上流量。當新版本成功上線後,舊的 Production 版本會移至 Archived 階段保留歷史記錄。若生產環境模型出現效能下降或異常問題,可以降級回 Staging 階段重新檢查,或直接回滾至先前的穩定版本。這種嚴謹的階段管理機制確保了模型發布的品質控制與服務穩定性。
REST API 部署與高可用服務架構
訓練完成並通過驗證的模型需要透過適當的介面提供給下游應用程式使用。Databricks Model Serving 功能讓資料團隊能夠將 MLflow 註冊的模型一鍵部署為企業級的 REST API 服務。部署後的模型端點會自動處理負載平衡、自動擴展、健康檢查與故障恢復等複雜的基礎設施管理工作,讓開發者能夠專注於模型本身的品質提升,無需花費心力管理底層的運算與網路資源。
啟用 Model Serving 的流程從 MLflow Model Registry 介面開始。選擇已註冊且處於 Production 階段的模型版本,點選「Use model for inference」選項並選擇「Real-time」部署類型。系統會自動配置專屬的端點 URL 並產生存取認證資訊,整個部署過程通常在數分鐘內完成。部署完成後,端點會持續監控請求流量與回應延遲,當負載增加時自動擴展運算資源,確保服務品質始終維持在高水準。應用程式可以透過標準的 HTTP POST 請求發送資料至端點並即時取得預測結果,實現低延遲的推論服務。
"""
Databricks Model Serving 客戶端實作
提供便利的介面與模型服務端點互動,支援單筆與批次預測
"""
import requests
import json
import os
import time
from typing import List, Dict, Any, Optional
import logging
# 設定日誌記錄
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ModelServingClient:
"""
Databricks Model Serving 客戶端類別
封裝與模型服務端點的互動邏輯,提供方便的介面進行預測請求。
支援單筆即時預測與批次預測,並包含完整的錯誤處理與重試機制。
屬性:
endpoint_url (str): 模型服務端點的完整 URL
token (str): Databricks 存取權杖
headers (dict): HTTP 請求標頭配置
timeout (int): 請求逾時時間(秒)
"""
def __init__(
self,
endpoint_url: str,
token: str,
timeout: int = 60,
max_retries: int = 3
):
"""
初始化模型服務客戶端
參數:
endpoint_url: 模型服務端點 URL
token: Databricks Personal Access Token 或 Service Principal
timeout: 請求逾時時間,預設 60 秒
max_retries: 失敗重試次數,預設 3 次
"""
self.endpoint_url = endpoint_url
self.token = token
self.timeout = timeout
self.max_retries = max_retries
# 設定 HTTP 請求標頭
# Authorization 使用 Bearer Token 認證
# Content-Type 指定為 JSON 格式
self.headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
logger.info(f"模型服務客戶端已初始化: {endpoint_url}")
def predict(
self,
instances: List[Dict[str, Any]],
return_probabilities: bool = True
) -> Dict[str, Any]:
"""
發送單次預測請求
將輸入資料發送至模型服務端點並取得預測結果。
支援自動重試機制以處理暫時性網路錯誤。
參數:
instances: 輸入資料列表,每個元素為特徵字典
return_probabilities: 是否回傳類別機率分布
回傳:
dict: 包含預測結果的回應字典
拋出:
Exception: 當所有重試都失敗時
"""
# 建構請求主體
# dataframe_records 格式適合結構化的表格資料
request_body = {
"dataframe_records": instances
}
# 如果需要機率輸出,加入參數設定
if return_probabilities:
request_body["params"] = {
"predict_proba": True
}
# 執行請求並包含重試邏輯
for attempt in range(self.max_retries):
try:
logger.info(f"發送預測請求 (嘗試 {attempt + 1}/{self.max_retries})")
# 記錄請求開始時間
start_time = time.time()
# 發送 POST 請求
response = requests.post(
self.endpoint_url,
headers=self.headers,
json=request_body,
timeout=self.timeout
)
# 計算請求延遲
latency = (time.time() - start_time) * 1000 # 轉換為毫秒
# 檢查回應狀態碼
if response.status_code == 200:
result = response.json()
logger.info(f"預測成功 (延遲: {latency:.2f}ms)")
return result
else:
error_msg = f"預測請求失敗: HTTP {response.status_code}"
logger.error(f"{error_msg} - {response.text}")
# 如果是最後一次嘗試,拋出例外
if attempt == self.max_retries - 1:
raise Exception(f"{error_msg}: {response.text}")
# 等待後重試,使用指數退避策略
wait_time = 2 ** attempt
logger.info(f"等待 {wait_time} 秒後重試...")
time.sleep(wait_time)
except requests.exceptions.Timeout:
logger.error(f"請求逾時 (超過 {self.timeout} 秒)")
if attempt == self.max_retries - 1:
raise Exception(f"預測請求逾時,已重試 {self.max_retries} 次")
time.sleep(2 ** attempt)
except requests.exceptions.RequestException as e:
logger.error(f"網路請求錯誤: {str(e)}")
if attempt == self.max_retries - 1:
raise
time.sleep(2 ** attempt)
def batch_predict(
self,
instances: List[Dict[str, Any]],
batch_size: int = 100,
show_progress: bool = True
) -> List[Any]:
"""
批次預測功能
將大量資料分批發送以避免單次請求過大導致逾時或記憶體問題。
適合離線批次推論場景。
參數:
instances: 完整的輸入資料列表
batch_size: 每批次的資料筆數,預設 100
show_progress: 是否顯示處理進度
回傳:
list: 所有批次的預測結果彙整
"""
all_predictions = []
total_batches = (len(instances) + batch_size - 1) // batch_size
logger.info(f"開始批次預測: {len(instances)} 筆資料, {total_batches} 個批次")
# 分批處理
for i in range(0, len(instances), batch_size):
batch = instances[i:i + batch_size]
batch_num = i // batch_size + 1
try:
# 發送批次預測請求
result = self.predict(batch)
predictions = result.get("predictions", [])
all_predictions.extend(predictions)
# 顯示進度
if show_progress:
processed = min(i + batch_size, len(instances))
progress = (processed / len(instances)) * 100
logger.info(
f"批次 {batch_num}/{total_batches} 完成 "
f"({processed}/{len(instances)}, {progress:.1f}%)"
)
except Exception as e:
logger.error(f"批次 {batch_num} 處理失敗: {str(e)}")
# 記錄失敗的批次但繼續處理
all_predictions.extend([None] * len(batch))
logger.info(f"批次預測完成,共取得 {len(all_predictions)} 筆結果")
return all_predictions
def health_check(self) -> bool:
"""
檢查模型服務端點健康狀態
透過簡單的請求驗證端點是否正常運作。
可用於監控系統的健康檢查機制。
回傳:
bool: True 表示端點正常,False 表示異常
"""
try:
# 使用最小的測試資料
test_instance = [{"description": "test"}]
self.predict(test_instance)
logger.info("端點健康檢查: 正常")
return True
except Exception as e:
logger.error(f"端點健康檢查: 異常 - {str(e)}")
return False
def demo_news_classifier():
"""
新聞分類模型服務使用示範
展示如何使用 ModelServingClient 進行單筆與批次預測
"""
# 從環境變數取得配置
# 在生產環境中應使用更安全的憑證管理方式
endpoint_url = os.environ.get("MODEL_ENDPOINT_URL")
token = os.environ.get("DATABRICKS_TOKEN")
if not endpoint_url or not token:
raise ValueError("缺少必要的環境變數: MODEL_ENDPOINT_URL, DATABRICKS_TOKEN")
# 建立客戶端
client = ModelServingClient(
endpoint_url=endpoint_url,
token=token,
timeout=30,
max_retries=3
)
# 執行健康檢查
if not client.health_check():
logger.warning("模型服務端點健康檢查失敗,請確認服務狀態")
return
# 準備測試資料
test_articles = [
{
"description": "Scientists discover new exoplanet with potential for life in distant solar system"
},
{
"description": "Stock market reaches all-time high amid strong economic recovery and investor confidence"
},
{
"description": "National team wins championship in dramatic penalty shootout finale"
},
{
"description": "Revolutionary smartphone features advanced AI chip and holographic display technology"
}
]
# 單筆預測示範
logger.info("\n執行單筆預測示範...")
result = client.predict(test_articles[:1])
if result and "predictions" in result:
prediction = result["predictions"][0]
logger.info(f"預測結果: {prediction}")
# 批次預測示範
logger.info("\n執行批次預測示範...")
all_results = client.batch_predict(
test_articles,
batch_size=2,
show_progress=True
)
# 輸出結果摘要
logger.info("\n預測結果摘要:")
for i, (article, prediction) in enumerate(zip(test_articles, all_results), 1):
logger.info(f"\n文章 {i}:")
logger.info(f" 內容: {article['description'][:60]}...")
if prediction and isinstance(prediction, dict):
# 找出預測機率最高的類別
top_category = max(prediction.items(), key=lambda x: x[1])
logger.info(f" 預測類別: {top_category[0]} (信心度: {top_category[1]:.2%})")
if __name__ == "__main__":
demo_news_classifier()
這段完整的客戶端實作展示了與 Model Serving 端點互動的最佳實踐。ModelServingClient 類別封裝了 HTTP 請求的所有細節,提供了 predict 方法進行單筆即時預測,batch_predict 方法處理大量資料的批次推論,以及 health_check 方法進行端點健康檢查。客戶端包含了完整的錯誤處理與自動重試機制,使用指數退避策略避免在服務暫時不可用時過度重試造成額外負擔。demo_news_classifier 函式展示了實際的使用範例,從環境變數取得配置到發送預測請求的完整流程。
在 Databricks 平台上建構自動化機器學習流程需要整合多個元件與服務。排程任務提供了可靠的週期性訓練機制,而 AWS Lambda 整合則實現了事件驅動的即時回應能力。MLflow Model Registry 集中管理模型的版本控制與階段轉換,確保只有經過嚴格驗證的模型才會部署至生產環境。Model Serving 功能將訓練完成的模型快速轉化為高可用的 REST API 服務,為下游應用程式提供低延遲的預測能力。透過這些實踐,台灣的資料團隊能夠建構穩定可靠的 MLOps 平台,加速從資料到商業價值的轉化過程,在競爭激烈的市場中保持技術領先優勢。