RussellLuo

让思想在文字间徜徉

一、要解决的问题

按照 Redis 官方文档 - Replication 的说法:Redis replication 是一种 master-slave 模式的复制机制,这种机制使得 slave 节点可以成为与 master 节点完全相同的副本。

我们知道,单个 Redis 节点也是可以直接工作的。那为什么一个 Redis 节点(master)还需要一个或多个副本(slave)呢?或者说 replication 到底想要解决什么问题?官方文档如是说:

Replication can be used both for scalability, in order to have multiple slaves for read-only queries (for example, slow O(N) operations can be offloaded to slaves), or simply for improving data safety and high availability.

简而言之,replication 主要用于解决两个问题:

1. 读扩展

一个 master 用于写,多个 slave 用于分摊读的压力。

redis-replication-scalability

2. 高可用

如果 master 挂掉了,可以提升(promote)一个 slave 为新的 master,进而实现故障转移(failover)。

redis-replication-high-availability

思考:如果没有 replication,上述两个问题该如何应对?

二、replication 初体验

开两个终端,分别启动一个 Redis 节点:

1
2
3
4
# Terminal 1
$ redis-4.0.8/src/redis-server -p 6379
# Terminal 2
$ redis-4.0.8/src/redis-server -p 6380

在 6379 节点上设置并获取 key1:

1
2
3
4
5
$ redis-4.0.8/src/redis-cli -p 6379
127.0.0.1:6379> SET key1 value1
OK
127.0.0.1:6379> GET key1
"value1"

在 6380 节点上尝试获取 key1:

1
2
3
$ redis-4.0.8/src/redis-cli -p 6380
127.0.0.1:6380> GET key1
(nil)

可以看出,两个 Redis 节点各自为政,二者的数据并没有同步。

下面我们让 6380 成为 6379 的 slave 节点:

1
2
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379
OK

然后再尝试获取 key1:

1
2
127.0.0.1:6380> GET key1
"value1"

很显然,最初在 6379 节点(后续称为 master)设置的 key1 已经被同步到了 6380 节点(后续称为 slave)。

实验:尝试在 master 设置更多的 key 或删除 key,然后在 slave 上获取并观察结果。

三、情景分析

1. slave 初次连接 master

上述过程中,在 slave 上执行 SLAVEOF 命令以后,可以看到 slave 的日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
31667:S 03 Jul 21:32:17.809 * Before turning into a slave, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.
31667:S 03 Jul 21:32:17.809 * SLAVE OF 127.0.0.1:6379 enabled (user request from 'id=2 addr=127.0.0.1:58544 fd=8 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=slaveof')
31667:S 03 Jul 21:32:17.825 * Connecting to MASTER 127.0.0.1:6379
31667:S 03 Jul 21:32:17.826 * MASTER <-> SLAVE sync started
31667:S 03 Jul 21:32:17.826 * Non blocking connect for SYNC fired the event.
31667:S 03 Jul 21:32:17.826 * Master replied to PING, replication can continue...
31667:S 03 Jul 21:32:17.826 * Trying a partial resynchronization (request 823e1002c282b4c088a6f80d4251de04f920068d:1).
31667:S 03 Jul 21:32:17.827 * Full resync from master: 599456031709498747f866bc3f7f4382db99ed89:0
31667:S 03 Jul 21:32:17.827 * Discarding previously cached master state.
31667:S 03 Jul 21:32:17.926 * MASTER <-> SLAVE sync: receiving 193 bytes from master
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Flushing old data
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Loading DB in memory
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Finished with success

对应 master 的日志如下:

1
2
3
4
5
6
7
31655:M 03 Jul 21:32:17.826 * Slave 127.0.0.1:6380 asks for synchronization
31655:M 03 Jul 21:32:17.826 * Partial resynchronization not accepted: Replication ID mismatch (Slave asked for '823e1002c282b4c088a6f80d4251de04f920068d', my replication IDs are '4014bea143e2ade5aa81012849b0775ab0377b85' and '0000000000000000000000000000000000000000')
31655:M 03 Jul 21:32:17.826 * Starting BGSAVE for SYNC with target: disk
31655:M 03 Jul 21:32:17.826 * Background saving started by pid 31669
31669:C 03 Jul 21:32:17.827 * DB saved on disk
31655:M 03 Jul 21:32:17.926 * Background saving terminated with success
31655:M 03 Jul 21:32:17.926 * Synchronization with slave 127.0.0.1:6380 succeeded

分析上述输出日志,我们可以初步总结出 slave 和 master 的交互时序:

  1. slave 主动连接 master。
  2. 连接成功后,slave 会向 master 发起 partial resynchronization 的请求。
  3. master 收到请求后,判断 replication ID 不匹配,拒绝执行 partial resynchronization,转而通知 slave 执行 full resync。
  4. 随后 master 开始执行 BGSAVE 命令,将当前 DB 数据保存到 disk 磁盘,最后向 slave 发送 DB 数据。
  5. slave 从 master 接收到 DB 数据后,将其加载到内存,同时删除旧数据。

2. slave 断开后重连 master

思考:在同一台机器上,如何模拟 master 和 slave 的网络断开与恢复?

master 日志:

1
2
3
4
33518:M 03 Jul 22:46:48.432 # Disconnecting timedout slave: 127.0.0.1:6380
33518:M 03 Jul 22:46:48.432 # Connection with slave 127.0.0.1:6380 lost.
33518:M 03 Jul 22:46:50.538 * Slave 127.0.0.1:6380 asks for synchronization
33518:M 03 Jul 22:46:50.538 * Partial resynchronization request from 127.0.0.1:6380 accepted. Sending 0 bytes of backlog starting from offset 1541.

slave 日志:

1
2
3
4
5
6
7
8
9
33519:S 03 Jul 22:46:48.432 # Connection with master lost.
33519:S 03 Jul 22:46:48.432 * Caching the disconnected master state.
33519:S 03 Jul 22:46:50.536 * Connecting to MASTER 127.0.0.1:6379
33519:S 03 Jul 22:46:50.537 * MASTER <-> SLAVE sync started
33519:S 03 Jul 22:46:50.537 * Non blocking connect for SYNC fired the event.
33519:S 03 Jul 22:46:50.537 * Master replied to PING, replication can continue...
33519:S 03 Jul 22:46:50.537 * Trying a partial resynchronization (request 6b1b77bebea22557686922f99cfa3103ba0824ae:1541).
33519:S 03 Jul 22:46:50.538 * Successful partial resynchronization with master.
33519:S 03 Jul 22:46:50.538 * MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.

可以看出:

  1. 网络断开一段时间后,master 会断开与 slave 的连接。
  2. 网络恢复后,仍然是 slave 主动连接 master。
  3. 连接成功后,slave 会向 master 发起 partial resynchronization 的请求。
  4. 这一次,master 接受了该 partial resynchronization 请求,然后将 backlog 中由 (offset, size) 标记的数据流发送给 slave。
  5. slave 从 master 接收到数据流后,更新自己内存中的数据。

实验redis.conf 中有两个参数 repl-timeout(默认值为 60 秒)和 repl-backlog-ttl(默认值为 3600 秒),尝试都设置为 10 秒,然后断开网络一直等到 25 秒后再恢复,再观察 master 和 slave 的日志会有什么不同?

3. master 与 slave 连接正常,写 master

通过 telnet 连接到 master:

1
2
3
4
$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

键入 PSYNC 命令,尝试与 master 进行同步:

1
2
3
4
5
6
7
8
9
$ telnet 127.0.0.1 6379
...
PSYNC ? -1
+FULLRESYNC 8cdd5be435af5bcda9bb332e319cae9b71f788d7 344
$194
REDIS0008? redis-ver4.0.8?
redis-bits?@?ctime?6?@[used-mem???repl-stream-db??repl-id(8cdd5be435af5bcda9bb332e319cae9b71f788d7?
repl-offset?X?
aof-preamble???key1value1?'>?w?Z

此时查看 master 的日志:

1
2
3
4
5
6
7
40535:M 07 Jul 17:04:51.009 * Slave 127.0.0.1:<unknown-slave-port> asks for synchronization
40535:M 07 Jul 17:04:51.009 * Full resync requested by slave 127.0.0.1:<unknown-slave-port>
40535:M 07 Jul 17:04:51.009 * Starting BGSAVE for SYNC with target: disk
40535:M 07 Jul 17:04:51.009 * Background saving started by pid 40579
40579:C 07 Jul 17:04:51.012 * DB saved on disk
40535:M 07 Jul 17:04:51.045 * Background saving terminated with success
40535:M 07 Jul 17:04:51.045 * Synchronization with slave 127.0.0.1:<unknown-slave-port> succeeded

随后在 master 上设置 key2:

1
2
127.0.0.1:6379> SET key2 value2
OK

然后观察 telnet 的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ telnet 127.0.0.1 6379
...
*1
$4
PING
*2
$6
SELECT
$1
0
*3
$3
SET
$4
key2
$6
value2
*1
$4
PING

可以看出:

  1. telnet 通过 PSYNC 命令,成为了 master 的一个新的 slave。
  2. master 上的写命令(这里是 SET key2 value2),会被传播(propagate)到 salve 上,进而保证了 slave 与 master 的数据一致性。

四、replication 原理

上面的三种情景,其实已经涵盖了 Redis replication 的两大核心操作:

  1. 重同步(resync)
    • 完整重同步(full resynchronization)
    • 部分重同步(partial resynchronization)
  2. 命令传播(command propagate)

下面我们对这两种操作,做进一步阐述。

1. 重同步

「重同步」用于将 slave 的数据库状态更新至 master 当前所处的数据库状态。

SYNC 与 PSYNC

旧版本 Redis 中,「重同步」通过 SYNC 命令来实现。从 2.8 版本开始,Redis 改用 PSYNC 命令来代替 SYNC 命令。

SYNC 命令和 PSYNC 命令的区别:

命令 初次复制 断线后复制
SYNC 完整重同步 完整重同步
PSYNC 完整重同步:PSYNC ? -1 部分重同步:PSYNC <replication-id> <offset>

完整重同步

redis-replication-full-resync

说明:

  1. slave 通过 SYNC 或 PSYNC 命令,向 master 发起同步请求。
  2. master 返回 FULLRESYNC 告知 slave 将执行「完整重同步」,先决条件为:
    • 请求命令是「完整重同步」SYNC
    • 请求命令是「完整重同步」PSYNC ? -1
    • 请求命令是「部分重同步」PSYNC <replication-id> <offset>,但是 <replication-id> 不是 master 的 replication-id,或者 slave 给的 <offset> 不在 master 的「复制积压缓冲区」backlog 里面。
  3. master 执行 BGSAVE 命令,将当前数据库状态保存为 RDB 文件。
  4. 生成 RDB 文件完毕后,master 将该文件发送给 slave。
  5. slave 收到 RDB 文件后,将其加载至内存。
  6. master 将 backlog 中缓冲的命令发送给 slave(一开始在 BGSAVE 时记录了当时的 offset)。
  7. slave 收到后,逐个执行这些命令。

部分重同步

redis-replication-partial-resync

说明:

  1. slave 通过 PSYNC <replication-id> <offset> 命令,向 master 发起「部分重同步」请求。
  2. master 返回 CONTINUE 告知 slave 同意执行「部分重同步」,先决条件为:
    • <replication-id> 是 master 的 replication-id,并且 slave 给的 <offset> 在 master 的「复制积压缓冲区」backlog 里面
  3. master 将 backlog 中缓冲的命令发送给 slave(根据 slave 给的 offset)。
  4. slave 收到后,逐个执行这些命令。

由上可以看出,「复制积压缓冲区」backlog 是「部分重同步」得以实现的关键所在。

复制积压缓冲区

「复制积压缓冲区」是 master 维护的一个固定长度(fixed-sized)的先进先出(FIFO)的内存队列。值得注意的是:

  • 队列的大小由配置 repl-backlog-size 决定,默认为 1MB。当队列长度超过 repl-backlog-size 时,最先入队的元素会被弹出,用于腾出空间给新入队的元素。
  • 队列的生存时间由配置 repl-backlog-ttl 决定,默认为 3600 秒。如果 master 不再有与之相连接的 slave,并且该状态持续时间超过了 repl-backlog-ttl,master 就会释放该队列,等到有需要(下次又有 slave 连接进来)的时候再创建。

master 会将最近接收到的写命令(按 Redis 协议的格式)保存到「复制积压缓冲区」,其中每个字节都会对应记录一个偏移量 offset。

. . . . . . . . . . . . . .
偏移量 10087 10088 10089 10090 10091 10092 10093 10094 10095 10096 10097
字节值 ‘*’ 3 ‘\r’ ‘\n’ ‘$’ 3 ‘\r’ ‘\n’ ‘S’ ‘E’ ‘T’

与此同时,slave 会维护一个 offset 值,每次从 master 传播过来的命令,一旦成功执行就会更新该 offset。尝试「部分重同步」的时候,slave 都会带上自己的 offset,master 再判断 offset 偏移量之后的数据是否存在于自己的「复制积压缓冲区」中,以此来决定执行「部分重同步」还是「完整重同步」。

2. 命令传播

「命令传播」用于在 master 的数据库状态被修改时,将导致变更的命令传播给 slave,从而让 slave 的数据库状态与 master 保持一致。

redis-replication-command-propagate

说明:master 进行命令传播时,除了将写命令直接发送给所有 slave,还会将这些命令写入「复制积压缓冲区」,用于后续可能发生的「部分重同步」操作。

五、参考资料

对于 REST API 的开发者而言,不管是对内作为团队的开发文档,还是对外作为给用户的说明文档,API 文档都是不可或缺的。

然而 “文档是死的、代码是活的”,在现实中,文档跟不上代码的更新节奏的情况比比皆是。如何编写 实时更新的易于阅读的 文档成了一个普遍的难题。由此,API 描述语言应用而生。

Swagger 是一个简单但功能强大的 API 表达工具。它具有地球上最大的 API 工具生态系统。数以千计的开发人员,使用几乎所有的现代编程语言,都在支持和使用 Swagger。使用 Swagger 生成 API,我们可以得到交互式文档,自动生成代码的 SDK 以及 API 的发现特性等(参考 使用Swagger生成RESTful API文档)。

Swagger 的功能很丰富,但在这里我们只关心一点:如何基于简单的 Swagger 描述语言,为 REST API 生成易读的 Markdown 离线文档。

一、基于 Swagger Spec 编写 API 描述文档

这一步无需多说,打开你喜欢的编辑器,或者使用官方的 Swagger Editor,参考 Spec 语法 编写即可。

这里我们以 petstore-minimal.yaml 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
---
swagger: "2.0"
info:
version: "1.0.0"
title: "Swagger Petstore"
description: "A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification"
termsOfService: "http://swagger.io/terms/"
contact:
name: "Swagger API Team"
license:
name: "MIT"
host: "petstore.swagger.io"
basePath: "/api"
schemes:
- "http"
consumes:
- "application/json"
produces:
- "application/json"
paths:
/pets:
get:
description: "Returns all pets from the system that the user has access to"
produces:
- "application/json"
responses:
"200":
description: "A list of pets."
schema:
type: "array"
items:
$ref: "#/definitions/Pet"
definitions:
Pet:
type: "object"
required:
- "id"
- "name"
properties:
id:
type: "integer"
format: "int64"
name:
type: "string"
tag:
type: "string"

二、安装转换工具 Swagger2Markup

Swagger2Markup 是一个 Java 编写的工具,用于将 Swagger 文档转换为 AsciiDoc 或者 Markdown 文档。简直就是为我们这里的需求量身定做的 :-)

安装 Swagger2Markup 的步骤如下:

1. 安装 Java

以 Ubuntu 为例,参考 How To Install Java on Ubuntu with Apt-GetUbuntu 安装 JDK 7 / JDK8 的两种方式

  1. 安装默认的 JRE/JDK

    1
    2
    3
    4
    5
    $ sudo apt-get update
    $ # 安装默认的 JRE
    $ sudo apt-get install default-jre
    $ # 安装默认的 JDK
    $ sudo apt-get install default-jdk
  2. 安装 Oracle JDK 8

    1
    2
    3
    4
    5
    $ # 添加 ppa
    $ sudo add-apt-repository ppa:webupd8team/java
    $ sudo apt-get update
    $ # 安装 oracle-java-installer(按提示依次选择 ok 和 yes 即可)
    $ sudo apt-get install oracle-java8-installer

2. 下载 Swagger2Markup 的命令行工具

参考 Command Line Interface,下载最新的 jar 包(当前为 swagger2markup-cli-1.3.1.jar)即可。

三、使用 Swagger2Markup 将 Swagger 转换为 Markdown

参考 Command Line Interface 中的步骤:

1. 创建一个 config.properties 配置文件

设置 markupLanguage 为 MARKDOWN

1
swagger2markup.markupLanguage=MARKDOWN

2. 将 Swagger 转换为 Markdown

1
$ java -jar swagger2markup-cli-1.3.1.jar convert -i /path/to/petstore-minimal.yaml -f /tmp/petstore-minimal -c /path/to/config.properties

3. 查看生成的文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Swagger Petstore


<a name="overview"></a>
## Overview
A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification


### Version information
*Version* : 1.0.0


### Contact information
*Contact* : Swagger API Team


### License information
*License* : MIT
*Terms of service* : http://swagger.io/terms/


### URI scheme
*Host* : petstore.swagger.io
*BasePath* : /api
*Schemes* : HTTP


### Consumes

* `application/json`


### Produces

* `application/json`




<a name="paths"></a>
## Paths

<a name="pets-get"></a>
### GET /pets

#### Description
Returns all pets from the system that the user has access to


#### Responses

|HTTP Code|Description|Schema|
|---|---|---|
|**200**|A list of pets.|< [Pet](#pet) > array|


#### Produces

* `application/json`




<a name="definitions"></a>
## Definitions

<a name="pet"></a>
### Pet

|Name|Schema|
|---|---|
|**id** <br>*required*|integer (int64)|
|**name** <br>*required*|string|
|**tag** <br>*optional*|string|


四、CLI as a service

如果团队内部人员都会用到这个工具,但是又不想在每个人的电脑上都安装 Java 和 Swagger2Markup,这时可以基于命令行工具 Swagger2Markup 提供一个 “文档转换服务”。

作为示例,以下是使用 Python 语言并且借助 RESTArt 库实现的一个 “文档转换服务”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# swagger2markdown.py

import os
import tempfile

from restart import status
from restart.api import RESTArt
from restart.parsers import Parser
from restart.renderers import Renderer
from restart.resource import Resource

api = RESTArt()


class SwaggerParser(Parser):

content_type = 'text/plain'

def parse(self, stream, content_type, content_length, context=None):
return stream.read().decode('utf-8')


class MarkdownRenderer(Renderer):

content_type = 'text/plain'
format_suffix = 'md'

def render(self, data, context=None):
return data.encode('utf-8')


@api.register
class SwaggerMarkdownDocs(Resource):

name = 'swagger_markdown_docs'

parser_classes = (SwaggerParser,)
renderer_classes = (MarkdownRenderer,)

def create(self, request):
with tempfile.NamedTemporaryFile(suffix='.yml', delete=False) as yml:
yml_filename = yml.name
yml.write(request.data.encode('utf-8'))

with tempfile.NamedTemporaryFile(suffix='.md', delete=False) as md:
md_filename = md.name

jar = '/path/to/swagger2markup-cli-1.3.1.jar'
conf = '/path/to/config.properties'
os.system('java -jar {jar} convert -i {yml} -f {md} -c {conf}'.format(
jar=jar, yml=yml_filename, md=md_filename[:-len('.md')], conf=conf,
))

with open(md_filename) as md:
content = md.read().decode('utf-8')

os.unlink(yml_filename)
os.unlink(md_filename)

return content, status.HTTP_201_CREATED

启动 “文档转换服务”:

1
2
$ restart swagger2markdown:api
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

使用 “文档转换服务” 生成 Markdown 文档:

1
$ curl -H 'Content-Type: text/plain' -XPOST http://localhost:5000/swagger_markdown_docs --data-binary @/path/to/petstore-minimal.yaml > /tmp/petstore-minimal.md

距离上次排查 epoll 与 CLOSE_WAIT 连接 的问题,已经过去了将近一年。最近在看 《UNIX 网络编程》,看到 “TCP 状态转换图” 中提到 CLOSE_WAIT 状态时,突然又想起来上次还有一个 遗留问题,于是决定再次尝试分析一下。

一、问题现象

上次的遗留问题,归纳起来就是:(由于 Redis 的 server 端主动关闭超时连接)在 client 端产生的 CLOSE_WAIT 连接,一直无法被 redis-py 连接池复用,进而无法被正常 close。

二、分析 redis-py 连接池机制

以当前最新的 redis-py 2.10.6 为例,从连接池获取连接 的源码:

1
2
3
4
5
6
7
8
9
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
return connection

释放连接到连接池 的源码:

1
2
3
4
5
6
7
def release(self, connection):
"Releases the connection back to the pool"
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection)
self._available_connections.append(connection)

可以看出,redis-py 使用 _available_connections 来维护 “空闲可用的连接列表”,获取连接时 pop 出列表末尾的连接,释放连接时 append 连接到列表末尾。因此 “空闲可用的连接列表” 其实是个 后进先出的栈

很显然,基于这种 “后进先出的栈” 的数据结构,redis-py 连接池对连接的获取和释放都发生在 “栈顶”。至此,原因就很明显了:如果某段时间内由于突发流量产生了大量连接,一旦流量趋于平稳(减少)后,位于 “栈底” 的部分连接就会一直无法被复用,于是这些连接被 Redis 的 server 端超时关闭后,就会一直处于 CLOSE_WAIT 状态

关于这个问题,其实在 GitHub 上已经有一个类似的 issue:ConnectionPool doesn’t reap timeout’ed connections,不过一直还未得到处理 :-(

三、解决方案

为了让 redis-py 连接池能够更均衡地复用各个连接,很容易想到的一个方案是:将数据结构从 “后进先出的栈” 改成 “先进先出的队列”

通过修改 get_connection 的实现可以很容易做到这一点:

1
2
# connection = self._available_connections.pop()
connection = self._available_connections.pop(0) # 获取连接时,从队列首部 pop 出来

关于这个方案,其实在 GitHub 上也有一个 pull request:Connection management improvements,然而还是没有得到响应 :-( 不得不手动尴尬一下…

四、复现和验证

为了简化场景,便于问题的复现和方案的验证,这里有一段辅助代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# example.py

import select

import redis


def main():
import os; print('pid: %s' % os.getpid())

r = redis.StrictRedis(host='localhost', port=6379, db=0)
pool = r.connection_pool
epoll = select.epoll()

for conn in (pool.get_connection(''), pool.get_connection('')):
conn.connect()
epoll.register(conn._sock, select.POLLIN)
pool.release(conn)

command_args = ('SET', 'foo', 'bar')
while True:
conn = pool.get_connection('')
conn.send_command(*command_args)
epoll.poll()
r.parse_response(conn, command_args[0])
pool.release(conn)


if __name__ == '__main__':
main()

操作步骤提示:

  1. 设置 Redis 的 server 端的 timeout 参数(比如 10 秒)
  2. 运行代码(python example.py)
  3. 一段时间后,观察进程的 CPU 占用率(top)
  4. 观察进程是否有 CLOSE_WAIT 连接(lsof -p PID)

sync.Map 是并发安全的字典,于 2017 年 4 月 27 日合并到 Go 代码仓库的主分支:

Map is a concurrent map with amortized-constant-time loads, stores, and deletes.
It is safe for multiple goroutines to call a Map’s methods concurrently.

以下是对 sync.Map 的 Load/Store/Delete 等常用操作的图解:

go-sync-map

应用场景

对于数据量较大的业务功能(比如日志),如果使用单个 ES 索引来存储文档,与日俱增的数据量很快就会使得单个索引过大,因为无法水平扩展,最终会导致机器空间不足。这种大数据量的场景下,需要对数据进行切分,将数据分段存储在不同的索引中。

Sizing Elasticsearch 介绍了常用的几种数据切分方法,因为这两天在工作中刚好用到过,所以在这里重点总结下 “基于时间的索引” (time-based indices) 的管理技巧。

选择时间范围

根据数据增长速度的不同,可以选择按天索引(索引名称形如 2017-05-16),或者按月索引(索引名称形如 2017-05)等等。

设计索引模板

面对这么多不断新增的索引,如何管理它们的 settings 和 mappings 呢?一个一个地去手动维护,无疑是个噩梦。这时,就需要用到 ES 的 Index Templates 机制。

Index Templates 的基本原理是:首先预定义一个或多个 “索引模板”(index template,其中包括 settings 和 mappings 配置);然后在创建索引时,一旦索引名称匹配了某个 “索引模板”,ES 就会自动将该 “索引模板” 包含的配置(settings 和 mappings)应用到这个新创建的索引上面。

以日志为例,假设我们的 ES 索引需求如下:

  1. 按天索引(索引名称形如 log-2017-05-16)
  2. 每天的日志数据,只会进入当天的索引
  3. 搜索的时候,希望搜索范围是所有的索引(借助 alias)

基于上述索引需求,对应的 “索引模板” 可以设计为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ curl -XPUT http://localhost:9200/_template/log_template -d '{
"template": "log-*",
"settings": {
"number_of_shards": 1
},
"mappings": {
"log": {
"dynamic": false,
"properties": {
"content": {
"type": "string"
},
"created_at": {
"type": "date",
"format": "dateOptionalTime"
}
}
}
},
"aliases": {
"search-logs": {}
}
}'

两点说明:

  1. 创建索引时,如果索引名称的格式形如 “log-*”,ES 会自动将上述 settings 和 mappings 应用到该索引
  2. aliases 的配置,告诉 ES 在每次创建索引时,自动为该索引添加一个名为 “search-logs” 的 alias(别名)

索引与搜索

基于上述 “索引模板” 的设计,索引与搜索的策略就很直接了。

索引策略:每天的数据,只索引到当天对应的索引。比如,2017 年 5 月 16 日这天的数据,只索引到 log-2017-05-16 这个索引当中。

搜索策略:因为搜索需求是希望全量搜索,所以在搜索的时候,索引名称使用 “search-logs” 这个 alias 即可。

更多关于 “如何有效管理基于时间的索引” 的技巧,可以参考 Managing Elasticsearch time-based indices efficiently

0%