返回文章列表

PySpark機器學習模型評估指標詳解

本文探討使用 PySpark 評估機器學習模型的技巧,涵蓋混淆矩陣、ROC 曲線、AUC、KS 值和分位數分析等關鍵指標,並搭配程式碼範例和視覺化圖表,提供全面的模型評估。

機器學習 大資料

利用 PySpark 評估機器學習模型效能,需要理解混淆矩陣、ROC、AUC、KS 值等指標。首先,使用 MulticlassMetrics 計算混淆矩陣,並透過 seaborn 繪製熱力圖,能直觀呈現模型預測的準確性。接著,計算精確率、召回率、F1 分數等指標,並根據業務需求選擇合適的指標。ROC 曲線和 AUC 值則能展現模型區分正負樣本的能力,AUC 越接近 1,模型效能越好。最後,KS 值和分位數分析則能進一步評估模型的區分能力和穩定性,並透過視覺化工具呈現分析結果,幫助我們更全面地理解模型效能。

模型評估:混淆矩陣與分類別指標詳解

在機器學習模型評估中,混淆矩陣(Confusion Matrix)是理解模型效能的重要工具。本文將透過 PySpark 實作範例,詳細解析混淆矩陣的計算過程以及相關的分類別指標。

混淆矩陣的計算與視覺化

首先,我們使用 PySpark 的 MulticlassMetrics 來計算混淆矩陣。以下為實作程式碼:

from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.sql.types import DoubleType
import seaborn as sns
import matplotlib.pyplot as plt

# 選取預測結果與真實標籤
predictionAndTarget = pred_result.select("prediction", "target")
predictionAndTarget = predictionAndTarget.withColumn("target", predictionAndTarget["target"].cast(DoubleType()))
predictionAndTarget = predictionAndTarget.withColumn("prediction", predictionAndTarget["prediction"].cast(DoubleType()))

# 轉換為 RDD 以使用 MulticlassMetrics
metrics = MulticlassMetrics(predictionAndTarget.rdd.map(tuple))

# 取得混淆矩陣
cm = metrics.confusionMatrix().toArray()

# 繪製混淆矩陣圖表
def make_confusion_matrix_chart(cf_matrix):
    list_values = ['0', '1']
    plt.subplot(111)
    sns.heatmap(cf_matrix, annot=True, yticklabels=list_values, xticklabels=list_values, fmt='g')
    plt.ylabel("實際值")
    plt.xlabel("預測值")
    plt.ylim([0,2])
    plt.title('OR 邏輯閘預測結果')
    plt.tight_layout()
    return None

make_confusion_matrix_chart(cm)  # 繪製混淆矩陣圖表

內容解密:

  1. 資料準備:從預測結果中選取「預測值」與「真實標籤」兩欄位,並將資料型態轉換為 DoubleType() 以符合 MulticlassMetrics 的需求。
  2. 混淆矩陣計算:使用 MulticlassMetrics 計算混淆矩陣,並將結果轉換為 NumPy 陣列以便視覺化。
  3. 視覺化:利用 seabornheatmap 功能繪製混淆矩陣熱力圖,直觀展示模型的預測表現。

混淆矩陣解讀

抽象的混淆矩陣結構如下:

| | 預測負類別 | 預測正類別 | |



|


–|


–| | 實際正類別 | False Negative (FN) | True Positive (TP) | | 實際負類別 | True Negative (TN) | False Positive (FP) |

對應到我們的範例結果:

  • True Positive (TP) = 2
  • True Negative (TN) = 1
  • False Positive (FP) = 0
  • False Negative (FN) = 1

分類別指標計算

接下來,我們計算多項重要的分類別指標:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 計算各項指標
acc = metrics.accuracy  # 精確度
misclassification_rate = 1 - acc  # 誤分類別率
precision = metrics.precision(1.0)  # 精準度
recall = metrics.recall(1.0)  # 召回率
f1 = metrics.fMeasure(1.0)  # F1 分數

# ROC/AUC 評估
evaluator = BinaryClassificationEvaluator(labelCol='target', rawPredictionCol='rawPrediction', metricName='areaUnderROC')
roc = evaluator.evaluate(pred_result)

print(acc, misclassification_rate, precision, recall, f1, roc)

內容解密:

  1. 精確度(Accuracy):衡量模型正確預測的比例,計算公式為 (TP + TN) / (TP + TN + FP + FN)
  2. 誤分類別率(Misclassification Rate):表示模型預測錯誤的比例,等於 1 - Accuracy
  3. 精準度(Precision):衡量正類別預測的準確性,計算公式為 TP / (TP + FP)
  4. 召回率(Recall):衡量模型捕捉正類別樣本的能力,計算公式為 TP / (TP + FN)
  5. F1 分數:綜合精準度與召回率的調和平均,計算公式為 2 * (Precision * Recall) / (Precision + Recall)
  6. ROC/AUC:衡量模型區分正負類別的能力,AUC 值越接近 1 表示模型效能越佳。

各指標適用場景分析

| 指標名稱 | 數值範圍 | 何時使用 | 應使用案例項 | |



|


-|


-|


-| | 精確度(Accuracy) | 0 至 1 | 各類別同等重要且資料分佈平衡時 | 整體效能評估 | | 誤分類別率(Misclassification Rate) | 0 至 1 | 需要了解整體錯誤率時 | 風險控制評估 | | 精準度(Precision) | 0 至 1 | 假陽性成本較高時 | 醫學檢測、詐騙偵測 | | 召回率(Recall) | 0 至 1 | 假陰性成本較高或資料不平衡時 | 疾病篩檢、金融風險監控 | | F1 分數 | 0 至 1 | 需要平衡精準度與召回率時 | 資訊檢索任務 | | ROC/AUC | 0.5 至 1 | 評估模型整體區分能力時 | 風險評估報告、業務簡報 |

ROC/AUC 圖表解析

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title PySpark機器學習模型評估指標詳解

package "機器學習流程" {
    package "資料處理" {
        component [資料收集] as collect
        component [資料清洗] as clean
        component [特徵工程] as feature
    }

    package "模型訓練" {
        component [模型選擇] as select
        component [超參數調優] as tune
        component [交叉驗證] as cv
    }

    package "評估部署" {
        component [模型評估] as eval
        component [模型部署] as deploy
        component [監控維護] as monitor
    }
}

collect --> clean : 原始資料
clean --> feature : 乾淨資料
feature --> select : 特徵向量
select --> tune : 基礎模型
tune --> cv : 最佳參數
cv --> eval : 訓練模型
eval --> deploy : 驗證模型
deploy --> monitor : 生產模型

note right of feature
  特徵工程包含:
  - 特徵選擇
  - 特徵轉換
  - 降維處理
end note

note right of eval
  評估指標:
  - 準確率/召回率
  - F1 Score
  - AUC-ROC
end note

@enduml

圖表翻譯: 此圖示展示了 ROC/AUC 的計算流程:

  1. 設定不同的閾值(0 至 1 之間的多個數值)
  2. 對每個閾值計算對應的敏感度(Recall)和特異度
  3. 將這些點連線成 ROC 曲線
  4. 計算曲線下的面積即為 AUC 值

模型評估:ROC曲線、精確率-召回率曲線與KS統計量

ROC曲線分析

ROC(Receiver Operating Characteristic)曲線是用於評估二元分類別模型效能的重要工具。它透過繪製真正例率(True Positive Rate, TPR)與假正例率(False Positive Rate, FPR)的關係圖來展示模型的區分能力。

繪製ROC曲線

在PySpark中,可以使用BinaryClassificationMetrics類別來計算ROC曲線的點。以下是一個範例程式碼:

from pyspark.mllib.evaluation import BinaryClassificationMetrics

# 定義CurveMetrics類別繼承BinaryClassificationMetrics
class CurveMetrics(BinaryClassificationMetrics):
    def __init__(self, *args):
        super(CurveMetrics, self).__init__(*args)

    def _to_list(self, rdd):
        points = []
        results_collect = rdd.collect()
        for row in results_collect:
            points += [(float(row._1()), float(row._2()))]
        return points

    def get_curve(self, method):
        rdd = getattr(self._java_model, method)().toJavaRDD()
        return self._to_list(rdd)

# 計算預測結果的機率和真實標籤
preds = pred_result.select('target', 'probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['target'])))

# 取得ROC曲線的點
points = CurveMetrics(preds).get_curve('roc')

# 繪製ROC曲線
import matplotlib.pyplot as plt
plt.figure()
x_val = [x[0] for x in points]
y_val = [x[1] for x in points]
plt.title('ROC曲線')
plt.xlabel('假正例率 (1-特異度)')
plt.ylabel('真正例率 (靈敏度)')
plt.plot(x_val, y_val, label='AUC = %0.2f' % roc)
plt.plot([0, 1], [0, 1], color='red', linestyle='--')
plt.legend(loc='lower right')

ROC曲線的解讀

  • 紅線代表隨機模型的ROC曲線,其面積為0.5。
  • 藍色曲線代表模型的ROC曲線,其面積(AUC)越接近1,表示模型效能越好。
  • 當模型效能不佳時,ROC曲線會接近紅線,AUC接近0.5。

精確率-召回率曲線

精確率-召回率(Precision-Recall, PR)曲線是另一種評估二元分類別模型效能的工具,特別是在類別不平衡的情況下。

繪製PR曲線

# 評估模型的PR曲線
evaluator = BinaryClassificationEvaluator(labelCol='target', rawPredictionCol='rawPrediction', metricName='areaUnderPR')
pr = evaluator.evaluate(pred_result)

# 計算PR曲線的點
preds = pred_result.select('target', 'probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['target'])))
points = CurveMetrics(preds).get_curve('pr')

# 繪製PR曲線
plt.figure()
x_val = [x[0] for x in points]
y_val = [x[1] for x in points]
plt.title('精確率-召回率曲線')
plt.xlabel('召回率')
plt.ylabel('精確率')
plt.plot(x_val, y_val, label='平均精確率 = %0.2f' % pr)
plt.plot([0, 1], [0.5, 0.5], color='red', linestyle='--')
plt.legend(loc='lower right')

PR曲線的解讀

  • 紅線代表根據目標變數分佈的基準線,在本例中為0.5。
  • 藍色曲線下的面積代表模型的平均精確率,越接近1表示模型效能越好。
  • 當模型效能不佳時,PR曲線下的面積會接近0.5。

KS統計量與Deciles分析

KS(Kolmogorov-Smirnov)統計量用於衡量模型區分正負樣本的能力。Deciles分析則是將樣本按預測機率排序後分成十等份,以評估模型在不同分位數上的表現。

計算KS統計量和Deciles

# 載入資料集,清理並擬合模型
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import countDistinct
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import functions as F
import numpy as np

filename = "bank-full.csv"
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(filename, header=True, inferSchema=True, sep=';')
df = df.withColumn('label', F.when(F.col("y") == 'yes', 1).otherwise(0))
df = df.drop('y')

# 後續步驟涉及具體的模型擬合和評估,這裡省略具體程式碼實作。

KS統計量和Deciles的解讀

  • KS統計量越大,表示模型區分正負樣本的能力越強。
  • Deciles分析可以幫助理解模型在不同機率閾值下的表現,從而指導實際應用中的決策。

邏輯迴歸模型評估:KS 值與分位數分析

本章節重點介紹如何使用 PySpark 進行邏輯迴歸模型的評估,涵蓋 KS 值計算、分位數分析以及相關視覺化技術。

向量組合與模型訓練

首先,我們定義一個函式 assemble_vectors,用於將多個特徵欄位組合成一個向量欄位 ‘features’。

def assemble_vectors(df, features_list, target_variable_name):
    stages = []
    assembler = VectorAssembler(inputCols=features_list, outputCol='features')
    stages = [assembler]
    selectedCols = [target_variable_name, 'features']
    pipeline = Pipeline(stages=stages)
    assembleModel = pipeline.fit(df)
    df = assembleModel.transform(df).select(selectedCols)
    return df

# 選擇相關特徵並組合成向量
df = df.select(['education', 'age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'label'])
features_list = ['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']
assembled_df = assemble_vectors(df, features_list, 'label')
clf = LogisticRegression(featuresCol='features', labelCol='label')
train, test = assembled_df.randomSplit([0.7, 0.3], seed=12345)
clf_model = clf.fit(train)

程式碼解密:

  1. VectorAssembler:用於將多個欄位組合成一個向量欄位。
  2. Pipeline:用於按順序處理多個資料處理步驟。
  3. LogisticRegression:用於建立邏輯迴歸模型。

KS 值計算與分位數分析

接著,我們定義一個函式 create_deciles,用於計算 KS 值和進行分位數分析。

def create_deciles(df, clf, score, prediction, target, buckets):
    pred = clf.transform(df)
    pred = pred.select(F.col(score), F.col(prediction), F.col(target)).rdd.map(lambda row: (float(row[score][1]), float(row[prediction]), float(row[target])))
    predDF = pred.toDF(schema=[score, prediction, target])
    
    window = Window.orderBy(F.desc(score))
    predDF = predDF.withColumn("row_number", F.row_number().over(window))
    predDF.cache()
    predDF = predDF.withColumn("row_number", predDF['row_number'].cast("double"))
    
    window2 = Window.orderBy("row_number")
    final_predDF = predDF.withColumn("deciles", F.ntile(buckets).over(window2))
    final_predDF = final_predDF.withColumn("deciles", final_predDF['deciles'].cast("int"))
    
    final_predDF = final_predDF.withColumn("non_target", 1 - final_predDF[target])
    final_predDF.cache()
    
    temp_deciles = final_predDF.groupby('deciles').agg(F.sum(target).alias(target)).toPandas()
    non_target_cnt = final_predDF.groupby('deciles').agg(F.sum('non_target').alias('non_target')).toPandas()
    temp_deciles = temp_deciles.merge(non_target_cnt, on='deciles', how='inner')
    
    temp_deciles['total'] = temp_deciles[target] + temp_deciles['non_target']
    temp_deciles['target_%'] = (temp_deciles[target] / temp_deciles['total']) * 100
    temp_deciles['cum_target'] = temp_deciles[target].cumsum()
    temp_deciles['cum_non_target'] = temp_deciles['non_target'].cumsum()
    temp_deciles['target_dist'] = (temp_deciles['cum_target'] / temp_deciles[target].sum()) * 100
    temp_deciles['non_target_dist'] = (temp_deciles['cum_non_target'] / temp_deciles['non_target'].sum()) * 100
    temp_deciles['spread'] = temp_deciles['target_dist'] - temp_deciles['non_target_dist']
    
    decile_table = temp_deciles.round(2)
    decile_table = decile_table[['deciles', 'total', 'label', 'non_target', 'target_%', 'cum_target', 'cum_non_target', 'target_dist', 'non_target_dist', 'spread']]
    print("KS Value - ", round(temp_deciles['spread'].max(), 2))
    
    return final_predDF, decile_table

# 對訓練集和測試集進行分位數分析
pred_train, train_deciles = create_deciles(train, clf_model, 'probability', 'prediction', 'label', 10)
pred_test, test_deciles = create_deciles(test, clf_model, 'probability', 'prediction', 'label', 10)

程式碼解密:

  1. create_deciles 函式:用於計算模型的預測結果並進行分位數分析。
  2. Window.orderBy:用於對資料進行排序。
  3. F.ntile:用於將資料分成指定的分位數。
  4. KS 值計算:透過計算目標變數和非目標變數的累積分佈差異來獲得。

視覺化分位數分析結果

最後,我們使用 pandas 的樣式功能來視覺化分位數分析結果。

def decile_labels(agg1, target, color='skyblue'):
    agg1 = agg1.round(2)
    agg1 = agg1.style.apply(highlight_max, color='yellow', subset=['spread']).set_precision(2)
    agg1.bar(subset=[target], color='{}'.format(color), vmin=0)
    agg1.bar(subset=['total'], color='{}'.format(color), vmin=0)
    agg1.bar(subset=['target_%'], color='{}'.format(color), vmin=0)
    return agg1

# 視覺化訓練集和測試集的分位數分析結果
plot_decile_train = decile_labels(train_deciles, 'label', color='skyblue')
plot_pandas_style(plot_decile_train)

plot_decile_test = decile_labels(test_deciles, 'label', color='skyblue')
plot_pandas_style(plot_decile_test)

程式碼解密:

  1. decile_labels 函式:用於視覺化分位數分析結果。
  2. highlight_max 函式:用於突出顯示最大值。
  3. plot_pandas_style 函式:用於將 pandas 的樣式輸出為 HTML。