如图所示,我们要分析的是registering BLOCK,registering CHAINCODE,registering REJECTION和registering REGISTER的整个过程
下图是代码流程图
funccreateEventHubServer()(net.Listener,*grpc.Server,error){
var lis net.Listener
var grpcServer *grpc.Server
var err error
// ValidatorEnabled返回peer.validator.enabled是否可用
if peer.ValidatorEnabled() {
lis, err = net.Listen("tcp", viper.GetString("peer.validator.events.address"))
if err != nil {
return nil, nil, fmt.Errorf("failed to listen: %v", err)
}
//TODO - do we need different SSL material for events ?
var opts []grpc.ServerOption
// TLSEnabled返回peer.tls.enabled配置好的值的缓存值
if comm.TLSEnabled() {
//NewServerTLSFromFile是gRPC库的函数,主要目的是为获取tls的证书和密钥
creds, err := credentials.NewServerTLSFromFile(
viper.GetString("peer.tls.cert.file"),
viper.GetString("peer.tls.key.file"))
if err != nil {
return nil, nil, fmt.Errorf("Failed to generate credentials %v", err)
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
grpcServer = grpc.NewServer(opts...)
ehServer := producer.NewEventsServer(
uint(viper.GetInt("peer.validator.events.buffersize")),
viper.GetInt("peer.validator.events.timeout"))
//注册事件服务
pb.RegisterEventsServer(grpcServer, ehServer)
}
return lis, grpcServer, err
}
// ValidatorEnabled returns the cached validatorEnabled flag (per the original
// note, the value of the peer.validator.enabled setting). If the
// configuration has not been cached yet, cacheConfiguration is called first.
func ValidatorEnabled() bool {
	if configurationCached {
		return validatorEnabled
	}
	cacheConfiguration()
	return validatorEnabled
}
//
cacheConfiguration如果检查失败打一个错误日志
func cacheConfiguration() {
if err := CacheConfiguration(); err != nil {
peerLogger.Errorf("Execution continues after CacheConfiguration() failure : %s", err)
}
}
//
TLSEnabled返回peer.tls.enabled配置好的值的缓存值
func TLSEnabled() bool {
if !configurationCached {
cacheConfiguration()
}
return tlsEnabled
}
//
cacheConfiguration如果检查失败打错误日志.
func cacheConfiguration() {
if err := CacheConfiguration(); err != nil {
commLogger.Errorf("Execution continues after CacheConfiguration() failure : %s", err)
}
}
//
NewEventsServer
返回一个事件服务器
func NewEventsServer(bufferSize uint, timeout int) *EventsServer {
if globalEventsServer != nil {
panic("Cannot create multiple event hub servers")
}
globalEventsServer = new(EventsServer)
//初始化并且开启事件
initializeEvents(bufferSize, timeout)
//initializeCCEventProcessor(bufferSize, timeout)
return globalEventsServer
}
// initializeEvents creates the global event processor and starts it.
//
// bufferSize is the capacity of the event channel and tout is stored as the
// processor's timeout. It panics if gEventProcessor is already set.
func initializeEvents(bufferSize uint, tout int) {
	if gEventProcessor != nil {
		panic("should not be called twice")
	}

	gEventProcessor = &eventProcessor{
		eventConsumers: make(map[pb.EventType]handlerList),
		eventChannel:   make(chan *pb.Event, bufferSize),
		timeout:        tout,
	}

	addInternalEventTypes()

	// Run the processing loop in its own goroutine.
	go gEventProcessor.start()
}
func
addInternalEventTypes()
{
AddEventType(pb.EventType_BLOCK)
AddEventType(pb.EventType_CHAINCODE)
AddEventType(pb.EventType_REJECTION)
AddEventType(pb.EventType_REGISTER)
}
//AddEventType
添加支持的事件类型
func AddEventType(eventType pb.EventType) error {
gEventProcessor.Lock()
producerLogger.Debugf("registering %s", pb.EventType_name[int32(eventType)])
if _, ok := gEventProcessor.eventConsumers[eventType]; ok {
gEventProcessor.Unlock()
return fmt.Errorf("event type exists %s", pb.EventType_name[int32(eventType)])
}
switch eventType {
case pb.EventType_BLOCK:
gEventProcessor.eventConsumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
case pb.EventType_CHAINCODE:
gEventProcessor.eventConsumers[eventType] = &chaincodeHandlerList{handlers: make(map[string]map[string]map[*handler]bool)}
case pb.EventType_REJECTION:
gEventProcessor.eventConsumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
}
gEventProcessor.Unlock()
return nil
}
func
(ep
*
eventProcessor
)
start()
{
producerLogger.Info("event processor started")
for {
//等待事件
e := <-ep.eventChannel
var hl handlerList
eType := getMessageType(e)
ep.Lock()
if hl, _ = ep.eventConsumers[eType]; hl == nil {
producerLogger.Errorf("Event of type %s does not exist", eType)
ep.Unlock()
continue
}
//lock the handler map lock
ep.Unlock()
hl.foreach(e, func(h *handler) {
if e.Event != nil {
h.SendMessage(e)
}
})
}
}
//
获取消息类型
func getMessageType(e *pb.Event) pb.EventType {
switch e.Event.(type) {
case *pb.Event_Register:
return pb.EventType_REGISTER
case *pb.Event_Block:
return pb.EventType_BLOCK
case *pb.Event_ChaincodeEvent:
return pb.EventType_CHAINCODE
case *pb.Event_Rejection:
return pb.EventType_REJECTION
default:
return -1
}
}
//
SendMessage
通过流发送一条消息给远程的peer
func (d *handler) SendMessage(msg *pb.Event) error {
err := d.ChatStream.Send(msg)
if err != nil {
return fmt.Errorf("Error Sending message through ChatStream: %s", err)
}
return nil
}
funcRegisterEventsServer(s*grpc.Server,srv EventsServer){
s.RegisterService(&_Events_serviceDesc, srv)
}
一些重要的变量
typeeventProcessorstruct{
sync.RWMutex
eventConsumers map[pb.EventType]handlerList
//we could generalize this with mutiple channels each with its own size
// 产生多个大小限定的channels
eventChannel chan *pb.Event
//milliseconds timeout for producer to send an event.
//毫秒级别的超时触发器发送一个事件
//if < 0, if buffer full, unblocks immediately and not send
//如果小于0,如果缓冲区满的,立即解锁并且不发送事件
//if 0, if buffer full, will block and guarantee the event will be sent out
// 如是0,如果缓冲区满的,将上锁并保证事件不会被发出去
//if > 0, if buffer full, blocks till timeout
//如果是0,如果缓冲区是满的,上锁直到超时
timeout int
}
// gEventProcessor is the global eventProcessor singleton; it is created by
// initializeEvents. Openchain producers send events simply through a
// reentrant static method.
var gEventProcessor *eventProcessor
// EventType enumerates the kinds of event handled by the event hub.
type EventType int32

// The supported event types.
const (
	EventType_REGISTER  EventType = 0
	EventType_BLOCK     EventType = 1
	EventType_CHAINCODE EventType = 2
	EventType_REJECTION EventType = 3
)

// EventType_name maps the numeric value of each EventType to its name.
var EventType_name = map[int32]string{
	0: "REGISTER",
	1: "BLOCK",
	2: "CHAINCODE",
	3: "REJECTION",
}

// EventType_value maps the name of each EventType to its numeric value.
var EventType_value = map[string]int32{
	"REGISTER":  0,
	"BLOCK":     1,
	"CHAINCODE": 2,
	"REJECTION": 3,
}
// Event is used by
// - consumers (adapters) to send Register
// - producer to advertise supported types and events
type Event struct {
// Types that are valid to be assigned to Event:
// *Event_Register
// *Event_Block
// *Event_ChaincodeEvent
// *Event_Rejection
// *Event_Unregister
Event isEvent_Event `protobuf_oneof:"Event"`
}