Java并发编程之线程池、Callable和Future使用

时间:2021-04-11 18:00:36

转载请注明来源-作者@loongshawn:http://blog.csdn.net/loongshawn/article/details/75338247,建议读者阅读原文,确保获得完整的信息

知识储备

收藏几篇好文章:

1、深入理解Java之线程池

2、Java并发编程:volatile关键字解析

目录结构

  • Callable和Future使用
  • 线程池使用

Callable和Future使用

多线程实现方式很多,分为两类:1、没有返回值的;2、有返回值的。

针对“没有返回值的”这类可以参考《Java创建线程的两种方法比较》,本文不作赘述。本文仅说明Callable方式的多线程实现。

首先,创建任务类实现Callable接口,重写call函数,定义其返回类型。

/**
* Created by loongshawn on 2017/7/17.
*
* 并发任务
*/

public class TaskCallable implements Callable<DatabaseSearchResponse> {

private DataBaseAuthentication dataBaseAuthentication;
private String filepath;
private String extension;
private String checklinks;

public TaskCallable(DataBaseAuthentication dataBaseAuthentication,String filepath,String extension,String checklinks){
this.dataBaseAuthentication = dataBaseAuthentication;
this.filepath = filepath;
this.extension = extension;
this.checklinks = checklinks;
}

public DatabaseSearchResponse call() throws Exception {
// 任务查询
DatabaseSearchRequest databaseSearchRequest = new DatabaseSearchRequest();
DatabaseSearchResponse databaseSearchResponse = databaseSearchRequest.execute(dataBaseAuthentication,filepath,extension,checklinks);
return databaseSearchResponse;
}
}

然后,构建多线程创建主体函数。仅提供代码片段。


public Response execute(SearchParameter para){

Response response = new Response();
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<DatabaseSearchResponse>> results = new ArrayList<Future<DatabaseSearchResponse>>();
// 获取数据库参数
...
for (String dataBaseName : dataBaseNames){

// 数据库查询任务
TaskCallable task = new TaskCallable(dataBaseAuthentication,filepath,extension,checklinks);
results.add(executorService.submit(task));
}

executorService.shutdown();

for (Future<DatabaseSearchResponse> searchResponseFuture: results){
JSONObject jsonObject = new JSONObject();
while (true) {
if(searchResponseFuture.isDone() && !searchResponseFuture.isCancelled()) {
DatabaseSearchResponse databaseSearchResponse = searchResponseFuture.get();
break;
} else {
Thread.sleep(100);
}
}
}
...
return response;
}

其中Future<?>用来接收线程执行结果,从Future字面意思理解,这是一个将来的结果,也就是说要想获得线程执行结果,需要判断其是否执行完毕。详细可参考官方API文档

Java并发编程之线程池、Callable和Future使用

Java并发编程之线程池、Callable和Future使用

线程池使用

Java并发编程之线程池、Callable和Future使用

在上例中,用到了线程池,采用了Executors提供的静态方法初始化线程池。

ExecutorService executorService = Executors.newCachedThreadPool();

newCachedThreadPool()方法实现如下,即初始线程池没有创建线程,只有在有新任务时才会创建线程去执行任务,空闲线程等待时间60秒。

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

ExecutorService与ThreadPoolExecutor是什么关系,有什么差异,因为通过ThreadPoolExecutor也能够实现线程池。

public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));

for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
}
executor.shutdown();
}
}

首先来看看ThreadPoolExecutor的execute函数,这个函数返回void:

void execute(Runnable command)
//Executes the given task sometime in the future.

然后再来看看ExecutorService的submit函数,这个函数返回Future<?>,即有返回值,这是两者的一个差异之处。


Future<?> submit(Runnable task)
//Submits a Runnable task for execution and returns a Future representing that task.

接下来,通过树状图来看看线程池相关类间的关系,可以查阅源码看之间的关系:

Java并发编程之线程池、Callable和Future使用

  • Executor是顶层接口,仅提供execute方法。
  • ExecutorService接口继承了Executor接口,丰富了接口函数。
  • AbstractExecutorService抽象类实现了ExecutorService接口。
  • ThreadPoolExecutor类继承了AbstractExecutorService类。

部分源码

Executor接口

public interface Executor {
void execute(Runnable command);
}

ExecutorService接口

public interface ExecutorService extends Executor {

void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService抽象类

public abstract class AbstractExecutorService implements ExecutorService {

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}

ThreadPoolExecutor类

public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}

源码解析

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

片段1: command为空,返回NullPointerException。

if (command == null)
throw new NullPointerException();

片段2: 如果当前线程数量小于corePoolSize,则在线程池中新启一个任务线程。

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

片段3: 如果当前线程池正常运行,且任务成功添加到缓存队列中。则会进行一次double check:1、线程池shut down,移除队列任务,拒绝新任务;2、线程死掉了。

if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

片段4: 任务添加到缓存队列失败,拒绝新任务,可能是线程池shut down或者线程池已达最大容量。

else if (!addWorker(command, false))
reject(command);