RussellLuo

让思想在文字间徜徉


  • 首页

  • 归档

  • 标签

如何用 Go 实现 REST API 部分更新

发表于 2022-06-02 | 分类于 技术 |

一、更新 John 的数据

假设我们有一条关于 John 的数据如下:

1
2
3
4
5
6
7
8
9
10
11
GET /people/1 HTTP/1.1

{
"name": "John",
"age": 20,
"address": {
"country": "China",
"province": "Guangdong",
"city": "Shenzhen",
}
}

现在我们想更新他的年龄和所在城市,于是发起了一个请求:

1
2
3
4
5
6
7
8
PATCH /people/1 HTTP/1.1

{
"age": 25,
"address": {
"city": "Guangzhou",
}
}

作为 Go 服务端开发人员,我们如何才能正确处理这个部分更新请求呢?

二、Go 零值与 JSON

乍一看并不难,我们立马写下了结构体定义:

1
2
3
4
5
6
7
8
9
10
11
type Address struct {
Country string `json:"country"`
Province string `json:"province"`
City string `json:"city"`
}

type Person struct {
Name string `json:"name"`
Age int `json:"age"`
Address Address `json:"address"`
}

JSON 反序列化?自然也不在话下:

1
2
3
4
blob := []byte(`{"age": 25, "address": {"city": "Guangzhou"}}`)
var person Person
_ = json.Unmarshal(blob, &person)
fmt.Printf("person: %+v\n", person)

对应的输出结果(Go Playground):

1
person: {Name: Age:25 Address:{Country: Province: City:Guangzhou}}

很显然,如果我们直接用 person 去更新 John 的数据,他的姓名、所在国家和省份都会被清空!

那服务端该如何正确识别客户端的原始意图呢?具体到 John 的例子,在 Go 中如何做到 “只更新他的年龄和所在城市” 呢?

三、业界通用解法

据我所知,对于上述问题,业界通常有以下三种解法。

使用指针

因为 Go 的零值特性,普通类型无法表达 “未初始化” 的状态,典型解法就是使用指针。

采用指针后,上面的结构体定义将变成:

1
2
3
4
5
6
7
8
9
10
11
type Address struct {
Country *string `json:"country"`
Province *string `json:"province"`
City *string `json:"city"`
}

type Person struct {
Name *string `json:"name"`
Age *int `json:"age"`
Address *Address `json:"address"`
}

再次进行 JSON 反序列化:

1
2
3
4
blob := []byte(`{"age": 25, "address": {"city": "Guangzhou"}}`)
var person Person
_ = json.Unmarshal(blob, &person)
fmt.Printf("person: %+v, address: %+v\n", person, person.Address)

对应的输出结果(Go Playground):

1
person: {Name:<nil> Age:0xc000018218 Address:0xc00000c138}, address: &{Country:<nil> Province:<nil> City:0xc0000103f0}

可以看到只有 Age 和 Address.City 的值不为 nil,于是我们只需要更新不为 nil 的字段即可:

1
2
3
4
5
6
7
8
9
10
11
func (a *Address) Update(other *Address) {
if other.Country != nil {
a.Country = other.Country
}
if other.Province != nil {
a.Province = other.Province
}
if other.City != nil {
a.City = other.City
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (p *Person) Update(other *Person) {
if other.Name != nil {
p.Name = other.Name
}
if other.Age != nil {
p.Age = other.Age
}
if other.Address != nil {
// Side effects of using pointers
if p.Address == nil {
p.Address = new(Address)
}
p.Address.Update(other.Address)
}
}

参考完整代码(Go Playground)不难发现,使用指针后的 Person 结构体,操作起来会非常繁琐。比如:

  • 修改 address 前,需要首先保证 p.Address 不能为 nil
  • 此外,Initialization 初始化操作尤其麻烦

客户端维护的 FieldMask

受 Protocol Buffers 设计的影响,另一种较为流行的做法是在请求中新增一个 FieldMask 参数,用来补充说明需要更新的字段名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Address struct {
Country string `json:"country"`
Province string `json:"province"`
City string `json:"city"`
}

type Person struct {
Name string `json:"name"`
Age int `json:"age"`
Address Address `json:"address"`
}

type UpdatePersonRequest struct {
Person Person `json:"person"`
FieldMask string `json:"field_mask"`
}
1
2
3
4
blob := []byte(`{"person": {"age": 25, "address": {"city": "Guangzhou"}}, "field_mask": "age,address.city"}`)
var req UpdatePersonRequest
_ = json.Unmarshal(blob, &req)
fmt.Printf("req: %+v\n", req)

对应的输出结果(Go Playground):

1
req: {Person:{Name: Age:25 Address:{Country: Province: City:Guangzhou}} FieldMask:age,address.city}

有了 FieldMask 的补充说明,服务端就能正确进行部分更新了。但是对于客户端而言,FieldMask 其实是多余的,而且维护成本也不低(特别是待更新字段较多时),这也是我认为该方案最明显的一个不足之处。

改用 JSON Patch

前面讨论的方案,本质上都是 JSON Merge Patch 风格的。部分更新还有另外一个比较有名的风格,那就是 JSON Patch。

具体到 John 的例子,部分更新请求变成了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
PATCH /people/1 HTTP/1.1

[
{
"op": "replace",
"path": "/age",
"value": 25
},
{
"op": "replace",
"path": "/address/city",
"value": "Guangzhou"
}
]

相比于前面的解法而言,该解法的主要缺点是 PATCH 请求体跟待更新文档的 JSON 数据格式差异太大,表达上不太符合直觉。

四、服务端维护的 FieldMask

如果我们坚持 JSON Merge Patch 风格的部分更新,综合来看「客户端维护的 FieldMask」是相对较好的方案。那有没有可能进一步规避该方案的不足,即不增加客户端的维护成本呢?经过一段时间的研究和思考,我认为答案是肯定的。

有经验的读者可能会发现,Go 的 JSON 反序列化其实有两种:

  • 将 JSON 反序列化为结构体(优势:操作直观方便;不足:有零值问题)
  • 将 JSON 反序列化为 map[string]interface{}(优势:能够准确表达 JSON 中有无特定字段;不足:操作不够直观方便)

可想而知,如果我们直接把 Person 从结构体改为 map[string]interface{},操作体验可能会比使用带指针的结构体更糟糕!

那如果我们只是把 map[string]interface{} 作为一个反序列化的中间结果呢?比如:

  1. 首先将 JSON 反序列化为 map[string]interface{}
  2. 然后用 map[string]interface{} 来充当(服务端维护的)FieldMask
  3. 最后将 map[string]interface{} 解析为结构体(幸运的是,已经有现成的库 mapstructure 可以做到!)

通过一些探索和试验,结果表明上述想法是可行的。为此,我还专门开发了一个小巧的库 fieldmask,用来辅助实现基于该想法的部分更新。

具体到 John 的例子,借助 fieldmask 库,结构体可以定义成最自然的方式(不需要使用指针):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type Address struct {
Country string `json:"country"`
Province string `json:"province"`
City string `json:"city"`
}

type Person struct {
Name string `json:"name"`
Age int `json:"age"`
Address Address `json:"address"`
}

type UpdatePersonRequest struct {
Person
FieldMask fieldmask.FieldMask `json:"-"`
}

func (req *UpdatePersonRequest) UnmarshalJSON(b []byte) error {
if err := json.Unmarshal(b, &req.FieldMask); err != nil {
return err
}
return mapstructure.Decode(req.FieldMask, &req.Person)
}

注意,其中 JSON 反序列化的核心代码是 UnmarshalJSON。对应的更新逻辑如下(完整示例):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (a *Address) Update(other Address, fm fieldmask.FieldMask) {
if len(fm) == 0 {
// Clear the entire address.
*a = other
return
}

if fm.Has("country") {
a.Country = other.Country
}
if fm.Has("province") {
a.Province = other.Province
}
if fm.Has("city") {
a.City = other.City
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (p *Person) Update(other Person, fm fieldmask.FieldMask) {
if len(fm) == 0 {
// Clear the entire person.
*p = other
return
}

if fm.Has("name") {
p.Name = other.Name
}
if fm.Has("age") {
p.Age = other.Age
}
if addressFM, ok := fm.FieldMask("address"); ok {
p.Address.Update(other.Address, addressFM)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
john := Person{
Name: "John",
Age: 20,
Address: Address{
Country: "China",
Province: "Guangdong",
City: "Shenzhen",
},
}

blob := []byte(`{"age": 25, "address": {"city": "Guangzhou"}}`)
req := new(UpdatePersonRequest)
_ = json.Unmarshal(blob, req)

john.Update(req.Person, req.FieldMask)

个人觉得,相比其他方案而言,上述代码实现非常简单、自然(如果还有优化空间,欢迎指正👏🏻)。

当然该方案也不是完美的,目前来说,我认为至少有一个瑕疵就是需要两次解码:JSON -> map[string]interface{} -> 结构体,会增加一点性能上的开销。

五、相关阅读

  • Go, REST APIs, and Pointers
  • Practical API Design at Netflix, Part 2: Protobuf FieldMask for Mutation Operations
  • REST: Partial updates with PATCH

Go 服务通信工具 Kun

发表于 2021-12-12 | 分类于 技术 |

一、缘起

1. 背景:系统重构选型 Go kit

为什么不直接使用 Web 框架(如 Gin)?

根据旧系统使用 Flask 的经验,稍具规模、多人协作的工程代码,如果没有严格的分层约束,很容易做成一锅蛋炒饭(我们应该做盖浇饭)。

旧系统的常见问题:

  • HTTP 层和业务层,常常傻傻分不清楚(导致测试业务层代码,需要模拟 HTTP 请求);
  • HTTP/DB 层的细节,常常会混入业务层(比如业务层在处理 Flask 或者 SQLAlchemy 的数据结构);
  • 没有使用依赖注入,单元测试必须借助 Mock 工具(比如 Python 的 unittest.mock)。

为什么不使用 Go micro?

  • Go micro 作为一套微服务开发框架,复杂度较高,可控度不够;
  • 新系统一开始的定位是 “优雅(模块化)的大单体”(备注),所以选择了更灵活的 Go kit。

备注:Microservice Architecture at Medium(Medium 的微服务架构 )。

It is fine to start with a monolithic architecture, but make sure to modularize it and architect it with the above three microservice principles (single purpose, loose coupling and high cohesion), except that the “services” are implemented in the same tech stack, deployed together and run in the same process.

选择 Go kit 意味着什么?

  • 成熟的分层思想:整洁架构(意义指数:★★★)
  • 优雅的错误处理:如何将业务层 error 转换为 HTTP 状态码(意义指数:★★☆)
  • 灵活的协议切换:同时支持 HTTP 和 gRPC(意义指数:★★☆)

2. 问题:手写代码太繁琐

Go kit 核心代码分为三层:

  • Transport(繁琐指数:★★★)
  • Endpoint(繁琐指数:★★★)
  • Service(繁琐指数:☆☆☆)

业界吐槽:
Why I Recommend to Avoid Using the go-kit Library。

3. 为什么要造轮子?

Go kit 列举的 两款工具:

  • kujtimiihoxha/kit(缺点:不再维护)
  • metaverse/truss(缺点:需要手写 Protocol Buffers)

一言以蔽之:已有的轮子不简洁、不好用。

二、设计

1. 初衷

Kun 最初的定位:一款 HTTP 代码生成工具,让大家专注于 Service 层代码(业务逻辑),无需手写 Transport 层和 Endpoint 层代码。

Kun 可以生成的代码包括:

  • Endpoint 层代码
  • Transport 层代码
    • HTTP server
    • HTTP client
    • HTTP test
    • OAS 文档

2. 哲学

  • 聚焦最有价值的事
    • 别再折腾 Web 框架了,专注于你的业务价值!
  • 尽量只写 Go 代码
    • 用 Go(而不是其他 DSL)来描述你的服务定义。
  • 可维护性很重要
    • 拥抱整洁架构(核心是思想本质,而不是条条框框)。

3. 原理

参考 How it works。

4. 注解

为什么要用注解?

选项 简洁度 可维护性
Go interface 上的直接注解 ★★☆ ★★★
单独的服务定义(如 YAML 文件) ★★☆ ★★☆(备注)

备注:「直接注解」的可维护性高于「YAML 文件」,源自 Where to configure transcoding。

Two mechanisms are supported for doing this: direct annotations in your .proto file, and in YAML as part of your gRPC API configuration file. We recommend using proto annotations for ease of reading and maintenance.

注解设计

语法借鉴:

  • OpenAPI Specification
  • Google HTTP annotations
  • Go comment directives

设计考量:

  1. 越典型的场景越简洁
    • 最典型的 JSON API 只需写一行 //kun:op;
    • 支持自动绑定 URL 中的 Path 参数;
    • Query 也是常用的 HTTP 参数,所以 //kun:param 中默认 in=query,同时 //kun:body 中指定 - 可以让所有参数从 Body 切换绑定到 Query;
    • HTTP 参数名称默认为 snake case(下划线风格),也支持切换到 lower camel case(驼峰风格);
    • 200 是最常用的成功响应状态码,所以是 //kun:success 中默认 statusCode=200。
  2. 复杂的场景也能支持
    • 需要使用 Query、Header、Cookie 参数时,可以用 //kun:param 来指定;
    • 支持参数聚合:将多个 HTTP 参数绑定到一个 Method 参数上;
    • 复杂请求和响应的编解码,可以通过自己实现 HTTP Codec 来达成。
  3. 服务接口完全由业务决定
    • Go interface 中每个 Method 的入参和出参,不强求一定要定义成结构体,可以从业务出发使用最自然的签名。

三、展望

Kun 未来的定位:不仅仅局限于一款 Go kit 的代码生成工具,而是成为一款通用的 Go 服务通信工具 —— 致力于处理 Go 服务之间的通信,让开发者专注于业务价值。

Kun 对服务通信的约定:

  1. 服务通信应该依赖于抽象接口,而不是具体类型;
    • 服务之间只依赖 Go interface;
    • 当前 Kun 生成的 HTTP client 代码也实现了服务定义的 Go interface。
  2. 基于约定 1,服务通信的类型应该是可以灵活切换的。
    • 最终 Kun 可能会支持的通信类型:
      • 进程内函数调用;
      • RPC(比如 HTTP 和 gRPC);
      • 异步消息(尚不支持)。

测试金字塔

发表于 2021-08-05 | 分类于 技术 |

一、自动化测试的重要性

公理:要正确交付软件代码,测试是必不可少的。

1. 测试分类

  • 手动测试
  • 自动化测试

2. 何时采用何种测试?

  • 一次性使用的代码
    • 示例:临时脚本或工具
    • 采用手工测试
  • 需要维护的代码
    • 示例:业务代码
      • 警惕陷阱:这个需求后面不会改,先手动测试吧
    • 采用自动化测试
      • 手工测试低效、重复、乏味
      • 自动化测试可以充当代码变更的保护网

二、测试金字塔

Test-Pyramid

1. 核心原则

  • 编写不同粒度的测试
  • 层次越高,编写的测试应该越少

2. 反模式

  • 测试冰淇淋

三、单元测试

1. Why

为什么要多写单元测试?

  • 规模小(关注点聚焦,更容易编写)
  • 执行快(最小化依赖,运行成本低)

2. What

何为一个单元?

  • 面向过程/函数式:一个函数
  • 面向对象:一个方法、一个类

群居和独居

  • 独居:隔离所有外部依赖
  • 群居:只隔离那些执行慢或者副作用大的依赖(比如数据库、外部服务等)

3. How

测试什么?

  • 通常只测试一个类的公共方法
  • 如果一个类复杂到需要测试它的私有方法
    • 考虑从设计上拆分这个类,把这些私有方法变成另一个类的公共方法

测试结构(Arrange-Act-Assert 或者 Given-When-Then)

  • 准备测试数据
  • 调用被测方法
  • 断言返回的是期待的结果

四、集成测试

测试应用和所有外部依赖的集成。

常见的外部依赖有:

  • 数据库(如 MySQL、Redis、Elasticsearch)
  • 消息队列(如 Kafka)
  • 外部服务(如 SendCloud、S3)
    • 进行契约测试
      • 编写消费方测试
      • 同时为外部服务编写提供方测试?
        • 通常不需要。因为成熟的服务提供方往往会对 API 做版本控制;而且在废弃旧版本 API 之前,也会通知到服务消费方。

五、契约测试

1. 不适用场景

公共服务的提供方和消费方之间。

2. 适用场景

内部微服务的提供方和消费方之间。

提供方和消费方之间的通信方式,常见的有:

  • RPC 接口
    • HTTP/JSON
    • gRPC
  • 异步事件

3. 契约测试的特征

  • 消费方编写消费方测试,并生成一个协议文件(Pact 的 JSON 示例)
  • 提供方根据协议文件,编写提供方测试

六、UI 测试

前后端分离的架构下,UI 测试可以是:

  • 纯前端 UI 测试
  • 后端 API 集成测试(cURL 和 Postman 测试的自动化版本)

七、避免重复测试

1. 测试成本

  • 编写和维护测试要花时间
  • 阅读和理解他人的测试也要花时间
  • 执行这些测试也要花时间

2. 基本法则

  • 如果一个更高层级的测试发现了一个错误,并且底层测试全都通过了,那么应该编写一个低层级测试去覆盖这个错误
  • 竭尽所能把测试往金字塔下层赶
  • 删掉那些已经被低层级测试覆盖完全的高层级测试
    • 警惕陷阱:沉没成本(不忍删除花了时间精力编写的测试)

八、相关阅读

  • 测试金字塔实战(英文版)
  • Why bother writing tests at all?
  • Introducing the Software Testing Cupcake (Anti-Pattern)

如何确定一个 Go 变量会被分配在哪里?

发表于 2019-07-18 | 分类于 技术 |

一、由 iter 包引发的疑问

最近在 GitHub 上偶然发现了 Brad Fitzpatrick 的 iter 包,整个包只有 一个函数(一行代码):

1
2
3
func N(n int) []struct{} {
return make([]struct{}, n)
}

但其中的一行注释令人费解:

It does not cause any allocations.

1. 空结构体

我们知道,struct{} 是空结构体(empty struct)。关于空结构体,Dave Cheney 在 The empty struct 中作了很好地阐述:

  • 空结构体不占用空间(The empty struct consumes no storage)。
  • 空结构体的切片只占用切片头的空间(Slices of struct{}s consume only the space for their slice header)。

2. Go 切片

按照官方博客 Go Slices: usage and internals 的说法:

A slice is a descriptor of an array segment. It consists of a pointer to the array, the length of the segment, and its capacity (the maximum length of the segment).

因为切片总是指向一个底层数组的,所以所谓的 “切片头” 其实就是切片本身。一个切片包括:指向数组片段的指针、数组片段的长度和最大长度,总共 3 个字长(在 64 位机器上,就是 24 个字节)。

3. 疑问

按照上面的分析,在 64 位机器上,不管 n 是多少,make([]struct{}, n) 得到的切片一定会占用 24 个字节,reddit 上的讨论 也证实了我们的分析。

那为什么 Brad Fitzpatrick 声称函数 N 不会引发分配呢?

为了解决这个疑惑,我们需要先弄清楚两个问题:

  1. 一个 Go 变量可能会被分配在哪里?
  2. 如何确定一个 Go 变量最终会被分配在哪里?

二、Go 变量可能的分配位置

1. 进程的内存布局

在 Linux/x86-32 系统中,一个进程的典型的内存布局如下图所示(图片来自 The Linux Programming Interface 图 6-1):

typical-memory-layout-of-a-process

结合维基百科对 Data segment 的描述,我们得知:

  • 初始化的全局变量或静态变量,会被分配在 Data 段。
  • 未初始化的全局变量或静态变量,会被分配在 BSS 段。
  • 在函数中定义的局部变量,会被分配在堆(Heap 段)或栈(Stack 段)。
    • 实际上,如果考虑到 编译器优化,局部变量还可能会被 分配在寄存器,或者直接被 优化去掉。

2. Go 内存分配

对于 Go 而言,有两个地方可以用于分配:

  • 堆(heap)
    • 由 GC 负责回收。
    • 对应于进程地址空间的堆。
  • 栈(stack)
    • 不涉及 GC 操作。
    • 每个 goroutine 都有自己的栈,初始时被分配在进程地址空间的栈上,扩容时被分配在进程地址空间的堆上。

Go 变量主要分为两种:

  • 全局变量
    • 会被 Go 编译器标记为一些特殊的 符号类型,分配在堆上还是栈上目前尚不清楚,不过不是本文讨论的重点。
  • 局部变量

所以综上,对于在函数中定义的 Go 局部变量:要么被分配在堆上,要么被分配在栈上。

三、确定 Go 变量最终的分配位置

至此,我们还剩下一个问题:对于一个 Go 局部变量,如何确定它被分配在堆上还是栈上?

按照官方 FAQ How do I know whether a variable is allocated on the heap or the stack? 的解释:

  • Go 编译器会尽可能将变量分配在栈上
  • 以下两种情况,Go 编译器会将变量分配在堆上
    • 如果一个变量被取地址(has its address taken),并且被逃逸分析(escape analysis)识别为 “逃逸到堆”(escapes to heap)
    • 如果一个变量很大(very large)

1. 逃逸分析

以使用 iter 包的这段代码为例:

1
2
3
4
5
6
7
package main

import "github.com/bradfitz/iter"

func main() {
for range iter.N(4) {}
}

下列演示中,我将使用 Go 1.11.4:

1
2
$ go version
go version go1.11.4 darwin/amd64

下面我们对这段代码作逃逸分析:

1
2
3
4
5
6
$ go build -gcflags='-m -m' examples/go_mem/main.go
# command-line-arguments
examples/go_mem/main.go:5:6: cannot inline main: unhandled op RANGE
examples/go_mem/main.go:6:18: inlining call to iter.N func(int) []struct {} { return make([]struct {}, iter.n) }
examples/go_mem/main.go:6:18: make([]struct {}, iter.n) escapes to heap
examples/go_mem/main.go:6:18: from make([]struct {}, iter.n) (non-constant size) at ./main.go:6:18

按照前面的分析,从 “make([]struct {}, iter.n) escapes to heap” 的信息,我们推断:make([]struct {}, iter.n) 会被分配在堆上。

到这里,我们最初的疑惑似乎已经有了答案:make([]struct {}, iter.n) 一定会引发堆分配,那是 Brad Fitzpatrick 的注释写错了吗?

2. 内存分配器追踪

除了逃逸分析,Go 还提供了一种叫内存分配器追踪(Memory Allocator Trace)的方法,用于细粒度地分析由程序引发的所有堆分配(和释放)操作:

1
$ GODEBUG=allocfreetrace=1 go run examples/go_mem/main.go 2>&1 | grep -C 10 examples/go_mem

因为进行内存分配器追踪时,很多由 runtime 引发的分配信息也会被打印出来,所以我们用 grep 进行过滤,只显示由用户代码(user code)引发的分配信息。然而这里的输出结果为空,表明 make([]struct {}, iter.n) 没有引发任何堆分配。

内存分配器追踪的结论与逃逸分析的结论截然相反!那到底哪个结论是对的呢?

3. 汇编分析

黔驴技穷之际,Go’s Memory Allocator - Overview 这篇文章给了我提示:

So, we know that i is going to be allocated on the heap. But how does the runtime set that up? With the compiler’s help! We can get an idea from reading the generated assembly.

关于 Go 汇编(assembly),推荐大家阅读 Go internals, Chapter 1: Go assembly。

下面我们来看看示例代码对应的汇编:

1
2
3
4
5
6
7
8
9
$ go tool compile -I $GOPATH/pkg/darwin_amd64 -S examples/go_mem/main.go
...
0x001d 00029 (examples/go_mem/main.go:6) LEAQ type.struct {}(SB), AX
0x0024 00036 (examples/go_mem/main.go:6) PCDATA $2, $0
0x0024 00036 (examples/go_mem/main.go:6) MOVQ AX, (SP)
0x0028 00040 (examples/go_mem/main.go:6) MOVQ $4, 8(SP)
0x0031 00049 (examples/go_mem/main.go:6) MOVQ $4, 16(SP)
0x003a 00058 (examples/go_mem/main.go:6) CALL runtime.makeslice(SB)
...

可以看到,其中有一处对 runtime.makeslice(SB) 的调用,显然是由 make([]struct{}, n) 引发的。

查看 runtime.makeslice 的源码:

1
2
3
4
5
func makeslice(et *_type, len, cap int) slice {
...
p := mallocgc(et.size*uintptr(cap), et, true)
return slice{p, len, cap}
}

其中,mallocgc 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
...
if size == 0 {
return unsafe.Pointer(&zerobase)
}
...
if debug.allocfreetrace != 0 {
tracealloc(x, size, typ)
}
...
}

slice 对应的结构体如下:

1
2
3
4
5
type slice struct {
array unsafe.Pointer
len int
cap int
}

结合上述几段源码,我们可以看出:

  • makeslice 函数中:slice 结构体正是我们在第一节提到的 Go 切片 —— array 是指向数组片段的指针,len 是数组片段的长度,cap 是数组片段的最大长度。
  • makeslice 函数中:array 的值来自 p,而 p 则是一个指针,它指向由 mallocgc 分配得到的底层数组。
  • mallocgc 函数中:因为空结构体的 size 为 0,所以 mallocgc 并没有实际进行堆分配;由于没有执行到 tracealloc 的地方,所以进行内存分配器追踪时,不会采集到相关的分配信息。
  • makeslice 函数中:切片 slice 本身是以结构体的形式返回的,所以只会被分配在栈上。

四、总结

经过一系列的探索和分析,至此,我们可以得出以下结论:

  • make([]struct{}, n) 只会被分配在栈上,而不会被分配在堆上。
  • Brad Fitzpatrick 的注释是对的,并且他的意思是 “不会引发堆分配”。
  • 逃逸分析识别出 escapes to heap,并不一定就是堆分配,也可能是栈分配。
  • 进行内存分配器追踪时,如果采集不到堆分配信息,那一定只有栈分配。

最后,我们来解答文章标题提出的疑问 —— 如何确定一个 Go 变量会被分配在哪里?对此,我们的答案是:

  1. 先对代码作逃逸分析。
    • 如果该变量被识别为 escapes to heap,那么它十有八九是被分配在堆上。
    • 如果该变量被识别为 does not escape,或者没有与之相关的分析结果,那么它一定是被分配在栈上。
  2. 如果对 escapes to heap 心存疑惑,就对代码作内存分配器追踪。
    • 如果有采集到与该变量相关的分配信息,那么它一定是被分配在堆上。
    • 否则,该变量一定是被分配在栈上。
  3. 此外,如果想知道 Go 编译器是如何将变量分配在堆上或者栈上的,可以去分析 Go 汇编(以及 runtime 源码)。

五、思考题

  • 如果换成 make([]int, n),结果还会是栈分配吗?
  • 如果换成 make([]int, 4) 呢?
  • 除了空结构体 make([]struct{}, n) 的特例,还有哪些 “被逃逸分析识别为 escapes to heap,但其实是栈分配” 的案例?
  • Go 支持闭包(closure),那么闭包中的变量,又是分配在哪里的?(Where are variables in a closure stored - stack or heap? 说是分配在栈上,对于 Go 也是成立的吗?)

六、相关阅读

  • The empty struct
  • Go Slices: usage and internals
  • Escape analysis
  • Go’s Memory Allocator - Overview
  • Go internals, Chapter 1: Go assembly
  • Five things that make Go fast

层级时间轮的 Golang 实现

发表于 2018-10-05 | 分类于 技术 |

一、引言

最近在工作中负责制定重构计划,需要将部分业务代码从 Python 迁移到 Golang。其中一些功能涉及到 Celery 延时任务,所以一直在思考 Golang 中处理延时任务的有效方案。

其实在软件系统中,“在一段时间后执行一个任务” 的需求比比皆是。比如:

  • 客户端发起 HTTP 请求后,如果在指定时间内没有收到服务器的响应,则自动断开连接。

为了实现上述功能,通常我们会使用定时器 Timer:

  1. 客户端发起请求后,立即创建(启动)一个 Timer:到期间隔为 d,到期后执行 “断开连接” 的操作。
  2. 如果到期间隔 d 以内收到了服务器的响应,客户端就删除(停止)这个 Timer。
  3. 如果一直没有收到响应,则 Timer 最终会到期,然后执行 “断开连接” 的操作。

Golang 内置的 Timer 是采用最小堆来实现的,创建和删除的时间复杂度都为 O(log n)。现代的 Web 服务动辄管理 100w+ 的连接,每个连接都会有很多超时任务(比如发送超时、心跳检测等),如果每个超时任务都对应一个 Timer,性能会比较低下。

论文 Hashed and Hierarchical Timing Wheels 提出了一种用于实现 Timer 的高效数据结构:时间轮。采用时间轮实现的 Timer,创建和删除的时间复杂度为 O(1)。

常见的时间轮实现有两种:

  • 简单时间轮(Simple Timing Wheel)—— 比如 Netty4 的 HashedWheelTimer。
  • 层级时间轮(Hierarchical Timing Wheels)—— 比如 Kafka 的 Purgatory。

参考 Kafka 的层级时间轮实现(基于 Java/Scala 语言),我依葫芦画瓢实现了一个 Golang 版本的层级时间轮,实现源码作为个人项目放到了 GitHub。

下面我们来看看简单时间轮、层级时间轮、Kafka 的层级时间轮变体的实现原理,以及 Golang 实现中的一些要点。

二、简单时间轮

一个 简单时间轮 就是一个循环列表,列表中的每一格包含一个定时任务列表(双向链表)。一个时间单位为 u、大小为 n 的简单时间轮,可以包含的定时任务的最大到期间隔为 n*u。

以 u 为 1ms、n 为 3 的简单时间轮为例,可以包含的定时任务的最大到期间隔为 3ms。

simple-timing-wheel

如上图所示,该简单时间轮的运行原理如下:

  1. 初始时,假设当前时间(蓝色箭头)指向第 1 格(此时:到期间隔为 [0ms, 1ms) 的定时任务放第 1 格,[1ms, 2ms) 的放第 2 格,[2ms, 3ms) 的放第 3 格)。
  2. 此时我们创建一个到期间隔为 1ms 的定时任务 task1,按规则该任务会被插入到第 2 格。
  3. 随着时间的流逝,过了 1ms 后当前时间指向第 2 格,这一格包含的定时任务 task1 会被删除并执行。
  4. 当前时间指向第 2 格(此时:到期间隔为 [0ms, 1ms) 的定时任务放第 2 格,[1ms, 2ms) 的放第 3 格,[2ms, 3ms) 的放第 1 格),我们继续创建一个到期间隔为 2ms 的定时任务 task2,按规则该任务被插入到第 1 格。

简单时间轮的优点是实现简单,缺点是:

  • 一旦选定 n,就不能包含到期间隔超过 n*u 的定时任务。
  • 如果定时任务的到期时间跨度较大,就会选择较大的 n,在定时任务较少时会造成很大的空间浪费。

有一些简单时间轮的 变体实现,它们通过在定时任务中增加记录 round 轮次信息,可以有效弥补上述两个缺点。同样以上面 u 为 1ms、n 为 3 的简单时间轮为例,初始时间指向第 1 格;此时如果要创建到期时间为 4ms 的定时任务,可以在该任务中设置 round 为 1(4/3 取整),剩余到期时间用 4ms 减去 round*3ms 等于 1ms,因此放到第 2 格;等到当前时间指向第 2 格时,判断任务中的 round 大于 0,所以不会删除并执行该任务,而是对其 round 减一(于是 round 变为 0);等到再过 3ms 后,当前时间再次指向第 2 格,判断任务中的 round 为 0,进而删除并执行该任务。

然而,这些变体实现因为只使用了一个时间轮,所以仍然存在一个缺点:处理每一格的定时任务列表的时间复杂度是 O(n),如果定时任务数量很大,分摊到每一格的定时任务列表就会很长,这样的处理性能显然是让人无法接受的。

三、层级时间轮

层级时间轮 通过使用多个时间轮,并且对每个时间轮采用不同的 u,可以有效地解决简单时间轮及其变体实现的问题。

参考 Kafka 的 Purgatory 中的层级时间轮实现:

  • 每一层时间轮的大小都固定为 n,第一层时间轮的时间单位为 u,那么第二层时间轮(我们称之为第一层时间轮的溢出时间轮 Overflow Wheel)的时间单位就为 n*u,以此类推。
  • 除了第一层时间轮是固定创建的,其他层的时间轮(均为溢出时间轮)都是按需创建的。
  • 原先插入到高层时间轮(溢出时间轮)的定时任务,随着时间的流逝,会被降级重新插入到低层时间轮中。

以 u 为 1ms、n 为 3 的层级时间轮为例,第一层时间轮的时间单位为 1ms、大小为 3,第二层时间轮的时间单位为 3ms、大小为 3,以此类推。

hierarchical-timing-wheels

如上图所示,该层级时间轮的运行原理如下:

  1. 初始时,只有第一层(Level 1)时间轮,假设当前时间(蓝色箭头)指向第 1 格(此时:到期间隔为 [0ms, 1ms) 的定时任务放第 1 格,[1ms, 2ms) 的放第 2 格,[2ms, 3ms) 的放第 3 格)。
  2. 此时我们创建一个到期间隔为 2ms 的定时任务 task1,按规则该任务会被插入到第一层时间轮的第 3 格。
  3. 同一时刻,我们再次创建一个到期间隔为 4ms 的定时任务 task2,因为到期间隔超过了第一层时间轮的间隔范围,所以会创建第二层(Level 2)时间轮;第二层时间轮中的当前时间(蓝色箭头)也指向第 1 格,按规则该任务会被插入到第二层时间轮的第 2 格。
  4. 随着时间的流逝,过了 2ms 后,第一层时间轮中的当前时间指向第 3 格,这一格包含的任务 task1 会被删除并执行;此时,第二层时间轮的当前时间没有变化,依然指向第 1 格。
  5. 随着时间的流逝,又过了 1ms 后,第一层时间轮中的当期时间指向第 1 格,这一格中没有任务;此时,第二层当前时间指向第 2 格,这一格包含的任务 task2 会被删除并重新插入时间轮,因为剩余到期时间为 1ms,所以 task2 会被插入到第一层时间轮的第 2 格。
  6. 随着时间的流逝,又过了 1ms 后,第一层时间轮中的当前时间指向第 2 格,这一格包含的定时任务 task2 会被删除并执行;此时,第二层时间轮的当前时间没有变化,依然指向第 2 格。

四、Kafka 的变体实现

在具体实现层面(参考 Kafka Timer 实现源码),Kafka 的层级时间轮与上面描述的原理有一些差别。

1. 时间轮表示

kafka-implementation-timing-wheel-representation

如上图所示,在时间轮的表示上面:

  • 使用大小为 wheelSize 的数组来表示一层时间轮,其中每一格是一个 bucket,每个 bucket 的时间单位为 tick。
  • 这个时间轮数组并没有模拟循环列表的行为(如图左所示),而是模拟了哈希表的行为。具体而言(如图右所示),这个时间轮数组会随着 currentTime 的流逝而移动,也就是说 currentTime 永远是指向第一个 bucket 的,每个落到该时间轮的定时任务,都会根据哈希函数 (expiration/tick)%wheelSize 散列到对应的 bucket 中(参考 源码)。

2. 时钟驱动方式

常规的时间轮实现中,会在一个线程中每隔一个时间单位 tick 就醒来一次,并驱动时钟走向下一格,然后检查这一格中是否包含定时任务。如果时间单位 tick 很小(比如 Kafka 中 tick 为 1ms)并且(在最低层时间轮的)定时任务很少,那么这种驱动方式将会非常低效。

Kafka 的层级时间轮实现中,利用了 Java 内置的 DelayQueue 结构,将每一层时间轮中所有 “包含有定时任务的 bucket” 都加入到同一个 DelayQueue 中(参考 源码),然后 等到有 bucket 到期后再驱动时钟往前走(参考 源码),并逐个处理该 bucket 中的定时任务(参考 源码)。

kafka-implementation-clock-driving-method

如上图所示:

  1. 往层级时间轮中添加一个定时任务 task1 后,会将该任务所属的 bucket2 的到期时间设置为 task1 的到期时间 expiration(= 当前时间 currentTime + 定时任务到期间隔 duration),并将这个 bucket2 添加(Offer)到 DelayQueue 中。
  2. DelayQueue(内部有一个线程)会等待 “到期时间最早(earliest)的 bucket” 到期,图中等到的是排在队首的 bucket2,于是经由 poll 返回并删除这个 bucket2;随后,时间轮会将当前时间 currentTime 往前移动到 bucket2 的 expiration 所指向的时间(图中是 1ms 所在的位置);最后,bucket2 中包含的 task1 会被删除并执行。

上述 Kafka 层级时间轮的驱动方式是非常高效的。虽然 DelayQueue 中 offer(添加)和 poll(获取并删除)操作的时间复杂度为 O(log n),但是相比定时任务的个数而言,bucket 的个数其实是非常小的(也就是 O(log n) 中的 n 很小),因此性能也是没有问题的。

五、Golang 实现要点

timingwheel 中的 Golang 实现,基本上都是参考 Kafka 的层级时间轮的原理来实现的。

因为 Golang 中没有现成的 DelayQueue 结构,所以自己实现了一个 DelayQueue,其中:

  • PriorityQueue —— 从 NSQ 借用过来的 优先级队列(基于最小堆实现)。
  • DelayQueue —— Offer(添加 bucket)和 Poll(获取并删除 bucket)的运作方式,跟 Golang Timer 运行时中 addtimerLocked 和 timerproc 的运作方式如出一辙,因此参考了其中的实现方式(参考 原理介绍)。

六、相关阅读

  • timingwheel - Golang 实现源码
  • Apache Kafka, Purgatory, and Hierarchical Timing Wheels
  • Kafka Timer 实现源码

使用 Golang Timer 的正确方式

发表于 2018-09-15 | 分类于 技术 |

一、标准 Timer 的问题

以下讨论只针对由 NewTimer 创建的 Timer,因为这种 Timer 会使用 channel 来传递到期事件,而正确操作 channel 并非易事。

Timer.Stop

按照 Timer.Stop 文档 的说法,每次调用 Stop 后需要判断返回值,如果返回 false(表示 Stop 失败,Timer 已经在 Stop 前到期)则需要排掉(drain)channel 中的事件:

1
2
3
if !t.Stop() {
<-t.C
}

但是如果之前程序已经从 channel 中接收过事件,那么上述 <-t.C 就会发生阻塞。可能的解决办法是借助 select 进行 非阻塞 排放(draining):

1
2
3
4
5
6
if !t.Stop() {
select {
case <-t.C: // try to drain the channel
default:
}
}

但是因为 channel 的发送和接收发生在不同的 goroutine,所以 存在竞争条件(race condition),最终可能导致 channel 中的事件未被排掉。

以下就是一种有问题的场景,按时间先后顺序发生:

  • goroutine A:Go 运行时判断 Timer 已经到期,于是从最小堆中删除该 Timer
  • goroutine B:应用程序执行 Timer.Stop,发现 Timer 已经到期,进而返回 false
  • goroutine B:应用程序继续执行 select...case <-t.C,因为 channel 中并没有事件,所以会立即返回
  • goroutine A:Go 运行时将到期事件发送到该 Timer 的 channel 中

Timer.Reset

按照 Timer.Reset 文档 的说法,要正确地 Reset Timer,首先需要正确地 Stop Timer。因此 Reset 的问题跟 Stop 基本相同。

二、使用 Timer 的正确方式

参考 Russ Cox 的回复(这里 和 这里),目前 Timer 唯一合理的使用方式是:

  • 程序始终在同一个 goroutine 中进行 Timer 的 Stop、Reset 和 receive/drain channel 操作
  • 程序需要维护一个状态变量,用于记录它是否已经从 channel 中接收过事件,进而作为 Stop 中 draining 操作的判断依据

如果每次使用 Timer 都要按照上述方式来处理,无疑是一件很费神的事。为此,我专门写了一个 Go 库 goodtimer 来解决标准 Timer 的问题。懒是一种美德 :-)

三、相关阅读

  • 论golang Timer Reset方法使用的正确姿势
  • time.Timer doc

Redis 4.0 非阻塞删除

发表于 2018-08-18 | 分类于 技术 |

一、要解决的问题

一直以来,Redis 都是单线程的(参考 FAQ)。这种模型使得 Redis 简单、高效,但缺点也很明显:如果执行一个比较耗时的命令,那么在该命令执行期间,整个 Redis 服务都将被阻塞(无法并发地执行其他命令)。

大部分 Redis 命令的执行速度都很快,所以不是问题;但也有一些命令,比如 ZUNIONSTORE、LRANGE、SINTER,以及臭名昭著的 KEYS,根据处理数据集大小的不同,可能会阻塞 Redis 数秒或几分钟。

以 DEL 命令为例,当被删除的 key 是 list、set、sorted set 或 hash 类型时,时间复杂度为 O(M),其中 M 是 key 中包含的元素的个数。

二、非阻塞删除

Redis 的作者 Salvatore Sanfilippo 自己也意识到了上述问题,并提出了对应的解决方案:非阻塞删除(参考 Lazy Redis is better Redis)。简而言之,「非阻塞删除」就是将删除操作放到另外一个线程(而非 Redis 主线程)去处理。

最终「非阻塞删除」在 Redis 4.0 中得以实现(参考 Redis 4.0 release notes),从此 Redis 开启了 “多线程” 时代。

新增实现的「非阻塞删除」包括以下命令:

命令 (原来的)阻塞版本
UNLINK DEL
FLUSHALL ASYNC FLUSHALL
FLUSHDB ASYNC FLUSHDB

三、DEL vs UNLINK

1. 源码实现

参考 Redis 源码 可以发现,DEL 和 UNLINK 分别对应不同的处理函数:

命令 处理函数
DEL dbSyncDelete
UNLINK dbAsyncDelete

具体的实现细节请自行研读源码。

2. 耗时对比

下面我们来实际对比一下 DEL 和 UNLINK 的耗时差异。

开启 Slow log

设置 Slow log 记录每条命令的耗时(参考 SLOWLOG):

1
2
3
4
5
127.0.0.1:6379> CONFIG SET slowlog-log-slower-than 0
OK
127.0.0.1:6379> CONFIG GET slowlog-log-slower-than
1) "slowlog-log-slower-than"
2) "0"

创建两个大 hash

准备一个 Lua 脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
local bulk = 1000
local fvs = {}
local j
for i = 1, ARGV[1] do
j = i % bulk
if j == 0 then
fvs[2 * bulk - 1] = "field" .. i
fvs[2 * bulk] = "value" .. i
redis.call("HMSET", KEYS[1], unpack(fvs))
fvs = {}
else
fvs[2 * j - 1] = "field" .. i
fvs[2 * j] = "value" .. i
end
end
if #fvs > 0 then
redis.call("HMSET", KEYS[1], unpack(fvs))
end
return "OK"

将上述脚本保存为 huge_hmset.lua,然后借助该脚本创建两个大 hash(参考 how to load lua script from file for redis),分别为 hash1 和 hash2,它们各自拥有 100 万个 field:

1
2
3
4
$ redis-cli --eval huge_hmset.lua hash1 , 1000000
"OK"
$ redis-cli --eval huge_hmset.lua hash2 , 1000000
"OK"

上述操作会在 Slow log 中产生大量 HMSET 命令,这里先清除掉:

1
2
127.0.0.1:6379> SLOWLOG RESET
OK

DEL hash1

1
2
3
127.0.0.1:6379> DEL hash1
(integer) 1
(0.63s)

UNLINK hash2

1
2
127.0.0.1:6379> UNLINK hash2
(integer) 1

查看 Slow log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
127.0.0.1:6379> SLOWLOG GET 2
1) 1) (integer) 5089
2) (integer) 1534653951
3) (integer) 17
4) 1) "UNLINK"
2) "hash2"
5) "127.0.0.1:56560"
6) ""
2) 1) (integer) 5088
2) (integer) 1534653948
3) (integer) 630305
4) 1) "DEL"
2) "hash1"
5) "127.0.0.1:56560"
6) ""

耗时对比结果:

命令 耗时
DEL hash1 630305 us
UNLINK hash2 17 us

值得注意的是:UNLINK 执行如此之快,并非使用了什么快速算法,而是因为它将真正的删除操作异步化了。

四、相关阅读

  • Never Stop Serving: Making Redis Concurrent With Modules

Redis replication 揭秘

发表于 2018-07-03 | 分类于 技术 |

一、要解决的问题

按照 Redis 官方文档 - Replication 的说法:Redis replication 是一种 master-slave 模式的复制机制,这种机制使得 slave 节点可以成为与 master 节点完全相同的副本。

我们知道,单个 Redis 节点也是可以直接工作的。那为什么一个 Redis 节点(master)还需要一个或多个副本(slave)呢?或者说 replication 到底想要解决什么问题?官方文档如是说:

Replication can be used both for scalability, in order to have multiple slaves for read-only queries (for example, slow O(N) operations can be offloaded to slaves), or simply for improving data safety and high availability.

简而言之,replication 主要用于解决两个问题:

1. 读扩展

一个 master 用于写,多个 slave 用于分摊读的压力。

redis-replication-scalability

2. 高可用

如果 master 挂掉了,可以提升(promote)一个 slave 为新的 master,进而实现故障转移(failover)。

redis-replication-high-availability

思考:如果没有 replication,上述两个问题该如何应对?

二、replication 初体验

开两个终端,分别启动一个 Redis 节点:

1
2
3
4
# Terminal 1
$ redis-4.0.8/src/redis-server -p 6379
# Terminal 2
$ redis-4.0.8/src/redis-server -p 6380

在 6379 节点上设置并获取 key1:

1
2
3
4
5
$ redis-4.0.8/src/redis-cli -p 6379
127.0.0.1:6379> SET key1 value1
OK
127.0.0.1:6379> GET key1
"value1"

在 6380 节点上尝试获取 key1:

1
2
3
$ redis-4.0.8/src/redis-cli -p 6380
127.0.0.1:6380> GET key1
(nil)

可以看出,两个 Redis 节点各自为政,二者的数据并没有同步。

下面我们让 6380 成为 6379 的 slave 节点:

1
2
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379
OK

然后再尝试获取 key1:

1
2
127.0.0.1:6380> GET key1
"value1"

很显然,最初在 6379 节点(后续称为 master)设置的 key1 已经被同步到了 6380 节点(后续称为 slave)。

实验:尝试在 master 设置更多的 key 或删除 key,然后在 slave 上获取并观察结果。

三、情景分析

1. slave 初次连接 master

上述过程中,在 slave 上执行 SLAVEOF 命令以后,可以看到 slave 的日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
31667:S 03 Jul 21:32:17.809 * Before turning into a slave, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.
31667:S 03 Jul 21:32:17.809 * SLAVE OF 127.0.0.1:6379 enabled (user request from 'id=2 addr=127.0.0.1:58544 fd=8 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=slaveof')
31667:S 03 Jul 21:32:17.825 * Connecting to MASTER 127.0.0.1:6379
31667:S 03 Jul 21:32:17.826 * MASTER <-> SLAVE sync started
31667:S 03 Jul 21:32:17.826 * Non blocking connect for SYNC fired the event.
31667:S 03 Jul 21:32:17.826 * Master replied to PING, replication can continue...
31667:S 03 Jul 21:32:17.826 * Trying a partial resynchronization (request 823e1002c282b4c088a6f80d4251de04f920068d:1).
31667:S 03 Jul 21:32:17.827 * Full resync from master: 599456031709498747f866bc3f7f4382db99ed89:0
31667:S 03 Jul 21:32:17.827 * Discarding previously cached master state.
31667:S 03 Jul 21:32:17.926 * MASTER <-> SLAVE sync: receiving 193 bytes from master
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Flushing old data
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Loading DB in memory
31667:S 03 Jul 21:32:17.927 * MASTER <-> SLAVE sync: Finished with success

对应 master 的日志如下:

1
2
3
4
5
6
7
31655:M 03 Jul 21:32:17.826 * Slave 127.0.0.1:6380 asks for synchronization
31655:M 03 Jul 21:32:17.826 * Partial resynchronization not accepted: Replication ID mismatch (Slave asked for '823e1002c282b4c088a6f80d4251de04f920068d', my replication IDs are '4014bea143e2ade5aa81012849b0775ab0377b85' and '0000000000000000000000000000000000000000')
31655:M 03 Jul 21:32:17.826 * Starting BGSAVE for SYNC with target: disk
31655:M 03 Jul 21:32:17.826 * Background saving started by pid 31669
31669:C 03 Jul 21:32:17.827 * DB saved on disk
31655:M 03 Jul 21:32:17.926 * Background saving terminated with success
31655:M 03 Jul 21:32:17.926 * Synchronization with slave 127.0.0.1:6380 succeeded

分析上述输出日志,我们可以初步总结出 slave 和 master 的交互时序:

  1. slave 主动连接 master。
  2. 连接成功后,slave 会向 master 发起 partial resynchronization 的请求。
  3. master 收到请求后,判断 replication ID 不匹配,拒绝执行 partial resynchronization,转而通知 slave 执行 full resync。
  4. 随后 master 开始执行 BGSAVE 命令,将当前 DB 数据保存到 disk 磁盘,最后向 slave 发送 DB 数据。
  5. slave 从 master 接收到 DB 数据后,将其加载到内存,同时删除旧数据。

2. slave 断开后重连 master

思考:在同一台机器上,如何模拟 master 和 slave 的网络断开与恢复?

master 日志:

1
2
3
4
33518:M 03 Jul 22:46:48.432 # Disconnecting timedout slave: 127.0.0.1:6380
33518:M 03 Jul 22:46:48.432 # Connection with slave 127.0.0.1:6380 lost.
33518:M 03 Jul 22:46:50.538 * Slave 127.0.0.1:6380 asks for synchronization
33518:M 03 Jul 22:46:50.538 * Partial resynchronization request from 127.0.0.1:6380 accepted. Sending 0 bytes of backlog starting from offset 1541.

slave 日志:

1
2
3
4
5
6
7
8
9
33519:S 03 Jul 22:46:48.432 # Connection with master lost.
33519:S 03 Jul 22:46:48.432 * Caching the disconnected master state.
33519:S 03 Jul 22:46:50.536 * Connecting to MASTER 127.0.0.1:6379
33519:S 03 Jul 22:46:50.537 * MASTER <-> SLAVE sync started
33519:S 03 Jul 22:46:50.537 * Non blocking connect for SYNC fired the event.
33519:S 03 Jul 22:46:50.537 * Master replied to PING, replication can continue...
33519:S 03 Jul 22:46:50.537 * Trying a partial resynchronization (request 6b1b77bebea22557686922f99cfa3103ba0824ae:1541).
33519:S 03 Jul 22:46:50.538 * Successful partial resynchronization with master.
33519:S 03 Jul 22:46:50.538 * MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.

可以看出:

  1. 网络断开一段时间后,master 会断开与 slave 的连接。
  2. 网络恢复后,仍然是 slave 主动连接 master。
  3. 连接成功后,slave 会向 master 发起 partial resynchronization 的请求。
  4. 这一次,master 接受了该 partial resynchronization 请求,然后将 backlog 中由 (offset, size) 标记的数据流发送给 slave。
  5. slave 从 master 接收到数据流后,更新自己内存中的数据。

实验:redis.conf 中有两个参数 repl-timeout(默认值为 60 秒)和 repl-backlog-ttl(默认值为 3600 秒),尝试都设置为 10 秒,然后断开网络一直等到 25 秒后再恢复,再观察 master 和 slave 的日志会有什么不同?

3. master 与 slave 连接正常,写 master

通过 telnet 连接到 master:

1
2
3
4
$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

键入 PSYNC 命令,尝试与 master 进行同步:

1
2
3
4
5
6
7
8
9
$ telnet 127.0.0.1 6379
...
PSYNC ? -1
+FULLRESYNC 8cdd5be435af5bcda9bb332e319cae9b71f788d7 344
$194
REDIS0008? redis-ver4.0.8?
redis-bits?@?ctime?6?@[used-mem???repl-stream-db??repl-id(8cdd5be435af5bcda9bb332e319cae9b71f788d7?
repl-offset?X?
aof-preamble???key1value1?'>?w?Z

此时查看 master 的日志:

1
2
3
4
5
6
7
40535:M 07 Jul 17:04:51.009 * Slave 127.0.0.1:<unknown-slave-port> asks for synchronization
40535:M 07 Jul 17:04:51.009 * Full resync requested by slave 127.0.0.1:<unknown-slave-port>
40535:M 07 Jul 17:04:51.009 * Starting BGSAVE for SYNC with target: disk
40535:M 07 Jul 17:04:51.009 * Background saving started by pid 40579
40579:C 07 Jul 17:04:51.012 * DB saved on disk
40535:M 07 Jul 17:04:51.045 * Background saving terminated with success
40535:M 07 Jul 17:04:51.045 * Synchronization with slave 127.0.0.1:<unknown-slave-port> succeeded

随后在 master 上设置 key2:

1
2
127.0.0.1:6379> SET key2 value2
OK

然后观察 telnet 的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ telnet 127.0.0.1 6379
...
*1
$4
PING
*2
$6
SELECT
$1
0
*3
$3
SET
$4
key2
$6
value2
*1
$4
PING

可以看出:

  1. telnet 通过 PSYNC 命令,成为了 master 的一个新的 slave。
  2. master 上的写命令(这里是 SET key2 value2),会被传播(propagate)到 salve 上,进而保证了 slave 与 master 的数据一致性。

四、replication 原理

上面的三种情景,其实已经涵盖了 Redis replication 的两大核心操作:

  1. 重同步(resync)
    • 完整重同步(full resynchronization)
    • 部分重同步(partial resynchronization)
  2. 命令传播(command propagate)

下面我们对这两种操作,做进一步阐述。

1. 重同步

「重同步」用于将 slave 的数据库状态更新至 master 当前所处的数据库状态。

SYNC 与 PSYNC

旧版本 Redis 中,「重同步」通过 SYNC 命令来实现。从 2.8 版本开始,Redis 改用 PSYNC 命令来代替 SYNC 命令。

SYNC 命令和 PSYNC 命令的区别:

命令 初次复制 断线后复制
SYNC 完整重同步 完整重同步
PSYNC 完整重同步:PSYNC ? -1 部分重同步:PSYNC <replication-id> <offset>

完整重同步

redis-replication-full-resync

说明:

  1. slave 通过 SYNC 或 PSYNC 命令,向 master 发起同步请求。
  2. master 返回 FULLRESYNC 告知 slave 将执行「完整重同步」,先决条件为:
    • 请求命令是「完整重同步」SYNC。
    • 请求命令是「完整重同步」PSYNC ? -1。
    • 请求命令是「部分重同步」PSYNC <replication-id> <offset>,但是 <replication-id> 不是 master 的 replication-id,或者 slave 给的 <offset> 不在 master 的「复制积压缓冲区」backlog 里面。
  3. master 执行 BGSAVE 命令,将当前数据库状态保存为 RDB 文件。
  4. 生成 RDB 文件完毕后,master 将该文件发送给 slave。
  5. slave 收到 RDB 文件后,将其加载至内存。
  6. master 将 backlog 中缓冲的命令发送给 slave(一开始在 BGSAVE 时记录了当时的 offset)。
  7. slave 收到后,逐个执行这些命令。

部分重同步

redis-replication-partial-resync

说明:

  1. slave 通过 PSYNC <replication-id> <offset> 命令,向 master 发起「部分重同步」请求。
  2. master 返回 CONTINUE 告知 slave 同意执行「部分重同步」,先决条件为:
    • <replication-id> 是 master 的 replication-id,并且 slave 给的 <offset> 在 master 的「复制积压缓冲区」backlog 里面
  3. master 将 backlog 中缓冲的命令发送给 slave(根据 slave 给的 offset)。
  4. slave 收到后,逐个执行这些命令。

由上可以看出,「复制积压缓冲区」backlog 是「部分重同步」得以实现的关键所在。

复制积压缓冲区

「复制积压缓冲区」是 master 维护的一个固定长度(fixed-sized)的先进先出(FIFO)的内存队列。值得注意的是:

  • 队列的大小由配置 repl-backlog-size 决定,默认为 1MB。当队列长度超过 repl-backlog-size 时,最先入队的元素会被弹出,用于腾出空间给新入队的元素。
  • 队列的生存时间由配置 repl-backlog-ttl 决定,默认为 3600 秒。如果 master 不再有与之相连接的 slave,并且该状态持续时间超过了 repl-backlog-ttl,master 就会释放该队列,等到有需要(下次又有 slave 连接进来)的时候再创建。

master 会将最近接收到的写命令(按 Redis 协议的格式)保存到「复制积压缓冲区」,其中每个字节都会对应记录一个偏移量 offset。

. . . . . . . . . . . . . .
偏移量 … 10087 10088 10089 10090 10091 10092 10093 10094 10095 10096 10097 …
字节值 … ‘*’ 3 ‘\r’ ‘\n’ ‘$’ 3 ‘\r’ ‘\n’ ‘S’ ‘E’ ‘T’ …

与此同时,slave 会维护一个 offset 值,每次从 master 传播过来的命令,一旦成功执行就会更新该 offset。尝试「部分重同步」的时候,slave 都会带上自己的 offset,master 再判断 offset 偏移量之后的数据是否存在于自己的「复制积压缓冲区」中,以此来决定执行「部分重同步」还是「完整重同步」。

2. 命令传播

「命令传播」用于在 master 的数据库状态被修改时,将导致变更的命令传播给 slave,从而让 slave 的数据库状态与 master 保持一致。

redis-replication-command-propagate

说明:master 进行命令传播时,除了将写命令直接发送给所有 slave,还会将这些命令写入「复制积压缓冲区」,用于后续可能发生的「部分重同步」操作。

五、参考资料

  • Redis 官方文档 - Replication
  • Redis 设计与实现

基于 Swagger 描述语言为 REST API 生成 Markdown 文档

发表于 2017-10-31 | 分类于 技术 |

对于 REST API 的开发者而言,不管是对内作为团队的开发文档,还是对外作为给用户的说明文档,API 文档都是不可或缺的。

然而 “文档是死的、代码是活的”,在现实中,文档跟不上代码的更新节奏的情况比比皆是。如何编写 实时更新的、易于阅读的 文档成了一个普遍的难题。由此,API 描述语言应用而生。

Swagger 是一个简单但功能强大的 API 表达工具。它具有地球上最大的 API 工具生态系统。数以千计的开发人员,使用几乎所有的现代编程语言,都在支持和使用 Swagger。使用 Swagger 生成 API,我们可以得到交互式文档,自动生成代码的 SDK 以及 API 的发现特性等(参考 使用Swagger生成RESTful API文档)。

Swagger 的功能很丰富,但在这里我们只关心一点:如何基于简单的 Swagger 描述语言,为 REST API 生成易读的 Markdown 离线文档。

一、基于 Swagger Spec 编写 API 描述文档

这一步无需多说,打开你喜欢的编辑器,或者使用官方的 Swagger Editor,参考 Spec 语法 编写即可。

这里我们以 petstore-minimal.yaml 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
---
swagger: "2.0"
info:
version: "1.0.0"
title: "Swagger Petstore"
description: "A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification"
termsOfService: "http://swagger.io/terms/"
contact:
name: "Swagger API Team"
license:
name: "MIT"
host: "petstore.swagger.io"
basePath: "/api"
schemes:
- "http"
consumes:
- "application/json"
produces:
- "application/json"
paths:
/pets:
get:
description: "Returns all pets from the system that the user has access to"
produces:
- "application/json"
responses:
"200":
description: "A list of pets."
schema:
type: "array"
items:
$ref: "#/definitions/Pet"
definitions:
Pet:
type: "object"
required:
- "id"
- "name"
properties:
id:
type: "integer"
format: "int64"
name:
type: "string"
tag:
type: "string"

二、安装转换工具 Swagger2Markup

Swagger2Markup 是一个 Java 编写的工具,用于将 Swagger 文档转换为 AsciiDoc 或者 Markdown 文档。简直就是为我们这里的需求量身定做的 :-)

安装 Swagger2Markup 的步骤如下:

1. 安装 Java

以 Ubuntu 为例,参考 How To Install Java on Ubuntu with Apt-Get 和 Ubuntu 安装 JDK 7 / JDK8 的两种方式:

  1. 安装默认的 JRE/JDK

    1
    2
    3
    4
    5
    $ sudo apt-get update
    $ # 安装默认的 JRE
    $ sudo apt-get install default-jre
    $ # 安装默认的 JDK
    $ sudo apt-get install default-jdk
  2. 安装 Oracle JDK 8

    1
    2
    3
    4
    5
    $ # 添加 ppa
    $ sudo add-apt-repository ppa:webupd8team/java
    $ sudo apt-get update
    $ # 安装 oracle-java-installer(按提示依次选择 ok 和 yes 即可)
    $ sudo apt-get install oracle-java8-installer

2. 下载 Swagger2Markup 的命令行工具

参考 Command Line Interface,下载最新的 jar 包(当前为 swagger2markup-cli-1.3.1.jar)即可。

三、使用 Swagger2Markup 将 Swagger 转换为 Markdown

参考 Command Line Interface 中的步骤:

1. 创建一个 config.properties 配置文件

设置 markupLanguage 为 MARKDOWN

1
swagger2markup.markupLanguage=MARKDOWN

2. 将 Swagger 转换为 Markdown

1
$ java -jar swagger2markup-cli-1.3.1.jar convert -i /path/to/petstore-minimal.yaml -f /tmp/petstore-minimal -c /path/to/config.properties

3. 查看生成的文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Swagger Petstore


<a name="overview"></a>
## Overview
A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification


### Version information
*Version* : 1.0.0


### Contact information
*Contact* : Swagger API Team


### License information
*License* : MIT
*Terms of service* : http://swagger.io/terms/


### URI scheme
*Host* : petstore.swagger.io
*BasePath* : /api
*Schemes* : HTTP


### Consumes

* `application/json`


### Produces

* `application/json`




<a name="paths"></a>
## Paths

<a name="pets-get"></a>
### GET /pets

#### Description
Returns all pets from the system that the user has access to


#### Responses

|HTTP Code|Description|Schema|
|---|---|---|
|**200**|A list of pets.|< [Pet](#pet) > array|


#### Produces

* `application/json`




<a name="definitions"></a>
## Definitions

<a name="pet"></a>
### Pet

|Name|Schema|
|---|---|
|**id** <br>*required*|integer (int64)|
|**name** <br>*required*|string|
|**tag** <br>*optional*|string|

四、CLI as a service

如果团队内部人员都会用到这个工具,但是又不想在每个人的电脑上都安装 Java 和 Swagger2Markup,这时可以基于命令行工具 Swagger2Markup 提供一个 “文档转换服务”。

作为示例,以下是使用 Python 语言并且借助 RESTArt 库实现的一个 “文档转换服务”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# swagger2markdown.py

import os
import tempfile

from restart import status
from restart.api import RESTArt
from restart.parsers import Parser
from restart.renderers import Renderer
from restart.resource import Resource

api = RESTArt()


class SwaggerParser(Parser):

content_type = 'text/plain'

def parse(self, stream, content_type, content_length, context=None):
return stream.read().decode('utf-8')


class MarkdownRenderer(Renderer):

content_type = 'text/plain'
format_suffix = 'md'

def render(self, data, context=None):
return data.encode('utf-8')


@api.register
class SwaggerMarkdownDocs(Resource):

name = 'swagger_markdown_docs'

parser_classes = (SwaggerParser,)
renderer_classes = (MarkdownRenderer,)

def create(self, request):
with tempfile.NamedTemporaryFile(suffix='.yml', delete=False) as yml:
yml_filename = yml.name
yml.write(request.data.encode('utf-8'))

with tempfile.NamedTemporaryFile(suffix='.md', delete=False) as md:
md_filename = md.name

jar = '/path/to/swagger2markup-cli-1.3.1.jar'
conf = '/path/to/config.properties'
os.system('java -jar {jar} convert -i {yml} -f {md} -c {conf}'.format(
jar=jar, yml=yml_filename, md=md_filename[:-len('.md')], conf=conf,
))

with open(md_filename) as md:
content = md.read().decode('utf-8')

os.unlink(yml_filename)
os.unlink(md_filename)

return content, status.HTTP_201_CREATED

启动 “文档转换服务”:

1
2
$ restart swagger2markdown:api
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

使用 “文档转换服务” 生成 Markdown 文档:

1
$ curl -H 'Content-Type: text/plain' -XPOST http://localhost:5000/swagger_markdown_docs --data-binary @/path/to/petstore-minimal.yaml > /tmp/petstore-minimal.md

redis-py 连接池不能处理空闲的 CLOSE_WAIT 连接

发表于 2017-10-26 | 分类于 技术 |

距离上次排查 epoll 与 CLOSE_WAIT 连接 的问题,已经过去了将近一年。最近在看 《UNIX 网络编程》,看到 “TCP 状态转换图” 中提到 CLOSE_WAIT 状态时,突然又想起来上次还有一个 遗留问题,于是决定再次尝试分析一下。

一、问题现象

上次的遗留问题,归纳起来就是:(由于 Redis 的 server 端主动关闭超时连接)在 client 端产生的 CLOSE_WAIT 连接,一直无法被 redis-py 连接池复用,进而无法被正常 close。

二、分析 redis-py 连接池机制

以当前最新的 redis-py 2.10.6 为例,从连接池获取连接 的源码:

1
2
3
4
5
6
7
8
9
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
return connection

释放连接到连接池 的源码:

1
2
3
4
5
6
7
def release(self, connection):
"Releases the connection back to the pool"
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection)
self._available_connections.append(connection)

可以看出,redis-py 使用 _available_connections 来维护 “空闲可用的连接列表”,获取连接时 pop 出列表末尾的连接,释放连接时 append 连接到列表末尾。因此 “空闲可用的连接列表” 其实是个 后进先出的栈。

很显然,基于这种 “后进先出的栈” 的数据结构,redis-py 连接池对连接的获取和释放都发生在 “栈顶”。至此,原因就很明显了:如果某段时间内由于突发流量产生了大量连接,一旦流量趋于平稳(减少)后,位于 “栈底” 的部分连接就会一直无法被复用,于是这些连接被 Redis 的 server 端超时关闭后,就会一直处于 CLOSE_WAIT 状态。

关于这个问题,其实在 GitHub 上已经有一个类似的 issue:ConnectionPool doesn’t reap timeout’ed connections,不过一直还未得到处理 :-(

三、解决方案

为了让 redis-py 连接池能够更均衡地复用各个连接,很容易想到的一个方案是:将数据结构从 “后进先出的栈” 改成 “先进先出的队列”。

通过修改 get_connection 的实现可以很容易做到这一点:

1
2
# connection = self._available_connections.pop()
connection = self._available_connections.pop(0) # 获取连接时,从队列首部 pop 出来

关于这个方案,其实在 GitHub 上也有一个 pull request:Connection management improvements,然而还是没有得到响应 :-( 不得不手动尴尬一下…

四、复现和验证

为了简化场景,便于问题的复现和方案的验证,这里有一段辅助代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# example.py

import select

import redis


def main():
import os; print('pid: %s' % os.getpid())

r = redis.StrictRedis(host='localhost', port=6379, db=0)
pool = r.connection_pool
epoll = select.epoll()

for conn in (pool.get_connection(''), pool.get_connection('')):
conn.connect()
epoll.register(conn._sock, select.POLLIN)
pool.release(conn)

command_args = ('SET', 'foo', 'bar')
while True:
conn = pool.get_connection('')
conn.send_command(*command_args)
epoll.poll()
r.parse_response(conn, command_args[0])
pool.release(conn)


if __name__ == '__main__':
main()

操作步骤提示:

  1. 设置 Redis 的 server 端的 timeout 参数(比如 10 秒)
  2. 运行代码(python example.py)
  3. 一段时间后,观察进程的 CPU 占用率(top)
  4. 观察进程是否有 CLOSE_WAIT 连接(lsof -p PID)
12
RussellLuo

RussellLuo

17 日志
2 分类
16 标签
GitHub 豆瓣
© 2023 RussellLuo
由 Hexo 强力驱动
主题 - NexT.Mist
访问人数 总访问量 次