peer启动过程源码分析

时间:2021-01-01 04:23:11

peer启动过程源码分析

由图可以看出,peer节点启动后会先初始化日志

然后进入serve函数

//==============================================================================

//peer node start 之后做日志初始化之后就进入到server函数
//==============================================================================
func serve(args []string) error {
    // Parameter overrides must be processed before any paramaters are
    // cached. Failures to cache cause the server to terminate immediately.
    //在其他参数被缓存起来之前参数覆盖必须处理,失败缓存导致服务立即结束
    if chaincodeDevMode {
        logger.Info("Running in chaincode development mode")
        logger.Info("Set consensus to NOOPS and user starts chaincode")
        logger.Info("Disable loading validity system chaincode")
 
        viper.Set("peer.validator.enabled", "true")
        viper.Set("peer.validator.consensus", "noops")
        viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
 
    }
 
    if err := peer.CacheConfiguration(); err != nil {
        return err
    }
 
    peerEndpoint, err := peer.GetPeerEndpoint()
    if err != nil {
        err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
        return err
    }
 
    //启动grpc服务,监听7051端口
    listenAddr := viper.GetString("peer.listenAddress")
 
    if "" == listenAddr {
        logger.Debug("Listen address not specified, using peer endpoint address")
        listenAddr = peerEndpoint.Address
    }
 
    lis, err := net.Listen("tcp", listenAddr)
    if err != nil {
        grpclog.Fatalf("Failed to listen: %v", err)
    }
    //创建EventHub服务器,通过调用createEventHubServer方法实现,该服务也是grpc,只有VP
    //才能开启
    ehubLis, ehubGrpcServer, err := createEventHubServer()
    if err != nil {
        grpclog.Fatalf("Failed to create ehub server: %v", err)
    }
 
    logger.Infof("Security enabled status: %t", core.SecurityEnabled())
    if viper.GetBool("security.privacy") {
        if core.SecurityEnabled() {
            logger.Infof("Privacy enabled status: true")
        } else {
            panic(errors.New("Privacy cannot be enabled as requested because security is disabled"))
        }
    } else {
        logger.Infof("Privacy enabled status: false")
    }
    //启动rockdb数据库
    db.Start()
 
    var opts []grpc.ServerOption
    if comm.TLSEnabled() {
        creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"),
            viper.GetString("peer.tls.key.file"))
 
        if err != nil {
            grpclog.Fatalf("Failed to generate credentials %v", err)
        }
        opts = []grpc.ServerOption{grpc.Creds(creds)}
    }
 
    //创建一个grpc服务
    grpcServer := grpc.NewServer(opts...)
 
    //注册Chaincode支持服务器
    secHelper, err := getSecHelper()
    if err != nil {
        return err
    }
 
    secHelperFunc := func() crypto.Peer {
        return secHelper
    }
 
    registerChaincodeSupport(chaincode.DefaultChain, grpcServer, secHelper)
 
    var peerServer *peer.Impl
 
    // 创建peer服务器,主意区分VP和NVP节点
    if peer.ValidatorEnabled() {
        logger.Debug("Running as validating peer - making genesis block if needed")
        makeGenesisError := genesis.MakeGenesis()
        if makeGenesisError != nil {
            return makeGenesisError
        }
        logger.Debugf("Running as validating peer - installing consensus %s",
            viper.GetString("peer.validator.consensus"))
 
        peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)
    } else {
        logger.Debug("Running as non-validating peer")
        peerServer, err = peer.NewPeerWithHandler(secHelperFunc, peer.NewPeerHandler)
    }
 
    if err != nil {
        logger.Fatalf("Failed creating new peer with handler %v", err)
 
        return err
    }
 
    // 注册peer服务
    pb.RegisterPeerServer(grpcServer, peerServer)
 
    // 注册管理服务器
    pb.RegisterAdminServer(grpcServer, core.NewAdminServer())
 
    // 注册Devops服务器
    serverDevops := core.NewDevopsServer(peerServer)
    pb.RegisterDevopsServer(grpcServer, serverDevops)
 
    // 注册ServerOpenchain服务器
    serverOpenchain, err := rest.NewOpenchainServerWithPeerInfo(peerServer)
    if err != nil {
        err = fmt.Errorf("Error creating OpenchainServer: %s", err)
        return err
    }
 
    pb.RegisterOpenchainServer(grpcServer, serverOpenchain)
 
    // 如果配置了的话创建和注册REST服务
    if viper.GetBool("rest.enabled") {
        go rest.StartOpenchainRESTServer(serverOpenchain, serverDevops)
    }
 
    logger.Infof("Starting peer with ID=%s, network ID=%s, address=%s, rootnodes=%v, validator=%v",
        peerEndpoint.ID, viper.GetString("peer.networkId"), peerEndpoint.Address,
        viper.GetString("peer.discovery.rootnode"), peer.ValidatorEnabled())
 
    // 启动GRPC服务器. 如果是必须的话在一个goroutine中完成这样我们能够部署genesis
    serve := make(chan error)
 
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        sig := <-sigs
        fmt.Println()
        fmt.Println(sig)
        serve <- nil
    }()
 
    go func() {
        var grpcErr error
        if grpcErr = grpcServer.Serve(lis); grpcErr != nil {
            grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
        } else {
            logger.Info("grpc server exited")
        }
        serve <- grpcErr
    }()
 
    if err := writePid(viper.GetString("peer.fileSystemPath")+"/peer.pid", os.Getpid()); err != nil {
        return err
    }
 
    // 启动eventHub服务
    if ehubGrpcServer != nil && ehubLis != nil {
        go ehubGrpcServer.Serve(ehubLis)
    }
 
    if viper.GetBool("peer.profile.enabled") {
        go func() {
            profileListenAddress := viper.GetString("peer.profile.listenAddress")
            logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
            if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
                logger.Errorf("Error starting profiler: %s", profileErr)
            }
        }()
    }
 
    // Block until grpc server exits 产生块直到grpc服务退出
    return <-serve
}
整个过程可以通过代码中的注释看明白