返回文章列表

事件與命令差異處理機制與可靠性設計

本文探討事件與命令在系統設計中的差異處理機制,著重於錯誤處理和系統一致性。文章分析了命令和事件的定義、訊息匯流排的處理機制,以及事件與命令的錯誤處理策略。同時,也探討了在事件驅動架構下如何透過錯誤隔離和同步錯誤還原機制提高系統可靠性,並以程式碼範例說明如何使用 Redis

系統設計 軟體架構

在現代軟體架構中,事件驅動架構已成為主流,特別是在微服務環境下。理解事件和命令的差異,以及如何有效地處理它們是構建可靠分散式系統的關鍵。事件代表已發生的事實,而命令則代表使用者的操作請求。訊息匯流排作為系統的核心元件,負責根據訊息型別將其分派給相應的處理器。命令處理的失敗會立即丟擲異常,而事件處理的失敗則允許繼續執行其他事件處理器,以確保系統的彈性。透過定義聚合根作為一致性邊界,可以確保操作的原子性,避免系統狀態不一致。同時,利用同步錯誤還原機制,例如重試機制和錯誤日誌記錄,可以進一步提高系統的可靠性。

事件與命令的差異處理機制

在系統設計中,事件(Event)與命令(Command)的處理方式存在顯著差異,這種差異主要體現在錯誤處理和系統一致性上。

命令與事件的定義

首先,我們定義了命令和事件的資料結構:

@dataclass
class CreateBatch(Command):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

@dataclass
class ChangeBatchQuantity(Command):
    ref: str
    qty: int

這些命令物件代表了系統中需要執行的操作,如建立批次或更改批次數量。

訊息匯流排的處理機制

訊息匯流排(Message Bus)是系統中負責處理事件和命令的核心元件。它的主要功能是根據訊息型別將訊息分派給相應的處理器:

Message = Union[commands.Command, events.Event]

def handle(message: Message, uow: unit_of_work.AbstractUnitOfWork):
    results = []
    queue = [message]
    while queue:
        message = queue.pop(0)
        if isinstance(message, events.Event):
            handle_event(message, queue, uow)
        elif isinstance(message, commands.Command):
            cmd_result = handle_command(message, queue, uow)
            results.append(cmd_result)
        else:
            raise Exception(f'{message} was not an Event or Command')
    return results

事件處理

事件處理允許多個處理器對同一個事件做出反應,並且不會因為某個處理器的失敗而中斷整個事件處理流程:

def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork
):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            logger.debug('handling event %s with handler %s', event, handler)
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception('Exception handling event %s', event)
            continue

命令處理

命令處理則不同,它期望每個命令只有一個處理器,並且如果處理過程中出現異常,會立即丟擲異常並終止處理流程:

def handle_command(
    command: commands.Command,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork
):
    logger.debug('handling command %s', command)
    try:
        handler = COMMAND_HANDLERS[type(command)]
        result = handler(command, uow=uow)
        queue.extend(uow.collect_new_events())
        return result
    except Exception:
        logger.exception('Exception handling command %s', command)
        raise

事件與命令的錯誤處理討論

在系統設計中,錯誤處理是一個重要的議題。當事件處理失敗時,系統如何保持一致性?如果在處理事件過程中出現異常,系統狀態可能會變得不一致。

一致性邊界的重要性

在我們的分配服務中,我們已經透過識別聚合根(Aggregate)作為一致性邊界來避免這種問題。例如,當我們分配庫存給訂單時,產品聚合根確保了操作的原子性,要麼成功,要麼失敗,不會出現部分更新的情況。

事件與命令的分離原因

將訊息分為命令和事件有助於更好地管理系統的一致性。命令代表了使用者的操作請求,它應該修改單一的聚合根,並且要麼完全成功,要麼完全失敗。事件則代表了已經發生的事實,它可以用於更新其他的聚合根或執行額外的操作,而不會影響到命令的執行結果。

實際案例分析

考慮一個電子商務網站的例子,當顧客下了第三個訂單時,我們希望將其標記為VIP並傳送祝賀郵件。這裡,我們可以使用History聚合根來記錄訂單歷史,並在滿足條件時觸發CustomerBecameVIP事件:

class History:
    def __init__(self, customer_id: int):
        self.orders = set()
        self.customer_id = customer_id

    def record_order(self, order_id: str, order_amount: int):
        entry = HistoryEntry(order_id, order_amount)
        if entry in self.orders:
            return
        self.orders.add(entry)
        if len(self.orders) == 3:
            self.events.append(
                CustomerBecameVIP(self.customer_id)
            )

這個例子展示瞭如何透過事件和命令的分離來設計一個清晰且一致的系統。

內容解密:
  1. 事件與命令的區別:事件代表已經發生的事實,而命令代表使用者的操作請求。
  2. 訊息匯流排的作用:負責將訊息分派給相應的處理器。
  3. 錯誤處理機制:命令處理失敗會立即丟擲異常,而事件處理失敗則會繼續執行其他事件處理器。
  4. 一致性邊界的重要性:透過聚合根來確保操作的原子性,避免系統狀態不一致。
  5. 實際案例分析:透過History聚合根來記錄訂單歷史,並在滿足條件時觸發CustomerBecameVIP事件。

事件驅動系統中的錯誤處理與可靠性設計

在事件驅動系統中,如何處理錯誤並確保系統的可靠性是一個至關重要的課題。本文將探討在這樣的系統中,如何透過合理的設計和技術手段來提高錯誤處理的能力和系統的整體可靠性。

事件驅動架構下的錯誤隔離

首先,我們考慮一個簡單的事件驅動流程:當使用者下單時,系統會建立一個訂單並引發 OrderCreated 事件。接著,事件處理器會更新客戶的歷史記錄,如果客戶成為VIP,還會傳送祝賀郵件。

order = Order.from_basket(cmd.customer_id, cmd.basket_items)
uow.orders.add(order)
uow.commit()  # raises OrderCreated

def update_customer_history(uow, event: OrderCreated):
    with uow:
        history = uow.order_history.get(event.customer_id)
        history.record_order(event.order_id, event.order_amount)
        uow.commit()  # raises CustomerBecameVIP

def congratulate_vip_customer(uow, event: CustomerBecameVip):
    with uow:
        customer = uow.customers.get(event.customer_id)
        email.send(
            customer.email_address,
            f'Congratulations {customer.first_name}!'
        )

內容解密:

  1. 訂單建立:首先,系統建立一個新的訂單並將其新增到工作單元(Unit of Work)中,然後提交變更,觸發 OrderCreated 事件。
  2. 更新客戶歷史記錄:當 OrderCreated 事件被處理時,系統會更新客戶的歷史記錄。如果此操作成功並且客戶成為了VIP,則會引發 CustomerBecameVIP 事件。
  3. VIP祝賀郵件:最後,當 CustomerBecameVIP 事件被處理時,系統會向新VIP客戶傳送一封祝賀郵件。

透過將這些步驟分開處理,我們可以實作錯誤的隔離,從而提高系統的可靠性。例如,如果郵件伺服器暫時不可用,訂單建立和客戶歷史記錄更新仍可正常進行。

同步錯誤還原機制

為了進一步提高系統的可靠性,我們需要實作同步錯誤還原機制。這包括記錄錯誤日誌和使用重試機制。

def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork
):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            for attempt in Retrying(
                stop=stop_after_attempt(3),
                wait=wait_exponential()
            ):
                with attempt:
                    logger.debug('handling event %s with handler %s', event, handler)
                    handler(event, uow=uow)
                    queue.extend(uow.collect_new_events())
        except RetryError as retry_failure:
            logger.error(
                'Failed to handle event %s times, giving up!',
                retry_failure.last_attempt.attempt_number
            )
            continue

內容解密:

  1. 事件處理:事件處理器遍歷所有註冊的處理函式來處理事件。
  2. 重試機制:使用 tenacity 函式庫實作重試機制,重試次數最多為3次,重試間隔指數級增加。
  3. 錯誤日誌:如果所有重試嘗試都失敗,則記錄錯誤日誌。

這種機制可以有效地應對暫時性的故障,如網路閃斷、資料函式庫死鎖等,從而提高系統的整體可靠性。

事件驅動架構:使用事件整合微服務

在前一章中,我們並未詳細討論如何接收「批次數量變更」事件,或是如何通知外部系統關於重新分配的訊息。我們的系統目前具備一個具有網頁API的微服務,但對於與其他系統的溝通方式仍有待探討。我們需要了解如何得知貨件延遲或數量變更等情況,並告知倉儲系統某個訂單已被分配且需要寄送給客戶。

在本章中,我們將展示如何擴充套件事件的概念,以涵蓋系統處理輸入和輸出訊息的方式。在內部,我們的應用程式核心現在是一個訊息處理器。讓我們進一步延伸這個概念,使其在外部也成為一個訊息處理器。如圖11-1所示,我們的應用程式將透過外部訊息匯流排(我們將以Redis發布/訂閱佇列為例)接收來自外部來源的事件,並以事件的形式將輸出釋出到同一個匯流排。

分散式泥球架構與名詞思維

在探討之前,讓我們先討論一些替代方案。我們經常與正在嘗試建立微服務架構的工程師交流。通常,他們正在從現有的應用程式遷移,而他們的第一直覺是將系統拆分為名詞。我們在系統中迄今為止引入了哪些名詞?好吧,我們有庫存批次、訂單、產品和客戶。因此,一個簡單的嘗試可能會將系統拆分為如圖11-2所示(注意,我們將系統命名為Batches,而不是Allocation)。

每個「事物」在我們的系統中都有一個相關的服務,該服務公開了一個HTTP API。讓我們來分析一下圖11-3中的一個範例幸福路徑流程:我們的使用者存取網站並可以從有庫存的產品中選擇。當他們將商品新增到購物籃時,我們將為他們保留一些庫存。當訂單完成時,我們確認保留,這會導致我們向倉函式庫傳送排程指示。假設這是客戶的第三個訂單,我們希望更新客戶記錄以將他們標記為VIP。

命令與事件的分離:權衡利弊

將命令和事件分開處理,有助於我們瞭解哪些事情必須成功,哪些事情可以稍後清理。使用明確的名稱(如CreateBatch)比BatchCreated更不容易令人混淆。我們正在明確表達使用者的意圖,而明確比隱含更好。然而,命令和事件之間的語義差異可能很微妙。預期會有一些關於差異的爭論。我們正在明確地邀請失敗。我們知道有時候事情會中斷,而我們選擇透過使失敗更小、更孤立來處理這一點。這可能會使系統更難理解,並需要更好的監控。

程式碼範例:使用Redis發布/訂閱佇列處理外部事件

import redis

# 建立Redis連線
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# 釋出事件到Redis頻道
def publish_event(event):
    redis_client.publish('events', event)

# 訂閱Redis頻道並處理事件
def subscribe_to_events():
    pubsub = redis_client.pubsub()
    pubsub.subscribe('events')
    for message in pubsub.listen():
        if message['type'] == 'message':
            event = message['data']
            # 處理事件
            process_event(event)

#### 內容解密:
此程式碼展示瞭如何使用Redis發布/訂閱佇列來處理外部事件首先我們建立了一個Redis連線然後我們定義了兩個函式:`publish_event`用於將事件釋出到Redis頻道`subscribe_to_events`則用於訂閱Redis頻道並處理接收到的事件`subscribe_to_events`函式中我們使用`pubsub.listen()`來監聽頻道上的訊息並在接收到訊息時呼叫`process_event`函式來處理事件

### 分散式系統中的錯誤處理

事物會壞是軟體工程的一個普遍規律在我們的系統中當其中一個請求失敗時會發生什麼假設在接受使用者對三個MISBEGOTTEN-RUG的訂單時發生了網路錯誤如圖11-5所示

我們有兩個選擇無論如何都要下單並保持未分配狀態或者因為無法保證分配而拒絕接受訂單我們的批次服務的失敗狀態已經冒泡並影響了訂單服務的可靠性當兩個事物必須一起更改時我們說它們是耦合的我們可以將這種故障級聯視為一種時間耦合系統的每個部分都必須同時運作任何部分才能運作隨著系統變得越來越大某個部分降級的機率呈指數級增加