返回文章列表

非同步訊息與CQRS提升系統解耦

本文探討如何運用 Connascence 理論、非同步訊息傳遞和 CQRS 模式來降低系統耦合,提升微服務架構的彈性與可維護性。文中以 Redis Pub/Sub 作為範例,說明如何實作事件驅動架構,並透過端對端測試確保系統整合的正確性。此外,文章也討論了資料一致性與系統效能的權衡,以及如何利用

系統設計 Web 開發

在微服務架構中,系統間的耦合度是影響系統彈性和可維護性的關鍵因素。本文將探討如何利用 Connascence 理論分析系統耦合的型別,並結合非同步訊息傳遞機制和 CQRS 模式,有效降低系統間的依賴性。我們將以 Redis Pub/Sub 頻道作為非同步通訊的例項,示範如何構建事件驅動的微服務架構,並透過端對端測試驗證系統的正確性。同時,我們也將探討在讀取密集型應用中,如何權衡資料一致性和系統效能,並介紹如何應用 CQS 原則和 Post/Redirect/Get 模式最佳化 API 設計,提升系統的整體效能和可維護性。

降低系統間的耦合:Connascence 與非同步訊息傳遞

在設計微服務架構時,降低系統間的耦合(coupling)是至關重要的。耦合描述了系統各部分之間的相互依賴程度,而 Connascence 則進一步將耦合細分為不同型別,以幫助我們理解不同架構風格下的耦合強度與型別。

Connascence:理解耦合的強度與型別

Connascence 並非全然負面,但某些型別的 Connascence 會比其他型別更強。我們希望在區域性範圍內保持較強的 Connascence,例如兩個緊密相關的類別之間;而在較遠的距離下,則希望保持較弱的 Connascence。

執行順序的 Connascence

在第一個分散式泥球(Distributed Ball of Mud)範例中,我們觀察到 執行順序的 Connascence(Connascence of Execution):多個元件需要知道正確的工作順序,以確保操作的成功。

時序的 Connascence

在處理錯誤條件時,我們面臨 時序的 Connascence(Connascence of Timing):多個事件必須按照特定順序發生,操作才能成功。

名稱的 Connascence

當我們用事件(events)取代 RPC 風格的系統呼叫時,我們將上述兩種 Connascence 替換為更弱的 名稱的 Connascence(Connascence of Name):多個元件只需就事件名稱及攜帶的欄位名稱達成共識。

使用非同步訊息傳遞實作時間解耦

那麼,如何實作適當的耦合呢?其中一個方法是使用動詞(verbs)而非名詞(nouns)來思考系統設計。我們的領域模型關注的是業務流程的建模,而非靜態的資料模型。

以動詞為中心的系統設計

我們不再將系統視為「訂單系統」和「批次系統」,而是思考「下單」和「分配」的過程。這種思維方式有助於明確各系統的職責。當我們專注於「下單」時,首要任務是確保訂單被成功下達;其他流程可以稍後進行,只要它們最終完成即可。

使用非同步訊息傳遞降低耦合

為了避免分散式泥球反模式,我們使用非同步訊息傳遞來整合系統,而不是依賴時間上耦合的 HTTP API 呼叫。這樣一來,當一個系統發生故障時,其他系統仍可繼續運作,實作了更好的容錯能力。

使用 Redis Pub/Sub 頻道進行系統整合

在具體實作上,我們需要一個訊息代理(message broker)來傳遞事件。Redis 的 Pub/Sub 頻道可以用於此目的。我們的流程如下圖所示:Redis 提供 BatchQuantityChanged 事件來啟動整個流程,而我們的 Allocated 事件則會被釋出回 Redis,供下游系統監聽。

圖示:重新分配流程的序列圖

@startuml
note
  無法自動轉換的 Plantuml 圖表
  請手動檢查和調整
@enduml

此圖示展示了事件如何透過 Redis Pub/Sub 在不同系統間流轉。

使用端對端測試驅動開發

我們可以編寫端對端測試來驗證整個流程。以 test_change_batch_quantity_leading_to_reallocation 為例,我們測試了當批次數量變更時,系統是否能正確地重新分配訂單,並透過 Redis Pub/Sub 釋出相關事件。

程式碼範例:端對端測試

def test_change_batch_quantity_leading_to_reallocation():
    # 建立兩個批次和一個已分配的訂單
    orderid, sku = random_orderid(), random_sku()
    earlier_batch, later_batch = random_batchref('old'), random_batchref('newer')
    api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta='2011-01-02')
    api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-02')
    response = api_client.post_to_allocate(orderid, sku, 10)
    assert response.json()['batchref'] == earlier_batch
    
    # 變更已分配批次的數量
    redis_client.publish_message('change_batch_quantity', {
        'batchref': earlier_batch, 'qty': 5
    })
    
    # 等待重新分配事件
    messages = []
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                messages.append(message)
            data = json.loads(messages[-1]['data'])
            assert data['orderid'] == orderid
            assert data['batchref'] == later_batch

內容解密:

  1. 測試案例:此測試案例模擬了一個場景:當已分配批次的數量變更時,系統是否能正確地重新分配訂單。
  2. API 呼叫:使用 api_client 建立兩個批次並分配訂單。
  3. Redis 訊息傳遞:透過 redis_client 釋出 change_batch_quantity 事件,模擬批次數量的變更。
  4. 事件監聽:使用 subscription.get_message 監聽 line_allocated 頻道,以取得重新分配的事件。
  5. 斷言驗證:驗證收到的事件中包含正確的訂單 ID 和批次參考碼。

Redis Pub/Sub 作為訊息匯流排的包裝

我們的 Redis Pub/Sub 監聽器(稱為事件消費者)類別似於 Flask,將外部事件轉譯為我們的內部事件。這種設計使得我們的系統能夠與外部系統解耦,並實作更好的擴充套件性和容錯能力。

簡化Redis訊息監聽器與發布器實作

在系統設計中,事件驅動架構扮演著至關重要的角色。透過將事件處理與系統內部邏輯解耦,我們能夠實作更高的靈活性和可擴充套件性。本篇文章將探討如何利用Redis實作訊息的監聽與發布,並進一步闡述事件驅動架構在微服務中的應用。

Redis訊息監聽器實作

首先,我們來看看如何實作一個簡單的Redis訊息監聽器。以下為redis_eventconsumer.py的程式碼範例:

r = redis.Redis(**config.get_redis_host_and_port())

def main():
    orm.start_mappers()
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe('change_batch_quantity')
    for m in pubsub.listen():
        handle_change_batch_quantity(m)

def handle_change_batch_quantity(m):
    logging.debug('handling %s', m)
    data = json.loads(m['data'])
    cmd = commands.ChangeBatchQuantity(ref=data['batchref'], qty=data['qty'])
    messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())

main()

程式碼解析:

  • 初始化Redis連線並訂閱指定的頻道(change_batch_quantity)。
  • 透過pubsub.listen()持續監聽頻道中的訊息。
  • 當接收到訊息時,呼叫handle_change_batch_quantity進行處理。
  • 在處理函式中,將JSON格式的訊息轉換為命令物件,並交由訊息匯流排(messagebus)處理。

Redis訊息發布器實作

接下來,我們探討如何實作Redis訊息發布器。以下為redis_eventpublisher.py的程式碼範例:

r = redis.Redis(**config.get_redis_host_and_port())

def publish(channel, event: events.Event):
    logging.debug('publishing: channel=%s, event=%s', channel, event)
    r.publish(channel, json.dumps(asdict(event)))

程式碼解析:

  • 定義publish函式,接受頻道名稱和事件物件作為引數。
  • 將事件物件轉換為字典並序列化為JSON格式。
  • 透過Redis的publish方法將訊息發布到指定的頻道。

事件驅動架構的優勢與挑戰

事件驅動架構能夠有效地解耦系統內部的各個元件,提高系統的擴充套件性和維護性。然而,這種架構也帶來了一些挑戰,例如事件的一致性和可靠性問題。

優勢:

  • 降低耦合度:各個服務之間透過事件進行非同步通訊,降低了直接依賴。
  • 提高擴充套件性:能夠更容易地新增新的服務或修改現有服務,而不影響其他部分。

挑戰:

  • 事件一致性:需要處理事件丟失或重複消費的問題。
  • 除錯困難:由於事件是非同步處理的,除錯和追蹤問題變得更加複雜。

讀取一致性與系統設計的權衡

在現代軟體系統中,特別是在處理高並發和高用性的場景下,如何平衡資料的一致性和系統的效能,是一個重要的設計挑戰。本文將探討在讀取密集的應用場景下,如何透過犧牲一定的資料一致性來提升系統的效能。

系統設計的兩面:讀取與寫入

在大多數業務系統中,讀取和寫入操作具有不同的特性和需求。寫入操作通常涉及複雜的業務邏輯,需要確保資料的一致性和正確性。相比之下,讀取操作則更關注效能和可用性。

寫入操作的複雜性

在處理訂單分配等業務邏輯時,系統需要確保資料的正確性和一致性。這需要引入諸如工作單元(Unit of Work)和聚合根(Aggregate)等設計模式,以保證資料的完整性。

def test_allocating_to_a_batch_reduces_the_available_quantity():
    batch = Batch("batch-001", "SMALL-TABLE", qty=20, eta=date.today())
    line = OrderLine('order-ref', "SMALL-TABLE", 2)
    batch.allocate(line)
    assert batch.available_quantity == 18

def test_cannot_allocate_if_available_smaller_than_required():
    small_batch, large_line = make_batch_and_line("ELEGANT-LAMP", 2, 20)
    assert small_batch.can_allocate(large_line) is False

內容解密:

  1. test_allocating_to_a_batch_reduces_the_available_quantity 測試函式驗證了當一個訂單行被分配到批次時,批次的可用數量是否正確減少。
  2. test_cannot_allocate_if_available_smaller_than_required 測試函式檢查當批次的可用數量小於訂單行的需求數量時,是否無法進行分配。

讀取操作的最佳化

在許多業務場景中,讀取操作的頻率遠遠高於寫入操作。例如,在電子商務網站中,使用者瀏覽商品的頻率遠高於下單的頻率。對於這些場景,可以透過犧牲一定的資料一致性來提升讀取操作的效能。

資料一致性的權衡

在分散式系統中,完全保持資料的一致性是非常困難的。即使在理想情況下,當使用者存取商品頁面時,資料也可能已經是過時的。因此,在讀取操作中犧牲一定的資料一致性是可接受的。

最終一致性

透過採用最終一致性的策略,可以在保證系統可用性和效能的同時,最大限度地減少資料不一致帶來的影響。

CQRS與Post/Redirect/Get模式

CQRS(命令查詢責任分離)模式是一種將命令(寫入)和查詢(讀取)分離的架構模式。這種模式可以幫助開發者更好地最佳化讀取和寫入操作的效能。

Post/Redirect/Get是一種常見的Web開發模式,用於避免使用者重複提交表單。透過將寫入操作與讀取操作分離,可以提高系統的整體效能和可用性。

應用命令查詢分離(CQS)原理最佳化API設計

在開發API時,我們經常遇到兩個主要問題:客戶端在嘗試GET一個POST端點時會得到一個損壞的頁面;瀏覽器歷史記錄和書籤功能會被破壞。這些問題的根源在於我們在回應寫入操作時傳回了資料。Post/Redirect/Get技術透過分離讀取和寫入階段來解決這個問題。

命令查詢分離(CQS)原理

CQS是一種簡單的設計原則:函式應該要麼修改狀態,要麼回答問題,但不能同時做兩件事。這使得軟體更容易被理解:我們應該總是能夠詢問“燈是否開啟?”而無需切換燈的開關。

CQS在API設計中的應用

在構建API時,我們可以透過傳回201 Created或202 Accepted狀態碼,並在Location頭中包含新資源的URI來應用相同的設計技術。重要的是將工作邏輯地分成寫入階段和查詢階段。

修正現有程式碼中的CQS違規

過去,我們引入了一個allocate端點,該端點接受訂單並呼叫服務層來分配庫存。在呼叫結束時,我們傳回200 OK和批次ID。這導致了一些醜陋的設計缺陷,以便我們能夠獲得所需的資料。讓我們將其更改為傳回簡單的OK訊息,並提供一個新的只讀端點來檢索分配狀態。

API測試例項

@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_happy_path_returns_202_and_batch_is_allocated():
    orderid = random_orderid()
    sku, othersku = random_sku(), random_sku('other')
    earlybatch = random_batchref(1)
    laterbatch = random_batchref(2)
    otherbatch = random_batchref(3)
    api_client.post_to_add_batch(laterbatch, sku, 100, '2011-01-02')
    api_client.post_to_add_batch(earlybatch, sku, 100, '2011-01-01')
    api_client.post_to_add_batch(otherbatch, othersku, 100, None)
    r = api_client.post_to_allocate(orderid, sku, qty=3)
    assert r.status_code == 202
    r = api_client.get_allocation(orderid)
    assert r.ok
    assert r.json() == [
        {'sku': sku, 'batchref': earlybatch},
    ]

Flask應用中的新端點

@app.route("/allocations/<orderid>", methods=['GET'])
def allocations_view_endpoint(orderid):
    uow = unit_of_work.SqlAlchemyUnitOfWork()
    result = views.allocations(orderid, uow)
    if not result:
        return 'not found', 404
    return jsonify(result), 200

使用Raw SQL實作檢視

def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
    with uow:
        results = list(uow.session.execute(
            'SELECT ol.sku, b.reference'
            ' FROM allocations AS a'
            ' JOIN batches AS b ON a.batch_id = b.id'
            ' JOIN order_lines AS ol ON a.orderline_id = ol.id'
            ' WHERE ol.orderid = :orderid',
            dict(orderid=orderid)
        ))
        return [{'sku': sku, 'batchref': batchref} for sku, batchref in results]

內容解密:

  1. allocations函式:該函式負責根據訂單ID檢索分配狀態。它使用unit_of_work.SqlAlchemyUnitOfWork來執行資料函式庫操作。
  2. SQL查詢:查詢透過連線allocationsbatchesorder_lines表來檢索相關的SKU和批次參考資訊。
  3. 引數繫結:使用:orderid作為引數繫結,以防止SQL注入攻擊。
  4. 結果轉換:查詢結果被轉換為字典列表,以便於JSON序列化。

測試CQS檢視

def test_allocations_view(sqlite_session_factory):
    uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
    messagebus.handle(commands.CreateBatch('sku1batch', 'sku1', 50, None), uow)
    messagebus.handle(commands.CreateBatch('sku2batch', 'sku2', 50, today), uow)
    messagebus.handle(commands.Allocate('order1', 'sku1', 20), uow)
    messagebus.handle(commands.Allocate('order1', 'sku2', 20), uow)
    # 新增額外的批次和訂單以確保檢索正確的資料
    messagebus.handle(commands.CreateBatch('sku1batch-later', 'sku1', 50, today), uow)
    messagebus.handle(commands.Allocate('otherorder', 'sku1', 30), uow)
    messagebus.handle(commands.Allocate('otherorder', 'sku2', 10), uow)
    assert views.allocations('order1', uow) == [
        {'sku': 'sku1', 'batchref': 'sku1batch'},
        {'sku': 'sku2', 'batchref': 'sku2batch'},
    ]

內容解密:

  1. 測試設定:使用messagebus.handle來設定測試資料,這保持了測試與實作細節的解耦。
  2. 斷言:驗證views.allocations函式傳回的結果是否符合預期。