函数的完整代码如下:
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
if raft.IsEmptySnap(apply.snapshot) {
return
}
applySnapshotInProgress.Inc()
lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
defer func() {
lg.Info(
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
applySnapshotInProgress.Dec()
}()
if apply.snapshot.Metadata.Index <= ep.appliedi {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
}
// wait for raftNode to persist snapshot onto the disk
<-apply.notifyc
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {
lg.Panic("failed to open snapshot backend", zap.Error(err))
}
// We need to set the backend to consistIndex before recovering the lessor,
// because lessor.Recover will commit the boltDB transaction, accordingly it
// will get the old consistent_index persisted into the db in OnPreCommitUnsafe.
// Eventually the new consistent_index value coming from snapshot is overwritten
// by the old value.
s.consistIndex.SetBackend(newbe)
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
if s.lessor != nil {
lg.Info("restoring lease store")
s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
lg.Info("restored lease store")
}
lg.Info("restoring mvcc store")
if err := s.kv.Restore(newbe); err != nil {
lg.Panic("failed to restore mvcc store", zap.Error(err))
}
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
// Closing old backend might block until all the txns
// on the backend are finished.
// We do not want to wait on closing the old backend.
s.bemu.Lock()
oldbe := s.be
go func() {
lg.Info("closing old backend file")
defer func() {
lg.Info("closed old backend file")
}()
if err := oldbe.Close(); err != nil {
lg.Panic("failed to close old backend", zap.Error(err))
}
}()
s.be = newbe
s.bemu.Unlock()
lg.Info("restoring alarm store")
if err := s.restoreAlarms(); err != nil {
lg.Panic("failed to restore alarm store", zap.Error(err))
}
lg.Info("restored alarm store")
if s.authStore != nil {
lg.Info("restoring auth store")
s.authStore.Recover(newbe)
lg.Info("restored auth store")
}
lg.Info("restoring v2 store")
if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
lg.Panic("failed to restore v2 store", zap.Error(err))
}
if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
lg.Panic("illegal v2store content", zap.Error(err))
}
lg.Info("restored v2 store")
s.cluster.SetBackend(newbe)
lg.Info("restoring cluster configuration")
s.cluster.Recover(api.UpdateCapability)
lg.Info("restored cluster configuration")
lg.Info("removing old peers from network")
// recover raft transport
s.r.transport.RemoveAllPeers()
lg.Info("removed old peers from network")
lg.Info("adding peers from new cluster configuration")
for _, m := range s.cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
lg.Info("added peers from new cluster configuration")
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
}