启动命令:
geth --identity "TestNode1" --datadir "data0" --rpc --rpcapi "db,eth,net,web3" --port "30303" --networkid "29382" --ws --wsorigins="*" --rpccorsdomain="*" console
启动后,我们可以从日志来分析程序启动的流程。
INFO [10-29|12:27:11.737] Maximum peer count
INFO [10-29|12:27:11.747] Starting peer-to-peer node // node/node.go
INFO [10-29|12:27:11.747] Allocated cache and file handles //
INFO [10-29|12:27:11.763] Initialised chain configuration // eth/backend.go
INFO [10-29|12:27:11.763] Disk storage enabled for ethash caches // consensus/ethash/ethash.go
INFO [10-29|12:27:11.763] Disk storage enabled for ethash DAGs
INFO [10-29|12:27:11.763] Initialising Ethereum protocol
WARN [10-29|12:27:11.765] Head state missing, repairing chain
INFO [10-29|12:27:11.775] Rewound blockchain to past state
INFO [10-29|12:27:11.776] Loaded most recent local header
INFO [10-29|12:27:11.776] Loaded most recent local full block
INFO [10-29|12:27:11.776] Loaded most recent local fast block
INFO [10-29|12:27:11.776] Loaded local transaction journal
INFO [10-29|12:27:11.777] Regenerated local transaction journal
WARN [10-29|12:27:11.777] Blockchain not empty, fast sync disabled
INFO [10-29|12:27:11.777] Starting P2P networking
INFO [10-29|12:27:13.928] Mapped network port
INFO [10-29|12:27:13.990] UDP listener up
INFO [10-29|12:27:13.991] RLPx listener up
INFO [10-29|12:27:13.995] IPC endpoint opened
INFO [10-29|12:27:13.995] HTTP endpoint opened
INFO [10-29|12:27:13.996] WebSocket endpoint opened
INFO [10-29|12:27:14.001] Mapped network port
Welcome to the Geth JavaScript console!
Go里面有两个保留的函数:init函数(能够应用于所有的package)和main函数(只能应用于package main)。这两个函数在定义时不能有任何的参数和返回值。
在cmd/geth/main.go中,首先定义app:
cmd/geth/main.go
// Package-level state of the geth command: the release commit hash, the CLI
// app object and the flag groups that init() appends to app.Flags.
var (
	// Git SHA1 commit hash of the release (set via linker flags)
	gitCommit = ""
	// The app that holds all commands and flags.
	app = utils.NewApp(gitCommit, "the go-ethereum command line interface")
	// flags
	nodeFlags = []cli.Flag{
		utils.IdentityFlag, // all of these flags come from the cmd/utils package
		...
	}
	rpcFlags     = []cli.Flag{...}
	consoleFlags = []cli.Flag{...} // defined in cmd/geth/consolecmd.go
	whisperFlags = []cli.Flag{...}
	// Named metricsFlags (not "metricsFlafs") so that it matches the
	// identifier init() appends to app.Flags.
	metricsFlags = []cli.Flag{...}
)
然后通过init()函数来初始化app,其中app.Action表示如果用户没有输入其他的子命令的情况下,会调用这个字段指向的函数,即geth()。
geth的命令使用了urfave/cli这个库,这个库是go语言命令行程序常用的库,它把命令行解析的过程做了一下封装,抽象出flag/command/subcommand这些模块,用户只需要提供一些模块的配置,参数的解析和关联在库内部完成,帮助信息也可以自动生成。
app.Flags和app.Commands分别设置了geth支持的[option]和[command]:当库从用户输入的命令行中解析出相应的参数后,会找到与之对应的函数并执行。这里先不做介绍,后面以console的启动为例介绍这一部分的原理。
// init configures the package-level CLI app before main runs: it sets the
// default action and metadata, the list of subcommands, the accepted flags
// and the Before/After hooks. Parts of the body are elided ("...") in this
// excerpt.
func init() {
// Initialize the CLI app and start Geth
app.Action = geth
app.HideVersion = true // we have a command to print the version
app.Copyright = "Copyright 2013-2018 The go-ethereum Authors"
// All supported subcommands
app.Commands = []cli.Command{
// See chaincmd.go:
initCommand,
...
// See monitorcmd.go:
// See accountcmd.go:
// See consolecmd.go:
// See misccmd.go:
// See config.go
}
// Order the subcommands with the cli.CommandsByName sorter.
sort.Sort(cli.CommandsByName(app.Commands))
// All options (flags) that can be parsed
app.Flags = append(app.Flags, nodeFlags...)
app.Flags = append(app.Flags, rpcFlags...)
app.Flags = append(app.Flags, consoleFlags...)
app.Flags = append(app.Flags, debug.Flags...)
app.Flags = append(app.Flags, whisperFlags...)
app.Flags = append(app.Flags, metricsFlags...)
// Before hook; its body is elided in this excerpt.
app.Before = func(ctx *cli.Context) error {
...
}
// After hook: shuts down the debug facilities and closes the console's stdin.
app.After = func(ctx *cli.Context) error {
debug.Exit()
console.Stdin.Close() // Resets terminal mode.
return nil
}
}
通过上面的代码就把我们解析用户命令的对象设置完成了,下一步就是执行app.Run()。
// main runs the CLI app with the process arguments. If Run reports an error,
// the error is printed to standard error and the process exits with status 1.
func main() {
	err := app.Run(os.Args)
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}
在以太坊客户端geth中,如果什么命令都不输入直接运行geth, 就会默认启动一个全节点模式的节点,连接到主网络。这时候就是按照上面所说的,启动了geth()函数:
cmd/geth/main.go
// geth is the function assigned to app.Action in init(). It rejects any
// positional argument as an invalid command, then builds the full node,
// starts it and blocks in Wait on the node.
func geth(ctx *cli.Context) error {
	// Anything left over after flag parsing is treated as an unknown command.
	extra := ctx.Args()
	if len(extra) != 0 {
		return fmt.Errorf("invalid command: %q", extra[0])
	}

	fullNode := makeFullNode(ctx) // build and configure the full node
	startNode(ctx, fullNode)      // start the full node
	fullNode.Wait()
	return nil
}
geth()函数首先调用cmd/geth/config.go中的makeFullNode()函数来创建并配置节点,然后再调用cmd/geth/main.go中的startNode()函数启动全节点。先来看makeFullNode()函数:
// makeFullNode creates the node through makeConfigNode and registers services
// on it: the eth service always, and the dashboard, whisper and ethstats
// services only when the flags or configuration ask for them. It returns the
// node with the services registered.
func makeFullNode(ctx *cli.Context) *node.Node {
	stack, cfg := makeConfigNode(ctx) // configure the node

	// Register the eth service.
	utils.RegisterEthService(stack, &cfg.Eth)

	// The dashboard service is only registered when its flag is enabled.
	withDashboard := ctx.GlobalBool(utils.DashboardEnabledFlag.Name)
	if withDashboard {
		utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
	}

	// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
	explicit := enableWhisper(ctx)
	implicit := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
	if explicit || implicit {
		maxSizeFlag := utils.WhisperMaxMessageSizeFlag.Name
		minPOWFlag := utils.WhisperMinPOWFlag.Name

		// Command line values override what the configuration already holds.
		if ctx.GlobalIsSet(maxSizeFlag) {
			cfg.Shh.MaxMessageSize = uint32(ctx.Int(maxSizeFlag))
		}
		if ctx.GlobalIsSet(minPOWFlag) {
			cfg.Shh.MinimumAcceptedPOW = ctx.Float64(minPOWFlag)
		}
		utils.RegisterShhService(stack, &cfg.Shh)
	}

	// Add the Ethereum Stats daemon if requested.
	if statsURL := cfg.Ethstats.URL; statsURL != "" {
		utils.RegisterEthStatsService(stack, statsURL)
	}
	return stack
}
可以看出,makeFullNode首先通过makeConfigNode(ctx)对节点进行配置,包括Eth、Shh、Node、Dashboard,返回Node和geth配置。随后分两步:1、通过RegisterEthService把Ethereum服务的构造函数注册到Node上(此时服务本身还没有被创建);2、之后调用Node.Start()时,Node先执行这些构造函数创建服务并存入services map[reflect.Type]Service,再通过Server.Start()启动p2p服务,最后逐个启动包括Ethereum在内的各个服务。
// makeConfigNode assembles the geth configuration and creates the node from
// it. Settings are layered in this order: component defaults, then the config
// file named by the config-file flag (if any), then command line flags. It
// returns the created node together with the final configuration.
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
	// Load defaults.
	var cfg gethConfig
	cfg.Eth = eth.DefaultConfig
	cfg.Shh = whisper.DefaultConfig
	cfg.Node = defaultNodeConfig()
	cfg.Dashboard = dashboard.DefaultConfig

	// Load config file.
	configPath := ctx.GlobalString(configFileFlag.Name)
	if configPath != "" {
		if loadErr := loadConfig(configPath, &cfg); loadErr != nil {
			utils.Fatalf("%v", loadErr)
		}
	}

	// Apply flags. The node config must be final before the node is created.
	utils.SetNodeConfig(ctx, &cfg.Node)
	stack, err := node.New(&cfg.Node)
	if err != nil {
		utils.Fatalf("Failed to create the protocol stack: %v", err)
	}

	utils.SetEthConfig(ctx, stack, &cfg.Eth)
	if statsFlag := utils.EthStatsURLFlag.Name; ctx.GlobalIsSet(statsFlag) {
		cfg.Ethstats.URL = ctx.GlobalString(statsFlag)
	}
	utils.SetShhConfig(ctx, stack, &cfg.Shh)
	utils.SetDashboardConfig(ctx, &cfg.Dashboard)

	return stack, cfg
}
makeFullNode()函数里在做好节点配置之后,再调用cmd/utils/flags.go里的RegisterEthService()函数注册eth服务,而这个函数是调用了node/node.go模块中的Register()方法,这个方法的参数是一个构造函数(constructor),正如下面的代码所示:
cmd/utils/flags.go
// RegisterEthService registers a service constructor on the given node. For
// the light sync mode the constructor builds the service with les.New; for
// every other mode it builds it with eth.New and, when cfg.LightServ is
// positive, attaches a LES server to it. A registration failure is fatal.
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
	// Constructor used for light sync.
	lightCtor := func(ctx *node.ServiceContext) (node.Service, error) {
		return les.New(ctx, cfg)
	}
	// Constructor used for all other sync modes.
	fullCtor := func(ctx *node.ServiceContext) (node.Service, error) {
		backend, err := eth.New(ctx, cfg)
		if backend == nil || cfg.LightServ <= 0 {
			return backend, err
		}
		// NOTE(review): the error returned by NewLesServer is discarded here.
		server, _ := les.NewLesServer(backend, cfg)
		backend.AddLesServer(server)
		return backend, err
	}

	var regErr error
	switch cfg.SyncMode {
	case downloader.LightSync:
		regErr = stack.Register(lightCtor)
	default:
		regErr = stack.Register(fullCtor)
	}
	if regErr != nil {
		Fatalf("Failed to register the Ethereum service: %v", regErr)
	}
}
这个函数里会判断同步的方式,如果是LightSync则会使用les.New()创建轻节点,否则就使用eth.New()创建全节点(当cfg.LightServ > 0时,还会为全节点附加一个LES服务端)。这里我们还是建立全节点,即调用eth.New()方法。
eth/backend.go
// New builds an Ethereum object from the given service context and config.
// In the visible part of the body it opens the chain database, sets up the
// genesis block and chain configuration, and then creates the blockchain,
// transaction pool, protocol manager, miner and API backend with its gas
// price oracle. It returns an error if the genesis setup fails with anything
// other than a *params.ConfigCompatError, or if the blockchain or protocol
// manager cannot be created. Sections marked "..." are elided in this excerpt.
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
...
chainDb, err := CreateDB(ctx, config, "chaindata")
...
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
// A *params.ConfigCompatError is tolerated at this point and handled further
// down by rewinding the chain; any other genesis error aborts construction.
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
// Assemble the Ethereum object from the pieces created so far.
eth := &Ethereum{
config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb),
shutdownChan: make(chan bool),
networkID: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
...
// VM and cache settings passed to core.NewBlockChain below.
var (
vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
eth.bloomIndexer.Start(eth.blockchain)
// When a transaction journal is configured, pass its path through
// ctx.ResolvePath before creating the transaction pool.
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
// The API backend is created without an oracle first; the oracle is attached
// below because it takes the backend itself as an argument.
eth.APIBackend = &EthAPIBackend{eth, nil}
gpoParams := config.GPO
// Fall back to the configured gas price when the oracle parameters carry no default.
if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
return eth, nil
}
总体上看来,这个函数主要是执行了以下几步:
1、加载创世区块
2、初始化以太坊类
3、初始化BlockChain
4、初始化交易池
5、初始化协议管理器
6、初始化挖矿模式
回到cmd/geth/main.go,在节点配置完成后执行startNode()函数。startNode()的主要功能有:
1、启动node;
2、解锁账户;
3、开启钱包事件监听;
4、如果启用了挖矿(或开发者模式),则设置挖矿线程数和gas价格并开始挖矿。
// startNode boots the given node and then performs the post-start work: it
// unlocks the accounts named by the unlock flag, starts a goroutine that
// reacts to wallet events, and starts mining when the mining flag or the
// developer flag is enabled. Unrecoverable problems end in utils.Fatalf.
func startNode(ctx *cli.Context, stack *node.Node) {
	debug.Memsize.Add("node", stack)

	// Start up the node itself
	utils.StartNode(stack)

	// Unlock any account specifically requested
	ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)

	passwords := utils.MakePasswordList(ctx)
	unlocks := strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name), ",")
	for i, account := range unlocks {
		// Empty entries (e.g. when the flag is unset) are skipped; the index i
		// is passed to unlockAccount together with the password list.
		if trimmed := strings.TrimSpace(account); trimmed != "" {
			unlockAccount(ctx, ks, trimmed, i, passwords)
		}
	}

	// Register wallet event handlers to open and auto-derive wallets
	events := make(chan accounts.WalletEvent, 16)
	stack.AccountManager().Subscribe(events)

	go func() {
		// Create a chain state reader for self-derivation
		rpcClient, err := stack.Attach()
		if err != nil {
			utils.Fatalf("Failed to attach to self: %v", err)
		}
		stateReader := ethclient.NewClient(rpcClient)

		// Open any wallets already attached
		for _, wallet := range stack.AccountManager().Wallets() {
			if err := wallet.Open(""); err != nil {
				log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
			}
		}
		// Listen for wallet event till termination
		for event := range events {
			switch event.Kind {
			case accounts.WalletArrived:
				if err := event.Wallet.Open(""); err != nil {
					log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
				}
			case accounts.WalletOpened:
				status, _ := event.Wallet.Status()
				log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status)

				// Wallets with the "ledger" URL scheme use their own base derivation path.
				derivationPath := accounts.DefaultBaseDerivationPath
				if event.Wallet.URL().Scheme == "ledger" {
					derivationPath = accounts.DefaultLedgerBaseDerivationPath
				}
				event.Wallet.SelfDerive(derivationPath, stateReader)

			case accounts.WalletDropped:
				log.Info("Old wallet dropped", "url", event.Wallet.URL())
				event.Wallet.Close()
			}
		}
	}()

	// Start auxiliary services if enabled
	if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
		// Mining only makes sense if a full Ethereum node is running
		if ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {
			utils.Fatalf("Light clients do not support mining")
		}
		// The address of the variable is passed so that stack.Service can
		// populate it; ethereum is used right below.
		var ethereum *eth.Ethereum
		if err := stack.Service(&ethereum); err != nil {
			utils.Fatalf("Ethereum service not running: %v", err)
		}
		// Use a reduced number of threads if requested
		threads := ctx.GlobalInt(utils.MinerLegacyThreadsFlag.Name)
		if ctx.GlobalIsSet(utils.MinerThreadsFlag.Name) {
			threads = ctx.GlobalInt(utils.MinerThreadsFlag.Name)
		}
		if threads > 0 {
			// Only engines that offer SetThreads can have their thread count adjusted.
			type threaded interface {
				SetThreads(threads int)
			}
			if th, ok := ethereum.Engine().(threaded); ok {
				th.SetThreads(threads)
			}
		}
		// Set the gas price to the limits from the CLI and start mining
		gasprice := utils.GlobalBig(ctx, utils.MinerLegacyGasPriceFlag.Name)
		if ctx.IsSet(utils.MinerGasPriceFlag.Name) {
			gasprice = utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
		}
		ethereum.TxPool().SetGasPrice(gasprice)
		if err := ethereum.StartMining(true); err != nil {
			utils.Fatalf("Failed to start mining: %v", err)
		}
	}
}
在上面的代码中,通过cmd/utils/cmd.go中的StartNode()函数,调用node/node.go中的Start()方法,启动节点。在这个方法中,首先判断节点是否已经在运行,然后要对p2p服务进行初始化,最后构建p2p.Server对象,执行该对象的Start()方法,使p2p服务启动起来。
// Start brings the node up. Holding n.lock, it opens the data directory,
// prepares the p2p server configuration, instantiates every registered
// service through its constructor, starts the p2p server, then each service,
// then the RPC interfaces, and finally records the running services and
// server on the node. It returns ErrNodeRunning when n.server is already set,
// a *DuplicateServiceError when two constructors yield the same service type,
// and otherwise the first error hit along the way.
func (n *Node) Start() error {
n.lock.Lock()
defer n.lock.Unlock()
// Short circuit if the node's already running
if n.server != nil {
return ErrNodeRunning
}
if err := n.openDataDir(); err != nil {
return err
}
// Initialise the p2p service: fill in serverConfig and use it to create the p2p.Server instance
n.serverConfig = n.config.P2P
n.serverConfig.PrivateKey = n.config.NodeKey()
n.serverConfig.Name = n.config.NodeName()
n.serverConfig.Logger = n.log
// Values already present in the P2P config win; the node config only fills the gaps.
if n.serverConfig.StaticNodes == nil {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
if n.serverConfig.TrustedNodes == nil {
n.serverConfig.TrustedNodes = n.config.TrustedNodes()
}
if n.serverConfig.NodeDatabase == "" {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
running := &p2p.Server{Config: n.serverConfig}
n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
// Instantiate every registered service by running its constructor
services := make(map[reflect.Type]Service)
for _, constructor := range n.serviceFuncs {
// Create a new context for the particular service
ctx := &ServiceContext{
config: n.config,
services: make(map[reflect.Type]Service),
EventMux: n.eventmux,
AccountManager: n.accman,
}
for kind, s := range services { // copy needed for threaded access
ctx.services[kind] = s
}
// Construct and save the service
service, err := constructor(ctx)
if err != nil {
return err
}
// Services are keyed by their concrete type, so each type may appear only once.
kind := reflect.TypeOf(service)
if _, exists := services[kind]; exists {
return &DuplicateServiceError{Kind: kind}
}
services[kind] = service
}
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
if err := running.Start(); err != nil {
return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
for _, kind := range started {
services[kind].Stop()
}
running.Stop()
return err
}
// Mark the service started for potential cleanup
started = append(started, kind)
}
// Lastly start the configured RPC interfaces
if err := n.startRPC(services); err != nil {
// All services are running at this point, so all of them are stopped.
for _, service := range services {
service.Stop()
}
running.Stop()
return err
}
// Finish initializing the startup
n.services = services
n.server = running
n.stop = make(chan struct{})
return nil
}
至此,以太坊的启动流程就完成了。