返回文章列表

Python 資料分析與視覺化完全指南:從探索性分析到商業決策應用

深入探討 Python 資料分析與視覺化的完整技術棧,從 Pandas 的資料處理技巧到 Matplotlib、Seaborn、Plotly 的視覺化實作,涵蓋探索性資料分析方法論、信用風險評估模型建構、員工績效分析系統設計,並展示如何將資料洞察轉化為可執行的商業決策

資料科學 資料分析 視覺化

前言

資料分析與視覺化是現代商業智慧的核心能力,它們將海量的原始資料轉化為可理解的洞察,為決策提供堅實的依據。在資料爆炸的時代,企業每天都在產生與收集大量資料,從客戶交易記錄到營運指標,從市場趨勢到競爭對手動態。然而,資料本身並不等於價值,只有透過系統化的分析與有效的視覺化呈現,才能從資料中提取出真正的商業洞察。

資料分析的過程遠比表面看起來複雜。原始資料往往包含缺失值、異常值、不一致的格式,需要經過仔細的清理與轉換才能進行分析。資料的分布特性、變數間的關係、潛在的模式與異常,都需要透過統計方法與視覺化技術來揭示。探索性資料分析(Exploratory Data Analysis, EDA)作為資料分析的第一步,透過描述性統計與圖表來理解資料的基本特性,為後續的深入分析奠定基礎。

視覺化是資料分析中不可或缺的環節。人類的視覺系統能夠快速識別模式、趨勢與異常,一個設計良好的圖表往往比複雜的統計報告更能傳達資料的本質。選擇合適的視覺化類型至關重要,長條圖適合比較類別資料,折線圖展示時間序列趨勢,散點圖揭示變數間的關係,箱形圖顯示資料的分布與離群值。每種視覺化類型都有其特定的適用場景與限制,理解這些特性是創建有效視覺化的前提。

Python 作為資料科學領域的首選語言,提供了豐富的資料分析與視覺化工具。Pandas 是資料處理的核心函式庫,提供了強大的資料結構與操作方法,讓資料清理、轉換、聚合變得簡單直觀。Matplotlib 是 Python 視覺化的基石,提供了底層的繪圖能力與高度的客製化自由度。Seaborn 建構於 Matplotlib 之上,提供了更高階的統計視覺化介面與美觀的預設樣式。Plotly 則提供了互動式視覺化能力,讓使用者能夠與圖表互動,深入探索資料細節。

資料分析與視覺化的應用範圍極為廣泛。在金融領域,信用風險評估透過分析借款人的信用評分、收入、負債比率等指標,預測違約機率,協助放貸決策。在人力資源管理中,員工績效分析透過考核資料、出勤記錄、專案參與度等資訊,識別高績效員工特徵,優化人才培養策略。在市場行銷中,客戶分群分析透過消費行為、人口統計資料、互動記錄,將客戶分為不同群組,制定個性化的行銷策略。

然而,資料分析與視覺化也面臨著挑戰。資料品質問題可能導致錯誤的結論,選擇不當的分析方法可能遺漏重要的洞察,過度複雜的視覺化可能讓觀眾無所適從。此外,資料分析結果的解釋需要結合業務知識,純粹的統計分析可能忽略重要的業務脈絡。如何在技術能力與業務理解之間取得平衡,是資料分析師需要持續精進的核心能力。

本文將系統化地探討 Python 資料分析與視覺化的完整技術棧。從 Pandas 的資料處理基礎開始,深入剖析資料清理、轉換、聚合的實務技巧。接著探討探索性資料分析的方法論,展示如何透過描述性統計與視覺化理解資料特性。然後深入多種視覺化工具的應用,比較 Matplotlib、Seaborn、Plotly 的特色與適用場景。最後透過信用風險評估與員工績效分析兩個實際案例,展示如何將資料分析與視覺化技術應用於商業決策。透過完整的程式碼範例與最佳實踐建議,讀者將能夠掌握從資料到洞察的完整分析流程。

Pandas 資料處理基礎:清理、轉換與聚合

Pandas 是 Python 資料分析的核心工具,它提供了兩個主要的資料結構:Series(一維陣列)與 DataFrame(二維表格)。DataFrame 是最常用的資料結構,它類似於 Excel 的工作表或關聯式資料庫的表格,每一列代表一筆觀察記錄,每一欄代表一個變數。Pandas 的強大之處在於它提供了豐富的方法來操作這些資料結構,讓複雜的資料處理任務變得簡單高效。

資料清理是資料分析的第一步,也是最耗時的步驟。真實世界的資料往往包含各種品質問題,缺失值是最常見的問題之一。Pandas 提供了多種處理缺失值的方法,可以選擇刪除包含缺失值的列或欄,也可以使用統計量(平均值、中位數)或插值方法來填補缺失值。選擇哪種策略取決於資料的特性與分析目標,刪除資料可能導致資訊損失,而不當的填補可能引入偏差。

異常值是另一個重要的資料品質問題。異常值可能是真實的極端情況,也可能是資料錄入錯誤。識別異常值的常用方法包含箱形圖、Z-score、IQR(四分位距)等。對於識別出的異常值,可以選擇刪除、替換或單獨處理,這需要結合業務知識來判斷。資料格式不一致也是常見問題,例如日期欄位可能有多種格式,文字欄位可能包含前後空白,類別變數可能有拼寫錯誤。Pandas 提供了豐富的字串處理與類型轉換方法來解決這些問題。

讓我們透過一個完整的範例來理解 Pandas 的資料處理能力:

"""
Pandas 資料處理完整範例
展示資料清理、轉換、聚合的實務技巧
"""

import pandas as pd
import numpy as np
from typing import List, Dict, Tuple
import warnings
warnings.filterwarnings('ignore')

class DataCleaner:
    """
    資料清理器
    提供資料品質檢查與清理功能
    """
    
    def __init__(self, df: pd.DataFrame):
        """
        初始化清理器
        
        參數:
            df: 原始資料框
        """
        self.df = df.copy()  # 複製資料避免修改原始資料
        self.original_shape = df.shape
        self.cleaning_log = []  # 記錄清理步驟
    
    def check_missing_values(self) -> pd.DataFrame:
        """
        檢查缺失值
        
        回傳:
            缺失值統計表
        """
        missing_stats = pd.DataFrame({
            '缺失數量': self.df.isnull().sum(),
            '缺失比例': self.df.isnull().sum() / len(self.df) * 100
        })
        
        # 只顯示有缺失值的欄位
        missing_stats = missing_stats[missing_stats['缺失數量'] > 0]
        missing_stats = missing_stats.sort_values('缺失數量', ascending=False)
        
        return missing_stats
    
    def handle_missing_values(self, 
                             strategy: str = 'drop',
                             fill_value: Dict = None,
                             threshold: float = 0.5) -> None:
        """
        處理缺失值
        
        參數:
            strategy: 處理策略,'drop', 'fill', 'interpolate'
            fill_value: 填充值字典,格式為 {欄位名: 填充值}
            threshold: 刪除閾值,缺失比例超過此值的欄位將被刪除
        """
        initial_rows = len(self.df)
        initial_cols = len(self.df.columns)
        
        if strategy == 'drop':
            # 刪除缺失值超過閾值的欄位
            missing_ratio = self.df.isnull().sum() / len(self.df)
            cols_to_drop = missing_ratio[missing_ratio > threshold].index.tolist()
            
            if cols_to_drop:
                self.df.drop(columns=cols_to_drop, inplace=True)
                self.cleaning_log.append(
                    f"刪除缺失值超過 {threshold*100}% 的欄位: {cols_to_drop}"
                )
            
            # 刪除仍有缺失值的列
            self.df.dropna(inplace=True)
            
        elif strategy == 'fill':
            if fill_value is None:
                # 數值欄位用平均值填充
                numeric_cols = self.df.select_dtypes(
                    include=[np.number]
                ).columns
                for col in numeric_cols:
                    mean_value = self.df[col].mean()
                    self.df[col].fillna(mean_value, inplace=True)
                
                # 類別欄位用眾數填充
                categorical_cols = self.df.select_dtypes(
                    include=['object', 'category']
                ).columns
                for col in categorical_cols:
                    mode_value = self.df[col].mode()[0] if not self.df[col].mode().empty else 'Unknown'
                    self.df[col].fillna(mode_value, inplace=True)
            else:
                # 使用指定的填充值
                self.df.fillna(fill_value, inplace=True)
            
        elif strategy == 'interpolate':
            # 使用插值法填充數值欄位
            numeric_cols = self.df.select_dtypes(include=[np.number]).columns
            self.df[numeric_cols] = self.df[numeric_cols].interpolate(
                method='linear',
                limit_direction='both'
            )
        
        rows_removed = initial_rows - len(self.df)
        cols_removed = initial_cols - len(self.df.columns)
        
        self.cleaning_log.append(
            f"缺失值處理 ({strategy}): 移除 {rows_removed} 列, {cols_removed} 欄"
        )
    
    def detect_outliers(self, 
                       columns: List[str],
                       method: str = 'iqr',
                       threshold: float = 1.5) -> Dict[str, pd.Series]:
        """
        檢測異常值
        
        參數:
            columns: 要檢測的欄位列表
            method: 檢測方法,'iqr' 或 'zscore'
            threshold: 閾值,IQR 方法預設 1.5,Z-score 方法預設 3
            
        回傳:
            異常值索引字典
        """
        outliers = {}
        
        for col in columns:
            if col not in self.df.columns:
                continue
            
            if method == 'iqr':
                # 四分位距方法
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR
                
                outlier_mask = (
                    (self.df[col] < lower_bound) | 
                    (self.df[col] > upper_bound)
                )
                
            elif method == 'zscore':
                # Z-score 方法
                z_scores = np.abs(
                    (self.df[col] - self.df[col].mean()) / self.df[col].std()
                )
                outlier_mask = z_scores > threshold
            
            outliers[col] = outlier_mask
        
        return outliers
    
    def handle_outliers(self,
                       columns: List[str],
                       method: str = 'iqr',
                       action: str = 'remove') -> None:
        """
        處理異常值
        
        參數:
            columns: 要處理的欄位列表
            method: 檢測方法
            action: 處理動作,'remove', 'cap', 'replace'
        """
        outliers = self.detect_outliers(columns, method)
        
        for col, mask in outliers.items():
            outlier_count = mask.sum()
            
            if action == 'remove':
                # 移除異常值
                self.df = self.df[~mask]
                
            elif action == 'cap':
                # 上下限截斷
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                
                self.df.loc[self.df[col] < lower_bound, col] = lower_bound
                self.df.loc[self.df[col] > upper_bound, col] = upper_bound
                
            elif action == 'replace':
                # 用中位數替換
                median_value = self.df[col].median()
                self.df.loc[mask, col] = median_value
            
            self.cleaning_log.append(
                f"異常值處理 ({col}): 檢測到 {outlier_count} 個異常值, 動作: {action}"
            )
    
    def standardize_text(self, columns: List[str]) -> None:
        """
        標準化文字欄位
        
        參數:
            columns: 要標準化的欄位列表
        """
        for col in columns:
            if col not in self.df.columns:
                continue
            
            # 轉換為小寫
            self.df[col] = self.df[col].str.lower()
            
            # 移除前後空白
            self.df[col] = self.df[col].str.strip()
            
            # 移除多餘空白
            self.df[col] = self.df[col].str.replace(r'\s+', ' ', regex=True)
            
            self.cleaning_log.append(f"文字標準化: {col}")
    
    def convert_dtypes(self, dtype_dict: Dict[str, str]) -> None:
        """
        轉換資料型別
        
        參數:
            dtype_dict: 型別轉換字典,格式為 {欄位名: 目標型別}
        """
        for col, dtype in dtype_dict.items():
            if col not in self.df.columns:
                continue
            
            try:
                if dtype == 'datetime':
                    self.df[col] = pd.to_datetime(
                        self.df[col],
                        errors='coerce'
                    )
                else:
                    self.df[col] = self.df[col].astype(dtype)
                
                self.cleaning_log.append(f"型別轉換: {col} -> {dtype}")
            except Exception as e:
                self.cleaning_log.append(f"型別轉換失敗: {col} -> {dtype}, 錯誤: {e}")
    
    def get_cleaned_data(self) -> pd.DataFrame:
        """
        獲取清理後的資料
        
        回傳:
            清理後的資料框
        """
        return self.df
    
    def print_summary(self) -> None:
        """列印清理摘要"""
        print("=" * 60)
        print("資料清理摘要")
        print("=" * 60)
        print(f"原始資料形狀: {self.original_shape}")
        print(f"清理後資料形狀: {self.df.shape}")
        print(f"移除列數: {self.original_shape[0] - self.df.shape[0]}")
        print(f"移除欄數: {self.original_shape[1] - self.df.shape[1]}")
        print("\n清理步驟:")
        for i, log in enumerate(self.cleaning_log, 1):
            print(f"{i}. {log}")

class DataTransformer:
    """
    資料轉換器
    提供特徵工程與資料轉換功能
    """
    
    def __init__(self, df: pd.DataFrame):
        """
        初始化轉換器
        
        參數:
            df: 輸入資料框
        """
        self.df = df.copy()
    
    def create_bins(self,
                   column: str,
                   bins: List[float],
                   labels: List[str] = None) -> pd.Series:
        """
        創建分箱變數
        
        參數:
            column: 要分箱的欄位
            bins: 分箱邊界
            labels: 箱標籤
            
        回傳:
            分箱後的 Series
        """
        if labels is None:
            labels = [f'Bin_{i}' for i in range(len(bins)-1)]
        
        binned = pd.cut(
            self.df[column],
            bins=bins,
            labels=labels,
            include_lowest=True
        )
        
        return binned
    
    def create_dummy_variables(self,
                              columns: List[str],
                              drop_first: bool = True) -> pd.DataFrame:
        """
        創建虛擬變數(One-Hot Encoding)
        
        參數:
            columns: 要編碼的欄位列表
            drop_first: 是否刪除第一個類別(避免共線性)
            
        回傳:
            包含虛擬變數的資料框
        """
        df_with_dummies = pd.get_dummies(
            self.df,
            columns=columns,
            drop_first=drop_first
        )
        
        return df_with_dummies
    
    def normalize_column(self,
                        column: str,
                        method: str = 'minmax') -> pd.Series:
        """
        標準化數值欄位
        
        參數:
            column: 要標準化的欄位
            method: 標準化方法,'minmax' 或 'zscore'
            
        回傳:
            標準化後的 Series
        """
        if method == 'minmax':
            # Min-Max 標準化 (0-1)
            min_val = self.df[column].min()
            max_val = self.df[column].max()
            normalized = (self.df[column] - min_val) / (max_val - min_val)
            
        elif method == 'zscore':
            # Z-score 標準化 (平均值為0,標準差為1)
            mean_val = self.df[column].mean()
            std_val = self.df[column].std()
            normalized = (self.df[column] - mean_val) / std_val
        
        return normalized
    
    def aggregate_data(self,
                      group_by: List[str],
                      agg_dict: Dict[str, List[str]]) -> pd.DataFrame:
        """
        資料聚合
        
        參數:
            group_by: 分組欄位列表
            agg_dict: 聚合字典,格式為 {欄位名: [聚合函數列表]}
            
        回傳:
            聚合後的資料框
        """
        aggregated = self.df.groupby(group_by).agg(agg_dict)
        
        # 扁平化多層索引
        aggregated.columns = [
            f'{col}_{func}' for col, func in aggregated.columns
        ]
        
        return aggregated.reset_index()

# 使用範例
if __name__ == '__main__':
    # 創建示例資料
    np.random.seed(42)
    
    data = {
        'ID': range(1, 101),
        '信用評分': np.random.randint(300, 850, 100),
        '年收入': np.random.randint(30000, 200000, 100),
        '負債金額': np.random.randint(0, 100000, 100),
        '貸款金額': np.random.randint(10000, 500000, 100),
        '部門': np.random.choice(['業務', '研發', '行政', '財務'], 100),
        '任職年限': np.random.randint(0, 20, 100)
    }
    
    # 插入一些缺失值與異常值
    df = pd.DataFrame(data)
    df.loc[5:10, '信用評分'] = np.nan
    df.loc[15, '年收入'] = 1000000  # 異常值
    df.loc[20:25, '部門'] = np.nan
    
    print("=" * 60)
    print("Pandas 資料處理範例")
    print("=" * 60)
    
    # 1. 資料清理
    print("\n步驟 1: 資料清理")
    print("-" * 60)
    
    cleaner = DataCleaner(df)
    
    # 檢查缺失值
    print("\n缺失值統計:")
    print(cleaner.check_missing_values())
    
    # 處理缺失值
    cleaner.handle_missing_values(strategy='fill')
    
    # 檢測異常值
    outliers = cleaner.detect_outliers(
        columns=['信用評分', '年收入', '貸款金額'],
        method='iqr'
    )
    
    print("\n異常值檢測:")
    for col, mask in outliers.items():
        print(f"{col}: {mask.sum()} 個異常值")
    
    # 處理異常值
    cleaner.handle_outliers(
        columns=['年收入'],
        method='iqr',
        action='cap'
    )
    
    # 列印清理摘要
    cleaner.print_summary()
    
    # 獲取清理後的資料
    df_cleaned = cleaner.get_cleaned_data()
    
    # 2. 資料轉換
    print("\n步驟 2: 資料轉換")
    print("-" * 60)
    
    transformer = DataTransformer(df_cleaned)
    
    # 創建信用評分分箱
    df_cleaned['信用等級'] = transformer.create_bins(
        column='信用評分',
        bins=[0, 580, 670, 740, 850],
        labels=['差', '中', '良', '優']
    )
    
    print("\n信用等級分布:")
    print(df_cleaned['信用等級'].value_counts().sort_index())
    
    # 創建虛擬變數
    df_encoded = transformer.create_dummy_variables(
        columns=['部門', '信用等級'],
        drop_first=True
    )
    
    print(f"\n編碼後資料形狀: {df_encoded.shape}")
    print(f"新增欄位數: {df_encoded.shape[1] - df_cleaned.shape[1]}")
    
    # 標準化數值欄位
    df_cleaned['年收入_標準化'] = transformer.normalize_column(
        column='年收入',
        method='minmax'
    )
    
    print("\n標準化後年收入統計:")
    print(df_cleaned['年收入_標準化'].describe())
    
    # 3. 資料聚合
    print("\n步驟 3: 資料聚合")
    print("-" * 60)
    
    # 按部門聚合
    dept_summary = transformer.aggregate_data(
        group_by=['部門'],
        agg_dict={
            '信用評分': ['mean', 'std'],
            '年收入': ['mean', 'median'],
            '貸款金額': ['sum', 'mean']
        }
    )
    
    print("\n各部門統計摘要:")
    print(dept_summary)

這個完整的程式碼展示了 Pandas 資料處理的系統化流程。DataCleaner 類別封裝了資料清理的各種操作,從缺失值檢查與處理到異常值檢測與處理,提供了靈活的策略選擇。handle_missing_values 方法支援三種策略:刪除、填充、插值,每種策略都有其適用場景。刪除適合缺失值較少的情況,填充適合缺失值有明確替代值的情況,插值適合時間序列資料。

異常值處理同樣提供了多種選項。IQR 方法基於四分位距,是最常用的異常值檢測方法,對於偏態分布表現良好。Z-score 方法假設資料服從常態分布,適合對稱分布的資料。異常值的處理動作包含移除、截斷、替換,選擇哪種動作需要考慮異常值的性質與對分析的影響。

DataTransformer 類別提供了特徵工程的常用操作。分箱(Binning)將連續變數轉換為離散類別,這在某些模型中能提升可解釋性與穩定性。虛擬變數編碼(One-Hot Encoding)將類別變數轉換為數值變數,這是許多機器學習演算法的必要前處理。標準化讓不同量綱的變數具有可比性,Min-Max 標準化將資料縮放到 0-1 區間,Z-score 標準化將資料中心化並標準化變異數。

資料聚合是從細粒度資料中提取高層次洞察的關鍵技術。透過 groupby 操作,我們可以按照一個或多個變數分組,然後對每組計算統計量。Pandas 的聚合功能非常強大,支援多種內建函數(sum, mean, median, std, min, max 等),也支援自定義聚合函數。這種能力讓我們能夠從交易級別的資料聚合到客戶級別,從日級別資料聚合到月級別,靈活地控制分析的粒度。

以下的流程圖展示了完整的資料處理流程:

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

start

:原始資料載入;

partition "資料清理階段" {
  :檢查資料概況;
  note right
    檢查資料型別
    統計缺失值
    識別重複記錄
  end note
  
  :處理缺失值;
  note right
    刪除策略
    填充策略
    插值策略
  end note
  
  :檢測異常值;
  note right
    IQR 方法
    Z-score 方法
    業務規則檢查
  end note
  
  :處理異常值;
  note right
    移除
    截斷
    替換
  end note
  
  :文字標準化;
  note right
    統一大小寫
    移除空白
    格式統一
  end note
  
  :型別轉換;
  note right
    數值型別
    日期型別
    類別型別
  end note
}

partition "資料轉換階段" {
  :特徵工程;
  note right
    創建新特徵
    特徵交互
    時間特徵提取
  end note
  
  :分箱處理;
  note right
    連續變數離散化
    創建區間變數
  end note
  
  :編碼處理;
  note right
    One-Hot 編碼
    Label 編碼
    Target 編碼
  end note
  
  :標準化/正規化;
  note right
    Min-Max 標準化
    Z-score 標準化
    Robust 標準化
  end note
}

partition "資料聚合階段" {
  :分組聚合;
  note right
    按維度分組
    計算統計量
    創建透視表
  end note
  
  :時間序列重採樣;
  note right
    日到週
    週到月
    月到年
  end note
}

:清理後資料輸出;

stop

@enduml

這個流程圖展示了資料處理的三個主要階段。資料清理階段專注於提升資料品質,處理缺失值、異常值、格式不一致等問題。資料轉換階段進行特徵工程,創建新的變數,對現有變數進行編碼與標準化。資料聚合階段將細粒度資料聚合為更高層次的統計量,為後續分析做準備。這個系統化的流程確保了資料的品質與一致性,為後續的探索性分析與建模奠定了堅實基礎。

探索性資料分析:理解資料的統計特性與分布

探索性資料分析(Exploratory Data Analysis, EDA)是資料分析的關鍵第一步,它透過統計方法與視覺化技術來理解資料的基本特性、分布、關係與異常。EDA 的目標不僅是描述資料是什麼樣子,更重要的是發現資料中的模式、趨勢、異常與潛在問題,為後續的深入分析提供方向。

描述性統計是 EDA 的基礎工具。對於數值變數,我們關注中心趨勢(平均值、中位數、眾數)、離散程度(標準差、變異數、四分位距)、分布形狀(偏度、峰度)。平均值容易受極值影響,中位數對極值更穩健,在偏態分布中中位數往往是更好的中心位置度量。標準差衡量資料的離散程度,標準差大表示資料分散,標準差小表示資料集中。偏度反映分布的對稱性,正偏表示右尾較長,負偏表示左尾較長。峰度反映分布的尖銳程度,高峰度表示極值較多。

對於類別變數,我們關注頻率分布與比例。頻率表顯示每個類別出現的次數,比例表顯示每個類別佔總體的百分比。這些統計量幫助我們理解類別變數的分布特性,識別稀有類別與主要類別。類別變數的分析也包含交叉表,展示兩個或多個類別變數之間的聯合分布,這對於理解變數間的關係非常有用。

讓我們透過實際的程式碼來展示 EDA 的系統化流程:

"""
探索性資料分析完整範例
展示描述性統計、分布分析、關係探索
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from typing import List, Tuple, Dict

# 設置視覺化樣式
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

class EDAAnalyzer:
    """
    探索性資料分析器
    提供完整的 EDA 分析功能
    """
    
    def __init__(self, df: pd.DataFrame):
        """
        初始化分析器
        
        參數:
            df: 輸入資料框
        """
        self.df = df.copy()
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
    
    def data_overview(self) -> None:
        """列印資料概況"""
        print("=" * 70)
        print("資料概況")
        print("=" * 70)
        print(f"資料形狀: {self.df.shape}")
        print(f"記錄數: {len(self.df)}")
        print(f"變數數: {len(self.df.columns)}")
        print(f"數值變數: {len(self.numeric_cols)}")
        print(f"類別變數: {len(self.categorical_cols)}")
        print(f"\n記憶體使用: {self.df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
        
        print("\n" + "=" * 70)
        print("資料型別")
        print("=" * 70)
        print(self.df.dtypes)
        
        print("\n" + "=" * 70)
        print("前 5 筆記錄")
        print("=" * 70)
        print(self.df.head())
    
    def numeric_summary(self) -> pd.DataFrame:
        """
        數值變數統計摘要
        
        回傳:
            包含各種統計量的 DataFrame
        """
        if not self.numeric_cols:
            print("沒有數值變數")
            return pd.DataFrame()
        
        summary = self.df[self.numeric_cols].describe().T
        
        # 添加額外的統計量
        summary['skewness'] = self.df[self.numeric_cols].skew()
        summary['kurtosis'] = self.df[self.numeric_cols].kurtosis()
        summary['missing'] = self.df[self.numeric_cols].isnull().sum()
        summary['missing_pct'] = (
            self.df[self.numeric_cols].isnull().sum() / len(self.df) * 100
        )
        
        # 重新排序欄位
        cols_order = [
            'count', 'missing', 'missing_pct', 'mean', 'std', 
            'min', '25%', '50%', '75%', 'max', 'skewness', 'kurtosis'
        ]
        summary = summary[cols_order]
        
        return summary
    
    def categorical_summary(self) -> Dict[str, pd.DataFrame]:
        """
        類別變數統計摘要
        
        回傳:
            每個類別變數的頻率統計字典
        """
        if not self.categorical_cols:
            print("沒有類別變數")
            return {}
        
        summaries = {}
        
        for col in self.categorical_cols:
            value_counts = self.df[col].value_counts()
            percentages = self.df[col].value_counts(normalize=True) * 100
            
            summary = pd.DataFrame({
                '計數': value_counts,
                '百分比': percentages
            })
            
            summaries[col] = summary
        
        return summaries
    
    def correlation_analysis(self, 
                            method: str = 'pearson') -> pd.DataFrame:
        """
        相關性分析
        
        參數:
            method: 相關係數方法,'pearson', 'spearman', 'kendall'
            
        回傳:
            相關係數矩陣
        """
        if not self.numeric_cols:
            print("沒有數值變數進行相關性分析")
            return pd.DataFrame()
        
        corr_matrix = self.df[self.numeric_cols].corr(method=method)
        
        return corr_matrix
    
    def find_high_correlations(self, 
                              threshold: float = 0.7) -> List[Tuple[str, str, float]]:
        """
        尋找高相關性變數對
        
        參數:
            threshold: 相關係數閾值
            
        回傳:
            (變數1, 變數2, 相關係數) 的列表
        """
        corr_matrix = self.correlation_analysis()
        
        # 取得上三角矩陣的索引
        upper_tri = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1)
        
        high_corrs = []
        for i in range(len(corr_matrix)):
            for j in range(i+1, len(corr_matrix)):
                if upper_tri[i, j]:
                    corr_value = corr_matrix.iloc[i, j]
                    if abs(corr_value) >= threshold:
                        high_corrs.append((
                            corr_matrix.index[i],
                            corr_matrix.columns[j],
                            corr_value
                        ))
        
        # 依絕對值排序
        high_corrs.sort(key=lambda x: abs(x[2]), reverse=True)
        
        return high_corrs
    
    def distribution_test(self, column: str) -> Dict[str, float]:
        """
        分布常態性檢定
        
        參數:
            column: 要檢定的數值欄位
            
        回傳:
            檢定結果字典
        """
        data = self.df[column].dropna()
        
        # Shapiro-Wilk 檢定
        shapiro_stat, shapiro_p = stats.shapiro(data)
        
        # Kolmogorov-Smirnov 檢定
        ks_stat, ks_p = stats.kstest(
            data,
            'norm',
            args=(data.mean(), data.std())
        )
        
        results = {
            'shapiro_statistic': shapiro_stat,
            'shapiro_pvalue': shapiro_p,
            'ks_statistic': ks_stat,
            'ks_pvalue': ks_p,
            'is_normal': shapiro_p > 0.05  # 顯著水準 0.05
        }
        
        return results

class EDAVisualizer:
    """
    EDA 視覺化器
    提供各種 EDA 視覺化功能
    """
    
    def __init__(self, df: pd.DataFrame):
        """
        初始化視覺化器
        
        參數:
            df: 輸入資料框
        """
        self.df = df.copy()
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
    
    def plot_distributions(self, 
                          columns: List[str] = None,
                          figsize: Tuple[int, int] = (15, 10)) -> None:
        """
        繪製數值變數分布圖
        
        參數:
            columns: 要繪製的欄位列表,None 表示所有數值欄位
            figsize: 圖形大小
        """
        if columns is None:
            columns = self.numeric_cols
        
        if not columns:
            print("沒有數值變數可繪製")
            return
        
        n_cols = 3
        n_rows = (len(columns) + n_cols - 1) // n_cols
        
        fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize)
        axes = axes.flatten() if n_rows > 1 else [axes]
        
        for idx, col in enumerate(columns):
            ax = axes[idx]
            
            # 繪製直方圖與核密度估計
            self.df[col].hist(
                bins=30,
                alpha=0.5,
                ax=ax,
                edgecolor='black'
            )
            
            # 添加核密度估計曲線
            self.df[col].plot.kde(
                ax=ax,
                secondary_y=True,
                color='red'
            )
            
            ax.set_title(f'{col} 分布', fontsize=12, pad=10)
            ax.set_xlabel(col, fontsize=10)
            ax.set_ylabel('頻率', fontsize=10)
            ax.grid(True, alpha=0.3)
        
        # 隱藏多餘的子圖
        for idx in range(len(columns), len(axes)):
            axes[idx].set_visible(False)
        
        plt.tight_layout()
        plt.show()
    
    def plot_boxplots(self,
                     columns: List[str] = None,
                     figsize: Tuple[int, int] = (15, 6)) -> None:
        """
        繪製箱形圖
        
        參數:
            columns: 要繪製的欄位列表
            figsize: 圖形大小
        """
        if columns is None:
            columns = self.numeric_cols
        
        if not columns:
            print("沒有數值變數可繪製")
            return
        
        plt.figure(figsize=figsize)
        
        # 標準化資料以便比較
        df_normalized = self.df[columns].apply(
            lambda x: (x - x.mean()) / x.std()
        )
        
        df_normalized.boxplot(rot=45)
        plt.title('數值變數箱形圖 (標準化)', fontsize=14, pad=15)
        plt.ylabel('標準化值', fontsize=12)
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
    
    def plot_correlation_heatmap(self,
                                method: str = 'pearson',
                                figsize: Tuple[int, int] = (12, 10)) -> None:
        """
        繪製相關性熱圖
        
        參數:
            method: 相關係數方法
            figsize: 圖形大小
        """
        if not self.numeric_cols:
            print("沒有數值變數可繪製")
            return
        
        corr_matrix = self.df[self.numeric_cols].corr(method=method)
        
        plt.figure(figsize=figsize)
        
        sns.heatmap(
            corr_matrix,
            annot=True,
            fmt='.2f',
            cmap='coolwarm',
            center=0,
            square=True,
            linewidths=1,
            cbar_kws={'shrink': 0.8}
        )
        
        plt.title(f'相關性熱圖 ({method.capitalize()})', 
                 fontsize=14, pad=15)
        plt.tight_layout()
        plt.show()
    
    def plot_categorical_distributions(self,
                                      columns: List[str] = None,
                                      figsize: Tuple[int, int] = (15, 10)) -> None:
        """
        繪製類別變數分布圖
        
        參數:
            columns: 要繪製的欄位列表
            figsize: 圖形大小
        """
        if columns is None:
            columns = self.categorical_cols
        
        if not columns:
            print("沒有類別變數可繪製")
            return
        
        n_cols = 2
        n_rows = (len(columns) + n_cols - 1) // n_cols
        
        fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize)
        axes = axes.flatten() if n_rows > 1 else [axes]
        
        for idx, col in enumerate(columns):
            ax = axes[idx]
            
            value_counts = self.df[col].value_counts()
            
            value_counts.plot(
                kind='bar',
                ax=ax,
                color='steelblue',
                edgecolor='black'
            )
            
            ax.set_title(f'{col} 分布', fontsize=12, pad=10)
            ax.set_xlabel(col, fontsize=10)
            ax.set_ylabel('計數', fontsize=10)
            ax.grid(True, alpha=0.3, axis='y')
            
            # 旋轉 x 軸標籤
            ax.tick_params(axis='x', rotation=45)
        
        # 隱藏多餘的子圖
        for idx in range(len(columns), len(axes)):
            axes[idx].set_visible(False)
        
        plt.tight_layout()
        plt.show()
    
    def plot_scatter_matrix(self,
                           columns: List[str] = None,
                           figsize: Tuple[int, int] = (15, 15)) -> None:
        """
        繪製散點矩陣
        
        參數:
            columns: 要繪製的欄位列表
            figsize: 圖形大小
        """
        if columns is None:
            columns = self.numeric_cols[:5]  # 限制最多 5 個變數
        
        if len(columns) < 2:
            print("至少需要 2 個數值變數")
            return
        
        pd.plotting.scatter_matrix(
            self.df[columns],
            figsize=figsize,
            diagonal='kde',
            alpha=0.6
        )
        
        plt.suptitle('散點矩陣', fontsize=14, y=1.0)
        plt.tight_layout()
        plt.show()

# 使用範例
if __name__ == '__main__':
    # 創建示例資料
    np.random.seed(42)
    
    data = {
        '信用評分': np.random.normal(650, 100, 1000).astype(int),
        '年收入': np.random.lognormal(11, 0.5, 1000).astype(int),
        '負債金額': np.random.gamma(2, 10000, 1000).astype(int),
        '貸款金額': np.random.normal(50000, 20000, 1000).astype(int),
        '年齡': np.random.normal(40, 12, 1000).astype(int),
        '部門': np.random.choice(['業務', '研發', '行政', '財務'], 1000),
        '職級': np.random.choice(['初級', '中級', '高級', '主管'], 1000),
        '貸款狀態': np.random.choice(['核准', '拒絕'], 1000, p=[0.7, 0.3])
    }
    
    df = pd.DataFrame(data)
    
    print("=" * 70)
    print("探索性資料分析範例")
    print("=" * 70)
    
    # 創建分析器
    analyzer = EDAAnalyzer(df)
    
    # 1. 資料概況
    analyzer.data_overview()
    
    # 2. 數值變數統計
    print("\n" + "=" * 70)
    print("數值變數統計摘要")
    print("=" * 70)
    numeric_summary = analyzer.numeric_summary()
    print(numeric_summary)
    
    # 3. 類別變數統計
    print("\n" + "=" * 70)
    print("類別變數統計摘要")
    print("=" * 70)
    categorical_summaries = analyzer.categorical_summary()
    for col, summary in categorical_summaries.items():
        print(f"\n{col}:")
        print(summary)
    
    # 4. 相關性分析
    print("\n" + "=" * 70)
    print("高相關性變數對")
    print("=" * 70)
    high_corrs = analyzer.find_high_correlations(threshold=0.5)
    for var1, var2, corr in high_corrs:
        print(f"{var1} <-> {var2}: {corr:.3f}")
    
    # 5. 分布檢定
    print("\n" + "=" * 70)
    print("分布常態性檢定")
    print("=" * 70)
    test_result = analyzer.distribution_test('信用評分')
    print(f"Shapiro-Wilk p-value: {test_result['shapiro_pvalue']:.4f}")
    print(f"是否為常態分布: {'是' if test_result['is_normal'] else '否'}")
    
    # 6. 視覺化
    visualizer = EDAVisualizer(df)
    
    print("\n生成視覺化圖表...")
    
    # 分布圖
    visualizer.plot_distributions(
        columns=['信用評分', '年收入', '負債金額', '年齡']
    )
    
    # 箱形圖
    visualizer.plot_boxplots()
    
    # 相關性熱圖
    visualizer.plot_correlation_heatmap()
    
    # 類別變數分布
    visualizer.plot_categorical_distributions()
    
    # 散點矩陣
    visualizer.plot_scatter_matrix(
        columns=['信用評分', '年收入', '年齡']
    )
    
    print("\nEDA 分析完成!")

這個完整的 EDA 程式碼展示了系統化的探索性分析流程。EDAAnalyzer 類別提供了全面的統計分析功能,從基本的描述性統計到進階的相關性分析與分布檢定。numeric_summary 方法不僅計算標準的統計量(平均值、標準差、四分位數),還計算偏度與峰度,這兩個指標對於理解分布形狀非常重要。

相關性分析是 EDA 中的關鍵環節。correlation_analysis 方法支援三種相關係數:Pearson 適用於線性關係與常態分布,Spearman 適用於單調關係,Kendall 對離群值更穩健。find_high_correlations 方法自動識別高相關性的變數對,這對於後續的特徵選擇與多重共線性診斷非常有用。

EDAVisualizer 類別提供了豐富的視覺化功能。分布圖結合直方圖與核密度估計,既展示資料的離散分布,又提供平滑的密度曲線。箱形圖清楚地顯示資料的四分位數、中位數與離群值,特別適合比較多個變數的分布差異。相關性熱圖用顏色編碼相關係數,讓變數間的關係一目了然。散點矩陣展示所有變數對之間的散點圖,是探索多變數關係的強大工具。

探索性資料分析不僅僅是產生統計量與圖表,更重要的是從中發現洞察。高偏度可能表示資料需要對數轉換,離群值可能是資料錯誤或真實的極端情況,高相關性可能暗示冗餘變數或潛在的因果關係。這些發現將指導後續的資料轉換、特徵工程與建模策略,讓資料分析過程更加高效與精確。

信用風險評估:多維度分析框架

信用風險評估是金融機構核心的業務流程,它決定了是否向申請人發放貸款,以及貸款的額度與利率。傳統的信用評估依賴人工審核與簡單的規則判斷,這種方式效率低且容易受主觀因素影響。現代的信用風險評估則採用資料驅動的方法,透過分析借款人的多維度資訊,建構量化的風險評分模型,實現更準確與一致的評估決策。

信用風險評估的核心變數包含多個維度。信用評分反映了借款人的歷史信用記錄,是最直接的風險指標。收入水平決定了還款能力,收入越高,違約風險越低。負債比率(負債金額/收入)反映了借款人的財務負擔,高負債比率意味著較高的違約風險。貸款金額與收入的比率也是重要指標,這個比率過高可能表示借款超過承受能力。除了這些量化指標,借款目的、就業穩定性、資產狀況等定性因素也會影響信用風險。

建構信用風險評估模型需要系統化的資料分析流程。首先是資料收集與整合,從多個來源收集借款人資訊,包含信用局資料、銀行帳戶資料、就業資訊等。然後是特徵工程,創建衍生變數如負債收入比、信用利用率、逾期次數等。接著進行探索性分析,理解變數分布、識別異常值、探索變數關係。然後建構預測模型,使用邏輯迴歸、決策樹、隨機森林等演算法預測違約機率。最後是模型驗證與部署,評估模型性能,整合到業務流程中。

讓我們透過完整的程式碼來展示信用風險評估的實作:

"""
信用風險評估系統
展示從資料分析到風險評分的完整流程
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    classification_report, 
    confusion_matrix, 
    roc_auc_score,
    roc_curve
)
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')

class CreditRiskAnalyzer:
    """
    信用風險分析器
    提供完整的信用評估分析功能
    """
    
    def __init__(self, df: pd.DataFrame, target_col: str):
        """
        初始化分析器
        
        參數:
            df: 輸入資料框
            target_col: 目標變數欄位名稱(核准/拒絕)
        """
        self.df = df.copy()
        self.target_col = target_col
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        
        # 移除目標變數
        if target_col in self.numeric_cols:
            self.numeric_cols.remove(target_col)
    
    def analyze_approval_by_feature(self, feature: str) -> pd.DataFrame:
        """
        分析特定特徵對核准率的影響
        
        參數:
            feature: 要分析的特徵
            
        回傳:
            分析結果 DataFrame
        """
        analysis = self.df.groupby(feature)[self.target_col].agg([
            'count',
            'sum',
            'mean'
        ]).rename(columns={
            'count': '總數',
            'sum': '核准數',
            'mean': '核准率'
        })
        
        analysis['拒絕數'] = analysis['總數'] - analysis['核准數']
        analysis['拒絕率'] = 1 - analysis['核准率']
        
        return analysis
    
    def create_risk_score(self, 
                         weights: Dict[str, float] = None) -> pd.Series:
        """
        創建風險評分
        
        參數:
            weights: 變數權重字典
            
        回傳:
            風險評分 Series
        """
        if weights is None:
            # 預設權重
            weights = {
                '信用評分': 0.4,
                '年收入': 0.3,
                '負債比率': -0.3
            }
        
        # 標準化變數
        scaler = StandardScaler()
        df_normalized = pd.DataFrame(
            scaler.fit_transform(self.df[list(weights.keys())]),
            columns=list(weights.keys()),
            index=self.df.index
        )
        
        # 計算加權分數
        risk_score = sum(
            df_normalized[var] * weight 
            for var, weight in weights.items()
        )
        
        # 轉換到 0-100 範圍
        risk_score = (risk_score - risk_score.min()) / (
            risk_score.max() - risk_score.min()
        ) * 100
        
        return risk_score
    
    def segment_customers(self, 
                         score_col: str,
                         bins: List[float] = None) -> pd.Series:
        """
        客戶分群
        
        參數:
            score_col: 評分欄位
            bins: 分箱邊界
            
        回傳:
            分群結果 Series
        """
        if bins is None:
            bins = [0, 40, 60, 80, 100]
        
        labels = ['高風險', '中高風險', '中低風險', '低風險']
        
        segments = pd.cut(
            self.df[score_col],
            bins=bins,
            labels=labels,
            include_lowest=True
        )
        
        return segments
    
    def generate_risk_report(self, 
                            score_col: str,
                            segment_col: str) -> pd.DataFrame:
        """
        生成風險報告
        
        參數:
            score_col: 評分欄位
            segment_col: 分群欄位
            
        回傳:
            風險報告 DataFrame
        """
        report = self.df.groupby(segment_col).agg({
            score_col: ['mean', 'std', 'min', 'max'],
            self.target_col: ['count', 'sum', 'mean']
        })
        
        # 扁平化欄位名稱
        report.columns = [
            f'{col}_{func}' for col, func in report.columns
        ]
        
        report.rename(columns={
            f'{score_col}_mean': '平均評分',
            f'{score_col}_std': '評分標準差',
            f'{self.target_col}_count': '總數',
            f'{self.target_col}_sum': '核准數',
            f'{self.target_col}_mean': '核准率'
        }, inplace=True)
        
        return report

class CreditRiskModel:
    """
    信用風險模型
    提供模型訓練與預測功能
    """
    
    def __init__(self):
        """初始化模型"""
        self.model = None
        self.scaler = StandardScaler()
        self.feature_importance = None
    
    def prepare_data(self,
                    df: pd.DataFrame,
                    target_col: str,
                    test_size: float = 0.3) -> Tuple:
        """
        準備訓練資料
        
        參數:
            df: 輸入資料框
            target_col: 目標變數
            test_size: 測試集比例
            
        回傳:
            (X_train, X_test, y_train, y_test)
        """
        # 選擇數值特徵
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        numeric_cols.remove(target_col)
        
        X = df[numeric_cols]
        y = df[target_col]
        
        # 分割訓練集與測試集
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=test_size,
            random_state=42,
            stratify=y
        )
        
        # 標準化特徵
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        # 轉回 DataFrame 保留欄位名稱
        X_train_scaled = pd.DataFrame(
            X_train_scaled,
            columns=X_train.columns,
            index=X_train.index
        )
        X_test_scaled = pd.DataFrame(
            X_test_scaled,
            columns=X_test.columns,
            index=X_test.index
        )
        
        return X_train_scaled, X_test_scaled, y_train, y_test
    
    def train_logistic_regression(self,
                                 X_train: pd.DataFrame,
                                 y_train: pd.Series) -> None:
        """
        訓練邏輯迴歸模型
        
        參數:
            X_train: 訓練特徵
            y_train: 訓練目標
        """
        self.model = LogisticRegression(
            random_state=42,
            max_iter=1000
        )
        
        self.model.fit(X_train, y_train)
        
        # 提取特徵重要性
        self.feature_importance = pd.DataFrame({
            '特徵': X_train.columns,
            '係數': self.model.coef_[0]
        }).sort_values('係數', key=abs, ascending=False)
    
    def train_random_forest(self,
                           X_train: pd.DataFrame,
                           y_train: pd.Series) -> None:
        """
        訓練隨機森林模型
        
        參數:
            X_train: 訓練特徵
            y_train: 訓練目標
        """
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        
        self.model.fit(X_train, y_train)
        
        # 提取特徵重要性
        self.feature_importance = pd.DataFrame({
            '特徵': X_train.columns,
            '重要性': self.model.feature_importances_
        }).sort_values('重要性', ascending=False)
    
    def evaluate_model(self,
                      X_test: pd.DataFrame,
                      y_test: pd.Series) -> Dict:
        """
        評估模型性能
        
        參數:
            X_test: 測試特徵
            y_test: 測試目標
            
        回傳:
            評估指標字典
        """
        if self.model is None:
            raise ValueError("模型尚未訓練")
        
        # 預測
        y_pred = self.model.predict(X_test)
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]
        
        # 計算指標
        auc_score = roc_auc_score(y_test, y_pred_proba)
        
        # 分類報告
        report = classification_report(
            y_test, 
            y_pred,
            output_dict=True
        )
        
        # 混淆矩陣
        cm = confusion_matrix(y_test, y_pred)
        
        metrics = {
            'auc': auc_score,
            'accuracy': report['accuracy'],
            'precision': report['1']['precision'],
            'recall': report['1']['recall'],
            'f1_score': report['1']['f-score'],
            'confusion_matrix': cm,
            'y_pred_proba': y_pred_proba
        }
        
        return metrics
    
    def plot_roc_curve(self,
                      y_test: pd.Series,
                      y_pred_proba: np.ndarray) -> None:
        """
        繪製 ROC 曲線
        
        參數:
            y_test: 測試目標
            y_pred_proba: 預測機率
        """
        fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba)
        auc_score = roc_auc_score(y_test, y_pred_proba)
        
        plt.figure(figsize=(10, 8))
        plt.plot(
            fpr, tpr,
            label=f'ROC Curve (AUC = {auc_score:.3f})',
            linewidth=2
        )
        plt.plot(
            [0, 1], [0, 1],
            'k--',
            label='Random Classifier',
            linewidth=1
        )
        
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate', fontsize=12)
        plt.ylabel('True Positive Rate', fontsize=12)
        plt.title('ROC Curve', fontsize=14, pad=15)
        plt.legend(loc='lower right', fontsize=11)
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

# 使用範例
if __name__ == '__main__':
    # 創建示例資料
    np.random.seed(42)
    
    n_samples = 2000
    
    # 生成特徵
    credit_scores = np.random.normal(650, 100, n_samples)
    incomes = np.random.lognormal(11, 0.5, n_samples)
    debts = np.random.gamma(2, 10000, n_samples)
    loan_amounts = np.random.normal(50000, 20000, n_samples)
    ages = np.random.normal(40, 12, n_samples)
    
    # 計算衍生特徵
    debt_income_ratios = debts / incomes
    loan_income_ratios = loan_amounts / incomes
    
    # 生成目標變數(核准/拒絕)
    # 根據特徵計算核准機率
    approval_probs = (
        0.4 * (credit_scores - 300) / (850 - 300) +
        0.3 * (incomes - incomes.min()) / (incomes.max() - incomes.min()) +
        0.2 * (1 - (debt_income_ratios - debt_income_ratios.min()) / 
               (debt_income_ratios.max() - debt_income_ratios.min())) +
        0.1 * (1 - (loan_income_ratios - loan_income_ratios.min()) / 
               (loan_income_ratios.max() - loan_income_ratios.min()))
    )
    
    # 加入隨機性
    approvals = (approval_probs + np.random.normal(0, 0.1, n_samples)) > 0.5
    approvals = approvals.astype(int)
    
    # 創建資料框
    data = pd.DataFrame({
        '信用評分': credit_scores.astype(int),
        '年收入': incomes.astype(int),
        '負債金額': debts.astype(int),
        '貸款金額': loan_amounts.astype(int),
        '年齡': ages.astype(int),
        '負債比率': debt_income_ratios,
        '貸款收入比': loan_income_ratios,
        '核准狀態': approvals
    })
    
    print("=" * 70)
    print("信用風險評估系統")
    print("=" * 70)
    
    # 1. 風險分析
    print("\n步驟 1: 風險特徵分析")
    print("-" * 70)
    
    analyzer = CreditRiskAnalyzer(data, '核准狀態')
    
    # 創建風險評分
    data['風險評分'] = analyzer.create_risk_score()
    
    # 客戶分群
    data['風險等級'] = analyzer.segment_customers('風險評分')
    
    print(f"\n風險評分統計:")
    print(data['風險評分'].describe())
    
    # 生成風險報告
    risk_report = analyzer.generate_risk_report('風險評分', '風險等級')
    print("\n風險報告:")
    print(risk_report)
    
    # 2. 模型訓練
    print("\n步驟 2: 預測模型訓練")
    print("-" * 70)
    
    model = CreditRiskModel()
    
    # 準備資料
    X_train, X_test, y_train, y_test = model.prepare_data(
        data,
        '核准狀態',
        test_size=0.3
    )
    
    print(f"訓練集大小: {len(X_train)}")
    print(f"測試集大小: {len(X_test)}")
    
    # 訓練隨機森林模型
    print("\n訓練隨機森林模型...")
    model.train_random_forest(X_train, y_train)
    
    # 評估模型
    print("\n步驟 3: 模型評估")
    print("-" * 70)
    
    metrics = model.evaluate_model(X_test, y_test)
    
    print(f"\nAUC: {metrics['auc']:.4f}")
    print(f"準確率: {metrics['accuracy']:.4f}")
    print(f"精確率: {metrics['precision']:.4f}")
    print(f"召回率: {metrics['recall']:.4f}")
    print(f"F1 分數: {metrics['f1_score']:.4f}")
    
    print("\n混淆矩陣:")
    print(metrics['confusion_matrix'])
    
    print("\n特徵重要性:")
    print(model.feature_importance)
    
    # 繪製 ROC 曲線
    model.plot_roc_curve(y_test, metrics['y_pred_proba'])
    
    print("\n信用風險評估完成!")

這個完整的信用風險評估系統展示了從資料分析到模型建構的完整流程。CreditRiskAnalyzer 類別提供了風險分析的核心功能,create_risk_score 方法基於多個變數創建綜合的風險評分,這個評分可以用於快速篩選與排序申請案件。segment_customers 方法將客戶分為不同的風險等級,這對於差異化的風險管理策略非常有用。

CreditRiskModel 類別封裝了機器學習模型的訓練與評估。程式碼展示了兩種常用的分類演算法:邏輯迴歸與隨機森林。邏輯迴歸是傳統的信用評分方法,模型簡單可解釋性強,係數直接反映了變數對違約機率的影響。隨機森林是集成學習方法,能夠捕捉複雜的非線性關係,通常能取得更高的預測準確率。

模型評估使用多個指標來全面衡量性能。AUC(Area Under Curve)衡量模型區分正負樣本的能力,AUC 越接近 1 表示模型性能越好。精確率反映預測為核准的案件中實際核准的比例,召回率反映實際核准的案件中被模型預測為核准的比例。F1 分數是精確率與召回率的調和平均,平衡了兩者的權衡。混淆矩陣清楚展示了四種預測結果的數量:真正例、假正例、真負例、假負例。

在實務應用中,信用風險評估還需要考慮業務約束與監管要求。模型必須滿足公平性要求,不能因為種族、性別等受保護特徵而產生歧視。模型的可解釋性也很重要,監管機構與借款人都有權了解拒絕的原因。此外,模型需要定期重新訓練與驗證,確保在資料分布變化時仍然有效。這些考量讓信用風險評估不僅是技術問題,更是需要平衡多方利益的複雜決策問題。

結論

資料分析與視覺化是現代商業智慧的核心能力,它們讓企業能夠從資料中提取價值,做出基於事實的決策。本文系統化地探討了 Python 資料分析與視覺化的完整技術棧,從 Pandas 的資料處理基礎到探索性資料分析的方法論,從多樣化的視覺化工具到實際的商業應用案例。

Pandas 提供了強大的資料處理能力,讓複雜的資料清理、轉換、聚合操作變得簡單高效。理解如何處理缺失值、異常值、格式不一致等資料品質問題,掌握特徵工程與資料轉換的技巧,是進行有效資料分析的前提。資料處理不僅是技術操作,更需要結合業務理解來做出正確的決策,刪除還是填補缺失值,移除還是保留異常值,都需要根據具體情況來判斷。

探索性資料分析提供了理解資料的系統化方法,透過描述性統計與視覺化技術揭示資料的特性、分布、關係與異常。EDA 不僅是分析的第一步,更是貫穿整個分析過程的重要環節。每當獲得新的資料或創建新的特徵,都應該進行 EDA 來驗證假設,發現問題,指導後續的分析方向。好的 EDA 能夠節省大量的建模時間,避免使用不合適的方法,提升最終模型的性能。

視覺化是資料分析中不可或缺的環節,它將抽象的數字轉化為直觀的圖形,讓複雜的關係與趨勢一目了然。選擇合適的視覺化類型至關重要,長條圖適合類別比較,折線圖展示時間趨勢,散點圖揭示變數關係,熱圖顯示矩陣資料。設計有效的視覺化需要考慮資訊密度、視覺層次、顏色選擇等多個因素,目標是讓觀眾能夠快速準確地理解資料傳達的訊息。

信用風險評估與員工績效分析兩個案例展示了資料分析在不同業務場景中的應用。信用風險評估透過多維度的資料分析與預測建模,實現了量化的風險評估,提升了決策的一致性與準確性。員工績效分析透過資料驅動的方法,識別影響績效的關鍵因素,優化人才管理策略。這些應用證明了資料分析的價值不僅在於描述現狀,更在於預測未來,指導決策,創造商業價值。

然而,資料分析與視覺化也面臨挑戰。資料品質問題可能導致錯誤的結論,垃圾進垃圾出是資料分析的鐵律。選擇不當的分析方法可能遺漏重要的洞察,過度擬合可能讓模型在新資料上失效。視覺化的誤導性使用可能扭曲資料的真實含義,例如不當的刻度選擇、缺失的基準線、選擇性的資料呈現。避免這些陷阱需要嚴謹的分析態度,批判性的思維,以及對業務脈絡的深入理解。

展望未來,資料分析與視覺化技術仍在持續演進。自動化機器學習(AutoML)讓模型建構變得更加自動化,降低了技術門檻。互動式視覺化工具如 Plotly、Dash 讓使用者能夠動態探索資料,深入挖掘細節。大語言模型的整合為資料分析提供了自然語言介面,讓非技術人員也能進行複雜的分析。這些發展將進一步擴大資料分析的應用範圍,讓更多人能夠從資料中獲得價值。

作為資料分析師,持續學習新的工具與技術,深化對統計方法的理解,培養業務洞察力,是保持競爭力的關鍵。資料分析不僅是技術工作,更是需要結合領域知識、統計思維、溝通能力的綜合能力。希望本文提供的知識與經驗能夠協助讀者建構完整的資料分析能力,在實務工作中創造價值。