python ethereum 代码分析《2》

python ethereum 代码分析 《2》

python 版本以太坊

pyethapp 模块

本章主要介绍pyethapp 模块中的ChainService 和 PoWService

一些关键概念

totalDifficulty 总难度:
total difficulty总难度是当前某条链所有区块难度的总和,total difficulty被用来指示最长的那一条链,某个节点如果想从其他节点同步数据的话,他会选择total difficulty最大的那一条链来同步数据。值得注意的是uncle block的difficulty也被计算到totalDifficulty中去,以太坊白皮书中的幽灵协议阐述了为什么这么设计

uncle block 叔叔区块:也是矿工挖出来的区块,它也是合法的,但是发现的稍晚,或者是网络传输稍慢,而没有能成为最长的链上的区块
以太坊十几秒的出块间隔,大大增加了孤块的产生,并且降低了安全性。通过鼓励引用叔块,使引用主链获得更多的安全保证(因为孤块本身也是合法的)
区块可以不引用,或者最多引用两个叔块
叔块必须是区块的前2层~前7层的祖先的直接的子块
被引用过的叔块不能重复引用
引用叔块的区块,可以获得挖矿报酬的1/32,也就是5*1/32=0.15625 Ether。最多获得2*0.15625=0.3125 Ether
参考资料http://blog.csdn.net/superswords/article/details/76445278
https://zhuanlan.zhihu.com/p/28928827

head_candidate 候选头区块: 矿工本地的候选头区块,相当于一个临时区块,head_candidate一直存在并且矿工会一直更新。矿工将交易打包进这个区块,计算出随机数后将次区块作为新区块广播出去。

contract :合约,本身也是一个账户。每当一个交易指向一个合约账户时,该交易transaction的data属性作为该合约的输入执行该合约。

以太坊基本的一些概念:http://ethdocs.org/en/latest/contracts-and-transactions/index.html

ChainService 和 PowService的关系

chain service负责区块链的同步和更新,处理连接的各个节点的eth_protocol数据包,以及交易和区块的广播; pow service是矿工挖矿的服务,计算出’幸运值’后告知chain service,chain service 将区块写入区块链并广播出去。

矿工将交易打包,更新head_candidate

    @property
    def head_candidate(self):
        if self._head_candidate_needs_updating:
            self._head_candidate_needs_updating = False
            # Make a copy of self.transaction_queue because
            # make_head_candidate modifies it.
            txqueue = copy.deepcopy(self.transaction_queue)
            #将交易打包,引用uncle区块,执行交易中的合约,更新区块状态
            self._head_candidate, self._head_candidate_state = make_head_candidate(
                self.chain, txqueue, timestamp=int(time.time()))
        return self._head_candidate

chain 实例的_on_new_head回调

    def _on_new_head(self, block):
        log.debug('new head cbs', num=len(self.on_new_head_cbs))
        self.transaction_queue = self.transaction_queue.diff(
            block.transactions)
        self._head_candidate_needs_updating = True
        #cb是pow service的回调函数mine_head_candidate,更新head_candidate并开始挖矿
        for cb in self.on_new_head_cbs:
            cb(block)

powservice的回调

    def mine_head_candidate(self, _=None):
        #打包当前交易队里中的交易并更新head_candidate
        hc = self.chain.head_candidate
        if not self.active or self.chain.is_syncing:
            return
        elif (hc.transaction_count == 0 and
              not self.app.config['pow']['mine_empty_blocks']):
            return

        log.debug('mining', difficulty=hc.difficulty)
        #开始挖矿
        self.ppipe.put(('mine', dict(mining_hash=hc.mining_hash,
                                     block_number=hc.number,
                                     difficulty=hc.difficulty)))

计算出幸运值后,调用chain.add_mined_block写入区块链并广播

    #成功找到幸运值
    def recv_found_nonce(self, bin_nonce, mixhash, mining_hash):
        log.info('nonce found', mining_hash=mining_hash.encode('hex'))
        #再次打包交易,更新head_candidate
        block = self.chain.head_candidate
        if block.mining_hash != mining_hash:
            log.warn('mining_hash does not match')
            return False
        block.header.mixhash = mixhash
        block.header.nonce = bin_nonce
        #添加新块并广播
        if self.chain.add_mined_block(block):
            log.debug('mined block %d (%s) added to chain' % (
                block.number, encode_hex(block.hash[:8])))
            return True
        else:
            log.debug('failed to add mined block %d (%s) to chain' % (
                block.number, encode_hex(block.hash[:8])))
            return False
    def add_mined_block(self, block):
        log.debug('adding mined block', block=block)
        assert isinstance(block, Block)
        #添加新块
        if self.chain.add_block(block):
            log.debug('added', block=block, ts=time.time())
            assert block == self.chain.head
            self.transaction_queue = self.transaction_queue.diff(block.transactions)
            self._head_candidate_needs_updating = True
            #广播新块
            self.broadcast_newblock(block, chain_difficulty=self.chain.get_score(block))
            return True
        log.debug('failed to add', block=block, ts=time.time())
        return False

在写入新块完成后,调用new_head_cb回调,再次开始挖矿
至此,整个循环的过程就是矿工不断打包交易挖矿到广播新块的过程

ChainService

service start 服务启动
服务启动时将eth_protocol(以太坊协议)中command对应的回调函数添加到eth_protocol实例中

先看看eth_protocol协议
以太坊协议定义eth_protocol.py

每个peer对象都有一个eth_protocol实例。在p2p协议中,当节点收到hello数据包的时候,便初始化所有协议实例,其中包括eth_protocol
peer对象receive hello
初始化eth_protocol

    def connect_service(self, service):
        assert isinstance(service, WiredService)
        protocol_class = service.wire_protocol
        assert issubclass(protocol_class, BaseProtocol)
        # create protcol instance which connects peer with serivce
        protocol = protocol_class(self, service)
        # register protocol
        assert protocol_class not in self.protocols
        log.debug('registering protocol', protocol=protocol.name, peer=self)
        self.protocols[protocol_class] = protocol
        self.mux.add_protocol(protocol.protocol_id)
        protocol.start()#调用chain_service.on_wire_protocol_start(self)

chain service为该实例添加回调函数,并向对方节点发送status数据包

    def on_wire_protocol_start(self, proto):
        log.debug('----------------------------------')
        log.debug('on_wire_protocol_start', proto=proto)
        assert isinstance(proto, self.wire_protocol)
        # register callbacks
        proto.receive_status_callbacks.append(self.on_receive_status)#处理status数据包
        proto.receive_newblockhashes_callbacks.append(self.on_newblockhashes)
        proto.receive_transactions_callbacks.append(self.on_receive_transactions)
        proto.receive_getblockheaders_callbacks.append(self.on_receive_getblockheaders)
        proto.receive_blockheaders_callbacks.append(self.on_receive_blockheaders)
        proto.receive_getblockbodies_callbacks.append(self.on_receive_getblockbodies)
        proto.receive_blockbodies_callbacks.append(self.on_receive_blockbodies)
        proto.receive_newblock_callbacks.append(self.on_receive_newblock)

        # send status 一旦连接就向对方发送自己的区块链状态status
        head = self.chain.head
        proto.send_status(chain_difficulty=self.chain.get_score(head), chain_head_hash=head.hash,
                          genesis_hash=self.chain.genesis.hash)

eth_protocol协议中包括
Status 与新节点建立连接后,互相发送自己的区块链状态
NewBlockHashes 向网络中广播一批新区块的hash
Transactions 包含一批交易的数据包
GetBlockHashes 从指定hash开始,请求一批BlockHashes
BlockHashes 返回GetBlockHashes请求
GetBlocks 从指定hash开始,请求一批Block
Blocks 返回GetBlocks请求
NewBlock 矿工挖矿后广播新区块,节点接到该区块后验证后添加到本地

1.收到status:
某个已经完成握手的peer节点发送他当前的区块链网络状态ethereum state,status数据包是节点之间建立连接后收到的第一个数据包。
通过status数据包来获取网络中最新的区块并更新本地的区块链

    class status(BaseProtocol.command):

        """ protocolVersion: The version of the Ethereum protocol this peer implements. 30 at present. networkID: The network version of Ethereum for this peer. 0 for the official testnet. totalDifficulty: Total Difficulty of the best chain. Integer, as found in block header. latestHash: The hash of the block with the highest validated total difficulty. GenesisHash: The hash of the Genesis block. """
        cmd_id = 0
        sent = False

        structure = [
            ('eth_version', rlp.sedes.big_endian_int),
            ('network_id', rlp.sedes.big_endian_int),
            ('chain_difficulty', rlp.sedes.big_endian_int),#totalDifficulty,该链的总难度
            ('chain_head_hash', rlp.sedes.binary),#latestHash,头区块hash
            ('genesis_hash', rlp.sedes.binary)]#初始区块hash

        def create(self, proto, chain_difficulty, chain_head_hash, genesis_hash):
            self.sent = True
            network_id = proto.service.app.config['eth'].get('network_id', proto.network_id)
            return [proto.version, network_id, chain_difficulty, chain_head_hash, genesis_hash]

receive packet收到数据包
receive status解析数据包定位到status处理函数
之前已注册回调eth_protocol实例初始化时已注册
on receive status receive status处理函数

这里注意一点,这里看的是DAO事件之前的版本,不是分叉过后的版本(还是先按正常逻辑来。。)

    def on_receive_status(self, proto, eth_version, network_id, chain_difficulty, chain_head_hash, genesis_hash):
        log.debug('----------------------------------')
        log.debug('status received', proto=proto, eth_version=eth_version)
        assert eth_version == proto.version, (eth_version, proto.version)
        #必须是同一个networkid
        if network_id != self.config['eth'].get('network_id', proto.network_id):
            log.warn("invalid network id", remote_network_id=network_id,
                     expected_network_id=self.config['eth'].get('network_id', proto.network_id))
            raise eth_protocol.ETHProtocolError('wrong network_id')

        # check genesis
        #初始区块hash必须一致
        if genesis_hash != self.chain.genesis.hash:
            log.warn("invalid genesis hash", remote_id=proto, genesis=genesis_hash.encode('hex'))
            raise eth_protocol.ETHProtocolError('wrong genesis block')

        # request chain
        #调用同步器同步数据
        self.synchronizer.receive_status(proto, chain_head_hash, chain_difficulty)

        # send transactions
        #获取网络中已知的但尚未被计入区块的交易transactions
        transactions = self.chain.get_transactions()
        if transactions:
            log.debug("sending transactions", remote_id=proto)
            #将这些交易告知对方
            proto.send_transactions(*transactions)

synchronizer同步器同步数据receive_status

    def receive_status(self, proto, blockhash, chain_difficulty):
        "called if a new peer is connected"
        log.debug('status received', proto=proto, chain_difficulty=chain_difficulty)

        # memorize proto with difficulty
        #将改节点区块链总难度记下
        self._protocols[proto] = chain_difficulty

        #对方头区块hash已经在本地存在,则忽略
        if self.chainservice.knows_block(blockhash) or self.synctask:
            log.debug('existing task or known hash, discarding')
            return

        if self.force_sync:
            blockhash, chain_difficulty = self.force_sync
            log.debug('starting forced syctask', blockhash=blockhash.encode('hex'))
            self.synctask = SyncTask(self, proto, blockhash, chain_difficulty)

        #如果这条链总难度比自己的大,则认为这条链状态更新一点,并从他给出的头区块hash开始同步区块,同步的时候从已连接节点中总难度chain_difficulty最大的那个开始同步(但是这并不能保证同步的链就一定是主链,后面会分析如果同步的链不是主链的情况)
        elif chain_difficulty > self.chain.head.chain_difficulty():
            log.debug('sufficient difficulty')
            self.synctask = SyncTask(self, proto, blockhash, chain_difficulty)

fetch_hashchain 向网络请求以指定hash作为头区块的一批区块hash

    def fetch_hashchain(self):
        log_st.debug('fetching hashchain')
        blockhashes_chain = [self.blockhash]  # youngest to oldest
        # For testing purposes: skip the hash downoading stage
        # import ast
        # blockhashes_chain = ast.literal_eval(open('/home/vub/blockhashes.pyast').read())[:299000]

        blockhash = self.blockhash = blockhashes_chain[-1]
        assert blockhash not in self.chain

        # get block hashes until we found a known one
        max_blockhashes_per_request = self.initial_blockhashes_per_request
        while blockhash not in self.chain:
            # proto with highest_difficulty should be the proto we got the newblock from
            blockhashes_batch = []

            # try with protos
            protocols = self.protocols
            if not protocols:
                log_st.warn('no protocols available')
                return self.exit(success=False)
            #这里的protocols是各个已连接节点peer对象的eth_protocol实例,按链总难度大小排序,难度最大在前面
            for proto in protocols:
                log.debug('syncing with', proto=proto)
                if proto.is_stopped:
                    continue

                # request
                assert proto not in self.requests
                deferred = AsyncResult()
                self.requests[proto] = deferred
                #从指定hash开始,请求一批BlockHashes
                proto.send_getblockhashes(blockhash, max_blockhashes_per_request)
                try:
                    #获取到这批BlockHashes
                    blockhashes_batch = deferred.get(block=True,
                                                     timeout=self.blockhashes_request_timeout)
                except gevent.Timeout:
                    log_st.warn('syncing hashchain timed out')
                    continue
                finally:
                    # is also executed 'on the way out' when any other clause of the try statement
                    # is left via a break, continue or return statement.
                    del self.requests[proto]

                if not blockhashes_batch:
                    log_st.warn('empty getblockhashes result')
                    continue
                if not all(isinstance(bh, bytes) for bh in blockhashes_batch):
                    log_st.warn('got wrong data type', expected='bytes',
                                received=type(blockhashes_batch[0]))
                    continue
                break

            if not blockhashes_batch:
                log_st.warn('syncing failed with all peers', num_protos=len(protocols))
                return self.exit(success=False)

            #在获取到的这批blockhashes中直到找到一个blockhash是自己数据库中有的,注意这里只要找到一个自己有的blockhash就可以了,而且这个blockhash不一定是自己本地chain的头区块,因为本地的头区块有可能不是在主链上的区块,上面已经提到过这点。节点总是会去同步最长的那条链,而且可能是从自己本地chain的头区块的父辈区块的某个区块开始同步,这样就提供了一定的纠错机制,让节点可以纠正到主链上去。
            for blockhash in blockhashes_batch:  # youngest to oldest
                assert utils.is_string(blockhash)
                if blockhash not in self.chain:
                    blockhashes_chain.append(blockhash)
                else:
                    log_st.debug('found known blockhash', blockhash=utils.encode_hex(blockhash),
                                 is_genesis=bool(blockhash == self.chain.genesis.hash))
                    break
            log_st.debug('downloaded ' + str(len(blockhashes_chain)) + ' block hashes, ending with %s' % utils.encode_hex(blockhashes_chain[-1]))
            max_blockhashes_per_request = self.max_blockhashes_per_request
        #取到最长链的这批blockhash后,开始同步区块
        self.fetch_blocks(blockhashes_chain)

fetch blocks 向网络请求同步这批区块

    def fetch_blocks(self, blockhashes_chain):
        # fetch blocks (no parallelism here)
        log_st.debug('fetching blocks', num=len(blockhashes_chain))
        assert blockhashes_chain
        blockhashes_chain.reverse()  # oldest to youngest
        num_blocks = len(blockhashes_chain)
        num_fetched = 0

        while blockhashes_chain:
            blockhashes_batch = blockhashes_chain[:self.max_blocks_per_request]
            t_blocks = []

            # try with protos
            protocols = self.protocols
            if not protocols:
                log_st.warn('no protocols available')
                return self.exit(success=False)
            #向每个节点请求
            for proto in protocols:
                if proto.is_stopped:
                    continue
                assert proto not in self.requests
                # request
                log_st.debug('requesting blocks', num=len(blockhashes_batch))
                deferred = AsyncResult()
                self.requests[proto] = deferred
                #向网络请求这批hash值对应的区块
                proto.send_getblocks(*blockhashes_batch)
                try:
                    t_blocks = deferred.get(block=True, timeout=self.blocks_request_timeout)
                except gevent.Timeout:
                    log_st.warn('getblocks timed out, trying next proto')
                    continue
                finally:
                    del self.requests[proto]
                if not t_blocks:
                    log_st.warn('empty getblocks reply, trying next proto')
                    continue
                elif not isinstance(t_blocks[0], TransientBlock):
                    log_st.warn('received unexpected data', data=repr(t_blocks))
                    t_blocks = []
                    continue
                # we have results
                if not [b.header.hash for b in t_blocks] == blockhashes_batch[:len(t_blocks)]:
                    log_st.warn('received wrong blocks, should ban peer')
                    t_blocks = []
                    continue
                break

            # add received t_blocks
            num_fetched += len(t_blocks)
            log_st.debug('received blocks', num=len(t_blocks), num_fetched=num_fetched,
                         total=num_blocks, missing=num_blocks - num_fetched)

            if not t_blocks:
                log_st.warn('failed to fetch blocks', missing=len(blockhashes_chain))
                return self.exit(success=False)

            ts = time.time()
            log_st.debug('adding blocks', qsize=self.chainservice.block_queue.qsize())
            for t_block in t_blocks:
                b = blockhashes_chain.pop(0)
                assert t_block.header.hash == b
                assert t_block.header.hash not in blockhashes_chain
                #将获取的block添加到队列,最后添加到本地数据库
                self.chainservice.add_block(t_block, proto)  # this blocks if the queue is full
            log_st.debug('adding blocks done', took=time.time() - ts)

        # done
        last_block = t_block
        assert not len(blockhashes_chain)
        assert last_block.header.hash == self.blockhash
        log_st.debug('syncing finished')
        # at this point blocks are not in the chain yet, but in the add_block queue
        if self.chain_difficulty >= self.chain.head.chain_difficulty():
            #广播这批区块最新的那个出去。
            self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto)

        self.exit(success=True)

至此,on receive status 函数处理完成,已同步已连接节点的最新区块

2.transactions
on_receive_transactions 节点收到transactions数据包后
矿工收到transactions数据包后,校验每笔交易的合法性,将交易打包进head candidate临时区块并广播交易

add_transaction

    def add_transaction(self, tx, origin=None):
        if self.is_syncing:
            #如果正在同步中的话,本地链的状态是过时的
            return  # we can not evaluate the tx based on outdated state
        log.debug('add_transaction', locked=self.add_transaction_lock.locked(), tx=tx)
        assert isinstance(tx, Transaction)
        assert origin is None or isinstance(origin, BaseProtocol)

        if tx.hash in self.broadcast_filter:
            log.debug('discarding known tx')  # discard early
            return

        # validate transaction
        #交易合法性校验
        try:
            #tx签名要正确;交易数nonce要与本地账户nonce一致;gas要足够;本地账户余额足够
            validate_transaction(self.chain.head_candidate, tx)
            log.debug('valid tx, broadcasting')
            self.broadcast_transaction(tx, origin=origin)  # asap
        except InvalidTransaction as e:
            log.debug('invalid tx', error=e)
            return

        if origin is not None:  # not locally added via jsonrpc
            if not self.is_mining or self.is_syncing:
                log.debug('discarding tx', syncing=self.is_syncing, mining=self.is_mining)
                return

        self.add_transaction_lock.acquire()
        #向head_candidate临时区块写入交易
        success = self.chain.add_transaction(tx)
        self.add_transaction_lock.release()
        if success:
            self._on_new_head_candidate()

3.GetBlockHashes BlockHashes
在向网络同步区块链的时候被调用fetch_hashchain中调用send_getblockhashes来发送GetBlockHashes数据包
其他节点接受到GetBlockHashes数据包后,返回BlockHashes数据包,及指定blockash开始的一批blockhash。

4.GetBlocks Blocks
在向网络同步区块链的时候被调用fetch_blocks中调用send_getblocks来发送GetBlocks数据包
其他节点接受到GetBlocks数据包后,返回Blocks数据包。

5.NewBlock
矿工挖到新块后广播该区块
节点接收到广播的新区块

    def receive_newblock(self, proto, t_block, chain_difficulty):
        "called if there's a newblock announced on the network"
        log.debug('newblock', proto=proto, block=t_block, chain_difficulty=chain_difficulty,
                  client=proto.peer.remote_client_version)

        if t_block.header.hash in self.chain:
            assert chain_difficulty == self.chain.get(t_block.header.hash).chain_difficulty()

        # memorize proto with difficulty
        #记住该节点区块链的总难度
        self._protocols[proto] = chain_difficulty
        #如果该区块存在则忽略
        if self.chainservice.knows_block(block_hash=t_block.header.hash):
            log.debug('known block')
            return

        # check pow
        if not t_block.header.check_pow():
            log.warn('check pow failed, should ban!')
            return
        #预计的总难度
        expected_difficulty = self.chain.head.chain_difficulty() + t_block.header.difficulty
        #总难度至少比本地链的总难度大
        if chain_difficulty >= self.chain.head.chain_difficulty():
            # broadcast duplicates filtering is done in eth_service
            log.debug('sufficient difficulty, broadcasting',
                      client=proto.peer.remote_client_version)
            self.chainservice.broadcast_newblock(t_block, chain_difficulty, origin=proto)
        else:
            # any criteria for which blocks/chains not to add?
            age = self.chain.head.number - t_block.header.number
            log.debug('low difficulty', client=proto.peer.remote_client_version,
                      chain_difficulty=chain_difficulty, expected_difficulty=expected_difficulty,
                      block_age=age)
            if age > self.MAX_NEWBLOCK_AGE:
                log.debug('newblock is too old, not adding', block_age=age,
                          max_age=self.MAX_NEWBLOCK_AGE)
                return

        # unknown and pow check and highest difficulty

        # check if we have parent
        #如果有祖先,直接添加
        if self.chainservice.knows_block(block_hash=t_block.header.prevhash):
            log.debug('adding block')
            self.chainservice.add_block(t_block, proto)
        #没有祖先说明还差一个以上区块,向网络同步区块
        else:
            log.debug('missing parent')
            if not self.synctask:
                self.synctask = SyncTask(self, proto, t_block.header.hash, chain_difficulty)
            else:
                log.debug('existing task, discarding')

以上是以太坊节点与节点交互的协议部分

阅读更多

更多精彩内容