有時候在需要訊息傳遞的應用場景,因為需求簡單不想使用到像 RabbitMQ 這種企業級的訊息中介軟體(middleware), 使用 Redis 雖然夠簡單但畢竟還是需要執行另一個進程(process),總還是有殺雞用牛刀的感覺。 但是另一方面又有訊息不能遺失的需求,自己處理訊息持久性(persistence)實在麻煩, 畢竟訊息儲存還是要支援 ACID ⎘ 特性才能號稱做到不遺失。
說到這,山姆鍋相信有些人已經想到使用內嵌式資料庫(embedded database)來實現,例如 SQLite。 雖然山姆鍋也會用 SQLite 做其它用途,但本文山姆鍋使用的是另一個比較少人知道或使用的資料庫:LMDB。
對於懷疑使用資料庫作為訊息佇列是否恰當的人,可以參考 「為什麼使用資料庫當作訊息佇列不是問題?」 這篇文章。本文所討論的情況是訊息生產者與消費者都在同一個進程,透過 Event 機制來同步, 不會有忙碌迴圈(busy waiting)的問題。
LMDB
LMDB, 全名是 Lightning Memory-Mapped Database,是一個內嵌式資料庫,可歸為 NoSQL 資料庫陣營。 LMDB 的主要特性:
- 支援交易(transaction),確保資料持久性(persistence)。
- 資料庫採記憶體映射(memory-mapped): 利用作業系統的虛擬記憶體管理作為緩衝(buffer)機制。
- 可以由多個進程(process)共同存取同一個資料庫。這點山姆鍋沒試過,但可作為單一主機進程間的通訊機制。
其它相關資訊,可以參考 LMDB 官方網站 ⎘ 。
Python 的 LMDB 綁定,以 lmdb ⎘ 最為完整, 文件對大部份需求說明的也夠詳細。
基本上,LMDB 提供有序的(ordered) key-value 資料庫,在同一個資料庫中記錄會依照鍵值(keys)排序。 這個特性很適合訊息佇列,訊息以時間為基礎的鍵值放入資料庫中,然後從最舊的訊息開始取出。
訊息佇列實作
底下程式片段是山姆鍋基於 LMDB 以及 gevent 1 的簡單實作,只支援單一訊息接收端(receiver)。 為什麼只支援單一接收端,這跟山姆鍋的需求有關就不詳述。
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import msgpack
import struct
from gevent.event import Event
from ava.util.clock import clock
class LmdbQueue(object):
""" Persistent queue backed by LMDB for single consumer.
"""
def __init__(self, db_env, prefix=b'\x00'):
self.db_env = db_env
self.prefix = prefix
self.available = Event()
self.begin_key = prefix + b'\x00'
self.end_key = prefix + b'\xff'
self.key_fmt = b'!%dsBQ' % len(prefix)
self.key_size = struct.calcsize(self.key_fmt)
def get(self, block=True, timeout=None, txn=None):
while True:
with self.db_env.begin(write=True) as txn:
with txn.cursor() as cur:
if cur.set_range(self.begin_key) \
and cur.key() < self.end_key:
ret = msgpack.unpackb(cur.value())
cur.delete()
return ret
self.available.clear()
if block:
if timeout is None:
self.available.wait()
else:
self.available.wait(timeout)
else:
return None
def get_nowait(self, txn=None):
return self.get(block=False, txn=txn)
def put(self, item, txn=None):
key = struct.pack(self.key_fmt,
self.prefix,
ord(b'T'),
clock.tick())
if txn is None:
with self.db_env.begin(write=True) as txn:
txn.put(key, msgpack.packb(item))
self.available.set()
else:
txn.put(key, msgpack.packb(item))
self.available.set()
def put_multi(self, items, txn=None):
""" Puts multiple items into the queue.
:param items:
:param txn:
"""
if txn is None:
with self.db_env.begin(write=True) as txn:
for item in items:
key = struct.pack(self.key_fmt,
self.prefix,
ord(b'T'),
clock.tick())
txn.put(key, msgpack.packb(item))
self.available.set()
else:
for item in items:
key = struct.pack(self.key_fmt,
self.prefix,
ord(b'T'),
clock.tick())
txn.put(key, msgpack.packb(item))
self.available.set()
其中的 clock,請參考 用 Python 實作混合式邏輯時鐘 這篇文章。
佇列物件建構
要建構 LmdbQueue 物件,需要有 LMDB 的環境(Environment)物件,且為了讓同一個資料庫可以支援多個佇列(queue),每個佇列使用一個前置詞(prefix)用來區分。 每個放入佇列中的訊息都會給予一個唯一的鍵值(key),這個鍵值包涵前置詞以及目前的混合式邏輯時間, 確保不會有出現同一鍵值的情況發生。
訊息排序與存取
訊息的鍵值以 <prefix>'T'<timestamp>
這樣的格式組成,其中:
- <prefix>是佇列的前置詞。
- ‘T’ 是介於 0x00 跟 0xff(255) 之間任意挑選的值,沒有特殊意義。
- <timestamp> 是訊息置入時的邏輯時間。
運用這樣格式的設計,就可以確保所有訊息的鍵值都會介於`<prefix>0x00`
與 <prefix>0xff
之間。 程式碼中分別使用
begin_key
以及 end_key
來代表這兩個值。
藉由這兩個邊界值,我們就可以很方便的使用 set_range
這個方法找到佇列的第一個訊息,
需要的話也可以用來遍歷(iterate)佇列的所有訊息。
訊息格式轉換
訊息的鍵值使用 struct
這個套件來從多個資料項轉成精簡的格式。 訊息在放入佇列之前會經過
MessagePack ⎘
序列化(serialized)成 bytes; 在取出時會被反序列化成當初的資料型態。
交易(事務)處理
預設情況下,每個方法各自在獨立的交易中執行,如果需要讓多個操作在同一個交易執行的話,需要先產生 交易物件再傳入被呼叫的方法。
佇列物件方法
各個方法(method)的簡短說明如下:
-
get:
提供同步(blocking)的方式來取出佇列中的訊息,可以設定是否要同步,操作逾時(timeout)等。
-
get_nowait:
功能類似
get
方法,但沒有訊息的話會,不等待直接回傳 None。 -
put:
允許訊息生產者(producers)把訊息放入佇列中。
-
put_multi:
允許一次放入多個訊息到佇列中。
測試用程式碼
使用 pytest 的測試案例 2 :
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from gevent import monkey
monkey.patch_all(thread=False) # 1
import gevent
import lmdb
import pytest
from ava.queue import LmdbQueue
@pytest.fixture
def db_env(tmpdir):
dir = tmpdir.mkdir("data")
_db_env = lmdb.Environment(dir.dirname)
return _db_env
class TestLmdbQueue(object):
def test_create_queue(self, db_env):
queue = LmdbQueue(db_env)
def test_get_and_put(self, db_env):
queue = LmdbQueue(db_env)
queue.put(2)
queue.put(3)
queue.put(1)
# queue.dump()
assert 2 == queue.get_nowait()
assert 3 == queue.get_nowait()
assert 1 == queue.get_nowait()
assert queue.get_nowait() is None
def test_get_with_timeout(self, db_env):
queue = LmdbQueue(db_env)
def producer():
gevent.sleep(0.5)
queue.put(1)
gevent.spawn(producer)
assert 1 == queue.get(timeout=1)
def test_get_indefinitely(self, db_env):
queue = LmdbQueue(db_env)
def producer():
gevent.sleep(0.5)
queue.put(1)
gevent.spawn(producer)
assert 1 == queue.get()
def test_put_with_transaction(self, db_env):
queue = LmdbQueue(db_env)
with db_env.begin(write=True) as txn:
queue.put(1, txn)
queue.put(1, txn)
assert 1 == queue.get_nowait()
assert 1 == queue.get_nowait()
with pytest.raises(RuntimeError):
with db_env.begin(write=True) as txn:
queue.put(2, txn)
queue.put(2, txn)
raise RuntimeError()
assert None == queue.get_nowait()
def test_queue_with_prefix(self, db_env):
queue1 = LmdbQueue(db_env, prefix=b'myqueue1')
queue2 = LmdbQueue(db_env, prefix=b'myqueue2')
with db_env.begin(write=True) as txn:
queue1.put(1, txn)
queue1.put(1, txn)
queue2.put(2, txn)
queue2.put(2, txn)
assert 1 == queue1.get_nowait()
assert 2 == queue2.get_nowait()
assert 1 == queue1.get_nowait()
assert 2 == queue2.get_nowait()
def test_put_multi(self, db_env):
queue = LmdbQueue(db_env)
items = [1, 2, '3']
with db_env.begin(write=True) as txn:
queue.put_multi(items, txn)
assert 1 == queue.get_nowait()
assert 2 == queue.get_nowait()
assert '3' == queue.get_nowait()
標記 1: gevent 透過 monkey patch 讓多數標準程式庫不用修改也可以支援並發(concurrent)執行。
結語
在一般稍微有點規模的分散式系統,通常會使用訊息中介軟體。如果訊息的遺失無關緊要,其實也就不用自找麻煩。 偏偏總會有些場合,尤其是在開發客戶端應用時,會需要先將要傳遞的訊息暫存在持久性儲存體,如硬碟。 但要確保訊息或其它資料的永久性,並沒有想像中的簡單,需要考慮到很多的例外狀況: 寫到一半系統當機怎麼處理?同時兩個執行緒在寫入怎麼處理? 也因為如此,SQLite 或者 LMDB 這種內嵌式資料庫, 對於需要支援資料永久性的客戶端應用來說,是不可少的工具。