Java Thread Pool Concurrent Programming Explained

Date: 2021-02-05 18:03:25

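This post has six parts:
1. BizProcessor: the concrete business logic, invoked from multiple threads.
2. BizAsyncTaskCall: implements the Callable interface and is used to construct a FutureTask.
3. BizFutureTask: represents one asynchronous computation; it is submitted for asynchronous execution and returns the computed result.
4. BizExecutor: wraps an ExecutorService that runs the asynchronous tasks.
5. The complete test program.
6. The output of the test program.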
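
Before the individual classes, it may help to see the Callable / FutureTask relationship from items 2 and 3 in isolation. The following is a minimal sketch using only JDK classes; the class name FutureTaskSketch and the method runAsync are hypothetical and do not appear in the original program, and a bare Thread stands in for the thread pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical illustration class, not part of the original program.
class FutureTaskSketch {

    // Wraps a Callable in a FutureTask, runs it on another thread, and returns its result.
    static String runAsync(Callable<String> call) {
        // FutureTask is both a Runnable (so a thread can run it) and a Future (so we can read the result).
        FutureTask<String> task = new FutureTask<>(call);
        Thread worker = new Thread(task);
        worker.start();
        try {
            return task.get(); // blocks until call() has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The exception thrown inside call() is available as the cause.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(runAsync(() -> "Success, Congratulation to ZQ-0"));
    }
}
```

In the rest of the post the same roles are played by BizAsyncTaskCall (the Callable), BizFutureTask (the FutureTask), and BizExecutor (which replaces the bare Thread with a pool).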

1. BizProcessor

public class BizProcessor {
    // Business logic; it holds no state, so several threads can call it at once.
    public String bizProcess(String name) {
        return "Success, Congratulation to " + name;
    }
}

2. BizAsyncTaskCall

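import java.util.concurrent.Callable;

// Note: the type parameter V is never used; the task always produces a String.
public class BizAsyncTaskCall<V> implements Callable<String> {

    private final BizProcessor bp;
    private final String name;

    public BizAsyncTaskCall(BizProcessor bp, String name) {
        this.bp = bp;
        this.name = name;
    }

    // Runs on a pool thread and delegates to the business logic.
    @Override
    public String call() throws Exception {
        return bp.bizProcess(name);
    }

}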

3. BizFutureTask

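import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;

public class BizFutureTask extends FutureTask<String> {
    private final CountDownLatch signal;

    public BizFutureTask(CountDownLatch signal, BizAsyncTaskCall<String> asyncTask) {
        super(asyncTask);
        this.signal = signal;
    }

    // FutureTask calls done() once the task finishes, whether normally, with an exception, or by cancellation.
    @Override
    protected void done() {
        signal.countDown();
    }
}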
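
The done() override above counts the latch down on every kind of completion, not only on success. The sketch below tries to make that visible; DoneHookSketch and LatchTask are hypothetical names introduced for illustration, and the tasks are run directly on the calling thread rather than through a pool so that the outcome does not depend on timing.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;

// Hypothetical illustration class, not part of the original program.
class DoneHookSketch {

    // Stand-in for BizFutureTask: counts the latch down inside done().
    static class LatchTask extends FutureTask<String> {
        private final CountDownLatch signal;

        LatchTask(CountDownLatch signal, Callable<String> call) {
            super(call);
            this.signal = signal;
        }

        @Override
        protected void done() {
            signal.countDown();
        }
    }

    // Returns the latch count left after one success, one failure, and one cancellation.
    static long remainingAfterAllOutcomes() {
        CountDownLatch signal = new CountDownLatch(3);
        LatchTask ok = new LatchTask(signal, () -> "ok");
        LatchTask failing = new LatchTask(signal, () -> {
            throw new IllegalStateException("boom");
        });
        LatchTask cancelled = new LatchTask(signal, () -> "never runs");

        ok.run();                // completes normally
        failing.run();           // the exception is stored in the task, not thrown here
        cancelled.cancel(false); // cancelled before it ever runs
        return signal.getCount();
    }

    public static void main(String[] args) {
        System.out.println("latch count remaining: " + remainingAfterAllOutcomes());
    }
}
```

One consequence for the main program: a latch reaching zero only says that every task has finished, so each get() call still has to be prepared for an ExecutionException or a CancellationException.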

4. BizExecutor

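import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BizExecutor {

    // Creates named daemon threads; daemon threads do not keep the JVM alive on their own.
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("[bizProcessor] " + t.getName());
            return t;
        }
    };

    private final ExecutorService executor;

    // minThreadNum is the core pool size, maxThreadNum the maximum pool size,
    // and queueLength the capacity of the bounded work queue.
    public BizExecutor(int minThreadNum, int maxThreadNum, int queueLength) {
        executor = new ThreadPoolExecutor(
                minThreadNum,
                maxThreadNum,
                60, TimeUnit.SECONDS, // idle threads above the core size are terminated after 60 seconds
                new ArrayBlockingQueue<Runnable>(queueLength),
                THREAD_FACTORY);
    }

    // Despite the name, this can throw RejectedExecutionException when the queue is full
    // and the pool has already reached maxThreadNum (the default rejection policy aborts).
    public void executeSafely(final FutureTask<String> task) {
        executor.execute(task);
    }

    // shutdownNow() interrupts running tasks and returns the queued tasks that never started.
    public void shutdown() {
        List<Runnable> tasks = executor.shutdownNow();
        if (!tasks.isEmpty()) {
            System.out.println("BizProcessor shuts down, leaving " + tasks.size() + " not executed");
        }
    }

}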
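
Because the pool above uses a bounded ArrayBlockingQueue and the default rejection policy, execute() throws RejectedExecutionException once the queue is full and the maximum number of threads is busy. The following sketch illustrates this with a deliberately tiny pool (one thread, one queue slot). SaturationSketch and tryExecute are hypothetical names, and returning false on rejection is only one possible way to handle saturation, not something the original BizExecutor does.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the original program.
class SaturationSketch {

    // Returns false instead of throwing when the pool cannot accept the task.
    static boolean tryExecute(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    // Offers three blocking tasks to a pool with 1 thread and 1 queue slot
    // and counts how many were accepted.
    static int acceptedOutOfThree() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task waits on the latch, so the single worker stays busy while we submit.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int accepted = 0;
        try {
            for (int i = 0; i < 3; i++) {
                if (tryExecute(pool, blocker)) {
                    accepted++;
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();     // orderly shutdown; queued work still runs
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptedOutOfThree());
    }
}
```

The first task occupies the only thread, the second fills the only queue slot, and the third is rejected. With the settings used in this post (3 core threads, 5 maximum, queue of 10), four quick tasks stay well below that limit.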

5. BizMain

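import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BizMain {

    public static void main(String[] args) {
        try {
            // Create the executor: 3 core threads, 5 max threads, queue capacity 10.
            BizExecutor bizExecutor = new BizExecutor(3, 5, 10);

            // Create the latch used to wait for the tasks before reading their results.
            CountDownLatch signal = new CountDownLatch(4);

            // Create the tasks and hand them to the executor.
            List<BizFutureTask> bizFutureTaskList = new ArrayList<BizFutureTask>();
            for (int i = 0; i < 4; i++) {
                BizFutureTask bizFutureTask = new BizFutureTask(
                        signal, new BizAsyncTaskCall<String>(new BizProcessor(), "ZQ-" + i));
                bizFutureTaskList.add(bizFutureTask);
                bizExecutor.executeSafely(bizFutureTask);
            }

            // Wait up to one second for all tasks (the original used MICROSECONDS, i.e. 1 ms).
            // get() below still blocks until each task is done, so no result is lost on timeout.
            signal.await(1000, TimeUnit.MILLISECONDS);
            for (int i = 0; i < 4; i++) {
                System.out.println(bizFutureTaskList.get(i).get());
            }

            // All results have been read, so the pool can be released.
            bizExecutor.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}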
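
The test program ignores the boolean returned by CountDownLatch.await(timeout, unit), which reports whether the count reached zero before the timeout expired. A minimal sketch of how that return value could be checked follows; AwaitSketch and awaitAll are hypothetical names, and treating an interrupt as "not finished" is an assumption made for brevity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the original program.
class AwaitSketch {

    // Returns true if the latch reached zero within the timeout, false otherwise.
    static boolean awaitAll(CountDownLatch signal, long timeoutMillis) {
        try {
            return signal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the wait as unfinished.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch signal = new CountDownLatch(1);
        System.out.println("finished before countDown: " + awaitAll(signal, 10));
        signal.countDown();
        System.out.println("finished after countDown: " + awaitAll(signal, 10));
    }
}
```

In BizMain, a false return would be the point at which to decide whether to keep waiting, cancel the unfinished tasks, or read only the results that are already available.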

6. Test Results

Success, Congratulation to ZQ-0
Success, Congratulation to ZQ-1
Success, Congratulation to ZQ-2
Success, Congratulation to ZQ-3