基于Gossip的P2P数据分发

时间:2024-03-02 09:46:54

1、概述

排序结点广播的信息包括更新的状态信息和账本信息,这些信息需要广播给所有结点,在fabric中实现这个广播功能的协议就是Gossip协议,当结点收到消息以后会以固定概率发送给周围的结点,在fabric中,当结点收到消息后会随机选择周围k个结点(默认是3个)进行转发消息,如果邻居结点小于k个,那么就全部转发

2、超级账本中的Gossip协议

Gossip模块负责连接排序服务结点和所有的peer结点之间的数据转发,实现从单个源结点(排序服务结点)到所有结点的高效的数据分发,在后台实现所有结点的状态同步。

账本信息、状态信息、成员信息等都通过Gossip协议进行转发,Gossip协议的功能如下:

  • 在不需要所有结点都连接到排序服务结点的情况下,所有结点的账本数据,状态信息同步

  • 系统正常运行后,新加入的结点可以不通过排序服务就能从其他结点获取账本信息和状态信息

  • 错过了批量更新的结点能够获取到缺失的区块

  • 维护和管理成员,跟踪存活和故障的结点

  • 数据能够快速从单源结点同步到所有结点上

fabric中每个组织会选一个主节点与排序服务结点连接,主节点负责将从排序服务结点收到的区块分发组织内的其他结点。在组织内部就是基于Gossip协议进行数据转发,一个结点收到消息后,随机选择周围k个结点进行数据转发

3、成员认证及身份管理

在Gossip网络中,所有结点都有MSP(Membership Service Provider)颁发的身份证书,从证书计算哈希值导出的一个标识符称为结点的PKI-ID,身份管理模块管理结点标识符和证书之间的映射,在内部维护了以PKI-ID为键,证书为值的映射表pkiID2Cert,可以通过PKI-ID获取结点的证书,也可以更新结点的证书,同时还内置了MSC(MessageCryptoService)模块,它可以对消息进行签名和验证,更新结点证书的时候会通过MSC模块验证证书是否有效,检查从证书导出的标识符是否和PKI-ID匹配

当所有结点都是双向TLS部署的,也就是说TLS连接的双方都有一个有效的TLS证书,当两个结点连接的时候,会有一个握手协议,这个协议验证它是否有TLS证书的私钥,过程如下:

  • 握手结点主动给对端结点发送一条ConnEstablish消息,内容如下:

    • 结点的TLS证书的哈希值

    • 结点的PKI-ID

    • MSP身份证书

  • 对端结点的处理过程如下:

    • 接收发送过来的ConnEstablish消息

    • 提取结点的TLS证书并计算哈希值(通过这个哈希值导出PKI-ID

    • 通过哈希值验证ConnEstablish的签名是否正确

握手协议通过以后会验证PKI-ID是否在黑名单列表中,如果不在就用这个PKI-ID作为键,证书作为值保存在pkiID2Cert中,同时使用这个PKI-ID作为键创建一个连接进行关联,将这个连接保存在内存中,需要和这个结点通信的时候可以通过这个PKI-ID快速找到这个连接并发送消息,发送消息的过程是将消息放入发送缓冲区中,由一个线程负责将缓冲区中的数据发送出去。

如果前面的握手协议签名验证失败,结点将拒绝再次连接。

4、结点启动及成员管理

fabric网络维护着以PKI-ID为标识符的存活结点、故障结点、最后一次检测到存活或者故障的时间。

一个结点要加入网络,必须至少知道网络中的一个存活结点的地址信息,结点启动的时候会读取配置文件core.yaml,读取peer.gossip.bootstrap字段的值,这个字段可以是一个列表,包含它可以连接的一些结点,这个列表名为启动集合,结点会向启动集合中的所有结点发送MembershipRequest消息,包含的内容如下:

  • AliveMessage消息

    • PKI-ID

    • Endpoint(host+":"+port)

    • Metadata(字节数组,以后备用)

    • PeerTime

      • 结点的启动时间(转世时间)

      • 单调递增的计数器,每次在AliveMessage传播时加1

    • 上面所有字段的签名

    • 结点的证书(可选)

  • 本地结点已知的存活结点列表

当一个结点收到AliveMessage消息后,会调用MSC(MessageCryptoService)模块验证消息的证书和签名,若验证通过,会添加到存活结点列表中,并更新检测到存活的时间。

结点证书只在结点启动后的一段时间内随消息发送,过了这段时间后发送的消息就不包含结点的证书了,当一个结点接收到其他结点的消息时,会使用之前接收到的证书验证消息,没有接收到证书的结点可以通过周期性的数据交换机制获取缺失的证书

每个结点启动后会周期性的在网络中广播AliveMessage消息,接收到消息的结点会更新本地的存活结点列表。还会周期性的检查本地存活结点列表,查看是否有结点在相当长一段时间内没有更新存活状态,若果有结点掉线了,那么就从存活结点列表中删除它,添加到掉线结点列表中。并且结点会周期性的尝试重新连接掉线列表中的结点。

5、主结点的选举过程

主节点的作用是代表组织与排序服务结点连接,将从排序服务结点接收到的消息分发给组织内的其他结点,主节点的选举是在组织内部选举的,所以选举主结点的消息LeadershipMessage也是只在组织内部传播。

主结点的选举是在Gossip层实现的

主结点的选举的消息有两种类型:

  • 一种是参与主节点选举的消息proposal,它在竞争主节点的过程中被广播给所有的结点

  • 一种是声明成为主节点的消息declareation,它通过比较,可以申请为主节点的消息

两种消息的结构是一样的,都是LeadershipMessage,包含的内容如下:

  • 结点的PKI-ID

  • 是否声明为主结点

  • PeerTime

    • 结点的启动时间(转世时间)

    • 单调递增的计数器,每次发送一个LeadershipMessage消息时就加1

当一个结点加入网络中的某个组织时,它等待网络稳定以后再参与主节点选举,等待网络稳定的时间为15秒,如果在这期间,网络的结点数量不发生变化或者出现了主节点,那么网络就达到稳定了。如果等待过程中没有出现主节点,那么这个加入的结点就参与主节点的选举。

  • 参与主节点选举时,首先将isLeader字段的值设置为false

  • 广播一个proposal消息给组织内其他结点,并设置一个超时时间,等待proposal在组织内部扩散

  • 其他结点接收到proposal以后会将它缓存起来

  • 如果超时时间到了,还没有出现主结点,那么按照PKI-ID字母排序,最靠前的结点将isLeader字段设置为true,声明自己为主节点,在组织内部广播一个declaration消息

  • 如果在超时时间内,有某个结点声明自己为主结点,那么就接受它为主节点,如果同时又多个结点声明自己为主节点,那么还是选择PKI-ID最靠前的结点为主节点,其他结点主动放弃

  • 一次主节点选举的有效时间leaderAliveThreshold是10秒,没有成为主节点的结点清除接收到的proposal消息,等待有效时间结束,进入下一轮主节点选举

6、基于反熵的状态同步

反熵是指每个结点周期性的和邻居结点交换保存的数据,然后对比本地数据和邻居结点的保存的数据,检查是否有缺失或者过期的数据,然后更新本地结点的数据为最新的数据。

fabric的反熵实现的是每过10秒检查本地账本的区块的序列号和其他结点的账本的序列号:

  • 如果本地账本的区块序列号小,就在网络中广播一个GossipMessage_StateRequest消息,请求缺失的序列号。

  • 如果本地账本的区块序列号不小,并且收到其他序列号较小的结点的请求消息,如果本地有对应的区块,那么就广播一个GossipMessage_StateResponse消息。这个消息中包含了请求结点需要的区块

上面是通过直接消息(directMessage)进行的,结点接收到消息后会缓存起来,放到一个PayloadsBuffer的数据结构中保存起来。由于TCP/IP网络传输中,数据可能不会按序到达,PayloadsBuffer会在内部保存一个索引,记录等待提交到账本的下一个区块的序列号next,如果接收到的区块的序列号小于next,说明这是过期的区块,会直接丢弃。如果接收到的区块的序列号大于next,那么会将这个区块保存在缓冲区中,只有收到的区块的序列号连续了,才会将这些区块提交到账本中,并且更新PayloadsBuffernext值,然后清空缓冲区。

此外,当组织的主节点从排序服务结点接收到新的区块,主节点会创建一个GossipMessage_DataMsg的数据消息然后广播个其他结点,其他结点接收到这个消息以后,也会将这个消息中的区块的序列号与PayloadsBuffernext值作比较,然后进行同样的处理,和直接消息(directMessage)不同的是,此时的GossipMessage_DataMsg消息只包含一个区块,而直接消息中可能包含了多个缺失的区块。

7、数据传播过程

fabric数据传播机制的底层是基于Gossip的结点通信支撑的,所以不会构建每个结点的分离路径,而是每个结点在每次传播数据的时候都会随机选择周围的k个结点扩散消息,可以让当结点中出现拜占庭结点时系统有更好的健壮性。

8、多通道支持

每个交易都和唯一的一个通道关联,这明确定义了哪些实体(组织及成员)会关注这个交易

客户端SDK通过发送一个CONFIGURATION的交易背书请求来创建一个通道,然后通过排序服务广播给其他结点,创建通道的请求包含组成通道的组织列表,即哪些组织可以加入这个通道

一旦创建好通道,客户端SDK可以通知组织内的结点加入这个新创建的通道,加入这个通道的结点的Gossip模块就会在这个通道内广播一个消息,它已经属于这个通道了。

为了保证Gossip在多通道环境下正常工作,通道内的所有结点都需要知道其他结点的存在:一个组织的主结点会接收到哪些结点属于这个通道的消息。当成员管理建立起来以后,Gossip模块会在通道内部分发数据,不属于这个通道的数据会在通道外传输,而通道信息不会在通道外面传播

当一个结点加入到通道中,它需要获取最新的通道配置信息,用来识别参与的组织,结点加入通道的时候,客户端SDK会提供最新的通道的配置交易信息,里面就包含了参与的组织,对于新创建的通道,这个信息会是一个创世区块(Genesis Block),对于老的通道,这是一个最新的重新配置的区块。结点获取这些信息以后,它的Gossip模块就会在通道允许的组织范围里面分享这些信息。这样,通道的成员管理信息是在Gossip服务允许的范围里面维护的。这些信息建立起来以后,Gossip就会基于这些信息工作,就像只有一个通道一样。

一个结点可以选择通道内的一些结点来进行区块转发,然后再选通道内其他结点来进行状态同步,状态可能会在不同的组织之间同步,只要这些组织是同一个通道的成员。

当从通道内删除组织时,客户端SDK会发送一个CONFIGURATION的交易背书请求,背书完成后的交易被发送给排序服务结点,然后由排序服务结点广播给通道内的结点。其他结点收到这个更新过组织列表的通道消息的时候,他们的Gossip模块就会移除被删除的结点。被删除的结点最终会发现自己被删除了,因为Peer结点的Gossip成员管理模块会继续发送心跳信息,但是这些信息不会发送给已经被删除的结点,所以过一段时间,该结点就知道自己被删除了。

9、消息验证策略

每个通过Gossip协议转发给其他结点的消息都会声明本结点的一些信息。包含以下的一些内容:

  • 必须包含本结点的PKI-ID

  • 必须由结点签名

  • 能够通过结点的证书验证

结点的点对点通信没有签名,不会通过Gossip转发,唯一不用结点签名而且不通过点对点方式传播的消息账本数据区块(最新的区块),它是由排序服务结点签名的。

Gossip模块初始化以后,它就会设置通道的验证策略,以判断结点属于的组织,并且它只给通道内的结点发送区块,这个过程可以通过每个结点的PKI-ID和其组织的根证书来实现,本地账本都有通道里的组织的最新配置。

由排序服务广播的区块中包含了最新的配置区块的序列号,当一个结点收到来自其他结点发送过来的区块时,会检查区块的序列号和自己本地账本的序列号,检查的方法是查看最新的配置区块的序列号,如果收到的数据区块的序列号比提交到账本的最新序列号高,则数据区块被存储在缓冲中,不会转发给其他结点,否则,账本的最新配置就是最新的,区块可以安全的转发给通过策略验证的结点了。

账本的内部消息的存储接口MessageStore

type MessageStore interface{
    // 添加消息msg到内存中,返回是否添加成功
    Add(msg interface{})bool
    // 返回存储的消息数量
    Size()int
    // 返回存储的所有消息
    Get()[]interface{}
}

具体实现如下:

type messageStoreImpl struct{
    pol common.MessageReplacingPolicy
    lock *sync.RWMutex
    message []*msg
    invTrigger invalidationTrigger
}

包含一个通用的消息验证策略函数:

type MessageReplacingPolicy func(this interface{},that interface{})InvalidationResult

其中thisthat分别指当前消息和原有消息,这个函数就是将两个消息进行比较,比较结果InvalidationResult有3种可能性:

  • MESSAGE_INVALIDATES:当前消息是有效的,原有消息是无效的,用当前消息替换原有消息

  • MESSAGE_INVALIDATED:当前消息是无效的,原有消息是有效的,丢弃当前消息

  • MESSAGE_NO_ACTION:两个消息可能是不同类型的,不能进行比较,两个消息都是有效的

MessageReplacingPolicy可以根据实际存储的消息定义不同的比较策略

invalidationTrigger是一个回调函数,当替换原有消息时,可以对替换掉的消息进行一些处理

目前的内部消息有以下类型:

  • 存活消息(Alive)

  • 区块数据(Data)

  • 状态消息(State)

  • 身份消息(Identity)

  • 主结点选举消息(Leader)

每一个消息都有一个时间戳信息PeerTime,它包含两个字段incNumberseqNumber。验证两个消息有效性的方法分以下3类

  • 基于时间戳比较:incNumber大的消息有效,incNumber相同时,seqNumber大的有效,如果都相同,原消息为有效消息

  • 基于消息序列号比较:这种策略只对数据区块比较,相同的seqNumber还会比较数据哈希值,当seqNum不同,检查缓冲区是否能够存储两个消息seqNum之间的所有消息,如果缓冲区足够大,那么两个消息都是有效的,否则seqNum大的消息有效

  • 基于PKI-ID比较,检查发送两个消息的结点的PKI-ID是否相同,如果相同就以当前消息为准,这种验证方法只用在身份消息中

消息类型消息验证策略
存活消息 基于时间戳比较
区块数据 基于消息序列号比较
状态消息 基于时间戳比较
身份消息 基于结点PKI-ID比较
主节点选举消息 基于时间戳比较

10、消息的多路分用及分区

通过Gossip协议广播的消息种类很多,不同种类的消息有不同的处理逻辑,Gossip模块利用Go语言的通道,实现了一个消息的多路分用接口ChannelDeMultiplexer

type ChannelDeMultiplexer struct{
    channels []*channel
    lock *sync.RWMutex
    closed int32
}

其中lock是一个读写锁,用来同步对channels处理,closed是通道是否关闭的标识,通道关闭就不能再从通道中读取数据

type channel struct{
    pred common.MessageAcceptor
    ch chan interface{}
}

ch是缓存消息的通道,默认只能存储10个消息,结点接收到消息的时候会调用DeMultiplex接口,pred用来判断订阅者是否对某个消息感兴趣,感兴趣的消息会过滤后存储在ch中,等待下一步处理,pred的策略可以根据业务逻辑来实现,函数定义如下:

type MessageAccptor func (interface{})bool

Gossip模块实现的几种MessageAccptor

  • Gossip消息过滤器

    • GossipMessage

    • SignedGossipMessage

  • 存活消息过滤器

    • GossipMessage_AliveMsg

    • GossipMessage_MemReq

    • GossipMessage_MemRes

  • 状态消息过滤器,过滤出状态同步的消息

  • 远程状态消息过滤器,为了同步状态主动发送和接收的消息

    • GossipMessage_StateRequest

    • GossipMessage_StateResponse

  • 主结点选举消息过滤器

    • GossipMessage_LeadershipMsg

对于需要广播的消息,Gossip模块还实现了消息分区,同一个消息经过过滤器过滤以后,只会出现在一个列表中,实现代码如下:

func partitionMessages(pred common.MessageAcceptor,a *proto.SignedGossipMessage)([]*proto.SignedGossipMessage,[]*proto.SignedGossipMessage){
    s1:=[]*proto.SignedGossipMessage{}
    s2:=[]*proto.SignedGossipMessage{}
    for _,m:=range a{
        if pred(m){
            s1=append(s1,m)
        }else{
            s2=append(s2,m)
        }
    }
    return s1,s2
}

经过partitionMessages处理过的消息a,会分成两个切片,满足pred的放入s1中,不满足的放入s2

广播消息过滤器有如下几种:

  • 区块数据过滤器:过滤出指定通道chainID的区块数据

  • 状态消息过滤器:过滤出状态同步消息

  • 通道内部的消息过滤器:过滤出只在通道内部广播的消息

  • 组织内部的消息过滤器:过滤出只在组织内部广播的消息

  • 主节点选举消息过滤器:过滤出某个通道上的GossipMessage_LeadershipMsg消息

和多路分用消息不同,广播消息过滤器过滤后的消息只能出现在一个结果集中

11、和Gossip相关的配置参数

# 启动连接结点,可以是多个
bootstrap:127.0.0.1:7051
# 自动选举主结点还是指定主节点
useLeaderElection:false
# 只在useleaderElection设置为false时生效
orgLeader:true
# 本地结点的ID:ip:port
endpoint:
# 缓冲区大小
maxBlockCountToStore:100
# 消息推送间隔(毫秒)
maxPropagationBurstSize:10ms
# 消息推送缓冲区大小
maxPropagationBurstSize:10
# 消息推送次数
propagateIterations:1
# 消息推送结点数
propagatePeerNum:3
# 消息获取间隔(秒)
pullInterval:4s
# 消息获取结点数
pullPeerNum:3
# 状态消息获取间隔(秒)
requestStateInfoInerval:4s
# 状态消息推送间隔(秒)
publishStateInfoInterval:4s
# 状态消息存活周期(秒)
stateInfoRetentionInterval:
# 存活消息中推送证书的周期(秒)
publishCertPeriod:10s
# 是否验证区块消息
skipBlockVerification:false
# 是否忽略安全检查
ignoreSecurity:false
# 建立连接超时时间(秒)
dialTimeout:3s
# 连接超时时间(秒)
connTimeout:2s
# 接收消息缓冲区大小
recvBuffSize:20
# 发送消息缓冲区大小
sendBuffSize:20
# 摘要消息处理超时时间(秒)
digestWaitTime:1s
# 请求消息处理超时时间(秒)
requestWaitTime:1s
# 消息响应超时时间(秒)
responseWatTime:2s
# 存活消息间隔时间(秒)
aliveTimeInterval:5s
# 存活消息超时时间(秒)
aliveExpirationTimeout:2s
# 重新连接间隔时间(秒)
reconnectInterval:25s
# 外部标识
externalEndpoint: