Java高并发编程(四)

时间:2022-04-25 15:43:44

  一、Executor执行器

  1.Executor接口,java线程池框架中的顶层接口,提供一个execute方法来执行任务

import java.util.concurrent.Executor;

public class T01_MyExecutor implements Executor {

public static void main(String[] args) {
new T01_MyExecutor().execute(()->System.out.println("hello executor"));
}

@Override
public void execute(Runnable command) {
//new Thread(command).run();//另起线程方法调用
command.run();//方法直接调用
}

}

  2.任务接口

    1)callable接口:提供一个call方法,具有返回值可以抛出异常

    2)runnable接口:提供一个run方法,无返回值不可抛出异常

  3.ExecutorService接口(Executor的一个子接口)

    1)ExecutorService可以理解为服务,执行execute方法可以向服务器中添加runnable任务无返回值,而执行submit方法可以向服务中添加callable与runnable任务,且有返回值

    2)Executor与ExecutorService的简单理解:可以想象许多的执行器Executor在等待着执行任务,而service服务可通过调用execute或submit方法来添加不同的任务供执行器执行

    3)submit方法具有一个future类型的返回值,与callable,Executors一起使用

public class T06_Future {
public static void main(String[] args) throws InterruptedException, ExecutionException {

FutureTask
<Integer> task = new FutureTask<>(()->{
TimeUnit.MILLISECONDS.sleep(
500);
return 1000;
});
//new Callable () { Integer call();}

new Thread(task).start();

System.out.println(task.get());
//阻塞

//*******************************
ExecutorService service = Executors.newFixedThreadPool(5);
Future
<Integer> f = service.submit(()->{
TimeUnit.MILLISECONDS.sleep(
500);
return 1;
});
System.out.println(f.get());
System.out.println(f.isDone());

}
}

运行结果:

Java高并发编程(四)

  4.Executors(操作Executor的一个工具类、工厂类)

  二、ThreadPool线程池

  1.线程池将任务分配给池中线程,当线程池中线程执行不过来时,任务会进入等待队列,队列由BlockingQueue实现,当有线程空闲时再来任务时会被分配给空闲线程,一个线程同时维护着两个队列(结束队列与等待队列)

  下面一个程序帮助理解线程池的概念

public class T05_ThreadPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service
= Executors.newFixedThreadPool(5); //execute submit
for (int i = 0; i < 6; i++) {
service.execute(()
-> {
try {
TimeUnit.MILLISECONDS.sleep(
500);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);

service.shutdown();
//正常关闭,等待所有任务执行完毕,关闭线程池
System.out.println(service.isTerminated());//判断所有任务是否都执行完了
System.out.println(service.isShutdown());//判断线程池是否被关闭了
System.out.println(service);

TimeUnit.SECONDS.sleep(
5);
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);
}
}

运行结果:

Java高并发编程(四)

  2.六种线程池

    1)FixedThreadPool  固定线程数量的线程池

    2)cachedThreadPool  假定刚开始一个线程都没有,来一个任务开启一个线程,如果再来一个任务,线程池中有空闲线程就将任务分配给空闲线程,如此往复直到起到电脑能支撑限度为止,默认一个线程空闲超过一分钟就自动销毁

    3)singleThreadPool  线程池中只有一个线程

    4)scheduledThreadPool  定时器线程池

    5)workStealingPool  工作窃取线程池,当前线程池有线程空闲,会自动去寻找任务执行,默认根据CPU核数启动默认线程数目线程,是精灵(demon)线程(守护线程、后台线程),主函数不阻塞的话看不到输出,本质上是ForkJoinPool实现的

    6)ForkJoinPool  分叉合并线程,大任务可以切分成许多的小任务,如果小任务还是太大的话还可以继续分,分的可以了就将小任务合并,最后产生一个总的结果(有点像归并排序)

       ForkJoinTask从RecursiveAction和RecursiveTask继承

      a.RecursiveAction  无返回值

/**
* 对数组中所有数求和
*
@author zhangqi
*
*/
public class T12_ForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();

static {
for(int i=0; i<nums.length; i++) {
nums[i]
= r.nextInt(100);
}

System.out.println(Arrays.stream(nums).sum());
//stream api
}


static class AddTask extends RecursiveAction {

int start, end;

AddTask(
int s, int e) {
start
= s;
end
= e;
}

@Override
protected void compute() {

if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println(
"from:" + start + " to:" + end + " = " + sum);
}
else {

int middle = start + (end-start)/2;

AddTask subTask1
= new AddTask(start, middle);
AddTask subTask2
= new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}


}

}


/*static class AddTask extends RecursiveTask<Long> {

int start, end;

AddTask(int s, int e) {
start = s;
end = e;
}

@Override
protected Long compute() {

if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}

int middle = start + (end-start)/2;

AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();

return subTask1.join() + subTask2.join();
}

}
*/

public static void main(String[] args) throws IOException {
ForkJoinPool fjp
= new ForkJoinPool();
AddTask task
= new AddTask(0, nums.length);
fjp.execute(task);
//long result = task.join();
//System.out.println(result);

System.in.read();

}
}

运行结果:

Java高并发编程(四)

      b.RescursiveTask  有返回值的递归任务

 

/**
* 对数组中所有数求和
*
@author zhangqi
*
*/
public class T12_ForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();

static {
for(int i=0; i<nums.length; i++) {
nums[i]
= r.nextInt(100);
}

System.out.println(Arrays.stream(nums).sum());
//stream api
}


/* static class AddTask extends RecursiveAction {

int start, end;

AddTask(int s, int e) {
start = s;
end = e;
}

@Override
protected void compute() {

if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {

int middle = start + (end-start)/2;

AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}


}

}
*/


static class AddTask extends RecursiveTask<Long> {

int start, end;

AddTask(
int s, int e) {
start
= s;
end
= e;
}

@Override
protected Long compute() {

if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}

int middle = start + (end-start)/2;

AddTask subTask1
= new AddTask(start, middle);
AddTask subTask2
= new AddTask(middle, end);
subTask1.fork();
subTask2.fork();

return subTask1.join() + subTask2.join();
}

}

public static void main(String[] args) throws IOException {
ForkJoinPool fjp
= new ForkJoinPool();
AddTask task
= new AddTask(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);

//System.in.read();

}
}

运行结果:

Java高并发编程(四)

  补充:

  ThreadPoolExecutor线程池执行器(自定义线程池,许多线程池的底层实现,ForkJoinPool不是由它实现)