一. 前言
本文章的主要内容是对etcd-release-3.4中client package的一个源码阅读分析记录。
二. 概括简述
对于client包,是通过http方式来与etcd集群进行交互的一个sdk包,该包的内容主要分为:
1.用于底层发送请求给集群的http client类;
2.用于上层调用的各种功能封装类(如key类,其包含查询、插入和删除等用于键值对操作的功能,如member类,其包含查询、增加和删除等用于对集群成员操作的功能)。
三. 具体分析
1. http client相关类
Client interface:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31//client.go
type Client interface {
//用于发送请求同步当前集群可用的节点
Sync(context.Context) error
//设置定时自动发送节点同步请求
AutoSync(context.Context, time.Duration) error
//返回当前所持有的远程集群节点url
Endpoints() []string
//设置远程集群节点
SetEndpoints(eps []string) error
//获取当前etcd服务版本以及集群版本
GetVersion(ctx context.Context) (*version.Versions, error)
httpClient
}
type httpClient interface {
//用于发送http请求的调用
Do(context.Context, httpAction) (*http.Response, []byte, error)
}
type httpAction interface {
//用于构建相应功能的http请求
HTTPRequest(url.URL) *http.Request
}
etcd 用于发送http请求的client接口主要有以上几个相关的借口,而etcd官方也根据给出的client接口实现了一个开箱即用的http client类:
1 | //client.go |
该httpClusterClient对包外并不可见,只能通过包的New函数传入自定义的Config来进行构建返回,Config结构如下所示:
1 | //client.go |
对于httpClusterClient的clientFactory成员,使用内部的newHTTPClientFactory函数来进行赋值,其入参正是Config中所给出的三个配置;其返回值是一个函数类型,该函数类型的入参为想要向其发送请求的集群节点,返回一个httpClient的接口类(前面Client interface的部分有列出),内部的httpClient接口实现类为redirectFollowingHTTPClient,而其内部还有一个httpClient接口类型成员,而该成员又使用具体的httpClient接口实现类simpleHTTPClient来进行赋值,由此可见redirectFollowingHTTPClient和simpleHTTPClient都实现了httpClient这个接口,从整个设计模式看来,其属于一个装饰器模式,redirectFollowingHTTPClient的Do方法在simpleHTTPClient的Do方法上装饰了一个重定向请求处理的过程,而simpleHTTPClient持有transport来完成最底层的http请求发送。对于clientFactory,其使用之处在于当准备发送一个功能http请求的时候,通过给出想要通信的节点作为入参来实时生成用于发送请求的redirectFollowingHTTPClient,每一个请求发送时的redirectFollowingHTTPClient都是独立的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25//client.go
func newHTTPClientFactory(tr CancelableTransport, cr CheckRedirectFunc, headerTimeout time.Duration) httpClientFactory {
return func(ep url.URL) httpClient {
return &redirectFollowingHTTPClient{
checkRedirect: cr,
client: &simpleHTTPClient{
transport: tr,
endpoint: ep,
headerTimeout: headerTimeout,
},
}
}
}
type redirectFollowingHTTPClient struct {
client httpClient
checkRedirect CheckRedirectFunc
}
type simpleHTTPClient struct {
transport CancelableTransport
endpoint url.URL
headerTimeout time.Duration
}
接下来来分析一下底层的http请求发送流程,由于redirectFollowingHTTPClient和simpleHTTPClient在设计模式上属于装饰器模式,因此我们先从最底层的simpleHTTPClient的Do方法开始分析。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104//client.go
func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
//构建http请求
req := act.HTTPRequest(c.endpoint)
//此处打印要发送的功能请求对应的curl操作命令(用于查看此时代码实现的功能对应curl是哪些命令)
if err := printcURL(req); err != nil {
return nil, nil, err
}
//用于分析请求是否设置了wait参数(watch请求的时候如果没有对应的事件发生则需要进行wait)
isWait := false
if req != nil && req.URL != nil {
ws := req.URL.Query().Get("wait")
if len(ws) != 0 {
var err error
isWait, err = strconv.ParseBool(ws)
if err != nil {
return nil, nil, fmt.Errorf("wrong wait value %s (%v for %+v)", ws, err, req)
}
}
}
/*
如果没有设置wait参数且HeaderTimeoutPerRequest设置大于0,则设置timeout context,用于响应超时的时候取消请求;
否则只设置普通的cancel context
*/
var hctx context.Context
var hcancel context.CancelFunc
if !isWait && c.headerTimeout > 0 {
hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
} else {
hctx, hcancel = context.WithCancel(ctx)
}
defer hcancel()
/*
设置http request的超时chan,该函数返回一个匿名函数,匿名函数内部为关闭chan的操作;
若上层控制需要取消请求时,调用该匿名函数关闭chan,底层即会把请求取消。
*/
reqcancel := requestCanceler(c.transport, req)
//创建goroutine发送http请求,得到的响应通过rtchan来进行同步结果通知
rtchan := make(chan roundTripResponse, 1)
go func() {
resp, err := c.transport.RoundTrip(req)
rtchan <- roundTripResponse{resp: resp, err: err}
close(rtchan)
}()
var resp *http.Response
var err error
//等待请求响应结果通知或者超时(如果没有设置wait参数且HeaderTimeoutPerRequest大于0)
select {
case rtresp := <-rtchan:
resp, err = rtresp.resp, rtresp.err
case <-hctx.Done():
// cancel and wait for request to actually exit before continuing
reqcancel()
rtresp := <-rtchan
resp = rtresp.resp
switch {
case ctx.Err() != nil:
err = ctx.Err()
case hctx.Err() != nil:
err = fmt.Errorf("client: endpoint %s exceeded header timeout", c.endpoint.String())
default:
panic("failed to get error from context")
}
}
//函数结束后释放response的body资源
defer func() {
if resp != nil {
resp.Body.Close()
}
}()
if err != nil {
return nil, nil, err
}
//创建goroutine去读去响应的body数据,并通过chan来通知读去的结果
var body []byte
done := make(chan struct{})
go func() {
body, err = ioutil.ReadAll(resp.Body)
done <- struct{}{}
}()
//等待响应数据读取结果或外部传入的context的取消操作
select {
case <-ctx.Done():
resp.Body.Close()
<-done
return nil, nil, ctx.Err()
case <-done:
}
return resp, body, err
}
redirectFollowingHTTPClient在simpleHTTPClient的基础上增加了重定向请求处理的相关操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53//client.go
func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
next := act
//此处设置循环用于处理有重定向要求时重新构建重定向请求
for i := 0; i < 100; i++ {
if i > 0 {
/*
调用重定向检查函数检查是否符合继续重定向的条件;
默认的重定向检查函数不允许重定向次数超过10次
*/
if err := r.checkRedirect(i); err != nil {
return nil, nil, err
}
}
//发送请求
resp, body, err := r.client.Do(ctx, next)
if err != nil {
return nil, nil, err
}
//如果响应的状态码为3XX表示需要重定向
if resp.StatusCode/100 == 3 {
//解析重定向的地址
hdr := resp.Header.Get("Location")
if hdr == "" {
return nil, nil, fmt.Errorf("location header not set")
}
loc, err := url.Parse(hdr)
if err != nil {
return nil, nil, fmt.Errorf("location header not valid URL: %s", hdr)
}
//重定向action装饰类,装饰上一次的action
//此处时关于httpAction接口类的一个装饰器模式,后面会进行分析
next = &redirectedHTTPAction{
action: act,
location: *loc,
}
continue
}
//获取到最终结果返回
return resp, body, nil
}
//继续重定向的条件不满足后返回重定向检查错误
return nil, nil, errTooManyRedirectChecks
}
了解完底层的http请求发送后,再继续分析最上层的请求发送调用,即httpClusterClient的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117//client.go
func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
action := act
//拷贝当前所知的所有集群节点
c.RLock()
leps := len(c.endpoints)
eps := make([]url.URL, leps)
n := copy(eps, c.endpoints)
//选取当前的固定通信节点
pinned := c.pinned
/*
如果有auth凭证信息则将原http action外装饰一层auth,
为最终的请求生成附上auth信息
*/
if c.credentials != nil {
action = &authedAction{
act: act,
credentials: *c.credentials,
}
}
c.RUnlock()
if leps == 0 {
return nil, nil, ErrNoEndpoints
}
if leps != n {
return nil, nil, errors.New("unable to pick endpoint: copy failed")
}
var resp *http.Response
var body []byte
var err error
cerr := &ClusterError{}
//获取当前请求是否是一次性请求
isOneShot := ctx.Value(&oneShotCtxValue) != nil
/*
此处for循环从固定的通信节点开始,
如果当前固定节点可用则使用其进行通信,
否则则轮询其他节点查找可用的节点并将其当作固定节点
*/
for i := pinned; i < leps+pinned; i++ {
k := i % leps
//通过clientFactory获取当前发送请求所用的redirectFollowingHTTPClient
hc := c.clientFactory(eps[k])
//发送请求
resp, body, err = hc.Do(ctx, action)
//请求错误处理
if err != nil {
cerr.Errors = append(cerr.Errors, err)
//此处错误为上下文错误,即可能是上层的调用终止了当前请求或者设置了超时
if err == ctx.Err() {
return nil, nil, ctx.Err()
}
if err == context.Canceled || err == context.DeadlineExceeded {
return nil, nil, err
}
} else if resp.StatusCode/100 == 5 {
//此处为服务器错误,基本为节点不可用
switch resp.StatusCode {
case http.StatusInternalServerError, http.StatusServiceUnavailable:
// TODO: make sure this is a no leader response
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", eps[k].String()))
default:
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
}
err = cerr.Errors[0]
}
if err != nil {
/*
如果当前请求发送失败
若该请求不是一次性请求,则更换通信节点尝试发送
*/
if !isOneShot {
continue
}
/*
如果当前请求发送失败
若该请求是一次性请求,当前节点已通信失败则其已不能再作为固定节点
更新固定节点,将下一个节点作为固定节点
然后不再继续发送请求直接返回
*/
c.Lock()
c.pinned = (k + 1) % leps
c.Unlock()
return nil, nil, err
}
/*
如果当前请求发送成功
若该请求并不是通过最初的固定通信节点发送请求成功的,则进行固定可用节点更新
将当前通信成功的节点记录为固定节点
*/
if k != pinned {
c.Lock()
c.pinned = k
c.Unlock()
}
return resp, body, nil
}
//此处所有集群节点都尝试失败,返回所有记录的错误
return nil, nil, cerr
}
分析完httpClusterClient的http请求发送相关的方法后,接着来分析一下它的节点同步相关的方法,即Sync方法和AutoSync方法,而AutoSync方法是在定时器的基础上来调用Sync方法,所以主要分析Sync方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79//client.go
func (c *httpClusterClient) Sync(ctx context.Context) error {
//创建member功能的功能类(后续会有api功能类的代码分析)
mAPI := NewMembersAPI(c)
//调用member功能类的List方法列出当前可用节点,返回的是Member类列表
ms, err := mAPI.List(ctx)
if err != nil {
return err
}
//记录节点的url string
var eps []string
for _, m := range ms {
eps = append(eps, m.ClientURLs...)
}
//解析成URL列表
neps, err := c.parseEndpoints(eps)
if err != nil {
return err
}
//默认固定节点为第一个节点
npin := 0
//根据配置的节点选择模式来对节点列表与固定节点做调整
switch c.selectionMode {
//随机节点选择模式
case EndpointSelectionRandom:
c.RLock()
//当前已记录的可用节点和请求查询的当前可用节点进行比较
eq := endpointsEqual(c.endpoints, neps)
c.RUnlock()
//如果完全一样则直接返回不做任何后续处理
if eq {
return nil
}
//如果不同,则将获取到的当前可用节点列表进行随机打乱
neps = shuffleEndpoints(c.rand, neps)
//leader节点优先模式
case EndpointSelectionPrioritizeLeader:
/*
发送请求查询当前集群的leader节点,
该函数里同样创建了member功能类,调用Leader方法发送请求获取leader信息
*/
nle, err := c.getLeaderEndpoint(ctx, neps)
//leader查询错误则直接返回错误
if err != nil {
return ErrNoLeaderEndpoint
}
//在获取的可用节点列表中对比查询leader节点的索引号,将其作为固定节点
for i, n := range neps {
if n.String() == nle {
npin = i
break
}
}
default:
return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode)
}
//将记录节点列表和固定节点更新成获取后所处理过的
c.Lock()
defer c.Unlock()
c.endpoints = neps
c.pinned = npin
return nil
}
1. 功能api类
key api类
keys api接口定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25//keys.go
type KeysAPI interface {
//获取相应键值对
Get(ctx context.Context, key string, opts *GetOptions) (*Response, error)
//设置一个键值对,可以配置set选项
Set(ctx context.Context, key, value string, opts *SetOptions) (*Response, error)
//删除一个键值对
Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error)
//本质上是一种特定的set,即当所设置的键不存在时set生效
Create(ctx context.Context, key, value string) (*Response, error)
//在一个给定目录下原子地创建一个键
CreateInOrder(ctx context.Context, dir, value string, opts *CreateInOrderOptions) (*Response, error)
//本质上是一种特定的set,即当所设置的键存在时set生效
Update(ctx context.Context, key, value string) (*Response, error)
//生成一个对应key的watcher
Watcher(key string, opts *WatcherOptions) Watcher
}
各种key操作所需的选项参数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80//keys.go
type WatcherOptions struct {
//从所指定的revision索引后开始进行watch发送事件
AfterIndex uint64
//是否递归watch,即给定的watch的key底下的子目录发生事件是否也会watch
Recursive bool
}
type CreateInOrderOptions struct {
//节点的期限值
TTL time.Duration
}
type SetOptions struct {
/*
设置当前值,只有所要设置的key其对应的值为指定值时,set才成功
无设置时忽略,设置目录模式时该字段也被忽略
*/
PrevValue string
/*
设置当前revision值,只有当前的revision值为指定值时,set才成功
设置为0表示没有限制
*/
PrevIndex uint64
/*
表示当前键值是否需要存在,
如果需要存在,则只有指定键值对已存在set才成功
如果需要不存在,则只有指定键值对不存在set才成功
*/
PrevExist PrevExistType
//当前节点的期限值
TTL time.Duration
//设置为true表明TTL值能够在不触发watch或更新键值对时被更新,更新时不能设置值
Refresh bool
//表明当前设置的key是否为dir
Dir bool
//如果设置为true,表明只有当请求失败的时候响应才会包含当前值
NoValueOnSuccess bool
}
type GetOptions struct {
//是否需要将当前节点下的所有子节点都返回
Recursive bool
//获取到的结果是否按字典序排序
Sort bool
/*
相当于是否需要线性读
即只有集群大多数节点都确认了commit的键值对才会作为最终结果返回
*/
Quorum bool
}
type DeleteOptions struct {
//指定当前值,只有当前键值对为指定值时才delete成功
PrevValue string
//指定当前revision值,只有当前revision值为指定值时才delete成功
PrevIndex uint64
//是否递归删除节点下的所有子节点
Recursive bool
//是否为dir模式删除
Dir bool
}
未完待续…