初步了解fork/join框架
fork/join 框架是java7中加入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行join,将小任务的结果组装成大任务的结果。下面的图片展示了这种框架的工作模型:
使用fork/join并行框架的前提是我们的任务可以拆分成足够小的任务,而且可以根据小任务的结果来组装出大任务的结果,一个最简单的例子是使用fork/join框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是寻找一个大数组中的最大/最小值,我们可以将一个大数组拆成很多小数组,然后分别求解每个小数组中的最大/最小值,然后根据这些任务的结果组装出最后的最大最小值,下面的代码展示了如何通过fork/join求解数组的最大值:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
import java.util.concurrent.executionexception;
import java.util.concurrent.forkjoinpool;
import java.util.concurrent.future;
import java.util.concurrent.recursivetask;
import java.util.concurrent.timeunit;
import java.util.concurrent.timeoutexception;
/**
* created by hujian06 on 2017/9/28.
*
* fork/join demo
*/
public class forkjoindemo {
/**
* how to find the max number in array by fork/join
*/
private static class maxnumber extends recursivetask<integer> {
private int threshold = 2 ;
private int [] array; // the data array
private int index0 = 0 ;
private int index1 = 0 ;
public maxnumber( int [] array, int index0, int index1) {
this .array = array;
this .index0 = index0;
this .index1 = index1;
}
@override
protected integer compute() {
int max = integer.min_value;
if ((index1 - index0) <= threshold) {
for ( int i = index0;i <= index1; i ++) {
max = math.max(max, array[i]);
}
} else {
//fork/join
int mid = index0 + (index1 - index0) / 2 ;
maxnumber lmax = new maxnumber(array, index0, mid);
maxnumber rmax = new maxnumber(array, mid + 1 , index1);
lmax.fork();
rmax.fork();
int lm = lmax.join();
int rm = rmax.join();
max = math.max(lm, rm);
}
return max;
}
}
public static void main(string ... args) throws executionexception, interruptedexception, timeoutexception {
forkjoinpool pool = new forkjoinpool();
int [] array = { 100 , 400 , 200 , 90 , 80 , 300 , 600 , 10 , 20 ,- 10 , 30 , 2000 , 1000 };
maxnumber task = new maxnumber(array, 0 , array.length - 1 );
future<integer> future = pool.submit(task);
system.out.println( "result:" + future.get( 1 , timeunit.seconds));
}
}
|
可以通过设置不同的阈值来拆分成小任务,阈值越小代表拆出来的小任务越多。
工作窃取算法
fork/join在实现上,大任务拆分出来的小任务会被分发到不同的队列里面,每一个队列都会用一个线程来消费,这是为了获取任务时的多线程竞争,但是某些线程会提前消费完自己的队列。而有些线程没有及时消费完队列,这个时候,完成了任务的线程就会去窃取那些没有消费完成的线程的任务队列,为了减少线程竞争,fork/join使用双端队列来存取小任务,分配给这个队列的线程会一直从头取得一个任务然后执行,而窃取线程总是从队列的尾端拉取task。
frok/join框架的实现细节
在上面的示例代码中,我们发现fork/join的任务是通过forkjoinpool来执行的,所以框架的一个核心是任务的fork和join,然后就是这个forkjoinpool。关于任务的fork和join,我们可以想象,而且也是由我们的代码自己控制的,所以要分析fork/join,那么forkjoinpool最值得研究。
上面的图片展示了forkjoinpool的类关系图,可以看到本质上它就是一个executor。在forkjoinpool里面,有两个特别重要的成员如下:
1
2
|
volatile workqueue[] workqueues;
final forkjoinworkerthreadfactory factory;
|
workqueues 用于保存向forkjoinpool提交的任务,而具体的执行有forkjoinworkerthread执行,而forkjoinworkerthreadfactory可以用于生产出forkjoinworkerthread。可以看一些forkjoinworkerthread,可以发现每一个forkjoinworkerthread会有一个pool和一个workqueue,和我们上面描述的是一致的,每个线程都被分配了一个任务队列,而执行这个任务队列的线程由pool提供。
下面我们看一下当我们fork的时候发生了什么:
1
2
3
4
5
6
7
8
|
public final forkjointask<v> fork() {
thread t;
if ((t = thread.currentthread()) instanceof forkjoinworkerthread)
((forkjoinworkerthread)t).workqueue.push( this );
else
forkjoinpool.common.externalpush( this );
return this ;
}
|
看上面的fork代码,可以看到首先取到了当前线程,然后判断是否是我们的forkjoinpool专用线程,如果是,则强制类型转换(向下转换)成forkjoinworkerthread,然后将任务push到这个线程负责的队列里面去。如果当前线程不是forkjoinworkerthread类型的线程,那么就会走else之后的逻辑,大概的意思是首先尝试将任务提交给当前线程,如果不成功,则使用例外的处理方法,关于底层实现较为复杂,和我们使用fork/join关系也不太大,如果希望搞明白具体原理,可以看源码。
下面看一下join的流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public final v join() {
int s;
if ((s = dojoin() & done_mask) != normal)
reportexception(s);
return getrawresult();
}
private int dojoin() {
int s; thread t; forkjoinworkerthread wt; forkjoinpool.workqueue w;
return (s = status) < 0 ? s :
((t = thread.currentthread()) instanceof forkjoinworkerthread) ?
(w = (wt = (forkjoinworkerthread)t).workqueue).
tryunpush( this ) && (s = doexec()) < 0 ? s :
wt.pool.awaitjoin(w, this , 0l) :
externalawaitdone();
}
final int doexec() {
int s; boolean completed;
if ((s = status) >= 0 ) {
try {
completed = exec();
} catch (throwable rex) {
return setexceptionalcompletion(rex);
}
if (completed)
s = setcompletion(normal);
}
return s;
}
/**
* implements execution conventions for recursivetask.
*/
protected final boolean exec() {
result = compute();
return true ;
}
|
上面展示了主要的调用链路,我们发现最后落到了我们在代码里编写的compute方法,也就是执行它,所以,我们需要知道的一点是,fork仅仅是分割任务,只有当我们执行join的时候,我们的额任务才会被执行。
如何使用fork/join并行框架
前文首先展示了一个求数组中最大值得例子,然后介绍了“工作窃取算法”,然后分析了fork/join框架的一些细节,下面才是我们最关心的,怎么使用fork/join框架呢?
为了使用fork/join框架,我们只需要继承类recursivetask或者recursiveaction。前者适用于有返回值的场景,而后者适合于没有返回值的场景。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.jianshu.com/p/ac9e175662ca