山姆鍋平常喜歡搜尋一些有趣的開放源碼專案,尤其是有提供設計文件的。有天不知道怎麼地逛到了 Cockroach DB 這個專案,它的設計文件有許多有趣的地方,但我特別注意到其中一篇論文,關於使用邏輯與實體時鐘來實現一個分散式時間郵戳 (timestamp) 的演算法。

邏輯時間的用途

在軟體系統,常常需要比較兩個事件發生的前後順序,例如:哪個交易 (transaction) 先發生, 這可以透過記錄事件發生的時間來達成。 但對分散式系統,這個問題就變得比較複雜,由於網路延遲、主機時鐘的誤差等等因素, 使用一個全域 (global) 的時間來源幾近不可能。

所以,分散式系統常採用邏輯時間來做為比較事件發生前後的依據, 其中 Lamport logical clock 或是 Vector clock 是兩種實現邏輯時鐘常見的方法。

混合式邏輯時鐘

本文所參考的論文 < sup class="footnote-ref">[1] ,提供所謂「混合式邏輯時鐘(Hybrid Logical Clocks; HLC)」的演算法。 HLC 跟過去常看到的採用實體時鐘(Physical clock), 或邏輯時鐘(Logical clock) 的主要不同點,顧名思義:採用混合實體跟邏輯時鐘。

主要引起山姆鍋興趣的有兩點:

  1. 可以跟 NTP 使用的 64-bit 時間郵戳格式相容。
  2. 可以代替常用的 Unix epoch timestamp。

也就是說,把使用 Python time.time() 當作時間郵戳的程式可以很簡單地更換成這種邏輯時鐘。 主要麻煩的問題在於:

  1. 畢竟是分散式邏輯時鐘,需要不時地跟其他節點 (node) 的時間做校正。
  2. HLC 假設主機 (host) 實體時鐘已經採用 NTP 同步。

第 2 點雖然好像有點麻煩,其實真正部署的分散式系統理應都已經有採用 NTP 或其他協定來同步主機時間。 所以,剩下就是邏輯時間維護的問題。

邏輯時鐘維護

HLC 需要在下列情況發生時,更新時鐘:

  1. 當從其他節點收到訊息時:
    本地節點根據訊息的邏輯時鐘,自己現在的邏輯時鐘以及實體時鐘來做調整。 在範例程式中, update 方法負責處理這個情況。
  2. 當本地事件發生,或者送出訊息時:
    本地節點只採用自己的現在邏輯時鐘跟實體時鐘調整。 範例程式中,tick 方法負責處理這個情況。

程式實作

ava.util.clock 模組內容:

clock.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import time


class Clock(object):
"""Logical timestamp generator.
"""
def __init__(self, timestamp=None):
if timestamp:
self.l = (timestamp >> 16) & 0xffffffffffff
self.c = timestamp & 0xffff
else:
self.l = 0
self.c = 0
self.tick()

def update(self, rt=None):
"""Updates the clock with a received timestamp from another node.

:param rt: received timestamp
"""
ml = (rt >> 16) & 0xffffffffffff
mc = rt & 0xffff

old_l = self.l
pt = int(round((time.time() + 0.0005) * 1000.0))
self.l = max(old_l, ml, pt)
if self.l == old_l == ml:
self.c = max(self.c, mc) + 1
elif self.l == old_l:
self.c += 1
elif self.l == ml:
self.c = mc + 1
else:
self.c = 0

def tick(self):
"""Updates and tick the clock for local or send events.

:return: the updated timestamp
"""
# local or send event
old_l = self.l
pt = int(round((time.time() + 0.0005) * 1000.0))
self.l = max(old_l, pt)
if self.l == old_l:
self.c += 1
else:
self.c = 0

return self.timestamp()

def timestamp(self):
"""Gets the current timestamp without updating counter.

:return: the timestamp
"""
return (self.l << 16) | self.c

def seconds(self):
"""Gets the value compatible with time.time() function.

:return: the float value represent seconds from epoc
"""
return (self.l/1000.0) + (self.c/65536.0)

def __hash__(self):
return self.timestamp()

def __cmp__(self, other):
if self.l != other.l:
if self.l > other.l:
return 1
if self.l < other.l:
return -1

if self.c != other.c:
if self.c > other.c:
return 1
if self.c < other.c:
return -1
return 0

def __repr__(self):
return "Clock[l=%r, c=%r]" % (self.l, self.c)

@staticmethod
def timestamp_to_secs(ts):
"""Converts a timestamp to seconds.
:param ts: the clock's timestamp.
:return:
"""
l = (ts >> 16) & 0xffffffffffff
c = ts & 0xffff
return (l/1000.0) + (c/65536.0)


clock = Clock()


__all__ = ['Clock', 'clock']

測試程式:

test_util_clock.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import time
from ava.util.clock import Clock

import pytest

@pytest.fixture
def clock():
return Clock()


class TestClock(object):

def test_milliseconds_round_up(self):
t = time.time()
pt = round((t + 0.0005), 3)
assert pt >= t

def test_clock_should_increment_monotonically(self, clock):
prev = clock.timestamp()
for _ in xrange(2000):
cur = clock.tick()
# print('%r' % cur)
assert cur > prev
prev = cur

def test_create_clock_from_timestamp(self):
c = Clock(94132454961709074)
assert c.l == 1436347274196
assert c.c == 18

def test_clock_seconds_should_be_bounded(self):
c = Clock().seconds()
t = time.time()
assert (c - t) < 0.002

def test_clock_tick_with_timestamp(self, monkeypatch):
def mytime():
return 1436345964.484081
monkeypatch.setattr(time, 'time', mytime)
# print(time.time())
c = Clock(94132454961709074)
assert c.l == 1436347274196
assert c.c == 18
c.update(94132454961709075)
assert c.c == 20 # tick() increments c by 1 as well

結語

本文提供的實作是山姆鍋根據自己的理解撰寫並依照 Python 特性做調整。 若有錯誤請不吝指教。

參考資料

_`Cockroach DB`: https://github.com/cockroachdb/cockroach

_`NTP`: https://en.wikipedia.org/wiki/Network_Time_Protocol

_`Lamport logical clock`: https://en.wikipedia.org/wiki/Lamport_timestamps

_`Vector clock`: https://en.wikipedia.org/wiki/Vector_clock


http://www.cse.buffalo.edu/tech-reports/2014-04.pdf Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases