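Background
In a blockchain network built on Ethereum technology, nodes need consistent clocks in order to send transactions and produce blocks in an orderly way, so that the nodes can jointly maintain the distributed ledger (block data plus state data). What happens when the system time differs across nodes? Two examples illustrate it.
Symptoms:
1. At startup, an Ethereum node logs warnings that its clock is not synchronised: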
WARN [03-20|17:20:54] System clock seems off by -1m7.049442998s, which can prevent network connectivity
WARN [03-20|17:20:54] Please enable network time synchronisation in system settings.
2. During block synchronisation between Ethereum nodes, warnings about a rollback and a failed synchronisation appear:
WARN [03-19|18:07:10] Rolled back headers count=181 header=72832->72832 fast=74752->74752 block=0->0
WARN [03-19|18:07:10] Synchronisation failed, dropping peer peer=d3f5b2f51ba7acc6 err="retrieved hash chain is invalid"
Analysis:
Let us look at the log output first:
DEBUG[03-20|18:19:06] Synchronising with the network peer=d3f5b2f51ba7acc6 eth=63 head=fea6e3…5abc2d td=533542548562 mode=full
DEBUG[03-20|18:19:06] Retrieving remote chain height peer=d3f5b2f51ba7acc6
DEBUG[03-20|18:19:06] Fetching batch of headers id=d3f5b2f51ba7acc6 conn=staticdial count=1 fromhash=fea6e3…5abc2d skip=0 reverse=false
DEBUG[03-20|18:19:06] Remote head header identified peer=d3f5b2f51ba7acc6 number=80698 hash=fea6e3…5abc2d
DEBUG[03-20|18:19:06] Looking for common ancestor peer=d3f5b2f51ba7acc6 local=80690 remote=80698
DEBUG[03-20|18:19:06] Fetching batch of headers id=d3f5b2f51ba7acc6 conn=staticdial count=13 fromnum=80498 skip=15 reverse=false
DEBUG[03-20|18:19:06] Found common ancestor peer=d3f5b2f51ba7acc6 number=80690 hash=4cba72…c3fce5
DEBUG[03-20|18:19:06] Directing header downloads peer=d3f5b2f51ba7acc6 origin=80691
DEBUG[03-20|18:19:06] Downloading block bodies origin=80691
DEBUG[03-20|18:19:06] Downloading transaction receipts origin=80691
DEBUG[03-20|18:19:06] Fetching batch of headers id=d3f5b2f51ba7acc6 conn=staticdial count=128 fromnum=80882 skip=191 reverse=false
DEBUG[03-20|18:19:06] Fetching batch of headers id=d3f5b2f51ba7acc6 conn=staticdial count=192 fromnum=80691 skip=0 reverse=false
DEBUG[03-20|18:19:06] Fetching batch of headers id=d3f5b2f51ba7acc6 conn=staticdial count=192 fromnum=80699 skip=0 reverse=false
DEBUG[03-20|18:19:06] Inserting downloaded chain items=8 firstnum=80691 firsthash=4a10d7…4b050e lastnum=80698 lasthash=fea6e3…5abc2d
DEBUG[03-20|18:19:06] Trie cache stats after commit misses=285 unloads=0
DEBUG[03-20|18:19:06] Dereferenced trie from memory database nodes=4 size=1.20kB time=11.957µs gcnodes=18 gcsize=5.92kB gctime=68.067µs livenodes=528 livesize=149.81kB
DEBUG[03-20|18:19:06] Inserted new block number=80691 hash=4a10d7…4b050e uncles=0 txs=0 gas=0 elapsed=10.024ms
DEBUG[03-20|18:19:06] Trie cache stats after commit misses=285 unloads=0
DEBUG[03-20|18:19:06] Dereferenced trie from memory database nodes=5 size=1.23kB time=11.865µs gcnodes=23 gcsize=7.16kB gctime=79.686µs livenodes=526 livesize=149.56kB
DEBUG[03-20|18:19:06] Inserted new block number=80692 hash=c0f08c…f3196f uncles=0 txs=0 gas=0 elapsed=1.194ms
DEBUG[03-20|18:19:06] Downloaded item processing failed number=80694 hash=5802eb…4fc564 err="future block: 1521541188 > 1521541176"
DEBUG[03-20|18:19:06] Header download terminated peer=d3f5b2f51ba7acc6
DEBUG[03-20|18:19:06] Transaction receipt download terminated err="receipt download canceled (requested)"
DEBUG[03-20|18:19:06] Block body download terminated err="block body download canceled (requested)"
DEBUG[03-20|18:19:06] Synchronisation terminated elapsed=102.384528ms
WARN [03-20|18:19:06] Synchronisation failed, dropping peer peer=d3f5b2f51ba7acc6 err="retrieved hash chain is invalid"
DEBUG[03-20|18:19:06] Removing Ethereum peer peer=d3f5b2f51ba7acc6
DEBUG[03-20|18:19:06] Ethereum message handling failed id=d3f5b2f51ba7acc6 conn=staticdial err=EOF
DEBUG[03-20|18:19:06] Removing p2p peer id=d3f5b2f51ba7acc6 conn=staticdial duration=7.895s peers=0 req=false err="useless peer"
DEBUG[03-20|18:19:09] Adding p2p peer name=Geth/v1.8.0-unstable... addr=128.32.37.209:40404 peers=1
DEBUG[03-20|18:19:09] Ethereum peer connected id=6bf05c91058746aa conn=dyndial name=Geth/v1.8.0-unstable/linux-amd64/go1.9.1
DEBUG[03-20|18:19:09] Ethereum handshake failed id=6bf05c91058746aa conn=dyndial err="Genesis block mismatch - 5e31ddf9f93f71ee (!= 29ff4c23644fa9ee)"
DEBUG[03-20|18:19:09] Removing p2p peer id=6bf05c91058746aa conn=dyndial duration=179.772ms peers=0 req=false err="Genesis block mismatch - 5e31ddf9f93f71ee (!= 29ff4c23644fa9ee)"
DEBUG[03-20|18:19:11] Removed dead node b=12 id=35f339215dc5d4ae ip=83.83.179.65
DEBUG[03-20|18:19:14] Trie cache stats after commit misses=285 unloads=0
DEBUG[03-20|18:19:14] Dereferenced trie from memory database nodes=1 size=564.00B time=13.063µs gcnodes=24 gcsize=7.72kB gctime=92.277µs livenodes=529 livesize=150.20kB
DEBUG[03-20|18:19:14] Inserted new block number=80693 hash=01b472…8b7890 uncles=0 txs=0 gas=0 elapsed=28.188ms
INFO [03-20|18:19:14] Imported new chain segment blocks=1 txs=0 mgas=0.000 elapsed=28.317ms mgasps=0.000 number=80693 hash=01b472…8b7890 cache=150.62kB
INFO [03-20|18:19:14] Mining too far in the future wait=14s
DEBUG[03-20|18:19:14] Reinjecting stale transactions count=0
DEBUG[03-20|18:19:16] Removed dead node b=15 id=5dabdd5adb72cb1a ip=36.110.209.136
DEBUG[03-20|18:19:23] Found seed node in database id=71e3445d0b9af99a addr=218.85.66.28:30399 age=8h25m6.047057248s
DEBUG[03-20|18:19:23] Found seed node in database id=ba1ae648ebbd893d addr=106.167.183.143:30303 age=16h13m35.047096949s
The log reports that processing failed at block 80694, so we query that block from the geth console:
> > eth.getBlock(80694) { difficulty: 9770747, extraData: "0xd983010802846765746887676f312e392e328777696e646f7773", gasLimit: 4712388, gasUsed: 0, hash: "0x5802ebeb71ce647dd874b5c7d20130ea73e7f81ca71a1d86b78adce4a14fc564", logsBloom: "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", miner: "0x609d020ed7873ba9d5732d67109546365ef3c49c", mixHash: "0x1f10d92fb51d6d7c522f3b1ac6f79bca752c04ae4b01381a9a2193c41c896beb", nonce: "0x0255559b5d0e03c3", number: 80694, parentHash: "0x01b472d3ed73290ba95428179b958be6d4ffec4f790f0ebe417075f11e8b7890", receiptsRoot: "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", sha3Uncles: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", size: 540, stateRoot: "0xf32f401b3c4ed1f14f22108c2f47e9cf77c4565738482459a84cb572b9ca79de", timestamp: 1521541188, totalDifficulty: 533503417848, transactions: [], transactionsRoot: "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", uncles: [] }
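The block's timestamp is 1521541188, i.e. Tue Mar 20 2018 18:19:48 GMT+0800 (CST).
However, the log shows that the local node's system time was 18:19:06 when it processed this block. The block was therefore produced 42 seconds later than the local system time, which exceeds the allowed window (the constant maxTimeFutureBlocks defaults to 30 seconds). This matches the error text "future block: 1521541188 > 1521541176", where the right-hand value is the local time plus 30 seconds.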
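Code analysis
Understanding the cause requires reading the relevant Ethereum source code; interested readers can follow along. The two places directly tied to the anomalies above are the drift comparison in checkClockDrift and the consensus.ErrFutureBlock branch in insertChain.
Logic that checks the system time at node startup: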
go-ethereum/p2p/discover/ntp.go
// checkClockDrift queries an NTP server for clock drifts and warns the user if
// one large enough is detected.
func checkClockDrift() {
	drift, err := sntpDrift(ntpChecks)
	if err != nil {
		return
	}
	if drift < -driftThreshold || drift > driftThreshold {
		log.Warn(fmt.Sprintf("System clock seems off by %v, which can prevent network connectivity", drift))
		log.Warn("Please enable network time synchronisation in system settings.")
	} else {
		log.Debug("NTP sanity check done", "drift", drift)
	}
}
The checkClockDrift function in ntp.go depends on the following constants, defined in udp.go:
https://github.com/ethereum/go-ethereum/blob/9fd76e33af367752160ab0e33d1097e1e9aff6e4/p2p/discover/udp.go
// Timeouts
const (
	respTimeout = 500 * time.Millisecond
	sendTimeout = 500 * time.Millisecond
	expiration  = 20 * time.Second

	ntpFailureThreshold = 32               // Continuous timeouts after which to check NTP
	ntpWarningCooldown  = 10 * time.Minute // Minimum amount of time to pass before repeating NTP warning
	driftThreshold      = 10 * time.Second // Allowed clock drift before warning user
)
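Block synchronisation logic:
After receiving block messages from a peer, the node starts importing them into the local ledger. Any error returned by InsertChain is logged at debug level and then reported as errInvalidChain, which is why the warning reads "retrieved hash chain is invalid" rather than "future block".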
func (d *Downloader) importBlockResults(results []*fetchResult) error {
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting downloaded chain", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnum", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	}
	if index, err := d.blockchain.InsertChain(blocks); err != nil {
		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
		return errInvalidChain
	}
	return nil
}
Main logic for adding blocks to the local ledger:
......
const (
	bodyCacheLimit      = 256
	blockCacheLimit     = 256
	maxFutureBlocks     = 256
	maxTimeFutureBlocks = 30
	badBlockLimit       = 10
	triesInMemory       = 128

	// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
	BlockChainVersion = 3
)
......
// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
	// Do a sanity check that the provided chain is actually ordered and linked
	for i := 1; i < len(chain); i++ {
		if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
			// Chain broke ancestry, log a messge (programming error) and skip insertion
			log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
				"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())

			return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
				chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
		}
	}
	// Pre-checks passed, start the full block imports
	bc.wg.Add(1)
	defer bc.wg.Done()

	bc.chainmu.Lock()
	defer bc.chainmu.Unlock()

	// A queued approach to delivering events. This is generally
	// faster than direct delivery and requires much less mutex
	// acquiring.
	var (
		stats         = insertStats{startTime: mclock.Now()}
		events        = make([]interface{}, 0, len(chain))
		lastCanon     *types.Block
		coalescedLogs []*types.Log
	)
	// Start the parallel header verifier
	headers := make([]*types.Header, len(chain))
	seals := make([]bool, len(chain))

	for i, block := range chain {
		headers[i] = block.Header()
		seals[i] = true
	}
	abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
	defer close(abort)

	// Iterate over the blocks and insert when the verifier permits
	for i, block := range chain {
		// If the chain is terminating, stop processing blocks
		if atomic.LoadInt32(&bc.procInterrupt) == 1 {
			log.Debug("Premature abort during blocks processing")
			break
		}
		// If the header is a banned one, straight out abort
		if BadHashes[block.Hash()] {
			bc.reportBlock(block, nil, ErrBlacklistedHash)
			return i, events, coalescedLogs, ErrBlacklistedHash
		}
		// Wait for the block's verification to complete
		bstart := time.Now()

		err := <-results
		if err == nil {
			err = bc.Validator().ValidateBody(block)
		}
		switch {
		case err == ErrKnownBlock:
			// Block and state both already known. However if the current block is below
			// this number we did a rollback and we should reimport it nonetheless.
			if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
				stats.ignored++
				continue
			}

		case err == consensus.ErrFutureBlock:
			// Allow up to MaxFuture second in the future blocks. If this limit is exceeded
			// the chain is discarded and processed at a later time if given.
			max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
			if block.Time().Cmp(max) > 0 {
				return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
			}
			bc.futureBlocks.Add(block.Hash(), block)
			stats.queued++
			continue

		case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
			bc.futureBlocks.Add(block.Hash(), block)
			stats.queued++
			continue

		case err == consensus.ErrPrunedAncestor:
			// Block competing with the canonical chain, store in the db, but don't process
			// until the competitor TD goes above the canonical TD
			currentBlock := bc.CurrentBlock()
			localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
			externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
			if localTd.Cmp(externTd) > 0 {
				if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
					return i, events, coalescedLogs, err
				}
				continue
			}
			// Competitor chain beat canonical, gather all blocks from the common ancestor
			var winner []*types.Block

			parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
			for !bc.HasState(parent.Root()) {
				winner = append(winner, parent)
				parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
			}
			for j := 0; j < len(winner)/2; j++ {
				winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
			}
			// Import all the pruned blocks to make the state available
			bc.chainmu.Unlock()
			_, evs, logs, err := bc.insertChain(winner)
			bc.chainmu.Lock()
			events, coalescedLogs = evs, logs

			if err != nil {
				return i, events, coalescedLogs, err
			}

		case err != nil:
			bc.reportBlock(block, nil, err)
			return i, events, coalescedLogs, err
		}
		// Create a new statedb using the parent block and report an
		// error if it fails.
		var parent *types.Block
		if i == 0 {
			parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
		} else {
			parent = chain[i-1]
		}
		state, err := state.New(parent.Root(), bc.stateCache)
		if err != nil {
			return i, events, coalescedLogs, err
		}
		// Process block using the parent state as reference point.
		receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
		if err != nil {
			bc.reportBlock(block, receipts, err)
			return i, events, coalescedLogs, err
		}
		// Validate the state using the default validator
		err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
		if err != nil {
			bc.reportBlock(block, receipts, err)
			return i, events, coalescedLogs, err
		}
		proctime := time.Since(bstart)

		// Write the block to the chain and get the status.
		status, err := bc.WriteBlockWithState(block, receipts, state)
		if err != nil {
			return i, events, coalescedLogs, err
		}
		switch status {
		case CanonStatTy:
			log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
				"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))

			coalescedLogs = append(coalescedLogs, logs...)
			blockInsertTimer.UpdateSince(bstart)
			events = append(events, ChainEvent{block, block.Hash(), logs})
			lastCanon = block

			// Only count canonical blocks for GC processing time
			bc.gcproc += proctime

		case SideStatTy:
			log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
				common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))

			blockInsertTimer.UpdateSince(bstart)
			events = append(events, ChainSideEvent{block})
		}
		stats.processed++
		stats.usedGas += usedGas
		stats.report(chain, i, bc.stateCache.TrieDB().Size())
	}
	// Append a single chain head event if we've progressed the chain
	if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
		events = append(events, ChainHeadEvent{lastCanon})
	}
	return 0, events, coalescedLogs, nil
}
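Block header verification logic (the excerpt is from the Clique engine; the check that returns consensus.ErrFutureBlock is the relevant part):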
// verifyHeader checks whether a header conforms to the consensus rules.The
// caller may optionally pass in a batch of parents (ascending order) to avoid
// looking those up from the database. This is useful for concurrently verifying
// a batch of new headers.
func (c *Clique) verifyHeader(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error {
	if header.Number == nil {
		return errUnknownBlock
	}
	number := header.Number.Uint64()

	// Don't waste time checking blocks from the future
	if header.Time.Cmp(big.NewInt(time.Now().Unix())) > 0 {
		return consensus.ErrFutureBlock
	}
	// Checkpoint blocks need to enforce zero beneficiary
	checkpoint := (number % c.config.Epoch) == 0
	if checkpoint && header.Coinbase != (common.Address{}) {
		return errInvalidCheckpointBeneficiary
	}
	// Nonces must be 0x00..0 or 0xff..f, zeroes enforced on checkpoints
	if !bytes.Equal(header.Nonce[:], nonceAuthVote) && !bytes.Equal(header.Nonce[:], nonceDropVote) {
		return errInvalidVote
	}
	if checkpoint && !bytes.Equal(header.Nonce[:], nonceDropVote) {
		return errInvalidCheckpointVote
	}
	// Check that the extra-data contains both the vanity and signature
	if len(header.Extra) < extraVanity {
		return errMissingVanity
	}
	if len(header.Extra) < extraVanity+extraSeal {
		return errMissingSignature
	}
	// Ensure that the extra-data contains a signer list on checkpoint, but none otherwise
	signersBytes := len(header.Extra) - extraVanity - extraSeal
	if !checkpoint && signersBytes != 0 {
		return errExtraSigners
	}
	if checkpoint && signersBytes%common.AddressLength != 0 {
		return errInvalidCheckpointSigners
	}
	// Ensure that the mix digest is zero as we don't have fork protection currently
	if header.MixDigest != (common.Hash{}) {
		return errInvalidMixDigest
	}
	// Ensure that the block doesn't contain any uncles which are meaningless in PoA
	if header.UncleHash != uncleHash {
		return errInvalidUncleHash
	}
	// Ensure that the block's difficulty is meaningful (may not be correct at this point)
	if number > 0 {
		if header.Difficulty == nil || (header.Difficulty.Cmp(diffInTurn) != 0 && header.Difficulty.Cmp(diffNoTurn) != 0) {
			return errInvalidDifficulty
		}
	}
	// If all checks passed, validate any special fields for hard forks
	if err := misc.VerifyForkHashes(chain.Config(), header, false); err != nil {
		return err
	}
	// All basic checks passed, verify cascading fields
	return c.verifyCascadingFields(chain, header, parents)
}
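Taken together, the header check and the consensus.ErrFutureBlock branch of insertChain give a block's timestamp one of three outcomes. The sketch below is a simplified model rather than go-ethereum code: classify and its return strings are hypothetical, the local clock value is the one derived from the log in this article, and every validation rule other than the timestamp is ignored.

```go
package main

import "fmt"

// maxTimeFutureBlocks mirrors the value quoted in this article (seconds).
const maxTimeFutureBlocks = 30

// classify is a hypothetical helper modelling only the timestamp handling:
//   - not ahead of the local clock: the header check does not flag it
//   - ahead by at most maxTimeFutureBlocks: flagged as a future block but
//     parked in the futureBlocks queue for later processing
//   - ahead by more than that: insertion stops with a "future block" error
func classify(blockTime, now int64) string {
	switch {
	case blockTime <= now:
		return "process"
	case blockTime <= now+maxTimeFutureBlocks:
		return "queue"
	default:
		return "reject"
	}
}

func main() {
	now := int64(1521541146) // local clock when block 80694 was processed

	// The first two timestamps are made up for contrast; the third is the
	// real timestamp of block 80694.
	for _, ts := range []int64{now - 5, now + 12, 1521541188} {
		fmt.Printf("block is %+ds relative to local clock: %s\n", ts-now, classify(ts, now))
	}
}
```

With a clock that lags by more than 30 seconds, freshly produced blocks from well-synchronised peers fall into the third case, so the downloader gives up on the batch and drops the peer.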
Solution:
Synchronise the system time. On Ubuntu, for example:
sudo apt install ntpdate
sudo ntpdate 0.asia.pool.ntp.org