- 线程同步辅助类
在使用Java多线程时,经常需要进行多线程同步,我在写一个用于Hadoop的调度框架中就遇到了类似的问题,调度主线程在发起并行任务后,需要原地等待各子线程执行完毕,才能继续执行,我尝试了两种方法。
(1)CyclicBarrier
下面给出示例代码
调度线程
try{
List<MrJob> paralJobList = jobExeUnit.getMrjobList();
CyclicBarrier barrier = new CyclicBarrier(paralJobList.size()+1);
for(final MrJob pMrjob: paralJobList){
pMrjob.setBarrier(barrier);
Thread t = new Thread(pMrjob);
t.start();
}
barrier.await();
}catch(Exception e){
e.printStackTrace();
}
Hadoop任务执行(子线程)
@Override
public void run(){
try {
execute();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}finally{
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
开始使用CyclicBarrier的时候,在run函数中,await函数我是写在execute之后的,这样会造成任务抛出异常时,await函数没有执行,进而整个程序不会退出,因此后来移到了finally里。但是finally本来是做最后的清理,特别是有异常抛出时,在其中再次try-catch实在恶心,因此后来没有再继续使用CyclicBarrier。
(2)CountDownLatch
没有看源码,但是从使用方法来看,感觉CountDownLatch与CyclicBarrier原理上应该是差不多的,下面给出示例代码
调度线程
try{
List<MrJob> paralJobList = jobExeUnit.getMrjobList();
final CountDownLatch countDownLatch = new CountDownLatch(paralJobList.size());
for(final MrJob pMrjob: paralJobList){
Runnable processRunable = new Runnable() {
@Override
public void run() {
pMrjob.setCountDownLatch(countDownLatch);
pMrjob.process(listener);
}
};
Thread t = new Thread(processRunable);
t.start();
}
countDownLatch.await();
}catch(Exception e){
e.printStackTrace();
}
Hadoop任务执行(子线程)
public void process(JobEventListener listener){
try{
execute();
}catch(Throwable cause){
listener.exceptionCaught(cause);
}finally{
countDownLatch.countDown();
}
}
值得注意的是,在子线程中使用countDown(注意要放在finally中),在主线程中等待的位置使用await。
2. 异常处理
在Java多线程程序中,所有线程都不允许抛出未捕获的checked exception,也就是说各个线程需要自己把自己的checked exception处理掉。这一点是通过java.lang.Runnable.run()方法声明(因为此方法声明上没有throw exception部分)进行了约束。但是线程依然有可能抛出unchecked exception,当此类异常跑抛出时,线程就会终结,而对于主线程和其他线程完全不受影响,且完全感知不到某个线程抛出的异常(也是说完全无法catch到这个异常。
我们在多线程程序的处理中,往往子线程异常后,还想在主线程中捕获子线程的异常,因此引入辅助类EventListener。下面给出示例代码
辅助监听类
public class JobEventListener implements EventListener{
CountDownLatch countDownLatch = null;
List<Throwable> causes = null;
public void exceptionCaught(Throwable cause) {
if (causes == null) {
causes = new CopyOnWriteArrayList<Throwable>();
}
causes.add(cause);
}
public List<Throwable> getCauses() {
return causes;
}
public void setCauses(List<Throwable> causes) {
this.causes = causes;
}
}
主线程
try{
JobType jobtype = jobExeUnit.getJobtype();
List<MrJob> paralJobList = jobExeUnit.getMrjobList();
final CountDownLatch countDownLatch = new CountDownLatch(paralJobList.size());
final JobEventListener listener = new JobEventListener();
for(final MrJob pMrjob: paralJobList){
Runnable processRunable = new Runnable() {
@Override
public void run() {
pMrjob.setCountDownLatch(countDownLatch);
pMrjob.process(listener);
}
};
Thread t = new Thread(processRunable);
t.start();
}
countDownLatch.await();
if(listener.getCauses() != null){
StringBuilder sb = new StringBuilder();
for(int exceptionno=0; exceptionno<listener.getCauses().size(); ++exceptionno){
sb.append("\t\n").append(listener.getCauses()
}
throw new Exception("Throwable size: " + listener.getCauses().size()+sb.toString());
}
}catch(Exception e){
e.printStackTrace();
}
原理比较简单,就是将一个监听类设置入子线程中,将异常获取到,在主线程中处理。