返回文章列表

領域模型與並發控制

本文探討領域模型的重要性,說明如何使用聚合根來維護資料一致性,並深入講解樂觀並發控制和重試機制,確保在高並發環境下資料函式庫操作的完整性。文章以產品分配案例示範如何設計領域模型、實作版本號以及處理並發衝突,提供程式碼範例和測試案例,讓讀者能實際應用這些技巧。

軟體設計 領域驅動設計

在軟體開發中,資料一致性是至關重要的。尤其在高並發的環境下,如何確保資料函式庫操作的正確性更是一大挑戰。本文將探討如何運用領域驅動設計(DDD)的理念,結合樂觀並發控制和重試機制,開發一個穩固的系統。領域模型並非僅是程式碼的抽象表示,它更體現了業務邏輯的核心。透過定義聚合根,我們可以有效地劃分領域模型的邊界,並確保資料在事務結束時保持一致。以產品分配為例,我們可以將產品作為聚合根,包含其所有批次資訊,並透過聚合根上的方法來管理分配邏輯,避免直接操作內部物件,從而降低資料不一致的風險。

為什麼不直接在試算表中執行所有事務?

我們到底為何需要領域模型?根本問題是什麼?難道我們不能直接在試算表中執行所有事務嗎?許多使用者會很高興看到這種情況。業務使用者喜歡試算表,因為它們簡單、熟悉且功能強大。

事實上,許多業務流程都是透過電子郵件手動傳送試算表來運作的。這種「CSV over SMTP」架構初始複雜度低,但往往無法很好地擴充套件,因為很難應用邏輯和保持一致性。

誰被允許檢視特定欄位?誰被允許更新它?當我們嘗試訂購-350張椅子或10,000,000張桌子時會發生什麼?員工的薪水可以是負數嗎?

這些都是系統的約束條件。我們編寫的大部分領域邏輯都用於強制執行這些約束條件,以維護系統的不變數(invariants)。不變數是指每次完成操作時必須為真的條件。

不變數、約束條件和一致性

這兩個詞在某些程度上是可以互換的,但約束條件是一條規則,限制了我們的模型可能達到的狀態,而不變數則被更精確地定義為始終為真的條件。

如果我們正在編寫一個酒店預訂系統,我們可能會有一個約束條件,即不允許重複預訂。這支援了一個不變數,即一個房間在同一晚不能有多於一個預訂。

當然,有時我們可能需要暫時彎曲規則。也許我們需要因為VIP預訂而調整房間。在記憶體中移動預訂時,我們可能會重複預訂,但我們的領域模型應該確保,當我們完成時,我們最終會處於一個最終一致的狀態,即滿足不變數。如果我們無法找到一種方法來容納所有客人,我們應該引發錯誤並拒絕完成操作。

讓我們來看幾個來自業務需求的具體例子;我們先從這個開始:

一個訂單行一次只能分配給一個批次。 ——業務需求

這是一個業務規則,它強制執行了一個不變數。不變數是指一個訂單行被分配給零個或一個批次,但永遠不會多於一個。我們需要確保我們的程式碼永遠不會意外地對同一行呼叫Batch.allocate()兩次不同的批次,目前,沒有任何東西明確阻止我們這樣做。

不變數、並發性和鎖定

讓我們來看另一個業務規則:

如果批次的可用數量少於訂單行的數量,我們就不能將其分配給該批次。 ——業務需求

這裡的約束條件是,我們不能將超過可用數量的庫存分配給一個批次,因此我們永遠不會透過將兩個客戶分配給同一個實體枕頭來超賣庫存。每次我們更新系統狀態時,我們的程式碼都需要確保我們不會破壞不變數,即可用數量必須大於或等於零。

在單執行緒、單使用者應用程式中,維護這個不變數相對容易。我們可以一次分配一個訂單行的庫存,如果沒有庫存可用,就引發錯誤。

當我們引入並發性概念時,這變得更加困難。突然間,我們可能會同時為多個訂單行分配庫存。我們甚至可能在處理批次本身的更改時同時分配訂單行。

我們通常透過對資料函式庫表施加鎖定來解決這個問題。這可以防止兩個操作同時發生在同一行或同一表上。

當我們開始考慮擴充套件我們的應用程式時,我們意識到我們的將訂單行分配給所有可用批次的模型可能無法擴充套件。如果我們每小時處理成千上萬的訂單和數十萬的訂單行,我們無法在每個訂單行上鎖定整個批次表——至少會出現死鎖或效能問題。

什麼是聚合?

好吧,如果我們不能在每次想要分配訂單行時鎖定整個資料函式庫,我們該怎麼辦?我們希望保護系統的不變數,但允許最大程度的並發性。維護不變數不可避免地意味著防止並發寫入;如果多個使用者可以同時分配DEADLY-SPOON,我們就有可能過度分配。

另一方面,沒有理由我們不能同時分配DEADLY-SPOON和FLIMSY-DESK。同時分配兩個產品是安全的,因為沒有涵蓋它們兩者的不變數。我們不需要它們彼此一致。

聚合模式是DDD社群中的一種設計模式,可以幫助我們解決這個問題。聚合只是一個領域物件,它包含其他領域物件,並允許我們將整個集合視為單個單元。

修改聚合內部物件的唯一方法是載入整個聚合,並呼叫聚合本身的方法。

隨著模型變得越來越複雜,並增長出更多的實體和值物件,它們之間相互參照,形成了一個糾纏的圖形。要跟蹤誰可以修改什麼變得越來越困難。尤其是當模型中有集合(如我們的批次)時,指定某些實體作為修改其相關物件的單一入口點是一個好主意。這使得系統在概念上更簡單,也更容易推理。

例如,如果我們正在構建一個購物網站,購物車可能是一個很好的聚合:它是一個專案集合,我們可以將其視為單個單元。

重要的是,我們希望將整個購物籃作為單個資料塊從資料儲存中載入。我們不希望兩個請求同時修改購物籃,否則就有可能出現奇怪的並發錯誤。相反,我們希望每次對購物籃的更改都在單個資料函式庫事務中執行。

我們不想在一個事務中修改多個購物籃,因為沒有使用案例需要在同一時間更改多個客戶的購物籃。每個購物籃都是一個單獨的一致性邊界,負責維護自己的不變數。

聚合是一簇相關物件,我們將其視為資料更改的單元。 ——Eric Evans,《領域驅動設計》藍皮書

根據Evans的說法,我們的聚合有一個根實體(購物車),它封裝了對專案的存取。每個專案都有自己的身份,但系統的其他部分始終只會將購物車視為一個不可分割的整體。

選擇合適的聚合根(Aggregate)

在領域驅動設計(DDD)中,聚合根是一個非常重要的概念。它定義了領域模型中物件之間的邊界,確保在這個邊界內的所有物件在事務結束時保持一致的狀態。

為什麼需要聚合根?

聚合根的主要目的是為了確保領域模型的一致性和完整性。它幫助我們劃分領域模型中的物件,將相關的物件聚集在一起,形成一個事務邊界。這樣可以避免複雜的事務問題和競爭條件。

如何選擇聚合根?

選擇聚合根需要根據具體的業務需求和領域模型進行分析。在我們的例子中,我們需要處理批次(Batch)和訂單明細(OrderLine)之間的分配問題。經過分析,我們發現可以使用產品(Product)作為聚合根,因為它包含了與特定SKU相關的所有批次。

定義產品聚合根

# src/allocation/domain/model.py
class Product:
    def __init__(self, sku: str, batches: List[Batch]):
        self.sku = sku
        self.batches = batches

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(
                b for b in sorted(self.batches) if b.can_allocate(line)
            )
            batch.allocate(line)
            return batch.reference
        except StopIteration:
            raise OutOfStock(f'Out of stock for sku {line.sku}')

內容解密:

  1. __init__ 方法:初始化產品物件,需要skubatches兩個引數。sku是產品的唯一識別碼,batches是與該產品相關的所有批次。
  2. allocate 方法:將訂單明細分配給合適的批次。首先,它會遍歷所有批次,找到第一個可以滿足訂單明細需求的批次。如果找到合適的批次,就將訂單明細分配給該批次,並傳回批次的參考碼。如果沒有找到合適的批次,則丟擲OutOfStock異常,表示該SKU的庫存不足。

聚合根與倉儲(Repository)的關係

在DDD中,每個聚合根都應該有對應的倉儲。倉儲負責封裝資料存取邏輯,提供對聚合根的查詢和持久化操作。

# unit_of_work.py 和 repository.py
class AbstractUnitOfWork(abc.ABC):
    products: repository.AbstractProductRepository
    ...

class AbstractProductRepository(abc.ABC):
    @abc.abstractmethod
    def add(self, product):
        ...

    @abc.abstractmethod
    def get(self, sku) -> model.Product:
        ...

內容解密:

  1. AbstractUnitOfWork 類別:定義了工作單元的介面,包含了對產品倉儲的參照。
  2. AbstractProductRepository 類別:定義了產品倉儲的介面,提供了新增和取得產品的方法。

有界上下文(Bounded Context)的重要性

有界上下文是DDD中的另一個重要概念。它定義了領域模型的邊界,在這個邊界內,領域模型是明確和一致的。不同的有界上下文可以有不同的領域模型,即使它們處理的是相同的業務實體。

在我們的例子中,分配服務(Allocation Service)有自己的產品模型,只包含與分配邏輯相關的屬性,如skubatches。而電子商務服務(Ecommerce Service)可能有不同的產品模型,包含更多的屬性,如descriptionpriceimage_url

服務層的領域模型整合與效能考量

在前面的章節中,我們已經探討了領域模型的設計以及如何使用聚合根來維護資料的一致性。現在,讓我們將模型整合到服務層中,看看以 Product 為主要入口點的實作方式。

服務層實作

src/allocation/service_layer/services.py 中,我們定義了服務層的函式:

def add_batch(
    ref: str, sku: str, qty: int, eta: Optional[date],
    uow: unit_of_work.AbstractUnitOfWork
):
    with uow:
        product = uow.products.get(sku=sku)
        if product is None:
            product = model.Product(sku, batches=[])
            uow.products.add(product)
        product.batches.append(model.Batch(ref, sku, qty, eta))
        uow.commit()

def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        batchref = product.allocate(line)
        uow.commit()
        return batchref

內容解密:

  1. add_batch 函式:負責新增批次到指定的產品中。首先檢查產品是否存在,若不存在則建立新的產品並加入批次,最後提交變更。

    • 使用 unit_of_work 模式來管理資料函式庫的交易。
    • 透過 uow.products.get(sku=sku) 來取得產品,若產品不存在則建立新的 Product 物件。
    • 將新的批次加入產品的 batches 列表中,並提交變更。
  2. allocate 函式:負責為訂單分配批次。首先檢查產品是否存在,若不存在則丟擲 InvalidSku 例外,最後提交變更並傳回分配的批次參考。

    • 建立 OrderLine 物件來代表訂單專案。
    • 取得產品並呼叫 product.allocate(line) 來分配批次。
    • 提交變更並傳回分配的批次參考。

效能考量

雖然我們載入了所有批次,但這裡有幾個原因讓我們對效能保持樂觀:

  1. 單一查詢與更新:我們的領域模型設計使得我們可以對資料函式庫進行單一查詢來讀取資料,並進行單一更新來儲存變更。這種方式通常比發出多個臨時查詢的系統更有效率。

  2. 最小化的資料結構:我們的資料結構很簡單,主要包含一些字串和整數。因此,即使載入數十或數百個批次,也能在幾毫秒內完成。

  3. 有限的批次數量:我們預期每個產品同時存在的批次數量有限(約20個左右)。當一個批次用完後,可以從計算中排除,因此資料量不會無限制增長。

如果預期會有數千個活躍批次,可以考慮使用延遲載入技術。這樣可以在背景中使用 SQLAlchemy 分頁處理資料,從而減少每次請求的資料量。

版本號與樂觀平行控制

為了在資料函式庫層級強制資料完整性,我們可以使用版本號來實作樂觀平行控制。其原理是為 Product 模型新增一個版本屬性,當多個交易同時讀取和修改資料時,只有一個交易能夠成功提交變更。

圖示說明:

@startuml
note
  無法自動轉換的 Plantuml 圖表
  請手動檢查和調整
@enduml

此圖示說明瞭兩個交易同時嘗試更新 Product 的情況。由於使用了版本號,只有第一個提交的交易能夠成功,第二個交易會因為版本號衝突而失敗。

版本號實作:

版本號是實作樂觀鎖定的一種方式。除了使用版本號,也可以使用其他方法,例如將 Postgres 交易隔離級別設為 SERIALIZABLE,但這可能會帶來嚴重的效能損失。

樂觀並發控制與重試機制

在資料函式庫操作中,當多個使用者嘗試對同一資料進行修改時,如何處理並發衝突是一個重要的議題。本篇文章將探討樂觀並發控制(Optimistic Concurrency Control)和重試機制(Retries),以及它們在實際應用中的實作方式。

樂觀並發控制 vs. 悲觀並發控制

樂觀並發控制假設多個使用者對資料函式庫的修改通常不會發生衝突,因此允許他們同時進行修改,並在提交時檢查是否有衝突。如果發生衝突,則會丟擲錯誤。另一方面,悲觀並發控制則假設多個使用者之間會發生衝突,因此會對資料函式庫進行鎖定,以防止其他使用者進行修改,直到目前的操作完成為止。

程式碼範例:樂觀並發控制的實作

class Product:
    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
            batch.allocate(line)
            self.version_number += 1
            return batch.reference
        except StopIteration:
            raise OutOfStock(f'Out of stock for sku {line.sku}')

內容解密:

  1. Product類別的初始化Product類別包含skubatchesversion_number屬性。version_number用於記錄產品的版本號,預設為0。
  2. allocate方法的邏輯:當對產品進行分配時,首先檢查是否有可用的批次。如果有,則進行分配,並將version_number加1。如果沒有可用的批次,則丟擲OutOfStock錯誤。

版本號的實作選項

有三種方式可以實作版本號:

  1. 將版本號放在領域模型中,由領域模型負責更新。
  2. 由服務層負責更新版本號。
  3. 由工作單元(Unit of Work)和儲存函式庫(Repository)自動更新版本號。

版本號實作的比較

實作方式優點缺點
領域模型清晰的領域邏輯需要額外的領域邏輯
服務層簡單的實作混合了領域邏輯和服務邏輯
工作單元/儲存函式庫自動化可能會不必要地更新版本號

測試並發控制的行為

為了測試並發控制的行為,我們可以使用多執行緒來模擬兩個並發的操作。

程式碼範例:測試並發更新

def try_to_allocate(orderid, sku, exceptions):
    line = model.OrderLine(orderid, sku, 10)
    try:
        with unit_of_work.SqlAlchemyUnitOfWork() as uow:
            product = uow.products.get(sku=sku)
            product.allocate(line)
            time.sleep(0.2)
            uow.commit()
    except Exception as e:
        print(traceback.format_exc())
        exceptions.append(e)

def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory):
    sku, batch = random_sku(), random_batchref()
    session = postgres_session_factory()
    insert_batch(session, batch, sku, 100, eta=None, product_version=1)
    session.commit()

    order1, order2 = random_orderid(1), random_orderid(2)
    exceptions = []  # type: List[Exception]
    try_to_allocate_order1 = lambda: try_to_allocate(order1, sku, exceptions)
    try_to_allocate_order2 = lambda: try_to_allocate(order2, sku, exceptions)

    thread1 = threading.Thread(target=try_to_allocate_order1)
    thread2 = threading.Thread(target=try_to_allocate_order2)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

    [[version]] = session.execute(
        "SELECT version_number FROM products WHERE sku=:sku",
        dict(sku=sku),
    )
    assert version == 2
    [exception] = exceptions
    assert 'could not serialize access due to concurrent update' in str(exception)

內容解密:

  1. try_to_allocate函式:模擬一個分配操作,並在提交後暫停0.2秒,以模擬一個較慢的操作。
  2. test_concurrent_updates_to_version_are_not_allowed測試:使用兩個執行緒來模擬兩個並發的分配操作。檢查是否只有一個操作成功,並且版本號正確更新。