if !config.SkipBcVersionCheck {
bcVersion := core.GetBlockChainVersion(chainDb)
if bcVersion != core.BlockChainVersion && bcVersion != 0 {
return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
}
core.WriteBlockChainVersion(chainDb, core.BlockChainVersion)
}
var (
vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
)
//eth内部变量的初始化:1、初始化区块链
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
//2、bloom启动
eth.bloomIndexer.Start(eth.blockchain)
return eth, nil
}
在前面的StartNode中会调用:
……
// Abridged excerpt: every entry in services is started in turn, with the
// `running` value passed to its Start method. Elided code is marked "……".
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
……
}
……
}
好,看一下它:
// Start brings up the Ethereum service on top of the given p2p server.
//
// It launches the bloom-bits handlers, creates the public net RPC API,
// derives the peer budget for the protocol manager from the server limits
// and finally starts the protocol manager and, if one is configured, the
// light (LES) server.
//
// An error is returned when light serving is enabled and the configured
// light peer count is not strictly below the server's total peer limit.
func (s *Ethereum) Start(srvr *p2p.Server) error {
	// Launch the goroutines that service bloom-bits requests.
	s.startBloomHandlers()

	// Create the net RPC service.
	s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

	// Derive the peer budget from the server limits: when light serving is
	// enabled, the slots reserved for light peers are subtracted.
	maxPeers := srvr.MaxPeers
	if s.config.LightServ > 0 {
		lightPeers := s.config.LightPeers
		if lightPeers >= srvr.MaxPeers {
			return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", lightPeers, srvr.MaxPeers)
		}
		maxPeers -= lightPeers
	}

	// Start the networking layer with the computed peer limit.
	s.protocolManager.Start(maxPeers)

	// Start the light server as well, if this node has one.
	if s.lesServer != nil {
		s.lesServer.Start(srvr)
	}
	return nil
}
都写到注释里了,没啥太需要关心注意的。
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// listenLoop (abridged excerpt; elided code is marked "……") accepts a
// connection from srv.listener.
func (srv *Server) listenLoop() {
……
fd, err = srv.listener.Accept() // fd is of type net.Conn
……
}
而这个fd又被传给了SetupConn:
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
……
//注意此处的srv.newTransport(fd)
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
err := srv.setupConn(c, flags, dialDest) 如果正常添加新peer
……
return err
}
上面注释中提到的srv.newTransport,是在Server的Start中赋值的:
// Start (abridged excerpt) shows only the transport defaulting: when no
// newTransport constructor has been set, newRLPX is installed.
// NOTE(review): the excerpt is cut short — no return statement or other
// start-up logic is shown here.
func (srv *Server) Start() (err error) {
if srv.newTransport == nil {
srv.newTransport = newRLPX // falls back to the RLPx transport (per the original note: used for encryption)
}
}
// setupConn (abridged excerpt; elided code is marked "……") copies the
// capabilities and name from phs onto the conn and then calls
// srv.checkpoint with the srv.addpeer channel. A checkpoint error is
// logged at trace level and returned; otherwise nil is returned.
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
……
c.caps, c.name = phs.Caps, phs.Name
err = srv.checkpoint(c, srv.addpeer) // see the run function below, which receives on srv.addpeer
if err != nil {
clog.Trace("Rejected peer", "err", err)
return err
}
// If the checks completed successfully, runPeer has now been
// launched by run.
……
return nil
}
// run (abridged excerpt; elided code is marked "……") shows the srv.addpeer
// case: a conn received on that channel is checked with
// protoHandshakeChecks and, if the checks pass, turned into a Peer that is
// started in its own goroutine and recorded in peers. The check result is
// then sent back on c.cont.
func (srv *Server) run(dialstate dialer) {
……
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
// The handshakes are done and it passed all checks.
// Continuing from the channel hand-off in setupConn: the new conn
// received here is used to create the peer.
p := newPeer(c, srv.Protocols)
// If message events are enabled, pass the peerFeed
// to the peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p) // run the peer in its own goroutine
peers[c.id] = p // record the peer in the peers collection, keyed by c.id
if p.Inbound() {
inboundCount++
}
}
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
select {
case c.cont <- err:
case <-srv.quit:
break running
}
……
}
看runPeer(server.go):
// runPeer drives a single peer: it invokes the optional newPeerHook, runs
// the peer via p.run, and then reports the result on srv.delpeer as a
// peerDrop carrying the peer, the error and whether the remote side
// requested the disconnect.
func (srv *Server) runPeer(p *Peer) {
	if srv.newPeerHook != nil {
		srv.newPeerHook(p)
	}

	// Run the peer. p.run returns whether the remote requested the
	// disconnect together with the terminating error; both values are
	// needed for the peerDrop below.
	remoteRequested, err := p.run()

	// Note: run waits for existing peers to be sent on srv.delpeer
	// before returning, so this send should not select on srv.quit.
	srv.delpeer <- peerDrop{p, err, remoteRequested}
}
看peer.go中的run:
func (p *Peer) run() (remoteRequested bool, err error) {
var (
writeStart = make(chan struct{}, 1)
writeErr = make(chan error, 1)
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
p.wg.Add(2)
//读和ping
go p.readLoop(readErr)
go p.pingLoop()
// Wait for an error or disconnect.
loop:
for {
select {
case err = <-writeErr:
// A write finished. Allow the next write to start if
// there was no error.
if err != nil {
reason = DiscNetworkError
break loop
}
writeStart <- struct{}{}
case err = <-readErr:
if r, ok := err.(DiscReason); ok {
remoteRequested = true
reason = r
} else {
reason = DiscNetworkError
}
break loop
case err = <-p.protoErr:
reason = discReasonForError(err)
break loop
case err = <-p.disc:
break loop
}
}