现在我们来根据源码分析(六)的思路来分析投票进程(votes),vote进程也是一个pipeline
# bigchaindb/pipelines/vote.py
def start():
"""Create, start, and return the block pipeline."""
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline
def get_changefeed():
"""Create and return ordered changefeed of blocks starting from last voted block"""
b = Bigchain()
last_block_id = b.get_last_voted_block().id
feed = backend.query.get_new_blocks_feed(b.connection, last_block_id)
return Node(feed.__next__, name='changefeed')
对于一个pipeline,我们还是首先来找到它的indata是什么,根据indata我们会知道什么时候会触发pipeline
按照get_changefeed
函数体字面上的意思与block进程的逻辑,我们先猜测该函数将首先找到最后的投票区块,然后再对于这个最后的区块之后所有添加的区块都使用yield进行抛出到indata的输入队列中
先来看如何获取最后一个投票的区块,对应的函数为get_last_voted_block
# bigchaindb/core.py
def get_last_voted_block(self):
"""Returns the last block that this node voted on."""
last_block_id = backend.query.get_last_voted_block_id(self.connection,
self.me)
return Block.from_dict(self.get_block(last_block_id))
backend.query
很明显又使用了singledispatch,所以最终实现函数在bigchaindb/backend/mongodb/query.py
中,注意第二个参数为本节点(投票节点)的公钥
# bigchaindb/backend/mongodb/query.py
@register_query(MongoDBConnection)
def get_last_voted_block_id(conn, node_pubkey):
last_voted = conn.run(
conn.collection('votes')
.find({'node_pubkey': node_pubkey},
sort=[('vote.timestamp', -1)]))
if last_voted.count() == 0:
return get_genesis_block(conn)['id']
mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
for v in last_voted}
last_block_id = list(mapping.values())[0]
explored = set()
while True:
try:
if last_block_id in explored:
raise CyclicBlockchainError()
explored.add(last_block_id)
last_block_id = mapping[last_block_id]
except KeyError:
break
return last_block_id
get_last_voted_block_id
将从votes
表中找到所有自己投票过的记录,如果一条都没有,则直接返回创世区块的id。若是找到的话,创建一个hashmap,map的key为前一个区块,value为后一个区块,所以实际上记录了区块与区块之间的关系。之后取出hashmap中的第一个value,依次从hashmap中找该区块的子区块,直到无法找到为止。此时返回的将是最后一个本节点投票过的节点
当找到最后一个投票过的区块之后,反馈的基准线即已经建立,之后所以时间戳比该区块新的区块都应该进行投票
get_new_blocks_feed
的最终实现也在bigchaindb/backend/mongodb/query.py
中。首先从oplog中查找到最后一个投票过的区块的时间戳,然后调用run_changefeed
函数
@register_query(MongoDBConnection)
def get_new_blocks_feed(conn, start_block_id):
namespace = conn.dbname + '.bigchain'
match = {'o.id': start_block_id, 'op': 'i', 'ns': namespace}
# Neccesary to find in descending order since tests may write same block id several times
query = conn.query().local.oplog.rs.find(match).sort('$natural', -1).next()['ts']
last_ts = conn.run(query)
feed = run_changefeed(conn, 'bigchain', last_ts)
return (evt['o'] for evt in feed if evt['op'] == 'i')
run_changefeed
的代码在源码分析(六)已经进行了介绍,该函数是一个无限循环,不断从oplog中查找比输入的last_ts
更新时间戳的记录,对这些记录通过yield关键进行抛出,同时将新记录的时间戳赋值给last_ts
。如此循环,使得其能够监听所有bigchain
表中新增的区块记录,当有新增区块时,get_new_blocks_feed
即会把新区块返回
再看get_changefeed
函数,该函数的返回值即为indata。get_new_blocks_feed
的返回值是一个生成器(用圆括号而不是中括号),
def get_changefeed():
"""Create and return ordered changefeed of blocks starting from last voted block"""
b = Bigchain()
last_block_id = b.get_last_voted_block().id
feed = backend.query.get_new_blocks_feed(b.connection, last_block_id)
return Node(feed.__next__, name='changefeed')
为了理解这段代码,我们来写个测试例子。可以看到Node类实际上只实例化一次,而每次只要yield抛出数据后,生成器的__next__
即将指向新抛出的数据。因此,所有由get_new_blocks_feed
抛出的新的区块将会被依次赋值给feed.__next__
,而pipeline的Node
类是一个无限循环,所有feed.__next__
将会被调用,并将返回值放入到该Node的输出队列中,而feed.__next__()
本身就为区块记录本身,故而,所有新区块全部放入到了输出队列中。indata的反馈成功建立
class Node():
def __init__(self, target):
self.target = target
print("init... ", target)
def test1():
for i in range(0, 3):
yield i
def invoke():
feed = (i for i in test1())
return Node(feed.__next__)
indata = invoke()
while True:
print(indata.target())
输出结果将为
init... <method-wrapper '__next__' of generator object at 0x7f24338c4938>
0
1
2
Traceback (most recent call last):
File "/root/workspace/test/yield.py", line 20, in <module>
print(indata.target())
StopIteration
我们依旧先来看vote进程的pipeline的节点定义,很明显其pipeline的执行过程为indata->validate_block->ungroup->validate_tx->vote->write_vote
,所有函数都在bigchaindb/pipelines/vote.py
def create_pipeline():
voter = Vote()
return Pipeline([
Node(voter.validate_block),
Node(voter.ungroup),
Node(voter.validate_tx, fraction_of_cores=1),
Node(voter.vote),
Node(voter.write_vote)
])
validate_block
在进入该函数时,首先判断vote
表中是否已经有了本投票节点对该区块的投票,如果有则直接退出。否则调用from_db
来重构区块,这一步主要是为了更新本区块中所有的asset项,如若重构区块失败,则直接返回一个无效的事务invalid_dummy_tx
,使得pipeline依旧生效,同时确保pipeline的最后结果为验证失败。若重构区块成功,则调用_validate_block
来验证区块,若验证失败同样返回无效的事务invalid_dummy_tx
给pipeline。若重构成功且验证区块成功,则将区块里的所有的事务返回给pipeline的下一个Node,等待更进一步处理
def validate_block(self, block_dict):
if not self.bigchain.has_previous_vote(block_dict['id']):
try:
block = Block.from_db(self.bigchain, block_dict, from_dict_kwargs={
'tx_construct': FastTransaction
})
except (exceptions.InvalidHash):
return block_dict['id'], [self.invalid_dummy_tx]
try:
block._validate_block(self.bigchain)
except exceptions.ValidationError:
return block.id, [self.invalid_dummy_tx]
return block.id, block_dict['block']['transactions']
在判断vote
中是否已经投票过的过程分为两步,首先是从vote表中找到本节点对该区块的所有投票记录,然后调用partition_eligible_votes
对所查询到的投票记录验证本节点的签名。当存在通过签名验证的记录时,has_previous_vote
将会返回真
# bighciandb/core.py
def has_previous_vote(self, block_id):
votes = list(backend.query.get_votes_by_block_id_and_voter(self.connection, block_id, self.me))
el, _ = self.consensus.voting.partition_eligible_votes(votes, [self.me])
return bool(el)
# bigchaindb/backend/mongodb/query.py
@register_query(MongoDBConnection)
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
return conn.run(
conn.collection('votes')
.find({'vote.voting_for_block': block_id,
'node_pubkey': node_pubkey},
projection={'_id': False}))
# bigchaindb/vote.py
def partition_eligible_votes(cls, votes, eligible_voters):
eligible, ineligible = ([], [])
for vote in votes:
voter_eligible = vote.get('node_pubkey') in eligible_voters
if voter_eligible:
try:
if cls.verify_vote_signature(vote):
eligible.append(vote)
continue
except ValueError:
pass
ineligible.append(vote)
return eligible, ineligible
对区块的重构实际上在于对区块中所有事务的asset
的更新。首先从区块中提取到所有asset的id,然后从assets
表中找到这些id所对应的asset记录,然后调用couple_assets
对区块中的CREATE事务(或者创世区块)的asset
项更新为assets
表中的记录。而TRANSFER事务只要维护着转移的asset的id即可(根据id可以找到asset记录)
# bigchaindb/models.py
def from_db(cls, bigchain, block_dict, from_dict_kwargs=None):
asset_ids = cls.get_asset_ids(block_dict)
assets = bigchain.get_assets(asset_ids)
block_dict = cls.couple_assets(block_dict, assets)
kwargs = from_dict_kwargs or {}
return cls.from_dict(block_dict, **kwargs)
最后一步则是验证区块(不包括验证区块里的事务),验证分为三步:1)验证创建区块的节点是否位于联盟之内;2)验证区块的签名是否正确;3)验证区块中的事务是否有重复
def _validate_block(self, bigchain):
# Check if the block was created by a federation node
if self.node_pubkey not in bigchain.federation:
raise SybilError('Only federation nodes can create blocks')
# Check that the signature is valid
if not self.is_signature_valid():
raise InvalidSignature('Invalid block signature')
# Check that the block contains no duplicated transactions
txids = [tx.id for tx in self.transactions]
if len(txids) != len(set(txids)):
raise DuplicateTransaction('Block has duplicate transaction')
ungroup
pipeline的上一Nodevalidate_block
将会返回区块的id与区块中所有的事务给ungroup
Node。ungroup
操作及其简单,为直接使用生成器的方式将区块中的事务一个个地返回给pipeline的下一Node
def ungroup(self, block_id, transactions):
num_tx = len(transactions)
for tx in transactions:
yield tx, block_id, num_tx
validate_tx
对于区块中每个事务,validate_tx
将会首先判断该事务是否是一个“新的事务”,若是,则验证这个事务的签名。验证事务签名的函数tx.validate(self.bigchain)
在之前已经介绍过,这里不再说明
def validate_tx(self, tx_dict, block_id, num_tx):
try:
tx = Transaction.from_dict(tx_dict)
new = self.bigchain.is_new_transaction(tx.id, exclude_block_id=block_id)
if not new:
raise exceptions.ValidationError('Tx already exists, %s', tx.id)
tx.validate(self.bigchain)
valid = True
except exceptions.ValidationError as e:
valid = False
logger.warning('Invalid tx: %s', e)
return valid, block_id, num_tx
注意这个“新的事务”并不是指的首次创建的事务,而是,只有先前存在过,且投票不为invalid的事务才不是新的事务。is_new_transaction
首先拿到所有包含有该事务的区块的投票状态,只有当存在投票不为invalid
的(valid
或者undecided
)才返回False。简单来说,从未出现过的事务当然为新的事务,此外,还需要让事先投票为invalid的事务有机会来参与到新的投票环节中
def is_new_transaction(self, txid, exclude_block_id=None):
block_statuses = self.get_blocks_status_containing_tx(txid)
block_statuses.pop(exclude_block_id, None)
for status in block_statuses.values():
if status != self.BLOCK_INVALID:
return False
return True
vote
Nodevote
维护了一个名为counter
的计数器,当计数器的数目等于本区块中事务的个数,且每个事务的验证节点都为True时,才构造一个由该投票节点签名的投票记录,将记录返回给pipeline的下一个Node。同时在返回给下一个Node之前,本投票节点实际上已经进行了投票(除了还没有将记录写入vote表中),所以此时要更新本节点最后一个投票的区块的id(每次投票时,投票记录中记录了本节点上一次投票的区块的id)
def vote(self, tx_validity, block_id, num_tx):
self.counters[block_id] += 1
self.validity[block_id] = tx_validity and self.validity.get(block_id,
True)
if self.counters[block_id] == num_tx:
vote = self.bigchain.vote(block_id,
self.last_voted_id,
self.validity[block_id])
self.last_voted_id = block_id
del self.counters[block_id]
del self.validity[block_id]
return vote, num_tx
write_vote
Nodewrite_vote
则是将投票写入到vote表中
def write_vote(self, vote):
return backend.query.write_vote(self.connection, vote)
至此投票的pipeline结束
pipeline进程的流程为:
indata
: 找到本节点最后投票的区块,然后对这个最后的区块之后所有bigchain
表中添加的区块都使用yield进行抛出到indata的输出队列中validate_block
:判断vote
表中是否已经有了本投票节点对该区块的投票,如果有则直接退出。若还未投票,则从assets
表中查询该区块中所有事务的资产,并将CREATE事务(以及创世区块)的asset项更新为assets
表中的记录。确保所有CREATE事务(以及创世区块)的asset项为完整的资产记录,而TRANSFER区块的asset项为asset的idvalidate_block
:验证写区块节点身份,验证区块签名validate_tx
:对区块中的每个事务进行处理。若包含该事务的区块的投票结果为valid或者undecided,抛出“事务已经存在”的异常,否则验证事务的签名。这样给予了之前投票为invalid的事务再次被投票的机会vote
:当验证完本区块中的所有事务,且验证结果均为True时,构建投票记录,并对投票记录进行签名write_vote
:将投票记录写入votes
表中