在資料驅動的時代,有效地擷取、轉換和清理資料是資料工程的核心任務。本文將探討如何運用 Python 和相關工具,從多元資料來源擷取資料,並進行必要的轉換和清理,最終構建穩固的 ETL 流程。首先,API 請求的 HTTP 狀態碼驗證是確保資料擷取可靠性的第一步。接著,文章將介紹如何處理不同型別的資料源,包含 CSV、Parquet 檔案、資料函式庫以及網頁資料。針對資料轉換環節,將詳細說明如何運用 Pandas 函式庫進行資料清洗、欄位標準化、缺失值處理等操作。最後,文章將整合這些步驟,演示如何建立一個完整的 ETL 流程,並強調日誌記錄和錯誤處理的重要性,以確保資料管線的穩定性和可維護性。同時,本文也將探討資料品質的重要性,包含準確性、一致性等導向,以確保資料的可靠性和可用性,進而支援商業決策。
資料擷取策略與實務
HTTP 狀態碼與 API 請求驗證
在進行 API 請求時,瞭解 HTTP 狀態碼至關重要。常見的狀態碼及其意義如下表所示:
| 狀態碼 | 一般含義 |
|---|---|
| 200 | 連線成功 |
| 400 | 請求錯誤/錯誤的請求/傳送的資料不正確 |
| 401 | 認證錯誤 |
| 403 | 存取被禁止 |
| 404 | 伺服器上找不到資源 |
只有狀態碼為 200 時,才表示連線成功。其他狀態碼均表示錯誤,應被視為可見且存在的錯誤,並阻止指令碼繼續執行,以避免安全威脅。以下條件陳述式用於驗證狀態碼是否為 200:
if apt_status == 200:
# 繼續執行
資料來源與擷取方法
資料可來源於多種通路,包括 CSV 檔案、Parquet 檔案、API、SQLite 資料函式庫和網頁資料等。
資料函式庫系統
資料函式庫系統主要分為關聯式資料函式庫(RDBMS)和非關聯式資料函式庫(non-RDBMS)。RDBMS,如 MySQL 和 Oracle,具有結構化的資料,以行和列的形式組織,這與 Pandas 中的 DataFrame 結構相似。非 RDBMS,如 MongoDB,則被視為非結構化資料函式庫,因為它們缺乏 RDBMS 中定義的資料表結構。這使得非 RDBMS 在儲存多樣化資料方面極具靈活性。
Python 自帶的 SQLite DB 是一個完整的關聯式資料函式庫,可以用於資料儲存和擷取。
網頁資料擷取
工程師經常需要從開放的網頁中「抓取」或收集資料,以建立 ETL 管道中的資料。我們將使用 Wikipedia 的網址來抓取各國的名義 GDP 經濟資料。
建立資料擷取管道
初始化環境與匯入模組
在 PyCharm 環境中,首先需要啟動 pipenv 環境,並建立一個名為 extraction 的目錄和一個名為 extraction_functional.py 的 Python 檔案。在指令碼中匯入必要的模組:
# 匯入模組
import json
import sqlite3
import certifi
import pandas as pd
import urllib3
資料擷取函式
分別建立函式來從 CSV 檔案、Parquet 檔案、API、SQLite 資料函式庫和網頁資料中匯入資料。最後,建立一個主函式來整合所有形式的資料匯入。
# 示例:從 CSV 檔案匯入資料
def import_csv_data(file_path):
try:
df = pd.read_csv(file_path)
logger.info(f'{file_path} : extracted {df.shape[0]} records from the file')
return df
except Exception as e:
logger.exception(f'{file_path} : - exception {e} encountered while extracting the file')
日誌記錄
日誌記錄是程式碼中捕捉錯誤或異常的重要部分。我們將引入日誌記錄機制,使程式碼可除錯。
組態日誌
建立一個通用的日誌組態,可以在所有 Python 模組中使用,而無需重複編寫樣板程式碼。日誌檔案可以用於分析故障或查詢程式碼中的錯誤。
# config.py
import logging.config
def log_config():
logging.config.dictConfig({
'version': 1,
'formatters': {
'default': {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'stream': 'ext://sys.stdout',
'formatter': 'default'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'logs/etl_pipeline.log',
'formatter': 'default'
}
},
'root': {
'level': 'INFO',
'handlers': ['console', 'file']
}
})
使用日誌記錄
在 extraction/__init__.py 檔案中匯入 log_config 並呼叫它:
from config import log_config
log_config()
在 extraction_functional_enhanced.py 指令碼中定義日誌例項,並在每個資料匯入函式中使用它來記錄相關資訊:
# 定義頂級模組日誌記錄器
logger = logging.getLogger(__name__)
# 在資料匯入函式中使用日誌記錄器
logger.info(f'{file_name} : extracted {df.shape[0]} records from the file')
logger.exception(f'{file_name} : - exception {e} encountered while extracting the file')
日誌記錄的重要性
日誌記錄幫助開發者追蹤錯誤、除錯程式碼,並最佳化程式效能。透過合理組態日誌級別和輸出目標,日誌記錄可以成為開發和維護階段的有力工具。
資料轉換與清洗:開發強大的ETL流程
在ETL(Extract, Transform, Load)或ELT(Extract, Load, Transform)資料處理流程中,資料轉換(Data Transformation)是至關重要的階段。這個階段決定了輸入資料能否有效地轉換成所需的輸出資料,從而影響整個資料處理流程的成效。本章將探討Python中各種資料轉換技術,幫助讀者掌握如何將資料轉換成所需的格式。
技術需求
在開始本章的內容之前,請確保您的系統滿足以下技術需求:
軟體需求:
- 整合開發環境(IDE):建議使用PyCharm作為主要的Python開發環境。
- Jupyter Notebooks:應安裝Jupyter Notebooks以便於進行互動式開發和測試。
- Python版本:需安裝Python 3.6或更高版本。
- Pipenv:建議安裝Pipenv來管理專案依賴。
GitHub儲存函式庫:
資料清洗與轉換
資料提取(Extraction)是為了選擇有意義的資料來支援所需輸出資料的產生。由於資料來源的多樣性,資料儲存定義的差異,以及其他不可控的變異,資料需要進行大量的修改。這種修改被稱為資料「清洗」(Cleansing)和「轉換」(Transformation),發生在ETL或ELT資料處理流程的「轉換」(Transform)階段。
資料清洗
資料清洗,也稱為資料清理或資料擦洗(Scrubbing),是用來描述對原始資料進行操作的過程。它是資料工程(Data Engineering)的一個基本導向,因為未經處理的資料往往非常雜亂,需要特定的修改來產生高品質的資料輸出。資料清洗過程涉及識別和修正或移除資料集中的錯誤、不準確性和不一致性,以提高其品質和可靠性。
import pandas as pd
# 建立一個範例DataFrame
data = {
'客戶ID': [1, 2, 2, 3, 4],
'姓名': ['張三', '李四', '李四', '王五', '趙六'],
'年齡': [25, 30, 30, 35, None]
}
df = pd.DataFrame(data)
# 顯示原始DataFrame
print("原始DataFrame:")
print(df)
#### 內容解密:
# 上述程式碼建立了一個包含客戶資訊的範例DataFrame,其中包含重複的客戶ID和缺失的年齡值。
# 移除重複的行
df.drop_duplicates(inplace=True)
# 顯示移除重複行後的DataFrame
print("\n移除重複行後的DataFrame:")
print(df)
#### 內容解密:
# 使用drop_duplicates()方法移除DataFrame中的重複行,inplace=True表示直接在原DataFrame上進行修改。
# 處理缺失值
df['年齡'].fillna(df['年齡'].mean(), inplace=True)
# 顯示處理缺失值後的DataFrame
print("\n處理缺失值後的DataFrame:")
print(df)
#### 內容解密:
# 使用fillna()方法將年齡列中的缺失值替換為該列的平均值,inplace=True表示直接在原DataFrame上進行修改。
資料轉換工作流程
- 資料檢查:首先檢查資料的品質和結構,以確定需要進行哪些轉換。
- 定義轉換規則:根據業務需求定義資料轉換的規則。
- 執行轉換:使用Python中的Pandas等函式庫執行資料轉換操作。
- 驗證結果:檢查轉換後的資料是否符合預期。
建立資料轉換活動
在Python中,可以使用Pandas函式庫來建立資料轉換活動。以下是一個簡單的例子,展示如何使用Pandas進行資料轉換:
import pandas as pd
# 建立一個範例DataFrame
data = {
'訂單ID': [1, 2, 3],
'訂單日期': ['2022-01-01', '2022-01-15', '2022-02-01'],
'金額': [100, 200, 300]
}
df = pd.DataFrame(data)
# 將訂單日期轉換為datetime型別
df['訂單日期'] = pd.to_datetime(df['訂單日期'])
# 提取月份和年份
df['月份'] = df['訂單日期'].dt.month
df['年份'] = df['訂單日期'].dt.year
# 顯示轉換後的DataFrame
print(df)
#### 內容解密:
# 上述程式碼首先將訂單日期列轉換為datetime型別,然後提取出月份和年份,並新增到DataFrame中。
結語
本章介紹了資料清洗和轉換的基本概念和技術,並提供了使用Python進行資料轉換的範例。透過這些技術,可以有效地將原始資料轉換成所需的格式,為後續的分析和應用提供高品質的資料。下一章將繼續探討ETL流程中的其他重要環節。
資料清理與轉換:ETL 流程的核心
在資料工程中,資料清理與轉換是確保資料品質和可靠性的關鍵步驟。Pandas 函式庫是進行資料清理的強大工具。以下範例展示如何使用 Pandas 清理包含缺失值的 DataFrame:
import pandas as pd
import numpy as np
# 建立包含缺失值的 DataFrame
df = pd.DataFrame({
'A': [1, 2, np.nan, 4],
'B': [5, np.nan, np.nan, 8],
'C': [9, 10, 11, 12]
})
# 使用平均值填補缺失值
df.fillna(df.mean(), inplace=True)
內容解密:
- 匯入必要的函式庫:首先,匯入
pandas和numpy函式庫,分別用於資料處理和數值運算。 - 建立 DataFrame:建立一個包含缺失值(
np.nan)的 DataFrame,用於模擬真實資料中的缺失情況。 fillna方法:使用fillna方法將缺失值替換為各列的平均值。這是一種常見的資料清理技術,能夠保持資料的整體趨勢。inplace=True:此引數表示直接在原 DataFrame 上進行修改,而不建立新的 DataFrame。
資料轉換
資料轉換是將資料從一種格式或結構轉換為另一種格式或結構的過程。這在不同系統間傳輸資料或準備資料進行分析時尤為重要。常見的資料轉換操作包括標準化、聚合和泛化。
例如,若要將某列的值標準化到 0 和 1 之間,可以使用以下程式碼:
# 假設 'A' 是需要標準化的列
df['A'] = (df['A'] - df['A'].min()) / (df['A'].max() - df['A'].min())
內容解密:
- 標準化公式:使用
(x - min) / (max - min)將列A的值標準化到 [0, 1] 範圍內。 - 最小值和最大值:
df['A'].min()和df['A'].max()分別取得列A的最小值和最大值,用於計算標準化值。 - 應用場景:標準化是機器學習和資料分析中常見的預處理步驟,能夠提高模型的穩定性和準確性。
ETL 流程中的資料清理與轉換
在 ETL(Extract, Transform, Load)流程中,建立動態的資料清理過程能夠有效避免程式碼冗餘,保持程式碼的 DRY(Don’t Repeat Yourself)原則。動態且可重複的資料清理和轉換活動有助於維護組織內資料的一致性。
資料準確性和一致性的重要性
- 資料準確性 指的是資料值與真實值的接近程度。資料不準確可能導致錯誤的商業決策和預測,進而影響對資料系統的信任。
- 資料一致性 確保不同系統和平台上的資料呈現統一的檢視,這對於營運效率和有效決策至關重要。
為了確保資料的準確性和一致性,資料工程師必須實施健全且可重複使用的資料驗證、清理和整合實踐。這不僅能夠消除不準確性和不一致性,還能提升資料管道的整體品質。
資料清理與轉換:開發可靠的資料管線
在建立資料管線的過程中,確保資料的準確性和一致性是至關重要的。這不僅關乎資料的品質,更直接影響到後續的決策制定和市場競爭力。資料品質問題可能源自多個方面,包括缺失值、錯誤資料、或不一致的資料結構。以下圖表展示了一些常見的資料品質問題:
此圖示顯示了常見的資料品質問題,包括缺失值、重複資料、格式不一致等。瞭解這些問題有助於我們更好地進行資料清理和轉換。
瞭解資料的下游應用
花時間和資源確保資料標註的準確性是資料管線建立中的關鍵步驟,但往往被工程師所忽視。通常,資料預處理的重擔主要落在資料分析師和資料科學家身上。然而,從源資料中擷取「真實值」應該被視為一種共同的責任。識別輸入資料源中的事實真相,有助於工程師瞭解需要新增哪些額外的標籤或進行哪些修改,以最佳化輸出資料的生成。
資料清理與轉換的方法
在啟動新的管線工作流程時,明確定義這些原則將有助於未來對資料轉換方法的改進。以下是一些常見的資料轉換方法:
此圖示列出了多種資料轉換方法,包括資料標準化、資料聚合、資料過濾等。這些方法有助於提高資料的品質和可用性。
在Python中進行資料清理與轉換的策略
Python豐富的資料處理函式庫,如Pandas和NumPy,使得檢測和修正資料中的不一致性、錯誤或缺失值變得更加便捷。在轉換過程中,資料被重新塑形、標準化或聚合,以滿足特定的需求。Python的靈活性使得複雜的轉換和操作(如合併資料集、分組資料或建立樞紐表)變得更加容易,這些操作對於進階分析或機器學習模型至關重要。
階段性任務:資料暫存的重要性
在轉換和清理之前,提取的資料會被送到一個稱為資料暫存區的臨時儲存區域。這樣做可以避免在處理過程中出現問題時需要重新提取資料。
在Python中進行資料清理與轉換的步驟
步驟1:資料探索與解讀
正如前一章節所提到的,輸出資料的有用性取決於輸入資料在新轉換中的準確表示。輸出資料所需的所有欄位都需要從輸入資料的屬性中對映,每一層資料處理都應設計為執行一組特定的任務,以產生已知的輸出資料需求。通常,需要多次轉換才能實作這一目標。可以將輸入資料和轉換函式視為食材:
此圖示將輸入資料和轉換函式比喻為食材,有助於理解資料轉換的過程。
讓我們以芝加哥市的道路交通事故資料為例。首先,我們需要將traffic_crashes.csv檔案匯入Jupyter Notebook中,以瞭解資料的基本情況。
import pandas as pd
df_crashes = pd.read_csv("data/traffic_crashes.csv")
df_crashes.head()
上述程式碼將輸出交通事故資料的前五行,如下所示:
此圖示顯示了芝加哥市交通事故資料的前五行,有助於瞭解資料的基本結構。
步驟2:檢查缺失或無效資料
在操作輸入資料之前,檢查資料是否損壞是一個重要的探索步驟。需要系統地識別源資料的完整性,這可以透過檢查缺失、無效或不一致的資料結構,或重複的資料條目來實作。透過識別可預測的「壞」資料,並在管線中建立補救措施,可以將其轉換為可用的初步值,以實作準確的資料對映。
我們可以使用Pandas內建的info()方法來檢查資料的品質:
df_crashes.info()
info()方法對於瞭解DataFrame的結構和評估資料的品質和完整性非常有用。它提供了欄位數量、資料型別、記憶體使用情況以及每個欄位的非空值數量等資訊,有助於識別缺失資料。
內容解密:
import pandas as pd:匯入Pandas函式庫,並將其簡稱為pd,以便於後續使用。df_crashes = pd.read_csv("data/traffic_crashes.csv"):使用Pandas的read_csv()函式讀取traffic_crashes.csv檔案,並將其儲存在df_crashes變數中。df_crashes.head():顯示df_crashesDataFrame的前五行,用於初步瞭解資料的基本結構。df_crashes.info():使用info()方法檢視DataFrame的詳細資訊,包括欄位數量、資料型別、記憶體使用情況以及每個欄位的非空值數量。
這些步驟和程式碼有助於我們更好地瞭解和清理資料,從而建立更可靠的資料管線。