返回文章列表

Spark Python 資料處理與視覺化技術

本文介紹如何結合 Apache Spark 和 Python 進行資料處理與視覺化。涵蓋 Spark 的 Join、DropDuplicates 等資料處理技術,以及使用 Matplotlib 和 Seaborn 進行直方圖、柱狀圖等視覺化方法。文章提供程式碼範例,演示如何將 Spark DataFrame 轉換為

資料科學 Web 開發

Spark 提供高效的資料處理能力,而 Python 的資料視覺化函式庫則擅長資料呈現。本文整合兩者,先闡述 Spark 中的 Join 操作,包含 Left Anti Join、Left Semi Join 和 Broadcast Join 等不同型別,並說明 DropDuplicates 操作如何去除重複資料。接著,示範如何將 Spark DataFrame 轉換為 Pandas DataFrame,再利用 Matplotlib 和 Seaborn 繪製直方圖和柱狀圖,同時講解如何過濾資料以最佳化視覺化效果。最後,提供一個統計電影發行數量的案例和練習題,讓讀者能實際操作並加深理解。

資料處理與視覺化技術

在資料科學領域中,資料處理與視覺化是兩個非常重要的環節。Apache Spark 提供了強大的資料處理能力,而 Python 的資料視覺化函式庫(如 Matplotlib 和 Seaborn)則能夠有效地呈現資料。本文將介紹如何結合 Spark 與 Python 進行資料處理與視覺化。

資料處理技術

Spark 提供了多種資料處理技術,包括 Join、DropDuplicates 等。

Join 操作

Join 操作是一種常見的資料處理技術,用於合併兩個 DataFrame。Spark 支援多種 Join 操作,包括 Inner Join、Left Outer Join、Right Outer Join、Full Outer Join、Left Anti Join 和 Left Semi Join。

  • Left Anti Join:此操作會保留左側 DataFrame 中所有在右側 DataFrame 中不存在的列,並只保留左側 DataFrame 的欄位結構。

df.join(df_p1, df[‘id’] == df_p1[‘id’], ’left_anti’).printSchema() df.join(df_p1, df[‘id’] == df_p1[‘id’], ’left_anti’).count()


    #### 內容解密:
    - `df.join(df_p1, df['id'] == df_p1['id'], 'left_anti')` 這行程式碼執行了 Left Anti Join 操作,將 `df` 和 `df_p1` 兩個 DataFrame 根據 `id` 欄位進行比對。
    - `printSchema()` 用於列印 DataFrame 的欄位結構。
    - `count()` 用於計算 DataFrame 中的資料筆數。

*   **Left Semi Join**:此操作類別似於 Inner Join,但不會保留右側 DataFrame 的欄位。
    ```python
df.join(df_p1, df['id'] == df_p1['id'], 'left_semi').printSchema()
df.join(df_p1, df['id'] == df_p1['id'], 'left_semi').count()
#### 內容解密:
- `df.join(df_p1, df['id'] == df_p1['id'], 'left_semi')` 這行程式碼執行了 Left Semi Join 操作,將 `df` 和 `df_p1` 兩個 DataFrame 根據 `id` 欄位進行比對。
- 只保留 `df` 中存在於 `df_p1` 的資料列,但不包含 `df_p1` 的欄位。
  • Broadcast Join:當需要將一個大 DataFrame 與一個小 DataFrame 進行 Join 操作時,可以使用 Broadcast Join。Spark 會將小 DataFrame 複製到叢集中的所有節點上,以提高效率。

df.join(broadcast(df_p1), df[‘id’] == df_p1[‘id’], ’left_semi’).count()


    #### 內容解密:
    - `broadcast(df_p1)` 將小 DataFrame `df_p1` 廣播到叢集中的所有節點上。
    - 然後進行 Left Semi Join 操作,以提高效率。

#### DropDuplicates 操作

在處理真實世界的資料時,經常需要刪除重複的資料列。Spark 提供了 `dropDuplicates` 方法來實作這一功能。

```python
df.dropDuplicates(['title', 'release_year']).count()

內容解密:

  • df.dropDuplicates(['title', 'release_year']) 這行程式碼會刪除 dftitlerelease_year 欄位都相同的重複資料列。
  • count() 用於計算刪除重複資料列後的資料筆數。

資料視覺化

雖然 Spark 本身沒有內建的資料視覺化功能,但可以將處理好的資料轉換為 Pandas DataFrame,然後使用 Python 的資料視覺化函式庫(如 Matplotlib 和 Seaborn)進行視覺化。

直方圖視覺化

以下是一個將 Spark DataFrame 中的 popularity 欄位視覺化的例子:

# Step 1: Importing the required libraries
%matplotlib inline
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns

# Step 2: Processing the data in Spark
histogram_data = df.select('popularity').rdd.flatMap(lambda x: x).histogram(25)

# Step 3: Loading the Computed Histogram into a pandas DataFrame for plotting
hist_df = pd.DataFrame(list(zip(*histogram_data)), columns=['bin', 'frequency'])

# Step 4: Plotting the data
sns.set(rc={"figure.figsize": (12, 8)})
sns.barplot(hist_df['bin'], hist_df['frequency'])
plt.xticks(rotation=45)
plt.show()

內容解密:

  • 首先匯入必要的函式庫,包括 Matplotlib 和 Seaborn。
  • 使用 Spark 的 histogram 方法計算 popularity 欄位的直方圖資料。
  • 將計算好的直方圖資料轉換為 Pandas DataFrame。
  • 使用 Seaborn 的 barplot 方法繪製直方圖。

資料過濾與視覺化

在某些情況下,可能需要對資料進行過濾以獲得更好的視覺化效果。

# Filtering the data to get a better understanding of data
df_fil = df.filter('popularity < 22')

# Processing the data in Spark
histogram_data = df_fil.select('popularity').rdd.flatMap(lambda x: x).histogram(25)

# Loading the computed histogram into a pandas DataFrame for plotting
hist_df = pd.DataFrame(list(zip(*histogram_data)), columns=['bin', 'frequency'])

# Plotting the data
sns.set(rc={'figure.figsize': (11.7, 8.27)})
sns.barplot(hist_df['bin'], hist_df['frequency'])
plt.xticks(rotation=25)
plt.title('Distribution of Popularity - Data is filtered')
plt.show()

內容解密:

  • popularity 欄位進行過濾,只保留小於 22 的資料。
  • 然後按照前面的步驟計算直方圖並進行視覺化。

其他視覺化範例

以下是一個統計每年電影發行數量的例子:

# Step 1: Preparing the data using Spark functions and converting to pandas DataFrame
df_cat = df.filter("(release_year > 1959) and (release_year < 1971)").groupby('release_year').count().toPandas()

# Step 2: Sorting the values for display
df_cat = df_cat.sort_values(by=['release_year'], ascending=False)

# Step 3: Plotting the data
sns.set(rc={'figure.figsize': (11.7, 8.27)})
sns.barplot(df_cat['release_year'], df_cat['count'])
plt.xticks(rotation=25)
plt.title('Number of films released each year from 1960 to 1970 in our dataset')
plt.show()

內容解密:

  • 使用 Spark 對資料進行過濾和分組統計,然後轉換為 Pandas DataFrame。
  • 對結果進行排序。
  • 使用 Seaborn 繪製柱狀圖以展示每年電影的發行數量。

練習題

請使用提供的 TMDB 資料集(https://www.kaggle.com/kakarlaramcharan/tmdb-data-0920)完成以下練習:

  1. 找出2010年最受歡迎的第二部電影。
  2. 識別出重複的電影標題及其發行年份。
  3. 統計所有年份中最受歡迎的前幾部電影。
  4. 將2014年和2015年的電影預算進行視覺化比較。

機器學習導論

在深入機器學習的世界之前,瞭解分析學的演進至關重要。分析學是一種對資料或統計資料的計算分析,用於發現、解釋和傳達有意義的資料模式。人們經常聽到與分析學相關的多個術語和流行詞彙,但它們都用於收集洞察和做出明智的決策。

分析學大致可以分為以下幾類別(如圖 3-15 所示):

  • 描述性分析:這一領域專注於建立報告和圖表,以理解資料。
  • 預測性分析:這一領域與根據歷史資料估計未來事件相關聯,依賴歷史資料進行預測。
  • 規範性分析:這一領域專注於根據過去的資料決定應該做什麼,包括決策最佳化。

圖 3-15:分析學概覽

機器學習與分析學的關係

這些術語經常被以一種讓新手感到混淆的方式使用。機器學習是電腦科學的一個子領域,被視為人工智慧的一個子集。預測性分析指的是統計建模,常見於保險、銀行和市場行銷行業。機器學習是一個更廣泛的術語,涵蓋多個類別,包括:

  • 監督式學習:在這種學習中,演算法會被提供帶有目標標籤或事件的歷史資料集。這種研究領域透過準確度來驗證,也包括任何時間序列建模。
  • 非監督式學習:在這種學習中,演算法會被提供沒有目標標籤或事件的歷史資料集。這裡的目的是識別資料中的隱藏模式或分段。

資料型別

在討論監督式和非監督式學習技術之前,我們需要了解資料型別。資料大致可以分為質性資料量性資料

  • 質性資料是無法測量的資料,例如車輛顏色、婚姻狀況、教育程度等。這些變數也可以稱為類別變數。質性資料可以進一步分為:
    • 名目變數:當資料中有多個層級且沒有自然順序時,可以視為名目變數,例如車輛顏色或婚姻狀況。
    • 序數變數:當資料中有固有的順序時,可以視為序數變數,例如教育程度。教育程度有多個層級,如小學畢業、高中畢業、大學畢業和碩士畢業。這些層級具有相關的排名。

量性資料

量性資料是可以測量的,例如年齡、距離等。量性資料可以進一步分為離散型連續型

  • 離散型資料具有整數值,例如年齡(10, 20)。
  • 連續型資料具有小數或十進位制資料,例如距離(40.71 英里,59.82 英里)。

為什麼瞭解資料型別很重要?

瞭解資料型別有助於確定可以應用哪些型別的演算法。某些演算法是為特定的資料型別設計的。在監督式學習中,有兩類別不同的演算法:分類別迴歸

  • 分類別:在這種機器學習中,目標/事件資料是離散的。例如,預測可能流失的客戶(0/1)或可能違約的客戶(0/1)。
  • 迴歸:在這種機器學習中,目標/事件資料是連續的。例如,根據給定的引數預測房價($)或預測客戶在某個月的購買額。

程式碼範例:簡單的線性迴歸

# 匯入必要的函式庫
import numpy as np
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt

# 生成範例資料
X = np.array([1, 2, 3, 4, 5]).reshape(-1, 1)
y = np.array([2, 3, 5, 7, 11])

# 建立線性迴歸模型
model = LinearRegression()

# 訓練模型
model.fit(X, y)

# 進行預測
y_pred = model.predict(X)

# 繪製結果
plt.scatter(X, y, color='blue', label='實際值')
plt.plot(X, y_pred, color='red', label='預測值')
plt.legend()
plt.show()

內容解密:

這段程式碼展示了一個簡單的線性迴歸範例。首先,我們匯入了必要的函式庫,包括 NumPy、Scikit-learn 和 Matplotlib。然後,我們生成了一組簡單的範例資料,其中 X 是輸入特徵,y 是對應的輸出值。接著,我們建立了一個線性迴歸模型,並使用 fit 方法訓練模型。訓練完成後,我們使用模型進行預測,並將結果與實際值進行比較,最後透過 Matplotlib 繪製出實際值與預測值的對比圖。

圖 3-16:機器學習分類別

我們在後面的章節中將探討哪些演算法可以應用於這些方法。

分群

分群可以定義為將資料集中的同質群組分組。有許多標準方法可供資料專業人員遵循進行預測建模。其中一個稱為 CRISP-DM(跨行業資料探勘標準流程)(見圖 3-17)。它包括六個主要里程碑,如下所示:

  1. 業務理解:這是第一個重要的步驟,涉及理解問題陳述並將其轉化為資料問題。
  2. 資料理解:此步驟包括資料收集和描述性資料分析,以發現資料並熟悉有趣的特徵。
  3. 資料準備:這包括整合所有資料特徵並從資料集中建立額外的特徵/變數。
  4. 建模:這包括應用適當的演算法並迭代以建立更多變數(如有必要)。
  5. 評估:一旦建立了一個或多個模型,就會根據設定的驗證標準選擇一個模型。我們有一章專門幫助您瞭解所有模型評估和驗證技術。
  6. 佈署:這涉及將模型投入生產,並使模型評分可供業務各方利益相關者使用。

圖 3-17:CRISP-DM 方法論

另一個常見的標準框架是 SEMMA。SEMMA 代表 Sample(取樣)、Explore(探索)、Model(建模)、Modify(修改)和 Assess(評估)。這種方法由 SAS 在 CRISP-DM 之前引入。這種以建模為重點的方法不包括業務輸入作為其過程的一部分。如果您清楚地定義了您的業務目標,它在分散式組織中運作良好。

變數選擇的藝術:資料科學中的關鍵步驟

變數選擇是資料科學領域中的一項重要技術。本章旨在幫助讀者瞭解不同的變數選擇技術,並學習如何在PySpark中實作這些技術,以選出資料集中最具代表性的特徵。

探索性資料分析(EDA)在變數選擇中的角色

探索性資料分析(EDA)是用於分析和調查資料集,以識別其特徵和模式的方法。透過對資料的深入瞭解,可以做出更明智的決策,並解釋背後的理由。EDA也可以成為變數選擇過程中的第一步。

基數(Cardinality)

基數是指一個變數中獨特值的數量。對於類別變數來說,基數尤其重要。如果一個變數的基數為1,意味著該變數對於所有記錄都具有相同的值,因此可以省略該變數。

缺失值(Missing Values)

缺失值是指資料中缺失的資訊。瞭解缺失值的原因對於建立更好的模型至關重要。根據缺失值的型別,可以分為三類別:

  • 隨機缺失(MAR, Missing At Random):缺失值與觀測到的資料有關。例如,在房屋價格資料集中,當房屋型別為「獨立單元」時,HOA費用(Homeowners’ Association fees)可能缺失,因為獨立單元通常不需要支付HOA費用。
  • 完全隨機缺失(MCAR, Missing Completely At Random):缺失值與觀測到的資料無關。例如,房屋價格資料集中某個房屋的價格缺失,但無法從觀測到的資料中推斷出任何邏輯關係。
  • 非隨機缺失(MNAR, Missing Not At Random):缺失值與未觀測到的資料有關。

PySpark中的變數選擇技術

PySpark提供了多種內建和自定義的變數選擇技術。這些技術可以幫助選出最重要的特徵,去除冗餘或無關的變數,從而提高模型的準確性和效能。

使用PySpark實作變數選擇

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# 載入銀行市場行銷資料集
bank_data = spark.read.csv("bank-full.csv", header=True, inferSchema=True)

# 將類別變數轉換為索引
indexer = StringIndexer(inputCols=["job", "marital", "education"], outputCols=["job_index", "marital_index", "education_index"])

# 將特徵組合成向量
assembler = VectorAssembler(inputCols=["age", "job_index", "marital_index", "education_index"], outputCol="features")

# 建立線性迴歸模型
lr = LinearRegression(featuresCol="features", labelCol="balance")

# 建立Pipeline
pipeline = Pipeline(stages=[indexer, assembler, lr])

# 訓練模型
model = pipeline.fit(bank_data)

#### 內容解密:
此段程式碼展示瞭如何使用PySpark進行變數選擇和建立線性迴歸模型首先使用StringIndexer將類別變數轉換為索引然後使用VectorAssembler將多個特徵組合成一個向量最後建立線性迴歸模型並使用Pipeline將整個流程串聯起來