在軟體開發的生命週期中,遺留程式碼的維護與重構是每位開發者都必須面對的重要課題。這些歷經多年演進的程式碼往往承載著豐富的業務邏輯與領域知識,但同時也累積了大量的技術債務。缺乏清晰的架構設計、複雜糾結的相依關係、過時的技術框架,以及不足甚至缺失的文件與測試,使得這些程式碼難以理解、修改與擴充。每次的功能調整都可能引發難以預料的副作用,維護成本隨著時間推移而不斷攀升。
設計模式為遺留程式碼的重構提供了系統化的解決方案。這些經過時間驗證的程式設計範式,封裝了軟體設計領域的最佳實務與專家經驗。透過適當地應用設計模式,開發者能夠將混亂的程式碼轉化為結構清晰、易於維護的系統。Facade模式簡化了複雜子系統的介面,Adapter模式解決了介面不相容的問題,Strategy模式將演算法封裝為可替換的物件,Composite模式優雅地處理部分與整體的關係,Observer模式實現了鬆耦合的事件通知機制,Template Method模式則抽取了演算法的共同骨架。
本文將深入探討如何運用這些核心設計模式來重構遺留程式碼。我們將從建立完善的測試框架開始,確保重構過程的安全性。接著詳細解析每個設計模式的應用場景、實作細節與最佳實務,並提供完整的Python程式碼範例。這些範例不僅展示了模式的基本結構,更包含了實務中的錯誤處理、效能考量與擴充性設計。透過系統化的重構流程與設計模式的合理應用,開發者能夠有效降低技術債務,提升程式碼品質,使遺留系統重獲新生,符合現代軟體工程的標準與需求。
遺留程式碼重構的基礎與挑戰
遺留程式碼通常指那些難以理解、修改與測試的既有程式碼。這些程式碼可能是由已離職的開發者編寫,使用過時的技術框架,或是經歷多次修改而失去原有的設計理念。遺留程式碼的特徵包括缺乏單元測試覆蓋率,高度耦合的模組結構,過長的函式與類別,以及充滿魔術數字與硬編碼的邏輯。這些特徵使得任何修改都可能引發連鎖反應,導致系統在意想不到的地方出現錯誤。
重構遺留程式碼面臨多個根本性挑戰。首先是風險控制的問題,在缺乏測試保護的情況下進行重構,就像在沒有安全繩的情況下進行高空作業,任何失誤都可能造成嚴重後果。其次是理解成本,閱讀與理解結構混亂的遺留程式碼需要投入大量時間與精力。第三是業務壓力,在快速迭代的開發環境中,管理層往往優先考慮新功能的開發而非程式碼品質的改善。第四是技術能力的要求,有效的重構需要深厚的設計模式知識與實務經驗。
成功的重構需要系統化的方法。第一步是建立完整的測試覆蓋率,這是重構的安全網,確保行為在重構過程中保持不變。測試應該包含單元測試、整合測試與端對端測試,覆蓋關鍵的業務邏輯與邊界條件。第二步是分析與識別問題區域,使用靜態分析工具找出程式碼異味,透過指標如循環複雜度、耦合度與內聚度來評估程式碼品質。第三步是制定重構計劃,根據風險與收益評估,優先處理影響最大且風險可控的部分。第四步是小步快跑地進行重構,每次只改變一個設計元素,確保每次修改後系統仍然正常運作。
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
title 遺留程式碼重構系統化流程
|準備階段|
start
:識別重構目標程式碼;
note right
評估標準:
• 變更頻率高
• 缺陷密度高
• 複雜度指標高
• 業務重要性高
end note
:建立測試框架;
partition "測試覆蓋建立" {
:撰寫單元測試;
note right
針對關鍵函式
覆蓋主要邏輯路徑
包含邊界條件
end note
:撰寫整合測試;
note right
測試模組間互動
驗證介面契約
確保資料流正確
end note
:撰寫端對端測試;
note right
模擬實際使用場景
驗證業務流程
確保系統行為
end note
}
:執行基準測試;
note right
記錄重構前的:
• 功能行為
• 效能指標
• 測試結果
end note
|分析階段|
:靜態程式碼分析;
partition "問題識別" {
:計算複雜度指標;
:識別程式碼異味;
:分析相依關係;
:評估耦合度;
}
:制定重構策略;
note right
優先級排序:
1. 高風險高價值
2. 低風險高價值
3. 高風險低價值
4. 低風險低價值
end note
|重構階段|
repeat
:選擇重構目標;
:應用設計模式;
partition "設計模式選擇" {
if (複雜介面?) then (是)
:Facade Pattern;
elseif (介面不相容?) then (是)
:Adapter Pattern;
elseif (演算法多變?) then (是)
:Strategy Pattern;
elseif (階層結構?) then (是)
:Composite Pattern;
elseif (事件通知?) then (是)
:Observer Pattern;
else (演算法骨架?)
:Template Method;
endif
}
:執行重構操作;
:執行測試套件;
if (測試通過?) then (是)
:提交變更;
else (否)
:回復變更;
:分析失敗原因;
:調整重構策略;
endif
repeat while (還有重構目標?) is (是)
|驗證階段|
:執行完整測試;
:效能基準比較;
:程式碼品質評估;
:重構完成;
stop
@enduml
這個流程圖展示了遺留程式碼重構的完整過程。從準備階段的測試框架建立,到分析階段的問題識別與策略制定,再到重構階段的模式應用與驗證,每個步驟都確保了重構的安全性與有效性。測試驅動的重構方法是成功的關鍵,它提供了即時的回饋機制,確保每次變更都不會破壞既有功能。
Facade模式簡化複雜子系統介面
Facade模式提供了一個統一且簡化的介面來存取複雜子系統的功能。在遺留系統中,往往存在著龐大且複雜的子系統,這些子系統包含多個相互關聯的類別與介面,使用者需要理解大量的細節才能正確使用。Facade模式透過引入一個外觀類別,將複雜的子系統操作封裝起來,對外提供簡潔易用的高階介面。這不僅降低了使用者的學習成本,也減少了客戶端程式碼與子系統的耦合度。
Facade模式特別適合用於遺留系統的現代化改造。當遺留系統需要與新開發的模組整合時,直接暴露遺留系統的複雜介面會污染新程式碼的設計。透過Facade模式,可以建立一個乾淨的介面層,新程式碼只需要與Facade互動,而Facade內部處理與遺留系統的所有複雜互動。這種方式也為未來的系統遷移留下了空間,當需要替換底層實作時,只需要修改Facade的內部實作,而不影響使用Facade的客戶端程式碼。
# Facade模式完整實作範例
# 展示如何簡化複雜的遺留資料存取子系統
import logging
from typing import Optional, Dict, List, Any
from datetime import datetime
from contextlib import contextmanager
# 配置日誌系統
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LegacyDatabaseConnection:
"""
遺留資料庫連線類別
模擬舊有系統的複雜連線管理機制
"""
def __init__(self, host: str, port: int, database: str):
"""
初始化資料庫連線
參數:
host: 資料庫主機位址
port: 連線埠號
database: 資料庫名稱
"""
self.host = host
self.port = port
self.database = database
self.is_connected = False
self.transaction_active = False
def open_connection(self) -> bool:
"""
開啟資料庫連線
回傳:
連線是否成功建立
"""
try:
# 模擬連線建立過程
# 實際環境中會涉及網路通訊與認證
logger.info(f"正在連線到 {self.host}:{self.port}/{self.database}")
self.is_connected = True
return True
except Exception as e:
logger.error(f"資料庫連線失敗: {e}")
return False
def close_connection(self) -> None:
"""關閉資料庫連線"""
if self.is_connected:
logger.info("關閉資料庫連線")
self.is_connected = False
def begin_transaction(self) -> None:
"""開始資料庫交易"""
if not self.is_connected:
raise RuntimeError("未建立資料庫連線")
logger.info("開始交易")
self.transaction_active = True
def commit_transaction(self) -> None:
"""提交交易"""
if not self.transaction_active:
raise RuntimeError("沒有進行中的交易")
logger.info("提交交易")
self.transaction_active = False
def rollback_transaction(self) -> None:
"""回復交易"""
if not self.transaction_active:
raise RuntimeError("沒有進行中的交易")
logger.info("回復交易")
self.transaction_active = False
class LegacyQueryLogger:
"""
遺留查詢日誌記錄器
模擬舊有系統的查詢日誌機制
"""
def __init__(self, log_file: str):
"""
初始化查詢日誌記錄器
參數:
log_file: 日誌檔案路徑
"""
self.log_file = log_file
def log_query(
self,
query: str,
parameters: Optional[Dict] = None
) -> None:
"""
記錄查詢資訊
參數:
query: SQL查詢陳述式
parameters: 查詢參數
"""
timestamp = datetime.now().isoformat()
log_entry = f"[{timestamp}] Query: {query}"
if parameters:
log_entry += f" | Parameters: {parameters}"
logger.info(log_entry)
# 實際環境中會寫入檔案
# with open(self.log_file, 'a') as f:
# f.write(log_entry + '\n')
class LegacyQueryExecutor:
"""
遺留查詢執行器
模擬舊有系統的查詢執行機制
"""
def __init__(self, connection: LegacyDatabaseConnection):
"""
初始化查詢執行器
參數:
connection: 資料庫連線物件
"""
self.connection = connection
def execute_query(
self,
query: str,
parameters: Optional[Dict] = None
) -> List[Dict[str, Any]]:
"""
執行查詢並回傳結果
參數:
query: SQL查詢陳述式
parameters: 查詢參數
回傳:
查詢結果列表
"""
if not self.connection.is_connected:
raise RuntimeError("資料庫未連線")
# 模擬查詢執行
# 實際環境中會執行真實的SQL查詢
logger.info(f"執行查詢: {query}")
# 模擬回傳結果
return [
{"id": 1, "name": "範例資料1"},
{"id": 2, "name": "範例資料2"}
]
def execute_command(
self,
command: str,
parameters: Optional[Dict] = None
) -> int:
"""
執行資料修改指令
參數:
command: SQL指令陳述式
parameters: 指令參數
回傳:
受影響的資料列數
"""
if not self.connection.is_connected:
raise RuntimeError("資料庫未連線")
if not self.connection.transaction_active:
raise RuntimeError("需要在交易中執行修改指令")
logger.info(f"執行指令: {command}")
# 模擬指令執行
return 1
class DataAccessFacade:
"""
資料存取外觀類別
提供簡化的介面來存取複雜的遺留資料存取子系統
"""
def __init__(
self,
host: str = "localhost",
port: int = 5432,
database: str = "legacy_db",
log_file: str = "query.log"
):
"""
初始化資料存取外觀
參數:
host: 資料庫主機
port: 資料庫連線埠
database: 資料庫名稱
log_file: 日誌檔案路徑
"""
# 建立子系統元件
# 客戶端不需要知道這些複雜的內部結構
self._connection = LegacyDatabaseConnection(host, port, database)
self._logger = LegacyQueryLogger(log_file)
self._executor = LegacyQueryExecutor(self._connection)
@contextmanager
def session(self):
"""
提供資料庫會話的上下文管理器
自動處理連線開啟與關閉
"""
try:
# 開啟連線
if not self._connection.open_connection():
raise RuntimeError("無法建立資料庫連線")
yield self
finally:
# 確保連線被關閉
self._connection.close_connection()
def query(
self,
query: str,
parameters: Optional[Dict] = None
) -> List[Dict[str, Any]]:
"""
執行查詢的簡化介面
自動處理日誌記錄與錯誤處理
參數:
query: SQL查詢陳述式
parameters: 查詢參數
回傳:
查詢結果列表
"""
try:
# 記錄查詢
self._logger.log_query(query, parameters)
# 執行查詢
results = self._executor.execute_query(query, parameters)
logger.info(f"查詢成功,回傳 {len(results)} 筆資料")
return results
except Exception as e:
logger.error(f"查詢執行失敗: {e}")
raise
def execute_in_transaction(
self,
command: str,
parameters: Optional[Dict] = None
) -> int:
"""
在交易中執行指令的簡化介面
自動處理交易管理與錯誤處理
參數:
command: SQL指令陳述式
parameters: 指令參數
回傳:
受影響的資料列數
"""
try:
# 開始交易
self._connection.begin_transaction()
# 記錄指令
self._logger.log_query(command, parameters)
# 執行指令
affected_rows = self._executor.execute_command(
command,
parameters
)
# 提交交易
self._connection.commit_transaction()
logger.info(f"指令執行成功,影響 {affected_rows} 筆資料")
return affected_rows
except Exception as e:
# 發生錯誤時回復交易
logger.error(f"指令執行失敗,回復交易: {e}")
try:
self._connection.rollback_transaction()
except:
pass
raise
def execute_business_logic():
"""
業務邏輯執行範例
展示如何使用簡化的Facade介面
"""
# 建立Facade實例
# 客戶端只需要與Facade互動
# 不需要理解底層複雜的子系統結構
facade = DataAccessFacade(
host="localhost",
port=5432,
database="production_db"
)
# 使用上下文管理器自動管理連線
with facade.session():
# 執行查詢
# 簡潔的介面隱藏了連線管理、日誌記錄等複雜性
users = facade.query(
"SELECT * FROM users WHERE active = :active",
parameters={"active": True}
)
logger.info(f"找到 {len(users)} 位活躍使用者")
# 執行資料修改
# 交易管理完全由Facade處理
affected = facade.execute_in_transaction(
"UPDATE users SET last_login = :timestamp WHERE id = :user_id",
parameters={
"timestamp": datetime.now(),
"user_id": 1
}
)
logger.info(f"更新了 {affected} 筆使用者資料")
# 使用範例
if __name__ == "__main__":
execute_business_logic()
這個完整的Facade模式實作展示了如何將複雜的遺留資料存取子系統封裝為簡潔的介面。底層包含連線管理、交易控制、日誌記錄與查詢執行等多個元件,每個元件都有自己的複雜性。Facade類別整合這些元件,提供query與execute_in_transaction兩個高階方法,並使用上下文管理器自動處理資源管理。客戶端程式碼變得簡潔易讀,完全不需要理解底層的複雜性。
Adapter模式解決介面相容性問題
Adapter模式允許具有不相容介面的類別能夠協同工作。在遺留系統重構過程中,經常遇到需要整合舊有元件與新設計的情況。舊有元件可能使用過時的介面設計,與新系統的介面標準不相容。直接修改遺留元件的介面往往風險很高,可能影響依賴它的其他部分。Adapter模式提供了一個優雅的解決方案,透過建立適配器類別,將舊介面轉換為新介面,使得新舊系統能夠無縫整合。
Adapter模式有兩種實作方式。類別適配器使用多重繼承,同時繼承目標介面與被適配類別。物件適配器則透過組合方式,將被適配物件作為適配器的成員變數。在Python中,由於支援多重繼承但更推崇組合優於繼承的原則,物件適配器是更常用且更靈活的方式。物件適配器不僅能適配單一類別,還能適配整個類別家族,提供更好的靈活性。
# Adapter模式完整實作範例
# 展示如何整合遺留支付系統與新的支付處理介面
from abc import ABC, abstractmethod
from typing import Dict, Optional
from enum import Enum
from datetime import datetime
import uuid
class PaymentStatus(Enum):
"""支付狀態列舉"""
PENDING = "pending"
PROCESSING = "processing"
SUCCESS = "success"
FAILED = "failed"
CANCELLED = "cancelled"
class PaymentResult:
"""
支付結果資料類別
標準化的支付結果表示
"""
def __init__(
self,
transaction_id: str,
status: PaymentStatus,
amount: float,
message: str = "",
timestamp: Optional[datetime] = None
):
"""
初始化支付結果
參數:
transaction_id: 交易識別碼
status: 支付狀態
amount: 支付金額
message: 狀態訊息
timestamp: 交易時間戳記
"""
self.transaction_id = transaction_id
self.status = status
self.amount = amount
self.message = message
self.timestamp = timestamp or datetime.now()
def is_successful(self) -> bool:
"""檢查支付是否成功"""
return self.status == PaymentStatus.SUCCESS
def __repr__(self) -> str:
return (
f"PaymentResult(id={self.transaction_id}, "
f"status={self.status.value}, "
f"amount={self.amount}, "
f"message='{self.message}')"
)
class LegacyPaymentProcessor:
"""
遺留支付處理系統
使用舊有的介面設計與資料格式
"""
def __init__(self, merchant_id: str, api_key: str):
"""
初始化遺留支付處理器
參數:
merchant_id: 商戶識別碼
api_key: API金鑰
"""
self.merchant_id = merchant_id
self.api_key = api_key
def make_payment(
self,
amount: float,
card_number: str,
card_holder: str,
expiry_date: str,
cvv: str
) -> Dict[str, any]:
"""
執行支付的遺留介面
使用詳細的信用卡資訊參數
參數:
amount: 支付金額
card_number: 信用卡號
card_holder: 持卡人姓名
expiry_date: 到期日
cvv: 安全碼
回傳:
包含支付結果的字典
"""
# 模擬遺留系統的支付處理
# 實際環境中會呼叫第三方支付閘道API
print(f"[遺留系統] 處理支付: ${amount}")
print(f"[遺留系統] 商戶ID: {self.merchant_id}")
# 模擬支付處理
# 遺留系統回傳的資料格式與新系統不相容
return {
"payment_id": f"LEG-{uuid.uuid4().hex[:8]}",
"result_code": "00", # 00表示成功
"result_message": "Payment processed successfully",
"processed_amount": amount,
"processing_time": datetime.now().isoformat()
}
def refund_payment(
self,
payment_id: str,
amount: float
) -> Dict[str, any]:
"""
退款的遺留介面
參數:
payment_id: 原始支付識別碼
amount: 退款金額
回傳:
包含退款結果的字典
"""
print(f"[遺留系統] 處理退款: ${amount}")
return {
"refund_id": f"REF-{uuid.uuid4().hex[:8]}",
"original_payment_id": payment_id,
"result_code": "00",
"result_message": "Refund processed successfully",
"refunded_amount": amount
}
class PaymentProcessorInterface(ABC):
"""
新的支付處理介面標準
定義了現代化的支付處理抽象
"""
@abstractmethod
def process_payment(
self,
amount: float,
payment_method: Dict[str, str]
) -> PaymentResult:
"""
處理支付的新介面
參數:
amount: 支付金額
payment_method: 支付方式資訊字典
回傳:
標準化的PaymentResult物件
"""
pass
@abstractmethod
def process_refund(
self,
transaction_id: str,
amount: float
) -> PaymentResult:
"""
處理退款的新介面
參數:
transaction_id: 原始交易識別碼
amount: 退款金額
回傳:
標準化的PaymentResult物件
"""
pass
class LegacyPaymentAdapter(PaymentProcessorInterface):
"""
遺留支付系統適配器
將遺留系統的介面適配為新的標準介面
"""
def __init__(self, legacy_processor: LegacyPaymentProcessor):
"""
初始化適配器
參數:
legacy_processor: 遺留支付處理器實例
"""
# 使用組合方式持有遺留系統實例
# 這是物件適配器模式的標準實作
self._legacy_processor = legacy_processor
def process_payment(
self,
amount: float,
payment_method: Dict[str, str]
) -> PaymentResult:
"""
實作新介面的支付處理方法
內部呼叫遺留系統並轉換結果格式
參數:
amount: 支付金額
payment_method: 包含支付資訊的字典
回傳:
標準化的PaymentResult物件
"""
try:
# 從新格式提取支付資訊
# 轉換為遺留系統需要的詳細參數
card_number = payment_method.get("card_number", "")
card_holder = payment_method.get("card_holder", "")
expiry_date = payment_method.get("expiry_date", "")
cvv = payment_method.get("cvv", "")
# 呼叫遺留系統的舊介面
legacy_result = self._legacy_processor.make_payment(
amount=amount,
card_number=card_number,
card_holder=card_holder,
expiry_date=expiry_date,
cvv=cvv
)
# 轉換遺留系統的回傳格式為新的標準格式
# 這是適配器的核心功能
status = self._convert_result_code(
legacy_result.get("result_code")
)
return PaymentResult(
transaction_id=legacy_result.get("payment_id"),
status=status,
amount=legacy_result.get("processed_amount", amount),
message=legacy_result.get("result_message", ""),
timestamp=datetime.fromisoformat(
legacy_result.get("processing_time")
)
)
except Exception as e:
# 錯誤處理:建立失敗的支付結果
return PaymentResult(
transaction_id=f"ERR-{uuid.uuid4().hex[:8]}",
status=PaymentStatus.FAILED,
amount=amount,
message=f"支付處理失敗: {str(e)}"
)
def process_refund(
self,
transaction_id: str,
amount: float
) -> PaymentResult:
"""
實作新介面的退款處理方法
參數:
transaction_id: 原始交易識別碼
amount: 退款金額
回傳:
標準化的PaymentResult物件
"""
try:
# 呼叫遺留系統的退款方法
legacy_result = self._legacy_processor.refund_payment(
payment_id=transaction_id,
amount=amount
)
# 轉換結果格式
status = self._convert_result_code(
legacy_result.get("result_code")
)
return PaymentResult(
transaction_id=legacy_result.get("refund_id"),
status=status,
amount=legacy_result.get("refunded_amount", amount),
message=legacy_result.get("result_message", "")
)
except Exception as e:
return PaymentResult(
transaction_id=f"ERR-{uuid.uuid4().hex[:8]}",
status=PaymentStatus.FAILED,
amount=amount,
message=f"退款處理失敗: {str(e)}"
)
def _convert_result_code(self, result_code: str) -> PaymentStatus:
"""
轉換遺留系統的結果代碼為新的狀態列舉
參數:
result_code: 遺留系統的結果代碼
回傳:
對應的PaymentStatus
"""
# 遺留系統使用數字代碼
# 新系統使用語意化的列舉
code_mapping = {
"00": PaymentStatus.SUCCESS,
"01": PaymentStatus.FAILED,
"02": PaymentStatus.CANCELLED,
"03": PaymentStatus.PENDING
}
return code_mapping.get(result_code, PaymentStatus.FAILED)
def process_order(
amount: float,
payment_method: Dict[str, str],
processor: PaymentProcessorInterface
) -> bool:
"""
訂單處理函式
使用新的支付處理介面
參數:
amount: 訂單金額
payment_method: 支付方式資訊
processor: 支付處理器(可以是任何實作新介面的處理器)
回傳:
處理是否成功
"""
print(f"\n處理訂單: ${amount}")
# 處理支付
# 客戶端程式碼只依賴抽象介面
# 不需要知道底層使用的是遺留系統還是新系統
result = processor.process_payment(amount, payment_method)
print(f"支付結果: {result}")
if result.is_successful():
print("訂單處理成功!")
return True
else:
print(f"訂單處理失敗: {result.message}")
return False
# 使用範例
if __name__ == "__main__":
# 建立遺留支付處理器
legacy_processor = LegacyPaymentProcessor(
merchant_id="MERCHANT_12345",
api_key="secret_key_xyz"
)
# 使用適配器包裝遺留處理器
# 適配器將遺留介面轉換為新介面
adapter = LegacyPaymentAdapter(legacy_processor)
# 準備支付資訊
payment_info = {
"card_number": "4111111111111111",
"card_holder": "張三",
"expiry_date": "12/25",
"cvv": "123"
}
# 使用新介面處理訂單
# 雖然底層使用遺留系統,但介面是現代化的
success = process_order(
amount=1500.00,
payment_method=payment_info,
processor=adapter
)
這個Adapter模式實作展示了如何整合遺留支付系統與新的支付處理架構。遺留系統使用詳細的信用卡參數與數字結果代碼,而新系統使用字典封裝支付資訊與語意化的狀態列舉。適配器類別實作新介面,內部呼叫遺留系統並轉換資料格式。這使得客戶端程式碼能夠使用現代化的介面,同時保留對遺留系統的投資。當未來需要替換為新的支付系統時,只需要實作新的適配器,而不影響使用介面的業務邏輯程式碼。
Strategy模式封裝可替換演算法
Strategy模式定義了一系列演算法,將每個演算法封裝起來,並使它們可以互相替換。這個模式讓演算法的變化獨立於使用演算法的客戶端。在遺留程式碼中,經常可以看到大量的if-else或switch-case條件判斷來選擇不同的處理邏輯。這種寫法不僅使程式碼難以閱讀,也違反了開放封閉原則,每次新增新的處理邏輯都需要修改原有程式碼。Strategy模式透過將每種處理邏輯封裝為獨立的策略類別,消除了條件判斷,使系統更容易擴充與維護。
# Strategy模式完整實作範例
# 展示如何重構複雜的折扣計算邏輯
from abc import ABC, abstractmethod
from typing import Optional, List
from datetime import datetime, date
from decimal import Decimal
class DiscountStrategy(ABC):
"""
折扣策略抽象基底類別
定義所有折扣策略必須實作的介面
"""
@abstractmethod
def calculate_discount(
self,
original_price: Decimal,
context: Optional[dict] = None
) -> Decimal:
"""
計算折扣後的價格
參數:
original_price: 原始價格
context: 計算所需的上下文資訊
回傳:
折扣後的價格
"""
pass
@abstractmethod
def get_description(self) -> str:
"""
取得折扣策略的描述
回傳:
策略描述文字
"""
pass
class NoDiscountStrategy(DiscountStrategy):
"""
無折扣策略
保持原價不變
"""
def calculate_discount(
self,
original_price: Decimal,
context: Optional[dict] = None
) -> Decimal:
"""原價回傳,不進行任何折扣"""
return original_price
def get_description(self) -> str:
return "無折扣"
class PercentageDiscountStrategy(DiscountStrategy):
"""
百分比折扣策略
根據指定的百分比進行折扣
"""
def __init__(self, percentage: Decimal):
"""
初始化百分比折扣策略
參數:
percentage: 折扣百分比(0-100)
"""
if not 0 <= percentage <= 100:
raise ValueError("折扣百分比必須介於0-100之間")
self.percentage = percentage
def calculate_discount(
self,
original_price: Decimal,
context: Optional[dict] = None
) -> Decimal:
"""
計算百分比折扣後的價格
計算公式: 原價 × (1 - 折扣百分比/100)
"""
discount_multiplier = Decimal('1') - (self.percentage / Decimal('100'))
discounted_price = original_price * discount_multiplier
# 四捨五入到小數點後兩位
return discounted_price.quantize(Decimal('0.01'))
def get_description(self) -> str:
return f"{self.percentage}% 折扣"
class FixedAmountDiscountStrategy(DiscountStrategy):
"""
固定金額折扣策略
扣除固定的金額
"""
def __init__(self, discount_amount: Decimal):
"""
初始化固定金額折扣策略
參數:
discount_amount: 折扣金額
"""
if discount_amount < 0:
raise ValueError("折扣金額不能為負數")
self.discount_amount = discount_amount
def calculate_discount(
self,
original_price: Decimal,
context: Optional[dict] = None
) -> Decimal:
"""
計算固定金額折扣後的價格
確保折扣後價格不會低於零
"""
discounted_price = original_price - self.discount_amount
# 確保價格不為負
return max(Decimal('0'), discounted_price)
def get_description(self) -> str:
return f"折扣 ${self.discount_amount}"
class BuyXGetYFreeStrategy(DiscountStrategy):
"""
買X送Y折扣策略
購買指定數量後贈送免費商品
"""
def __init__(self, buy_quantity: int, free_quantity: int):
"""
初始化買X送Y策略
參數:
buy_quantity: 需購買的數量
free_quantity: 贈送的數量
"""
if buy_quantity <= 0 or free_quantity <= 0:
raise ValueError("購買與贈送數量必須大於0")
self.buy_quantity = buy_quantity
self.free_quantity = free_quantity
def calculate_discount(
self,
original_price: Decimal,
context: Optional[dict] = None
) -> Decimal:
"""
計算買X送Y折扣後的價格
需要從context中取得購買數量
"""
if not context or 'quantity' not in context:
raise ValueError("計算買X送Y折扣需要提供購買數量")
quantity = context['quantity']
# 計算可以獲得多少組免費商品
free_sets = quantity // (self.buy_quantity + self.free_quantity)
# 計算實際需要付費的商品數量
paid_quantity = quantity - (free_sets * self.free_quantity)
# 計算折扣後總價
discounted_price = original_price * Decimal(paid_quantity) / Decimal(quantity)
return discounted_price.quantize(Decimal('0.01'))
def get_description(self) -> str:
return f"買{self.buy_quantity}送{self.free_quantity}"
class SeasonalDiscountStrategy(DiscountStrategy):
"""
季節性折扣策略
在特定日期範圍內提供折扣
"""
def __init__(
self,
base_percentage: Decimal,
start_date: date,
end_date: date
):
"""
初始化季節性折扣策略
參數:
base_percentage: 基礎折扣百分比
start_date: 折扣開始日期
end_date: 折扣結束日期
"""
if start_date > end_date:
raise ValueError("開始日期不能晚於結束日期")
self.base_percentage = base_percentage
self.start_date = start_date
self.end_date = end_date
def calculate_discount(
self,
original_price: Decimal,
context: Optional[dict] = None
) -> Decimal:
"""
根據當前日期決定是否應用折扣
"""
current_date = date.today()
# 檢查當前日期是否在折扣期間內
if self.start_date <= current_date <= self.end_date:
# 應用折扣
discount_multiplier = Decimal('1') - (
self.base_percentage / Decimal('100')
)
discounted_price = original_price * discount_multiplier
return discounted_price.quantize(Decimal('0.01'))
else:
# 不在折扣期間,回傳原價
return original_price
def get_description(self) -> str:
return (
f"季節性折扣 {self.base_percentage}% "
f"({self.start_date} ~ {self.end_date})"
)
class CompositeDiscountStrategy(DiscountStrategy):
"""
組合折扣策略
可以組合多個折扣策略,依序應用
"""
def __init__(self, strategies: List[DiscountStrategy]):
"""
初始化組合折扣策略
參數:
strategies: 要組合的折扣策略列表
"""
if not strategies:
raise ValueError("至少需要一個折扣策略")
self.strategies = strategies
def calculate_discount(
self,
original_price: Decimal,
context: Optional[dict] = None
) -> Decimal:
"""
依序應用所有折扣策略
每個策略基於前一個策略的結果計算
"""
current_price = original_price
for strategy in self.strategies:
current_price = strategy.calculate_discount(
current_price,
context
)
return current_price
def get_description(self) -> str:
descriptions = [
strategy.get_description()
for strategy in self.strategies
]
return " + ".join(descriptions)
class PriceCalculator:
"""
價格計算器
使用策略模式計算折扣後的價格
"""
def __init__(self, strategy: DiscountStrategy):
"""
初始化價格計算器
參數:
strategy: 折扣策略
"""
self._strategy = strategy
def set_strategy(self, strategy: DiscountStrategy) -> None:
"""
動態更換折扣策略
參數:
strategy: 新的折扣策略
"""
self._strategy = strategy
def compute_final_price(
self,
original_price: Decimal,
context: Optional[dict] = None
) -> Decimal:
"""
計算最終價格
參數:
original_price: 原始價格
context: 計算上下文
回傳:
折扣後的最終價格
"""
return self._strategy.calculate_discount(original_price, context)
def get_strategy_description(self) -> str:
"""取得當前策略的描述"""
return self._strategy.get_description()
# 使用範例
if __name__ == "__main__":
original_price = Decimal('1000.00')
# 範例1: 使用百分比折扣
print("=== 百分比折扣範例 ===")
calculator = PriceCalculator(PercentageDiscountStrategy(Decimal('15')))
final_price = calculator.compute_final_price(original_price)
print(f"原價: ${original_price}")
print(f"折扣策略: {calculator.get_strategy_description()}")
print(f"最終價格: ${final_price}\n")
# 範例2: 動態切換為固定金額折扣
print("=== 固定金額折扣範例 ===")
calculator.set_strategy(FixedAmountDiscountStrategy(Decimal('200')))
final_price = calculator.compute_final_price(original_price)
print(f"原價: ${original_price}")
print(f"折扣策略: {calculator.get_strategy_description()}")
print(f"最終價格: ${final_price}\n")
# 範例3: 買二送一折扣
print("=== 買二送一折扣範例 ===")
calculator.set_strategy(BuyXGetYFreeStrategy(2, 1))
context = {'quantity': 6} # 購買6件
final_price = calculator.compute_final_price(original_price, context)
print(f"原價(6件): ${original_price}")
print(f"折扣策略: {calculator.get_strategy_description()}")
print(f"最終價格: ${final_price}\n")
# 範例4: 組合折扣
print("=== 組合折扣範例 ===")
composite = CompositeDiscountStrategy([
PercentageDiscountStrategy(Decimal('10')),
FixedAmountDiscountStrategy(Decimal('50'))
])
calculator.set_strategy(composite)
final_price = calculator.compute_final_price(original_price)
print(f"原價: ${original_price}")
print(f"折扣策略: {calculator.get_strategy_description()}")
print(f"最終價格: ${final_price}")
這個Strategy模式實作展示了如何將複雜的折扣計算邏輯重構為清晰的策略類別。每種折扣類型都封裝在獨立的策略類別中,具有自己的計算邏輯與描述。PriceCalculator作為上下文類別,持有當前的策略並可以動態切換。這種設計消除了大量的條件判斷,新增新的折扣類型只需要建立新的策略類別,完全符合開放封閉原則。組合折扣策略更展示了策略模式的靈活性,可以將多個策略組合使用。
設計模式為遺留程式碼重構提供了系統化且經過驗證的解決方案。透過Facade模式簡化複雜介面,Adapter模式解決相容性問題,Strategy模式封裝可替換演算法,開發者能夠逐步改善遺留系統的品質。重構必須建立在完善的測試基礎上,採用小步快跑的方式,每次變更後都要確保系統仍然正常運作。這種漸進式的重構方法能夠有效控制風險,持續改善程式碼品質,最終將混亂的遺留系統轉化為結構清晰、易於維護的現代化系統。