以太坊源码分析(42)miner挖矿部分源码分析CPU挖矿

## agent
agent 是具体执行挖矿的对象。 它执行的流程就是,接受计算好了的区块头, 计算mixhash和nonce, 把挖矿好的区块头返回。

构造CpuAgent, 一般情况下不会使用CPU来进行挖矿,一般来说挖矿都是使用的专门的GPU进行挖矿, GPU挖矿的代码不会在这里体现。

    type CpuAgent struct {
        mu sync.Mutex
    
        workCh chan *Work // 接受挖矿任务的通道
        stop chan struct{}
        quitCurrentOp chan struct{}
        returnCh chan<- *Result // 挖矿完成后的返回channel
    
        chain consensus.ChainReader // 获取区块链的信息
        engine consensus.Engine // 一致性引擎,这里指的是Pow引擎
    
        isMining int32 // isMining indicates whether the agent is currently mining
    }
    
    func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent {
        miner := &CpuAgent{
            chain: chain,
            engine: engine,
            stop: make(chan struct{}, 1),
            workCh: make(chan *Work, 1),
        }
        return miner
    }

设置返回值channel和得到Work的channel, 方便外界传值和得到返回信息。

    func (self *CpuAgent) Work() chan<- *Work { return self.workCh }
    func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch }

启动和消息循环,如果已经启动挖矿,那么直接退出, 否则启动update 这个goroutine
update 从workCh接受任务,进行挖矿,或者是接受退出信息,退出。
    
    func (self *CpuAgent) Start() {
        if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) {
            return // agent already started
        }
        go self.update()
    }
    
    func (self *CpuAgent) update() {
    out:
        for {
            select {
            case work := <-self.workCh:
                self.mu.Lock()
                if self.quitCurrentOp != nil {
                    close(self.quitCurrentOp)
                }
                self.quitCurrentOp = make(chan struct{})
                go self.mine(work, self.quitCurrentOp)
                self.mu.Unlock()
            case <-self.stop:
                self.mu.Lock()
                if self.quitCurrentOp != nil {
                    close(self.quitCurrentOp)
                    self.quitCurrentOp = nil
                }
                self.mu.Unlock()
                break out
            }
        }
    }

mine, 挖矿,调用一致性引擎进行挖矿, 如果挖矿成功,把消息发送到returnCh上面。
    
    func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
        if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
            log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
            self.returnCh <- &Result{work, result}
        } else {
            if err != nil {
                log.Warn("Block sealing failed", "err", err)
            }
            self.returnCh <- nil
        }
    }
GetHashRate, 这个函数返回当前的HashRate。

    func (self *CpuAgent) GetHashRate() int64 {
        if pow, ok := self.engine.(consensus.PoW); ok {
            return int64(pow.Hashrate())
        }
        return 0
    }


## remote_agent
remote_agent 提供了一套RPC接口,可以实现远程矿工进行采矿的功能。 比如我有一个矿机,矿机内部没有运行以太坊节点,矿机首先从remote_agent获取当前的任务,然后进行挖矿计算,当挖矿完成后,提交计算结果,完成挖矿。

数据结构和构造

    type RemoteAgent struct {
        mu sync.Mutex
    
        quitCh chan struct{}
        workCh chan *Work        // 接受任务
        returnCh chan<- *Result     // 结果返回
    
        chain consensus.ChainReader
        engine consensus.Engine
        currentWork *Work   //当前的任务
        work map[common.Hash]*Work // 当前还没有提交的任务,正在计算
    
        hashrateMu sync.RWMutex
        hashrate map[common.Hash]hashrate // 正在计算的任务的hashrate
    
        running int32 // running indicates whether the agent is active. Call atomically
    }
    
    func NewRemoteAgent(chain consensus.ChainReader, engine consensus.Engine) *RemoteAgent {
        return &RemoteAgent{
            chain: chain,
            engine: engine,
            work: make(map[common.Hash]*Work),
            hashrate: make(map[common.Hash]hashrate),
        }
    }

启动和停止
    
    func (a *RemoteAgent) Start() {
        if !atomic.CompareAndSwapInt32(&a.running, 0, 1) {
            return
        }
        a.quitCh = make(chan struct{})
        a.workCh = make(chan *Work, 1)
        go a.loop(a.workCh, a.quitCh)
    }
    
    func (a *RemoteAgent) Stop() {
        if !atomic.CompareAndSwapInt32(&a.running, 1, 0) {
            return
        }
        close(a.quitCh)
        close(a.workCh)
    }
得到输入输出的channel,这个和agent.go一样。

    func (a *RemoteAgent) Work() chan<- *Work {
        return a.workCh
    }
    
    func (a *RemoteAgent) SetReturnCh(returnCh chan<- *Result) {
        a.returnCh = returnCh
    }

loop方法,和agent.go里面做的工作比较类似, 当接收到任务的时候,就存放在currentWork字段里面。 如果84秒还没有完成一个工作,那么就删除这个工作, 如果10秒没有收到hashrate的报告,那么删除这个追踪/。
    
    // loop monitors mining events on the work and quit channels, updating the internal
    // state of the rmeote miner until a termination is requested.
    //
    // Note, the reason the work and quit channels are passed as parameters is because
    // RemoteAgent.Start() constantly recreates these channels, so the loop code cannot
    // assume data stability in these member fields.
    func (a *RemoteAgent) loop(workCh chan *Work, quitCh chan struct{}) {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
    
        for {
            select {
            case <-quitCh:
                return
            case work := <-workCh:
                a.mu.Lock()
                a.currentWork = work
                a.mu.Unlock()
            case <-ticker.C:
                // cleanup
                a.mu.Lock()
                for hash, work := range a.work {
                    if time.Since(work.createdAt) > 7*(12*time.Second) {
                        delete(a.work, hash)
                    }
                }
                a.mu.Unlock()
    
                a.hashrateMu.Lock()
                for id, hashrate := range a.hashrate {
                    if time.Since(hashrate.ping) > 10*time.Second {
                        delete(a.hashrate, id)
                    }
                }
                a.hashrateMu.Unlock()
            }
        }
    }

GetWork,这个方法由远程矿工调用,获取当前的挖矿任务。
    
    func (a *RemoteAgent) GetWork() ([3]string, error) {
        a.mu.Lock()
        defer a.mu.Unlock()
    
        var res [3]string
    
        if a.currentWork != nil {
            block := a.currentWork.Block
    
            res[0] = block.HashNoNonce().Hex()
            seedHash := ethash.SeedHash(block.NumberU64())
            res[1] = common.BytesToHash(seedHash).Hex()
            // Calculate the "target" to be returned to the external miner
            n := big.NewInt(1)
            n.Lsh(n, 255)
            n.Div(n, block.Difficulty())
            n.Lsh(n, 1)
            res[2] = common.BytesToHash(n.Bytes()).Hex()
    
            a.work[block.HashNoNonce()] = a.currentWork
            return res, nil
        }
        return res, errors.New("No work available yet, don't panic.")
    }

SubmitWork, 远程矿工会调用这个方法提交挖矿的结果。 对结果进行验证之后提交到returnCh

    // SubmitWork tries to inject a pow solution into the remote agent, returning
    // whether the solution was accepted or not (not can be both a bad pow as well as
    // any other error, like no work pending).
    func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool {
        a.mu.Lock()
        defer a.mu.Unlock()
    
        // Make sure the work submitted is present
        work := a.work[hash]
        if work == nil {
            log.Info("Work submitted but none pending", "hash", hash)
            return false
        }
        // Make sure the Engine solutions is indeed valid
        result := work.Block.Header()
        result.Nonce = nonce
        result.MixDigest = mixDigest
    
        if err := a.engine.VerifySeal(a.chain, result); err != nil {
            log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err)
            return false
        }
        block := work.Block.WithSeal(result)
    
        // Solutions seems to be valid, return to the miner and notify acceptance
        a.returnCh <- &Result{work, block}
        delete(a.work, hash)
    
        return true
    }

SubmitHashrate, 提交hash算力

    func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) {
        a.hashrateMu.Lock()
        defer a.hashrateMu.Unlock()
    
        a.hashrate[id] = hashrate{time.Now(), rate}
    }


## unconfirmed

unconfirmed是一个数据结构,用来跟踪用户本地的挖矿信息的,比如挖出了一个块,那么等待足够的后续区块确认之后(5个),再查看本地挖矿的区块是否包含在规范的区块链内部。

数据结构
    
    // headerRetriever is used by the unconfirmed block set to verify whether a previously
    // mined block is part of the canonical chain or not.
    // headerRetriever由未确认的块组使用,以验证先前挖掘的块是否是规范链的一部分。
    type headerRetriever interface {
        // GetHeaderByNumber retrieves the canonical header associated with a block number.
        GetHeaderByNumber(number uint64) *types.Header
    }
    
    // unconfirmedBlock is a small collection of metadata about a locally mined block
    // that is placed into a unconfirmed set for canonical chain inclusion tracking.
    // unconfirmedBlock 是本地挖掘区块的一个小的元数据的集合,用来放入未确认的集合用来追踪本地挖掘的区块是否被包含进入规范的区块链
    type unconfirmedBlock struct {
        index uint64
        hash common.Hash
    }
    
    // unconfirmedBlocks implements a data structure to maintain locally mined blocks
    // have have not yet reached enough maturity to guarantee chain inclusion. It is
    // used by the miner to provide logs to the user when a previously mined block
    // has a high enough guarantee to not be reorged out of te canonical chain. 
    // unconfirmedBlocks 实现了一个数据结构,用来管理本地挖掘的区块,这些区块还没有达到足够的信任度来证明他们已经被规范的区块链接受。 它用来给矿工提供信息,以便他们了解他们之前挖到的区块是否被包含进入了规范的区块链。
    type unconfirmedBlocks struct {
        chain headerRetriever // Blockchain to verify canonical status through 需要验证的区块链 用这个接口来获取当前的规范的区块头信息
        depth uint // Depth after which to discard previous blocks 经过多少个区块之后丢弃之前的区块
        blocks *ring.Ring // Block infos to allow canonical chain cross checks // 区块信息,以允许规范链交叉检查
        lock sync.RWMutex // Protects the fields from concurrent access
    }

    // newUnconfirmedBlocks returns new data structure to track currently unconfirmed blocks.
    func newUnconfirmedBlocks(chain headerRetriever, depth uint) *unconfirmedBlocks {
        return &unconfirmedBlocks{
            chain: chain,
            depth: depth,
        }
    }

插入跟踪区块, 当矿工挖到一个区块的时候调用, index是区块的高度, hash是区块的hash值。
    
    
    // Insert adds a new block to the set of unconfirmed ones.
    func (set *unconfirmedBlocks) Insert(index uint64, hash common.Hash) {
        // If a new block was mined locally, shift out any old enough blocks
        // 如果一个本地的区块挖到了,那么移出已经超过depth的区块
        set.Shift(index)
    
        // Create the new item as its own ring
        // 循环队列的操作。
        item := ring.New(1)
        item.Value = &unconfirmedBlock{
            index: index,
            hash: hash,
        }
        // Set as the initial ring or append to the end
        set.lock.Lock()
        defer set.lock.Unlock()
    
        if set.blocks == nil {
            set.blocks = item
        } else {
            // 移动到循环队列的最后一个元素插入item
            set.blocks.Move(-1).Link(item)
        }
        // Display a log for the user to notify of a new mined block unconfirmed
        log.Info("
阅读更多

更多精彩内容