返回文章列表

Python資料清理轉換與載入策略實作

本文探討 Python 資料清理、轉換與載入的策略與實作,涵蓋處理缺失值、合併資料集、資料聚合、不同資料儲存系統的載入方式以及全量與增量載入策略,並提供最佳實踐與程式碼範例。

資料工程 Python

在資料科學專案中,資料清理、轉換和載入是不可或缺的環節。本文將示範如何使用 Python 工具和函式庫,有效地處理和轉換資料,並將其載入到不同的儲存系統。首先,我們會探討使用 Pandas 處理缺失值、合併和聚合資料的技巧。接著,我們將介紹如何使用 SQLAlchemy 與關係型資料函式庫互動,以及如何將資料載入到資料倉儲和 NoSQL 儲存。最後,我們將比較全量資料載入和增量資料載入的策略,並提供最佳實務建議,以確保資料處理流程的效率和可靠性。

資料清理與轉換:策略與實作

在資料科學和分析領域,資料清理與轉換是至關重要的步驟。本章節將探討在Python中如何進行資料清理與轉換,包括處理缺失值、合併資料集以及資料聚合等。

處理缺失值

首先,瞭解資料集中缺失值的分佈情況是非常重要的。可以使用Pandas的isnull().sum()方法來計算每個欄位中缺失值的數量。

import pandas as pd

# 假設df_crashes是我們的DataFrame
null_counts = df_crashes.isnull().sum()
print(null_counts)

內容解密:

  • isnull()用於識別DataFrame中的缺失值,傳回一個布林值的DataFrame。
  • sum()對布林值的DataFrame進行求和,True被視為1,False被視為0,因此得到每個欄位缺失值的總數。

刪除全為空值的欄位

刪除全為空值的欄位是簡化資料集的一種有效方法。

df_crashes.dropna(axis='columns', how='all', inplace=True)

內容解密:

  • dropna()用於刪除缺失值。
  • axis='columns'指定刪除的軸為欄位。
  • how='all'表示只有當整個欄位都為空值時才刪除。
  • inplace=True表示直接在原DataFrame上進行修改。

刪除缺失值過多的列

對於某些應用場景,刪除缺失值過多的行也是必要的。

df_crashes = df_crashes.dropna(axis='index', thresh=2, inplace=False)

內容解密:

  • axis='index'指定刪除的軸為行。
  • thresh=2表示至少需要2個非空值,否則將被刪除。
  • inplace=False表示傳回一個新的DataFrame,原DataFrame不變。

處理特定欄位的缺失值

對於某些欄位,可以根據業務邏輯填充缺失值。

df_crashes = df_crashes.fillna(value={'report_type': 'ON SCENE'})

內容解密:

  • fillna()用於填充缺失值。
  • value={'report_type': 'ON SCENE'}指定了對report_type欄位的缺失值填充為’ON SCENE’。

合併資料集

在實際應用中,往往需要合併多個資料集。Pandas提供了類別似SQL的合併功能。

df_vehicles = pd.read_csv("data/traffic_crash_vehicle.csv")
df = df_crashes.merge(df_vehicles, how='left', on='crash_record_id', suffixes=('_left', '_right'))

內容解密:

  • merge()用於合併兩個DataFrame。
  • how='left'表示進行左連線,即保留左邊DataFrame的所有行。
  • on='crash_record_id'指定了合併的鍵。
  • suffixes=('_left', '_right')用於區分兩個DataFrame中相同名稱的欄位。

資料聚合

合併後的資料集可以進行聚合操作以提取有價值的資訊。

df_agg = df.groupby('vehicle_type').agg({'crash_record_id': 'count'}).reset_index()

內容解密:

  • groupby('vehicle_type')根據vehicle_type進行分組。
  • agg({'crash_record_id': 'count'})對每個分組計算crash_record_id的數量。
  • reset_index()重置索引,使vehicle_type成為普通欄位。

資料對映

在資料轉換過程中,常常需要進行資料對映,以滿足目標系統的要求。

vehicle_mapping = {'vehicle_type': 'vehicletypes'}
df = df.rename(columns=vehicle_mapping)

內容解密:

  • rename()用於重新命名欄位。
  • columns=vehicle_mapping指定了欄位的對映關係。

建立轉換活動

最後,將上述步驟封裝成可重用的函式,以便於在不同的資料集上進行相同的轉換操作。

本章節介紹了在Python中進行資料清理與轉換的基本策略和實作方法,包括處理缺失值、合併資料集、資料聚合以及資料對映等。透過這些技術,可以有效地提高資料的品質和可用性,為後續的分析和建模工作奠定堅實的基礎。

資料清理與轉換的策略

在資料處理流程中,資料清理與轉換是至關重要的步驟。這些過程確保了輸出資料的品質和可靠性,使得資料能夠被有效地用於後續的分析和機器學習任務。

建立動態的資料處理流程

在設計資料處理流程時,應該將不同的處理步驟分解成獨立的函式,以便於重用和維護。這種方法不僅提高了程式碼的可讀性和可維護性,也使得流程更加靈活和可擴充套件。

步驟分解

  1. 讀取資料:建立 read_datasources(source_name) 函式來讀取資料來源。

    def read_datasources(source_name):
        # 實作讀取資料的邏輯
        pass
    

    內容解密:

    • read_datasources 函式的主要目的是從指定的資料來源讀取資料。
    • 引數 source_name 指定了資料來源的名稱或路徑。
    • 函式內部應該實作讀取資料的邏輯,例如使用 Pandas 讀取 CSV 檔案。
  2. 丟棄含有空值的列:建立 drop_rows_with_null_values(df) 函式來丟棄 DataFrame 中的空值列。

    def drop_rows_with_null_values(df):
        # 實作丟棄空值列的邏輯
        return df.dropna()
    

    內容解密:

    • drop_rows_with_null_values 函式的主要目的是丟棄 DataFrame 中的含有空值的列。
    • 使用 Pandas 的 dropna() 方法來實作這一功能。
    • 函式傳回處理後的 DataFrame。
  3. 填補缺失值:建立 fill_missing_values(df) 函式來填補 DataFrame 中的缺失值。

    def fill_missing_values(df):
        # 實作填補缺失值的邏輯
        return df.fillna(0)  # 示例中用0填補缺失值
    

    內容解密:

    • fill_missing_values 函式的主要目的是填補 DataFrame 中的缺失值。
    • 使用 Pandas 的 fillna() 方法來實作這一功能。
    • 示例中使用0來填補缺失值,實際應用中應根據具體情況選擇合適的值。
  4. 合併 DataFrame:建立 merge_dataframes(df1, df2) 函式來合併兩個 DataFrame。

    def merge_dataframes(df1, df2):
        # 實作合併 DataFrame 的邏輯
        return pd.merge(df1, df2, on='key')
    

    內容解密:

    • merge_dataframes 函式的主要目的是合併兩個 DataFrame。
    • 使用 Pandas 的 merge() 方法來實作這一功能。
    • on='key' 指定了合併的鍵。
  5. 重新命名欄位:建立 rename_columns(df) 函式來重新命名 DataFrame 中的欄位。

    def rename_columns(df):
        # 實作重新命名欄位的邏輯
        return df.rename(columns={'old_name': 'new_name'})
    

    內容解密:

    • rename_columns 函式的主要目的是重新命名 DataFrame 中的欄位。
    • 使用 Pandas 的 rename() 方法來實作這一功能。
    • 示例中將欄位 ‘old_name’ 重新命名為 ’new_name’。

組合資料處理流程

將上述步驟組合成一個完整的資料處理流程,可以使用以下程式碼:

def get_transformed_data(crash_file, vehicle_file):
    # 讀取資料
    df_crash, df_vehicle = read_data_pipeline(crash_file, vehicle_file)
    
    # 丟棄空值列
    df_crash, df_vehicle = drop_rows_with_null_values_pipeline(df_crash, df_vehicle)
    
    # 填補缺失值
    df_crash, df_vehicle = fill_missing_values_pipeline(df_crash, df_vehicle)
    
    # 合併 DataFrame
    df_agg = merge_dataframes_pipeline(df_crash, df_vehicle)
    
    # 重新命名欄位
    df_output = format_dataframes_pipeline(df_agg)
    
    return df_output

內容解密:

  • get_transformed_data 函式整合了資料處理的各個步驟,包括讀取資料、丟棄空值列、填補缺失值、合併 DataFrame 和重新命名欄位。
  • 每個步驟都使用了前面定義的獨立函式,使得流程清晰且易於維護。

資料載入的最佳實踐與技術

在ETL(提取、轉換、載入)流程中,資料載入是最後也是最關鍵的一步。本章節將探討資料載入的技術細節,包括不同型別的資料儲存系統、最佳實踐以及如何最佳化資料載入活動。

資料載入簡介

資料載入是ETL流程的最後一步,其目的是將經過處理和轉換的資料無縫地轉移到目標儲存系統中。選擇合適的目標系統對於資料的可存取性、儲存、查詢能力和整體系統效能至關重要。

選擇合適的載入目標

根據專案需求的不同,資料可能會被載入到關係型資料函式庫、雲端資料倉儲、NoSQL儲存或其他儲存函式庫中。瞭解目標系統的需求和能力是設計高效載入策略的基礎。

資料儲存系統型別

Python提供了一套多樣化的函式庫和工具,可以與各種資料儲存系統介面,包括:

關係型資料函式庫

關係型資料函式庫是最常見的資料儲存和管理系統。Python支援多種流行的RDBMS,如MySQL、PostgreSQL、SQLite和Oracle。使用SQLAlchemy和特定資料函式庫驅動程式(如psycopg2和pymysql)可以高效地將資料載入到結構化的表格中。

資料倉儲

資料倉儲主要用於儲存大量資料以進行長期儲存和分析查詢。Python有多個函式庫,如Pandas,可以高效地將資料載入到Amazon Redshift、Google BigQuery和Snowflake等資料倉儲中。

NoSQL儲存

NoSQL資料函式庫如MongoDB、Cassandra和Couchbase提供了靈活且無模式的資料儲存。Python工具如pymongo和cassandra-driver使得與這些資料函式庫的整合變得無縫。NoSQL儲存非常適合需要高擴充套件性、快速資料攝取和非結構化資料型別的場景。

資料載入的最佳實踐

雖然沒有一種通用方法適用於所有資料管道的載入活動,但遵循最佳實踐可以提高效率、準確性和可靠性。設計資料載入活動時,應考慮以下三個原則:

  • 效能最佳化:使用批次載入、平行處理和最佳化SQL查詢等技術,可以顯著提高大型資料集的載入效能。
  • 可擴充套件性:採用可擴充套件的策略,以確保資料載入解決方案在資料量增加時仍保持高效和回應。
  • 錯誤處理:設計完善的錯誤處理機制,以應對資料載入過程中可能出現的問題。

實戰演練:準備本地環境進行資料載入活動

為了有效地利用本章節提供的資源和程式碼範例,請確保您的系統滿足以下技術要求:

  • 軟體需求:
    • 整合開發環境(IDE):推薦使用PyCharm作為Python開發的首選IDE。
    • Jupyter Notebooks:安裝Jupyter Notebooks以便進行互動式開發。
    • Python版本:安裝Python 3.6或更高版本。
    • Pipenv:安裝Pipenv以管理依賴項。

建立本地PostgreSQL資料函式庫

import psycopg2

# 連線到PostgreSQL資料函式庫
def create_database():
    try:
        # 建立連線
        conn = psycopg2.connect(
            database="postgres",
            user="your_username",
            password="your_password",
            host="localhost"
        )
        
        # 建立cursor物件
        cur = conn.cursor()
        
        # 建立新資料函式庫
        cur.execute("CREATE DATABASE mydatabase;")
        
        # 提交變更
        conn.commit()
        
        # 關閉cursor和連線
        cur.close()
        conn.close()
        
        print("資料函式庫建立成功!")
    
    except psycopg2.Error as e:
        print(f"建立資料函式庫時發生錯誤:{e}")

create_database()

#### 內容解密:

此段程式碼展示瞭如何使用Python的psycopg2函式庫連線到PostgreSQL資料函式庫並建立一個新的資料函式庫。首先,我們建立一個到PostgreSQL預設資料函式庫postgres的連線。然後,使用CREATE DATABASE SQL陳述式建立一個名為mydatabase的新資料函式庫。最後,提交變更並關閉連線。

資料載入最佳化:控制資料匯入方法

在資料處理流程中,資料載入是至關重要的一環。適當的資料載入策略不僅能提升系統的效能,還能確保資料的完整性和一致性。本章節將探討如何透過不同的資料載入策略來最佳化資料處理活動。

建立示範資料

為了更好地理解全量資料載入和增量資料載入的概念,我們將使用Python的sqlite3資料函式庫進行示範。首先,在PyCharm環境中的chapter_06/目錄下,開啟Loading_Transformed_Data.ipynb檔案,並在PyCharm終端機中執行以下命令來啟動Jupyter Notebook:

(venv) (base) usr@usr-MBP chapter_06 % jupyter notebook

確認Jupyter Notebook中包含以下程式碼:

# 匯入模組
import sqlite3

# 示範資料
laundry_mat_data = [
    {"product": "Detergent", "dollar_price_per_unit": 4.5, "quantity": 100, "total_cost": 200},
    {"product": "Dryer Sheets Box", "dollar_price_per_unit": 3.5, "quantity": 100, "total_cost": 350},
    {"product": "Washing Machine", "dollar_price_per_unit": 400, "quantity": 25, "total_cost": 10000},
    {"product": "Dryer", "dollar_price_per_unit": 400, "quantity": 25, "total_cost": 10000},
]

# 建立連線例項和遊標
conn = sqlite3.connect("laundry_mat.db")
cursor = conn.cursor()

程式碼解析:

  1. 匯入必要的模組:使用import sqlite3匯入sqlite3模組,以便與SQLite資料函式庫進行互動。
  2. 定義示範資料laundry_mat_data是一個包含多個字典的列表,每個字典代表一筆產品資料,包括產品名稱、單價、數量和總成本。
  3. 建立資料函式庫連線:使用sqlite3.connect()方法建立與SQLite資料函式庫的連線,並建立一個遊標物件cursor以執行SQL陳述式。

全量資料載入

理論上,資料處理流程的設計旨在一次性載入輸出資料。這種方法的好處在於能夠順暢地將資料從一個位置轉移到另一個位置,並在下一次迭代之前提供審查和改進的機會。

全量資料載入步驟:

  1. 清空現有資料

cursor.execute(“DELETE FROM laundry_mat”)

   這一步驟確保在載入新資料之前清空目標表格中的現有資料。

2. **插入新資料**:
   ```python
for record in laundry_mat_data:
    cursor.execute("INSERT INTO laundry_mat (product, dollar_price_per_unit, quantity, total_cost) VALUES (?, ?, ?, ?)",
                   (record["product"], record["dollar_price_per_unit"], record["quantity"], record["total_cost"]))

透過迴圈遍歷laundry_mat_data中的每筆記錄,並使用INSERT INTO陳述式將新資料插入到laundry_mat表格中。

程式碼解析:

  • 清空現有資料:使用DELETE FROM陳述式刪除laundry_mat表格中的所有現有資料,以確保載入新資料前表格為空。
  • 插入新資料:使用INSERT INTO陳述式將laundry_mat_data中的每筆記錄插入到laundry_mat表格中。引數化查詢(使用?作為佔位符)有助於防止SQL注入攻擊。

然而,在許多情況下,全量資料載入可能並不是最佳選擇,例如客戶需要持續存取完整資料集,或是處理每日更新的龐大且複雜的資料集(如金融資料)。在這些情況下,採用增量資料載入的方法更為合適。

增量資料載入

增量資料載入是一種將大型資料載入活動分解為更小、更易管理的區塊的方法,從而降低計算資源的負擔。

增量資料載入步驟:

for record in laundry_mat_data:
    cursor.execute("INSERT OR IGNORE INTO laundry_mat (product, dollar_price_per_unit, quantity, total_cost) VALUES (?, ?, ?, ?)",
                   (record["product"], record["dollar_price_per_unit"], record["quantity"], record["total_cost"]))

程式碼解析:

  • 增量載入新資料:使用INSERT OR IGNORE INTO陳述式插入新資料,如果遇到重複的記錄則忽略。這種方法避免了重複資料的插入,同時也減少了因重複插入而導致的錯誤。

實務考量

在設計資料載入流程時,需要考慮多種因素,包括資料的準確性、格式的一致性以及系統的擴充套件性。選擇適當的資料載入策略(全量或增量)取決於具體的業務需求和系統架構。

風險與挑戰:

  1. 錯誤處理:增量資料載入需要健全的錯誤處理機制,以管理不完整的資料載入和重複資料匯入的問題。
  2. 資料一致性:確保載入的資料與目標位置中現有的資料保持一致性和連貫性。
  3. 系統資源利用:最大化利用CPU資源和其他系統資源,以提高資料載入的效率。

最佳實踐:

  • 利用關係型資料函式倉管理系統(RDBMS)內建的批次載入工具,可以有效簡化資料載入流程。
  • 根據資料規則建立自定義索引,以提高查詢效率。
  • 結合上述策略,可以實作更平滑、更高效的資料載入過程,提升整體資料倉儲管理的效能。

綜上所述,選擇適當的資料載入策略對於確保資料處理流程的高效運作至關重要。無論是全量資料載入還是增量資料載入,都需要根據具體的業務需求和系統限制進行仔細設計和最佳化,以實作最佳的資料處理效果。