Presto源码解读1-客户端提交查询-吕信(原创)

时间:2021-03-01 16:56:37

客户端请求处理就是接收交互式/非交互式命令,通过httpclient发送到服务器(coordinator),客户端通过httpclient更新执行状态打印给用户看,最后收集结果打印给用户。启动Presto cli 客户端的代码文件为:

presto-cli\src\main\java\com\facebook\presto\cli\Presto.java

public final class Presto

{

    private Presto() {}


    public static void main(String[] args)

            throws Exception

    {

        Console console = singleCommand(Console.class).parse(args);

        if (console.helpOption.showHelpIfRequested()) {

            return;

        }

        console.run();

    }

}


Console类对应的源文件为:presto-cli\src\main\java\com\facebook\presto\cli\Console.java,在该类中对查询的处理方法为:

private static void process(QueryRunner queryRunner, String sql, OutputFormat outputFormat, boolean interactive)

    {

        try (Query query = queryRunner.startQuery(sql)) {

            query.renderOutput(System.out, outputFormat, interactive);

            // update session properties if present

            if (!query.getSetSessionProperties().isEmpty() || !query.getResetSessionProperties().isEmpty()) {

                Map<String, String> sessionProperties = new HashMap<>(queryRunner.getSession().getProperties());

                sessionProperties.putAll(query.getSetSessionProperties());

                sessionProperties.keySet().removeAll(query.getResetSessionProperties());

                queryRunner.setSession(withProperties(queryRunner.getSession(), sessionProperties));

            }

        }

        catch (RuntimeException e) {

            System.out.println("Error running command: " + e.getMessage());

            if (queryRunner.getSession().isDebug()) {

                e.printStackTrace();

            }

        }

    }


Query.java StatementClient.java Console.java QueryRunner.java


  1. 客户端初始化:console.run()

 

@Override

    public void run()

    {

       //将登录的配置参数(--catalog �Cserver �Cschema 日志级别等)装载到ClientSession,附录1:客户端默认session

        ClientSession session = clientOptions.toClientSession();

        //命令行查询 --execute

        boolean hasQuery = !Strings.isNullOrEmpty(clientOptions.execute);

        //--file 执行的文件

        boolean isFromFile = !Strings.isNullOrEmpty(clientOptions.file);

        if (!hasQuery || !isFromFile) {

            AnsiConsole.systemInstall();

        }

        initializeLogging(session.isDebug());

        String query = clientOptions.execute;

        if (hasQuery) {

            query += ";";

        }

        //冲突检查,--execute―file不能同时使用

        if (isFromFile) {

            if (hasQuery) {

                throw new RuntimeException("both --execute and --file specified");

            }

            try {

                query = Files.toString(new File(clientOptions.file), Charsets.UTF_8);

                hasQuery = true;

            }

            catch (IOException e) {

                throw new RuntimeException(format("Error reading from file %s: %s", clientOptions.file, e.getMessage()));

            }

        }

      //有查询就执行查询,没查询就运行客户端

        try (QueryRunner queryRunner = QueryRunner.create(session)) {

            if (hasQuery) {

                executeCommand(queryRunner, query, clientOptions.outputFormat);

            }

            else {

                runConsole(queryRunner, session);

            }

        }

    }

 

  2. 执行命令行:com.facebook.presto.cli.Console.runConsole(QueryRunner queryRunner, ClientSession session)

private static void runConsole(QueryRunner queryRunner, ClientSession session)

    {

        try (TableNameCompleter tableNameCompleter = new TableNameCompleter(queryRunner);

                LineReader reader = new LineReader(getHistory(), tableNameCompleter)) {

            tableNameCompleter.populateCache();

            StringBuilder buffer = new StringBuilder();

            while (true) {

                // read a line of input from user

                String prompt = PROMPT_NAME + ":" + session.getSchema();

                if (buffer.length() > 0) {

                    prompt = Strings.repeat(" ", prompt.length() - 1) + "-";

                }

                String line = reader.readLine(prompt + "> ");

 

                // add buffer to history and clear on user interrupt

                if (reader.interrupted()) {

                    String partial = squeezeStatement(buffer.toString());

                    if (!partial.isEmpty()) {

                        reader.getHistory().add(partial);

                    }

                    buffer = new StringBuilder();

                    continue;

                }

 

                // exit on EOF

                if (line == null) {

                    return;

                }

 

                // check for special commands if this is the first line

                if (buffer.length() == 0) {

                    String command = line.trim();

                    if (command.endsWith(";")) {

                        command = command.substring(0, command.length() - 1).trim();

                    }

                    switch (command.toLowerCase()) {

                        case "exit":

                        case "quit":

                            return;

                        case "help":

                            System.out.println();

                            System.out.println(getHelpText());

                            continue;

                    }

                }

 

                // not a command, add line to buffer

                buffer.append(line).append("\n");

 

                // execute any complete statements

                String sql = buffer.toString();

                StatementSplitter splitter = new StatementSplitter(sql, ImmutableSet.of(";", "\\G"));

                for (Statement split : splitter.getCompleteStatements()) {

                    System.out.printf("Execute query:" + split.statement());

                    Optional<Object> statement = getParsedStatement(split.statement());

                    if (statement.isPresent() && isSessionParameterChange(statement.get())) {

                        session = processSessionParameterChange(statement.get(), session);

                        queryRunner.setSession(session);

                        tableNameCompleter.populateCache();

                    }

                    else {

                        OutputFormat outputFormat = OutputFormat.ALIGNED;

                        if (split.terminator().equals("\\G")) {

                            outputFormat = OutputFormat.VERTICAL;

                        }

 

                        process(queryRunner, split.statement(), outputFormat, true);

                    }

                    reader.getHistory().add(squeezeStatement(split.statement()) + split.terminator());

                }

 

                // replace buffer with trailing partial statement

                buffer = new StringBuilder();

                String partial = splitter.getPartialStatement();

                if (!partial.isEmpty()) {

                    buffer.append(partial).append('\n');

                }

            }

        }

        catch (IOException e) {

            System.err.println("Readline error: " + e.getMessage());

        }

    }

 

  3. 发送请求到Coordinator:

在方法 com.facebook.presto.cli.QueryRunner.startInternalQuery(String query) 中会创建一个StatementClient对象,而StatementClient类的构造方法会把查询请求发送到Coordinator。


    public StatementClient(HttpClient httpClient, JsonCodec<QueryResults> queryResultsCodec, ClientSession session, String query)

    {

        checkNotNull(httpClient, "httpClient is null");

        checkNotNull(queryResultsCodec, "queryResultsCodec is null");

        checkNotNull(session, "session is null");

        checkNotNull(query, "query is null");

 

        this.httpClient = httpClient;

        this.responseHandler = createFullJsonResponseHandler(queryResultsCodec);

        this.debug = session.isDebug();

        this.timeZoneId = session.getTimeZoneId();

        this.query = query;

 

        Request request = buildQueryRequest(session, query);

       //发送请求给Coordinator

        currentResults.set(httpClient.execute(request, responseHandler).getValue());

    }

 

  4. 打印查询执行过程和结果

public void renderOutput(PrintStream out, OutputFormat outputFormat, boolean interactive)

    {

        SignalHandler oldHandler = Signal.handle(SIGINT, new SignalHandler()

        {

            @Override

            public void handle(Signal signal)

            {

                if (ignoreUserInterrupt.get() || client.isClosed()) {

                    return;

                }

                try {

                    if (!client.cancelLeafStage()) {

                        client.close();

                    }

                }

                catch (RuntimeException e) {

                    log.debug(e, "error canceling leaf stage");

                    client.close();

                }

            }

        });

        try {

            renderQueryOutput(out, outputFormat, interactive);

        }

        finally {

            Signal.handle(SIGINT, oldHandler);

        }

    }

 

    private void renderQueryOutput(PrintStream out, OutputFormat outputFormat, boolean interactive)

    {

        StatusPrinter statusPrinter = null;

        @SuppressWarnings("resource")

        PrintStream errorChannel = interactive ? out : System.err;

 

        if (interactive) {

            //交互式打印,非交互式会等待输出结果

            statusPrinter = new StatusPrinter(client, out);

            statusPrinter.printInitialStatusUpdates();

        }

        else {

            waitForData();

        }

 

打印执行情况的方法

public void printInitialStatusUpdates()

    {

        long lastPrint = System.nanoTime();

        try {

            while (client.isValid()) {

                try {

                    // exit status loop if there is there is pending output

                    if (client.current().getData() != null) {

                        return;

                    }

 

                    // update screen if enough time has passed

                    if (Duration.nanosSince(lastPrint).getValue(SECONDS) >= 0.5) {

                        console.repositionCursor();

                        printQueryInfo(client.current());

                        lastPrint = System.nanoTime();

                    }

 

                    // fetch next results (server will wait for a while if no data)

                    client.advance();

                }

                catch (RuntimeException e) {

                    log.debug(e, "error printing status");

                }

            }

        }

        finally {

            console.resetScreen();

        }

    }

 

附录1:客户端默认session

public class ClientOptions

{

    @Option(name = "--server", title = "server", description = "Presto server location (default: localhost:8080)")

    public String server = "localhost:8080";

 

    @Option(name = "--user", title = "user", description = "Username")

    public String user = System.getProperty("user.name");

 

    @Option(name = "--source", title = "source", description = "Name of source making query")

    public String source = "presto-cli";

 

    @Option(name = "--catalog", title = "catalog", description = "Default catalog")

    public String catalog = "default";

 

    @Option(name = "--schema", title = "schema", description = "Default schema")

    public String schema = "default";

 

    @Option(name = {"-f", "--file"}, title = "file", description = "Execute statements from file and exit")

    public String file;

 

    @Option(name = "--debug", title = "debug", description = "Enable debug information")

    public boolean debug;

 

    @Option(name = "--execute", title = "execute", description = "Execute specified statements and exit")

    public String execute;

 

    @Option(name = "--output-format", title = "output-format", description = "Output format for batch mode (default: CSV)")

    public OutputFormat outputFormat = OutputFormat.CSV;

本文出自 “smile” 博客,请务必保留此出处http://lvxin1986.blog.51cto.com/4953500/1662883