1.初始化etcdServer流程:
代码路径为:github.com\coreos\etcd\embed\etcd.go
StartEtcd(inCfg *Config) (e *Etcd, err error)
流程如下:
1.1: 参数校验:inCfg.Validate()
校验关注点1:
checkBindURLs(cfg.LPUrls):校验
peer-urls
schem相关信息,且
在3.1版本之后不允许使用域名作为url来进行绑定操作。
checkBindURLs(cfg.LCUrls) : 校验
client-urls
schem相关信息,且
在3.1版本之后不允许使用域名作为url来进行绑定操作。
使用域名对于性能上是有一定的影响,但是在实际生产环境中,是存在使用域名的场景,需要修改如下代码进行适配:
func checkBindURLs(urls []url.URL) error { //...
if net.ParseIP(host) == nil {
//取消err的return,改为打印告警信息,同3.1之前版本。
return fmt.Errorf("expected IP in URL for binding (%s)", url.String())
}
}
校验关注点2:
由于实际现网的网络延迟各不相同,选举及心跳超时时间可作为调优适配的考虑范畴。
5*cfg.TickMs > cfg.ElectionMs :选举超时时间必须大于五倍于心跳超时时间。
cfg.ElectionMs > maxElectionMs:选举超时时间必须小于5000ms
1.2:初始化PeerListeners,ClientListeners,用于监听peers间及client端发送的http请求
PeerListeners:
作为etcd member之间进行通信使用的listeners,
为了性能考量,建议内部使用schema:http
,由flag "
listen-peer-urls
"确定,
ClientListeners:
作为接受外部请求的listerners,
一般为了安全性考量,一般使用 schema:https
,由flag "
listen-client-urls
"确定,
具体方法实现为:
transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)
默认的读写超时均为5s:
ConnReadTimeout = 5 * time.Second ConnWriteTimeout = 5 * time.Second
1.3: 获取PeerURLsMap以及cluster token
1.4: 生成new etcdServer所需的的ServerConfig结构体:
// ServerConfig holds the configuration of etcd as taken from the command line or discovery. type ServerConfig struct {
// etcdserver 名称,对应flag "name“
Name string
// etcd 用于服务发现,无需知道具体etcd节点ip即可访问etcd 服务,对应flag "discovery"
DiscoveryURL string
// 供服务发现url的代理地址, 对应flag "discovery-proxy"
DiscoveryProxy string
// 由ip+port组成,默认DefaultListenClientURLs = "http://localhost:2379";
// 实际情况使用https schema,用以外部etcd client访问,对应flag "listen-client-urls"
ClientURLs types.URLs
// 由ip+port组成,默认DefaultListenPeerURLs = "http://localhost:2380";
// 实际生产环境使用http schema, 供etcd member 通信,对应flag "peer-client-urls"
PeerURLs types.URLs
// 数据目录地址,为全路径,对应flag "data-dir"
DataDir string
// DedicatedWALDir config will make the etcd to write the WAL to the WALDir
// rather than the dataDir/member/wal.
DedicatedWALDir string
// 默认是10000次事件做一次快照:DefaultSnapCount = 100000
// 可以作为调优参数进行参考,对应flag "snapshot-count",
SnapCount uint64
// 默认是5,这是v2的参数,v3内只有一个db文件,
// DefaultMaxSnapshots = 5,对应flag "max-snapshots"
MaxSnapFiles uint
// 默认是5,DefaultMaxWALs = 5,表示最大存储wal文件的个数,
// 对应flag "max-wals",保留的文件可以作为etcd-dump-logs工具进行debug使用。
MaxWALFiles uint
// peerUrl 与 etcd name对应的map,由方法cfg.PeerURLsMapAndToken("etcd")生成。
InitialPeerURLsMap types.URLsMap
// etcd 集群token, 对应flang "initial-cluster-token"
InitialClusterToken string
// 确定是否为新建集群,对应flag "initial-cluster-state",
// 由方法func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }确定;
NewCluster bool
// 对应flag "force-new-cluster",默认为false,若为true,
// 在生产环境内,一般用于含v2数据的集群恢复,
// 效果为以现有数据或者空数据新建一个单节点的etcd集群,
// 如果存在数据,则会清楚数据内的元数据信息,并重建只包含该etcd的元数据信息。
ForceNewCluster bool
// member间通信使用的证书信息,若peerURL为https时使用,对应flag "peer-ca-file","peer-cert-file", "peer-key-file"
PeerTLSInfo transport.TLSInfo
// raft node 发送心跳信息的超时时间。 "heartbeat-interval"
TickMs uint
// raft node 发起选举的超时时间,最大为5000ms maxElectionMs = 50000,
// 对应flag "election-timeout",
// 选举时间与心跳时间在最佳实践内建议是10倍关系。
ElectionTicks int
// etcd server启动的超时时间,默认为1s,
// 由方法func (c *ServerConfig) bootstrapTimeout() time.Duration确定;
BootstrapTimeout time.Duration
// 默认为0,单位为小时,主要为了方便用户快速查询,
// 定时对key进行合并处理,对应flag "auto-compaction-retention",
// 由方法func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic确定,
// 具体compact的实现方法为:
//func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
AutoCompactionRetention int
// etcd后端数据文件的大小,默认为2GB,最大为8GB, v3的参数,
// 对应flag "quota-backend-bytes" ,具体定义:etcd\etcdserver\quota.go
QuotaBackendBytes int64
StrictReconfigCheck bool
// ClientCertAuthEnabled is true when cert has been signed by the client CA.
ClientCertAuthEnabled bool
AuthToken string
}
1.5, 调用方法
func NewServer(cfg *ServerConfig) (srv *EtcdServer, err
error)
初始化etcdServer:
// NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
1.5.1: 分配内存空间
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
1.5.2: 检测并生成数据目录,生成向远端raft node peer listeners发送请求的Transport
其中的超时时间计算方法为:
time.Second + time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond/5
1.5.3:
根据日志目录是否存在,对应生成raft node实体。
1.5.3.1: 若日志目录不存在且flag "initial-cluster-state"为'existing':
case !haveWAL && !cfg.NewCluster:
使用方法
func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL)
生成raft node实体
id, n, s, w = startNode(cfg, cl, nil)
1.5.3.2: 若日志目录不存在且flag "initial-cluster-state"为'new':
case !haveWAL && cfg.NewCluster:
使用方法
func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL)
生成raft node实体
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
1.5.3.3 若日志目录存在:
1.5.3.3.1 若flag "force-new-cluster" 为"false":
调用方法
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL)
生成raft node实体
id, cl, n, s, w = restartNode(cfg, snapshot)
1.5.3.3.2 若flag "force-new-cluster" 为"true":
调用方法
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL)
生成raft node实体
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
1.5.4 初始化EtcdServer:
srv = &EtcdServer{ readych: make(chan struct{}),
Cfg: cfg,
snapCount: cfg.SnapCount,
errorc: make(chan error, 1),
store: st,
snapshotter: ss,
r: *newRaftNode(
raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
heartbeat: heartbeat,
raftStorage: s,
storage: NewStorage(w, ss),
},
),
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
....
}
在初始化EtcdServer过程中,会启动用于peer间发送及接收raft 消息的rafthttp transport,具体方法如下:
func (t *Transport) Start() error { var err error
t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
return nil
2.1. 启动etcdServer
3.1.
为每个client url及peer url 启动一个client server的goroutine,以提供监听服务
,这个动作在raft http transport启动之后:
peer server goroutine:
go func(l *peerListener) { e.errHandler(l.serve())
}(pl)
client server goroutine:
go func(s *serveCtx) { e.errHandler(s.serve(e.Server, ctlscfg, v2h, e.errHandler))
}(sctx)
若启动失败,则停止grpcServer:
defer func() { ...
if !serving {
// errored before starting gRPC server for serveCtx.grpcServerC
for _, sctx := range e.sctxs {
close(sctx.grpcServerC)
}
}
...
}()
暂时就启动流程进行粗略分享,后续将进一步分析 etcdServer 启动具体机制,及针对NewServer内针对生成raft node详细机制进行分析及基于k8s平台部署etcd 集群备份恢复方案进行探讨。