Tesla 股價預測專案採用模組化設計,將資料處理、模型訓練和評估等步驟分解成獨立的模組,提升了程式碼的可維護性和可重用性。資料處理模組使用 PySpark 載入和預處理股價資料,包含特徵向量組裝和標準化縮放。模型訓練模組使用 PyTorch 構建深度學習模型,並使用 DataLoader 載入資料進行訓練。模型評估模組計算測試損失和 R-squared 指標,評估模型的預測效能。此架構方便整合與維護,並能根據需求靈活調整模型或資料處理方法。
使用模組化設計最佳化 Tesla 股價預測流程
在開發複雜的資料處理和機器學習任務時,採用模組化的設計方法可以顯著提升開發效率和系統的靈活性。以 Tesla 股價預測為例,我們可以將整個流程分解為多個獨立的模組,每個模組負責特定的任務,例如資料處理、模型訓練和模型評估。這種設計方式不僅便於整合和自定義,還能在不同專案之間重複使用,從而提高整體開發效率。
Tesla 股價預測的模組化設計
在 Tesla 股價預測專案中,我們設計了五個主要的模組:
data_processing.py:負責使用 PySpark 載入和預處理 Tesla 股價資料。model_training.py:使用 PyTorch 訓練深度學習迴歸模型。model_evaluation.py:評估 PyTorch 深度學習模型的效能。utils.py:提供輔助函式,例如將 Spark DataFrame 轉換為 PyTorch 張量。main.py:協調整個工作流程的執行。
資料預處理模組詳解
首先,我們來看看 data_processing.py 模組的實作細節:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import DataFrame
def load_data(file_path: str) -> DataFrame:
"""
使用 SparkSession 從 CSV 檔案載入股價資料。
"""
spark = SparkSession.builder.appName("StockPricePrediction").getOrCreate()
df = spark.read.csv(file_path, header=True, inferSchema=True)
return df
def preprocess_data(df: DataFrame) -> DataFrame:
"""
對資料進行預處理,包括特徵向量組裝和標準化縮放。
"""
assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')
df = assembler.transform(df)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)
df = df.select('scaled_features', 'Close')
return df
內容解密:
load_data函式:利用SparkSession從指定的 CSV 檔案路徑讀取股價資料,傳回一個 DataFrame。設定header=True表示第一行包含欄位名稱,而inferSchema=True則讓 Spark 自動推斷欄位的資料型別。preprocess_data函式:對輸入的 DataFrame 進行預處理。首先,使用VectorAssembler將指定的特徵欄位(‘Open’, ‘High’, ‘Low’, ‘Volume’)組裝成一個名為 ‘features’ 的向量欄位。接著,使用StandardScaler對特徵向量進行標準化縮放,以確保特徵具有相似的範圍,從而提升機器學習模型的效能和收斂速度。最後,選取 ‘scaled_features’ 和 ‘Close’ 欄位,傳回處理後的 DataFrame。
為何採用模組化設計?
- 提高可重用性:每個模組都可以獨立開發和測試,便於在不同專案中重複使用。
- 簡化整合與維護:模組化的設計使得系統更易於理解和維護,當需要修改或更新某個功能時,只需關注相關的模組即可。
- 增強靈活性:可以根據需求輕鬆替換或升級特定的模組,例如更換不同的機器學習模型或調整資料預處理步驟。
PyTorch 模型訓練與評估模組詳解
本章節探討使用 PyTorch 進行 Tesla 股價預測的深度學習模型訓練與評估過程,主要涵蓋 model_training.py 和 model_evaluation.py 兩個核心模組。
模組 1:模型訓練(model_training.py)
資料載入器建立
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
def create_data_loader(features, labels, batch_size=32) -> DataLoader:
"""
將預處理後的資料轉換為 PyTorch 張量並建立 DataLoader 物件。
"""
dataset = TensorDataset(features, labels)
return DataLoader(dataset, batch_size=batch_size, shuffle=True)
內容解密:
- 匯入必要的 PyTorch 模組:包括
torch、torch.nn和torch.optim等,用於構建神經網路和最佳化演算法。 create_data_loader函式:將預處理後的特徵和標籤資料封裝成TensorDataset,並進一步建立DataLoader物件以實作高效的批次資料載入。DataLoader的重要性:在分散式環境(如 PySpark)中,DataLoader能有效管理大規模資料集的載入與分批,最佳化記憶體使用和計算效率,並支援資料洗牌和平行載入。
模型訓練流程
def train_model(model, train_loader, criterion, optimizer, num_epochs):
"""
使用 DataLoader 和指定的損失函式及最佳化器訓練模型。
"""
for epoch in range(num_epochs):
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels.unsqueeze(1))
loss.backward()
optimizer.step()
print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")
內容解密:
train_model函式:根據指定的 epoch 次數迭代訓練模型,每次迭代中計算損失並更新模型引數。- 梯度清零:使用
optimizer.zero_grad()清除前一次迭代的梯度,避免梯度累積。 - 前向傳播:將輸入資料送入模型進行預測,計算預測值與真實標籤之間的損失。
- 反向傳播與引數更新:透過
loss.backward()計算梯度,並使用optimizer.step()更新模型引數。
模組 2:模型評估(model_evaluation.py)
評估函式實作
import torch
def evaluate_model(model, test_loader, criterion):
"""
在測試資料集上評估訓練好的模型,計算測試損失和 R-squared 評分。
"""
with torch.no_grad():
model.eval()
predictions = []
targets = []
test_loss = 0.0
for inputs, labels in test_loader:
outputs = model(inputs)
loss = criterion(outputs, labels.unsqueeze(1))
test_loss += loss.item() * inputs.size(0)
predictions.extend(outputs.squeeze().tolist())
targets.extend(labels.tolist())
test_loss /= len(test_loader.dataset)
predictions = torch.tensor(predictions)
targets = torch.tensor(targets)
ss_res = torch.sum((targets - predictions) ** 2)
ss_tot = torch.sum((targets - torch.mean(targets)) ** 2)
r_squared = 1 - ss_res / ss_tot
print(f"Test Loss: {test_loss:.4f}")
print(f"R-squared Score: {r_squared:.4f}")
return test_loss, r_squared.item()
內容解密:
evaluate_model函式:在測試資料集上評估模型的表現,計算測試損失和 R-squared 評分指標。- 無梯度上下文:使用
torch.no_grad()確保在評估過程中不計算梯度,提高計算效率。 - 模型評估模式:透過
model.eval()將模型切換至評估模式,避免 BatchNorm 和 Dropout 等層的訓練行為影響評估結果。 - R-squared 計算:根據預測值和真實值計算 R-squared,評估模型的擬合優度。
深度學習模型訓練與評估流程解析
本章節將探討如何使用Apache Airflow建構可擴充套件的深度學習管線,涵蓋資料預處理、模型訓練、模型評估等關鍵步驟,並著重於實作細節與技術解析。
模型評估模組(model_evaluation.py)詳解
模型評估是深度學習管線中的關鍵環節,用於檢驗訓練好的模型在測試資料集上的表現。以下將詳細解析評估流程:
評估函式實作
def evaluate_model(model, test_loader, criterion):
model.eval()
with torch.no_grad():
test_loss = 0
predictions = []
targets = []
for inputs, labels in test_loader:
outputs = model(inputs)
loss = criterion(outputs, labels)
test_loss += loss.item() * inputs.size(0)
predictions.extend(outputs.squeeze().tolist())
targets.extend(labels.tolist())
test_loss /= len(test_loader.dataset)
ss_res = sum((p - t) ** 2 for p, t in zip(predictions, targets))
ss_tot = sum((t - sum(targets) / len(targets)) ** 2 for t in targets)
r_squared = 1 - (ss_res / ss_tot)
print(f'Test Loss: {test_loss:.4f}, R-squared: {r_squared:.4f}')
return test_loss, r_squared
內容解密:
- 模型評估模式:首先將模型切換至評估模式(
model.eval()),並停用梯度計算(torch.no_grad()),因為評估過程中不需要計算梯度。 - 損失與預測收集:初始化
test_loss、predictions和targets來收集測試損失、模型預測值和真實標籤。 - 批次處理:遍歷測試資料載入器(
test_loader),對每個批次進行預測並計算損失,將結果累積至test_loss,同時收集預測值和真實標籤。 - 平均損失計算:完成所有批次處理後,計算整個測試資料集上的平均損失。
- R-squared指標計算:根據收集的預測值和真實標籤,計算R-squared指標以評估模型的擬合優度。
- 結果輸出:列印測試損失和R-squared分數,並將其作為元組傳回。
工具模組(utils.py)功能解析
工具模組提供了一系列輔助函式,用於支援深度學習管線的運作。以下重點介紹spark_df_to_tensor函式:
def spark_df_to_tensor(df: DataFrame) -> Tuple[torch.Tensor, torch.Tensor]:
features = torch.tensor(np.array(df.rdd.map(lambda x: x.scaled_features.toArray()).collect()), dtype=torch.float32)
labels = torch.tensor(np.array(df.rdd.map(lambda x: x.Close).collect()), dtype=torch.float32)
return features, labels
內容解密:
- Spark DataFrame轉換:該函式將Spark DataFrame轉換為PyTorch張量,支援將特徵和標籤分別轉換為張量格式。
- RDD操作:利用Spark的RDD(彈性分散式資料集)API對DataFrame進行操作,將所需的欄位(如
scaled_features和Close)提取並轉換為NumPy陣列,最終轉換為PyTorch張量。 - 資料型別指定:明確指定張量的資料型別為
torch.float32,確保與後續的深度學習模型相容。
主程式(main.py)流程控制
主程式負責協調整個深度學習管線的執行流程,包括資料載入、預處理、模型訓練和評估等步驟。
def main(data_file_path: str, num_epochs: int = 100, batch_size: int = 32, learning_rate: float = 0.001):
try:
df = load_data(data_file_path)
df = preprocess_data(df)
# ... 省略部分程式碼 ...
model = nn.Sequential(
nn.Linear(input_size, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
nn.Linear(16, output_size)
)
# ... 省略部分程式碼 ...
train_model(model, train_loader, criterion, optimizer, num_epochs)
evaluate_model(model, test_loader, criterion)
torch.save(model.state_dict(), 'trained_model.pth')
except FileNotFoundError:
logger.error(f"Data file not found at {data_file_path}")
內容解密:
- 資料載入與預處理:首先載入資料並進行預處理,為後續的模型訓練做好準備。
- 資料分割:將資料集分割為訓練集和測試集,用於模型的訓練和評估。
- 模型定義:定義一個包含多層線性層和ReLU啟用函式的神經網路模型,用於股票價格預測任務。
- 模型訓練與評估:呼叫
train_model函式進行模型訓練,然後使用evaluate_model函式在測試集上評估模型效能。 - 模型儲存:將訓練好的模型引數儲存到檔案中,以便於後續的使用或佈署。