fabric的点对点(peer-to-peer)通信是建立在允许双向的基于流的消息gRPC上的。它使用Protocol Buffers来序列化peer之间传输的数据结构。Protocol buffers 是语言无关,平台无关并具有可扩展机制来序列化结构化的数据的技术。数据结构,消息和服务是使用 proto3 language注释来描述的。
消息在节点之间通过Messageproto 结构封装来传递的,可以分为 4 种类型:发现(Discovery), 交易(Transaction), 同步(Synchronization)和共识(Consensus)。每种类型在payload中定义了多种子类型。
message Message {
enum Type {
UNDEFINED = 0;
DISC_HELLO = 1;
DISC_DISCONNECT = 2;
DISC_GET_PEERS = 3;
DISC_PEERS = 4;
DISC_NEWMSG = 5;
CHAIN_STATUS = 6;
CHAIN_TRANSACTION = 7;
CHAIN_GET_TRANSACTIONS = 8;
CHAIN_QUERY = 9;
SYNC_GET_BLOCKS = 11;
SYNC_BLOCKS = 12;
SYNC_BLOCK_ADDED = 13;
SYNC_STATE_GET_SNAPSHOT = 14;
SYNC_STATE_SNAPSHOT = 15;
SYNC_STATE_GET_DELTAS = 16;
SYNC_STATE_DELTAS = 17;
RESPONSE = 20;
CONSENSUS = 21;
}
Type type = 1;
bytes payload = 2;
google.protobuf.Timestamp timestamp = 3;
}
payload是由不同的消息类型所包含的不同的像Transaction或Response这样的对象的不透明的字节数组。例如:type为CHAIN_TRANSACTION那么payload就是一个Transaction对象。
在启动时,如果CORE_PEER_DISCOVERY_ROOTNODE被指定,那么 peer 就会运行发现协议。CORE_PEER_DISCOVERY_ROOTNODE是网络(任意peer)中扮演用来发现所有 peer 的起点角色的另一个 peer 的 IP 地址。协议序列以payload是一个包含:
message HelloMessage {
PeerEndpoint peerEndpoint = 1;
uint64 blockNumber = 2;
}
message PeerEndpoint {
PeerID ID = 1;
string address = 2;
enum Type {
UNDEFINED = 0;
VALIDATOR = 1;
NON_VALIDATOR = 2;
}
Type type = 3;
bytes pkiID = 4;
}
message PeerID {
string name = 1;
}
这样的端点的HelloMessage对象的DISC_HELLO消息开始的。
域的定义:
PeerID 是在启动时或配置文件中定义的 peer 的任意名字PeerEndpoint 描述了端点和它是验证还是非验证 peerpkiID 是 peer 的加密IDaddress 以ip:port这样的格式表示的 peer 的主机名或IP和端口blockNumber 是 peer 的区块链的当前的高度 如果收到的DISC_HELLO 消息的块的高度比当前 peer 的块的高度高,那么它马上初始化同步协议来追上当前的网络。
DISC_HELLO之后,peer 会周期性的发送DISC_GET_PEERS来发现任意想要加入网络的 peer。收到DISC_GET_PEERS后,peer 会发送payload 包含PeerEndpoint的数组的DISC_PEERS作为响应。这是不会使用其它的发现消息类型。
有三种不同的交易类型:部署(Deploy),调用(Invoke)和查询(Query)。部署交易向链上安装指定的链码,调用和查询交易会调用部署号的链码。另一种需要考虑的类型是创建(Create)交易,其中部署好的链码是可以在链上实例化并寻址的。这种类型在写这份文档时还没有被实现。
CHAIN_TRANSACTION和CHAIN_QUERY类型的消息会在payload带有Transaction对象:
message Transaction {
enum Type {
UNDEFINED = 0;
CHAINCODE_DEPLOY = 1;
CHAINCODE_INVOKE = 2;
CHAINCODE_QUERY = 3;
CHAINCODE_TERMINATE = 4;
}
Type type = 1;
string uuid = 5;
bytes chaincodeID = 2;
bytes payloadHash = 3;
ConfidentialityLevel confidentialityLevel = 7;
bytes nonce = 8;
bytes cert = 9;
bytes signature = 10;
bytes metadata = 4;
google.protobuf.Timestamp timestamp = 6;
}
message TransactionPayload {
bytes payload = 1;
}
enum ConfidentialityLevel {
PUBLIC = 0;
CONFIDENTIAL = 1;
}
域的定义:
type - 交易的类型, 为1时表示:
UNDEFINED - 为未来的使用所保留.CHAINCODE_DEPLOY - 代表部署新的链码.
CHAINCODE_INVOKE - 代表一个链码函数被执行并修改了世界状态CHAINCODE_QUERY - 代表一个链码函数被执行并可能只读取了世界状态CHAINCODE_TERMINATE - 标记的链码不可用,所以链码中的函数将不能被调用chaincodeID - 链码源码,路径,构造函数和参数哈希所得到的IDpayloadHash - TransactionPayload.payload所定义的哈希字节.metadata - 应用可能使用的,由自己定义的任意交易相关的元数据uuid - 交易的唯一IDtimestamp - peer 收到交易时的时间戳confidentialityLevel - 数据保密的级别。当前有两个级别。未来可能会有多个级别。nonce - 为安全而使用cert - 交易者的证书signature - 交易者的签名TransactionPayload.payload - 交易的payload所定义的字节。由于payload可以很大,所以交易消息只包含payload的哈希交易安全的详细信息可以在第四节找到
一个交易通常会关联链码定义及其执行环境(像语言和安全上下文)的链码规范。现在,有一个使用Go语言来编写链码的实现。将来可能会添加新的语言。
message ChaincodeSpec {
enum Type {
UNDEFINED = 0;
GOLANG = 1;
NODE = 2;
}
Type type = 1;
ChaincodeID chaincodeID = 2;
ChaincodeInput ctorMsg = 3;
int32 timeout = 4;
string secureContext = 5;
ConfidentialityLevel confidentialityLevel = 6;
bytes metadata = 7;
}
message ChaincodeID {
string path = 1;
string name = 2;
}
message ChaincodeInput {
string function = 1;
repeated string args = 2;
}
域的定义:
chaincodeID - 链码源码的路径和名字ctorMsg - 调用的函数名及参数timeout - 执行交易所需的时间(以毫秒表示)confidentialityLevel - 这个交易的保密级别secureContext - 交易者的安全上下文metadata - 应用想要传递下去的任何数据 当 peer 收到chaincodeSpec后以合适的交易消息包装它并广播到网络
部署交易的类型是CHAINCODE_DEPLOY,且它的payload包含ChaincodeDeploymentSpec对象。
message ChaincodeDeploymentSpec {
ChaincodeSpec chaincodeSpec = 1;
google.protobuf.Timestamp effectiveDate = 2;
bytes codePackage = 3;
}
域的定义:
chaincodeSpec - 参看上面的3.1.2.2节.effectiveDate - 链码准备好可被调用的时间codePackage - 链码源码的gzip 当验证 peer 部署链码时,它通常会校验codePackage的哈希来保证交易被部署到网络后没有被篡改。
调用交易的类型是CHAINCODE_DEPLOY,且它的payload包含ChaincodeInvocationSpec对象。
message ChaincodeInvocationSpec {
ChaincodeSpec chaincodeSpec = 1;
}
查询交易除了消息类型是CHAINCODE_QUERY其它和调用交易一样
同步协议以3.1.1节描述的,当 peer 知道它自己的区块落后于其它 peer 或和它们不一样后所发起的。peer 广播SYNC_GET_BLOCKS,SYNC_STATE_GET_SNAPSHOT或SYNC_STATE_GET_DELTAS并分别接收SYNC_BLOCKS, SYNC_STATE_SNAPSHOT或SYNC_STATE_DELTAS。
安装的共识插件(如:pbft)决定同步协议是如何被应用的。每个小时是针对具体的状态来设计的:
SYNC_GET_BLOCKS 是一个SyncBlockRange对象,包含一个连续区块的范围的payload的请求。
message SyncBlockRange {
uint64 start = 1;
uint64 end = 2;
uint64 end = 3;
}
接收peer使用包含 SyncBlocks对象的payload的SYNC_BLOCKS信息来响应
message SyncBlocks {
SyncBlockRange range = 1;
repeated Block blocks = 2;
}
start和end标识包含的区块的开始和结束,返回区块的顺序由start和end的值定义。如:当start=3,end=5时区块的顺序将会是3,4,5。当start=5,end=3时区块的顺序将会是5,4,3。
SYNC_STATE_GET_SNAPSHOT 请求当前世界状态的快照。 payload是一个SyncStateSnapshotRequest对象
message SyncStateSnapshotRequest {
uint64 correlationId = 1;
}
correlationId是请求 peer 用来追踪响应消息的。接受 peer 回复payload为SyncStateSnapshot实例的SYNC_STATE_SNAPSHOT信息
message SyncStateSnapshot {
bytes delta = 1;
uint64 sequence = 2;
uint64 blockNumber = 3;
SyncStateSnapshotRequest request = 4;
}
这条消息包含快照或以0开始的快照流序列中的一块。终止消息是len(delta) == 0的块
SYNC_STATE_GET_DELTAS 请求连续区块的状态变化。默认情况下总账维护500笔交易变化。 delta(j)是block(i)和block(j)之间的状态转变,其中i=j-1。 payload包含SyncStateDeltasRequest实例
message SyncStateDeltasRequest {
SyncBlockRange range = 1;
}
接收 peer 使用包含 SyncStateDeltas实例的payload的SYNC_STATE_DELTAS信息来响应
message SyncStateDeltas {
SyncBlockRange range = 1;
repeated bytes deltas = 2;
}
delta可能以顺序(从i到j)或倒序(从j到i)来表示状态转变
共识处理交易,一个CONSENSUS消息是由共识框架接收到CHAIN_TRANSACTION消息时在内部初始化的。框架把CHAIN_TRANSACTION转换为 CONSENSUS然后以相同的payload广播到验证 peer。共识插件接收这条消息并根据内部算法来处理。插件可能创建自定义的子类型来管理共识有穷状态机。3.4节会介绍详细信息。
总账由两个主要的部分组成,一个是区块链,一个是世界状态。区块链是在总账中的一系列连接好的用来记录交易的区块。世界状态是一个用来存储交易执行状态的键-值(key-value)数据库
区块链是由一个区块链表定义的,每个区块包含它在链中前一个区块的哈希。区块包含的另外两个重要信息是它包含区块执行所有交易后的交易列表和世界状态的哈希
message Block {
version = 1;
google.protobuf.Timestamp timestamp = 2;
bytes transactionsHash = 3;
bytes stateHash = 4;
bytes previousBlockHash = 5;
bytes consensusMetadata = 6;
NonHashData nonHashData = 7;
}
message BlockTransactions {
repeated Transaction transactions = 1;
}
域的定义:
version - 用来追踪协议变化的版本号timestamp - 由区块提议者填充的时间戳transactionsHash - 区块中交易的merkle root hashstateHash - 世界状态的merkle root hashpreviousBlockHash - 前一个区块的hashconsensusMetadata - 共识可能会引入的一些可选的元数据nonHashData - NonHashData消息会在计算区块的哈希前设置为nil,但是在数据库中存储为区块的一部分BlockTransactions.transactions - 交易消息的数组,由于交易的大小,它们不会被直接包含在区块中previousBlockHash哈希是通过下面算法计算的
使用protocol buffer库把区块消息序列化为字节码
使用FIPS 202描述的SHA3 SHAKE256算法来对序列化后的区块消息计算大小为512位的哈希值
transactionHash是交易merkle树的根。定义merkle tree实现是一个代办
stateHash在3.2.2.1节中定义.
NonHashData消息是用来存储不需要所有 peer 都具有相同值的块元数据。他们是建议值。
message NonHashData {
google.protobuf.Timestamp localLedgerCommitTimestamp = 1;
repeated TransactionResult transactionResults = 2;
}
message TransactionResult {
string uuid = 1;
bytes result = 2;
uint32 errorCode = 3;
string error = 4;
}
localLedgerCommitTimestamp - 标识区块提交到本地总账的时间戳
TransactionResult - 交易结果的数组
TransactionResult.uuid - 交易的ID
TransactionResult.result - 交易的返回值
TransactionResult.errorCode - 可以用来记录关联交易的错误信息的代码
TransactionResult.error - 用来记录关联交易的错误信息的字符串
一个交易定义了它们部署或执行的链码。区块中的所有交易都可以在记录到总账中的区块之前运行。当链码执行时,他们可能会改变世界状态。之后世界状态的哈希会被记录在区块中。
peer 的世界状态涉及到所有被部署的链码的状态集合。进一步说,链码的状态由键值对集合来表示。所以,逻辑上说,peer 的世界状态也是键值对的集合,其中键有元组{chaincodeID, ckey}组成。这里我们使用术语key来标识世界状态的键,如:元组{chaincodeID, ckey} ,而且我们使用cKey来标识链码中的唯一键。
为了下面描述的目的,假定chaincodeID是有效的utf8字符串,且ckey和value是一个或多个任意的字节的序列
当网络活动时,很多像交易提交和同步 peer 这样的场合可能需要计算 peer 观察到的世界状态的加密-哈希。例如,共识协议可能需要保证网络中最小数量的 peer 观察到同样的世界状态。
应为计算世界状态的加密-哈希是一个非常昂贵的操作,组织世界状态来使得当它改变时能高效效的计算加密-哈希是非常可取的。将来,可以根据不同的负载条件来设计不同的组织形式。
由于fabric是被期望在不同的负载条件下都能正常工作,所以需要一个可拔插的机制来支持世界状态的组织。
Bucket-tree 是世界状态的组织方式的实现。为了下面描述的目的,世界状态的键被表示成两个组件(chaincodeID and ckey) 的通过nil字节的级联,如:key = chaincodeID+nil+cKey。
这个方法的模型是一个merkle-tree在hash table桶的顶部来计算世界状态的加密-哈希
这个方法的核心是世界状态的key-values被假定存储在由预先决定的桶的数量(numBuckets)所组成的哈希表中。一个哈希函数(hashFunction) 被用来确定包含给定键的桶数量。注意hashFunction不代表SHA3这样的加密-哈希方法,而是决定给定的键的桶的数量的正规的编程语言散列函数。
为了对 merkle-tree建模,有序桶扮演了树上的叶子节点-编号最低的桶是树中的最左边的叶子节点。为了构造树的最后第二层,叶子节点的预定义数量 (maxGroupingAtEachLevel),从左边开始把每个这样的分组组合在一起,一个节点被当作组中所有叶子节点的共同父节点来插入到最后第二层中。注意最后的父节点的数量可能会少于maxGroupingAtEachLevel这个构造方式继续使用在更高的层级上直到树的根节点被构造。
下面这个表展示的在{numBuckets=10009 and maxGroupingAtEachLevel=10}的配置下会得到的树在不同层级上的节点数。
| Level | Number of nodes |
|---|---|
| 0 | 1 |
| 1 | 2 |
| 2 | 11 |
| 3 | 101 |
| 4 | 1001 |
| 5 | 10009 |
为了计算世界状态的加密-哈希,需要计算每个桶的加密-哈希,并假设它们是merkle-tree的叶子节点的加密-哈希。为了计算桶的加密-哈希,存储在桶中的键值对首先被序列化为字节码并在其上应用加密-哈希函数。为了序列化桶的键值对,所有具有公共chaincodeID前缀的键值对分别序列化并以chaincodeID的升序的方式追加在一起。为了序列化一个chaincodeID的键值对,会涉及到下面的信息:
对于上面列表的所有数值类型项(如:chaincodeID的长度),使用protobuf的变体编码方式。上面这种编码方式的目的是为了桶中的键值对的字节表示方式不会被任意其他键值对的组合所产生,并减少了序列化字节码的总体大小。
例如:考虑具有chaincodeID1_key1:value1, chaincodeID1_key2:value2, 和 chaincodeID2_key1:value1这样名字的键值对的桶。序列化后的桶看上去会像:12 + chaincodeID1 + 2 + 4 + key1 + 6 + value1 + 4 + key2 + 6 + value2 + 12 + chaincodeID2 + 1 + 4 + key1 + 6 + value1
如果桶中没有键值对,那么加密-哈希为nil。
中间节点和根节点的加密-哈希与标准merkle-tree的计算方法一样,即:应用加密-哈希函数到所有子节点的加密-哈希从左到右级联后得到的字节码。进一步说,如果一个子节点的加密-哈希为nil,那么这个子节点的加密-哈希在级联子节点的加密-哈希是就被省略。如果它只有一个子节点,那么它的加密-哈希就是子节点的加密-哈希。最后,根节点的加密-哈希就是世界状态的加密-哈希。
上面这种方法在状态中少数键值对改变时计算加密-哈希是有性能优势的。主要的优势包括:
numBuckets和maxGroupingAtEachLevel参数来控制。树的不同深度和宽度对性能和不同的资源都会产生不同的影响。 在一个具体的部署中,所有的 peer 都期望使用相同的numBuckets, maxGroupingAtEachLevel, 和 hashFunction的配置。进一步说,如果任何一个配置在之后的阶段被改变,那么这些改变需要应用到所有的 peer 中,来保证 peer 节点之间的加密-哈希的比较是有意义的。即使,这可能会导致基于实现的已有数据的迁移。例如:一种实现希望存储树中所有节点最后计算的加密-哈希,那么它就需要被重新计算。
链码是在交易(参看3.1.2节)被部署是分发到网络上,并被所有验证 peer 通过隔离的沙箱来管理的应用级代码。尽管任意的虚拟技术都可以支持沙箱,现在是通过Docker容器来运行链码的。这节中描述的协议可以启用不同虚拟实现的插入与运行。
一个实现VM接口的虚拟机
type VM interface {
build(ctxt context.Context, id string, args []string, env []string, attachstdin bool, attachstdout bool, reader io.Reader) error
start(ctxt context.Context, id string, args []string, env []string, attachstdin bool, attachstdout bool) error
stop(ctxt context.Context, id string, timeout uint, dontkill bool, dontremove bool) error
}
fabric在处理链码上的部署交易或其他交易时,如果这个链码的VM未启动(崩溃或之前的不活动导致的关闭)时实例化VM。每个链码镜像通过build函数构建,通过start函数启动,并使用stop函数停止。
一旦链码容器被启动,它使用gRPC来连接到启动这个链码的验证 peer,并为链码上的调用和查询交易建立通道。
验证 peer 和它的链码之间是通过gRPC流来通信的。链码容器上有shim层来处理链码与验证 peer 之间的protobuf消息协议。
message ChaincodeMessage {
enum Type {
UNDEFINED = 0;
REGISTER = 1;
REGISTERED = 2;
INIT = 3;
READY = 4;
TRANSACTION = 5;
COMPLETED = 6;
ERROR = 7;
GET_STATE = 8;
PUT_STATE = 9;
DEL_STATE = 10;
INVOKE_CHAINCODE = 11;
INVOKE_QUERY = 12;
RESPONSE = 13;
QUERY = 14;
QUERY_COMPLETED = 15;
QUERY_ERROR = 16;
RANGE_QUERY_STATE = 17;
}
Type type = 1;
google.protobuf.Timestamp timestamp = 2;
bytes payload = 3;
string uuid = 4;
}
域的定义:
Type 是消息的类型payload 是消息的payload. 每个payload取决于Type.uuid 消息唯一的ID消息的类型在下面的小节中描述
链码实现被验证 peer 在处理部署,调用或查询交易时调用的Chaincode接口
type Chaincode interface {
Invoke(stub *ChaincodeStub, function string, args []string) (error)
Query(stub *ChaincodeStub, function string, args []string) ([]byte, error)
}
Init, Invoke 和 Query函数使用function and args参数来支持多种交易。Init是构造函数,它只在部署交易是被执行。Query函数是不允许修改链码的状态的;它只能读取和计算并以byte数组的形式返回。
当部署时(链码容器已经启动),shim层发送一次性的具有包含ChaincodeID的payload的REGISTER消息给验证 peer。然后 peer 以REGISTERED或ERROR来响应成功或失败。当收到ERROR后shim关闭连接并退出。
注册之后,验证 peer 发送具有包含ChaincodeInput对象的INIT消息。shim使用从ChaincodeInput获得的参数来调用Init函数,通过像设置持久化状态这样操作来初始化链码。
shim根据Init函数的返回值,响应RESPONSE或ERROR消息。如果没有错误,那么链码初始化完成,并准备好接收调用和查询交易。
当处理调用交易时,验证 peer 发送TRANSACTION消息给链码容器的shim,由它来调用链码的Invoke函数,并传递从ChaincodeInput得到的参数。shim响应RESPONSE或ERROR消息来表示函数完成。如果接收到ERROR函数,payload包含链码所产生的错误信息。
与调用交易一样,验证 peer 发送QUERY消息给链码容器的shim,由它来调用链码的Query函数,并传递从ChaincodeInput得到的参数。Query函数可能会返回状态值或错误,它会把它通过RESPONSE或ERROR消息来传递给验证 peer。
每个链码可能都定义了它自己的持久化状态变量。例如,一个链码可能创建电视,汽车或股票这样的资产来保存资产属性。当Invoke函数处理时,链码可能会更新状态变量,例如改变资产所有者。链码会根据下面这些消息类型类操作状态变量:
链码发送一个payload包含PutStateInfo对象的PU_STATE消息来保存键值对。
message PutStateInfo {
string key = 1;
bytes value = 2;
}
链码发送一个由payload指定要获取值的键的GET_STATE消息。
链码发送一个由payload指定要删除值的键的DEL_STATE消息。
链码发送一个payload包含RANGE_QUERY_STATE对象的RANGE_QUERY_STATE来获取一个范围内的值。
message RangeQueryState {
string startKey = 1;
string endKey = 2;
}
startKey和endKey假设是通过字典排序的. 验证 peer 响应一个payload是RangeQueryStateResponse对象的RESPONSE消息
message RangeQueryStateResponse {
repeated RangeQueryStateKeyValue keysAndValues = 1;
bool hasMore = 2;
string ID = 3;
}
message RangeQueryStateKeyValue {
string key = 1;
bytes value = 2;
}
如果相应中hasMore=true,这表示有在请求的返回中还有另外的键。链码可以通过发送包含与响应中ID相同的ID的RangeQueryStateNext消息来获取下一集合。
message RangeQueryStateNext {
string ID = 1;
}
当链码结束读取范围,它会发送带有ID的RangeQueryStateClose消息来期望它关闭。
message RangeQueryStateClose {
string ID = 1;
}
链码可以通过发送payload包含 ChaincodeSpec对象的INVOKE_CHAINCODE消息给验证 peer 来在相同的交易上下文中调用另一个链码
链码可以通过发送payload包含 ChaincodeSpec对象的QUERY_CHAINCODE消息给验证 peer 来在相同的交易上下文中查询另一个链码
共识框架定义了每个共识插件都需要实现的接口:
consensus.Consenter: 允许共识插件从网络上接收消息的接口consensus.CPI: 共识编程接口Consensus Programming Interface (CPI) 是共识插件用来与栈交互的,这个接口可以分为两部分:
consensus.Communicator: 用来发送(广播或单播)消息到其他的验证 peerconsensus.LedgerStack: 这个接口使得执行框架像总账一样方便 就像下面描述的细节一样,consensus.LedgerStack封装了其他接口,consensus.Executor接口是共识框架的核心部分。换句话说,consensus.Executor接口允许一个(批量)交易启动,执行,根据需要回滚,预览和提交。每一个共识插件都需要满足以所有验证 peer 上全序的方式把批量(块)交易(通过consensus.Executor.CommitTxBatch)被提交到总账中(参看下面的consensus.Executor接口获得详细细节)。
当前,共识框架由consensus, controller和helper这三个包组成。使用controller和helper包的主要原因是防止Go语言的“循环引入”和当插件更新时的最小化代码变化。
controller 包规范了验证 peer 所使用的共识插件helper 是围绕公式插件的垫片,它是用来与剩下的栈交互的,如为其他 peer 维护消息。 这里有2个共识插件提供:pbft和noops:
obcpbft包包含实现 PBFT [1] 和 Sieve 共识协议的共识插件。参看第5节的详细介绍noops 是一个为开发和测试提供的''假的''共识插件. 它处理所有共识消息但不提供共识功能,它也是一个好的学习如何开发一个共识插件的简单例子。Consenter 接口定义:
type Consenter interface {
RecvMsg(msg *pb.Message) error
}
Consenter接口是插件对(外部的)客户端请求的入口,当处理共识时,共识消息在内部(如从共识模块)产生。NewConsenter创建Consenter插件。RecvMsg`以到达共识的顺序来处理进来的交易。
阅读下面的helper.HandleMessage来理解 peer 是如何和这个接口来交互的。
CPI接口定义:
type CPI interface {
Inquirer
Communicator
SecurityUtils
LedgerStack
}
CPI 允许插件和栈交互。它是由helper.Helper对象实现的。回想一下这个对象是:
helper.NewConsensusHandler被调用时初始化的consensus.Consenter对象,那么它对插件的作者是可访问的Inquirer接口定义:
type Inquirer interface {
GetNetworkInfo() (self *pb.PeerEndpoint, network []*pb.PeerEndpoint, err error)
GetNetworkHandles() (self *pb.PeerID, network []*pb.PeerID, err error)
}
这个接口是consensus.CPI接口的一部分。它是用来获取网络中验证 peer 的(GetNetworkHandles)处理,以及那些验证 peer 的明细(GetNetworkInfo):
注意pees由pb.PeerID对象确定。这是一个protobuf消息,当前定义为(注意这个定义很可能会被修改):
message PeerID {
string name = 1;
}
Communicator接口定义:
type Communicator interface {
Broadcast(msg *pb.Message) error
Unicast(msg *pb.Message, receiverHandle *pb.PeerID) error
}
这个接口是consensus.CPI接口的一部分。它是用来与网络上其它 peer 通信的(helper.Broadcast, helper.Unicast):
SecurityUtils接口定义:
type SecurityUtils interface {
Sign(msg []byte) ([]byte, error)
Verify(peerID *pb.PeerID, signature []byte, message []byte) error
}
这个接口是consensus.CPI接口的一部分。它用来处理消息签名(Sign)的加密操作和验证签名(Verify)
LedgerStack 接口定义:
type LedgerStack interface {
Executor
Ledger
RemoteLedgers
}
CPI接口的主要成员,LedgerStack 组与fabric的其它部分与共识相互作用,如执行交易,查询和更新总账。这个接口支持对本地区块链和状体的查询,更新本地区块链和状态,查询共识网络上其它节点的区块链和状态。它是有Executor,Ledger和RemoteLedgers这三个接口组成的。下面会描述它们。
Executor 接口定义:
type Executor interface {
BeginTxBatch(id interface{}) error
ExecTXs(id interface{}, txs []*pb.Transaction) ([]byte, []error)
CommitTxBatch(id interface{}, transactions []*pb.Transaction, transactionsResults []*pb.TransactionResult, metadata []byte) error
RollbackTxBatch(id interface{}) error
PreviewCommitTxBatchBlock(id interface{}, transactions []*pb.Transaction, metadata []byte) (*pb.Block, error)
}
executor接口是LedgerStack接口最常使用的部分,且是共识网络工作的必要部分。接口允许交易启动,执行,根据需要回滚,预览和提交。这个接口由下面这些方法组成。
BeginTxBatch(id interface{}) error
这个调用接受任意的,故意含糊的id,来使得共识插件可以保证与这个具体的批量相关的交易才会被执行。例如:在pbft实现中,这个id是被执行交易的编码过的哈希。
ExecTXs(id interface{}, txs []*pb.Transaction) ([]byte, []error)
这个调用根据总账当前的状态接受一组交易,并返回带有对应着交易组的错误信息组的当前状态的哈希。注意一个交易所产生的错误不影响批量交易的安全提交。当遇到失败所采用的策略取决与共识插件的实现。这个接口调用多次是安全的。
RollbackTxBatch(id interface{}) error
这个调用忽略了批量执行。这会废弃掉对当前状态的操作,并把总账状态回归到之前的状态。批量是从BeginBatchTx开始的,如果需要开始一个新的就需要在执行任意交易之前重新创建一个。
PreviewCommitTxBatchBlock(id interface{}, transactions []*pb.Transaction, metadata []byte) (*pb.Block, error)
这个调用是共识插件对非确定性交易执行的测试时最有用的方法。区块返回的哈希表部分会保证,当CommitTxBatch被立即调用时的区块是同一个。这个保证会被任意新的交易的执行所打破。
CommitTxBatch(id interface{}, transactions []*pb.Transaction, transactionsResults []*pb.TransactionResult, metadata []byte) error
这个调用提交区块到区块链中。区块必须以全序提交到区块链中,CommitTxBatch结束批量交易,在执行或提交任意的交易之前必须先调用BeginTxBatch。
Ledger 接口定义:
type Ledger interface {
ReadOnlyLedger
UtilLedger
WritableLedger
}
Ledger 接口是为了允许共识插件询问或可能改变区块链当前状态。它是由下面描述的三个接口组成的
ReadOnlyLedger 接口定义:
type ReadOnlyLedger interface {
GetBlock(id uint64) (block *pb.Block, err error)
GetCurrentStateHash() (stateHash []byte, err error)
GetBlockchainSize() (uint64, error)
}
ReadOnlyLedger 接口是为了查询总账的本地备份,而不会修改它。它是由下面这些函数组成的。
GetBlockchainSize() (uint64, error)
这个函数返回区块链总账的长度。一般来说,这个函数永远不会失败,在这种不太可能发生情况下,错误被传递给调用者,由它确定是否需要恢复。具有最大区块值的区块的值为GetBlockchainSize()-1
注意在区块链总账的本地副本是腐坏或不完整的情况下,这个调用会返回链中最大的区块值+1。这允许节点在旧的块是腐坏或丢失的情况下能继续操作当前状态/块。
GetBlock(id uint64) (block *pb.Block, err error)
这个调用返回区块链中块的数值id。一般来说这个调用是不会失败的,除非请求的区块超出当前区块链的长度,或者底层的区块链被腐坏了。GetBlock的失败可能可以通过状态转换机制来取回它。
GetCurrentStateHash() (stateHash []byte, err error)
这个盗用返回总账的当前状态的哈希。一般来说,这个函数永远不会失败,在这种不太可能发生情况下,错误被传递给调用者,由它确定是否需要恢复。
UtilLedger 接口定义:
type UtilLedger interface {
HashBlock(block *pb.Block) ([]byte, error)
VerifyBlockchain(start, finish uint64) (uint64, error)
}
UtilLedger 接口定义了一些由本地总账提供的有用的功能。使用mock接口来重载这些功能在测试时非常有用。这个接口由两个函数构成。 会会
HashBlock(block *pb.Block) ([]byte, error)
尽管*pb.Block定义了GetHash方法,为了mock测试,重载这个方法会非常有用。因此,建议GetHash方法不直接调用,而是通过UtilLedger.HashBlock接口来调用这个方法。一般来说,这个函数永远不会失败,但是错误还是会传递给调用者,让它决定是否使用适当的恢复。
VerifyBlockchain(start, finish uint64) (uint64, error)
这个方法是用来校验区块链中的大的区域。它会从高的块start到低的块finish,返回第一个块的PreviousBlockHash与块的前一个块的哈希不相符的块编号以及错误信息。注意,它一般会标识最后一个好的块的编号,而不是第一个坏的块的编号。
WritableLedger 接口定义:
type WritableLedger interface {
PutBlock(blockNumber uint64, block *pb.Block) error
ApplyStateDelta(id interface{}, delta *statemgmt.StateDelta) error
CommitStateDelta(id interface{}) error
RollbackStateDelta(id interface{}) error
EmptyState() error
}
WritableLedger 接口允许调用者更新区块链。注意这NOT 不是共识插件的通常用法。当前的状态需要通过Executor接口执行交易来修改,新的区块在交易提交时生成。相反的,这个接口主要是用来状态改变和腐化恢复。特别的,这个接口下的函数永远不能直接暴露给共识消息,这样会导致打破区块链所承诺的不可修改这一概念。这个结构包含下面这些函数。
- PutBlock(blockNumber uint64, block *pb.Block) error 这个函数根据给定的区块编号把底层区块插入到区块链中。注意这是一个不安全的接口,所以它不会有错误返回或返回。插入一个比当前区块高度更高的区块是被允许的,通用,重写一个已经提交的区块也是被允许的。记住,由于哈希技术使得创建一个链上的更早的块是不可行的,所以这并不影响链的可审计性和不可变性。任何尝试重写区块链的历史的操作都能很容易的被侦测到。这个函数一般只用于状态转移API。
- ApplyStateDelta(id interface{}, delta *statemgmt.StateDelta) error
这个函数接收状态变化,并把它应用到当前的状态。变化量的应用会使得状态向前或向后转变,这取决于状态变化量的构造,与`Executor`方法一样,`ApplyStateDelta`接受一个同样会被传递给`CommitStateDelta` or `RollbackStateDelta`不透明的接口`id`
- CommitStateDelta(id interface{}) error
这个方法提交在`ApplyStateDelta`中应用的状态变化。这通常是在调用者调用`ApplyStateDelta`后通过校验由`GetCurrentStateHash()`获得的状态哈希之后调用的。这个函数接受与传递给`ApplyStateDelta`一样的`id`。
- RollbackStateDelta(id interface{}) error
这个函数撤销在`ApplyStateDelta`中应用的状态变化量。这通常是在调用者调用`ApplyStateDelta`后与由`GetCurrentStateHash()`获得的状态哈希校验失败后调用的。这个函数接受与传递给`ApplyStateDelta`一样的`id`。
- EmptyState() error
这个函数将会删除整个当前状态,得到原始的空状态。这通常是通过变化量加载整个新的状态时调用的。这一样只对状态转移API有用。
RemoteLedgers 接口定义:
type RemoteLedgers interface {
GetRemoteBlocks(peerID uint64, start, finish uint64) (<-chan *pb.SyncBlocks, error)
GetRemoteStateSnapshot(peerID uint64) (<-chan *pb.SyncStateSnapshot, error)
GetRemoteStateDeltas(peerID uint64, start, finish uint64) (<-chan *pb.SyncStateDeltas, error)
}
RemoteLedgers 接口的存在主要是为了启用状态转移,和向其它副本询问区块链的状态。和WritableLedger接口一样,这不是给正常的操作使用,而是为追赶,错误恢复等操作而设计的。这个接口中的所有函数调用这都有责任来处理超时。这个接口包含下面这些函数:
GetRemoteBlocks(peerID uint64, start, finish uint64) (<-chan *pb.SyncBlocks, error)
这个函数尝试从由peerID指定的 peer 中取出由start和finish标识的范围中的*pb.SyncBlocks流。一般情况下,由于区块链必须是从结束到开始这样的顺序来验证的,所以start是比finish更高的块编号。由于慢速的结构,其它请求的返回可能出现在这个通道中,所以调用者必须验证返回的是期望的块。第二次以同样的peerID来调用这个方法会导致第一次的通道关闭。
GetRemoteStateSnapshot(peerID uint64) (<-chan *pb.SyncStateSnapshot, error)
这个函数尝试从由peerID指定的 peer 中取出*pb.SyncStateSnapshot流。为了应用结果,首先需要通过WritableLedger的EmptyState调用来清空存在在状态,然后顺序应用包含在流中的变化量。
GetRemoteStateDeltas(peerID uint64, start, finish uint64) (<-chan *pb.SyncStateDeltas, error)
这个函数尝试从由peerID指定的 peer 中取出由start和finish标识的范围中的*pb.SyncStateDeltas流。由于慢速的结构,其它请求的返回可能出现在这个通道中,所以调用者必须验证返回的是期望的块变化量。第二次以同样的peerID来调用这个方法会导致第一次的通道关闭。
controller包签名:
func NewConsenter(cpi consensus.CPI) (consenter consensus.Consenter)
这个函数读取为peer过程指定的core.yaml配置文件中的peer.validator.consensus的值。键peer.validator.consensus的有效值指定运行noops还是obcpbft共识。(注意,它最终被改变为noops或custom。在custom情况下,验证 peer 将会运行由consensus/config.yaml中定义的共识插件)
插件的作者需要编辑函数体,来保证路由到它们包中正确的构造函数。例如,对于obcpbft 我们指向obcpft.GetPlugin构造器。
这个函数是当设置返回信息处理器的consenter域时,被helper.NewConsensusHandler调用的。输入参数cpi是由helper.NewHelper构造器输出的,并实现了consensus.CPI接口
helper包 验证 peer 通过helper.NewConsesusHandler函数(一个处理器工厂),为每个连接的 peer 建立消息处理器(helper.ConsensusHandler)。每个进来的消息都会检查它的类型(helper.HandleMessage);如果这是为了共识必须到达的消息,它会传递到 peer 的共识对象(consensus.Consenter)。其它的信息会传递到栈中的下一个信息处理器。
定义:
type ConsensusHandler struct {
chatStream peer.ChatStream
consenter consensus.Consenter
coordinator peer.MessageHandlerCoordinator
done chan struct{}
peerHandler peer.MessageHandler
}
共识中的上下文,我们只关注域coordinator和consenter。coordinator就像名字隐含的那样,它被用来在 peer 的信息处理器之间做协调。例如,当 peer 希望Broadcast时,对象被访问。共识需要到达的共识者会接收到消息并处理它们。
注意,fabric/peer/peer.go定义了peer.MessageHandler (接口),和peer.MessageHandlerCoordinator(接口)类型。
签名:
func NewConsensusHandler(coord peer.MessageHandlerCoordinator, stream peer.ChatStream, initiatedStream bool, next peer.MessageHandler) (peer.MessageHandler, error)
创建一个helper.ConsensusHandler对象。为每个coordinator设置同样的消息处理器。同时把consenter设置为controller.NewConsenter(NewHelper(coord))
定义:
type Helper struct {
coordinator peer.MessageHandlerCoordinator
}
包含验证peer的coordinator的引用。对象是否为peer实现了consensus.CPI接口。
签名:
func NewHelper(mhc peer.MessageHandlerCoordinator) consensus.CPI
返回coordinator被设置为输入参数mhc(helper.ConsensusHandler消息处理器的coordinator域)的helper.Helper对象。这个对象实现了consensus.CPI接口,从而允许插件与栈进行交互。
回忆一下,helper.NewConsensusHandler返回的helper.ConsesusHandler对象实现了 peer.MessageHandler 接口:
type MessageHandler interface {
RemoteLedger
HandleMessage(msg *pb.Message) error
SendMessage(msg *pb.Message) error
To() (pb.PeerEndpoint, error)
Stop() error
}
在共识的上下文中,我们只关心HandleMessage方法。签名:
func (handler *ConsensusHandler) HandleMessage(msg *pb.Message) error
这个函数检查进来的Message的Type。有四种情况:
pb.Message_CONSENSUS:传递给处理器的consenter.RecvMsg函数。pb.Message_CHAIN_TRANSACTION (如:一个外部部署的请求): 一个响应请求首先被发送给用户,然后把消息传递给consenter.RecvMsg函数pb.Message_CHAIN_QUERY (如:查询): 传递给helper.doChainQuery方法来在本地执行HandleMessage方法事件框架提供了生产和消费预定义或自定义的事件的能力。它有3个基础组件:
事件流是用来发送和接收事件的gRPC通道。每个消费者会与事件框架建立事件流,并快速传递它感兴趣的事件。事件生成者通过事件流只发送合适的事件给连接到生产者的消费者。
事件流初始化缓冲和超时参数。缓冲保存着几个等待投递的事件,超时参数在缓冲满时有三个选项:
事件生产者暴露函数Send(e *pb.Event)来发送事件,其中Event可以是预定义的Block或Generic事件。将来会定义更多的事件来包括其它的fabric元素。
message Generic {
string eventType = 1;
bytes payload = 2;
}
eventType和payload是由事件生产者任意定义的。例如,JSON数据可能被用在payload中。链码或插件发出Generic事件来与消费者通讯。
事件消费者允许外部应用监听事件。每个事件消费者通过时间流注册事件适配器。消费者框架可以看成是事件流与适配器之间的桥梁。一种典型的事件消费者使用方式:
adapter = <adapter supplied by the client application to register and receive events>
consumerClient = NewEventsClient(<event consumer address>, adapter)
consumerClient.Start()
...
...
consumerClient.Stop()
事件适配器封装了三种流交互的切面:
引用的实现提供了Golang指定语言绑定
EventAdapter interface {
GetInterestedEvents() ([]*ehpb.Interest, error)
Recv(msg *ehpb.Event) (bool,error)
Disconnected(err error)
}
把gRPC当成事件总线协议来使用,允许事件消费者框架对于不同的语言的绑定可移植而不影响事件生成者框架。
这节详细描述了事件系统的消息结构。为了简单起见,消息直接使用Golang描述。
事件消费者和生产者之间通信的核心消息是事件。
message Event {
oneof Event {
//consumer events
Register register = 1;
//producer events
Block block = 2;
Generic generic = 3;
}
}
每一个上面的定义必须是Register, Block或Generic中的一种。
就像之前提到过的一样,消费者通过与生产者建立连接来创建事件总线,并发送Register事件。Register事件实质上是一组声明消费者感兴趣的事件的Interest消息。
message Interest {
enum ResponseType {
//don't send events (used to cancel interest)
DONTSEND = 0;
//send protobuf objects
PROTOBUF = 1;
//marshall into JSON structure
JSON = 2;
}
string eventType = 1;
ResponseType responseType = 2;
}
事件可以通过protobuf结构直接发送,也可以通过指定适当的responseType来发送JSON结构。
当前,生产者框架可以生成Block和Generic事件。Block是用来封装区块链中区块属性的消息。