java线程池,Executor框架,Callable,Future

时间:2022-09-01 18:37:19

前言

本文章适合新手对线程池的认识=0的同学,不涉及源码,仅从应用层次来讲,是笔者的资料整理,每一个都有相应的实例

主要内容

  • 线程池的类型,区别,特点
  • 线程池如何提交runnable任务和callable任务。
  • 线程池所使用的队列(有界,*)
  • 线程池满的策略
  • 自定义线程

why?

其实每次学习一个新知识的时候,我们都需要知道为什么要使用它?

通俗概括:

创建线程T1时间,线程运行T2时间,线程销毁T3时间。在很多时候T2<T1+T3。如果大量客户同时访问服务器的时候,我给每个客户都创建一个线程,然后再销毁,但是吧,线程执行的时间却很小。。所以,池的技术孕育而生。形象来说,我圈了一个池子,里面一共有这么多线程,来一个请求我就给你一个线程,然后再把线程放回去,达到一个复用效果,这样就不用重新创建再销毁了。

全局把握Executor框架

Executor框架其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,Executor来启动线程比使用Thread的start方法更好除了更易管理,效率更好。

来看看javaExecutor的大家族
java线程池,Executor框架,Callable,Future

由浅入深

Callable、Future、FutureTask

一、why callable?
线程的创建有两种基本方式,一个是继承Thread,一个是实现runnable接口,这两种是新手初识的方式。它们都有一个问题,就是都无法取得线程执行的返回值。由此callable孕育而生

对比一下:

public interface Callable<V> {
    V call() throws Exception;
}
public interface Runnable {
    public abstract void run();
}

二、Future接口
java线程池,Executor框架,Callable,Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

5个方法的作用

  • cancel :取消任务,取消成功返回true,失败返回false,参数的意思是,是否取消正在运行的任务,PS:正在运行的任务不加参数取消不掉。
  • isCancelled:任务是否被取消成功。
  • isDone:任务是否已完成。
  • get():获取任务执行完成后的结果。线程完成之前get方法会被阻塞。
  • get(long timeout,TimeUnit unit)在指定时间内获取不到结果就返回false。

Future的三个功能
1、取消任务
2、判断任务完成/是否被取消
3、获取

三、FutureTask
FutureTask是Runnable和Callable两个接口的同时实现,可以简单理解成一个任务。
构造器:

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

我们可以看到,底层最终都是转换成callable的,Executors.callable()方法是将runnable转变成callable的一个方法。之后你就可以使用它来执行Future所有支持的方法。第三种创建线程的方法孕育而生

实例:

package thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Test implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("This is call");
        return 0;
    }

    public static void main(String[] args) {
        Test test = new Test();
        FutureTask<Integer> task = new FutureTask<>(test);
        new Thread(task).start();
        try {
            System.out.println(task.get());
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

需要记住的是,这里没有直接使用实现callable接口的类的实例作为Thread()的参数,因为Thread()构造器中只有runnable,没有callable。所以借助FutureTask去实现。

到此,Future的知识简单介绍完毕了

Executor

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类。

ExecutorService

作用:
它是一个接口,一个接口代表了一定的功能,顾名思义它是提供服务的,什么样的服务呢?->当它运行之后,会不断地接受新的任务,提交新任务,执行新任务,关闭shutdown,自己写代码测试的时候要调用shutdown,不然这个会一直运行下去。几种线程池都是实现了它的功能

四种线程池

四种线程池的创建集合在Executors中,提供了一系列的工厂方法,所有的线程池均实现了ExecutorService的接口

  • newCachedThreadPool
    介绍:
    缓存型池子,它主要功能是为了提供线程的复用,来了一个任务,先查看一下池子里是否有空闲的线程,如果有的话,就启用已有的线程来完成任务,如果没有的话,就建立一个新的线程去完成任务
    特点:
    1、有空间机制 默认60s等待时间,60s后自动销毁线程
    2、复用线程
    3、无线程上限
    4、适用于生存期短的异步任务,长连接的那种就不太适合了。

  • newFixedThreadPool
    介绍:
    fixed线程池,依然是解决复用的,但是不能随心所欲地建立新线程,线程池中只有固定数量线程的池子,新线程想加入池子,除非池子中有那么一个线程终止了
    特点:
    1、固定数量的活动线程
    2、线程没有空闲机制
    3、适用于比较正规的并发线程,服务器上大多是用这个。

  • newScheduledThreadPool
    介绍:
    调度型线程池,线程池中可以按照计划在给定延时之后,或者周期性执行任务。

  • newSingleThreadExecutor
    介绍:任何时候线程池中只能有一个线程。没有空闲线程

简单实例

讲了这些概念,先来几个小例子练练手把。

  • Executor执行Runnable任务
// 固化线程池
package thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test2 implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "被调用了");
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Test2());
        }
        executor.shutdown();
    }

}
----output----
pool-1-thread-2被调用了
pool-1-thread-4被调用了
pool-1-thread-3被调用了
pool-1-thread-1被调用了
pool-1-thread-3被调用了
pool-1-thread-4被调用了
pool-1-thread-2被调用了
pool-1-thread-5被调用了
pool-1-thread-3被调用了
pool-1-thread-1被调用了

可以看到,new Test2()是我们的任务,一共有10个任务要被执行,一共提供了5个线程来执行这么多的任务。可以看到,实验结果中的线程被复用了。

// 缓存型线程池
package thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test2 implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "被调用了");
    }

    public static void main(String[] args) {
        ExecutorService executorService  = Executors.newCachedThreadPool();
        for (int i= 0; i<10; i++) {
            executorService.execute(new Test2());
        }
        executorService.shutdown();
    }
}

----output----
pool-1-thread-1被调用了
pool-1-thread-3被调用了
pool-1-thread-2被调用了
pool-1-thread-4被调用了
pool-1-thread-6被调用了
pool-1-thread-5被调用了
pool-1-thread-7被调用了
pool-1-thread-2被调用了
pool-1-thread-5被调用了
pool-1-thread-7被调用了

可以看到,10个任务一共用了7个线程。也是一种复用

// 计划型线程池
package thread;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Test3 {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
        /*for (int i = 0; i < 10; i++) { service.schedule(new Test3(), 3, TimeUnit.SECONDS); } */
        service.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                System.out.println("任务: "+Thread.currentThread().getName());
            }
        }, 1, 3, TimeUnit.SECONDS);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("关闭周期性任务");
        service.shutdown();
    }
}

----output----
任务: pool-1-thread-1
任务: pool-1-thread-1
任务: pool-1-thread-2
任务: pool-1-thread-1
关闭周期性任务
  • Executor 执行callable任务
package thread;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test2 implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("线程被调用" + Thread.currentThread().getName());
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 储存任务的列表
        ArrayList<Future<String>> list = new ArrayList<>();
        // 开启15个任务
        for (int i = 0; i < 15; i++) {
            Future<String> future = executorService.submit(new Test2());
            // 每提交一个任务就加入队列中
            list.add(future);
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        for (Future<String> future : list) {
            // 如果任务没有被执行完那就用一个while让它循环的同时去等待
            while (!future.isDone())
                ;
            try {
                System.out.println(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

        }
        executorService.shutdown();
    }
}

线程池所使用的任务队列

  • SynchronousQueue:
    不存储元素的队列,每一次地入队的操作都必须有一个出队的操作。它将新任务直接提交给线程去处理,不保持它们,由于没有缓冲的过程,当线程池中的线程数量不足以支持任务运行的时候,就会开启一个新的线程来执行它。缓存型缓冲池就是用的它。这么一想是不是正确了呢?

  • LinkedBlockingQueue:
    属于*队列,啥意思呢?就是这个队列的最大值为Integer.MAX_VALUE。所有没来得及执行的新任务都添加在这个队列中。fixed型就是用的它,Single也是用的它

  • ArrayBlockingQueue:
    有界队列,限定了队列的最大值。

  • PriorityBlockingQueue:
    优先级的*队列

  • DelayQueue
    是一个支持延时获取元素的使用优先级队列实现的*阻塞队列具有优先界别的阻塞队列。

线程池构造方法

讲完了它们所使用的队列,各自特点,下面来看看构造器,偷看一眼源码

老大哥实现的方法,所有线程池都是它来完成的

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

几种属性:
1、核心线程数目
2、最大线程数目
3、保活时间
4、时间单位
5、阻塞队列

添加任务进队列的处理方式:

1、当线程数未达到核心线程数,即使线程池中有空闲线程,也会创建新的线程来执行新的任务

2、当线程数达到了核心线程数,但是缓冲队列未满,会把新任务添加到队列中,等待核心线程去处理

3、当线程数达到了核心线程数,并且缓冲队列已满,但是线程池中的线程数小于最大线程数,会创建新的线程来处理任务。

4、线程池里面的线程总数达到了最大值,这个时候有4种策略
java线程池,Executor框架,Callable,Future

  • AbortPolicy,默认的策略,新任务提交的时候直接抛出异常RejectedExecutionException。

  • CallerRunsPolicy,直接使用调用者线程来执行新的任务,不在线程池中运行,比如main中调用了线程池,那么就用main来执行新的任务

  • DiscardOldest,将队列中的最老的丢弃,就是队列头

  • Discard,直接丢弃

看一看缓存型队列的实现。其它的差不多一样的结构,就不看了。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

自定义线程池

正如上面的线程池的构造方法,我们也可以使用它来自定义我们需要的线程池,制定响应的参数。

package thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test3 {
    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(5);
        ExecutorService service = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, queue);
        for (int i = 0; i < 10; i++) {
            service.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "正在工作");

                }
            });
        }
        service.shutdown();
    }
}

----output----
pool-1-thread-1正在工作
pool-1-thread-3正在工作
pool-1-thread-2正在工作
pool-1-thread-5正在工作
pool-1-thread-1正在工作
pool-1-thread-4正在工作
pool-1-thread-1正在工作
pool-1-thread-4正在工作
pool-1-thread-3正在工作
pool-1-thread-2正在工作

总结

这篇文章主要是从如何使用的角度和简单的原理概念来讲解的线程池,它的源码我们并未探究。适合新手去学习,花了一整天的时间从各个地方资料整理。写上了自己的认识。以上代码全都是笔者亲身写的。然后,不要看笔者菜,我也是验证过这些东西,看了点源码才敢写的,是有一定依据的,比如策略部分,源码上就是用4个类写的。还是有可信度的。

参考资料:
并发线程池部分:
https://blog.csdn.net/ns_code/article/details/17465497
https://blog.csdn.net/fuyuwei2015/article/details/72775426

callable部分:
https://www.cnblogs.com/dolphin0520/p/3949310.html