Skip to content

如何實作簡單的持久性訊息佇列?

Published: 5 分鐘

有時候在需要訊息傳遞的應用場景,因為需求簡單不想使用到像 RabbitMQ 這種企業級的訊息中介軟體(middleware), 使用 Redis 雖然夠簡單但畢竟還是需要執行另一個進程(process),總還是有殺雞用牛刀的感覺。 但是另一方面又有訊息不能遺失的需求,自己處理訊息持久性(persistence)實在麻煩, 畢竟訊息儲存還是要支援 ACID 特性才能號稱做到不遺失。

說到這,山姆鍋相信有些人已經想到使用內嵌式資料庫(embedded database)來實現,例如 SQLite。 雖然山姆鍋也會用 SQLite 做其它用途,但本文山姆鍋使用的是另一個比較少人知道或使用的資料庫:LMDB。

對於懷疑使用資料庫作為訊息佇列是否恰當的人,可以參考 「為什麼使用資料庫當作訊息佇列不是問題?」 這篇文章。本文所討論的情況是訊息生產者與消費者都在同一個進程,透過 Event 機制來同步, 不會有忙碌迴圈(busy waiting)的問題。

LMDB

LMDB, 全名是 Lightning Memory-Mapped Database,是一個內嵌式資料庫,可歸為 NoSQL 資料庫陣營。 LMDB 的主要特性:

  1. 支援交易(transaction),確保資料持久性(persistence)。
  2. 資料庫採記憶體映射(memory-mapped): 利用作業系統的虛擬記憶體管理作為緩衝(buffer)機制。
  3. 可以由多個進程(process)共同存取同一個資料庫。這點山姆鍋沒試過,但可作為單一主機進程間的通訊機制。

其它相關資訊,可以參考 LMDB 官方網站

Python 的 LMDB 綁定,以 lmdb 最為完整, 文件對大部份需求說明的也夠詳細。

基本上,LMDB 提供有序的(ordered) key-value 資料庫,在同一個資料庫中記錄會依照鍵值(keys)排序。 這個特性很適合訊息佇列,訊息以時間為基礎的鍵值放入資料庫中,然後從最舊的訊息開始取出。

訊息佇列實作

底下程式片段是山姆鍋基於 LMDB 以及 gevent 1 的簡單實作,只支援單一訊息接收端(receiver)。 為什麼只支援單一接收端,這跟山姆鍋的需求有關就不詳述。

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import msgpack
import struct
from gevent.event import Event
from ava.util.clock import clock


class LmdbQueue(object):
    """ Persistent queue backed by LMDB for single consumer.
    """
    def __init__(self, db_env, prefix=b'\x00'):
        self.db_env = db_env
        self.prefix = prefix
        self.available = Event()
        self.begin_key = prefix + b'\x00'
        self.end_key = prefix + b'\xff'
        self.key_fmt = b'!%dsBQ' % len(prefix)
        self.key_size = struct.calcsize(self.key_fmt)

    def get(self, block=True, timeout=None, txn=None):
        while True:
            with self.db_env.begin(write=True) as txn:
                with txn.cursor() as cur:
                    if cur.set_range(self.begin_key) \
                            and cur.key() < self.end_key:
                        ret = msgpack.unpackb(cur.value())
                        cur.delete()
                        return ret

            self.available.clear()
            if block:
                if timeout is None:
                    self.available.wait()
                else:
                    self.available.wait(timeout)
            else:
                return None

    def get_nowait(self, txn=None):
        return self.get(block=False, txn=txn)

    def put(self, item, txn=None):
        key = struct.pack(self.key_fmt,
                          self.prefix,
                          ord(b'T'),
                          clock.tick())
        if txn is None:
            with self.db_env.begin(write=True) as txn:
                txn.put(key, msgpack.packb(item))
                self.available.set()
        else:
            txn.put(key, msgpack.packb(item))
            self.available.set()

    def put_multi(self, items, txn=None):
        """ Puts multiple items into the queue.

        :param items:
        :param txn:
        """
        if txn is None:
            with self.db_env.begin(write=True) as txn:
                for item in items:
                    key = struct.pack(self.key_fmt,
                                      self.prefix,
                                      ord(b'T'),
                                      clock.tick())
                    txn.put(key, msgpack.packb(item))
                    self.available.set()
        else:
            for item in items:
                key = struct.pack(self.key_fmt,
                                  self.prefix,
                                  ord(b'T'),
                                  clock.tick())
                txn.put(key, msgpack.packb(item))
                self.available.set()

其中的 clock,請參考 用 Python 實作混合式邏輯時鐘 這篇文章。

佇列物件建構

要建構 LmdbQueue 物件,需要有 LMDB 的環境(Environment)物件,且為了讓同一個資料庫可以支援多個佇列(queue),每個佇列使用一個前置詞(prefix)用來區分。 每個放入佇列中的訊息都會給予一個唯一的鍵值(key),這個鍵值包涵前置詞以及目前的混合式邏輯時間, 確保不會有出現同一鍵值的情況發生。

訊息排序與存取

訊息的鍵值以 <prefix>'T'<timestamp> 這樣的格式組成,其中:

  1. <prefix>是佇列的前置詞。
  2. ‘T’ 是介於 0x00 跟 0xff(255) 之間任意挑選的值,沒有特殊意義。
  3. <timestamp> 是訊息置入時的邏輯時間。

運用這樣格式的設計,就可以確保所有訊息的鍵值都會介於`<prefix>0x00` 與 <prefix>0xff 之間。 程式碼中分別使用 begin_key 以及 end_key 來代表這兩個值。 藉由這兩個邊界值,我們就可以很方便的使用 set_range 這個方法找到佇列的第一個訊息, 需要的話也可以用來遍歷(iterate)佇列的所有訊息。

訊息格式轉換

訊息的鍵值使用 struct 這個套件來從多個資料項轉成精簡的格式。 訊息在放入佇列之前會經過 MessagePack 序列化(serialized)成 bytes; 在取出時會被反序列化成當初的資料型態。

交易(事務)處理

預設情況下,每個方法各自在獨立的交易中執行,如果需要讓多個操作在同一個交易執行的話,需要先產生 交易物件再傳入被呼叫的方法。

佇列物件方法

各個方法(method)的簡短說明如下:

  1. get:

    提供同步(blocking)的方式來取出佇列中的訊息,可以設定是否要同步,操作逾時(timeout)等。

  2. get_nowait:

    功能類似 get 方法,但沒有訊息的話會,不等待直接回傳 None。

  3. put:

    允許訊息生產者(producers)把訊息放入佇列中。

  4. put_multi:

    允許一次放入多個訊息到佇列中。

測試用程式碼

使用 pytest 的測試案例 2

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

from gevent import monkey
monkey.patch_all(thread=False)  # 1

import gevent
import lmdb
import pytest

from ava.queue import LmdbQueue


@pytest.fixture
def db_env(tmpdir):
    dir = tmpdir.mkdir("data")
    _db_env = lmdb.Environment(dir.dirname)
    return _db_env


class TestLmdbQueue(object):

    def test_create_queue(self, db_env):
        queue = LmdbQueue(db_env)

    def test_get_and_put(self, db_env):
        queue = LmdbQueue(db_env)
        queue.put(2)
        queue.put(3)
        queue.put(1)

        # queue.dump()
        assert 2 == queue.get_nowait()
        assert 3 == queue.get_nowait()
        assert 1 == queue.get_nowait()
        assert queue.get_nowait() is None

    def test_get_with_timeout(self, db_env):
        queue = LmdbQueue(db_env)

        def producer():
            gevent.sleep(0.5)
            queue.put(1)

        gevent.spawn(producer)
        assert 1 == queue.get(timeout=1)

    def test_get_indefinitely(self, db_env):
        queue = LmdbQueue(db_env)

        def producer():
            gevent.sleep(0.5)
            queue.put(1)

        gevent.spawn(producer)
        assert 1 == queue.get()

    def test_put_with_transaction(self, db_env):
        queue = LmdbQueue(db_env)

        with db_env.begin(write=True) as txn:
            queue.put(1, txn)
            queue.put(1, txn)

        assert 1 == queue.get_nowait()
        assert 1 == queue.get_nowait()

        with pytest.raises(RuntimeError):
            with db_env.begin(write=True) as txn:
                queue.put(2, txn)
                queue.put(2, txn)
                raise RuntimeError()

        assert None == queue.get_nowait()

    def test_queue_with_prefix(self, db_env):
        queue1 = LmdbQueue(db_env, prefix=b'myqueue1')
        queue2 = LmdbQueue(db_env, prefix=b'myqueue2')

        with db_env.begin(write=True) as txn:
            queue1.put(1, txn)
            queue1.put(1, txn)
            queue2.put(2, txn)
            queue2.put(2, txn)

        assert 1 == queue1.get_nowait()
        assert 2 == queue2.get_nowait()
        assert 1 == queue1.get_nowait()
        assert 2 == queue2.get_nowait()

    def test_put_multi(self, db_env):
        queue = LmdbQueue(db_env)

        items = [1, 2, '3']
        with db_env.begin(write=True) as txn:
            queue.put_multi(items, txn)

        assert 1 == queue.get_nowait()
        assert 2 == queue.get_nowait()
        assert '3' == queue.get_nowait()

標記 1: gevent 透過 monkey patch 讓多數標準程式庫不用修改也可以支援並發(concurrent)執行。

結語

在一般稍微有點規模的分散式系統,通常會使用訊息中介軟體。如果訊息的遺失無關緊要,其實也就不用自找麻煩。 偏偏總會有些場合,尤其是在開發客戶端應用時,會需要先將要傳遞的訊息暫存在持久性儲存體,如硬碟。 但要確保訊息或其它資料的永久性,並沒有想像中的簡單,需要考慮到很多的例外狀況: 寫到一半系統當機怎麼處理?同時兩個執行緒在寫入怎麼處理? 也因為如此,SQLite 或者 LMDB 這種內嵌式資料庫, 對於需要支援資料永久性的客戶端應用來說,是不可少的工具。


Footnotes

  1. 應該也可以適用在多執行緒的程式,只是山姆鍋大多時候是使用 gevent。對於需要執行大量並發(concurrent)的 I/O 工作,又不想要非同步編程模式(使用 callbacks)複雜性的人,gevent 值得考慮。

  2. 以現在的標準來說,不寫測試程式會被歸類為不好的(bad)的工程師,山姆鍋自然要提供一下 :D。

郭信義 (Sam Kuo)

奔騰網路科技技術長,專長分散式系統、Web 應用與雲端服務架構、設計、開發、部署與維運。工作之餘,喜歡關注自由軟體的發展與應用,偶爾寫一下部落格文章。

你可能會有興趣的文章