javade多任务处理之Executors框架(线程池)实现的内置几种方式与两种基本自定义方式

时间:2023-03-09 17:11:53
javade多任务处理之Executors框架(线程池)实现的内置几种方式与两种基本自定义方式

一 Executors框架(线程池)

主要是解决开发人员进行线程的有效控制,原理可以看jdk源码,主要是由java.uitl.concurrent.ThreadPoolExecutor类实现的,这里只列出简单用法

根据Executors可以创建不同功能的线程池,主要有四种:

1 newFixedThreadPool : 返回一个固定数量的线程池,并且池中数量一致保持不变,有任务时如果有空闲线程则立即执行,没有就暂时存放到队列等待空闲线程

//创建一个有10个线程的线程池,任务多于10个会一直等待,直到10个线程中有空闲线程为止
//ExecutorService pool = Executors.newFixedThreadPool(10);
//开启线程
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"执行中.....");}));
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"执行中.....");}));
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"执行中.....");}));

 在ThreadPoolExecutor类中有几个非常重要的方法:

  execute()   执行线程

  submit()  执行线程,并返回执行的结果

  shutdown()  关闭线程,如果有线程正在执行,则会等线程执行完,即所有线程空闲之后再关闭

  shutdownNow()  关闭线程,立刻关闭
javade多任务处理之Executors框架(线程池)实现的内置几种方式与两种基本自定义方式

看源码可知,底层任务使用了阻塞队列,任务大于线程数量后会进入阻塞队列等待执行

2 newSingleThreadExecutor

		ExecutorService poolSingle = Executors.newSingleThreadExecutor();
poolSingle.execute(new Thread(()->{
System.out.println("run1");
while(true){
}
}));
//下面永不会执行
poolSingle.execute(new Thread(()->{System.out.println("run2");}));

  创建只有一个线程的线程池,属于上面固定数量的一个特例,个人感觉这个单例线程用处不是很大

3 newCachedThreadPool

		不指定数量.如果任务多,且没有空闲线程则会一直创建线程,并且空闲线程会在60秒后回收
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(new Thread(()->{
System.out.println("run1");
while(true){
}
}));
cachePool.execute(new Thread(()->{
System.out.println("run2");
while(true){
}
}));

  底层实现:

javade多任务处理之Executors框架(线程池)实现的内置几种方式与两种基本自定义方式

由于使用了无缓冲队列,任务队列使用无缓冲队列,任务不会被存储,直接去交给线程执行.且空闲线程60秒之后会被回收,除非60秒之内去执行新的任务

4 newScheduledThreadPool

class RunTest implements Runnable{
@Override
public void run() {
System.out.println("run");
}
}
public class ScheculedTest {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
//可实现任务定时,但实际业务中一般都用spring的定时器,这个技术比较老了
//pool.schedule(new RunTest(), 2, TimeUnit.SECONDS);//2秒后执行此任务
//可实现任务的轮询,指定时间间隔循环执行此任务 任务 延迟时间(即2秒后开始执行) 间隔多少秒之后再执行(一直循环)
pool.scheduleAtFixedRate(new RunTest(), 2, 2, TimeUnit.SECONDS);
pool.scheduleWithFixedDelay(new RunTest(), 2, 2, TimeUnit.SECONDS);
}
}

  不用想也知道,底层采用延迟队列装任务

二 自定义线程池实现的

再来看自定义的,实际业务中可能用到更多的就是自定义了,

自定义需要实现ThreadPoolExecutor这个类,主要有两种,分别是使用有界队列装任务和*队列:

1 使用有界队列:

public class MyCustom01 {

	public static void main(String[] args) {
//有界队列自定义线程池
/**在使用有界队列时,如果有任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
* 若大于corePoolSize,则会将任务加入队列,
* 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
* 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
*/
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, //核心线程数
2, //最大线程数
60, //空闲时回收时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3),//指定队列//指定队列
//DiscardOldestPolicy :丢弃最老的请求,即当6进来时,将队列中最早的2丢弃,执行6
//new ThreadPoolExecutor.DiscardOldestPolicy()//指定拒绝策略.不指定默认抛异常
//自定义拒绝策略
new RejectedExecutionHandler(){
@Override //r:被拒绝任务 executor 当前线程池
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//实例工作中默认策略都不可取,一般是把拒绝的任务记录日志,然后空闲时间解析再重新执行
//或者直接返回客户端,让他稍后再提交此任务
System.out.println("自定义处理..");
System.out.println("当前被拒绝任务为:" + r.toString());
System.out.println(executor.getPoolSize());
}}
);
MyTask mt1 = new MyTask(1, "任务1");
MyTask mt2 = new MyTask(2, "任务2");
MyTask mt3 = new MyTask(3, "任务3");
MyTask mt4 = new MyTask(4, "任务4");
MyTask mt5 = new MyTask(5, "任务5");
MyTask mt6 = new MyTask(6, "任务6");
pool.execute(mt1);
pool.execute(mt2);//如果有两个任务,则会将第二个任务入队,等待核心线程线程空闲
pool.execute(mt3);//同上
pool.execute(mt4);//同上,知道队列满 为止
pool.execute(mt5);//当有5个任务时,队列已满,则会创建第二个线程
pool.execute(mt6); //有6个时,此时队列已满,且当前线程达到最大线程数,所以无法执行,执行拒绝策略
//关闭,等到所有线程空闲时才会关闭
pool.shutdown();
}
}

  

2 使用*队列:

public class MyCustom02 implements Runnable{
//并发数字运算
private static AtomicInteger count = new AtomicInteger(0);
@Override
public void run() {
try {
int temp = count.incrementAndGet();//count累加,区分任务
System.out.println("任务" + temp);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception{
//*队列
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ExecutorService executor = new ThreadPoolExecutor(
5,
10,//*时,任务多会一直添加到队列中,不会创建新的线程,线程数会一直是核心数量
120L,
TimeUnit.SECONDS,
queue //指定*队列
);
for(int i = 0 ; i < 20; i++){
executor.execute(new MyCustom02()); //会5个一起执行
}
Thread.sleep(1000);
System.out.println("queue size:" + queue.size());
Thread.sleep(2000);
}
}

  然后拒绝策略个人认为也是比较常用的,实际业务中,因为默认的策略都不太友好