gossip算法个人实现思路

1.       算法背景

由于卡夫卡集群的特性,在系统运行一段时间后(默认配置是7天),会自动清除掉过期的记录,因此每个周期之后加入的节点都会丢失一部分数据。于是,我们需要一个机制能不依赖卡夫卡集群来实现数据的一致性,这就是我接下来要讲的Gossip算法。

2.       算法简介

Gossip算法如其名,灵感来自办公室八卦,只要一个人八卦一下,在有限的时间内所有的人都会知道该八卦的信息,这种方式也与病毒传播类似,因此Gossip有众多的别名“闲话算法”、“疫情传播算法”、“病毒感染算法”、“谣言传播算法”。

在一个有界网络中,每个节点都随机地与其他节点通信,经过一番杂乱无章的通信,最终所有节点的状态都会达成一致。每个节点可能知道所有其他节点,也可能仅知道几个邻居节点,只要这些节可以通过网络连通,最终他们的状态都是一致的,当然这也是疫情传播的特点。

Gossip是一种去中心化、容错而又最终一致性的绝妙算法,其收敛性不但得到证明还具有指数级的收敛速度。使用Gossip的系统可以很容易的把Server扩展到更多的节点,满足弹性扩展轻而易举。

3.       算法目的

当卡夫卡集群无法保证数据一致性时,通过此算法,保证系统最终数据一致。同时,还可支持节点间各种类型的消息传播。

4.       算法实现

算法主要实现了三个功能1.在线节点不断广播”Alive”消息来指示它们的可用性;2.在数据和其他节点不一致时,同步其他节点数据;3.在有新数据进入网络时,节点间通过不断的对随机相邻节点广播,最终达到数据一致性。

a)        初始化

                        i.             节点启动时,向卡夫卡集群发送连接消息,类型为KafkaMessage_Connect

                      ii.             其他节点收到此连接消息后,加入alivePeerNode[]切片,保存此节点地址和收到消息时的时间戳并记录区块高度

                     iii.             向此节点发送响应连接信息,类型为ConnRspMsg

                     iv.             此节点收到ConnRspMsg指令消息后,加入alivePeerNode[],保存发送节点的地址和收到消息时的时间戳并记录区块高度

b)        心跳机制

                        i.             节点启动时启动timer计时器,每隔5秒发送心跳消息,类型为AliveMsg

                      ii.             其他节点收到AliveMsg指令消息后,加入alivePeerNode[]切片,保存A节点地址和收到消息时的时间戳并记录区块高度后转发此消息

                     iii.             新起goroutine,遍历alivePeerNode []切片,对比时间戳,超过25秒的节点剔除出切片

c)        信息交换机制

                        i.             A节点每隔4秒广播自己的区块高度,类型GossipMsg

                      ii.             B节点收到A节点GossipMsg指令消息后,对比区块高度,若h(B)>h (A),则返回ProcessMsgPush和自己的区块数据区间AckBlock[h(A)+1, h (B)] ;若h(B)<h(A),则返回ProcessMsgPull和自己需要区块长度区间[h(B)+1, h (A)],类型GossipAckMsg;若h(B)=h (A),则返回ProcessMsgEquals

                     iii.             A节点收到B节点GossipAckMsg指令消息后,检查ProcessMsg类型,如果是push,则读取区块数据区间,逐一加入账本;如果是Pull,则将指定区块长度区间的数据放入Ack2Block[],如果是Equals,不做处理。处理完成后,检查本地账本高度h’(A)是否等于h(A),若不等于,说明在此通信过程中,A又有新加入的区块,把新加入的区块加入AckBlock[],返回给B,类型GossipAck2Msg

                     iv.             B节点收到A节点类型为GossipAck2Msg的消息,检查Ack2Block[]是否有数据,若有,则同步至本地账本

d)        数据分发机制

                        i.             Leader节点(节点启动时,根据配置指定)在收到kafka客户端发送来的并切分成区块并记录账本后,广播此区块消息,类型为KafkaMsg

                      ii.             其他节点收到消息后,对比区块高度,若正好比本地高1,则放入账本,否则不做处理

                     iii.             处理完成后转发此消息

                     iv.             PS:若每个节点都是Leader节点,其实这个机制就没必要,考虑后续可能会增加其他类型(不止是区块)的信息,因而实现此机制

特别说明:

节点间的通讯方式:gRPC

节点广播的过滤条件:1.存在于alivePeerNode[]中,即是活跃节点

                                            2.是本链的节点(加入多链后实现)

                                            3.节点签名满足要求(加入权限控制后实现)

         消息转发的条件:1.转发总次数不超过progatateTotalIter

                                            2.存活时间不超过progatateMaxSurvival

5.       算法配置

orgLeadertrue  # 是否指定本节点为组织代表节点

endpoint # gossipID,默认为地址

progatateIterations:1 #消息转发次数

propagatePeerNum: 3 #推送消息给指定节点的个数

progatateTotalIters:15 #消息转发最大次数

progatateMaxSurvival:30s #消息存活最长时间

pullInterval:4s #拉取消息的时间间隔

aliveTimeInterval:5s #定期发送Alive心跳时间的时间间隔

aliveExpirationTimeOut:25s #Alive心跳时间的超时时间

6.       算法可能存在的问题及解决方案

a)        若广播时,与其他节点建立不了连接怎么办,需要重连吗?

暂时不做重连

b)        与其他节点连接(重连)失败,需要重新选取节点发送吗?

暂时不重新选取,等下一轮发送周期

c)         

 

简化版实现:

取消心跳机制,只在新节点初次连接时维护aliveNode,随机选择3个节点做数据同步,只做pull请求,接受节点返回的区块处理存入账本。




阅读更多

更多精彩内容