Tendermint同步区块

最近在学习Tendermint的代码,记录下同步区块的流程,直接跳过P2P阶段,后续再写一篇文章记录P2P流程吧

blockchain/pool.go的OnStart()新建了gorountine来发起获取区块请求

func (pool *BlockPool) OnStart() error {
	go pool.makeRequestersRoutine()
	pool.startTime = time.Now()
	return nil
}

看下makeRequestersRoutine的代码

// Run spawns requesters as needed.
func (pool *BlockPool) makeRequestersRoutine() {

	for {
		if !pool.IsRunning() {
			break
		}

		_, numPending, lenRequesters := pool.GetStatus()
		if numPending >= maxPendingRequests {
			// sleep for a bit.
			time.Sleep(requestIntervalMS * time.Millisecond)
			// check for timed out peers
			pool.removeTimedoutPeers()
		} else if lenRequesters >= maxTotalRequesters {
			// sleep for a bit.
			time.Sleep(requestIntervalMS * time.Millisecond)
			// check for timed out peers
			pool.removeTimedoutPeers()
		} else {
			// request for more blocks.
			pool.makeNextRequester()
		}
	}
}

这里是一个for循环,获取当前pool的状态

1 如果当前pending的个数大于等于10000个,sleep 100ms,并且remove掉timeout的peer(remove timeout peer的逻辑是当前接收peer的bitrate小于10KB/S

2 如果当前request的个数大于等于10000个,sleep 100ms并且remove timeout的peer

3 新建一个请求

func (pool *BlockPool) makeNextRequester() {
	pool.mtx.Lock()
	defer pool.mtx.Unlock()

	nextHeight := pool.height + pool.requestersLen()
	request := newBPRequester(pool, nextHeight)
	// request.SetLogger(pool.Logger.With("height", nextHeight))

	pool.requesters[nextHeight] = request
	pool.numPending++

	err := request.Start()
	if err != nil {
		request.Logger.Error("Error starting request", "err", err)
	}
}
新建请求的时候rquester和pending都会+1,然后调用request.Start(),这个调用的是blockchain/pool.go的另外一个OnStart()
func (bpr *bpRequester) OnStart() error {
	go bpr.requestRoutine()
	return nil
}

看下requestRoutine的代码

func (bpr *bpRequester) requestRoutine() {
OUTER_LOOP:
	for {
		// Pick a peer to send request to.
		var peer *bpPeer = nil
	PICK_PEER_LOOP:
		for {
			if !bpr.IsRunning() || !bpr.pool.IsRunning() {
				return
			}
			peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
			if peer == nil {
				//log.Info("No peers available", "height", height)
				time.Sleep(requestIntervalMS * time.Millisecond)
				continue PICK_PEER_LOOP
			}
			break PICK_PEER_LOOP
		}
		bpr.mtx.Lock()
		bpr.peerID = peer.id
		bpr.mtx.Unlock()

		// Send request and wait.
		bpr.pool.sendRequest(bpr.height, peer.id)
		...
	}
}

pickIncrAvailablePeer的作用是找到有高度(bpr.height)的节点

sendRequest其实是写了一个channel

func (pool *BlockPool) sendRequest(height int64, peerID string) {
	if !pool.IsRunning() {
		return
	}
	pool.requestsCh <- BlockRequest{height, peerID}
}

接着找requestsCh读取的地方,就在blockchain/reactor.go中:

func (bcR *BlockchainReactor) poolRoutine() {

	trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
	statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
	switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)

	blocksSynced := 0

	chainID := bcR.initialState.ChainID
	state := bcR.initialState

	lastHundred := time.Now()
	lastRate := 0.0

FOR_LOOP:
	for {
		select {
		case request := <-bcR.requestsCh: // chan BlockRequest
			peer := bcR.Switch.Peers().Get(request.PeerID)
			if peer == nil {
				continue FOR_LOOP // Peer has since been disconnected.
			}
			msg := &bcBlockRequestMessage{request.Height}
			queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
			if !queued {
				// We couldn't make the request, send-queue full.
				// The pool handles timeouts, just let it go.
				continue FOR_LOOP
			}
		case peerID := <-bcR.timeoutsCh: // chan string
			// Peer timed out.
			peer := bcR.Switch.Peers().Get(peerID)
			if peer != nil {
				bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
			}
		case <-statusUpdateTicker.C:
			// ask for status updates
			go bcR.BroadcastStatusRequest() // nolint: errcheck
		case <-switchToConsensusTicker.C:
			height, numPending, lenRequesters := bcR.pool.GetStatus()
			outbound, inbound, _ := bcR.Switch.NumPeers()
			bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
				"outbound", outbound, "inbound", inbound)
			if bcR.pool.IsCaughtUp() {
				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
				bcR.pool.Stop()

				conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
				conR.SwitchToConsensus(state, blocksSynced)

				break FOR_LOOP
			}
		case <-trySyncTicker.C: // chan time
			// This loop can be slow as long as it's doing syncing work.
		SYNC_LOOP:
			for i := 0; i < 10; i++ {
				// See if there are any blocks to sync.
				first, second := bcR.pool.PeekTwoBlocks()
				//bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
				if first == nil || second == nil {
					// We need both to sync the first block.
					break SYNC_LOOP
				}
				firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes)
				firstPartsHeader := firstParts.Header()
				firstID := types.BlockID{first.Hash(), firstPartsHeader}
				// Finally, verify the first block using the second's commit
				// NOTE: we can probably make this more efficient, but note that calling
				// first.Hash() doesn't verify the tx contents, so MakePartSet() is
				// currently necessary.
				err := state.Validators.VerifyCommit(
					chainID, firstID, first.Height, second.LastCommit)
				if err != nil {
					bcR.Logger.Error("Error in validation", "err", err)
					bcR.pool.RedoRequest(first.Height)
					break SYNC_LOOP
				} else {
					bcR.pool.PopRequest()

					bcR.store.SaveBlock(first, firstParts, second.LastCommit)

					// NOTE: we could improve performance if we
					// didn't make the app commit to disk every block
					// ... but we would need a way to get the hash without it persisting
					var err error
					state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
					if err != nil {
						// TODO This is bad, are we zombie?
						cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
					}
					blocksSynced += 1

					// update the consensus params
					bcR.updateConsensusParams(state.ConsensusParams)

					if blocksSynced%100 == 0 {
						lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
						bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
							"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
						lastHundred = time.Now()
					}
				}
			}
			continue FOR_LOOP
		case <-bcR.Quit:
			break FOR_LOOP
		}
	}
}

1 收到reqeustCh的数据,发送bcBlockRequestMessage到对端节点(对端节点收到消息后返回高度的block)。是在本文件中的Receive()函数处理的

2 本函数有个trySyncTicker 50ms的timer一直在运行,作用是拿到两个已经从对端peer获取的block,经过VerifyCommit之后,SaveBlock存在tendermint内部,再调用ApplyBlock存到application

3 也有个statusUpdateTicker 10s的timer,作用是broadcast bcStatusRequestMessage去获取对端节点的block高度

4 还有个switchToConsensusTicker 1s的timer也在运行,作用是看本节点的block高度有没有大于等于其他节点的最大高度,如果到达了(也就是同步区块完成了)就SwitchToConsensus切换fast mode到consensus mode

阅读更多

更多精彩内容