Presto启动源码分析

时间:2022-01-07 16:56:57

1.启动主流程

1. 加载module

        ImmutableList.Builder<Module> modules = ImmutableList.builder();//不是很明白加载这些module的作用
        modules.add(
                new NodeModule(),
                new DiscoveryModule(),
                new HttpServerModule(),
                new JsonModule(),
                new JaxrsModule(true),
                new MBeanModule(),
                new JmxModule(),
                new JmxHttpModule(),
                new LogJmxModule(),
                new TraceTokenModule(),
                new JsonEventModule(),
                new HttpEventModule(),
                new EmbeddedDiscoveryModule(),
                new ServerSecurityModule(),
                new AccessControlModule(),
                new EventListenerModule(),
                new ServerMainModule(sqlParserOptions),
                new GracefulShutdownModule());

        modules.addAll(getAdditionalModules());//additional module为空

        Bootstrap app = new Bootstrap(modules.build());

2. 服务启动时初始化的对象

1. SqlTaskManager

  构造该类对象需要使用如下比较重要的类对象

1. TaskManager

 该类对象属于SqlTaskManger的一个成员变量
 初始化时会调用该对象的start方法,构造处理线程
    public synchronized void start()
    {
        checkState(!closed, "TaskExecutor is closed");
        for (int i = 0; i < runnerThreads; i++) {
            addRunnerThread();//构造空线程,因为pendingSplits为空
        }
    }

2.QueryMonitor

3.LocalExecutionPlanner

        requireNonNull(compilerConfig, "compilerConfig is null");
        this.queryPerformanceFetcher = requireNonNull(queryPerformanceFetcher, "queryPerformanceFetcher is null");
        this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");//用于源数据扫描操作,用于构造两个物理操作符:ScanFilterAndProjectOperator和TableScanOperator
        this.indexManager = requireNonNull(indexManager, "indexManager is null"); //用于构造IndexSourceOperator,这个操作符具体什么用于还不清楚
        this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");//********
        this.exchangeClientSupplier = exchangeClientSupplier; //用于构造ExchangeOperator,这个客户端会添加一些需要的location,然后从上一个stage通过http请求的方式获取上一个stage计算出的page
        this.metadata = requireNonNull(metadata, "metadata is null"); 
        this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");//用于构造一些IdentityHashMap<Expression, Type> expressionTypes或者function,但是具体干什么还不是很清楚
        this.pageSinkManager = requireNonNull(pageSinkManager, "pageSinkManager is null");//用于构造TableWriterOperator,应该是insert时会用到
        this.compiler = requireNonNull(compiler, "compiler is null");//用于构造pageprocessor和cursorprocessor,如果把相关代码注释掉,就使用的解释型的
        this.indexJoinLookupStats = requireNonNull(indexJoinLookupStats, "indexJoinLookupStats is null");
        this.maxIndexMemorySize = requireNonNull(taskManagerConfig, "taskManagerConfig is null").getMaxIndexMemoryUsage();
        this.maxPartialAggregationMemorySize = taskManagerConfig.getMaxPartialAggregationMemoryUsage();
        this.maxPagePartitioningBufferSize = taskManagerConfig.getMaxPagePartitioningBufferSize();

2. SqlQueryManager

这个类主要是用来创建query的,有一个createQuery方法,该方法会在stagementResource响应查询request时被调用

   public SqlQueryManager(
            SqlParser sqlParser,
            QueryManagerConfig config,
            QueryMonitor queryMonitor,
            QueryQueueManager queueManager,
            ClusterMemoryManager memoryManager,
            LocationFactory locationFactory,
            TransactionManager transactionManager,
            Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories)
    {
        this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");//用于在查询中为query创建statement

        this.executionFactories = requireNonNull(executionFactories, "executionFactories is null");//用于为query创建QueryExecution

        this.queryExecutor = newCachedThreadPool(threadsNamed("query-scheduler-%s"));//用于在后台提交查询任务
        this.queryExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) queryExecutor);

        requireNonNull(config, "config is null");
        this.queueManager = requireNonNull(queueManager, "queueManager is null");
        this.memoryManager = requireNonNull(memoryManager, "memoryManager is null");

        this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
        this.locationFactory = requireNonNull(locationFactory, "locationFactory is null");//用于在创建query时,创建location

        this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");

        this.minQueryExpireAge = config.getMinQueryExpireAge();
        this.maxQueryHistory = config.getMaxQueryHistory();
        this.clientTimeout = config.getClientTimeout();

        queryManagementExecutor = Executors.newScheduledThreadPool(config.getQueryManagerExecutorPoolSize(), threadsNamed("query-management-%s"));
        queryManagementExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) queryManagementExecutor);
        queryManagementExecutor.scheduleWithFixedDelay(new Runnable()

3. 加载插件

            Injector injector = app.strictConfig().initialize();//初始化很多系统变量,但是不清楚怎么进入的

            injector.getInstance(PluginManager.class).loadPlugins(); //加载plugin目录下存储的插件

            injector.getInstance(CatalogManager.class).loadCatalogs(); //加载etc/catalog目录下的插件配置文件