viper配置框架的介绍支持zookeeper的读取和监听

时间:2022-08-31 20:14:43

viper作为配置框架,其功能非常的强大,我们没有理由不去了解一下。我们先看官网对它的功能简介:

viper是完整配置解决方案,他可以处理所有类型和格式的配置文件,他有如下功能:

  • 设置默认配置
  • 支持读取 JSON TOML YAML HCL 和 Java 属性配置文件
  • 监听配置文件变化,实时读取读取配置文件内容
  • 读取环境变量值
  • 读取远程配置系统 (etcd Consul) 和监控配置变化
  • 读取命令 Flag 值
  • 读取 buffer 值
  • 读取确切值

乍一看,未免有相见恨晚之感,可仔细一想,不免脑袋里有另外一种声音:不会不支持读取 zookeeper 吧?好吧,至少我是这样的。

基于这种想法,当然要去立马尝试,如下:

?
1
viper.AddRemoteProvider("zookeeper", "xx.xx.xx.xx:2181", "/viper/test")

返回结果是:

Unsupported Remote Provider Type zookeeper

果不其然,于是追踪 viper.AddRemoteProvider 的源码,发现viper只支持如下几种

?
1
var SupportedRemoteProviders = []string{"etcd", "consul", "firestore"}

如果就此打住,未免有点太可惜,作为偏执狂,总想着能否来改造下viper,让其支持 zookeeper ,于是在issue上找是否有人遇到同样的问题,还整让我找到了, 传送 。但是不完整,且稍微有点bug。所以根据他的基础上,我做了些调整。进入正题,我们开始修改viper源码。说明下,我的viper版本是最新的 1.7.0

修改源码

1、添加zookeeper.go

添加的位置: github.com/bketelsen/crypt/zookeeper , zookeeper 目录需要自己创建, github.com/bketelsen/crypt 是viper的依赖包,会自动下载

viper配置框架的介绍支持zookeeper的读取和监听

文件内容:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package zookeeper
 
import (
    "errors"
    "fmt"
    zk "github.com/samuel/go-zookeeper/zk"
    //"github.com/xordataexchange/crypt/backend"
    "github.com/bketelsen/crypt/backend"
    "strings"
    "time"
)
 
type Client struct {
    client *zk.Conn
    waitIndex uint64
}
 
func New(machines []string) (*Client, error) {
    zkclient, _, err := zk.Connect(machines, time.Second)
    if err != nil {
        return nil, err
    }
    return &Client{zkclient, 0}, nil
}
 
func (c *Client) Get(key string) ([]byte, error) {
    resp, _, err := c.client.Get(key)
    if err != nil {
        return nil, err
    }
    return []byte(resp), nil
}
 
func nodeWalk(prefix string, c *Client, vars map[string]string) error {
    l, stat, err := c.client.Children(prefix)
    if err != nil {
        return err
    }
 
    if stat.NumChildren == 0 {
        b, _, err := c.client.Get(prefix)
        if err != nil {
            return err
        }
        vars[prefix] = string(b)
 
    } else {
        for _, key := range l {
            s := prefix + "/" + key
            _, stat, err := c.client.Exists(s)
            if err != nil {
                return err
            }
            if stat.NumChildren == 0 {
                b, _, err := c.client.Get(s)
                if err != nil {
                    return err
                }
                vars[s] = string(b)
            } else {
                nodeWalk(s, c, vars)
            }
        }
    }
    return nil
}
 
func (c *Client) GetValues(key string, keys []string) (map[string]string, error) {
    vars := make(map[string]string)
    for _, v := range keys {
        v = fmt.Sprintf("%s/%s", key, v)
        v = strings.Replace(v, "/*", "", -1)
        _, _, err := c.client.Exists(v)
        if err != nil {
            return vars, err
        }
        if v == "/" {
            v = ""
        }
        err = nodeWalk(v, c, vars)
        if err != nil {
            return vars, err
        }
    }
    return vars, nil
}
 
func (c *Client) List(key string) (backend.KVPairs, error) {
    var list backend.KVPairs
    resp, stat, err := c.client.Children(key)
    if err != nil {
        return nil, err
    }
 
    if stat.NumChildren == 0 {
        return list, nil
    }
 
    entries, err := c.GetValues(key, resp)
    if err != nil {
        return nil, err
    }
 
    for k, v := range entries {
        list = append(list, &backend.KVPair{Key: k, Value: []byte(v)})
    }
    return list, nil
}
 
func (c *Client) createParents(key string) error {
    flags := int32(0)
    acl := zk.WorldACL(zk.PermAll)
 
    if key[0] != '/' {
        return errors.New("Invalid path")
    }
 
    payload := []byte("")
    pathString := ""
    pathNodes := strings.Split(key, "/")
    for i := 1; i < len(pathNodes); i++ {
        pathString += "/" + pathNodes[i]
        _, err := c.client.Create(pathString, payload, flags, acl)
        // not being able to create the node because it exists or not having
        // sufficient rights is not an issue. It is ok for the node to already
        // exist and/or us to only have read rights
        if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
            return err
        }
    }
    return nil
}
 
func (c *Client) Set(key string, value []byte) error {
    err := c.createParents(key)
    if err != nil {
        return err
    }
    _, err = c.client.Set(key, []byte(value), -1)
    return err
}
 
func (c *Client) Watch(key string, stop chan bool) <-chan *backend.Response {
    respChan := make(chan *backend.Response, 0)
    go func() {
        for {
            resp, _, watch, err := c.client.GetW(key)
            if err != nil {
                respChan <- &backend.Response{nil, err}
                time.Sleep(time.Second * 5)
            }
 
            select {
            case e := <-watch:
                if e.Type == zk.EventNodeDataChanged {
                    resp, _, err = c.client.Get(key)
                    if err != nil {
                        respChan <- &backend.Response{nil, err}
                    }
                    c.waitIndex = 0
                    respChan <- &backend.Response{[]byte(resp), nil}
                }
            }
        }
    }()
    return respChan
}

这个文件是实现 ConfigManager 接口,我们在上图中看到 etcdconsulfilestore ,均有实现该接口,接口的定义很简单

?
1
2
3
4
5
6
type ConfigManager interface {
    Get(key string) ([]byte, error)
    List(key string) (KVPairs, error)
    Set(key string, value []byte) error
    Watch(key string, stop chan bool) <-chan *Response
}

2、修改config.go

文件的位置: github.com/bketelsen/crypt/config/config.go ,如下图

 

viper配置框架的介绍支持zookeeper的读取和监听

func NewStandardEtcdConfigManager(machines []string) (ConfigManager, error) 方法下面添加如下方法:

?
1
2
3
4
5
6
7
8
9
// NewStandardZookeeperConfigManager returns a new ConfigManager backed by Zookeeper.
// Data will be encrypted.
func NewStandardZookeeperConfigManager(machines []string) (ConfigManager, error) {
    store, err := zookeeper.New(machines)
    if err != nil {
        return nil, err
    }
    return NewStandardConfigManager(store)
}

func NewEtcdConfigManager(machines []string, keystore io.Reader) (ConfigManager, error) 方法下面添加如下方法:

?
1
2
3
4
5
6
7
8
9
// NewZookeeperConfigManager returns a new ConfigManager backed by zookeeper.
// Data will be encrypted.
func NewZookeeperConfigManager(machines []string, keystore io.Reader) (ConfigManager, error) {
    store, err := zookeeper.New(machines)
    if err != nil {
        return nil, err
    }
    return NewConfigManager(store, keystore)
}

这两个方法是初始化 ConfigManager 对象,也就是我们刚才添加的 zookeeper.go 文件的对象

3、修改remote.go

文件的位置: github.com/spf13/viper/remote/remote.go ,如下图

 

viper配置框架的介绍支持zookeeper的读取和监听

找到74行,用下面的代码替换 func getConfigManager(rp viper.RemoteProvider) (crypt.ConfigManager, error) 方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func getConfigManager(rp viper.RemoteProvider) (crypt.ConfigManager, error) {
    var cm crypt.ConfigManager
    var err error
 
    if rp.SecretKeyring() != "" {
        var kr *os.File
        kr, err = os.Open(rp.SecretKeyring())
        if err != nil {
            return nil, err
        }
        defer kr.Close()
        switch rp.Provider() {
        case "etcd":
            cm, err = crypt.NewEtcdConfigManager([]string{rp.Endpoint()}, kr)
        case "zookeeper":
            cm, err = crypt.NewZookeeperConfigManager([]string{rp.Endpoint()}, kr)
        case "firestore":
            cm, err = crypt.NewFirestoreConfigManager([]string{rp.Endpoint()}, kr)
        default:
            cm, err = crypt.NewConsulConfigManager([]string{rp.Endpoint()}, kr)
        }
    } else {
        switch rp.Provider() {
        case "etcd":
            cm, err = crypt.NewStandardEtcdConfigManager([]string{rp.Endpoint()})
        case "zookeeper":
            cm, err = crypt.NewStandardZookeeperConfigManager([]string{rp.Endpoint()})
        case "firestore":
            cm, err = crypt.NewStandardFirestoreConfigManager([]string{rp.Endpoint()})
        default:
            cm, err = crypt.NewStandardConsulConfigManager([]string{rp.Endpoint()})
        }
    }
    if err != nil {
        return nil, err
    }
    return cm, nil
}

细心的读者可能已经发现,其实就添加了两个case选项:

viper配置框架的介绍支持zookeeper的读取和监听

4、修改viper.go

文件的位置: github.com/spf13/viper/viper.go ,如下图

 

viper配置框架的介绍支持zookeeper的读取和监听

取+监听zookeeper(1)\image-20200521222843002.png)

找到两个 SupportedRemoteProviders 定义的定法,1.7.0版本的行号分别是:290,331。只要添加 zookeeper ,即可

?
1
SupportedRemoteProviders = []string{"etcd", "consul", "firestore", "zookeeper"}

好了,修改代码的工作已经完了,接下来我们来测试:

测试

注意:zookeeper中已经设置了内容

set /viper/test {"appName":"test","nodes":["127.0.0.1","127.0.0.2","127.0.0.3"]}

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main
 
import (
    "fmt"
    "github.com/fsnotify/fsnotify"
    "github.com/spf13/viper"
    _ "github.com/spf13/viper/remote"
    "time"
)
 
 
type config struct {
    AppName string
    Nodes []string
}
 
func main() {
    var waitGroup=sync.WaitGroup{}
    waitGroup.Add(1)
    readRemoteZookeeper()
    go watchRemoteZookeeper()
    waitGroup.Wait()
}
 
func readRemoteZookeeper() {
    viper.AddRemoteProvider("zookeeper", "62.234.15.24:2181", "/viper/test")
    viper.SetConfigType("json")
    err := viper.ReadRemoteConfig()
    if err != nil {
        panic(fmt.Sprintf("read remote zookeeper error:+%v", err))
    }
 
    var C config
    viper.Unmarshal(&C)
    fmt.Printf("从zookeeper读取配置内容:%+v\n", C)
}
 
func watchRemoteZookeeper() {
    go func() {
        for {
  //delay after each request
            time.Sleep(time.Second * 5)
            err := viper.WatchRemoteConfig()
            if err != nil {
                fmt.Errorf("unable to read remote config: %v", err)
                continue
            }
            fmt.Printf("从zookeeper读取更新内容:appName=%s,nodes=%+v\n", viper.Get("appName"), viper.Get("nodes"))
        }
    }()
}

输出内容:

从zookeeper读取配置内容:{AppName:test Nodes:[127.0.0.1 127.0.0.2 127.0.0.3]}
从zookeeper读取更新内容:appName=test,nodes=[127.0.0.1 127.0.0.2 127.0.0.3]

如果我们修改zookeeper的内容,则viper会读取到更新后的内容:

?
1
set /viper/test {"appName":"test","nodes":["127.0.0.1","127.0.0.2","127.0.0.3","127.0.0.4"]}
?
1
从zookeeper读取更新内容:appName=test,nodes=[127.0.0.1 127.0.0.2 127.0.0.3 127.0.0.4]

结语

让viper支持 zookeeper 并不复杂的,并且基本上不需要修改原有的方法, 这要归结于viper用到一个非常重要的设计原则: 开闭原则 ,读者可以自行体会。

关于viper的基本使用, github 已经有非常详细的例子,这里就不再赘述,如有疑问,可以私信我

到此这篇关于viper配置框架的介绍支持zookeeper的读取和监听的文章就介绍到这了,更多相关viper配置框架支持zookeeper的读取和监听内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://studygolang.com/articles/28829