任务的提交与异步执行

时间:2024-01-22 20:19:37

异步编程是一种对 CPU 资源更高效的编程方式,也是提高系统吞吐量的一个非常不错的选择。很多人会认为所谓的异步不就是多线程吗?

但实际上这句话只能说对一半,没错,异步是通过多线程来实现的,但我们 Java 中的异步编程却绝不仅仅只是多线程,它还包括对任务执行状态的监控、随时可以选择性的中断任务的执行以及获取任务执行的返回结果。

Java 的并发包下为我们提供了一整套完善的异步任务框架,包括任务的定义、任务的提交、线程的创建与任务分配、监控任务状态、取消任务等等,绝不仅仅局限于多线程的简单创建与启动。

简单介绍与使用

下面我们先简单介绍异步框架中的相关接口所代表的作用与含义,接着我简单的编写一个 demo 应用下我们异步框架。

1、任务的抽象

我们使用接口 Runnable 与 Callable 抽象的描述一个任务,前者相信大家已经非常的熟悉了,后者我们见的不多,但其实也是一个很简单的接口,与 Runable 接口一样也是一个函数式接口,内部定义一个 call 方法。

相比于 Runnable,除了内部定义的方法名称不同外,call 方法还要求调用结束后返回一个结果,至于返回的结果是什么,取决于你的实现类,总的来说,两者差别不大。

2、任务的执行

Executor 接口抽象了任务的执行者,所有的任务都可以向这里进行提交,Executor 会负责创建线程并启动线程,执行任务。

Executor 接口的定义也是非常简单的,只有一个 execute 执行方法。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService 接口继承了接口 Executor 并新增了更多的任务执行必须的方法,例如:

  • void shutdown();
  • List
  • boolean isShutdown();
  • Future<?> submit(Runnable task);
  • invokeAll,invokeAny等

这些方法我们等会会深入去分析它们,这里大家只要有个印象就好,ExecutorService 允许我们将任务进行提交,它会统一地并在合适地时候创建线程、执行任务。

3、任务的监控

Future 接口用于监控我们的任务执行状态,是已提交但未执行,或是已取消,亦或是已完成。Future 接口中定义的方法我们也不妨列举部分感受一下:

  • boolean cancel(boolean mayInterruptIfRunning);
  • boolean isCancelled();
  • boolean isDone();
  • V get()

细心的同学可能已经发现了,任务的监控 Future 将在任务的提交成功后返回,也就是当你成功的调用 submit 方法之后,ExecutorService 将为你返回一个 Future 接口实例供你监控刚刚提交的任务执行状态。

下面我们看一个简单的 demo,用于演示基本的任务提交与执行。

demo

我们通过 Executors 的工厂方法获取一个单线程的任务执行者,接着我们可以向这个任务执行者提交任务,当然这里简化了代码,使用了 Lambda 表达式,我们分别提交了两个任务,并从 submit 方法的返回得到了任务的监控者 Future 实例。

接着,我们也就可以通过 Future 来得知任务执行的状态。

总的来说,异步任务给我们带来的好处是什么呢?我觉得最重要的一点就是「便捷」。

我只需要将我的任务提交就好了,不再关心如何如何创建线程,启动线程等等细节,我也不再像以前一样,线程启动后根本不知道有没有执行,我手里有 Future,我可以随时的监控任务的执行情况。

另外,异步任务框架还有一点非常的不错,那就是性能,它可以依赖线程池,减少线程创建和销毁的开销,这一切都将随着 jdk 的迭代而不断的优化,而我们在使用上根本不用关心,我只关心我的任务该怎么写,至于任务怎么执行,如何高效低能耗,交给你异步框架了。

基本的实现原理

ExecutorService 继承了 Executor 并新增了一些接口方法,这些方法数量还不少,而有些方法是很通用的,亦或是有些方法子类用不到,这你不能要求每一个子类实现者都实现了这些方法。

所以,向下又有了一层抽象,AbstractExecutorService 实现了 ExecutorService 并完成了很多方法的默认实现。后者只需要继承 AbstractExecutorService 并重写自己需要重写的方法即可成为一个「异步任务的执行者」。

但是如下的方法 AbstractExecutorService 是没有做默认实现的,需要你子类自行实现。原因也很简单,因为这些方法不具备通用的逻辑,涉及到具体实现者内部使用的资源释放,锁资源竞争以及队列资源的使用等,所以不太适合做抽象。

public void shutdown()
public List<Runnable> shutdownNow()
public boolean isShutdown()
public boolean isTerminated()
public boolean awaitTermination(long timeout, TimeUnit unit)
public void execute(Runnable command)

那我们就简单点吧,直接从任务的提交开始看。

submit 主要有三种重载:

Future<?> submit(Runnable task)

Future

Future

因为任务的抽象表示主要有两种,一种是 Runnable,一种是 Callable,所以需要提供对两种不同任务类型的抽象提交。我们以其中一个重载进行分析即可,这里我们采用第一个重载方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

RunnableFuture 继承了 Runnable 和 Future,标识这是一个可执行的、可监控的任务。而 newTaskFor 返回的是 FutureTask (RunnableFuture的一种实现类)。

而我们也不妨看看这个 FutureTask 内部都有些哪些成员:

任务执行状态

state 和它可取的这些值共同描述了当前任务的执行状态,是刚开始执行,还是正在执行中,还是正常结束,还是异常结束,还是被取消了,都由这个 state 来体现。

image

callable 代表当前正在执行的工作内容,这里说一下为什么只有 Callable 类型的任务,因为所有的 Runnable 类型任务都会被事先转换成 Callable 类型,我觉得主要是统一和抽象实现吧。

outcome 是任务执行结束的返回值,runner 是正在执行当前任务的线程,waiters 是一个简单的单链表,维护的是所有在任务执行结束之前尝试调用 get 方法获取执行结果的线程集合。当任务执行结束自当唤醒队列中所有的线程。

除此之外,还有一个稍显重要的方法,就是 run 方法,这个方法会在任务开始时由 ExecutorService 调用,这是一个很核心的方法,虽然方法体有点长,但是逻辑简单,我们大体上概括下。

image

  1. 如果任务已经开始将退出方法逻辑的执行
  2. 调度任务执行,调用 call 方法
  3. 调用成功将保存结果,异常则将保存异常信息
  4. 处理中断

其他的方法就不去看了,也比较多,还算是简单的,如果有所想法,也欢迎你和我探讨交流。

当我们回到 submit 方法时,其实就只剩下一个 execute 方法了,execute 方法是有点复杂的,也稍繁琐,其中也涉及了一些线程池的概念,我们在下一篇分析线程池的时候再作详尽分析了。

这里你只要知道,execute 会根据线程池中可用线程的数量,分配并选择一个线程执行我们的任务即可。其他的一些细节我们后续再作讨论。

关于异步任务我们这里作了简单的介绍了,总体上你应该对 Java 的异步编程体系有一个认知了,细节之处并没有很多,因为大多会涉及一些线程池的概念,我们还未介绍。

所以,后续也会结合线程池以及 Java8 新增的组合异步再作分析。

关注公众不迷路,一个爱分享的程序员。

公众号回复「1024」加作者微信一起探讨学习!

每篇文章用到的所有案例代码素材都会上传我个人 github

https://github.com/SingleYam/overview_java

欢迎来踩!

YangAM 公众号