RussellLuo

让思想在文字间徜徉

一、引言

最近在工作中负责制定重构计划,需要将部分业务代码从 Python 迁移到 Golang。其中一些功能涉及到 Celery 延时任务,所以一直在思考 Golang 中处理延时任务的有效方案。

其实在软件系统中,“在一段时间后执行一个任务” 的需求比比皆是。比如:

  • 客户端发起 HTTP 请求后,如果在指定时间内没有收到服务器的响应,则自动断开连接。

为了实现上述功能,通常我们会使用定时器 Timer:

  1. 客户端发起请求后,立即创建(启动)一个 Timer:到期间隔为 d,到期后执行 “断开连接” 的操作。
  2. 如果到期间隔 d 以内收到了服务器的响应,客户端就删除(停止)这个 Timer。
  3. 如果一直没有收到响应,则 Timer 最终会到期,然后执行 “断开连接” 的操作。

Golang 内置的 Timer 是采用最小堆来实现的,创建和删除的时间复杂度都为 O(log n)。现代的 Web 服务动辄管理 100w+ 的连接,每个连接都会有很多超时任务(比如发送超时、心跳检测等),如果每个超时任务都对应一个 Timer,性能会比较低下。

论文 Hashed and Hierarchical Timing Wheels 提出了一种用于实现 Timer 的高效数据结构:时间轮。采用时间轮实现的 Timer,创建和删除的时间复杂度为 O(1)。

常见的时间轮实现有两种:

  • 简单时间轮(Simple Timing Wheel)—— 比如 Netty4 的 HashedWheelTimer
  • 层级时间轮(Hierarchical Timing Wheels)—— 比如 Kafka 的 Purgatory

参考 Kafka 的层级时间轮实现(基于 Java/Scala 语言),我依葫芦画瓢实现了一个 Golang 版本的层级时间轮,实现源码作为个人项目放到了 GitHub

下面我们来看看简单时间轮、层级时间轮、Kafka 的层级时间轮变体的实现原理,以及 Golang 实现中的一些要点。

二、简单时间轮

一个 简单时间轮 就是一个循环列表,列表中的每一格包含一个定时任务列表(双向链表)。一个时间单位为 u、大小为 n 的简单时间轮,可以包含的定时任务的最大到期间隔为 n*u。

以 u 为 1ms、n 为 3 的简单时间轮为例,可以包含的定时任务的最大到期间隔为 3ms。

simple-timing-wheel

如上图所示,该简单时间轮的运行原理如下:

  1. 初始时,假设当前时间(蓝色箭头)指向第 1 格(此时:到期间隔为 [0ms, 1ms) 的定时任务放第 1 格,[1ms, 2ms) 的放第 2 格,[2ms, 3ms) 的放第 3 格)。
  2. 此时我们创建一个到期间隔为 1ms 的定时任务 task1,按规则该任务会被插入到第 2 格。
  3. 随着时间的流逝,过了 1ms 后当前时间指向第 2 格,这一格包含的定时任务 task1 会被删除并执行。
  4. 当前时间指向第 2 格(此时:到期间隔为 [0ms, 1ms) 的定时任务放第 2 格,[1ms, 2ms) 的放第 3 格,[2ms, 3ms) 的放第 1 格),我们继续创建一个到期间隔为 2ms 的定时任务 task2,按规则该任务被插入到第 1 格。

简单时间轮的优点是实现简单,缺点是:

  • 一旦选定 n,就不能包含到期间隔超过 n*u 的定时任务。
  • 如果定时任务的到期时间跨度较大,就会选择较大的 n,在定时任务较少时会造成很大的空间浪费。

有一些简单时间轮的 变体实现,它们通过在定时任务中增加记录 round 轮次信息,可以有效弥补上述两个缺点。同样以上面 u 为 1ms、n 为 3 的简单时间轮为例,初始时间指向第 1 格;此时如果要创建到期时间为 4ms 的定时任务,可以在该任务中设置 round 为 1(4/3 取整),剩余到期时间用 4ms 减去 round*3ms 等于 1ms,因此放到第 2 格;等到当前时间指向第 2 格时,判断任务中的 round 大于 0,所以不会删除并执行该任务,而是对其 round 减一(于是 round 变为 0);等到再过 3ms 后,当前时间再次指向第 2 格,判断任务中的 round 为 0,进而删除并执行该任务。

然而,这些变体实现因为只使用了一个时间轮,所以仍然存在一个缺点:处理每一格的定时任务列表的时间复杂度是 O(n),如果定时任务数量很大,分摊到每一格的定时任务列表就会很长,这样的处理性能显然是让人无法接受的。

三、层级时间轮

层级时间轮 通过使用多个时间轮,并且对每个时间轮采用不同的 u,可以有效地解决简单时间轮及其变体实现的问题。

参考 Kafka 的 Purgatory 中的层级时间轮实现:

  • 每一层时间轮的大小都固定为 n,第一层时间轮的时间单位为 u,那么第二层时间轮(我们称之为第一层时间轮的溢出时间轮 Overflow Wheel)的时间单位就为 n*u,以此类推。
  • 除了第一层时间轮是固定创建的,其他层的时间轮(均为溢出时间轮)都是按需创建的。
  • 原先插入到高层时间轮(溢出时间轮)的定时任务,随着时间的流逝,会被降级重新插入到低层时间轮中。

以 u 为 1ms、n 为 3 的层级时间轮为例,第一层时间轮的时间单位为 1ms、大小为 3,第二层时间轮的时间单位为 3ms、大小为 3,以此类推。

hierarchical-timing-wheels

如上图所示,该层级时间轮的运行原理如下:

  1. 初始时,只有第一层(Level 1)时间轮,假设当前时间(蓝色箭头)指向第 1 格(此时:到期间隔为 [0ms, 1ms) 的定时任务放第 1 格,[1ms, 2ms) 的放第 2 格,[2ms, 3ms) 的放第 3 格)。
  2. 此时我们创建一个到期间隔为 2ms 的定时任务 task1,按规则该任务会被插入到第一层时间轮的第 3 格。
  3. 同一时刻,我们再次创建一个到期间隔为 4ms 的定时任务 task2,因为到期间隔超过了第一层时间轮的间隔范围,所以会创建第二层(Level 2)时间轮;第二层时间轮中的当前时间(蓝色箭头)也指向第 1 格,按规则该任务会被插入到第二层时间轮的第 2 格。
  4. 随着时间的流逝,过了 2ms 后,第一层时间轮中的当前时间指向第 3 格,这一格包含的任务 task1 会被删除并执行;此时,第二层时间轮的当前时间没有变化,依然指向第 1 格。
  5. 随着时间的流逝,又过了 1ms 后,第一层时间轮中的当期时间指向第 1 格,这一格中没有任务;此时,第二层当前时间指向第 2 格,这一格包含的任务 task2 会被删除并重新插入时间轮,因为剩余到期时间为 1ms,所以 task2 会被插入到第一层时间轮的第 2 格。
  6. 随着时间的流逝,又过了 1ms 后,第一层时间轮中的当前时间指向第 2 格,这一格包含的定时任务 task2 会被删除并执行;此时,第二层时间轮的当前时间没有变化,依然指向第 2 格。

四、Kafka 的变体实现

在具体实现层面(参考 Kafka Timer 实现源码),Kafka 的层级时间轮与上面描述的原理有一些差别。

1. 时间轮表示

kafka-implementation-timing-wheel-representation

如上图所示,在时间轮的表示上面:

  • 使用大小为 wheelSize 的数组来表示一层时间轮,其中每一格是一个 bucket,每个 bucket 的时间单位为 tick。
  • 这个时间轮数组并没有模拟循环列表的行为(如图左所示),而是模拟了哈希表的行为。具体而言(如图右所示),这个时间轮数组会随着 currentTime 的流逝而移动,也就是说 currentTime 永远是指向第一个 bucket 的,每个落到该时间轮的定时任务,都会根据哈希函数 (expiration/tick)%wheelSize 散列到对应的 bucket 中(参考 源码)。

2. 时钟驱动方式

常规的时间轮实现中,会在一个线程中每隔一个时间单位 tick 就醒来一次,并驱动时钟走向下一格,然后检查这一格中是否包含定时任务。如果时间单位 tick 很小(比如 Kafka 中 tick 为 1ms)并且(在最低层时间轮的)定时任务很少,那么这种驱动方式将会非常低效。

Kafka 的层级时间轮实现中,利用了 Java 内置的 DelayQueue 结构,将每一层时间轮中所有 “包含有定时任务的 bucket” 都加入到同一个 DelayQueue 中(参考 源码),然后 等到有 bucket 到期后再驱动时钟往前走(参考 源码),并逐个处理该 bucket 中的定时任务(参考 源码)。

kafka-implementation-clock-driving-method

如上图所示:

  1. 往层级时间轮中添加一个定时任务 task1 后,会将该任务所属的 bucket2 的到期时间设置为 task1 的到期时间 expiration(= 当前时间 currentTime + 定时任务到期间隔 duration),并将这个 bucket2 添加(Offer)到 DelayQueue 中。
  2. DelayQueue(内部有一个线程)会等待 “到期时间最早(earliest)的 bucket” 到期,图中等到的是排在队首的 bucket2,于是经由 poll 返回并删除这个 bucket2;随后,时间轮会将当前时间 currentTime 往前移动到 bucket2 的 expiration 所指向的时间(图中是 1ms 所在的位置);最后,bucket2 中包含的 task1 会被删除并执行。

上述 Kafka 层级时间轮的驱动方式是非常高效的。虽然 DelayQueue 中 offer(添加)和 poll(获取并删除)操作的时间复杂度为 O(log n),但是相比定时任务的个数而言,bucket 的个数其实是非常小的(也就是 O(log n) 中的 n 很小),因此性能也是没有问题的。

五、Golang 实现要点

timingwheel 中的 Golang 实现,基本上都是参考 Kafka 的层级时间轮的原理来实现的。

因为 Golang 中没有现成的 DelayQueue 结构,所以自己实现了一个 DelayQueue,其中:

  • PriorityQueue —— 从 NSQ 借用过来的 优先级队列(基于最小堆实现)。
  • DelayQueue —— Offer(添加 bucket)和 Poll(获取并删除 bucket)的运作方式,跟 Golang Timer 运行时中 addtimerLockedtimerproc 的运作方式如出一辙,因此参考了其中的实现方式(参考 原理介绍)。

六、相关阅读

一、标准 Timer 的问题

以下讨论只针对由 NewTimer 创建的 Timer,因为这种 Timer 会使用 channel 来传递到期事件,而正确操作 channel 并非易事。

Timer.Stop

按照 Timer.Stop 文档 的说法,每次调用 Stop 后需要判断返回值,如果返回 false(表示 Stop 失败,Timer 已经在 Stop 前到期)则需要排掉(drain)channel 中的事件:

1
2
3
if !t.Stop() {
<-t.C
}

但是如果之前程序已经从 channel 中接收过事件,那么上述 <-t.C 就会发生阻塞。可能的解决办法是借助 select 进行 非阻塞 排放(draining):

1
2
3
4
5
6
if !t.Stop() {
select {
case <-t.C: // try to drain the channel
default:
}
}

但是因为 channel 的发送和接收发生在不同的 goroutine,所以 存在竞争条件(race condition),最终可能导致 channel 中的事件未被排掉。

以下就是一种有问题的场景,按时间先后顺序发生:

  • goroutine A:Go 运行时判断 Timer 已经到期,于是从最小堆中删除该 Timer
  • goroutine B:应用程序执行 Timer.Stop,发现 Timer 已经到期,进而返回 false
  • goroutine B:应用程序继续执行 select...case <-t.C,因为 channel 中并没有事件,所以会立即返回
  • goroutine A:Go 运行时将到期事件发送到该 Timer 的 channel 中

Timer.Reset

按照 Timer.Reset 文档 的说法,要正确地 Reset Timer,首先需要正确地 Stop Timer。因此 Reset 的问题跟 Stop 基本相同。

二、使用 Timer 的正确方式

参考 Russ Cox 的回复(这里这里),目前 Timer 唯一合理的使用方式是:

  • 程序始终在同一个 goroutine 中进行 Timer 的 Stop、Reset 和 receive/drain channel 操作
  • 程序需要维护一个状态变量,用于记录它是否已经从 channel 中接收过事件,进而作为 Stop 中 draining 操作的判断依据

如果每次使用 Timer 都要按照上述方式来处理,无疑是一件很费神的事。为此,我专门写了一个 Go 库 goodtimer 来解决标准 Timer 的问题。懒是一种美德 :-)

三、相关阅读

一、要解决的问题

一直以来,Redis 都是单线程的(参考 FAQ)。这种模型使得 Redis 简单、高效,但缺点也很明显:如果执行一个比较耗时的命令,那么在该命令执行期间,整个 Redis 服务都将被阻塞(无法并发地执行其他命令)。

大部分 Redis 命令的执行速度都很快,所以不是问题;但也有一些命令,比如 ZUNIONSTORELRANGESINTER,以及臭名昭著的 KEYS,根据处理数据集大小的不同,可能会阻塞 Redis 数秒或几分钟。

DEL 命令为例,当被删除的 key 是 list、set、sorted set 或 hash 类型时,时间复杂度为 O(M),其中 M 是 key 中包含的元素的个数。

二、非阻塞删除

Redis 的作者 Salvatore Sanfilippo 自己也意识到了上述问题,并提出了对应的解决方案:非阻塞删除(参考 Lazy Redis is better Redis)。简而言之,「非阻塞删除」就是将删除操作放到另外一个线程(而非 Redis 主线程)去处理。

最终「非阻塞删除」在 Redis 4.0 中得以实现(参考 Redis 4.0 release notes),从此 Redis 开启了 “多线程” 时代。

新增实现的「非阻塞删除」包括以下命令:

命令 (原来的)阻塞版本
UNLINK DEL
FLUSHALL ASYNC FLUSHALL
FLUSHDB ASYNC FLUSHDB

1. 源码实现

参考 Redis 源码 可以发现,DELUNLINK 分别对应不同的处理函数:

命令 处理函数
DEL dbSyncDelete
UNLINK dbAsyncDelete

具体的实现细节请自行研读源码。

2. 耗时对比

下面我们来实际对比一下 DELUNLINK 的耗时差异。

开启 Slow log

设置 Slow log 记录每条命令的耗时(参考 SLOWLOG):

1
2
3
4
5
127.0.0.1:6379> CONFIG SET slowlog-log-slower-than 0
OK
127.0.0.1:6379> CONFIG GET slowlog-log-slower-than
1) "slowlog-log-slower-than"
2) "0"

创建两个大 hash

准备一个 Lua 脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
local bulk = 1000
local fvs = {}
local j
for i = 1, ARGV[1] do
j = i % bulk
if j == 0 then
fvs[2 * bulk - 1] = "field" .. i
fvs[2 * bulk] = "value" .. i
redis.call("HMSET", KEYS[1], unpack(fvs))
fvs = {}
else
fvs[2 * j - 1] = "field" .. i
fvs[2 * j] = "value" .. i
end
end
if #fvs > 0 then
redis.call("HMSET", KEYS[1], unpack(fvs))
end
return "OK"

将上述脚本保存为 huge_hmset.lua,然后借助该脚本创建两个大 hash(参考 how to load lua script from file for redis),分别为 hash1hash2,它们各自拥有 100 万个 field:

1
2
3
4
$ redis-cli --eval huge_hmset.lua hash1 , 1000000
"OK"
$ redis-cli --eval huge_hmset.lua hash2 , 1000000
"OK"

上述操作会在 Slow log 中产生大量 HMSET 命令,这里先清除掉:

1
2
127.0.0.1:6379> SLOWLOG RESET
OK

DEL hash1

1
2
3
127.0.0.1:6379> DEL hash1
(integer) 1
(0.63s)
1
2
127.0.0.1:6379> UNLINK hash2
(integer) 1

查看 Slow log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
127.0.0.1:6379> SLOWLOG GET 2
1) 1) (integer) 5089
2) (integer) 1534653951
3) (integer) 17
4) 1) "UNLINK"
2) "hash2"
5) "127.0.0.1:56560"
6) ""
2) 1) (integer) 5088
2) (integer) 1534653948
3) (integer) 630305
4) 1) "DEL"
2) "hash1"
5) "127.0.0.1:56560"
6) ""

耗时对比结果:

命令 耗时
DEL hash1 630305 us
UNLINK hash2 17 us

值得注意的是:UNLINK 执行如此之快,并非使用了什么快速算法,而是因为它将真正的删除操作异步化了。

四、相关阅读

一、要解决的问题

按照 Redis 官方文档 - Replication 的说法:Redis replication 是一种 master-slave 模式的复制机制,这种机制使得 slave 节点可以成为与 master 节点完全相同的副本。

我们知道,单个 Redis 节点也是可以直接工作的。那为什么一个 Redis 节点(master)还需要一个或多个副本(slave)呢?或者说 replication 到底想要解决什么问题?官方文档如是说:

Replication can be used both for scalability, in order to have multiple slaves for read-only queries (for example, slow O(N) operations can be offloaded to slaves), or simply for improving data safety and high availability.

简而言之,replication 主要用于解决两个问题:

1. 读扩展

一个 master 用于写,多个 slave 用于分摊读的压力。

redis-replication-scalability

2. 高可用

如果 master 挂掉了,可以提升(promote)一个 slave 为新的 master,进而实现故障转移(failover)。

redis-replication-high-availability

思考:如果没有 replication,上述两个问题该如何应对?

二、replication 初体验

开两个终端,分别启动一个 Redis 节点:

1
2
3
4
# Terminal 1
$ redis-4.0.8/src/redis-server -p 6379
# Terminal 2
$ redis-4.0.8/src/redis-server -p 6380

在 6379 节点上设置并获取 key1:

1
2
3
4
5
$ redis-4.0.8/src/redis-cli -p 6379
127.0.0.1:6379> SET key1 value1
OK
127.0.0.1:6379> GET key1
"value1"

在 6380 节点上尝试获取 key1:

1
2
3
$ redis-4.0.8/src/redis-cli -p 6380
127.0.0.1:6380> GET key1
(nil)

可以看出,两个 Redis 节点各自为政,二者的数据并没有同步。

下面我们让 6380 成为 6379 的 slave 节点:

1
2
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379
OK

然后再尝试获取 key1:

1
2
127.0.0.1:6380> GET key1
"value1"

很显然,最初在 6379 节点(后续称为 master)设置的 key1 已经被同步到了 6380 节点(后续称为 slave)。

实验:尝试在 master 设置更多的 key 或删除 key,然后在 slave 上获取并观察结果。

三、情景分析

1. slave 初次连接 master

上述过程中,在 slave 上执行 SLAVEOF 命令以后,可以看到 slave 的日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
31667:S 03 Jul 21:32:17.809 * Before turning into a slave, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.
31667:S 03 Jul 21:32:17.809 * SLAVE OF 127.0.0.1:6379 enabled (user request from 'id=2 addr=127.0.0.1:58544 fd=8 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=slaveof')
31667:S 03 Jul 21:32:17.825 * Connecting to MASTER 127.0.0.1:6379
31667:S 03 Jul 21:32:17.826 * MASTER <-> SLAVE sync started
31667:S 03 Jul 21:32:17.826 * Non blocking connect for SYNC fired the event.
31667:S 03 Jul 21:32:17.826 * Master replied to PING, replication can continue...
31667:S 03 Jul 21:32:17.826 * Trying a partial resynchronization (request 823e1002c282b4c088a6f80d4251de04f920068d:1).
31667:S 03 Jul 21:32:17.827 * Full resync from master: 599456031709498747f866bc3f7f4382db99ed89:0
31667:S 03 Jul 21:32:17.827 * Discarding previously cached master state.
31667:S 03 Jul 21:32:17.926 * MASTER <-> SLAVE sync: receiving 193 bytes from master
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Flushing old data
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Loading DB in memory
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Finished with success

对应 master 的日志如下:

1
2
3
4
5
6
7
31655:M 03 Jul 21:32:17.826 * Slave 127.0.0.1:6380 asks for synchronization
31655:M 03 Jul 21:32:17.826 * Partial resynchronization not accepted: Replication ID mismatch (Slave asked for '823e1002c282b4c088a6f80d4251de04f920068d', my replication IDs are '4014bea143e2ade5aa81012849b0775ab0377b85' and '0000000000000000000000000000000000000000')
31655:M 03 Jul 21:32:17.826 * Starting BGSAVE for SYNC with target: disk
31655:M 03 Jul 21:32:17.826 * Background saving started by pid 31669
31669:C 03 Jul 21:32:17.827 * DB saved on disk
31655:M 03 Jul 21:32:17.926 * Background saving terminated with success
31655:M 03 Jul 21:32:17.926 * Synchronization with slave 127.0.0.1:6380 succeeded

分析上述输出日志,我们可以初步总结出 slave 和 master 的交互时序:

  1. slave 主动连接 master。
  2. 连接成功后,slave 会向 master 发起 partial resynchronization 的请求。
  3. master 收到请求后,判断 replication ID 不匹配,拒绝执行 partial resynchronization,转而通知 slave 执行 full resync。
  4. 随后 master 开始执行 BGSAVE 命令,将当前 DB 数据保存到 disk 磁盘,最后向 slave 发送 DB 数据。
  5. slave 从 master 接收到 DB 数据后,将其加载到内存,同时删除旧数据。

2. slave 断开后重连 master

思考:在同一台机器上,如何模拟 master 和 slave 的网络断开与恢复?

master 日志:

1
2
3
4
33518:M 03 Jul 22:46:48.432 # Disconnecting timedout slave: 127.0.0.1:6380
33518:M 03 Jul 22:46:48.432 # Connection with slave 127.0.0.1:6380 lost.
33518:M 03 Jul 22:46:50.538 * Slave 127.0.0.1:6380 asks for synchronization
33518:M 03 Jul 22:46:50.538 * Partial resynchronization request from 127.0.0.1:6380 accepted. Sending 0 bytes of backlog starting from offset 1541.

slave 日志:

1
2
3
4
5
6
7
8
9
33519:S 03 Jul 22:46:48.432 # Connection with master lost.
33519:S 03 Jul 22:46:48.432 * Caching the disconnected master state.
33519:S 03 Jul 22:46:50.536 * Connecting to MASTER 127.0.0.1:6379
33519:S 03 Jul 22:46:50.537 * MASTER <-> SLAVE sync started
33519:S 03 Jul 22:46:50.537 * Non blocking connect for SYNC fired the event.
33519:S 03 Jul 22:46:50.537 * Master replied to PING, replication can continue...
33519:S 03 Jul 22:46:50.537 * Trying a partial resynchronization (request 6b1b77bebea22557686922f99cfa3103ba0824ae:1541).
33519:S 03 Jul 22:46:50.538 * Successful partial resynchronization with master.
33519:S 03 Jul 22:46:50.538 * MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.

可以看出:

  1. 网络断开一段时间后,master 会断开与 slave 的连接。
  2. 网络恢复后,仍然是 slave 主动连接 master。
  3. 连接成功后,slave 会向 master 发起 partial resynchronization 的请求。
  4. 这一次,master 接受了该 partial resynchronization 请求,然后将 backlog 中由 (offset, size) 标记的数据流发送给 slave。
  5. slave 从 master 接收到数据流后,更新自己内存中的数据。

实验redis.conf 中有两个参数 repl-timeout(默认值为 60 秒)和 repl-backlog-ttl(默认值为 3600 秒),尝试都设置为 10 秒,然后断开网络一直等到 25 秒后再恢复,再观察 master 和 slave 的日志会有什么不同?

3. master 与 slave 连接正常,写 master

通过 telnet 连接到 master:

1
2
3
4
$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

键入 PSYNC 命令,尝试与 master 进行同步:

1
2
3
4
5
6
7
8
9
$ telnet 127.0.0.1 6379
...
PSYNC ? -1
+FULLRESYNC 8cdd5be435af5bcda9bb332e319cae9b71f788d7 344
$194
REDIS0008? redis-ver4.0.8?
redis-bits?@?ctime?6?@[used-mem???repl-stream-db??repl-id(8cdd5be435af5bcda9bb332e319cae9b71f788d7?
repl-offset?X?
aof-preamble???key1value1?'>?w?Z

此时查看 master 的日志:

1
2
3
4
5
6
7
40535:M 07 Jul 17:04:51.009 * Slave 127.0.0.1:<unknown-slave-port> asks for synchronization
40535:M 07 Jul 17:04:51.009 * Full resync requested by slave 127.0.0.1:<unknown-slave-port>
40535:M 07 Jul 17:04:51.009 * Starting BGSAVE for SYNC with target: disk
40535:M 07 Jul 17:04:51.009 * Background saving started by pid 40579
40579:C 07 Jul 17:04:51.012 * DB saved on disk
40535:M 07 Jul 17:04:51.045 * Background saving terminated with success
40535:M 07 Jul 17:04:51.045 * Synchronization with slave 127.0.0.1:<unknown-slave-port> succeeded

随后在 master 上设置 key2:

1
2
127.0.0.1:6379> SET key2 value2
OK

然后观察 telnet 的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ telnet 127.0.0.1 6379
...
*1
$4
PING
*2
$6
SELECT
$1
0
*3
$3
SET
$4
key2
$6
value2
*1
$4
PING

可以看出:

  1. telnet 通过 PSYNC 命令,成为了 master 的一个新的 slave。
  2. master 上的写命令(这里是 SET key2 value2),会被传播(propagate)到 salve 上,进而保证了 slave 与 master 的数据一致性。

四、replication 原理

上面的三种情景,其实已经涵盖了 Redis replication 的两大核心操作:

  1. 重同步(resync)
    • 完整重同步(full resynchronization)
    • 部分重同步(partial resynchronization)
  2. 命令传播(command propagate)

下面我们对这两种操作,做进一步阐述。

1. 重同步

「重同步」用于将 slave 的数据库状态更新至 master 当前所处的数据库状态。

SYNC 与 PSYNC

旧版本 Redis 中,「重同步」通过 SYNC 命令来实现。从 2.8 版本开始,Redis 改用 PSYNC 命令来代替 SYNC 命令。

SYNC 命令和 PSYNC 命令的区别:

命令 初次复制 断线后复制
SYNC 完整重同步 完整重同步
PSYNC 完整重同步:PSYNC ? -1 部分重同步:PSYNC <replication-id> <offset>

完整重同步

redis-replication-full-resync

说明:

  1. slave 通过 SYNC 或 PSYNC 命令,向 master 发起同步请求。
  2. master 返回 FULLRESYNC 告知 slave 将执行「完整重同步」,先决条件为:
    • 请求命令是「完整重同步」SYNC
    • 请求命令是「完整重同步」PSYNC ? -1
    • 请求命令是「部分重同步」PSYNC <replication-id> <offset>,但是 <replication-id> 不是 master 的 replication-id,或者 slave 给的 <offset> 不在 master 的「复制积压缓冲区」backlog 里面。
  3. master 执行 BGSAVE 命令,将当前数据库状态保存为 RDB 文件。
  4. 生成 RDB 文件完毕后,master 将该文件发送给 slave。
  5. slave 收到 RDB 文件后,将其加载至内存。
  6. master 将 backlog 中缓冲的命令发送给 slave(一开始在 BGSAVE 时记录了当时的 offset)。
  7. slave 收到后,逐个执行这些命令。

部分重同步

redis-replication-partial-resync

说明:

  1. slave 通过 PSYNC <replication-id> <offset> 命令,向 master 发起「部分重同步」请求。
  2. master 返回 CONTINUE 告知 slave 同意执行「部分重同步」,先决条件为:
    • <replication-id> 是 master 的 replication-id,并且 slave 给的 <offset> 在 master 的「复制积压缓冲区」backlog 里面
  3. master 将 backlog 中缓冲的命令发送给 slave(根据 slave 给的 offset)。
  4. slave 收到后,逐个执行这些命令。

由上可以看出,「复制积压缓冲区」backlog 是「部分重同步」得以实现的关键所在。

复制积压缓冲区

「复制积压缓冲区」是 master 维护的一个固定长度(fixed-sized)的先进先出(FIFO)的内存队列。值得注意的是:

  • 队列的大小由配置 repl-backlog-size 决定,默认为 1MB。当队列长度超过 repl-backlog-size 时,最先入队的元素会被弹出,用于腾出空间给新入队的元素。
  • 队列的生存时间由配置 repl-backlog-ttl 决定,默认为 3600 秒。如果 master 不再有与之相连接的 slave,并且该状态持续时间超过了 repl-backlog-ttl,master 就会释放该队列,等到有需要(下次又有 slave 连接进来)的时候再创建。

master 会将最近接收到的写命令(按 Redis 协议的格式)保存到「复制积压缓冲区」,其中每个字节都会对应记录一个偏移量 offset。

. . . . . . . . . . . . . .
偏移量 10087 10088 10089 10090 10091 10092 10093 10094 10095 10096 10097
字节值 ‘*’ 3 ‘\r’ ‘\n’ ‘$’ 3 ‘\r’ ‘\n’ ‘S’ ‘E’ ‘T’

与此同时,slave 会维护一个 offset 值,每次从 master 传播过来的命令,一旦成功执行就会更新该 offset。尝试「部分重同步」的时候,slave 都会带上自己的 offset,master 再判断 offset 偏移量之后的数据是否存在于自己的「复制积压缓冲区」中,以此来决定执行「部分重同步」还是「完整重同步」。

2. 命令传播

「命令传播」用于在 master 的数据库状态被修改时,将导致变更的命令传播给 slave,从而让 slave 的数据库状态与 master 保持一致。

redis-replication-command-propagate

说明:master 进行命令传播时,除了将写命令直接发送给所有 slave,还会将这些命令写入「复制积压缓冲区」,用于后续可能发生的「部分重同步」操作。

五、参考资料

对于 REST API 的开发者而言,不管是对内作为团队的开发文档,还是对外作为给用户的说明文档,API 文档都是不可或缺的。

然而 “文档是死的、代码是活的”,在现实中,文档跟不上代码的更新节奏的情况比比皆是。如何编写 实时更新的易于阅读的 文档成了一个普遍的难题。由此,API 描述语言应用而生。

Swagger 是一个简单但功能强大的 API 表达工具。它具有地球上最大的 API 工具生态系统。数以千计的开发人员,使用几乎所有的现代编程语言,都在支持和使用 Swagger。使用 Swagger 生成 API,我们可以得到交互式文档,自动生成代码的 SDK 以及 API 的发现特性等(参考 使用Swagger生成RESTful API文档)。

Swagger 的功能很丰富,但在这里我们只关心一点:如何基于简单的 Swagger 描述语言,为 REST API 生成易读的 Markdown 离线文档。

一、基于 Swagger Spec 编写 API 描述文档

这一步无需多说,打开你喜欢的编辑器,或者使用官方的 Swagger Editor,参考 Spec 语法 编写即可。

这里我们以 petstore-minimal.yaml 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
---
swagger: "2.0"
info:
version: "1.0.0"
title: "Swagger Petstore"
description: "A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification"
termsOfService: "http://swagger.io/terms/"
contact:
name: "Swagger API Team"
license:
name: "MIT"
host: "petstore.swagger.io"
basePath: "/api"
schemes:
- "http"
consumes:
- "application/json"
produces:
- "application/json"
paths:
/pets:
get:
description: "Returns all pets from the system that the user has access to"
produces:
- "application/json"
responses:
"200":
description: "A list of pets."
schema:
type: "array"
items:
$ref: "#/definitions/Pet"
definitions:
Pet:
type: "object"
required:
- "id"
- "name"
properties:
id:
type: "integer"
format: "int64"
name:
type: "string"
tag:
type: "string"

二、安装转换工具 Swagger2Markup

Swagger2Markup 是一个 Java 编写的工具,用于将 Swagger 文档转换为 AsciiDoc 或者 Markdown 文档。简直就是为我们这里的需求量身定做的 :-)

安装 Swagger2Markup 的步骤如下:

1. 安装 Java

以 Ubuntu 为例,参考 How To Install Java on Ubuntu with Apt-GetUbuntu 安装 JDK 7 / JDK8 的两种方式

  1. 安装默认的 JRE/JDK

    1
    2
    3
    4
    5
    $ sudo apt-get update
    $ # 安装默认的 JRE
    $ sudo apt-get install default-jre
    $ # 安装默认的 JDK
    $ sudo apt-get install default-jdk
  2. 安装 Oracle JDK 8

    1
    2
    3
    4
    5
    $ # 添加 ppa
    $ sudo add-apt-repository ppa:webupd8team/java
    $ sudo apt-get update
    $ # 安装 oracle-java-installer(按提示依次选择 ok 和 yes 即可)
    $ sudo apt-get install oracle-java8-installer

2. 下载 Swagger2Markup 的命令行工具

参考 Command Line Interface,下载最新的 jar 包(当前为 swagger2markup-cli-1.3.1.jar)即可。

三、使用 Swagger2Markup 将 Swagger 转换为 Markdown

参考 Command Line Interface 中的步骤:

1. 创建一个 config.properties 配置文件

设置 markupLanguage 为 MARKDOWN

1
swagger2markup.markupLanguage=MARKDOWN

2. 将 Swagger 转换为 Markdown

1
$ java -jar swagger2markup-cli-1.3.1.jar convert -i /path/to/petstore-minimal.yaml -f /tmp/petstore-minimal -c /path/to/config.properties

3. 查看生成的文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Swagger Petstore


<a name="overview"></a>
## Overview
A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification


### Version information
*Version* : 1.0.0


### Contact information
*Contact* : Swagger API Team


### License information
*License* : MIT
*Terms of service* : http://swagger.io/terms/


### URI scheme
*Host* : petstore.swagger.io
*BasePath* : /api
*Schemes* : HTTP


### Consumes

* `application/json`


### Produces

* `application/json`




<a name="paths"></a>
## Paths

<a name="pets-get"></a>
### GET /pets

#### Description
Returns all pets from the system that the user has access to


#### Responses

|HTTP Code|Description|Schema|
|---|---|---|
|**200**|A list of pets.|< [Pet](#pet) > array|


#### Produces

* `application/json`




<a name="definitions"></a>
## Definitions

<a name="pet"></a>
### Pet

|Name|Schema|
|---|---|
|**id** <br>*required*|integer (int64)|
|**name** <br>*required*|string|
|**tag** <br>*optional*|string|


四、CLI as a service

如果团队内部人员都会用到这个工具,但是又不想在每个人的电脑上都安装 Java 和 Swagger2Markup,这时可以基于命令行工具 Swagger2Markup 提供一个 “文档转换服务”。

作为示例,以下是使用 Python 语言并且借助 RESTArt 库实现的一个 “文档转换服务”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# swagger2markdown.py

import os
import tempfile

from restart import status
from restart.api import RESTArt
from restart.parsers import Parser
from restart.renderers import Renderer
from restart.resource import Resource

api = RESTArt()


class SwaggerParser(Parser):

content_type = 'text/plain'

def parse(self, stream, content_type, content_length, context=None):
return stream.read().decode('utf-8')


class MarkdownRenderer(Renderer):

content_type = 'text/plain'
format_suffix = 'md'

def render(self, data, context=None):
return data.encode('utf-8')


@api.register
class SwaggerMarkdownDocs(Resource):

name = 'swagger_markdown_docs'

parser_classes = (SwaggerParser,)
renderer_classes = (MarkdownRenderer,)

def create(self, request):
with tempfile.NamedTemporaryFile(suffix='.yml', delete=False) as yml:
yml_filename = yml.name
yml.write(request.data.encode('utf-8'))

with tempfile.NamedTemporaryFile(suffix='.md', delete=False) as md:
md_filename = md.name

jar = '/path/to/swagger2markup-cli-1.3.1.jar'
conf = '/path/to/config.properties'
os.system('java -jar {jar} convert -i {yml} -f {md} -c {conf}'.format(
jar=jar, yml=yml_filename, md=md_filename[:-len('.md')], conf=conf,
))

with open(md_filename) as md:
content = md.read().decode('utf-8')

os.unlink(yml_filename)
os.unlink(md_filename)

return content, status.HTTP_201_CREATED

启动 “文档转换服务”:

1
2
$ restart swagger2markdown:api
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

使用 “文档转换服务” 生成 Markdown 文档:

1
$ curl -H 'Content-Type: text/plain' -XPOST http://localhost:5000/swagger_markdown_docs --data-binary @/path/to/petstore-minimal.yaml > /tmp/petstore-minimal.md
0%