返回文章列表

資料科學完整工作流程與技術方法論

深入探討資料科學完整工作流程與技術方法論,涵蓋 ETL 資料管線建置、資料清理與轉換、特徵工程實作、探索性資料分析、統計方法應用以及資料視覺化技術。透過完整的 Python 程式碼實作,包含 DataPipeline 與 FeatureEngineer 類別,協助讀者建立系統化的資料科學專案開發能力。

資料科學 資料工程 機器學習

資料科學作為當代最具影響力的技術領域之一,其核心價值在於從龐大且複雜的資料中萃取有意義的洞察,進而驅動商業決策與創新。然而,資料科學並非單純的技術堆疊,而是一套完整且系統化的方法論,從資料的擷取、轉換、載入,到清理、分析、視覺化,每一個環節都環環相扣,缺一不可。本文將深入探討資料科學的完整工作流程,涵蓋 ETL 資料管線建置、資料清理與轉換、特徵工程、探索性資料分析、統計方法應用以及資料視覺化等核心技術,並透過完整的 Python 程式碼實作,協助讀者建立系統化的資料科學專案開發能力。

資料科學工作流程概覽

資料科學專案的成功仰賴於一套結構化且可重複執行的工作流程。這個流程從業務問題的定義開始,經過資料的收集與處理,最終產出可行動的洞察或預測模型。理解這個完整的流程架構,是每一位資料科學家必備的基礎能力。

資料科學工作流程可以分為數個主要階段,每個階段都有其特定的目標與技術要求。首先是問題定義階段,在這個階段中,資料科學家需要與業務單位緊密合作,將模糊的業務需求轉化為明確的資料科學問題。這個步驟看似簡單,實則至關重要,因為問題定義的品質直接決定了後續分析的方向與價值。

接下來是資料收集階段,資料可能來自多種不同的來源,包括關聯式資料庫、NoSQL 資料庫、API 介面、檔案系統,甚至是即時串流資料。資料科學家需要設計適當的資料擷取策略,確保能夠取得完整且具代表性的資料集。在這個階段,ETL 流程扮演著關鍵角色,負責將分散各處的資料整合成統一的格式。

資料預處理階段是整個工作流程中最耗時的部分,據統計,資料科學家約有百分之六十至八十的時間都花費在這個階段。這個階段包含資料清理、缺失值處理、異常值偵測、資料轉換以及特徵工程等工作。高品質的資料預處理能夠顯著提升後續分析與建模的效果。

探索性資料分析階段讓資料科學家能夠深入了解資料的特性、分布與潛在的模式。透過描述性統計與視覺化技術,資料科學家可以發現資料中的趨勢、異常與關聯性,這些發現將指引後續的特徵選擇與模型設計。

以下的流程圖展示了資料科學完整工作流程的各個階段與其相互關係:

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "問題定義" as PD
rectangle "資料收集" as DC
rectangle "ETL 處理" as ETL
rectangle "資料清理" as CL
rectangle "特徵工程" as FE
rectangle "探索性資料分析" as EDA
rectangle "統計分析" as SA
rectangle "資料視覺化" as DV
rectangle "模型建置" as MB
rectangle "部署與監控" as DM

PD --> DC
DC --> ETL
ETL --> CL
CL --> FE
FE --> EDA
EDA --> SA
SA --> DV
DV --> MB
MB --> DM

@enduml

ETL 資料管線建置

ETL 是 Extract、Transform、Load 三個單字的縮寫,代表資料擷取、轉換與載入的完整流程。在現代資料科學專案中,建立穩健且可擴展的 ETL 管線是成功的基礎。一個設計良好的 ETL 管線能夠自動化資料處理流程,確保資料品質,並且支援大規模資料的處理需求。

資料擷取階段需要考慮多種資料來源的連接方式,包括資料庫連線、API 呼叫、檔案讀取以及串流資料接收。每種資料來源都有其特定的技術要求與最佳實踐,資料科學家需要根據實際情況選擇適當的擷取策略。

資料轉換階段是 ETL 流程的核心,在這個階段中,原始資料會經過一系列的處理步驟,包括格式轉換、資料型別調整、欄位合併與分割、數值計算以及邏輯判斷等。轉換的目標是將異質性的原始資料轉化為統一且標準化的格式,以便後續的分析與建模。

資料載入階段負責將處理完成的資料存入目標系統,這可能是資料倉儲、資料湖泊、或是分析用的資料市集。載入策略需要考慮資料量、更新頻率以及系統效能等因素,常見的載入模式包括全量載入、增量載入以及合併載入等。

以下是一個完整的 DataPipeline 類別實作,展示如何建立一個功能完備的 ETL 資料管線:

# 匯入必要的函式庫
# pandas 用於資料處理與分析
# numpy 用於數值運算
# logging 用於記錄管線執行狀態
# typing 用於型別提示,增加程式碼可讀性
import pandas as pd
import numpy as np
import logging
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime

# 設定日誌記錄器
# 日誌記錄對於監控 ETL 管線的執行狀態非常重要
# 可以幫助追蹤問題並進行效能分析
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

class DataPipeline:
    """
    資料管線類別

    這個類別封裝了完整的 ETL 流程,提供資料擷取、轉換、載入的功能。
    設計採用鏈式呼叫模式,讓使用者可以流暢地組合多個處理步驟。

    屬性:
        data: 儲存當前處理中的資料框架
        logger: 日誌記錄器實例
        transformations: 記錄已執行的轉換操作清單
        metadata: 儲存資料相關的元資料資訊
    """

    def __init__(self, name: str = "default_pipeline"):
        """
        初始化資料管線

        參數:
            name: 管線名稱,用於日誌記錄與識別
        """
        # 初始化資料儲存變數為 None
        # 資料將在擷取階段被載入
        self.data: Optional[pd.DataFrame] = None

        # 建立具名的日誌記錄器
        # 方便在多個管線同時執行時區分來源
        self.logger = logging.getLogger(name)

        # 記錄所有執行過的轉換操作
        # 用於追蹤資料處理歷程與除錯
        self.transformations: List[str] = []

        # 儲存資料的元資料資訊
        # 包含資料來源、處理時間、資料統計等
        self.metadata: Dict[str, Any] = {
            'pipeline_name': name,
            'created_at': datetime.now().isoformat(),
            'row_count': 0,
            'column_count': 0
        }

    def extract_from_csv(self, file_path: str, **kwargs) -> 'DataPipeline':
        """
        從 CSV 檔案擷取資料

        這個方法負責從 CSV 檔案中讀取資料,並將其載入到管線中。
        支援 pandas read_csv 的所有參數,提供最大的彈性。

        參數:
            file_path: CSV 檔案的路徑
            **kwargs: 傳遞給 pandas read_csv 的額外參數

        回傳:
            self: 回傳管線實例,支援鏈式呼叫
        """
        # 記錄擷取操作的開始
        self.logger.info(f"開始從 CSV 檔案擷取資料: {file_path}")

        # 使用 pandas 讀取 CSV 檔案
        # **kwargs 允許傳入如 encoding、sep 等參數
        self.data = pd.read_csv(file_path, **kwargs)

        # 更新元資料
        self._update_metadata('csv_extract', file_path)

        # 記錄擷取成功的訊息
        self.logger.info(f"成功擷取 {len(self.data)} 筆資料,共 {len(self.data.columns)} 個欄位")

        # 回傳 self 以支援鏈式呼叫
        return self

    def extract_from_database(self, query: str, connection) -> 'DataPipeline':
        """
        從資料庫擷取資料

        執行 SQL 查詢並將結果載入到管線中。
        支援任何相容於 pandas read_sql 的資料庫連線。

        參數:
            query: SQL 查詢語句
            connection: 資料庫連線物件

        回傳:
            self: 回傳管線實例,支援鏈式呼叫
        """
        # 記錄資料庫擷取操作
        self.logger.info("開始從資料庫擷取資料")

        # 使用 pandas 執行 SQL 查詢
        self.data = pd.read_sql(query, connection)

        # 更新元資料
        self._update_metadata('database_extract', query[:100])

        # 記錄成功訊息
        self.logger.info(f"成功從資料庫擷取 {len(self.data)} 筆資料")

        return self

    def transform(self, func: Callable[[pd.DataFrame], pd.DataFrame],
                  description: str = "custom_transform") -> 'DataPipeline':
        """
        執行自定義轉換函數

        這個方法提供最大的彈性,允許使用者定義任意的轉換邏輯。
        轉換函數接收 DataFrame 並回傳處理後的 DataFrame。

        參數:
            func: 轉換函數,接收 DataFrame 回傳 DataFrame
            description: 轉換操作的描述,用於追蹤

        回傳:
            self: 回傳管線實例,支援鏈式呼叫
        """
        # 確保資料已被載入
        self._ensure_data_loaded()

        # 記錄轉換操作
        self.logger.info(f"執行轉換操作: {description}")

        # 執行轉換函數
        self.data = func(self.data)

        # 記錄轉換歷程
        self.transformations.append(description)

        # 更新元資料
        self._update_metadata('transform', description)

        return self

    def filter_rows(self, condition: str) -> 'DataPipeline':
        """
        根據條件篩選資料列

        使用 pandas query 語法篩選符合條件的資料。

        參數:
            condition: 篩選條件字串,使用 pandas query 語法

        回傳:
            self: 回傳管線實例,支援鏈式呼叫
        """
        # 確保資料已被載入
        self._ensure_data_loaded()

        # 記錄篩選前的資料筆數
        original_count = len(self.data)

        # 執行篩選操作
        self.data = self.data.query(condition)

        # 計算被篩選掉的資料筆數
        filtered_count = original_count - len(self.data)

        # 記錄篩選結果
        self.logger.info(f"篩選條件: {condition},移除 {filtered_count} 筆資料")

        # 記錄轉換歷程
        self.transformations.append(f"filter: {condition}")

        return self

    def select_columns(self, columns: List[str]) -> 'DataPipeline':
        """
        選擇特定欄位

        從資料框架中選擇指定的欄位,移除其他欄位。

        參數:
            columns: 要保留的欄位名稱清單

        回傳:
            self: 回傳管線實例,支援鏈式呼叫
        """
        # 確保資料已被載入
        self._ensure_data_loaded()

        # 選擇指定欄位
        self.data = self.data[columns]

        # 記錄操作
        self.logger.info(f"選擇 {len(columns)} 個欄位")
        self.transformations.append(f"select_columns: {columns}")

        return self

    def rename_columns(self, mapping: Dict[str, str]) -> 'DataPipeline':
        """
        重新命名欄位

        根據提供的對應表重新命名欄位。

        參數:
            mapping: 舊欄位名稱到新欄位名稱的對應字典

        回傳:
            self: 回傳管線實例,支援鏈式呼叫
        """
        # 確保資料已被載入
        self._ensure_data_loaded()

        # 執行重新命名
        self.data = self.data.rename(columns=mapping)

        # 記錄操作
        self.logger.info(f"重新命名 {len(mapping)} 個欄位")
        self.transformations.append(f"rename_columns: {mapping}")

        return self

    def add_column(self, name: str, values) -> 'DataPipeline':
        """
        新增欄位

        在資料框架中新增一個欄位。

        參數:
            name: 新欄位名稱
            values: 欄位值,可以是常數、陣列或 Series

        回傳:
            self: 回傳管線實例,支援鏈式呼叫
        """
        # 確保資料已被載入
        self._ensure_data_loaded()

        # 新增欄位
        self.data[name] = values

        # 記錄操作
        self.logger.info(f"新增欄位: {name}")
        self.transformations.append(f"add_column: {name}")

        return self

    def load_to_csv(self, file_path: str, **kwargs) -> 'DataPipeline':
        """
        將資料載入到 CSV 檔案

        將處理完成的資料輸出為 CSV 檔案。

        參數:
            file_path: 輸出檔案路徑
            **kwargs: 傳遞給 pandas to_csv 的額外參數

        回傳:
            self: 回傳管線實例,支援鏈式呼叫
        """
        # 確保資料已被載入
        self._ensure_data_loaded()

        # 記錄載入操作
        self.logger.info(f"開始將資料載入到 CSV 檔案: {file_path}")

        # 輸出為 CSV 檔案
        # 預設不輸出索引欄位
        if 'index' not in kwargs:
            kwargs['index'] = False

        self.data.to_csv(file_path, **kwargs)

        # 記錄成功訊息
        self.logger.info(f"成功將 {len(self.data)} 筆資料載入到 {file_path}")

        return self

    def get_data(self) -> pd.DataFrame:
        """
        取得當前處理中的資料

        回傳:
            當前的資料框架
        """
        self._ensure_data_loaded()
        return self.data.copy()

    def get_metadata(self) -> Dict[str, Any]:
        """
        取得管線元資料

        回傳:
            元資料字典
        """
        return self.metadata.copy()

    def get_transformations(self) -> List[str]:
        """
        取得轉換操作歷程

        回傳:
            轉換操作清單
        """
        return self.transformations.copy()

    def _ensure_data_loaded(self) -> None:
        """
        確保資料已被載入

        內部方法,用於檢查資料是否已載入。
        如果資料尚未載入,則拋出例外。
        """
        if self.data is None:
            raise ValueError("資料尚未載入,請先執行擷取操作")

    def _update_metadata(self, operation: str, details: str) -> None:
        """
        更新元資料

        內部方法,用於在每次操作後更新元資料。

        參數:
            operation: 操作類型
            details: 操作詳細資訊
        """
        if self.data is not None:
            self.metadata['row_count'] = len(self.data)
            self.metadata['column_count'] = len(self.data.columns)
            self.metadata['last_operation'] = operation
            self.metadata['last_operation_details'] = details
            self.metadata['updated_at'] = datetime.now().isoformat()

資料清理與品質控管

資料清理是資料科學流程中最為關鍵的環節之一,其品質直接影響後續分析與模型的可靠性。原始資料通常包含各種品質問題,包括缺失值、重複資料、異常值、格式不一致以及資料型別錯誤等。系統性的資料清理能夠顯著提升資料品質,確保分析結果的正確性。

缺失值處理是資料清理的首要任務。缺失值的產生原因多種多樣,可能是資料收集過程中的遺漏、系統錯誤、或是使用者未填寫等。處理缺失值的策略需要根據缺失的類型與比例來決定,常見的處理方法包括刪除含有缺失值的資料列、使用統計量填補如平均值或中位數、使用機器學習方法預測缺失值,以及使用特定值標記缺失等。

異常值偵測與處理同樣重要。異常值可能代表資料收集錯誤,也可能是真實但罕見的觀測值。判斷異常值是否應該被移除需要結合領域知識與統計方法。常用的異常值偵測方法包括 Z-score 方法、IQR 方法以及基於機器學習的方法如 Isolation Forest 等。

以下是資料清理流程的視覺化呈現:

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "原始資料" as RD
rectangle "缺失值分析" as MA
rectangle "缺失值處理" as MH
rectangle "重複資料偵測" as DD
rectangle "重複資料移除" as DR
rectangle "異常值偵測" as OD
rectangle "異常值處理" as OH
rectangle "資料型別轉換" as TC
rectangle "格式標準化" as FS
rectangle "清理後資料" as CD

RD --> MA
MA --> MH
MH --> DD
DD --> DR
DR --> OD
OD --> OH
OH --> TC
TC --> FS
FS --> CD

@enduml

以下是一個完整的資料清理類別實作:

# 匯入必要的函式庫
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Union
from scipy import stats

class DataCleaner:
    """
    資料清理類別

    這個類別提供完整的資料清理功能,包含缺失值處理、
    異常值偵測、重複資料移除以及資料型別轉換等。

    屬性:
        data: 待清理的資料框架
        cleaning_report: 清理操作報告
    """

    def __init__(self, data: pd.DataFrame):
        """
        初始化資料清理器

        參數:
            data: 待清理的資料框架
        """
        # 複製資料以避免修改原始資料
        self.data = data.copy()

        # 初始化清理報告
        # 用於記錄每個清理步驟的詳細資訊
        self.cleaning_report: List[Dict] = []

    def analyze_missing_values(self) -> pd.DataFrame:
        """
        分析缺失值

        計算每個欄位的缺失值數量與比例,
        協助決定適當的處理策略。

        回傳:
            缺失值分析報告 DataFrame
        """
        # 計算每個欄位的缺失值數量
        missing_count = self.data.isnull().sum()

        # 計算缺失值比例
        missing_percentage = (missing_count / len(self.data)) * 100

        # 取得每個欄位的資料型別
        dtypes = self.data.dtypes

        # 建立分析報告
        report = pd.DataFrame({
            'missing_count': missing_count,
            'missing_percentage': missing_percentage.round(2),
            'dtype': dtypes
        })

        # 按缺失比例排序
        report = report.sort_values('missing_percentage', ascending=False)

        return report

    def handle_missing_values(self, strategy: str = 'drop',
                              columns: Optional[List[str]] = None,
                              fill_value: Optional[Union[int, float, str]] = None,
                              threshold: float = 0.5) -> 'DataCleaner':
        """
        處理缺失值

        根據指定的策略處理資料中的缺失值。

        參數:
            strategy: 處理策略
                - 'drop': 刪除含有缺失值的資料列
                - 'drop_columns': 刪除缺失比例超過閾值的欄位
                - 'mean': 使用平均值填補(僅限數值欄位)
                - 'median': 使用中位數填補(僅限數值欄位)
                - 'mode': 使用眾數填補
                - 'constant': 使用常數值填補
                - 'ffill': 向前填補
                - 'bfill': 向後填補
            columns: 要處理的欄位清單,None 表示所有欄位
            fill_value: 當策略為 'constant' 時使用的填補值
            threshold: 當策略為 'drop_columns' 時的缺失比例閾值

        回傳:
            self: 回傳清理器實例,支援鏈式呼叫
        """
        # 記錄處理前的資料筆數
        original_rows = len(self.data)
        original_cols = len(self.data.columns)

        # 決定要處理的欄位
        if columns is None:
            columns = self.data.columns.tolist()

        # 根據策略處理缺失值
        if strategy == 'drop':
            # 刪除含有缺失值的資料列
            self.data = self.data.dropna(subset=columns)

        elif strategy == 'drop_columns':
            # 計算每個欄位的缺失比例
            missing_ratio = self.data[columns].isnull().sum() / len(self.data)
            # 找出缺失比例超過閾值的欄位
            cols_to_drop = missing_ratio[missing_ratio > threshold].index.tolist()
            # 刪除這些欄位
            self.data = self.data.drop(columns=cols_to_drop)

        elif strategy == 'mean':
            # 使用平均值填補數值欄位
            for col in columns:
                if self.data[col].dtype in ['int64', 'float64']:
                    self.data[col] = self.data[col].fillna(self.data[col].mean())

        elif strategy == 'median':
            # 使用中位數填補數值欄位
            for col in columns:
                if self.data[col].dtype in ['int64', 'float64']:
                    self.data[col] = self.data[col].fillna(self.data[col].median())

        elif strategy == 'mode':
            # 使用眾數填補
            for col in columns:
                mode_value = self.data[col].mode()
                if len(mode_value) > 0:
                    self.data[col] = self.data[col].fillna(mode_value[0])

        elif strategy == 'constant':
            # 使用常數值填補
            if fill_value is None:
                raise ValueError("使用 constant 策略時必須提供 fill_value")
            self.data[columns] = self.data[columns].fillna(fill_value)

        elif strategy == 'ffill':
            # 向前填補
            self.data[columns] = self.data[columns].ffill()

        elif strategy == 'bfill':
            # 向後填補
            self.data[columns] = self.data[columns].bfill()

        # 記錄清理操作
        self.cleaning_report.append({
            'operation': 'handle_missing_values',
            'strategy': strategy,
            'columns': columns,
            'rows_removed': original_rows - len(self.data),
            'columns_removed': original_cols - len(self.data.columns)
        })

        return self

    def detect_outliers(self, columns: List[str],
                        method: str = 'iqr',
                        threshold: float = 1.5) -> pd.DataFrame:
        """
        偵測異常值

        使用指定的方法偵測數值欄位中的異常值。

        參數:
            columns: 要偵測的欄位清單
            method: 偵測方法
                - 'iqr': 使用四分位距方法
                - 'zscore': 使用 Z-score 方法
            threshold: 判定閾值
                - IQR 方法: 預設 1.5
                - Z-score 方法: 預設 3

        回傳:
            異常值偵測報告 DataFrame
        """
        # 儲存偵測結果
        outlier_report = []

        for col in columns:
            # 確保欄位是數值型別
            if self.data[col].dtype not in ['int64', 'float64']:
                continue

            if method == 'iqr':
                # 計算四分位數
                Q1 = self.data[col].quantile(0.25)
                Q3 = self.data[col].quantile(0.75)
                IQR = Q3 - Q1

                # 計算上下界
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR

                # 找出異常值
                outliers = self.data[
                    (self.data[col] < lower_bound) |
                    (self.data[col] > upper_bound)
                ]

            elif method == 'zscore':
                # 計算 Z-score
                z_scores = np.abs(stats.zscore(self.data[col].dropna()))

                # 找出異常值
                outlier_indices = np.where(z_scores > threshold)[0]
                outliers = self.data.iloc[outlier_indices]

            # 記錄偵測結果
            outlier_report.append({
                'column': col,
                'method': method,
                'outlier_count': len(outliers),
                'outlier_percentage': (len(outliers) / len(self.data)) * 100
            })

        return pd.DataFrame(outlier_report)

    def remove_outliers(self, columns: List[str],
                        method: str = 'iqr',
                        threshold: float = 1.5) -> 'DataCleaner':
        """
        移除異常值

        移除指定欄位中的異常值。

        參數:
            columns: 要處理的欄位清單
            method: 偵測方法
            threshold: 判定閾值

        回傳:
            self: 回傳清理器實例,支援鏈式呼叫
        """
        # 記錄處理前的資料筆數
        original_count = len(self.data)

        for col in columns:
            # 確保欄位是數值型別
            if self.data[col].dtype not in ['int64', 'float64']:
                continue

            if method == 'iqr':
                # 計算四分位數
                Q1 = self.data[col].quantile(0.25)
                Q3 = self.data[col].quantile(0.75)
                IQR = Q3 - Q1

                # 計算上下界
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR

                # 保留正常值
                self.data = self.data[
                    (self.data[col] >= lower_bound) &
                    (self.data[col] <= upper_bound)
                ]

            elif method == 'zscore':
                # 計算 Z-score
                z_scores = np.abs(stats.zscore(self.data[col].dropna()))
                # 保留正常值
                self.data = self.data[z_scores <= threshold]

        # 記錄清理操作
        removed_count = original_count - len(self.data)
        self.cleaning_report.append({
            'operation': 'remove_outliers',
            'method': method,
            'columns': columns,
            'rows_removed': removed_count
        })

        return self

    def remove_duplicates(self, subset: Optional[List[str]] = None,
                          keep: str = 'first') -> 'DataCleaner':
        """
        移除重複資料

        參數:
            subset: 用於判斷重複的欄位清單,None 表示所有欄位
            keep: 保留策略
                - 'first': 保留第一筆
                - 'last': 保留最後一筆
                - False: 全部移除

        回傳:
            self: 回傳清理器實例,支援鏈式呼叫
        """
        # 記錄處理前的資料筆數
        original_count = len(self.data)

        # 移除重複資料
        self.data = self.data.drop_duplicates(subset=subset, keep=keep)

        # 記錄清理操作
        removed_count = original_count - len(self.data)
        self.cleaning_report.append({
            'operation': 'remove_duplicates',
            'subset': subset,
            'keep': keep,
            'rows_removed': removed_count
        })

        return self

    def convert_dtypes(self, type_mapping: Dict[str, str]) -> 'DataCleaner':
        """
        轉換資料型別

        參數:
            type_mapping: 欄位名稱到目標型別的對應字典

        回傳:
            self: 回傳清理器實例,支援鏈式呼叫
        """
        for col, dtype in type_mapping.items():
            if col in self.data.columns:
                try:
                    if dtype == 'datetime':
                        self.data[col] = pd.to_datetime(self.data[col])
                    elif dtype == 'category':
                        self.data[col] = self.data[col].astype('category')
                    else:
                        self.data[col] = self.data[col].astype(dtype)
                except Exception as e:
                    print(f"轉換欄位 {col}{dtype} 時發生錯誤: {e}")

        # 記錄清理操作
        self.cleaning_report.append({
            'operation': 'convert_dtypes',
            'type_mapping': type_mapping
        })

        return self

    def get_cleaned_data(self) -> pd.DataFrame:
        """
        取得清理後的資料

        回傳:
            清理後的資料框架
        """
        return self.data.copy()

    def get_cleaning_report(self) -> List[Dict]:
        """
        取得清理報告

        回傳:
            清理操作報告清單
        """
        return self.cleaning_report.copy()

特徵工程實作

特徵工程是資料科學中最具創造性的環節,其目標是從原始資料中創造出對預測目標更有價值的特徵。優秀的特徵工程能夠顯著提升模型效能,甚至比選擇更複雜的演算法更為有效。特徵工程需要結合領域知識、統計技巧以及創造力,是區分資深資料科學家與初學者的關鍵能力之一。

特徵工程的技術範疇相當廣泛,包含數值特徵處理、類別特徵編碼、時間特徵萃取、文字特徵處理以及特徵組合等。數值特徵處理包括標準化、正規化以及分箱等技術。類別特徵編碼則包括 Label Encoding、One-Hot Encoding 以及 Target Encoding 等方法。時間特徵萃取可以從日期時間資料中提取年、月、日、星期、小時等資訊,甚至可以計算時間差或判斷是否為假日等。

以下是一個完整的 FeatureEngineer 類別實作:

# 匯入必要的函式庫
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Tuple
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder

class FeatureEngineer:
    """
    特徵工程類別

    這個類別提供完整的特徵工程功能,包含數值特徵處理、
    類別特徵編碼、時間特徵萃取以及特徵組合等。

    屬性:
        data: 待處理的資料框架
        scalers: 儲存數值縮放器的字典
        encoders: 儲存編碼器的字典
        feature_names: 記錄新建立的特徵名稱
    """

    def __init__(self, data: pd.DataFrame):
        """
        初始化特徵工程器

        參數:
            data: 待處理的資料框架
        """
        # 複製資料以避免修改原始資料
        self.data = data.copy()

        # 儲存縮放器,用於後續對新資料進行相同的轉換
        self.scalers: Dict[str, object] = {}

        # 儲存編碼器
        self.encoders: Dict[str, object] = {}

        # 記錄新建立的特徵名稱
        self.feature_names: List[str] = []

    def scale_numeric(self, columns: List[str],
                      method: str = 'standard') -> 'FeatureEngineer':
        """
        縮放數值特徵

        對數值特徵進行標準化或正規化處理,
        使不同尺度的特徵能夠公平地被模型學習。

        參數:
            columns: 要縮放的欄位清單
            method: 縮放方法
                - 'standard': 標準化(Z-score)
                - 'minmax': 最小最大正規化

        回傳:
            self: 回傳工程器實例,支援鏈式呼叫
        """
        for col in columns:
            # 確保欄位存在且為數值型別
            if col not in self.data.columns:
                continue
            if self.data[col].dtype not in ['int64', 'float64']:
                continue

            # 根據方法選擇縮放器
            if method == 'standard':
                # 標準化:將資料轉換為平均值為 0、標準差為 1 的分布
                scaler = StandardScaler()
            elif method == 'minmax':
                # 最小最大正規化:將資料縮放到 0-1 之間
                scaler = MinMaxScaler()
            else:
                raise ValueError(f"不支援的縮放方法: {method}")

            # 執行縮放
            # reshape(-1, 1) 將一維陣列轉換為二維陣列
            scaled_values = scaler.fit_transform(
                self.data[col].values.reshape(-1, 1)
            )

            # 更新欄位值
            self.data[col] = scaled_values.flatten()

            # 儲存縮放器以供後續使用
            self.scalers[col] = scaler

        return self

    def encode_categorical(self, columns: List[str],
                           method: str = 'onehot',
                           drop_original: bool = True) -> 'FeatureEngineer':
        """
        編碼類別特徵

        將類別特徵轉換為數值形式,以便機器學習模型能夠處理。

        參數:
            columns: 要編碼的欄位清單
            method: 編碼方法
                - 'onehot': One-Hot Encoding
                - 'label': Label Encoding
                - 'frequency': 頻率編碼
            drop_original: 是否刪除原始欄位

        回傳:
            self: 回傳工程器實例,支援鏈式呼叫
        """
        for col in columns:
            # 確保欄位存在
            if col not in self.data.columns:
                continue

            if method == 'onehot':
                # One-Hot Encoding:為每個類別建立一個二元欄位
                # 使用 pandas get_dummies 函數
                dummies = pd.get_dummies(
                    self.data[col],
                    prefix=col,
                    dtype=int
                )

                # 將新欄位加入資料框架
                self.data = pd.concat([self.data, dummies], axis=1)

                # 記錄新特徵名稱
                self.feature_names.extend(dummies.columns.tolist())

                # 刪除原始欄位
                if drop_original:
                    self.data = self.data.drop(columns=[col])

            elif method == 'label':
                # Label Encoding:將類別轉換為整數
                encoder = LabelEncoder()

                # 處理缺失值
                # 先將缺失值填入特定值,編碼後再還原
                mask = self.data[col].isna()
                self.data[col] = self.data[col].fillna('_MISSING_')

                # 執行編碼
                self.data[col] = encoder.fit_transform(self.data[col])

                # 還原缺失值
                self.data.loc[mask, col] = np.nan

                # 儲存編碼器
                self.encoders[col] = encoder

            elif method == 'frequency':
                # 頻率編碼:使用類別出現的頻率作為編碼值
                frequency_map = self.data[col].value_counts(normalize=True)
                self.data[col + '_freq'] = self.data[col].map(frequency_map)

                # 記錄新特徵名稱
                self.feature_names.append(col + '_freq')

                # 刪除原始欄位
                if drop_original:
                    self.data = self.data.drop(columns=[col])

        return self

    def extract_datetime_features(self, column: str,
                                   features: Optional[List[str]] = None,
                                   drop_original: bool = True) -> 'FeatureEngineer':
        """
        萃取日期時間特徵

        從日期時間欄位中提取有用的特徵,如年、月、日、星期等。

        參數:
            column: 日期時間欄位名稱
            features: 要萃取的特徵清單
                可選值: 'year', 'month', 'day', 'weekday', 'hour',
                        'minute', 'quarter', 'is_weekend', 'day_of_year'
            drop_original: 是否刪除原始欄位

        回傳:
            self: 回傳工程器實例,支援鏈式呼叫
        """
        # 確保欄位存在
        if column not in self.data.columns:
            return self

        # 轉換為日期時間型別
        dt_col = pd.to_datetime(self.data[column])

        # 預設萃取所有特徵
        if features is None:
            features = ['year', 'month', 'day', 'weekday', 'hour',
                       'quarter', 'is_weekend', 'day_of_year']

        # 萃取各個特徵
        for feature in features:
            if feature == 'year':
                self.data[f'{column}_year'] = dt_col.dt.year
            elif feature == 'month':
                self.data[f'{column}_month'] = dt_col.dt.month
            elif feature == 'day':
                self.data[f'{column}_day'] = dt_col.dt.day
            elif feature == 'weekday':
                # 星期幾:0 是星期一,6 是星期日
                self.data[f'{column}_weekday'] = dt_col.dt.weekday
            elif feature == 'hour':
                self.data[f'{column}_hour'] = dt_col.dt.hour
            elif feature == 'minute':
                self.data[f'{column}_minute'] = dt_col.dt.minute
            elif feature == 'quarter':
                # 季度:1-4
                self.data[f'{column}_quarter'] = dt_col.dt.quarter
            elif feature == 'is_weekend':
                # 是否為週末
                self.data[f'{column}_is_weekend'] = (dt_col.dt.weekday >= 5).astype(int)
            elif feature == 'day_of_year':
                # 一年中的第幾天
                self.data[f'{column}_day_of_year'] = dt_col.dt.dayofyear

            # 記錄新特徵名稱
            self.feature_names.append(f'{column}_{feature}')

        # 刪除原始欄位
        if drop_original:
            self.data = self.data.drop(columns=[column])

        return self

    def create_polynomial_features(self, columns: List[str],
                                    degree: int = 2,
                                    include_interaction: bool = True) -> 'FeatureEngineer':
        """
        建立多項式特徵

        從數值特徵建立多項式特徵和交互特徵。

        參數:
            columns: 要處理的欄位清單
            degree: 多項式次數
            include_interaction: 是否包含交互特徵

        回傳:
            self: 回傳工程器實例,支援鏈式呼叫
        """
        # 建立多項式特徵
        for col in columns:
            if col not in self.data.columns:
                continue
            if self.data[col].dtype not in ['int64', 'float64']:
                continue

            # 建立次方特徵
            for d in range(2, degree + 1):
                new_col = f'{col}_pow{d}'
                self.data[new_col] = self.data[col] ** d
                self.feature_names.append(new_col)

        # 建立交互特徵
        if include_interaction and len(columns) > 1:
            for i in range(len(columns)):
                for j in range(i + 1, len(columns)):
                    col1, col2 = columns[i], columns[j]

                    # 確保兩個欄位都存在且為數值型別
                    if col1 not in self.data.columns or col2 not in self.data.columns:
                        continue
                    if (self.data[col1].dtype not in ['int64', 'float64'] or
                        self.data[col2].dtype not in ['int64', 'float64']):
                        continue

                    # 建立乘積交互特徵
                    new_col = f'{col1}_x_{col2}'
                    self.data[new_col] = self.data[col1] * self.data[col2]
                    self.feature_names.append(new_col)

        return self

    def create_binned_features(self, column: str,
                                bins: int = 5,
                                strategy: str = 'quantile',
                                labels: Optional[List[str]] = None) -> 'FeatureEngineer':
        """
        建立分箱特徵

        將連續數值特徵轉換為離散的分箱特徵。

        參數:
            column: 要分箱的欄位名稱
            bins: 分箱數量
            strategy: 分箱策略
                - 'quantile': 等頻分箱
                - 'uniform': 等距分箱
            labels: 分箱標籤

        回傳:
            self: 回傳工程器實例,支援鏈式呼叫
        """
        # 確保欄位存在且為數值型別
        if column not in self.data.columns:
            return self
        if self.data[column].dtype not in ['int64', 'float64']:
            return self

        # 建立新欄位名稱
        new_col = f'{column}_binned'

        if strategy == 'quantile':
            # 等頻分箱:每個箱子包含相同數量的資料
            self.data[new_col] = pd.qcut(
                self.data[column],
                q=bins,
                labels=labels,
                duplicates='drop'
            )
        elif strategy == 'uniform':
            # 等距分箱:每個箱子的範圍相同
            self.data[new_col] = pd.cut(
                self.data[column],
                bins=bins,
                labels=labels
            )

        # 記錄新特徵名稱
        self.feature_names.append(new_col)

        return self

    def create_aggregation_features(self, group_column: str,
                                     agg_column: str,
                                     agg_functions: List[str]) -> 'FeatureEngineer':
        """
        建立聚合特徵

        根據分組欄位計算聚合統計量作為新特徵。

        參數:
            group_column: 分組欄位名稱
            agg_column: 要聚合的欄位名稱
            agg_functions: 聚合函數清單
                可選值: 'mean', 'sum', 'min', 'max', 'std', 'count'

        回傳:
            self: 回傳工程器實例,支援鏈式呼叫
        """
        # 確保欄位存在
        if group_column not in self.data.columns or agg_column not in self.data.columns:
            return self

        # 計算聚合統計量
        for func in agg_functions:
            new_col = f'{agg_column}_by_{group_column}_{func}'

            # 計算分組聚合
            agg_values = self.data.groupby(group_column)[agg_column].transform(func)

            # 將聚合值加入資料框架
            self.data[new_col] = agg_values

            # 記錄新特徵名稱
            self.feature_names.append(new_col)

        return self

    def get_engineered_data(self) -> pd.DataFrame:
        """
        取得特徵工程後的資料

        回傳:
            處理後的資料框架
        """
        return self.data.copy()

    def get_new_features(self) -> List[str]:
        """
        取得新建立的特徵名稱

        回傳:
            新特徵名稱清單
        """
        return self.feature_names.copy()

    def get_scalers(self) -> Dict[str, object]:
        """
        取得縮放器

        回傳:
            縮放器字典
        """
        return self.scalers.copy()

    def get_encoders(self) -> Dict[str, object]:
        """
        取得編碼器

        回傳:
            編碼器字典
        """
        return self.encoders.copy()

探索性資料分析

探索性資料分析(Exploratory Data Analysis,簡稱 EDA)是資料科學流程中不可或缺的環節,其目標是透過各種統計方法與視覺化技術,深入了解資料的特性、分布、關聯性以及潛在的模式。EDA 不僅能夠幫助資料科學家熟悉資料,還能夠發現資料品質問題、驗證假設,並指引後續的特徵工程與模型設計方向。

描述性統計是 EDA 的基礎,包括集中趨勢的衡量如平均數、中位數、眾數,以及離散程度的衡量如變異數、標準差、四分位距等。這些統計量能夠快速呈現資料的基本特徵,幫助識別資料分布的形態與異常情況。

相關性分析是 EDA 的重要組成部分,用於探索變數之間的線性關係。Pearson 相關係數是最常用的相關性衡量指標,其值介於負一到一之間,正值表示正相關,負值表示負相關,絕對值越大表示相關性越強。然而,相關性不等於因果關係,這是資料科學家必須謹記的原則。

分布分析則關注資料的分布形態,包括偏度、峰度以及是否符合特定的理論分布如常態分布等。了解資料的分布特性對於選擇適當的統計方法與機器學習演算法至關重要。

以下是 EDA 流程的視覺化呈現:

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "資料載入" as DL
rectangle "資料概覽" as DO
rectangle "描述性統計" as DS
rectangle "分布分析" as DA
rectangle "相關性分析" as CA
rectangle "類別分析" as CatA
rectangle "時間序列分析" as TS
rectangle "視覺化報告" as VR

DL --> DO
DO --> DS
DS --> DA
DA --> CA
CA --> CatA
CatA --> TS
TS --> VR

@enduml

以下是一個完整的 EDA 類別實作:

# 匯入必要的函式庫
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Tuple
from scipy import stats

class ExploratoryAnalyzer:
    """
    探索性資料分析類別

    這個類別提供完整的 EDA 功能,包含描述性統計、
    分布分析、相關性分析以及資料概覽等。

    屬性:
        data: 待分析的資料框架
    """

    def __init__(self, data: pd.DataFrame):
        """
        初始化探索性分析器

        參數:
            data: 待分析的資料框架
        """
        self.data = data.copy()

    def get_overview(self) -> Dict:
        """
        取得資料概覽

        提供資料的基本資訊,包括資料筆數、欄位數、
        資料型別分布以及記憶體使用量等。

        回傳:
            資料概覽字典
        """
        # 計算各種概覽指標
        overview = {
            'total_rows': len(self.data),
            'total_columns': len(self.data.columns),
            'memory_usage_mb': self.data.memory_usage(deep=True).sum() / 1024 / 1024,
            'dtypes_distribution': self.data.dtypes.value_counts().to_dict(),
            'missing_values_total': self.data.isnull().sum().sum(),
            'duplicate_rows': self.data.duplicated().sum(),
            'numeric_columns': self.data.select_dtypes(include=[np.number]).columns.tolist(),
            'categorical_columns': self.data.select_dtypes(include=['object', 'category']).columns.tolist()
        }

        return overview

    def describe_numeric(self, percentiles: List[float] = None) -> pd.DataFrame:
        """
        數值欄位的描述性統計

        計算數值欄位的各種統計量,包括平均值、標準差、
        最小值、最大值、四分位數等。

        參數:
            percentiles: 要計算的百分位數清單

        回傳:
            描述性統計 DataFrame
        """
        # 設定預設的百分位數
        if percentiles is None:
            percentiles = [0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99]

        # 選擇數值欄位
        numeric_data = self.data.select_dtypes(include=[np.number])

        # 計算基本描述性統計
        desc = numeric_data.describe(percentiles=percentiles).T

        # 新增額外的統計量
        # 偏度:衡量分布的不對稱程度
        desc['skewness'] = numeric_data.skew()

        # 峰度:衡量分布的尖銳程度
        desc['kurtosis'] = numeric_data.kurtosis()

        # 缺失值數量
        desc['missing'] = numeric_data.isnull().sum()

        # 缺失值比例
        desc['missing_pct'] = (numeric_data.isnull().sum() / len(self.data)) * 100

        # 唯一值數量
        desc['unique'] = numeric_data.nunique()

        return desc

    def describe_categorical(self) -> pd.DataFrame:
        """
        類別欄位的描述性統計

        計算類別欄位的各種統計量,包括唯一值數量、
        眾數、眾數頻率等。

        回傳:
            描述性統計 DataFrame
        """
        # 選擇類別欄位
        categorical_data = self.data.select_dtypes(include=['object', 'category'])

        # 如果沒有類別欄位,回傳空 DataFrame
        if len(categorical_data.columns) == 0:
            return pd.DataFrame()

        # 儲存統計結果
        stats_list = []

        for col in categorical_data.columns:
            # 計算各種統計量
            col_stats = {
                'column': col,
                'unique_count': categorical_data[col].nunique(),
                'missing_count': categorical_data[col].isnull().sum(),
                'missing_pct': (categorical_data[col].isnull().sum() / len(self.data)) * 100,
                'mode': categorical_data[col].mode().iloc[0] if len(categorical_data[col].mode()) > 0 else None,
                'mode_count': categorical_data[col].value_counts().iloc[0] if len(categorical_data[col].value_counts()) > 0 else 0,
                'mode_pct': (categorical_data[col].value_counts().iloc[0] / len(self.data)) * 100 if len(categorical_data[col].value_counts()) > 0 else 0
            }
            stats_list.append(col_stats)

        return pd.DataFrame(stats_list).set_index('column')

    def calculate_correlation(self, method: str = 'pearson') -> pd.DataFrame:
        """
        計算相關性矩陣

        計算數值欄位之間的相關係數。

        參數:
            method: 相關係數計算方法
                - 'pearson': Pearson 相關係數
                - 'spearman': Spearman 等級相關係數
                - 'kendall': Kendall 等級相關係數

        回傳:
            相關性矩陣 DataFrame
        """
        # 選擇數值欄位
        numeric_data = self.data.select_dtypes(include=[np.number])

        # 計算相關性矩陣
        correlation_matrix = numeric_data.corr(method=method)

        return correlation_matrix

    def get_high_correlations(self, threshold: float = 0.7,
                               method: str = 'pearson') -> pd.DataFrame:
        """
        取得高相關性的特徵對

        找出相關係數絕對值超過閾值的特徵對。

        參數:
            threshold: 相關係數閾值
            method: 相關係數計算方法

        回傳:
            高相關性特徵對 DataFrame
        """
        # 計算相關性矩陣
        corr_matrix = self.calculate_correlation(method)

        # 取得上三角矩陣(避免重複)
        upper_triangle = corr_matrix.where(
            np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
        )

        # 找出高相關性的特徵對
        high_corr_pairs = []
        for col in upper_triangle.columns:
            for idx in upper_triangle.index:
                corr_value = upper_triangle.loc[idx, col]
                if pd.notna(corr_value) and abs(corr_value) >= threshold:
                    high_corr_pairs.append({
                        'feature_1': idx,
                        'feature_2': col,
                        'correlation': corr_value
                    })

        # 建立 DataFrame 並按相關係數絕對值排序
        result = pd.DataFrame(high_corr_pairs)
        if len(result) > 0:
            result = result.sort_values('correlation', key=abs, ascending=False)

        return result

    def analyze_distribution(self, column: str) -> Dict:
        """
        分析單一欄位的分布

        計算欄位的分布相關統計量,並進行常態性檢定。

        參數:
            column: 欄位名稱

        回傳:
            分布分析結果字典
        """
        # 確保欄位存在且為數值型別
        if column not in self.data.columns:
            raise ValueError(f"欄位 {column} 不存在")

        col_data = self.data[column].dropna()

        if self.data[column].dtype not in ['int64', 'float64']:
            raise ValueError(f"欄位 {column} 不是數值型別")

        # 計算分布統計量
        analysis = {
            'mean': col_data.mean(),
            'median': col_data.median(),
            'mode': col_data.mode().iloc[0] if len(col_data.mode()) > 0 else None,
            'std': col_data.std(),
            'variance': col_data.var(),
            'skewness': col_data.skew(),
            'kurtosis': col_data.kurtosis(),
            'min': col_data.min(),
            'max': col_data.max(),
            'range': col_data.max() - col_data.min()
        }

        # 進行常態性檢定(Shapiro-Wilk 檢定)
        # 當樣本數超過 5000 時,使用前 5000 筆資料
        if len(col_data) > 5000:
            sample_data = col_data.sample(5000, random_state=42)
        else:
            sample_data = col_data

        if len(sample_data) >= 3:
            shapiro_stat, shapiro_p = stats.shapiro(sample_data)
            analysis['shapiro_statistic'] = shapiro_stat
            analysis['shapiro_pvalue'] = shapiro_p
            analysis['is_normal'] = shapiro_p > 0.05

        return analysis

    def get_value_counts(self, column: str,
                         top_n: int = 10,
                         normalize: bool = False) -> pd.DataFrame:
        """
        取得欄位的值計數

        參數:
            column: 欄位名稱
            top_n: 顯示前 N 個值
            normalize: 是否顯示比例

        回傳:
            值計數 DataFrame
        """
        # 確保欄位存在
        if column not in self.data.columns:
            raise ValueError(f"欄位 {column} 不存在")

        # 計算值計數
        counts = self.data[column].value_counts(normalize=normalize)

        # 取前 N 個
        top_counts = counts.head(top_n)

        # 轉換為 DataFrame
        result = pd.DataFrame({
            'value': top_counts.index,
            'count' if not normalize else 'proportion': top_counts.values
        })

        return result

    def calculate_statistics_by_group(self, group_column: str,
                                       value_column: str) -> pd.DataFrame:
        """
        計算分組統計量

        根據分組欄位計算數值欄位的各種統計量。

        參數:
            group_column: 分組欄位名稱
            value_column: 數值欄位名稱

        回傳:
            分組統計 DataFrame
        """
        # 確保欄位存在
        if group_column not in self.data.columns or value_column not in self.data.columns:
            raise ValueError("指定的欄位不存在")

        # 計算分組統計量
        grouped_stats = self.data.groupby(group_column)[value_column].agg([
            'count', 'mean', 'std', 'min', 'max', 'median'
        ]).reset_index()

        return grouped_stats

統計分析方法

統計分析是資料科學的理論基礎,提供了嚴謹的方法論來從資料中得出可靠的結論。統計方法可以分為描述性統計與推論統計兩大類。描述性統計用於摘要和呈現資料的特徵,而推論統計則用於從樣本資料推論母體的特性。

假設檢定是推論統計的核心技術,用於判斷觀察到的效果是否具有統計顯著性。假設檢定的基本流程包括設定虛無假設與對立假設、選擇適當的檢定方法、計算檢定統計量、決定 p 值,以及根據顯著水準做出結論。常用的假設檢定方法包括 t 檢定、卡方檢定、ANOVA 等。

信賴區間是另一個重要的統計概念,它提供了母體參數的區間估計,而非單一的點估計。信賴區間的寬度反映了估計的精確程度,信賴水準則表示區間包含真實參數值的機率。

以下是一個完整的統計分析類別實作:

# 匯入必要的函式庫
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Optional
from scipy import stats

class StatisticalAnalyzer:
    """
    統計分析類別

    這個類別提供完整的統計分析功能,包含假設檢定、
    信賴區間計算以及 ANOVA 分析等。

    屬性:
        data: 待分析的資料框架
    """

    def __init__(self, data: pd.DataFrame):
        """
        初始化統計分析器

        參數:
            data: 待分析的資料框架
        """
        self.data = data.copy()

    def t_test_one_sample(self, column: str,
                          population_mean: float,
                          alpha: float = 0.05) -> Dict:
        """
        單一樣本 t 檢定

        檢定樣本平均值是否與假設的母體平均值有顯著差異。

        參數:
            column: 欄位名稱
            population_mean: 假設的母體平均值
            alpha: 顯著水準

        回傳:
            檢定結果字典
        """
        # 取得資料並移除缺失值
        sample_data = self.data[column].dropna()

        # 執行單一樣本 t 檢定
        t_stat, p_value = stats.ttest_1samp(sample_data, population_mean)

        # 計算信賴區間
        sample_mean = sample_data.mean()
        sample_std = sample_data.std(ddof=1)
        n = len(sample_data)

        # 計算標準誤
        se = sample_std / np.sqrt(n)

        # 計算 t 臨界值
        t_critical = stats.t.ppf(1 - alpha / 2, df=n - 1)

        # 計算信賴區間
        ci_lower = sample_mean - t_critical * se
        ci_upper = sample_mean + t_critical * se

        # 整理結果
        result = {
            'test_type': 'One-Sample t-test',
            'sample_size': n,
            'sample_mean': sample_mean,
            'population_mean': population_mean,
            't_statistic': t_stat,
            'p_value': p_value,
            'alpha': alpha,
            'confidence_interval': (ci_lower, ci_upper),
            'reject_null': p_value < alpha,
            'conclusion': '拒絕虛無假設,樣本平均值與母體平均值有顯著差異'
                         if p_value < alpha
                         else '無法拒絕虛無假設,樣本平均值與母體平均值無顯著差異'
        }

        return result

    def t_test_two_samples(self, column1: str, column2: str,
                           equal_var: bool = True,
                           alpha: float = 0.05) -> Dict:
        """
        兩獨立樣本 t 檢定

        檢定兩個獨立樣本的平均值是否有顯著差異。

        參數:
            column1: 第一個樣本的欄位名稱
            column2: 第二個樣本的欄位名稱
            equal_var: 是否假設兩樣本變異數相等
            alpha: 顯著水準

        回傳:
            檢定結果字典
        """
        # 取得資料並移除缺失值
        sample1 = self.data[column1].dropna()
        sample2 = self.data[column2].dropna()

        # 執行兩獨立樣本 t 檢定
        t_stat, p_value = stats.ttest_ind(sample1, sample2, equal_var=equal_var)

        # 整理結果
        result = {
            'test_type': 'Two-Sample t-test' if equal_var else "Welch's t-test",
            'sample1_size': len(sample1),
            'sample2_size': len(sample2),
            'sample1_mean': sample1.mean(),
            'sample2_mean': sample2.mean(),
            'sample1_std': sample1.std(ddof=1),
            'sample2_std': sample2.std(ddof=1),
            't_statistic': t_stat,
            'p_value': p_value,
            'alpha': alpha,
            'reject_null': p_value < alpha,
            'conclusion': '拒絕虛無假設,兩樣本平均值有顯著差異'
                         if p_value < alpha
                         else '無法拒絕虛無假設,兩樣本平均值無顯著差異'
        }

        return result

    def paired_t_test(self, column1: str, column2: str,
                      alpha: float = 0.05) -> Dict:
        """
        配對樣本 t 檢定

        檢定配對樣本的平均差異是否顯著。

        參數:
            column1: 第一個配對測量的欄位名稱
            column2: 第二個配對測量的欄位名稱
            alpha: 顯著水準

        回傳:
            檢定結果字典
        """
        # 取得資料並移除任一欄位有缺失值的資料列
        paired_data = self.data[[column1, column2]].dropna()
        sample1 = paired_data[column1]
        sample2 = paired_data[column2]

        # 執行配對樣本 t 檢定
        t_stat, p_value = stats.ttest_rel(sample1, sample2)

        # 計算差異
        differences = sample1 - sample2

        # 整理結果
        result = {
            'test_type': 'Paired t-test',
            'sample_size': len(paired_data),
            'mean_difference': differences.mean(),
            'std_difference': differences.std(ddof=1),
            't_statistic': t_stat,
            'p_value': p_value,
            'alpha': alpha,
            'reject_null': p_value < alpha,
            'conclusion': '拒絕虛無假設,配對樣本的平均差異顯著'
                         if p_value < alpha
                         else '無法拒絕虛無假設,配對樣本的平均差異不顯著'
        }

        return result

    def chi_square_test(self, column1: str, column2: str,
                        alpha: float = 0.05) -> Dict:
        """
        卡方獨立性檢定

        檢定兩個類別變數是否獨立。

        參數:
            column1: 第一個類別變數的欄位名稱
            column2: 第二個類別變數的欄位名稱
            alpha: 顯著水準

        回傳:
            檢定結果字典
        """
        # 建立列聯表
        contingency_table = pd.crosstab(self.data[column1], self.data[column2])

        # 執行卡方檢定
        chi2, p_value, dof, expected = stats.chi2_contingency(contingency_table)

        # 計算 Cramer's V(效果量)
        n = contingency_table.sum().sum()
        min_dim = min(contingency_table.shape[0] - 1, contingency_table.shape[1] - 1)
        cramers_v = np.sqrt(chi2 / (n * min_dim)) if min_dim > 0 else 0

        # 整理結果
        result = {
            'test_type': 'Chi-Square Test of Independence',
            'chi2_statistic': chi2,
            'p_value': p_value,
            'degrees_of_freedom': dof,
            'cramers_v': cramers_v,
            'alpha': alpha,
            'reject_null': p_value < alpha,
            'conclusion': '拒絕虛無假設,兩變數之間存在顯著關聯'
                         if p_value < alpha
                         else '無法拒絕虛無假設,兩變數之間無顯著關聯'
        }

        return result

    def one_way_anova(self, value_column: str, group_column: str,
                      alpha: float = 0.05) -> Dict:
        """
        單因子變異數分析

        檢定多個群組的平均值是否有顯著差異。

        參數:
            value_column: 數值變數的欄位名稱
            group_column: 群組變數的欄位名稱
            alpha: 顯著水準

        回傳:
            檢定結果字典
        """
        # 取得各群組的資料
        groups = []
        group_names = self.data[group_column].unique()

        for name in group_names:
            group_data = self.data[self.data[group_column] == name][value_column].dropna()
            groups.append(group_data)

        # 執行單因子 ANOVA
        f_stat, p_value = stats.f_oneway(*groups)

        # 計算各群組的描述性統計
        group_stats = self.data.groupby(group_column)[value_column].agg([
            'count', 'mean', 'std'
        ]).reset_index()

        # 整理結果
        result = {
            'test_type': 'One-Way ANOVA',
            'f_statistic': f_stat,
            'p_value': p_value,
            'number_of_groups': len(group_names),
            'group_statistics': group_stats.to_dict('records'),
            'alpha': alpha,
            'reject_null': p_value < alpha,
            'conclusion': '拒絕虛無假設,至少有一個群組的平均值與其他群組不同'
                         if p_value < alpha
                         else '無法拒絕虛無假設,各群組的平均值無顯著差異'
        }

        return result

    def calculate_confidence_interval(self, column: str,
                                       confidence_level: float = 0.95) -> Tuple[float, float]:
        """
        計算平均值的信賴區間

        參數:
            column: 欄位名稱
            confidence_level: 信賴水準

        回傳:
            信賴區間的下界和上界
        """
        # 取得資料並移除缺失值
        sample_data = self.data[column].dropna()

        # 計算樣本統計量
        sample_mean = sample_data.mean()
        sample_std = sample_data.std(ddof=1)
        n = len(sample_data)

        # 計算標準誤
        se = sample_std / np.sqrt(n)

        # 計算 alpha
        alpha = 1 - confidence_level

        # 計算 t 臨界值
        t_critical = stats.t.ppf(1 - alpha / 2, df=n - 1)

        # 計算信賴區間
        ci_lower = sample_mean - t_critical * se
        ci_upper = sample_mean + t_critical * se

        return (ci_lower, ci_upper)

資料視覺化技術

資料視覺化是將資料轉化為圖形表示的過程,其目的是透過視覺元素如位置、長度、面積、顏色等,讓資料中的模式、趨勢與異常能夠一目了然。有效的資料視覺化能夠快速傳達複雜的資訊,是溝通分析結果的重要工具。

選擇適當的圖表類型是資料視覺化的關鍵。不同的圖表類型適合呈現不同類型的資料與關係。散點圖適合呈現兩個數值變數之間的關係,直方圖適合呈現數值變數的分布,長條圖適合比較不同類別的數量或比例,折線圖適合呈現時間序列資料的趨勢,而熱力圖則適合呈現相關性矩陣或多維度資料的概覽。

以下的圖表展示了不同圖表類型的適用場景:

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "資料視覺化" as DV
rectangle "散點圖" as SP
rectangle "直方圖" as HG
rectangle "長條圖" as BC
rectangle "折線圖" as LC
rectangle "熱力圖" as HM
rectangle "箱型圖" as BP

DV --> SP : 兩變數關係
DV --> HG : 分布呈現
DV --> BC : 類別比較
DV --> LC : 時間趨勢
DV --> HM : 相關性矩陣
DV --> BP : 分布與異常值

@enduml

整合應用範例

在實際的資料科學專案中,上述的各個技術環節需要緊密配合,形成一個完整的工作流程。以下是一個整合應用的範例,展示如何將 ETL、資料清理、特徵工程、EDA 以及統計分析串接起來:

# 匯入必要的函式庫
import pandas as pd
import numpy as np

# 假設我們有一個客戶貸款資料集
# 以下示範完整的資料科學工作流程

# 步驟一:使用 DataPipeline 進行 ETL
# 建立資料管線實例
pipeline = DataPipeline(name="loan_analysis_pipeline")

# 從 CSV 檔案擷取資料
# 這裡假設資料存放在 data/loan_data.csv
pipeline.extract_from_csv("data/loan_data.csv")

# 執行資料轉換
# 選擇需要的欄位並重新命名
pipeline.select_columns([
    'customer_id', 'age', 'income', 'loan_amount',
    'credit_score', 'employment_status', 'loan_status', 'application_date'
])

pipeline.rename_columns({
    'customer_id': 'id',
    'loan_status': 'is_approved'
})

# 取得處理後的資料
df = pipeline.get_data()

# 步驟二:使用 DataCleaner 進行資料清理
# 建立資料清理器實例
cleaner = DataCleaner(df)

# 分析缺失值
missing_report = cleaner.analyze_missing_values()
print("缺失值分析報告:")
print(missing_report)

# 處理缺失值
# 數值欄位使用中位數填補
cleaner.handle_missing_values(
    strategy='median',
    columns=['age', 'income', 'loan_amount', 'credit_score']
)

# 類別欄位使用眾數填補
cleaner.handle_missing_values(
    strategy='mode',
    columns=['employment_status']
)

# 偵測並移除異常值
outlier_report = cleaner.detect_outliers(
    columns=['income', 'loan_amount'],
    method='iqr',
    threshold=1.5
)
print("\n異常值偵測報告:")
print(outlier_report)

# 移除異常值
cleaner.remove_outliers(
    columns=['income', 'loan_amount'],
    method='iqr',
    threshold=3  # 使用較寬鬆的閾值
)

# 移除重複資料
cleaner.remove_duplicates(subset=['id'])

# 取得清理後的資料
df_cleaned = cleaner.get_cleaned_data()

# 步驟三:使用 FeatureEngineer 進行特徵工程
# 建立特徵工程器實例
engineer = FeatureEngineer(df_cleaned)

# 縮放數值特徵
engineer.scale_numeric(
    columns=['income', 'loan_amount', 'credit_score'],
    method='standard'
)

# 編碼類別特徵
engineer.encode_categorical(
    columns=['employment_status'],
    method='onehot'
)

# 萃取日期時間特徵
engineer.extract_datetime_features(
    column='application_date',
    features=['year', 'month', 'weekday', 'is_weekend']
)

# 建立多項式特徵
engineer.create_polynomial_features(
    columns=['age', 'credit_score'],
    degree=2,
    include_interaction=True
)

# 建立分箱特徵
engineer.create_binned_features(
    column='income',
    bins=5,
    strategy='quantile',
    labels=['very_low', 'low', 'medium', 'high', 'very_high']
)

# 取得特徵工程後的資料
df_engineered = engineer.get_engineered_data()

# 步驟四:使用 ExploratoryAnalyzer 進行 EDA
# 建立探索性分析器實例
analyzer = ExploratoryAnalyzer(df_engineered)

# 取得資料概覽
overview = analyzer.get_overview()
print("\n資料概覽:")
for key, value in overview.items():
    print(f"  {key}: {value}")

# 數值欄位描述性統計
numeric_stats = analyzer.describe_numeric()
print("\n數值欄位描述性統計:")
print(numeric_stats)

# 計算相關性矩陣
correlation_matrix = analyzer.calculate_correlation()
print("\n相關性矩陣:")
print(correlation_matrix)

# 找出高相關性特徵對
high_corr = analyzer.get_high_correlations(threshold=0.7)
print("\n高相關性特徵對:")
print(high_corr)

# 步驟五:使用 StatisticalAnalyzer 進行統計分析
# 建立統計分析器實例
stat_analyzer = StatisticalAnalyzer(df_cleaned)

# 進行單一樣本 t 檢定
# 檢定平均信用分數是否與假設值 650 有顯著差異
t_test_result = stat_analyzer.t_test_one_sample(
    column='credit_score',
    population_mean=650,
    alpha=0.05
)
print("\n單一樣本 t 檢定結果:")
for key, value in t_test_result.items():
    print(f"  {key}: {value}")

# 計算平均收入的信賴區間
ci = stat_analyzer.calculate_confidence_interval(
    column='income',
    confidence_level=0.95
)
print(f"\n平均收入的 95% 信賴區間: ({ci[0]:.2f}, {ci[1]:.2f})")

# 輸出處理摘要
print("\n資料處理完成")
print(f"原始資料筆數: {len(df)}")
print(f"清理後資料筆數: {len(df_cleaned)}")
print(f"特徵工程後欄位數: {len(df_engineered.columns)}")
print(f"新建立的特徵數: {len(engineer.get_new_features())}")

總結

資料科學工作流程是一套系統化的方法論,從問題定義到最終的洞察產出,每一個環節都需要嚴謹的技術執行與深入的領域理解。本文詳細介紹了 ETL 資料管線建置、資料清理與品質控管、特徵工程、探索性資料分析、統計方法以及資料視覺化等核心技術,並提供了完整的 Python 程式碼實作。

在實務應用中,資料科學家需要根據具體的業務場景與資料特性,靈活運用這些技術。資料品質是分析結果可靠性的根本,任何資料缺失或錯誤都可能導致偏差,影響最終結果的可信度。特徵工程則是提升模型效能的關鍵,好的特徵往往比複雜的演算法更為重要。統計方法提供了嚴謹的理論基礎,確保從資料中得出的結論具有統計顯著性。

隨著資料量的持續增長與技術的不斷演進,自動化機器學習與 AutoML 正在改變資料科學的工作模式。然而,資料的詮釋與應用仍然需要人類的智慧與判斷。資料科學家除了掌握工具和技術,更需要具備批判性思維與領域知識,才能在資料洪流中萃取真正有價值的商業洞察。未來,資料治理與模型可解釋性將成為越來越重要的議題,確保資料分析的透明度與可信度,是在資料驅動時代保持競爭優勢的關鍵。