Fabric源码分析之invoke执行流程及block生成分析

invoke操作流程及block生成

主要分析invoke操作的执行流程及主要涉及的函数,以及block的生成,相关block字段的含义等。该例主要分析kafka共识模块, invoke的主要执行流程如下图所示,主要步骤有:

1、SDK或者Client发起invoke操作,调用ChaincodeInvokeOrQuery(),创建Proposal,
该过程主要生成txid,构造出channelHeader,赋值给proposal.header, 构造出ChaincodeProposalPayload赋值给proposal.payload, proposal结构如下所示。然后对proposal进行签名获得signedproposal。

type Proposal struct {
    // The header of the proposal. It is the bytes of the Header
    Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
    // The payload of the proposal as defined by the type in the proposal
    // header.
    Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
    // Optional extensions to the proposal. Its content depends on the Header's
    // type field. For the type CHAINCODE, it might be the bytes of a
    // ChaincodeAction message.
    Extension []byte `protobuf:"bytes,3,opt,name=extension,proto3" json:"extension,omitempty"`
}

2、通过endorserClient发送到peer节点进行ProcessProposal操作。首先对SignedProposal进行一些检验:对header进行检验,对签名进行校验,对txid检查,对msg type检查。获取channel header,signed header;判断如果是scc,则不能调用invoke操作。

3、创建一个ledger.TxSimulator,进行模拟执行simulateProposal:

  • 获得GetChaincodeInvocationSpec,这里特别说明如果进行chaincode join操作chaincodeID是“”,chaincode为cscc,input是genesis block; chain install,deploy,upgrade,chainID是”“,chaincode是lscc; invokeOrQuery操作chainID为自己创建的ID,chaincode为自己安装的,input是函数名和参数。
  • 如果不是syschaincode则通过getCDSFromLSCC获得chaincodedata.
  • 通过callChaincode进行模拟执行chaincode,并获得模拟执行的结果,主要是读写集readwriteSet。

4、判断chainID是否为“”,也就是说如果是scc则直接返回给SDK,如果不是则要进行背书:endorseProposal,调用escc进行背书。

5、结果返回给SDK,如果是invoke操作,则根据proposal,proposalResp创建envelope message,主要fill endorsements,构建ChaincodeEndorsedAction,签名,发送给Orderer节点。

6、orderer节点的broadcast.handle处理broadcast connection的消息:

  • 首先对envelope进行unmarshal获取一些数据。
  • 判断是不是HeaderType_CONFIG_UPDATE消息,如果是,则调用bh.sm.Process进行处理主要是构造成配置格式的msg。
  • 获得chainsupport,GetChain(chdr.ChannelId)
  • support.Filters().Apply(msg),过滤一些不合法的消息
  • Enqueue(msg),把消息发送到共识模块,solo或者kafka,本例采用kafka分析

7、kafka 开启处理消息:processMessagesToBlocks,processRegular,进行排序,CreateNextBlock,WriteBlock。CreateNextBlock主要给block进行赋值,包括header的 Number,PreviousHash,DataHash;data字段,data为是envelope数据。writeblock时给block metadata进行赋值主要做一些签名。

type Block struct {
    Header   *BlockHeader   `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
    Data     *BlockData     `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
    Metadata *BlockMetadata `protobuf:"bytes,3,opt,name=metadata" json:"metadata,omitempty"`
}

type BlockHeader struct {
    Number       uint64 `protobuf:"varint,1,opt,name=number" json:"number,omitempty"`
    PreviousHash []byte `protobuf:"bytes,2,opt,name=previous_hash,json=previousHash,proto3" json:"previous_hash,omitempty"`
    DataHash     []byte `protobuf:"bytes,3,opt,name=data_hash,json=dataHash,proto3" json:"data_hash,omitempty"`
}

type BlockData struct {
    Data [][]byte `protobuf:"bytes,1,rep,name=data,proto3" json:"data,omitempty"`
}


type BlockMetadata struct {
    Metadata [][]byte `protobuf:"bytes,1,rep,name=metadata,proto3" json:"metadata,omitempty"`
}
type Metadata struct {
    Value      []byte               `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
    Signatures []*MetadataSignature `protobuf:"bytes,2,rep,name=signatures" json:"signatures,omitempty"`
}

8、在ledger.Append(block)时会signal = make(chan struct{}),这时readychan会收到消息,deliver.handle中case <-cursor.ReadyChan()会解除阻塞,会sendBlockReply,把block发出去。

9、peer节点的DeliverBlocks会接收到block。这个deliver连接的建立过程如下:

  • peer start ->serve->Initialize->createChain->InitializeChannel->StartDeliverForChannel->newClient->NewBroadcastClient->broadcastSetup会发送seekinfo,进入orderer端deliver handle,如果orderer准备好block,则解除阻塞发送到peer。注意的是这个seekinfo起始区块号为当前的区块高度即,start:height;结束区块号为无限大,即stop:MaxUnit64。也就是说会一直请求Order的区块,也就是order生成新区块后就会发送给leader peer,然后由leader peer 通过gossip把该区块发送到其他peer节点。

10、verifyBlock,mcs.go,主要是校验签名集;

11、根据block创建gossip消息,最后gossip(gossipmsg),同步到其他peer节点。
fabric1.0 invoke流程图

阅读更多

更多精彩内容