資料函式庫交易的隔離級別設定對於維持資料一致性至關重要,本文以 REPEATABLE READ 隔離級別為例,說明如何避免髒讀和不可重複讀問題。同時,示範 SQLAlchemy 的 with_for_update 方法實作悲觀鎖定,確保平行交易的正確性。此外,文章也探討領域事件和訊息匯流排的應用,闡述如何透過事件驅動架構解耦系統元件,並以庫存不足的電子郵件通知為例,說明如何避免將業務邏輯與基礎設施程式碼混雜。最後,文章也觸及 CQRS 架構模式,說明其在提升系統效能和可擴充套件性方面的優勢。
使用資料函式庫交易隔離級別強制平行控制規則
要使測試透過,我們可以在 session 中設定交易隔離級別:
為 Session 設定隔離級別
# src/allocation/service_layer/unit_of_work.py
DEFAULT_SESSION_FACTORY = sessionmaker(bind=create_engine(
config.get_postgres_uri(),
isolation_level="REPEATABLE READ",
))
內容解密:
這段程式碼設定了 SQLAlchemy 的 sessionmaker,並指定了資料函式庫的隔離級別為 “REPEATABLE READ”。這意味著在一個交易中,多次讀取同樣的資料會得到相同的結果,即使其他交易已經修改了這些資料。這種隔離級別有助於避免髒讀(dirty read)和不可重複讀(non-repeatable read)。
悲觀平行控制範例:SELECT FOR UPDATE
有多種方法可以實作悲觀平行控制,但我們將展示其中一種。SELECT FOR UPDATE 會產生不同的行為;兩個平行交易不允許同時對相同的列進行讀取:
使用 SQLAlchemy 的 with_for_update 方法
# src/allocation/adapters/repository.py
def get(self, sku):
return self.session.query(model.Product) \
.filter_by(sku=sku) \
.with_for_update() \
.first()
內容解密:
這段程式碼使用 SQLAlchemy 的 with_for_update 方法在查詢時指定 FOR UPDATE。這樣做的效果是將平行模式從 read1, read2, write1, write2(fail) 更改為 read1, write1, read2, write2(succeed)。當兩個交易同時嘗試使用 SELECT FOR UPDATE 選取相同的列時,一個交易會成功,而另一個交易將等待鎖釋放。這是悲觀平行控制的一個例子。
平行控制的取捨
在 REPEATABLE READ 和 SELECT FOR UPDATE 之間,或者在樂觀與悲觀鎖定之間進行選擇時,需要考慮多種因素。您可以根據業務需求和儲存技術選擇合適的方法。
選擇正確的聚合是關鍵
選擇正確的聚合對於系統的效能和概念組織至關重要。您可以閱讀更多關於有效聚合設計的相關書籍和論文。
聚合模式的取捨
| 優點 | 缺點 |
|---|---|
| Python 沒有“官方”的公有和私有方法,但我們有下劃線約定,因為它通常有助於指出什麼是“內部”使用,什麼是“外部”程式碼使用。選擇聚合只是下一層:它讓你決定哪些領域模型類別是公開的,哪些不是。 | 又一個新的概念需要新的開發人員學習。 |
| 將我們的運算圍繞在明確的一致性邊界上,有助於避免與 ORM 相關的效能問題。 | 堅持嚴格修改一個聚合的規則是一個巨大的思維轉變。 |
| 將聚合放在其附屬模型的狀態變更的唯一控制之下,使系統更容易推理,並使控制不變性更容易。 | 處理聚合之間的一致性可能會很複雜。 |
聚合和一致性邊界回顧
- 聚合是進入領域模型的入口點。
- 透過限制事物可以被變更的方式,我們使系統更容易推理。
- 聚合負責一個一致性邊界。
- 聚合的工作是管理我們的業務規則關於不變性的約束,因為它們適用於一組相關的物件。
- 聚合和平行問題是相關的。
第一部分回顧
我們已經看到了如何建立一個由一組高階單元測試所驅動的領域模型。我們的測試是活的檔案:它們描述了我們的系統行為——我們與業務利益相關者達成一致的規則——以可讀的程式碼呈現。當我們的業務需求發生變化時,我們有信心我們的測試將幫助我們證明新的功能,並且當新的開發人員加入專案時,他們可以閱讀我們的測試以瞭解事情是如何運作的。
事件驅動架構:解耦系統的關鍵
在前面的章節中,我們已經瞭解如何使用領域驅動設計(DDD)來建立一個可測試且具表達性的系統。然而,在現實世界中,我們的應用程式往往需要與其他系統進行互動和交換資訊。這時候,我們就需要考慮如何將我們的系統擴充套件到多個小元件,並透過非同步訊息傳遞來實作元件之間的互動。
領域事件:跨越一致性邊界的觸發器
在事件驅動架構中,領域事件(Domain Events)扮演著至關重要的角色。領域事件是指在領域模型中發生的重要事件,例如庫存不足或訂單已完成。這些事件可以用來觸發跨越一致性邊界的工作流程。
為什麼需要領域事件?
- 解耦系統元件:透過使用領域事件,我們可以將系統的不同元件解耦,使得元件之間的互動更加靈活和可擴充套件。
- 提高系統的可擴充套件性:領域事件使得我們可以更容易地新增新的功能和元件,而不需要修改現有的程式碼。
訊息匯流排:統一的端點呼叫方式
訊息匯流排(Message Bus)提供了一種統一的方式來呼叫使用案例(use cases)從任何端點。透過訊息匯流排,我們可以將領域事件傳遞給不同的處理器(handlers),從而實作不同的業務邏輯。
訊息匯流排的優點
- 統一的介面:訊息匯流排提供了一種統一的介面來處理不同的領域事件。
- 靈活的擴充套件:透過新增處理器,我們可以輕鬆地擴充套件系統的功能。
CQRS:分離讀寫操作
命令查詢責任分離(CQRS)是一種架構模式,它將讀寫操作分離開來,以避免在事件驅動架構中做出妥協。CQRS使得我們可以最佳化讀取和寫入操作的效能和可擴充套件性。
為什麼需要CQRS?
- 提高效能:透過分離讀寫操作,我們可以針對不同的操作進行最佳化,從而提高系統的整體效能。
- 增強可擴充套件性:CQRS使得我們可以獨立地擴充套件讀取和寫入操作,從而提高系統的可擴充套件性。
簡單的實作:電子郵件通知
讓我們考慮一個簡單的例子:當庫存不足時,傳送電子郵件通知給採購團隊。這個功能與核心領域邏輯無關,但它是真實世界中常見的需求。
初始實作:直接在Web控制器中處理
最直接的方法是將電子郵件通知的邏輯直接放入Web控制器中。然而,這種方法會導致程式碼混亂,並使系統難以維護。
# 不推薦的做法
def allocate_order(order):
# 分配訂單邏輯
if not enough_stock:
# 直接傳送電子郵件
send_email_to_buying_team()
使用領域事件和訊息匯流排
更好的方法是使用領域事件和訊息匯流排來處理電子郵件通知。這樣,我們可以將通知邏輯與核心領域邏輯分離開來。
# 推薦的做法
class Order:
def allocate(self, line_item):
# 分配訂單邏輯
if not enough_stock:
# 釋出領域事件
self.events.append(OutOfStockEvent(line_item.product_id))
class OutOfStockEvent:
def __init__(self, product_id):
self.product_id = product_id
# 在訊息匯流排中處理事件
def handle_out_of_stock_event(event):
send_email_to_buying_team(event.product_id)
內容解密:
- 事件驅動架構的重要性:本章節強調了在複雜系統中使用事件驅動架構的好處,包括提高系統的可擴充套件性和可維護性。
- 領域事件的作用:領域事件是實作業務邏輯和解耦系統元件的關鍵。
- 訊息匯流排的功能:訊息匯流排提供了一種統一的方式來處理領域事件,從而實作不同的業務邏輯。
- CQRS的優勢:CQRS透過分離讀寫操作,提高了系統的效能和可擴充套件性。
- 實際案例:透過電子郵件通知的例子,展示瞭如何使用領域事件和訊息匯流排來解耦系統元件並提高可維護性。
避免讓網頁控制器變得混亂
作為一次性的 hack,將所有東西都放在 endpoint 中可能會被接受:
# src/allocation/entrypoints/flask_app.py
@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
line = model.OrderLine(
request.json['orderid'],
request.json['sku'],
request.json['qty'],
)
try:
uow = unit_of_work.SqlAlchemyUnitOfWork()
batchref = services.allocate(line, uow)
except (model.OutOfStock, services.InvalidSku) as e:
send_mail(
'out of stock',
'[email protected]',
f'{line.orderid} - {line.sku}'
)
return jsonify({'message': str(e)}), 400
return jsonify({'batchref': batchref}), 201
然而,這種做法很容易導致混亂。傳送電子郵件並不是 HTTP 層的工作,我們希望能夠對這個新功能進行單元測試。
也不該讓領域模型變得混亂
假設我們不希望將這個程式碼放在網頁控制器中,因為我們希望它們盡可能地薄,我們可能會考慮將其放在領域模型中:
# src/allocation/domain/model.py
def allocate(self, line: OrderLine) -> str:
try:
batch = next(
b for b in sorted(self.batches) if b.can_allocate(line)
)
# ...
except StopIteration:
email.send_mail('[email protected]', f'Out of stock for {line.sku}')
raise OutOfStock(f'Out of stock for sku {line.sku}')
但這更糟糕!我們不希望我們的領域模型對基礎設施問題(如 email.send_mail)有任何依賴。
服務層也不是最佳選擇
需求「嘗試分配一些庫存,如果失敗則傳送電子郵件」是工作流程協調的一個例子:它是系統為了實作目標而需要遵循的一系列步驟。
我們已經編寫了一個服務層來管理協調,但即使在這裡,這個功能也感覺不合適:
# src/allocation/service_layer/services.py
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f'Invalid sku {line.sku}')
try:
batchref = product.allocate(line)
uow.commit()
return batchref
except model.OutOfStock:
email.send_mail('[email protected]', f'Out of stock for {line.sku}')
raise
捕捉異常並重新引發它?這仍然讓我們感到不滿意。為什麼這麼難找到一個合適的地方來放置這個程式碼?
單一責任原則
其實,這違反了單一責任原則(SRP)。我們的使用案例是分配。我們的 endpoint、服務函式和領域方法都被稱為 allocate,而不是 allocate_and_send_mail_if_out_of_stock。
小技巧:
經驗法則:如果無法在不使用諸如「然後」或「和」之類別的詞語的情況下描述函式的功能,那麼可能就違反了 SRP。
SRP 的一個表述是每個類別應該只有一個改變的理由。當我們從電子郵件切換到簡訊時,我們不應該需要更新我們的 allocate() 函式,因為這明顯是一個單獨的責任。
領域事件和訊息匯流排
我們將介紹的模式是領域事件和訊息匯流排。我們可以用幾種方式實作它們,所以我們將展示幾種方法,然後再確定我們最喜歡的一種。
模型記錄事件
首先,我們的模型將負責記錄事件——關於已經發生的事情的事實。我們將使用一個訊息匯流排來回應事件並呼叫新的操作。
事件是簡單的資料類別:
# src/allocation/domain/events.py
from dataclasses import dataclass
class Event:
pass
@dataclass
class OutOfStock(Event):
sku: str
當我們的領域模型記錄一個已經發生的事實時,我們說它引發了一個事件。
測試我們的聚合根以引發事件
# tests/unit/test_product.py
def test_records_out_of_stock_event_if_cannot_allocate():
batch = Batch('batch1', 'SMALL-FORK', 10, eta=today)
product = Product(sku="SMALL-FORK", batches=[batch])
product.allocate(OrderLine('order1', 'SMALL-FORK', 10))
allocation = product.allocate(OrderLine('order2', 'SMALL-FORK', 1))
assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK")
assert allocation is None
我們的聚合根將公開一個名為 .events 的新屬性,它將包含一個關於已經發生的事情的事實列表,以 Event 物件的形式存在。
模型引發領域事件
# src/allocation/domain/model.py
class Product:
def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
self.sku = sku
self.batches = batches
self.version_number = version_number
self.events = [] # type: List[events.Event]
def allocate(self, line: OrderLine) -> str:
try:
# ...
except StopIteration:
self.events.append(events.OutOfStock(line.sku))
# raise OutOfStock(f'Out of stock for sku {line.sku}')
return None
內容解密:
- 事件類別的定義:事件是簡單的資料類別,用於記錄領域中已經發生的事實。
- 模型的事件屬性:模型的
events屬性用於儲存已經發生的事件列表。 - 引發事件:當模型的狀態發生變化時,例如庫存不足,它會引發一個事件並將其新增到
events列表中。 - 測試事件:測試使用案例驗證模型在無法分配庫存時是否正確引發了
OutOfStock事件。