返回文章列表

Tesla 股價預測模組化深度學習管線

本文介紹如何利用模組化設計方法建構 Tesla 股價預測的深度學習管線,使用 PySpark 進行資料預處理,PyTorch 構建和訓練深度學習模型,並利用 Apache Airflow 協調整個工作流程。文中詳細闡述了資料處理、模型訓練和評估等模組的實作細節,以及如何使用 R-squared

機器學習 資料工程

Tesla 股價預測專案採用模組化設計,將資料處理、模型訓練和評估等步驟分解成獨立的模組,提升了程式碼的可維護性和可重用性。資料處理模組使用 PySpark 載入和預處理股價資料,包含特徵向量組裝和標準化縮放。模型訓練模組使用 PyTorch 構建深度學習模型,並使用 DataLoader 載入資料進行訓練。模型評估模組計算測試損失和 R-squared 指標,評估模型的預測效能。此架構方便整合與維護,並能根據需求靈活調整模型或資料處理方法。

使用模組化設計最佳化 Tesla 股價預測流程

在開發複雜的資料處理和機器學習任務時,採用模組化的設計方法可以顯著提升開發效率和系統的靈活性。以 Tesla 股價預測為例,我們可以將整個流程分解為多個獨立的模組,每個模組負責特定的任務,例如資料處理、模型訓練和模型評估。這種設計方式不僅便於整合和自定義,還能在不同專案之間重複使用,從而提高整體開發效率。

Tesla 股價預測的模組化設計

在 Tesla 股價預測專案中,我們設計了五個主要的模組:

  1. data_processing.py:負責使用 PySpark 載入和預處理 Tesla 股價資料。
  2. model_training.py:使用 PyTorch 訓練深度學習迴歸模型。
  3. model_evaluation.py:評估 PyTorch 深度學習模型的效能。
  4. utils.py:提供輔助函式,例如將 Spark DataFrame 轉換為 PyTorch 張量。
  5. main.py:協調整個工作流程的執行。

資料預處理模組詳解

首先,我們來看看 data_processing.py 模組的實作細節:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import DataFrame

def load_data(file_path: str) -> DataFrame:
    """
    使用 SparkSession 從 CSV 檔案載入股價資料。
    """
    spark = SparkSession.builder.appName("StockPricePrediction").getOrCreate()
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    return df

def preprocess_data(df: DataFrame) -> DataFrame:
    """
    對資料進行預處理,包括特徵向量組裝和標準化縮放。
    """
    assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')
    df = assembler.transform(df)
    
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    scaler_model = scaler.fit(df)
    df = scaler_model.transform(df)
    
    df = df.select('scaled_features', 'Close')
    return df

內容解密:

  1. load_data 函式:利用 SparkSession 從指定的 CSV 檔案路徑讀取股價資料,傳回一個 DataFrame。設定 header=True 表示第一行包含欄位名稱,而 inferSchema=True 則讓 Spark 自動推斷欄位的資料型別。
  2. preprocess_data 函式:對輸入的 DataFrame 進行預處理。首先,使用 VectorAssembler 將指定的特徵欄位(‘Open’, ‘High’, ‘Low’, ‘Volume’)組裝成一個名為 ‘features’ 的向量欄位。接著,使用 StandardScaler 對特徵向量進行標準化縮放,以確保特徵具有相似的範圍,從而提升機器學習模型的效能和收斂速度。最後,選取 ‘scaled_features’ 和 ‘Close’ 欄位,傳回處理後的 DataFrame。

為何採用模組化設計?

  1. 提高可重用性:每個模組都可以獨立開發和測試,便於在不同專案中重複使用。
  2. 簡化整合與維護:模組化的設計使得系統更易於理解和維護,當需要修改或更新某個功能時,只需關注相關的模組即可。
  3. 增強靈活性:可以根據需求輕鬆替換或升級特定的模組,例如更換不同的機器學習模型或調整資料預處理步驟。

PyTorch 模型訓練與評估模組詳解

本章節探討使用 PyTorch 進行 Tesla 股價預測的深度學習模型訓練與評估過程,主要涵蓋 model_training.pymodel_evaluation.py 兩個核心模組。

模組 1:模型訓練(model_training.py)

資料載入器建立

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

def create_data_loader(features, labels, batch_size=32) -> DataLoader:
    """
    將預處理後的資料轉換為 PyTorch 張量並建立 DataLoader 物件。
    """
    dataset = TensorDataset(features, labels)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

內容解密:

  1. 匯入必要的 PyTorch 模組:包括 torchtorch.nntorch.optim 等,用於構建神經網路和最佳化演算法。
  2. create_data_loader 函式:將預處理後的特徵和標籤資料封裝成 TensorDataset,並進一步建立 DataLoader 物件以實作高效的批次資料載入。
  3. DataLoader 的重要性:在分散式環境(如 PySpark)中,DataLoader 能有效管理大規模資料集的載入與分批,最佳化記憶體使用和計算效率,並支援資料洗牌和平行載入。

模型訓練流程

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """
    使用 DataLoader 和指定的損失函式及最佳化器訓練模型。
    """
    for epoch in range(num_epochs):
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels.unsqueeze(1))
            loss.backward()
            optimizer.step()
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")

內容解密:

  1. train_model 函式:根據指定的 epoch 次數迭代訓練模型,每次迭代中計算損失並更新模型引數。
  2. 梯度清零:使用 optimizer.zero_grad() 清除前一次迭代的梯度,避免梯度累積。
  3. 前向傳播:將輸入資料送入模型進行預測,計算預測值與真實標籤之間的損失。
  4. 反向傳播與引數更新:透過 loss.backward() 計算梯度,並使用 optimizer.step() 更新模型引數。

模組 2:模型評估(model_evaluation.py)

評估函式實作

import torch

def evaluate_model(model, test_loader, criterion):
    """
    在測試資料集上評估訓練好的模型,計算測試損失和 R-squared 評分。
    """
    with torch.no_grad():
        model.eval()
        predictions = []
        targets = []
        test_loss = 0.0
        for inputs, labels in test_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels.unsqueeze(1))
            test_loss += loss.item() * inputs.size(0)
            predictions.extend(outputs.squeeze().tolist())
            targets.extend(labels.tolist())
        test_loss /= len(test_loader.dataset)
        predictions = torch.tensor(predictions)
        targets = torch.tensor(targets)
        ss_res = torch.sum((targets - predictions) ** 2)
        ss_tot = torch.sum((targets - torch.mean(targets)) ** 2)
        r_squared = 1 - ss_res / ss_tot
    print(f"Test Loss: {test_loss:.4f}")
    print(f"R-squared Score: {r_squared:.4f}")
    return test_loss, r_squared.item()

內容解密:

  1. evaluate_model 函式:在測試資料集上評估模型的表現,計算測試損失和 R-squared 評分指標。
  2. 無梯度上下文:使用 torch.no_grad() 確保在評估過程中不計算梯度,提高計算效率。
  3. 模型評估模式:透過 model.eval() 將模型切換至評估模式,避免 BatchNorm 和 Dropout 等層的訓練行為影響評估結果。
  4. R-squared 計算:根據預測值和真實值計算 R-squared,評估模型的擬合優度。

深度學習模型訓練與評估流程解析

本章節將探討如何使用Apache Airflow建構可擴充套件的深度學習管線,涵蓋資料預處理、模型訓練、模型評估等關鍵步驟,並著重於實作細節與技術解析。

模型評估模組(model_evaluation.py)詳解

模型評估是深度學習管線中的關鍵環節,用於檢驗訓練好的模型在測試資料集上的表現。以下將詳細解析評估流程:

評估函式實作

def evaluate_model(model, test_loader, criterion):
    model.eval()
    with torch.no_grad():
        test_loss = 0
        predictions = []
        targets = []
        
        for inputs, labels in test_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * inputs.size(0)
            
            predictions.extend(outputs.squeeze().tolist())
            targets.extend(labels.tolist())
        
        test_loss /= len(test_loader.dataset)
        ss_res = sum((p - t) ** 2 for p, t in zip(predictions, targets))
        ss_tot = sum((t - sum(targets) / len(targets)) ** 2 for t in targets)
        r_squared = 1 - (ss_res / ss_tot)
        
        print(f'Test Loss: {test_loss:.4f}, R-squared: {r_squared:.4f}')
        return test_loss, r_squared

內容解密:

  1. 模型評估模式:首先將模型切換至評估模式(model.eval()),並停用梯度計算(torch.no_grad()),因為評估過程中不需要計算梯度。
  2. 損失與預測收集:初始化test_losspredictionstargets來收集測試損失、模型預測值和真實標籤。
  3. 批次處理:遍歷測試資料載入器(test_loader),對每個批次進行預測並計算損失,將結果累積至test_loss,同時收集預測值和真實標籤。
  4. 平均損失計算:完成所有批次處理後,計算整個測試資料集上的平均損失。
  5. R-squared指標計算:根據收集的預測值和真實標籤,計算R-squared指標以評估模型的擬合優度。
  6. 結果輸出:列印測試損失和R-squared分數,並將其作為元組傳回。

工具模組(utils.py)功能解析

工具模組提供了一系列輔助函式,用於支援深度學習管線的運作。以下重點介紹spark_df_to_tensor函式:

def spark_df_to_tensor(df: DataFrame) -> Tuple[torch.Tensor, torch.Tensor]:
    features = torch.tensor(np.array(df.rdd.map(lambda x: x.scaled_features.toArray()).collect()), dtype=torch.float32)
    labels = torch.tensor(np.array(df.rdd.map(lambda x: x.Close).collect()), dtype=torch.float32)
    return features, labels

內容解密:

  1. Spark DataFrame轉換:該函式將Spark DataFrame轉換為PyTorch張量,支援將特徵和標籤分別轉換為張量格式。
  2. RDD操作:利用Spark的RDD(彈性分散式資料集)API對DataFrame進行操作,將所需的欄位(如scaled_featuresClose)提取並轉換為NumPy陣列,最終轉換為PyTorch張量。
  3. 資料型別指定:明確指定張量的資料型別為torch.float32,確保與後續的深度學習模型相容。

主程式(main.py)流程控制

主程式負責協調整個深度學習管線的執行流程,包括資料載入、預處理、模型訓練和評估等步驟。

def main(data_file_path: str, num_epochs: int = 100, batch_size: int = 32, learning_rate: float = 0.001):
    try:
        df = load_data(data_file_path)
        df = preprocess_data(df)
        # ... 省略部分程式碼 ...
        model = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, output_size)
        )
        # ... 省略部分程式碼 ...
        train_model(model, train_loader, criterion, optimizer, num_epochs)
        evaluate_model(model, test_loader, criterion)
        torch.save(model.state_dict(), 'trained_model.pth')
    except FileNotFoundError:
        logger.error(f"Data file not found at {data_file_path}")

內容解密:

  1. 資料載入與預處理:首先載入資料並進行預處理,為後續的模型訓練做好準備。
  2. 資料分割:將資料集分割為訓練集和測試集,用於模型的訓練和評估。
  3. 模型定義:定義一個包含多層線性層和ReLU啟用函式的神經網路模型,用於股票價格預測任務。
  4. 模型訓練與評估:呼叫train_model函式進行模型訓練,然後使用evaluate_model函式在測試集上評估模型效能。
  5. 模型儲存:將訓練好的模型引數儲存到檔案中,以便於後續的使用或佈署。