PySpark 分散式運算框架的特性,有效率地載入和預處理 Tesla 股票資料,包含日期、開盤價、最高價、最低價、成交量和收盤價等欄位。利用 SparkSession 建立 Spark 應用程式,並透過 spark.read.csv 讀取 AWS S3 上的 CSV 資料。資料預處理階段,使用 VectorAssembler 將開盤價、最高價、最低價和成交量等特徵組裝成特徵向量,再使用 StandardScaler 對特徵向量進行標準化,確保各特徵量綱一致,避免影響模型訓練效果。接著,將預處理後的 Spark DataFrame 轉換為 PyTorch TensorDataset,並使用 DataLoader 建立訓練和測試資料集的迭代器,以便於模型訓練和評估。模型採用多層感知器(MLP)架構,由線性層和 ReLU 啟用函式組成。訓練過程中,使用 Adam 最佳化器最小化均方誤差(MSE)損失函式,並記錄每個 epoch 的訓練損失。最後,在測試資料集上評估模型效能,計算測試損失和 R 平方得分,用於衡量模型的預測能力。
使用PySpark進行資料載入與預處理
在這一部分,我們將探討如何使用PySpark進行Tesla股票價格資料的載入與預處理。首先,我們需要建立一個Logger物件來記錄程式的執行過程。
建立Logger物件
logger = logging.getLogger(__name__)
這行程式碼建立了一個名為logger的Logger物件,專門用於當前模組的記錄工作,有助於追蹤程式進度、除錯問題和監控活動。
資料載入函式(load_data)
接下來,我們定義了load_data函式,用於從AWS S3上的CSV檔案載入Tesla股票價格資料。該資料包含了2019年2月26日至2024年2月23日期間Tesla股票的日期、開盤價、最高價、最低價、成交量和收盤價。
程式碼實作
def load_data(file_path: str) -> DataFrame:
"""
使用SparkSession從CSV檔案載入股票價格資料。
"""
try:
spark = (SparkSession.builder
.appName("StockPricePrediction")
.getOrCreate())
df = spark.read.csv(
file_path,
header=True,
inferSchema=True
)
return df
except Exception as e:
raise RuntimeError(
f"從{file_path}載入資料時發生錯誤:{e}"
)
內容解密:
- SparkSession的使用:
SparkSession是存取Spark功能和處理資料的主要入口點。在load_data函式中,SparkSession用於建立Spark應用程式上下文,從而實作大規模分散式資料的高效平行處理。 - 資料載入流程:函式內部使用
SparkSession.builder方法建立SparkSession,接著使用spark.read.csv方法讀取CSV檔案至DataFrame。其中,header=True表示CSV檔案的第一行包含欄位名稱,而inferSchema=True則會根據資料推斷DataFrame的結構描述。 - 例外處理:程式碼使用try-except區塊來處理資料載入過程中可能發生的例外。如果try區塊內的程式碼執行過程中出現例外,則會被except區塊捕捉,並丟出一個帶有詳細錯誤訊息的
RuntimeError。
資料預處理函式(preprocess_data)
接下來,我們定義了preprocess_data函式,用於對載入的Tesla股票價格資料進行預處理。
程式碼實作
def preprocess_data(df: DataFrame) -> DataFrame:
"""
透過VectorAssembler組裝特徵向量,並使用StandardScaler進行特徵縮放。
"""
assembler = VectorAssembler(
inputCols=['Open', 'High', 'Low', 'Volume'],
outputCol='features'
)
df = assembler.transform(df)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features",
withStd=True,
withMean=True
)
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)
df = df.select('scaled_features', 'Close')
return df
內容解密:
- 特徵向量組裝:首先,使用
VectorAssembler將多個特徵欄位(如開盤價、最高價、最低價和成交量)組裝成一個特徵向量,以便於後續的模型訓練。 - 特徵縮放:接著,使用
StandardScaler對組裝好的特徵向量進行縮放,將特徵值標準化至相同的尺度,以避免某些特徵值因量綱差異而對模型訓練產生不當影響。 - 實作細節:在函式內部,首先初始化
VectorAssembler並呼叫其transform方法將特徵欄位組裝成特徵向量。然後,初始化StandardScaler並呼叫其fit方法計算特徵縮放所需的統計量,接著呼叫transform方法對特徵向量進行縮放。最後,選取縮放後的特徵向量和收盤價欄位傳回。
步驟五:建立資料載入器(DataLoader)的函式(create_data_loader)
在這一步驟中,我們定義了 create_data_loader 函式,該函式在建立訓練和測試資料集的 DataLoader 物件方面發揮著關鍵作用:
create_data_loader 函式實作
def create_data_loader(features, labels, batch_size=32, num_workers=4) -> DataLoader:
"""
將預處理後的資料轉換為 PyTorch 張量,並為訓練和測試集建立 DataLoader 物件。
引數:
features (Tensor):輸入特徵。
labels (Tensor):對應的標籤。
batch_size (int):每個批次載入的樣本數量。預設為 32。
num_workers (int):用於資料載入的子程式數量。預設為 4。
傳回:
DataLoader:PyTorch DataLoader 物件,用於資料集。
"""
dataset = TensorDataset(features, labels)
return DataLoader(dataset, batch_size=batch_size, shuffle=True)
內容解密:
create_data_loader函式的作用:此函式負責將預處理後的資料轉換為 PyTorch 張量,並建立 DataLoader 物件。這些物件對於在訓練和測試階段有效地將資料饋送到機器學習模型至關重要。TensorDataset的使用:函式接收預處理後的特徵和標籤作為輸入,並將它們轉換為 PyTorch 的TensorDataset,這是一種包裝張量的資料集。DataLoader物件的建立:透過將TensorDataset與指定的批次大小(batch_size)和洗牌引數(shuffle=True)傳遞給DataLoader建構函式,建立訓練和測試資料集的 DataLoader 物件。num_workers引數的作用:DataLoader中的num_workers引數允許平行資料載入,特別是在處理大型資料集或複雜的預處理步驟時,可以顯著加快資料準備階段。
DataLoader 的重要性
- 批次處理(Batching):DataLoader 自動將資料分批處理,這使得計算更高效,特別是在處理無法一次性載入記憶體的大型資料集時。
- 資料洗牌(Shuffling):在模型訓練過程中,洗牌資料至關重要,以防止模型學習到資料集特有的順序或模式。DataLoader 提供了洗牌資料的選項,確保模型在訓練過程中看到的每個批次都是資料集中的隨機樣本。
步驟六:模型訓練函式(train_model)
在這一步驟中,我們建立了 train_model 函式,該函式負責使用訓練資料訓練神經網路模型:
train_model 函式實作
def train_model(model, train_loader, criterion, optimizer, num_epochs):
"""
使用 DataLoader 和定義的損失函式及最佳化器,在訓練資料上訓練模型。
在指定的 epoch 數量內迭代資料,計算損失,並更新模型引數。
"""
for epoch in range(num_epochs):
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels.unsqueeze(1))
loss.backward()
optimizer.step()
logger.info(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")
內容解密:
train_model函式的作用:此函式負責在訓練資料上訓練神經網路模型。訓練神經網路涉及根據預測輸出與實際目標之間的差異迭代更新模型引數(權重和偏差)。- 迭代訓練過程:
- 函式在指定的 epoch 數量內迭代訓練資料集。每個 epoch 代表完整遍歷整個訓練資料集一次。
- 在每個 epoch 內,函式迭代處理小批次(mini-batches)的資料,這一過程由 DataLoader 物件促進。
- 引數更新與反向傳播:
- 反向傳播(Backpropagation)是訓練神經網路的關鍵演算法。它涉及計算損失函式相對於模型引數的梯度,表示每個引數對誤差的貢獻程度。
- 在計算梯度後,最佳化器(此例中為 Adam 最佳化器)根據梯度調整模型引數,以最小化損失。
PyTorch迴歸模型訓練與評估實作
在深度學習應用中,PyTorch框架提供了一套完整的工具來建立、訓練和評估神經網路模型。本文將探討如何使用PyTorch建立一個迴歸模型來預測特斯拉(Tesla)股票價格,並詳細解析模型訓練和評估的實作過程。
模型訓練函式(train_model)解析
模型訓練是深度學習的核心步驟,train_model函式負責執行模型的訓練流程:
程式碼實作
def train_model(model, train_loader, criterion, optimizer, num_epochs):
for epoch in range(num_epochs):
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels.unsqueeze(1))
loss.backward()
optimizer.step()
內容解密:
- 訓練迴圈結構:外層迴圈控制訓練的輪數(epochs),內層迴圈則遍歷每個小批次(mini-batch)的訓練資料。
- 梯度重置:使用
optimizer.zero_grad()將模型引數的梯度歸零,避免累積梯度影響訓練。 - 前向傳播:呼叫模型的
forward方法(在PyTorch中自動執行)進行預測,計算輸出結果。 - 損失計算:使用指定的損失函式(criterion)計算預測值與實際標籤之間的誤差。
- 反向傳播:呼叫
loss.backward()計算損失函式相對於模型引數的梯度。 - 引數更新:使用
optimizer.step()根據計算出的梯度更新模型引數,完成一次最佳化步驟。
模型評估函式(evaluate_model)詳解
模型評估是驗證訓練成效的重要步驟,evaluate_model函式負責在測試資料集上評估模型的效能:
程式碼實作
def evaluate_model(model, test_loader, criterion):
with torch.no_grad():
model.eval()
predictions = []
targets = []
test_loss = 0.0
for inputs, labels in test_loader:
outputs = model(inputs)
loss = criterion(outputs, labels.unsqueeze(1))
test_loss += loss.item() * inputs.size(0)
predictions.extend(outputs.squeeze().tolist())
targets.extend(labels.tolist())
test_loss /= len(test_loader.dataset)
predictions = torch.tensor(predictions)
targets = torch.tensor(targets)
ss_res = torch.sum((targets - predictions) ** 2)
ss_tot = torch.sum((targets - torch.mean(targets)) ** 2)
r_squared = 1 - ss_res / ss_tot
logger.info(f"測試損失:{test_loss:.4f}")
logger.info(f"R平方得分:{r_squared:.4f}")
return test_loss, r_squared.item()
內容解密:
- 評估模式切換:使用
model.eval()將模型切換到評估模式,停用不必要的計算如批次正規化中的統計更新。 - 損失計算:遍歷測試資料集,計算每個小批次的損失並累積總損失。
- R平方得分計算:使用公式 $R^2 = 1 - \frac{\sum (y_i - \hat{y_i})^2}{\sum (y_i - \bar{y})^2}$ 評估模型的擬合優度。
- 效能指標記錄:使用logger記錄測試損失和R平方得分,提供模型效能的量化評估。
主函式(main)流程控制
主函式是程式的進入點,負責協調整個訓練和評估流程:
程式碼實作
def main(data_file_path: str, num_epochs: int = 100, batch_size: int = 32, learning_rate: float = 0.001):
try:
# 資料載入與預處理
df = load_data(data_file_path)
df = preprocess_data(df)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# 資料轉換與載入器建立
train_features, train_labels = spark_df_to_tensor(train_df)
test_features, test_labels = spark_df_to_tensor(test_df)
train_loader = create_data_loader(train_features, train_labels, batch_size=batch_size)
test_loader = create_data_loader(test_features, test_labels, batch_size=batch_size)
# 模型建立與訓練
input_size = train_features.shape[1]
model = nn.Sequential(
nn.Linear(input_size, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
nn.Linear(16, 1)
)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
train_model(model, train_loader, criterion, optimizer, num_epochs)
evaluate_model(model, test_loader, criterion)
except Exception as e:
logger.error(f"執行錯誤:{str(e)}")
內容解密:
- 資料處理流程:包含資料載入、預處理、分割和轉換等步驟,為模型訓練準備適當的資料格式。
- 模型組態:建立一個包含多層線性層和ReLU啟用函式的神經網路模型,使用MSE損失函式和Adam最佳化器。
- 訓練與評估:呼叫
train_model進行模型訓練,然後使用evaluate_model評估模型在測試資料集上的表現。
本實作展示瞭如何使用PyTorch建立和訓練一個迴歸模型來預測股票價格,並透過嚴謹的評估流程驗證模型的效能。這種端對端的實作方法不僅有助於深入理解深度學習模型的開發流程,也為實際應用提供了寶貴的參考。