查询总流程图:
1. Query创建
statementResource在接收到查询请求后,会通过调用createQuery方法来响应,每调用一次该方法,构造一个driver,如果有多个stage,会发送多次http请求,进行driver创建。其中一个driver使用若干个operator处理一个split:
@POST
@Produces(MediaType.APPLICATION_JSON)
public Response createQuery(
String statement,
@Context HttpServletRequest servletRequest,
@Context UriInfo uriInfo)
throws InterruptedException
{
assertRequest(!isNullOrEmpty(statement), "SQL statement is empty");
Session session = createSessionForRequest(servletRequest, accessControl, sessionPropertyManager, queryIdGenerator.createNextQueryId());
ExchangeClient exchangeClient = exchangeClientSupplier.get(deltaMemoryInBytes -> { });
Query query = new Query(session, statement, queryManager, exchangeClient);//主要是调用sqlQueryManager的createQuery方法
queries.put(query.getQueryId(), query);
return getQueryResults(query, Optional.empty(), uriInfo, new Duration(1, MILLISECONDS));//获取查询结果
}
1.Query创建和QueryExecution提交
1. 通过调用SqlQueryManager的createQuery方法创建:
Statement wrappedStatement = sqlParser.createStatement(query);
statement = unwrapExecuteStatement(wrappedStatement, sqlParser, session);//创建statement
queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement, parameters);//创建QueryExecution
queueManager.submit(statement, queryExecution, queryExecutor);//提交QueryExecution,也就是提交查询任务
2. 提交查询任务(SqlQueueManager的submit方法分析)
2.1 队列选择
有一些列的队列匹配原则,如果匹配上就创建(getOrCreateQueues方法中创建新队列)或者返回已有队列,如果没有匹配上,则抛出异常
List<QueryQueue> queues;
try {
queues = selectQueues(queryExecution.getSession(), executor);
}
catch (PrestoException e) {
queryExecution.fail(e);
return;
}
这里还需要详细看看QueryQueue的构造方法:
QueryQueue(Executor queryExecutor, int maxQueuedQueries, int maxConcurrentQueries)
{
requireNonNull(queryExecutor, "queryExecutor is null");
checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");
this.queuePermits = new AtomicInteger(maxQueuedQueries + maxConcurrentQueries);
this.asyncSemaphore = new AsyncSemaphore<>(maxConcurrentQueries,//并发执行的query个数
queryExecutor,//查询线程池,在构造SqlQueryManager时构造,是一个CachedThreadPool。问题是这个SqlQueryManager是在什么时候构造的,没有找到
queueEntry -> { //异步信号量的第三个参数是Function<T, ListenableFuture<?>> submitter类型,主要完成的工作是QueuedExecution出队列,执行其start方法,进而调用QueryExecution的start方法,开始执行查询
QueuedExecution queuedExecution = queueEntry.dequeue();
if (queuedExecution != null) {
queuedExecution.start();
return queuedExecution.getCompletionFuture();
}
return Futures.immediateFuture(null);
});
}
2.2 判断是否有队列许可
for (QueryQueue queue : queues) {
if (!queue.reserve(queryExecution)) {
// Reject query if we couldn't acquire a permit to enter the queue.
// The permits will be released when this query fails.
queryExecution.fail(new PrestoException(QUERY_QUEUE_FULL, "Too many queued queries!"));
return;
}
}
2.3 创建QuenedExecution
主要调用createQueuedExecution方法把一个SqlQueryExecution封装成一个QueuedExecution
queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor));
使用信号量通知队列中有任务,可以提交执行。
2.4 开始执行
即QueryExecution后,在QuenedExecution中实际开始执行,代码如下:
public void start()
{
// Only execute if the query is not already completed (e.g. cancelled)
if (listenableFuture.isDone()) {
return;
}
if (nextQueues.isEmpty()) {
executor.execute(() -> {//提交到线程池,开始并发执行
try (SetThreadName ignored = new SetThreadName("Query-%s", queryExecution.getQueryId())) {
queryExecution.start(); //在这里调用QueryExecution的start方法开始执行
}
});
}
else {
nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, listenableFuture));
}
}