前言

山姆鍋平常喜歡搜尋一些有趣的開放源碼專案 , 尤其是有提供設計文件的 。 有天不知道怎麼地逛到了 Cockroach DB 這個專案 , 它的設計文件有許多有趣的地方 , 但我特別注意到其中一篇論文 , 關於使用邏輯與實體時鐘來實現一個分散式時間郵戳 (timestamp) 的演算法 。

邏輯時間的用途

在軟體系統 , 常常需要比較兩個事件發生的前後順序 , 例如 : 哪個交易 (transaction) 先發生 , 這可以透過記錄事件發生的時間來達成 。 但對分散式系統 , 這個問題就變得比較複雜 , 由於網路延遲 、 主機時鐘的誤差等等因素 , 使用一個全域 (global) 的時間來源幾近不可能 。

所以 , 分散式系統常採用邏輯時間來做為比較事件發生前後的依據 , 其中 Lamport logical clock 或是 Vector clock 是兩種實現邏輯時鐘常見的方法 。

在分散式系統一定存在 , 但往往被視為理所當然的就是 「 時間 」。

混合式邏輯時鐘

本文所參考的論文 [1] , 提供所謂 「 混合式邏輯時鐘 (Hybrid Logical Clocks; HLC)」 的演算法 。 HLC 跟過去常看到的採用實體時鐘 (Physical clock), 或邏輯時鐘 (Logical clock) 的主要不同點 , 顧名思義 : 採用混合實體跟邏輯時鐘 。

主要引起山姆鍋興趣的有兩點 :

  1. 可以跟 NTP 使用的 64-bit 時間郵戳格式相容 。
  2. 可以代替常用的 Unix epoch timestamp。

也就是說 , 把使用 Python time.time() 當作時間郵戳的程式可以很簡單地更換成這種邏輯時鐘 。 主要麻煩的問題在於 :

  1. 畢竟是分散式邏輯時鐘 , 需要不時地跟其他節點 (node) 的時間做校正 。
  2. HLC 假設主機 (host) 實體時鐘已經採用 NTP 同步 。

第 2 點雖然好像有點麻煩 , 其實真正部署的分散式系統理應都已經有採用 NTP 或其他協定來同步主機時間 。 所以 , 剩下就是邏輯時間維護的問題 。

邏輯時鐘維護

HLC 需要在下列情況發生時 , 更新時鐘 :

  1. 當從其他節點收到訊息時 :

    本地節點根據訊息的邏輯時鐘 , 自己現在的邏輯時鐘以及實體時鐘來做調整 。 在範例程式中 , update 方法負責處理這個情況 。

  2. 當本地事件發生 , 或者送出訊息時 :

    本地節點只採用自己的現在邏輯時鐘跟實體時鐘調整 。 範例程式中 ,tick 方法負責處理這個情況 。

程式實作

ava.util.clock 模組內容 :

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import time


class Clock(object):
    """ Logical timestamp generator.
    """
    def __init__(self, timestamp=None):
        if timestamp:
            self.l = (timestamp >> 16) & 0xffffffffffff
            self.c = timestamp & 0xffff
        else:
            self.l = 0
            self.c = 0
            self.tick()

    def update(self, rt=None):
        """ Updates the clock with a received timestamp from another node.

        :param rt: received timestamp
        """
        ml = (rt >> 16) & 0xffffffffffff
        mc = rt & 0xffff

        old_l = self.l
        pt = int(round((time.time() + 0.0005) * 1000.0))
        self.l = max(old_l, ml, pt)
        if self.l == old_l == ml:
            self.c = max(self.c, mc) + 1
        elif self.l == old_l:
            self.c += 1
        elif self.l == ml:
            self.c = mc + 1
        else:
            self.c = 0

    def tick(self):
        """ Updates and tick the clock for local or send events.

        :return: the updated timestamp
        """
        # local or send event
        old_l = self.l
        pt = int(round((time.time() + 0.0005) * 1000.0))
        self.l = max(old_l, pt)
        if self.l == old_l:
            self.c += 1
        else:
            self.c = 0

        return self.timestamp()

    def timestamp(self):
        """ Gets the current timestamp without updating counter.

        :return: the timestamp
        """
        return (self.l << 16) | self.c

    def seconds(self):
        """ Gets the value compatible with time.time() function.

        :return: the float value represent seconds from epoc
        """
        return (self.l/1000.0) + (self.c/65536.0)

    def __hash__(self):
        return self.timestamp()

    def __cmp__(self, other):
        if self.l != other.l:
            if self.l > other.l:
                return 1
            if self.l < other.l:
                return -1

        if self.c != other.c:
            if self.c > other.c:
                return 1
            if self.c < other.c:
                return -1
        return 0

    def __repr__(self):
        return "Clock[l=%r, c=%r]" % (self.l, self.c)

    @staticmethod
    def timestamp_to_secs(ts):
        """ Converts a timestamp to seconds.
        :param ts: the clock's timestamp.
        :return:
        """
        l = (ts >> 16) & 0xffffffffffff
        c = ts & 0xffff
        return (l/1000.0) + (c/65536.0)


clock = Clock()


__all__ = ['Clock', 'clock']

測試程式 :

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import time
from ava.util.clock import Clock

import pytest

@pytest.fixture
def clock():
    return Clock()


class TestClock(object):

    def test_milliseconds_round_up(self):
        t = time.time()
        pt = round((t + 0.0005), 3)
        assert pt >= t

    def test_clock_should_increment_monotonically(self, clock):
        prev = clock.timestamp()
        for _ in xrange(2000):
            cur = clock.tick()
            # print('%r' % cur)
            assert cur > prev
            prev = cur

    def test_create_clock_from_timestamp(self):
        c = Clock(94132454961709074)
        assert c.l == 1436347274196
        assert c.c == 18

    def test_clock_seconds_should_be_bounded(self):
        c = Clock().seconds()
        t = time.time()
        assert (c - t) < 0.002

    def test_clock_tick_with_timestamp(self, monkeypatch):
        def mytime():
            return 1436345964.484081
        monkeypatch.setattr(time, 'time', mytime)
        # print(time.time())
        c = Clock(94132454961709074)
        assert c.l == 1436347274196
        assert c.c == 18
        c.update(94132454961709075)
        assert c.c == 20  # tick() increments c by 1 as well

結語

本文提供的實作是山姆鍋根據自己的理解撰寫並依照 Python 特性做調整 。 若有錯誤請不吝指教 。

參考資料

Cockroach DB: https://github.com/cockroachdb/cockroach

NTP: https://en.wikipedia.org/wiki/Network_Time_Protocol

Lamport logical clock: https://en.wikipedia.org/wiki/Lamport_timestamps

Vector clock: https://en.wikipedia.org/wiki/Vector_clock


[1]http://www.cse.buffalo.edu/tech-reports/2014-04.pdf Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases