最近在学习Tendermint的代码,记录下同步区块的流程,直接跳过P2P阶段,后续再写一篇文章记录P2P流程吧
blockchain/pool.go的OnStart()新建了一个goroutine来发起获取区块的请求：
// OnStart is the BlockPool start hook: it stamps the pool's start time and
// launches makeRequestersRoutine in the background. It always returns nil.
func (pool *BlockPool) OnStart() error {
	// Record the start time before spawning the goroutine so the write
	// happens-before anything the new goroutine does.
	pool.startTime = time.Now()
	go pool.makeRequestersRoutine()
	return nil
}
看下makeRequestersRoutine的代码
// makeRequestersRoutine spawns block requesters for as long as the pool is
// running. When either the pending-request budget (maxPendingRequests) or the
// total-requester budget (maxTotalRequesters) is exhausted, it backs off for
// requestIntervalMS milliseconds and prunes timed-out peers instead of
// creating a new requester.
func (pool *BlockPool) makeRequestersRoutine() {
	for pool.IsRunning() {
		_, pending, total := pool.GetStatus()
		switch {
		case pending >= maxPendingRequests, total >= maxTotalRequesters:
			// Budget exhausted: wait a little, then drop peers that timed out.
			time.Sleep(requestIntervalMS * time.Millisecond)
			pool.removeTimedoutPeers()
		default:
			// Room for more: request the next block.
			pool.makeNextRequester()
		}
	}
}
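这里是一个for循环，只要pool还在运行，就不断获取当前pool的状态（GetStatus），然后分三种情况处理：
1 如果当前pending的请求个数大于等于maxPendingRequests（10000个），就sleep requestIntervalMS（100ms），并且remove掉timeout的peer（判断peer是否timeout的逻辑是：当前从该peer接收数据的速率低于10KB/s）
2 如果当前requester的总个数大于等于maxTotalRequesters（10000个），同样sleep 100ms，并且remove掉timeout的peer
3 否则调用makeNextRequester()新建一个请求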
// makeNextRequester creates a requester for the first height not yet covered
// by an existing requester (pool.height + pool.requestersLen()), registers it
// in pool.requesters, bumps pool.numPending and starts it. pool.mtx is held
// for the whole call, so these steps are not interleaved with other holders
// of that mutex. A start failure is only logged.
func (pool *BlockPool) makeNextRequester() {
	pool.mtx.Lock()
	defer pool.mtx.Unlock()

	height := pool.height + pool.requestersLen()
	bpr := newBPRequester(pool, height)
	// bpr.SetLogger(pool.Logger.With("height", height))
	pool.requesters[height] = bpr
	pool.numPending++

	if err := bpr.Start(); err != nil {
		bpr.Logger.Error("Error starting request", "err", err)
	}
}
新建请求时，会以nextHeight为key把requester放进pool.requesters，并把numPending加1，然后调用request.Start()，它最终调用的是blockchain/pool.go中的另外一个OnStart()，即bpRequester的OnStart()：
// OnStart is the bpRequester start hook. It hands all of the requester's work
// to requestRoutine, running in its own goroutine, and reports success
// immediately.
func (bpr *bpRequester) OnStart() error {
	go bpr.requestRoutine() // runs until the requester or its pool stops
	return nil
}
看下requestRoutine的代码
// requestRoutine drives a single bpRequester. Each outer iteration polls the
// pool (sleeping requestIntervalMS between attempts) until
// pickIncrAvailablePeer returns a peer for bpr.height. It returns early if
// either the requester or the pool stops running. It then records the chosen
// peer's ID under bpr.mtx and asks the pool to send the block request to it.
// NOTE(review): the rest of the loop body is elided ("...") in this excerpt,
// so how responses, timeouts or redo requests are handled is not shown here.
func (bpr *bpRequester) requestRoutine() {
OUTER_LOOP:
for {
// Pick a peer to send request to.
var peer *bpPeer = nil
PICK_PEER_LOOP:
for {
// Stop as soon as this requester or its pool is shut down.
if !bpr.IsRunning() || !bpr.pool.IsRunning() {
return
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
//log.Info("No peers available", "height", height)
// No suitable peer right now; back off and try again.
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
break PICK_PEER_LOOP
}
// Remember which peer this height was requested from (guarded by bpr.mtx).
bpr.mtx.Lock()
bpr.peerID = peer.id
bpr.mtx.Unlock()
// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
...
}
}
pickIncrAvailablePeer的作用是找到一个拥有该高度(bpr.height)区块的可用节点；如果暂时找不到，就sleep一段时间后重试。
sendRequest其实只是往pool.requestsCh这个channel里写入一个BlockRequest{height, peerID}：
// sendRequest queues a block request for the given height and peer on
// pool.requestsCh. The request is silently dropped once the pool has stopped.
// NOTE(review): the channel send may block until a receiver is ready (or
// buffer space frees up); confirm requestsCh's capacity at its construction.
func (pool *BlockPool) sendRequest(height int64, peerID string) {
	if pool.IsRunning() {
		pool.requestsCh <- BlockRequest{Height: height, PeerID: peerID}
	}
}
接着找requestsCh读取的地方,就在blockchain/reactor.go中:
// poolRoutine is the BlockchainReactor's fast-sync event loop. Each iteration
// handles exactly one of:
//
//   - bcR.requestsCh: a BlockRequest produced by the pool. It is forwarded to
//     the named peer as a bcBlockRequestMessage via TrySend. If the peer has
//     disconnected or its send queue is full, the request is dropped and the
//     pool's own timeout handling is relied on to retry it.
//   - bcR.timeoutsCh: a peer ID the pool reported as timed out. That peer is
//     stopped with an error.
//   - statusUpdateTicker: broadcasts a status request in a new goroutine.
//   - switchToConsensusTicker: once the pool reports it has caught up, stops
//     the pool, hands the current state and the number of blocks synced to
//     the "CONSENSUS" reactor, and returns.
//   - trySyncTicker: verifies and applies up to 10 downloaded blocks.
//   - bcR.Quit: returns.
//
// Block H is verified against the LastCommit carried by block H+1, which is
// why two blocks must be available before the first one can be applied.
//
// All three tickers are stopped when the routine returns.
func (bcR *BlockchainReactor) poolRoutine() {
	// Stop every ticker on exit; otherwise they keep firing after fast sync
	// has finished.
	trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
	defer trySyncTicker.Stop()
	statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
	defer statusUpdateTicker.Stop()
	switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
	defer switchToConsensusTicker.Stop()

	blocksSynced := 0

	chainID := bcR.initialState.ChainID
	state := bcR.initialState

	lastHundred := time.Now()
	lastRate := 0.0

FOR_LOOP:
	for {
		select {
		case request := <-bcR.requestsCh: // chan BlockRequest
			peer := bcR.Switch.Peers().Get(request.PeerID)
			if peer == nil {
				continue FOR_LOOP // Peer has since been disconnected.
			}
			msg := &bcBlockRequestMessage{request.Height}
			queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
			if !queued {
				// We couldn't make the request, send-queue full.
				// The pool handles timeouts, just let it go.
				continue FOR_LOOP
			}

		case peerID := <-bcR.timeoutsCh: // chan string
			// Peer timed out.
			peer := bcR.Switch.Peers().Get(peerID)
			if peer != nil {
				bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
			}

		case <-statusUpdateTicker.C:
			// ask for status updates
			go bcR.BroadcastStatusRequest() // nolint: errcheck

		case <-switchToConsensusTicker.C:
			height, numPending, lenRequesters := bcR.pool.GetStatus()
			outbound, inbound, _ := bcR.Switch.NumPeers()
			bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
				"outbound", outbound, "inbound", inbound)
			if bcR.pool.IsCaughtUp() {
				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
				bcR.pool.Stop()

				conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
				conR.SwitchToConsensus(state, blocksSynced)

				break FOR_LOOP
			}

		case <-trySyncTicker.C: // chan time
			// This loop can be slow as long as it's doing syncing work.
		SYNC_LOOP:
			for i := 0; i < 10; i++ {
				// See if there are any blocks to sync.
				first, second := bcR.pool.PeekTwoBlocks()
				if first == nil || second == nil {
					// We need both to sync the first block.
					break SYNC_LOOP
				}
				firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes)
				firstPartsHeader := firstParts.Header()
				firstID := types.BlockID{first.Hash(), firstPartsHeader}
				// Finally, verify the first block using the second's commit
				// NOTE: we can probably make this more efficient, but note that calling
				// first.Hash() doesn't verify the tx contents, so MakePartSet() is
				// currently necessary.
				if err := state.Validators.VerifyCommit(
					chainID, firstID, first.Height, second.LastCommit); err != nil {
					bcR.Logger.Error("Error in validation", "err", err)
					bcR.pool.RedoRequest(first.Height)
					break SYNC_LOOP
				}

				bcR.pool.PopRequest()
				bcR.store.SaveBlock(first, firstParts, second.LastCommit)

				// NOTE: we could improve performance if we
				// didn't make the app commit to disk every block
				// ... but we would need a way to get the hash without it persisting
				var err error
				state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
				if err != nil {
					// TODO This is bad, are we zombie?
					cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
				}
				blocksSynced++

				// update the consensus params
				bcR.updateConsensusParams(state.ConsensusParams)

				if blocksSynced%100 == 0 {
					lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
					// Read the height through GetStatus (the accessor already used
					// by the consensus ticker above) instead of touching the
					// unexported pool.height field, which the pool itself only
					// reads while holding pool.mtx.
					poolHeight, _, _ := bcR.pool.GetStatus()
					bcR.Logger.Info("Fast Sync Rate", "height", poolHeight,
						"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
					lastHundred = time.Now()
				}
			}

		case <-bcR.Quit:
			break FOR_LOOP
		}
	}
}
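1 从requestsCh收到请求后，向对应的peer发送bcBlockRequestMessage（对端节点收到消息后会返回该高度的block，消息在本文件的Receive()函数中处理）。如果peer已经断开或发送队列已满，这个请求会被直接丢弃，由pool的超时机制负责重试
2 trySyncTicker是一个50ms的定时器，每次触发最多处理10个区块：取出两个已经从对端peer下载到的连续block（first和second），用second中的LastCommit对first做VerifyCommit校验（高度H的区块的commit保存在高度H+1区块的LastCommit中，所以必须同时拿到两个block才能同步第一个）；校验通过后先SaveBlock存到tendermint内部，再调用ApplyBlock提交给application；校验失败则调用RedoRequest重新请求该高度
3 statusUpdateTicker是一个10s的定时器，作用是broadcast bcStatusRequestMessage，去获取对端节点的block高度
4 switchToConsensusTicker是一个1s的定时器，作用是检查本节点是否已经追上其他节点（IsCaughtUp，即本节点的block高度大于等于其他节点的最大高度）；如果追上了（也就是区块同步完成了），就停止pool并调用SwitchToConsensus，从fast sync模式切换到consensus模式
5 另外，从timeoutsCh收到超时的peer后，会调用StopPeerForError断开该peer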