Skip to content

用 Python 實作混合式邏輯時鐘

Published: 3 分鐘

山姆鍋平常喜歡搜尋一些有趣的開放源碼專案,尤其是有提供設計文件的。有天不知道怎麼地逛到了 Cockroach DB 這個專案,它的設計文件有許多有趣的地方,但我特別注意到其中一篇論文,關於使用邏輯與實體時鐘來實現一個分散式時間郵戳(timestamp)的演算法。

邏輯時間的用途

在軟體系統,常常需要比較兩個事件發生的前後順序,例如:哪個交易(transaction)先發生, 這可以透過記錄事件發生的時間來達成。 但對分散式系統,這個問題就變得比較複雜,由於網路延遲、主機時鐘的誤差等等因素, 使用一個全域(global)的時間來源幾近不可能。

所以,分散式系統常採用邏輯時間來做為比較事件發生前後的依據, 其中 Lamport logical clock 或是 Vector clock 是兩種實現邏輯時鐘常見的方法。

混合式邏輯時鐘

本文所參考的論文 1 ,提供所謂「混合式邏輯時鐘(Hybrid Logical Clocks; HLC)」的演算法。 HLC 跟過去常看到的採用實體時鐘(Physical clock), 或邏輯時鐘(Logical clock) 的主要不同點,顧名思義:採用混合實體跟邏輯時鐘。

主要引起山姆鍋興趣的有兩點:

  1. 可以跟 NTP 使用的 64-bit 時間郵戳格式相容。
  2. 可以代替常用的 Unix epoch timestamp。

也就是說,把使用 Python time.time() 當作時間郵戳的程式可以很簡單地更換成這種邏輯時鐘。 主要麻煩的問題在於:

  1. 畢竟是分散式邏輯時鐘,需要不時地跟其他節點(node)的時間做校正。
  2. HLC 假設主機(host)實體時鐘已經採用 NTP 同步。

第 2 點雖然好像有點麻煩,其實真正部署的分散式系統理應都已經有採用 NTP 或其他協定來同步主機時間。 所以,剩下就是邏輯時間維護的問題。

邏輯時鐘維護

HLC 需要在下列情況發生時,更新時鐘:

  1. 當從其他節點收到訊息時:

    : 本地節點根據訊息的邏輯時鐘,自己現在的邏輯時鐘以及實體時鐘來做調整。 在範例程式中, update 方法負責處理這個情況。

  2. 當本地事件發生,或者送出訊息時:

    : 本地節點只採用自己的現在邏輯時鐘跟實體時鐘調整。 範例程式中,tick 方法負責處理這個情況。

程式實作

ava.util.clock 模組內容:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import time


class Clock(object):
    """ Logical timestamp generator.
    """
    def __init__(self, timestamp=None):
        if timestamp:
            self.l = (timestamp >> 16) & 0xffffffffffff
            self.c = timestamp & 0xffff
        else:
            self.l = 0
            self.c = 0
            self.tick()

    def update(self, rt=None):
        """ Updates the clock with a received timestamp from another node.

        :param rt: received timestamp
        """
        ml = (rt >> 16) & 0xffffffffffff
        mc = rt & 0xffff

        old_l = self.l
        pt = int(round((time.time() + 0.0005) * 1000.0))
        self.l = max(old_l, ml, pt)
        if self.l == old_l == ml:
            self.c = max(self.c, mc) + 1
        elif self.l == old_l:
            self.c += 1
        elif self.l == ml:
            self.c = mc + 1
        else:
            self.c = 0

    def tick(self):
        """ Updates and tick the clock for local or send events.

        :return: the updated timestamp
        """
        # local or send event
        old_l = self.l
        pt = int(round((time.time() + 0.0005) * 1000.0))
        self.l = max(old_l, pt)
        if self.l == old_l:
            self.c += 1
        else:
            self.c = 0

        return self.timestamp()

    def timestamp(self):
        """ Gets the current timestamp without updating counter.

        :return: the timestamp
        """
        return (self.l << 16) | self.c

    def seconds(self):
        """ Gets the value compatible with time.time() function.

        :return: the float value represent seconds from epoc
        """
        return (self.l/1000.0) + (self.c/65536.0)

    def __hash__(self):
        return self.timestamp()

    def __cmp__(self, other):
        if self.l != other.l:
            if self.l > other.l:
                return 1
            if self.l < other.l:
                return -1

        if self.c != other.c:
            if self.c > other.c:
                return 1
            if self.c < other.c:
                return -1
        return 0

    def __repr__(self):
        return "Clock[l=%r, c=%r]" % (self.l, self.c)

    @staticmethod
    def timestamp_to_secs(ts):
        """ Converts a timestamp to seconds.
        :param ts: the clock's timestamp.
        :return:
        """
        l = (ts >> 16) & 0xffffffffffff
        c = ts & 0xffff
        return (l/1000.0) + (c/65536.0)


clock = Clock()


__all__ = ['Clock', 'clock']

測試程式:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import time
from ava.util.clock import Clock

import pytest

@pytest.fixture
def clock():
    return Clock()


class TestClock(object):

    def test_milliseconds_round_up(self):
        t = time.time()
        pt = round((t + 0.0005), 3)
        assert pt >= t

    def test_clock_should_increment_monotonically(self, clock):
        prev = clock.timestamp()
        for _ in xrange(2000):
            cur = clock.tick()
            # print('%r' % cur)
            assert cur > prev
            prev = cur

    def test_create_clock_from_timestamp(self):
        c = Clock(94132454961709074)
        assert c.l == 1436347274196
        assert c.c == 18

    def test_clock_seconds_should_be_bounded(self):
        c = Clock().seconds()
        t = time.time()
        assert (c - t) < 0.002

    def test_clock_tick_with_timestamp(self, monkeypatch):
        def mytime():
            return 1436345964.484081
        monkeypatch.setattr(time, 'time', mytime)
        # print(time.time())
        c = Clock(94132454961709074)
        assert c.l == 1436347274196
        assert c.c == 18
        c.update(94132454961709075)
        assert c.c == 20  # tick() increments c by 1 as well

結語

本文提供的實作是山姆鍋根據自己的理解撰寫並依照 Python 特性做調整。 若有錯誤請不吝指教。

參考資料

Cockroach DB: https://github.com/cockroachdb/cockroach

NTP: https://en.wikipedia.org/wiki/Network_Time_Protocol

Lamport logical clock: https://en.wikipedia.org/wiki/Lamport_timestamps

Vector clock: https://en.wikipedia.org/wiki/Vector_clock


Footnotes

  1. Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases

郭信義 (Sam Kuo)

奔騰網路科技技術長,專長分散式系統、Web 應用與雲端服務架構、設計、開發、部署與維運。工作之餘,喜歡關注自由軟體的發展與應用,偶爾寫一下部落格文章。

你可能會有興趣的文章