Callable异步执行,应该不会陌生,那么在java中是怎么用的呢?又是如何实现的?下面我们循序渐进,慢慢分析。
先看一个例子,实现Callable接口,进行异步计算:
package com.demo;
import java.util.concurrent.*;
public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("call");
TimeUnit.SECONDS.sleep(1);
return "str";
}
});
System.out.println(future.get());
}
}
这段代码是很简单的一种方式利用Callable进行异步操作,结果自己可以执行下。
如何实现异步
在不阻塞当前线程的情况下计算,那么必然需要另外的线程去执行具体的业务逻辑,上面代码中可以看到,是把Callable放入了线程池中,等待执行,并且立刻返回futrue。可以猜想下,需要从Future中得到Callable的结果,那么Future的引用必然会被两个线程共享,一个线程执行完成后改变Future的状态位并唤醒挂起在get上的线程,到底是不是这样呢?
源码分析
首先我们从任务提交开始,在AbstractExecutorService中的源码如下:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
可以看到Callable任务被包装成了RunnableFuture对象,通过了线程池的execute方法提交任务并且立刻返回对象本身,而线程池是接受Runnable,必然RunnableFuture继承了Runnable,我们看下其继承结构。
从继承中可以清楚的看到,FutureTask是Runnable和Future的综合。
到这里我们应该有些头绪了,关键点应该在FutureTask对象上,线程池不过是提供一个线程运行FutureTask中的run方法罢了。
FutureTask
从上面的分析,FutureTask被生产者和消费者共享,生产者运行run方法计算结果,消费者通过get方法获取结果,那么必然就需要通知,如何通知呢,肯定是状态位变化,并唤醒线程。
FutureTask状态
//FutureTask类中用来保存状态的变量,下面常量就是具体的状态表示
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
run方法
//我修剪后的代码,可以看出其逻辑,执行Callable的call方法获取结果
public void run() {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result = c.call();
set(result);
}
}
//把结果保存到属性字段中,finishCompletion是最后的操作,唤醒等待结果的线程
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);//正常结束设置状态为NORMAL
finishCompletion();
}
}
//waiters是FutureTask类的等待线程包装类,以链表的形式连接多个,WaitNode对象是在调用get方法时生成,并挂起get的调用者线程
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); //唤醒get上等待的线程
if (next == null)
break;
}
break;
}
}
}
等待的线程
为了清除的看到如何挂起get的线程,我们可以分析下get的源码
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); //会一直挂起知道处理业务的线程完成,唤醒等待线程
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//通过判断返回的状态是否为已完成做不同的处理
throw new TimeoutException();
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { //如果是超时的get那么会挂起一段时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {//等待时间过后则会移除等待线程返回当前futureTask状态
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
如果想搞明白可以自行研究下,这种经过优化的并发代码确实可读性差,基本原理就是生产者与消费者模型。