Hyperledger Fabric继peer启动之后的源码解析二

时间:2022-03-16 17:18:31

Hyperledger Fabric继peer启动之后的源码解析二

如图所示,我们要分析的是registering BLOCK,registering CHAINCODE,registering REJECTION和registering REGISTER的整个过程

Hyperledger Fabric继peer启动之后的源码解析二


下图是代码流程图

Hyperledger Fabric继peer启动之后的源码解析二


funccreateEventHubServer()(net.Listener,*grpc.Server,error){

    var lis net.Listener
    var grpcServer *grpc.Server
    var err error
    // ValidatorEnabled返回peer.validator.enabled是否可用
    if peer.ValidatorEnabled() {
        lis, err = net.Listen("tcp", viper.GetString("peer.validator.events.address"))
        if err != nil {
            return nil, nil, fmt.Errorf("failed to listen: %v", err)
        }
        //TODO - do we need different SSL material for events ?
        var opts []grpc.ServerOption
        // TLSEnabled返回peer.tls.enabled配置好的值的缓存值
        if comm.TLSEnabled() {
            //NewServerTLSFromFile是gRPC库的函数,主要目的是为获取tls的证书和密钥
            creds, err := credentials.NewServerTLSFromFile(
                viper.GetString("peer.tls.cert.file"),
                viper.GetString("peer.tls.key.file"))
            if err != nil {
                return nil, nil, fmt.Errorf("Failed to generate credentials %v", err)
            }
            opts = []grpc.ServerOption{grpc.Creds(creds)}
        }
        grpcServer = grpc.NewServer(opts...)
        ehServer := producer.NewEventsServer(
            uint(viper.GetInt("peer.validator.events.buffersize")),
            viper.GetInt("peer.validator.events.timeout"))
        //注册事件服务
        pb.RegisterEventsServer(grpcServer, ehServer)
    }
    return lis, grpcServer, err
}

// ValidatorEnabled返回peer.validator.enabled是否可用

func ValidatorEnabled() bool {
    if !configurationCached {
        cacheConfiguration()
    }
    return validatorEnabled
}
// cacheConfiguration如果检查失败打一个错误日志
func cacheConfiguration() {
    if err := CacheConfiguration(); err != nil {
        peerLogger.Errorf("Execution continues after CacheConfiguration() failure : %s", err)
    }
}
// TLSEnabled返回peer.tls.enabled配置好的值的缓存值
func TLSEnabled() bool {
    if !configurationCached {
        cacheConfiguration()
    }
    return tlsEnabled
}
// cacheConfiguration如果检查失败打错误日志.
func cacheConfiguration() {
    if err := CacheConfiguration(); err != nil {
        commLogger.Errorf("Execution continues after CacheConfiguration() failure : %s", err)
    }
}
// NewEventsServer 返回一个事件服务器
func NewEventsServer(bufferSize uint, timeout int) *EventsServer {
    if globalEventsServer != nil {
        panic("Cannot create multiple event hub servers")
    }
    globalEventsServer = new(EventsServer)
    //初始化并且开启事件
    initializeEvents(bufferSize, timeout)
    //initializeCCEventProcessor(bufferSize, timeout)
    return globalEventsServer
}
//初始化并且开启事件
func initializeEvents(bufferSize uint, tout int) {
    if gEventProcessor != nil {
        panic("should not be called twice")
    }
    gEventProcessor = &eventProcessor{eventConsumers: make(map[pb.EventType]handlerList), eventChannel: make(chan *pb.Event, bufferSize), timeout: tout}
    addInternalEventTypes()
    //启动事件进程器
    go gEventProcessor.start()
}

func addInternalEventTypes() {
    AddEventType(pb.EventType_BLOCK)
    AddEventType(pb.EventType_CHAINCODE)
    AddEventType(pb.EventType_REJECTION)
    AddEventType(pb.EventType_REGISTER)
}
//AddEventType 添加支持的事件类型
func AddEventType(eventType pb.EventType) error {
    gEventProcessor.Lock()
    producerLogger.Debugf("registering %s", pb.EventType_name[int32(eventType)])
    if _, ok := gEventProcessor.eventConsumers[eventType]; ok {
        gEventProcessor.Unlock()
        return fmt.Errorf("event type exists %s", pb.EventType_name[int32(eventType)])
    }
    switch eventType {
    case pb.EventType_BLOCK:
        gEventProcessor.eventConsumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
    case pb.EventType_CHAINCODE:
        gEventProcessor.eventConsumers[eventType] = &chaincodeHandlerList{handlers: make(map[string]map[string]map[*handler]bool)}
    case pb.EventType_REJECTION:
        gEventProcessor.eventConsumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
    }
    gEventProcessor.Unlock()
    return nil
}
func (ep * eventProcessor ) start() {
    producerLogger.Info("event processor started")
    for {
        //等待事件
        e := <-ep.eventChannel
        var hl handlerList
        eType := getMessageType(e)
        ep.Lock()
        if hl, _ = ep.eventConsumers[eType]; hl == nil {
            producerLogger.Errorf("Event of type %s does not exist", eType)
            ep.Unlock()
            continue
        }
        //lock the handler map lock
        ep.Unlock()
        hl.foreach(e, func(h *handler) {
            if e.Event != nil {
                h.SendMessage(e)
            }
        })
    }
}
// 获取消息类型
func getMessageType(e *pb.Event) pb.EventType {
    switch e.Event.(type) {
    case *pb.Event_Register:
        return pb.EventType_REGISTER
    case *pb.Event_Block:
        return pb.EventType_BLOCK
    case *pb.Event_ChaincodeEvent:
        return pb.EventType_CHAINCODE
    case *pb.Event_Rejection:
        return pb.EventType_REJECTION
    default:
        return -1
    }
}
// SendMessage 通过流发送一条消息给远程的peer
func (d *handler) SendMessage(msg *pb.Event) error {
    err := d.ChatStream.Send(msg)
    if err != nil {
        return fmt.Errorf("Error Sending message through ChatStream: %s", err)
    }
    return nil
}

funcRegisterEventsServer(s*grpc.Server,srv EventsServer){

    s.RegisterService(&_Events_serviceDesc, srv)
}

一些重要的变量

typeeventProcessorstruct{

    sync.RWMutex
    eventConsumers map[pb.EventType]handlerList
    //we could generalize this with mutiple channels each with its own size
    // 产生多个大小限定的channels
    eventChannel chan *pb.Event
    //milliseconds timeout for producer to send an event.
    //毫秒级别的超时触发器发送一个事件
    //if < 0, if buffer full, unblocks immediately and not send
    //如果小于0,如果缓冲区满的,立即解锁并且不发送事件
    //if 0, if buffer full, will block and guarantee the event will be sent out
    // 如是0,如果缓冲区满的,将上锁并保证事件不会被发出去
    //if > 0, if buffer full, blocks till timeout
    //如果是0,如果缓冲区是满的,上锁直到超时
    timeout int
}
// 通过initializeEvents函数来创建全局eventProcessor单列模式 Openchain producers
// Openchain仅仅通过一个重入的静态方法生产并发送事件
var gEventProcessor *eventProcessor

typeEventType int32


const (
    EventType_REGISTER  EventType = 0
    EventType_BLOCK     EventType = 1
    EventType_CHAINCODE EventType = 2
    EventType_REJECTION EventType = 3
)
var EventType_name = map[int32]string{
    0: "REGISTER",
    1: "BLOCK",
    2: "CHAINCODE",
    3: "REJECTION",
}
var EventType_value = map[string]int32{
    "REGISTER":  0,
    "BLOCK":     1,
    "CHAINCODE": 2,
    "REJECTION": 3,
}

// Eventis usedby

//  - consumers (adapters) to send Register
//  - producer to advertise supported types and events
type Event struct {
    // Types that are valid to be assigned to Event:
    //    *Event_Register
    //    *Event_Block
    //    *Event_ChaincodeEvent
    //    *Event_Rejection
    //    *Event_Unregister
    Event isEvent_Event `protobuf_oneof:"Event"`
}