Hyperledger Fabric 排序节点处理 Deliver 请求的过程

Hyperledger 源码分析之 Fabric

Deliver,意味着客户端通过 gRPC 接口从 Ordering 服务获取数据(例如指定区块的数据)。

Orderer 节点收到请求消息,会首先交给 orderer.common.server 包中 server 结构体的 Deliver(srv ab.AtomicBroadcast_DeliverServer) error 方法处理。该方法进一步调用 orderer.common.deliver 包中 deliverServer 结构的 Handle(srv ab.AtomicBroadcast_DeliverServer) error 方法进行处理。

deliverServer 结构体十分重要,完成对 Deliver 请求的处理过程。

type deliverServer struct {
    sm SupportManager}func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error

整体过程

整体处理过程如下图所示。


Handle(srv ab.AtomicBroadcast_DeliverServer) error 方法会开启一个循环来从 srv 中不断读取请求消息并进行处理,直到结束。

核心代码如下所示,包括提取消息和对消息进行处理两个步骤。

for {
    envelope, err := srv.Recv() // 从请求中提取一个 Envelope 消息
    ds.deliverBlocks(srv, envelope) // 对消息进行处理并答复,核心过程}

可见,对单个请求的处理都在 deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, envelope *cb.Envelope) 方法中。该方法的处理过程包括解析消息、检查合法性、发送区块以及返回响应四个步骤。

下面具体对其进行具体分析。

解析消息

首先,从请求的 Envelope 结构中提取载荷(Payload),进一步从载荷中提取通道头部信息。利用通道头部信息获取对应的本地链结构,并获取当前最新的配置序列号。

// 提取载荷payload, err := utils.UnmarshalPayload(envelope.Payload)// 提取通道头chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)// 获取链结构,映射到 orderer.common.multichannel 包中 Registrar 结构体中对应方法chain, ok := ds.sm.GetChain(chdr.ChannelId)// 获取当前配置序列号lastConfigSequence := chain.Sequence()

检查合法性

包括对权限和 seekInfo 数据进行检查。

首先,检查请求方是否对通道拥有读权限。

sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain.PolicyManager())if err := sf.Apply(envelope); err != nil {
    logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
    return sendStatusReply(srv, cb.Status_FORBIDDEN)}

接下来,从 Envelope 结构的 payload.data 域中解析出 seekInfo 结构,并检查其合法性。

proto.Unmarshal(payload.Data, seekInfo)chain.Reader().Iterator(seekInfo.Start)// 检查 seekInfo 的cursor, number := chain.Reader().Iterator(seekInfo.Start)switch stop := seekInfo.Stop.Type.(type) {case *ab.SeekPosition_Oldest: // 截止到最早的区块
    stopNum = numbercase *ab.SeekPosition_Newest: // 截止到最新的区块
    stopNum = chain.Reader().Height() - 1case *ab.SeekPosition_Specified: // 截止到特定的区块
    stopNum = stop.Specified.Number
    if stopNum < number {
        logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
        return sendStatusReply(srv, cb.Status_BAD_REQUEST)
    }}

发送区块

在指定的起始和截止范围内,逐个从本地账本读取区块,并发送对应的区块数据,

核心代码如下所示。

for {
    block, status := cursor.Next() // 获取区块
    sendBlockReply(srv, block) // 发送区块
    if stopNum == block.Header.Number {
        break
    }}

返回响应

如果处理成功,则返回成功响应消息。

sendStatusReply(srv, cb.Status_SUCCESS)


===========================


《区块链原理、设计与应用》一书已经正式出版,以超级账本项目为例,介绍了区块链和分布式账本技术的底层原理、设计架构、应用实践的大量细节,欢迎大家阅读指正。


===== 关于 TechFirst 公众号 =====

专注金融科技、人工智能、云计算、大数据相关领域的热门技术与前瞻方向。

发送关键词(如区块链、云计算、大数据、容器),获取热门点评与技术干货。

欢迎投稿!

如果你喜欢公众号内容,欢迎鼓励一杯 coffee~

阅读更多

更多精彩内容