bigchaindb源码分析(七)——投票

现在我们来根据源码分析(六)的思路来分析投票进程(votes),vote进程也是一个pipeline

# bigchaindb/pipelines/vote.py
def start():
    """Create, start, and return the block pipeline."""

    pipeline = create_pipeline()
    pipeline.setup(indata=get_changefeed())
    pipeline.start()
    return pipeline

def get_changefeed():
    """Create and return ordered changefeed of blocks starting from last voted block"""
    b = Bigchain()
    last_block_id = b.get_last_voted_block().id
    feed = backend.query.get_new_blocks_feed(b.connection, last_block_id)
    return Node(feed.__next__, name='changefeed')

对于一个pipeline,我们还是首先来找到它的indata是什么,根据indata我们会知道什么时候会触发pipeline

indata

按照get_changefeed函数体字面上的意思与block进程的逻辑,我们先猜测该函数将首先找到最后的投票区块,然后再对于这个最后的区块之后所有添加的区块都使用yield进行抛出到indata的输入队列中

获取最后一个投票过的区块id

先来看如何获取最后一个投票的区块,对应的函数为get_last_voted_block

# bigchaindb/core.py
def get_last_voted_block(self):
    """Returns the last block that this node voted on."""

    last_block_id = backend.query.get_last_voted_block_id(self.connection,
                                                          self.me)
    return Block.from_dict(self.get_block(last_block_id))

backend.query很明显又使用了singledispatch,所以最终实现函数在bigchaindb/backend/mongodb/query.py中,注意第二个参数为本节点(投票节点)的公钥

# bigchaindb/backend/mongodb/query.py
@register_query(MongoDBConnection)
def get_last_voted_block_id(conn, node_pubkey):
    last_voted = conn.run(
            conn.collection('votes')
            .find({'node_pubkey': node_pubkey},
                  sort=[('vote.timestamp', -1)]))

    if last_voted.count() == 0:
        return get_genesis_block(conn)['id']

    mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
               for v in last_voted}

    last_block_id = list(mapping.values())[0]

    explored = set()

    while True:
        try:
            if last_block_id in explored:
                raise CyclicBlockchainError()
            explored.add(last_block_id)
            last_block_id = mapping[last_block_id]
        except KeyError:
            break

    return last_block_id

get_last_voted_block_id将从votes表中找到所有自己投票过的记录,如果一条都没有,则直接返回创世区块的id。若是找到的话,创建一个hashmap,map的key为前一个区块,value为后一个区块,所以实际上记录了区块与区块之间的关系。之后取出hashmap中的第一个value,依次从hashmap中找该区块的子区块,直到无法找到为止。此时返回的将是最后一个本节点投票过的节点

获取反馈

当找到最后一个投票过的区块之后,反馈的基准线即已经建立,之后所以时间戳比该区块新的区块都应该进行投票

get_new_blocks_feed的最终实现也在bigchaindb/backend/mongodb/query.py中。首先从oplog中查找到最后一个投票过的区块的时间戳,然后调用run_changefeed函数

@register_query(MongoDBConnection)
def get_new_blocks_feed(conn, start_block_id):
    namespace = conn.dbname + '.bigchain'
    match = {'o.id': start_block_id, 'op': 'i', 'ns': namespace}
    # Neccesary to find in descending order since tests may write same block id several times
    query = conn.query().local.oplog.rs.find(match).sort('$natural', -1).next()['ts']
    last_ts = conn.run(query)
    feed = run_changefeed(conn, 'bigchain', last_ts)
    return (evt['o'] for evt in feed if evt['op'] == 'i')

run_changefeed的代码在源码分析(六)已经进行了介绍,该函数是一个无限循环,不断从oplog中查找比输入的last_ts更新时间戳的记录,对这些记录通过yield关键进行抛出,同时将新记录的时间戳赋值给last_ts。如此循环,使得其能够监听所有bigchain表中新增的区块记录,当有新增区块时,get_new_blocks_feed即会把新区块返回

再看get_changefeed函数,该函数的返回值即为indata。get_new_blocks_feed的返回值是一个生成器(用圆括号而不是中括号),

def get_changefeed():
    """Create and return ordered changefeed of blocks starting from last voted block"""
    b = Bigchain()
    last_block_id = b.get_last_voted_block().id
    feed = backend.query.get_new_blocks_feed(b.connection, last_block_id)
    return Node(feed.__next__, name='changefeed')

为了理解这段代码,我们来写个测试例子。可以看到Node类实际上只实例化一次,而每次只要yield抛出数据后,生成器的__next__即将指向新抛出的数据。因此,所有由get_new_blocks_feed抛出的新的区块将会被依次赋值给feed.__next__,而pipeline的Node类是一个无限循环,所有feed.__next__将会被调用,并将返回值放入到该Node的输出队列中,而feed.__next__()本身就为区块记录本身,故而,所有新区块全部放入到了输出队列中。indata的反馈成功建立

class Node():
    def __init__(self, target):
        self.target = target
        print("init... ", target)

def test1():
    for i in range(0, 3):
        yield i

def invoke():
    feed = (i for i in test1())
    return Node(feed.__next__)

indata = invoke()

while True:
    print(indata.target())

输出结果将为

init...  <method-wrapper '__next__' of generator object at 0x7f24338c4938>
0
1
2
Traceback (most recent call last):
  File "/root/workspace/test/yield.py", line 20, in <module>
    print(indata.target())
StopIteration

pipeline

我们依旧先来看vote进程的pipeline的节点定义,很明显其pipeline的执行过程为indata->validate_block->ungroup->validate_tx->vote->write_vote,所有函数都在bigchaindb/pipelines/vote.py

def create_pipeline():

    voter = Vote()

    return Pipeline([
        Node(voter.validate_block),
        Node(voter.ungroup),
        Node(voter.validate_tx, fraction_of_cores=1),
        Node(voter.vote),
        Node(voter.write_vote)
    ])

validate_block

在进入该函数时,首先判断vote表中是否已经有了本投票节点对该区块的投票,如果有则直接退出。否则调用from_db来重构区块,这一步主要是为了更新本区块中所有的asset项,如若重构区块失败,则直接返回一个无效的事务invalid_dummy_tx,使得pipeline依旧生效,同时确保pipeline的最后结果为验证失败。若重构区块成功,则调用_validate_block来验证区块,若验证失败同样返回无效的事务invalid_dummy_tx给pipeline。若重构成功且验证区块成功,则将区块里的所有的事务返回给pipeline的下一个Node,等待更进一步处理

def validate_block(self, block_dict):
    if not self.bigchain.has_previous_vote(block_dict['id']):
        try:
            block = Block.from_db(self.bigchain, block_dict, from_dict_kwargs={
                'tx_construct': FastTransaction
            })
        except (exceptions.InvalidHash):
            return block_dict['id'], [self.invalid_dummy_tx]
        try:
            block._validate_block(self.bigchain)
        except exceptions.ValidationError:
            return block.id, [self.invalid_dummy_tx]
        return block.id, block_dict['block']['transactions']

在判断vote中是否已经投票过的过程分为两步,首先是从vote表中找到本节点对该区块的所有投票记录,然后调用partition_eligible_votes对所查询到的投票记录验证本节点的签名。当存在通过签名验证的记录时,has_previous_vote将会返回真

# bighciandb/core.py
def has_previous_vote(self, block_id):
    votes = list(backend.query.get_votes_by_block_id_and_voter(self.connection, block_id, self.me))
    el, _ = self.consensus.voting.partition_eligible_votes(votes, [self.me])
    return bool(el)

# bigchaindb/backend/mongodb/query.py 
@register_query(MongoDBConnection)
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
    return conn.run(
        conn.collection('votes')
        .find({'vote.voting_for_block': block_id,
               'node_pubkey': node_pubkey},
              projection={'_id': False}))

# bigchaindb/vote.py
def partition_eligible_votes(cls, votes, eligible_voters):
    eligible, ineligible = ([], [])

    for vote in votes:
        voter_eligible = vote.get('node_pubkey') in eligible_voters
        if voter_eligible:
            try:
                if cls.verify_vote_signature(vote):
                    eligible.append(vote)
                    continue
            except ValueError:
                pass
        ineligible.append(vote)
    return eligible, ineligible

对区块的重构实际上在于对区块中所有事务的asset的更新。首先从区块中提取到所有asset的id,然后从assets表中找到这些id所对应的asset记录,然后调用couple_assets对区块中的CREATE事务(或者创世区块)的asset项更新为assets表中的记录。而TRANSFER事务只要维护着转移的asset的id即可(根据id可以找到asset记录)

# bigchaindb/models.py
def from_db(cls, bigchain, block_dict, from_dict_kwargs=None):
    asset_ids = cls.get_asset_ids(block_dict)
    assets = bigchain.get_assets(asset_ids)
    block_dict = cls.couple_assets(block_dict, assets)
    kwargs = from_dict_kwargs or {}
    return cls.from_dict(block_dict, **kwargs)

最后一步则是验证区块(不包括验证区块里的事务),验证分为三步:1)验证创建区块的节点是否位于联盟之内;2)验证区块的签名是否正确;3)验证区块中的事务是否有重复

def _validate_block(self, bigchain):
    # Check if the block was created by a federation node
    if self.node_pubkey not in bigchain.federation:
        raise SybilError('Only federation nodes can create blocks')

    # Check that the signature is valid
    if not self.is_signature_valid():
        raise InvalidSignature('Invalid block signature')

    # Check that the block contains no duplicated transactions
    txids = [tx.id for tx in self.transactions]
    if len(txids) != len(set(txids)):
        raise DuplicateTransaction('Block has duplicate transaction')

ungroup

pipeline的上一Nodevalidate_block将会返回区块的id与区块中所有的事务给ungroupNode。ungroup操作及其简单,为直接使用生成器的方式将区块中的事务一个个地返回给pipeline的下一Node

def ungroup(self, block_id, transactions):
    num_tx = len(transactions)
    for tx in transactions:
        yield tx, block_id, num_tx

validate_tx

对于区块中每个事务,validate_tx将会首先判断该事务是否是一个“新的事务”,若是,则验证这个事务的签名。验证事务签名的函数tx.validate(self.bigchain)在之前已经介绍过,这里不再说明

def validate_tx(self, tx_dict, block_id, num_tx):
    try:
        tx = Transaction.from_dict(tx_dict)
        new = self.bigchain.is_new_transaction(tx.id, exclude_block_id=block_id)
        if not new:
            raise exceptions.ValidationError('Tx already exists, %s', tx.id)
        tx.validate(self.bigchain)
        valid = True
    except exceptions.ValidationError as e:
        valid = False
        logger.warning('Invalid tx: %s', e)

    return valid, block_id, num_tx

注意这个“新的事务”并不是指的首次创建的事务,而是,只有先前存在过,且投票不为invalid的事务才不是新的事务。is_new_transaction首先拿到所有包含有该事务的区块的投票状态,只有当存在投票不为invalid的(valid或者undecided)才返回False。简单来说,从未出现过的事务当然为新的事务,此外,还需要让事先投票为invalid的事务有机会来参与到新的投票环节中

def is_new_transaction(self, txid, exclude_block_id=None):
    block_statuses = self.get_blocks_status_containing_tx(txid)
    block_statuses.pop(exclude_block_id, None)
    for status in block_statuses.values():
        if status != self.BLOCK_INVALID:
            return False
    return True

vote

Nodevote维护了一个名为counter的计数器,当计数器的数目等于本区块中事务的个数,且每个事务的验证节点都为True时,才构造一个由该投票节点签名的投票记录,将记录返回给pipeline的下一个Node。同时在返回给下一个Node之前,本投票节点实际上已经进行了投票(除了还没有将记录写入vote表中),所以此时要更新本节点最后一个投票的区块的id(每次投票时,投票记录中记录了本节点上一次投票的区块的id)

def vote(self, tx_validity, block_id, num_tx):
    self.counters[block_id] += 1
    self.validity[block_id] = tx_validity and self.validity.get(block_id,
                                                                True)

    if self.counters[block_id] == num_tx:
        vote = self.bigchain.vote(block_id,
                                  self.last_voted_id,
                                  self.validity[block_id])
        self.last_voted_id = block_id
        del self.counters[block_id]
        del self.validity[block_id]
        return vote, num_tx

write_vote

Nodewrite_vote则是将投票写入到vote表中

def write_vote(self, vote):
    return backend.query.write_vote(self.connection, vote)

至此投票的pipeline结束

总结

pipeline进程的流程为:

  • indata: 找到本节点最后投票的区块,然后对这个最后的区块之后所有bigchain表中添加的区块都使用yield进行抛出到indata的输出队列中
  • validate_block:判断vote表中是否已经有了本投票节点对该区块的投票,如果有则直接退出。若还未投票,则从assets表中查询该区块中所有事务的资产,并将CREATE事务(以及创世区块)的asset项更新为assets表中的记录。确保所有CREATE事务(以及创世区块)的asset项为完整的资产记录,而TRANSFER区块的asset项为asset的id
  • validate_block:验证写区块节点身份,验证区块签名
  • validate_tx:对区块中的每个事务进行处理。若包含该事务的区块的投票结果为valid或者undecided,抛出“事务已经存在”的异常,否则验证事务的签名。这样给予了之前投票为invalid的事务再次被投票的机会
  • vote:当验证完本区块中的所有事务,且验证结果均为True时,构建投票记录,并对投票记录进行签名
  • write_vote:将投票记录写入votes表中
阅读更多

更多精彩内容