PySpark 的分散式計算能力為大資料實驗設計提供了強大的支援。透過 PySpark,我們可以有效處理和分析海量資料,進行 A/B 測試、假設檢定等,並利用其機器學習函式庫進行模型訓練和評估。結合 Streamlit,可以快速構建互動式 UI,方便實驗結果的展示和分析。此外,PySpark 還支援多種資料來源和格式,簡化了資料整合的流程,並確保了實驗結果的可重現性。對於需要處理大規模資料的實驗設計和最佳化任務,PySpark 是一個兼具效能和成本效益的理想選擇。
實驗設計與最佳化策略
本章將探討實驗設計的概念,並說明如何運用PySpark及其分散式計算框架,在大資料上進行可擴充套件的實驗。
實驗設計的重要性
- 揭示因果關係:透過實驗設計,可以建立變數之間的因果關係,不僅僅侷限於觀察資料。這有助於理解事件背後的原因,而非僅僅觀察到事件的發生。
- 資料驅動的決策:實驗設計允許收集資料驅動的洞察,從而最佳化不同策略並做出明智的決策。
- 持續改進:透過迭代測試和改進不同的方法,確保模型和應用程式隨時間保持有效。
使用Postman API進行即時測試
在進行即時評分測試時,可以使用以下的JSON物件。注意前面的方括號,這用於傳遞一個JSON物件列表。
[
{
"age": 58,
"job": "management",
"marital": "married",
"education": "tertiary",
"default": "no",
"balance": 2143,
"housing": "yes",
"loan": "no",
"contact": "unknown",
"day": 5,
"month": "may",
"duration": 261,
"campaign": 1,
"pdays": -1,
"previous": 0,
"poutcome": "unknown"
}
]
將此JSON物件複製並貼上到API的請求主體中,然後點選「Send」(參見圖9-10)。
建立UI元件
本文將使用Python套件streamlit來建立使用者介面(UI)。streamlit易於與現有程式碼整合,其優點包括可以即時檢視網頁應用程式的變更,且只需極少的程式碼即可建立UI。
在開始之前,先檢視圖9-11所示的目錄結構。父目錄是real_time_scoring,其中包含兩個子目錄:pysparkapi和streamlitapi。將迄今為止討論的所有程式碼移至pysparkapi目錄,而UI程式碼則位於streamlitapi目錄中。
streamlitapi目錄
導航至streamlitapi目錄,並在其中建立以下三個檔案:
Dockerfile:前端的Docker組態requirements.txt:所需的套件webapp.py:UI程式碼
在將程式碼佈署到Docker容器之前,可以執行以下命令進行測試:
streamlit run webapp.py
streamlit的互動功能允許以最少的程式碼即時設計自定義佈局。
real_time_scoring目錄
使用docker-compose檔案來組合多個Docker容器並建立Docker網路服務。下面是該檔案的內容:
version: '3'
services:
pysparkapi:
build: pysparkapi/
ports:
- 5000:5000
networks:
- deploy_network
container_name: pysparkapi
streamlitapi:
build: streamlitapi/
depends_on:
- pysparkapi
ports:
- 8501:8501
networks:
- deploy_network
container_name: streamlitapi
networks:
deploy_network:
driver: bridge
執行docker-compose.yml檔案:
docker-compose build
Docker網路服務建立完成後,可以執行Docker服務。需要等待一段時間,直到偵錯程式圖示出現,然後再切換到瀏覽器。
即時評分API
要存取UI,需要在瀏覽器中輸入以下連結:
http://localhost:8501/
UI的外觀應如圖9-13所示。可以輸入特徵值並點選「Get Predictions」以取得結果。
結束Docker服務
當需要終止此服務時,可以使用以下命令(參見圖9-14):
docker-compose down
至此,已成功佈署即時評分模型,並具備友好的前端介面。此佈署的最大優點是UI和PySpark分別位於不同的Docker容器中,可以獨立管理和自定義,並在需要時透過docker-compose檔案組合。這使得軟體開發生命週期更加便捷。
PySpark在實驗設計中的優勢
PySpark透過其分散式計算框架,能夠有效地處理大資料,從而提高實驗設計的可擴充套件性和效率。這使得研究人員能夠更輕鬆地進行大規模的實驗,並從中獲得有價值的洞察。
實驗設計的最佳實踐
- 明確研究問題和假設:在進行實驗之前,必須清楚地定義研究問題和假設,以確保實驗的方向和目標明確。
- 選擇適當的實驗設計方法:根據研究問題和資料特性,選擇合適的實驗設計方法,例如隨機對照試驗或準實驗設計。
- 使用PySpark進行資料處理和分析:利用PySpark的分散式計算能力,高效地處理和分析大資料,從而獲得可靠的實驗結果。
- 迭代測試和改進:透過迭代測試和改進,不斷最佳化模型和應用程式,以確保其在實際場景中的有效性。
圖表翻譯:Docker容器架構圖
此圖示展示了使用Docker容器佈署即時評分模型的架構。可以看到,PySpark API和streamlit UI分別位於不同的容器中,並透過docker-compose檔案進行組合。
圖表翻譯:
- PySpark API容器負責處理即時評分請求。
- streamlit UI容器提供使用者介面,用於輸入特徵值並取得預測結果。
- docker-compose檔案用於建立和管理這兩個容器之間的網路服務。
透過這種架構,可以實作模型的即時評分和前端展示的分離,使得系統更加靈活和可擴充套件。
實驗設計與最佳化策略
實驗流程
要進行成功的實驗,需要遵循以下高層次步驟:
- 定義問題:明確要解決的問題或要測試的假設,這是實驗過程的基礎。
- 設計實驗:選擇適合的實驗型別來回答問題。常見的實驗型別包括對照實驗、現場實驗、自然實驗、準實驗、A/B 測試、因子實驗、縱向研究和橫斷面研究。
- 識別變數並收集資料:明確定義獨立變數、依賴變數和控制變數,決定如何操縱和測量它們,並收集資料以測試假設。
- 執行實驗:按照設計執行實驗,控制混雜變數,確保實驗公平且合乎道德。
- 分析結果:使用統計方法分析實驗結果,包括計算平均值、迴歸分析、檢驗統計顯著性等。
- 解釋和溝通結果:根據分析得出結論,並將結果傳達給相關人員,可以透過撰寫報告、向利益相關者展示結果或在科學期刊上發表研究成果。
- 改進和迭代:根據實驗結果,可能需要改進問題、設計新的實驗或收集更多資料。實驗往往是一個迭代過程。
假設檢定
假設是一種可以被測試的宣告或斷言。透過假設檢定,可以拒絕或不挑戰特定的宣告。有兩種型別的假設:
- 零假設(H0):根據常規智慧或經驗的假設。
- 替代假設(Ha):通常是您嘗試驗證的有趣假設。
例如,探討吃早餐是否對糖尿病率有影響,可以設定:
- H0:吃早餐對糖尿病率沒有影響
- Ha:吃早餐對糖尿病率有負面影響
使用 p 值來提供最小的顯著性水平,以判斷是否拒絕零假設。較小的 p 值表示有更強的證據支援替代假設。
錯誤型別
| 實際情況 | H0 為真 | H0 為假 |
|---|---|---|
| 未拒絕 H0 | 正確 | 第二型錯誤 |
| 拒絕 H0 | 第一型錯誤 | 正確 |
第一型錯誤的機率由 α 表示,第二型錯誤的機率由 β 表示。統計檢定的力量由 1-β 給出。根據使用案例,重點在於減少 α 或 β。大多數教材關注減少 α,因為這樣可以堅持常規方法。
樣本大小和效應大小
樣本大小和效應大小都會影響 p 值,因此無法同時減少第一型和第二型錯誤。
PySpark 中的假設檢定
PySpark 統計套件提供了內建方法來執行 Pearson 卡方檢定和 Kolmogorov-Smirnov 檢定,但不支援大多數假設檢定方法。然而,可以使用使用者自定義函式來實作這些方法。
實驗案例研究
假設您經營一家電子商務公司,想要改善網站以提高客戶參與度和轉換率。您有客戶及其在網站上的行為資料。以下是範例資料:
| user_id | timestamp | group | landing_page | converted |
|
---
-
---
--|
---
-
---
-
---
-
---
-
---
--|
---
-
---
---
|
---
-
---
-
---
---
|
---
-
---
-
---
|
| 851104 | 2017-01-21 22:11:48 | control | old_page | 0 |
| 804228 | 2017-01-12 08:01:45 | control | old_page | 0 |
| 661590 | 2017-01-11 16:55:06 | treatment| new_page | 0 |
| ... | ... | ... | ... | ... |
A/B 測試範例程式碼
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 初始化 SparkSession
spark = SparkSession.builder.appName("ABTesting").getOrCreate()
# 載入資料
data = spark.read.csv("data.csv", header=True, inferSchema=True)
# 分組統計轉換率
conversion_rates = data.groupBy("group").agg({"converted": "mean"})
# 顯示結果
conversion_rates.show()
#### 內容解密:
此段程式碼首先初始化了一個 SparkSession,然後載入了包含 A/B 測試資料的 CSV 檔案。接著,它按照 `group`(控制組或實驗組)進行分組,並計算每個組別的平均轉換率(`converted` 的平均值)。最後,它顯示了每個組別的轉換率。
何時使用引數或非引數檢定?
| 用途 | 引數檢定 | 非引數檢定 |
|---|---|---|
| 比較兩個群體的平均值 | t-檢定 | Mann-Whitney U 檢定 |
| 比較兩個相關群體的平均值 | 配對 t-檢定 | Wilcoxon 符號秩檢定 |
| 比較多個群體的平均值 | 一元 ANOVA | Kruskal-Wallis 檢定 |
| 評估兩個變數之間的關係 | Pearson 相關性 | Spearman 相關性 |
圖表翻譯:
此圖表展示了不同型別的統計檢定方法及其適用場景。左側列出了不同的統計分析用途,中間和右側分別列出了對應的引數檢定和非引數檢定方法。
A/B 測試實務分析:網站轉換率最佳化案例
資料集介紹與實驗設計
本案例使用 Kaggle 上的 A/B 測試資料集,包含五個主要欄位:
User_Id:使用者唯一識別碼Timestamp:使用者存取網站的時間戳記Group:使用者所屬組別(control 或 treatment)Landing_page:使用者存取的頁面(old_page 或 new_page)Converted:使用者是否完成購買(0 表示否,1 表示是)
實驗流程與變數定義
- 問題定義:提升網站轉換率(購買次數 / 總行為次數)
- 實驗設計:對比新舊網站版本對轉換率的影響
- 變數識別:
- 獨立變數:網站版本(新或舊)
- 依賴變數:轉換率(購買率)
- 控制變數:無
資料清理與初步分析
from pyspark.sql.functions import col, count, avg
# 檢視原始資料分佈
data.groupby(['group', 'landing_page']).count().show()
# 資料清理:篩選正確的組別與頁面對應
data = data.where(((data.group == 'treatment') & (data.landing_page == 'new_page')) |
((data.group == 'control') & (data.landing_page == 'old_page')))
# 再次檢查資料分佈
data.groupby(['group', 'landing_page']).count().show()
# 計算各組轉換率平均值
data.groupby(['group', 'landing_page']).agg(avg("converted")).show()
內容解密:
- 首先使用
groupby和count函式檢查資料的初始分佈,發現部分使用者被分配到錯誤的頁面。 - 透過
where條件篩選出正確的組別與頁面對應關係,確保實驗資料的有效性。 - 使用
avg聚合函式計算各組別的平均轉換率。
轉換率計算與比較
# 分割資料為 control 和 treatment 組
control_df = data.filter(data.group == "control")
treatment_df = data.filter(data.group == "treatment")
# 計算各組轉換率
control_conversions = control_df.filter(col("converted") == 1).count()
control_rate = control_conversions / control_df.count()
treatment_conversions = treatment_df.filter(col("converted") == 1).count()
treatment_rate = treatment_conversions / treatment_df.count()
# 計算提升百分比
uplift = (treatment_rate - control_rate) / control_rate * 100
# 輸出結果
print("Control Group Conversion Rate:", round(control_rate, 4))
print("Treatment Group Conversion Rate:", round(treatment_rate, 4))
print("Uplift:", round(uplift, 4), "%")
內容解密:
- 將資料分割為 control 和 treatment 組,分別計算其轉換率。
- 對比兩組的轉換率並計算新網站版本帶來的提升百分比。
- 結果顯示新網站版本的轉換率(0.1188)略低於舊版本(0.1204),提升百分比為 -1.3117%。
t 檢定分析
# 提取兩組的轉換結果資料
control_data_flag = control_df.select('converted').rdd.flatMap(lambda x: x).collect()
treatment_data_flag = treatment_df.select('converted').rdd.flatMap(lambda x: x).collect()
import scipy.stats as stats
# 執行 t 檢定
t_statistic, p_value = stats.ttest_ind(treatment_data_flag, control_data_flag)
# 輸出結果
print("t-statistic:", t_statistic)
print("p-value:", p_value)
內容解密:
- 將兩組的轉換結果提取為列表形式以進行 t 檢定。
- 使用
scipy.stats.ttest_ind函式執行獨立樣本 t 檢定。 - t 檢定的結果用於判斷新舊網站版本的轉換率是否有顯著差異。
PySpark 在實驗設計與最佳化策略中的應用
實驗設計與 T 檢定實作
在進行網站佈局改版實驗時,我們希望評估新版佈局是否能顯著提高購買率。為此,我們採用了 T 檢定來比較新舊佈局之間的轉換率差異。
# 計算控制組和實驗組的轉換率
control_conversion_rate = 0.1204
treatment_conversion_rate = 0.1188
# 計算 T 統計量和 p 值
t_statistic = -1.31
p_value = 0.19
print("T 統計量:", t_statistic)
print("p 值:", p_value)
內容解密:
- T 檢定用於比較兩組之間的平均值差異。
- T 統計量為 -1.31,表示實驗組的轉換率低於控制組。
- p 值為 0.19,大於常見的顯著性水平 0.05,意味著我們無法拒絕虛無假設。
- 結論:新版網站佈局並未顯著提高購買率。
使用 PySpark 進行 T 檢定
PySpark 提供了可擴充套件的機器學習能力,我們可以利用線性迴歸模型來進行 T 檢定。
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
# 將輸入特徵轉換為向量
vectorAssembler = VectorAssembler(inputCols=["group_flag"], outputCol="v_group_flag")
v_data = vectorAssembler.transform(data)
# 定義線性迴歸模型
lr = LinearRegression(featuresCol='v_group_flag', labelCol='converted')
# 訓練模型
lr_model = lr.fit(v_data)
# 輸出係數和截距
print("係數:", lr_model.coefficients)
print("截距:", lr_model.intercept)
# 取得 T 統計量和 p 值
print("T 統計量:", lr_model.summary.tValues)
print("p 值:", lr_model.summary.pValues)
內容解密:
- 線性迴歸模型的係數代表了實驗組對轉換率的影響。
- 截距等於控制組的轉換率。
- T 統計量和 p 值與直接計算的結果一致,驗證了實驗的有效性。
為何選擇 PySpark 進行實驗設計?
- 可擴充套件性與效率:PySpark 能夠處理海量資料,並原生支援平行運算,提高了資料處理和模型訓練的效率。
- 機器學習與資料處理整合:PySpark 提供了 MLlib 和 SQL 套件,簡化了資料清洗和模型訓練的流程。
- 多樣化的資料來源支援:PySpark 能夠處理傳統資料格式(CSV、JSON、Parquet)和串流資料來源(Kafka),方便資料整合。
- 結果重現性:PySpark 確保了實驗結果的可重現性,方便與其他資料科學家協作。
- 成本效益:作為開源解決方案,PySpark 是大規模資料實驗的成本效益選擇。
使用 PySpark 進行隨機資料生成
在測試場景和演算法時,PySpark 提供了便捷的隨機資料生成功能。
from pyspark.mllib.random import RandomRDDs
# 生成包含 100 萬個獨立同分布標準常態分佈值的隨機 RDD
u = RandomRDDs.normalRDD(spark.sparkContext, 1000000, 10)
# 對隨機 RDD 進行轉換,生成服從 N(1, 4) 的隨機數
v = u.map(lambda x: 1.0 + 2.0 * x)
內容解密:
RandomRDDs.normalRDD用於生成標準常態分佈的隨機 RDD。- 使用
map方法對隨機 RDD 進行轉換,生成具有指定均值和變異數的隨機數。