有時候在需要訊息傳遞的應用場景 , 因為需求簡單不想使用到像 RabbitMQ 這種企業級的訊息中介軟體 (middleware), 使用 Redis 雖然夠簡單但畢竟還是需要執行另一個進程 (process), 總還是有殺雞用牛刀的感覺 。 但是另一方面又有訊息不能遺失的需求 , 自己處理訊息持久性 (persistence) 實在麻煩 , 畢竟訊息儲存還是要支援 ACID 特性才能號稱做到不遺失 。

說到這 , 山姆鍋相信有些人已經想到使用內嵌式資料庫 (embedded database) 來實現 , 例如 SQLite。 雖然山姆鍋也會用 SQLite 做其它用途 , 但本文山姆鍋使用的是另一個比較少人知道或使用的資料庫 :LMDB。

對於懷疑使用資料庫作為訊息佇列是否恰當的人 , 可以參考 「 為什麼使用資料庫當作訊息佇列不是問題 ?」 這篇文章 。 本文所討論的情況是訊息生產者與消費者都在同一個進程 , 透過 Event 機制來同步 , 不會有忙碌迴圈 (busy waiting) 的問題 。

LMDB

LMDB, 全名是 Lightning Memory-Mapped Database, 是一個內嵌式資料庫 , 可歸為 NoSQL 資料庫陣營 。 LMDB 的主要特性 :

  1. 支援交易 (transaction), 確保資料持久性 (persistence)。
  2. 資料庫採記憶體映射 (memory-mapped): 利用作業系統的虛擬記憶體管理作為緩衝 (buffer) 機制 。
  3. 可以由多個進程 (process) 共同存取同一個資料庫 。 這點山姆鍋沒試過 , 但可作為單一主機進程間的通訊機制 。

其它相關資訊 , 可以參考 LMDB 官方網站

Python 的 LMDB 綁定 , 以 lmdb 最為完整 , 文件對大部份需求說明的也夠詳細 。

基本上 ,LMDB 提供有序的 (ordered) key-value 資料庫 , 在同一個資料庫中記錄會依照鍵值 (keys) 排序 。 這個特性很適合訊息佇列 , 訊息以時間為基礎的鍵值放入資料庫中 , 然後從最舊的訊息開始取出 。

訊息佇列實作

底下程式片段是山姆鍋基於 LMDB 以及 gevent [1] 的簡單實作 , 只支援單一訊息接收端 (receiver)。 為什麼只支援單一接收端 , 這跟山姆鍋的需求有關就不詳述 。

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import msgpack
import struct
from gevent.event import Event
from ava.util.clock import clock


class LmdbQueue(object):
    """ Persistent queue backed by LMDB for single consumer.
    """
    def __init__(self, db_env, prefix=b'\x00'):
        self.db_env = db_env
        self.prefix = prefix
        self.available = Event()
        self.begin_key = prefix + b'\x00'
        self.end_key = prefix + b'\xff'
        self.key_fmt = b'!%dsBQ' % len(prefix)
        self.key_size = struct.calcsize(self.key_fmt)

    def get(self, block=True, timeout=None, txn=None):
        while True:
            with self.db_env.begin(write=True) as txn:
                with txn.cursor() as cur:
                    if cur.set_range(self.begin_key) \
                            and cur.key() < self.end_key:
                        ret = msgpack.unpackb(cur.value())
                        cur.delete()
                        return ret

            self.available.clear()
            if block:
                if timeout is None:
                    self.available.wait()
                else:
                    self.available.wait(timeout)
            else:
                return None

    def get_nowait(self, txn=None):
        return self.get(block=False, txn=txn)

    def put(self, item, txn=None):
        key = struct.pack(self.key_fmt,
                          self.prefix,
                          ord(b'T'),
                          clock.tick())
        if txn is None:
            with self.db_env.begin(write=True) as txn:
                txn.put(key, msgpack.packb(item))
                self.available.set()
        else:
            txn.put(key, msgpack.packb(item))
            self.available.set()

    def put_multi(self, items, txn=None):
        """ Puts multiple items into the queue.

        :param items:
        :param txn:
        """
        if txn is None:
            with self.db_env.begin(write=True) as txn:
                for item in items:
                    key = struct.pack(self.key_fmt,
                                      self.prefix,
                                      ord(b'T'),
                                      clock.tick())
                    txn.put(key, msgpack.packb(item))
                    self.available.set()
        else:
            for item in items:
                key = struct.pack(self.key_fmt,
                                  self.prefix,
                                  ord(b'T'),
                                  clock.tick())
                txn.put(key, msgpack.packb(item))
                self.available.set()

其中的 clock, 請參考 用 Python 實作混合式邏輯時鐘 這篇文章 。

佇列物件建構

要建構 LmdbQueue 物件 , 需要有 LMDB 的環境 (Environment) 物件 , 且為了讓同一個資料庫可以支援多個佇列 (queue), 每個佇列使用一個前置詞 (prefix) 用來區分 。 每個放入佇列中的訊息都會給予一個唯一的鍵值 (key), 這個鍵值包涵前置詞以及目前的混合式邏輯時間 , 確保不會有出現同一鍵值的情況發生 。

訊息排序與存取

訊息的鍵值以 <prefix>'T'<timestamp> 這樣的格式組成 , 其中 :

  1. <prefix> 是佇列的前置詞 。
  2. 'T' 是介於 0x00 跟 0xff(255) 之間任意挑選的值 , 沒有特殊意義 。
  3. <timestamp> 是訊息置入時的邏輯時間 。

運用這樣格式的設計 , 就可以確保所有訊息的鍵值都會介於 `<prefix>0x00` 與 <prefix>0xff 之間 。 程式碼中分別使用 begin_key 以及 end_key 來代表這兩個值 。 藉由這兩個邊界值 , 我們就可以很方便的使用 set_range 這個方法找到佇列的第一個訊息 , 需要的話也可以用來遍歷 (iterate) 佇列的所有訊息 。

訊息格式轉換

訊息的鍵值使用 struct 這個套件來從多個資料項轉成精簡的格式 。 訊息在放入佇列之前會經過 MessagePack 序列化 (serialized) 成 bytes; 在取出時會被反序列化成當初的資料型態 。

交易 ( 事務 ) 處理

預設情況下 , 每個方法各自在獨立的交易中執行 , 如果需要讓多個操作在同一個交易執行的話 , 需要先產生 交易物件再傳入被呼叫的方法 。

佇列物件方法

各個方法 (method) 的簡短說明如下 :

  1. get:

    提供同步 (blocking) 的方式來取出佇列中的訊息 , 可以設定是否要同步 , 操作逾時 (timeout) 等 。

  2. get_nowait:

    功能類似 get 方法 , 但沒有訊息的話會 , 不等待直接回傳 None。

  3. put:

    允許訊息生產者 (producers) 把訊息放入佇列中 。

  4. put_multi:

    允許一次放入多個訊息到佇列中 。

測試用程式碼

使用 pytest 的測試案例 [2]

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

from gevent import monkey
monkey.patch_all(thread=False)  # 1

import gevent
import lmdb
import pytest

from ava.queue import LmdbQueue


@pytest.fixture
def db_env(tmpdir):
    dir = tmpdir.mkdir("data")
    _db_env = lmdb.Environment(dir.dirname)
    return _db_env


class TestLmdbQueue(object):

    def test_create_queue(self, db_env):
        queue = LmdbQueue(db_env)

    def test_get_and_put(self, db_env):
        queue = LmdbQueue(db_env)
        queue.put(2)
        queue.put(3)
        queue.put(1)

        # queue.dump()
        assert 2 == queue.get_nowait()
        assert 3 == queue.get_nowait()
        assert 1 == queue.get_nowait()
        assert queue.get_nowait() is None

    def test_get_with_timeout(self, db_env):
        queue = LmdbQueue(db_env)

        def producer():
            gevent.sleep(0.5)
            queue.put(1)

        gevent.spawn(producer)
        assert 1 == queue.get(timeout=1)

    def test_get_indefinitely(self, db_env):
        queue = LmdbQueue(db_env)

        def producer():
            gevent.sleep(0.5)
            queue.put(1)

        gevent.spawn(producer)
        assert 1 == queue.get()

    def test_put_with_transaction(self, db_env):
        queue = LmdbQueue(db_env)

        with db_env.begin(write=True) as txn:
            queue.put(1, txn)
            queue.put(1, txn)

        assert 1 == queue.get_nowait()
        assert 1 == queue.get_nowait()

        with pytest.raises(RuntimeError):
            with db_env.begin(write=True) as txn:
                queue.put(2, txn)
                queue.put(2, txn)
                raise RuntimeError()

        assert None == queue.get_nowait()

    def test_queue_with_prefix(self, db_env):
        queue1 = LmdbQueue(db_env, prefix=b'myqueue1')
        queue2 = LmdbQueue(db_env, prefix=b'myqueue2')

        with db_env.begin(write=True) as txn:
            queue1.put(1, txn)
            queue1.put(1, txn)
            queue2.put(2, txn)
            queue2.put(2, txn)

        assert 1 == queue1.get_nowait()
        assert 2 == queue2.get_nowait()
        assert 1 == queue1.get_nowait()
        assert 2 == queue2.get_nowait()

    def test_put_multi(self, db_env):
        queue = LmdbQueue(db_env)

        items = [1, 2, '3']
        with db_env.begin(write=True) as txn:
            queue.put_multi(items, txn)

        assert 1 == queue.get_nowait()
        assert 2 == queue.get_nowait()
        assert '3' == queue.get_nowait()

標記 1: gevent 透過 monkey patch 讓多數標準程式庫不用修改也可以支援並發 (concurrent) 執行 。

結語

在一般稍微有點規模的分散式系統 , 通常會使用訊息中介軟體 。 如果訊息的遺失無關緊要 , 其實也就不用自找麻煩 。 偏偏總會有些場合 , 尤其是在開發客戶端應用時 , 會需要先將要傳遞的訊息暫存在持久性儲存體 , 如硬碟 。 但要確保訊息或其它資料的永久性 , 並沒有想像中的簡單 , 需要考慮到很多的例外狀況 : 寫到一半系統當機怎麼處理 ? 同時兩個執行緒在寫入怎麼處理 ? 也因為如此 ,SQLite 或者 LMDB 這種內嵌式資料庫 , 對於需要支援資料永久性的客戶端應用來說 , 是不可少的工具 。


[1] 應該也可以適用在多執行緒的程式 , 只是山姆鍋大多時候是使用 gevent。 對於需要執行大量並發 (concurrent) 的 I/O 工作 , 又不想要非同步編程模式 ( 使用 callbacks) 複雜性的人 ,gevent 值得考慮 。
[2] 以現在的標準來說 , 不寫測試程式會被歸類為不好的 (bad) 的工程師 , 山姆鍋自然要提供一下 :D。