# -*- coding: utf-8 -*-
"""Tests for ``ava.queue.LmdbQueue`` running on top of gevent."""
from __future__ import absolute_import, print_function, unicode_literals

from gevent import monkey
monkey.patch_all(thread=False)  # 1: patch before the remaining imports

import gevent
import lmdb
import pytest

from ava.queue import LmdbQueue


@pytest.fixture
def db_env(request, tmpdir):
    """Provide an LMDB environment stored in a per-test ``data`` directory.

    The environment is closed when the test finishes so that file handles
    and memory maps do not accumulate over the test session.
    """
    data_dir = tmpdir.mkdir("data")
    # Open the environment inside the directory created above, not in its
    # parent (which is what ``data_dir.dirname`` would point at).
    env = lmdb.Environment(data_dir.strpath)
    request.addfinalizer(env.close)
    return env


class TestLmdbQueue(object):
    """Exercise the put/get API of ``LmdbQueue``."""

    def test_create_queue(self, db_env):
        """Constructing a queue on a fresh environment must not raise."""
        LmdbQueue(db_env)

    def test_get_and_put(self, db_env):
        """Items come back in insertion order, then ``None`` when empty."""
        queue = LmdbQueue(db_env)
        queue.put(2)
        queue.put(3)
        queue.put(1)

        assert queue.get_nowait() == 2
        assert queue.get_nowait() == 3
        assert queue.get_nowait() == 1
        assert queue.get_nowait() is None

    def test_get_with_timeout(self, db_env):
        """``get(timeout=...)`` returns an item put before the timeout."""
        queue = LmdbQueue(db_env)

        def producer():
            gevent.sleep(0.5)
            queue.put(1)

        worker = gevent.spawn(producer)
        assert queue.get(timeout=1) == 1
        # Do not let the producer outlive the test (and the environment).
        worker.join()

    def test_get_indefinitely(self, db_env):
        """``get()`` without a timeout blocks until an item is put."""
        queue = LmdbQueue(db_env)

        def producer():
            gevent.sleep(0.5)
            queue.put(1)

        worker = gevent.spawn(producer)
        assert queue.get() == 1
        # Do not let the producer outlive the test (and the environment).
        worker.join()

    def test_put_with_transaction(self, db_env):
        """Puts follow the fate of the caller-supplied write transaction."""
        queue = LmdbQueue(db_env)

        # Committed transaction: both items become visible.
        with db_env.begin(write=True) as txn:
            queue.put(1, txn)
            queue.put(1, txn)

        assert queue.get_nowait() == 1
        assert queue.get_nowait() == 1

        # The exception leaves the transaction block, so neither put may
        # be visible afterwards.
        with pytest.raises(RuntimeError):
            with db_env.begin(write=True) as txn:
                queue.put(2, txn)
                queue.put(2, txn)
                raise RuntimeError()

        assert queue.get_nowait() is None

    def test_queue_with_prefix(self, db_env):
        """Queues with different prefixes in one environment stay separate."""
        queue1 = LmdbQueue(db_env, prefix=b'myqueue1')
        queue2 = LmdbQueue(db_env, prefix=b'myqueue2')

        with db_env.begin(write=True) as txn:
            queue1.put(1, txn)
            queue1.put(1, txn)
            queue2.put(2, txn)
            queue2.put(2, txn)

        assert queue1.get_nowait() == 1
        assert queue2.get_nowait() == 2
        assert queue1.get_nowait() == 1
        assert queue2.get_nowait() == 2

    def test_put_multi(self, db_env):
        """``put_multi`` enqueues every item, preserving order."""
        queue = LmdbQueue(db_env)
        items = [1, 2, '3']

        with db_env.begin(write=True) as txn:
            queue.put_multi(items, txn)

        assert queue.get_nowait() == 1
        assert queue.get_nowait() == 2
        assert queue.get_nowait() == '3'