經過了多年後,因為工作的關係,山姆鍋終於又有動機寫 Python 相關的程式了。在目前開發的軟體產品上,用到事件驅動架構。其中關鍵的事件總線(event bus),由於是針對中小型企業的系統,就沒有考慮使用 Kafka 這樣的方案。經評估後,發現使用 PostgreSQL 來實現一個事件總線似乎是一個可行的方案。
事件總線簡介
事件驅動架構常常見到 Event bus 元件的存在,使用「事件總線」的主要目的有:
- 降低元件間的耦合性。
- 確保系統狀態最終一致性。
雖然說「事件總線」的概念相似,但其設計與實作會根據系統需求而不同。 下圖是山姆鍋對於事件總線的架構理解:
事件生產者(Producer)將事件發布到「事件總線」後就不再需要管後續的事情,「事件總線」根據目前的「訂閱(Subscription)」來決定事件發送的目的地;屬於同一組訂閱的事件消費者(Consumer)不會收到相同的事件(例外情況,如 consumer 本身 crash,則同樣事件有可能被處理多次)。
事件傳遞的用處
基本上,只要是系統狀態的改變都可以被表示成一個「事件」,最常見的就是微服務中,為了達到資料同步所發送的領域事件(domain event)。例如:產生一筆訂單(order),可能會需要驅動不同子系統進行相對應的動作。在山姆鍋的需求中,事件主要用來:
- 產生安全稽核紀錄。
- 同步 Cache。
- 通知前端 UI。
- 建立全文檢索。
實作構想
本文所指的「事件總線」元件,目的是作為後端服務元件來以非同步方式來進行事件的通知,需要做到:
- 事件不能遺失。
- 事件處理需要按照發生順序。
- 容許 consumer 從錯誤中復原。
為了達到上述設計目標,事件顯然需要存放在永久性的儲存媒體、事件必須由應用元件來決定處理順序、現有 consumer 故障,可以由備援的 consumer 接手。
事件發布
事件發布是相對容易實現的部分,所謂事件發布也就是在某個資料表(table)新增一筆紀錄罷了。為了表示事件發生的順序,每個事件需要一個時間標記,這裏山姆鍋使用 cuid
這個 Python 套件產生所需的事件 ID, 這些 ID 的字母排序跟發生的時間順序一致。
發布事件只需要呼叫 Event bus 的 publish 方法,如下例:
await event_bus.publish(Event(id=cuid(), type="test"))
事件訂閱
一個被發布的事件,邏輯上會被複製成多份送到不同的「訂閱」。「訂閱」像是一個單向管道,同時限制同一時間,同一組訂閱只能有一個消費者可以處理事件。所以,根據不同的目的會定義不同的「訂閱」,而每個不同的「訂閱」會有不同的 ID 用以分辨。當事件在某個「訂閱」被處理後,也會紀錄最後處理的事件,如此,下次就可以取得較新的事件來處理。
假設一個 EventBus 已經被建立且啟動,下面是訂閱事件的程式片段:
async with event_bus.subscribe(sub_id) as subscriber:
async for event in subscriber:
print(f"Received: {event}")
下圖是範例的執行畫面,左邊是 Producer, 右邊則是 Consumer:
確保事件按照發生順序被處理
為了達到在同一個訂閱中,事件能夠按照順序被處理。 Consumer 要先鎖定「訂閱」來確保只有自己可以處理該訂閱的事件。在背後,鎖定的機制利用 PostgreSQL 的 Advisory lock 功能。如果訂閱的事件可以不用按照順序處理,consumer 可以將事件處理邏輯透過 Task queue(e.g. Celery)轉送給其他元件並行處理。
減少事件處理延遲
把關聯式資料庫當作某種形式的訊息佇列(queue),如本文所用的 Event bus,常常會使用 busy looping 方式來不斷地查詢資料庫看是否有新的訊息需要處理。為了避免對資料庫造成太大的負擔,也會間隔一段時間(e.g. 5秒)再重新查詢,但這樣就造成訊息最多要延遲 5秒才會被處理。為了避免這個延遲問題,在實作上,利用 PostgreSQL 的 Notify/Listen 功能來作為有事件發生的通知機制。由於並非作為事件本身的傳輸機制,所以,不會造成事件遺失的問題。在事件發布時,producer 同時會發出一個 Notify 來通知所有在線的 consumer 有事件發生; Consumer 收到通知後會立即做後續處理。
小結
本文所提的實作只是作為概念驗證,且此 Event bus 的特性完全是按照山姆鍋目前參與開發的產品需求而設計。在評估現有的解決方案的時候,由於沒有符合的項目可以直接運用,才衍生出這個 Event bus 實作。對於需要一個 Python 可靠事件傳遞機制的人,希望對您有所啟發。由於是概念驗證,實作有許多考慮不周延的地方,有錯誤的地方請多多指正。
原始碼:py_eventbus_pg ⎘