1. 算法背景
由于卡夫卡集群的特性,在系统运行一段时间后(默认配置是7天),会自动清除掉过期的记录,因此每个周期之后加入的节点都会丢失一部分数据。于是,我们需要一个机制能不依赖卡夫卡集群来实现数据的一致性,这就是我接下来要讲的Gossip算法。
2. 算法简介
Gossip算法如其名,灵感来自办公室八卦,只要一个人八卦一下,在有限的时间内所有的人都会知道该八卦的信息,这种方式也与病毒传播类似,因此Gossip有众多的别名“闲话算法”、“疫情传播算法”、“病毒感染算法”、“谣言传播算法”。
在一个有界网络中,每个节点都随机地与其他节点通信,经过一番杂乱无章的通信,最终所有节点的状态都会达成一致。每个节点可能知道所有其他节点,也可能仅知道几个邻居节点,只要这些节可以通过网络连通,最终他们的状态都是一致的,当然这也是疫情传播的特点。
Gossip是一种去中心化、容错而又最终一致性的绝妙算法,其收敛性不但得到证明还具有指数级的收敛速度。使用Gossip的系统可以很容易的把Server扩展到更多的节点,满足弹性扩展轻而易举。
3. 算法目的
当卡夫卡集群无法保证数据一致性时,通过此算法,保证系统最终数据一致。同时,还可支持节点间各种类型的消息传播。
4. 算法实现
算法主要实现了三个功能1.在线节点不断广播”Alive”消息来指示它们的可用性;2.在数据和其他节点不一致时,同步其他节点数据;3.在有新数据进入网络时,节点间通过不断的对随机相邻节点广播,最终达到数据一致性。
a) 初始化
i. 节点启动时,向卡夫卡集群发送连接消息,类型为KafkaMessage_Connect
ii. 其他节点收到此连接消息后,加入alivePeerNode[]切片,保存此节点地址和收到消息时的时间戳并记录区块高度
iii. 向此节点发送响应连接信息,类型为ConnRspMsg
iv. 此节点收到ConnRspMsg指令消息后,加入alivePeerNode[],保存发送节点的地址和收到消息时的时间戳并记录区块高度
b) 心跳机制
i. 节点启动时启动timer计时器,每隔5秒发送心跳消息,类型为AliveMsg
ii. 其他节点收到AliveMsg指令消息后,加入alivePeerNode[]切片,保存A节点地址和收到消息时的时间戳并记录区块高度后转发此消息
iii. 新起goroutine,遍历alivePeerNode []切片,对比时间戳,超过25秒的节点剔除出切片
c) 信息交换机制
i. A节点每隔4秒广播自己的区块高度,类型GossipMsg
ii. B节点收到A节点GossipMsg指令消息后,对比区块高度,若h(B)>h (A),则返回ProcessMsg:Push和自己的区块数据区间AckBlock[h(A)+1, h (B)] ;若h(B)<h(A),则返回ProcessMsg:Pull和自己需要区块长度区间[h(B)+1, h (A)],类型GossipAckMsg;若h(B)=h (A),则返回ProcessMsg:Equals
iii. A节点收到B节点GossipAckMsg指令消息后,检查ProcessMsg类型,如果是push,则读取区块数据区间,逐一加入账本;如果是Pull,则将指定区块长度区间的数据放入Ack2Block[],如果是Equals,不做处理。处理完成后,检查本地账本高度h’(A)是否等于h(A),若不等于,说明在此通信过程中,A又有新加入的区块,把新加入的区块加入AckBlock[],返回给B,类型GossipAck2Msg
iv. B节点收到A节点类型为GossipAck2Msg的消息,检查Ack2Block[]是否有数据,若有,则同步至本地账本
d) 数据分发机制
i. Leader节点(节点启动时,根据配置指定)在收到kafka客户端发送来的并切分成区块并记录账本后,广播此区块消息,类型为KafkaMsg
ii. 其他节点收到消息后,对比区块高度,若正好比本地高1,则放入账本,否则不做处理
iii. 处理完成后转发此消息
iv. PS:若每个节点都是Leader节点,其实这个机制就没必要,考虑后续可能会增加其他类型(不止是区块)的信息,因而实现此机制
特别说明:
节点间的通讯方式:gRPC
节点广播的过滤条件:1.存在于alivePeerNode[]中,即是活跃节点
2.是本链的节点(加入多链后实现)
3.节点签名满足要求(加入权限控制后实现)
消息转发的条件:1.转发总次数不超过progatateTotalIter
2.存活时间不超过progatateMaxSurvival
5. 算法配置
orgLeader:true # 是否指定本节点为组织代表节点
endpoint: # gossipID,默认为地址
progatateIterations:1 #消息转发次数
propagatePeerNum: 3 #推送消息给指定节点的个数
progatateTotalIters:15 #消息转发最大次数
progatateMaxSurvival:30s #消息存活最长时间
pullInterval:4s #拉取消息的时间间隔
aliveTimeInterval:5s #定期发送Alive心跳时间的时间间隔
aliveExpirationTimeOut:25s #Alive心跳时间的超时时间
6. 算法可能存在的问题及解决方案
a) 若广播时,与其他节点建立不了连接怎么办,需要重连吗?
暂时不做重连
b) 与其他节点连接(重连)失败,需要重新选取节点发送吗?
暂时不重新选取,等下一轮发送周期
c)
简化版实现:
取消心跳机制,只在新节点初次连接时维护aliveNode,随机选择3个节点做数据同步,只做pull请求,接受节点返回的区块处理存入账本。