由于普通的线程池,返回的Future,功能比较单一;Guava 定义了 ListenableFuture接口并继承了JDK concurrent包下的Future 接口,ListenableFuture 允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用。
1.使用方法如下:
1.创建线程池
2.装饰线程池
3.任务处理
4.回调函数处理
5.所有任务完成后处理
场景模拟:
导入一张1211条数据的Excel表格:
1.每条数据处理较慢
2.处理完后需要汇总数据
3.处理汇总成功的数据
2.代码示例如下:
2.1接口和调用
package com.java4all.test11;
import ;
import .*;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* Author: yunqing
* Date: 2018/9/19
* Description:
*/
@RestController
@RequestMapping(value = "testThread")
public class TestThread {
/**线程池*/
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60,
,
new LinkedBlockingQueue<>(200),
new ()
);
/**
* 数据处理
* @return
* @throws Exception
*/
@RequestMapping(value = "parse",method = )
public String parse() throws Exception{
List<String> result = new ArrayList<>();
List<String> list = new ArrayList<>();
//模拟原始数据
for(int i = 0; i < 1211;i ++){
(i+"-");
("添加原始数据:"+i);
}
int size = 50;//切分粒度,每size条数据,切分一块,交由一条线程处理
int countNum = 0;//当前处理到的位置
int count = ()/size;//切分块数
int threadNum = 0;//使用线程数
if(count*size != ()){
count ++;
}
final CountDownLatch countDownLatch = new CountDownLatch(count);
//使用Guava的ListeningExecutorService装饰线程池
ListeningExecutorService executorService = (threadPoolExecutor);
while (countNum < count*size){
//切割不同的数据块,分段处理
threadNum ++;
countNum += size;
MyCallable myCallable = new MyCallable();
((
(countNum-size,() > countNum ? countNum : ())));
ListenableFuture listenableFuture = (myCallable);
//回调函数
(listenableFuture, new FutureCallback<List<String>>() {
//任务处理成功时执行
@Override
public void onSuccess(List<String> list) {
();
("第h次处理完成");
(list);
}
//任务处理失败时执行
@Override
public void onFailure(Throwable throwable) {
();
("处理失败:"+throwable);
}
});
}
//设置时间,超时了直接向下执行,不再阻塞
(3,);
().forEach(s -> (s));
("------------结果处理完毕,返回完毕,使用线程数量:"+threadNum);
return "处理完了";
}
}
任务处理
package com.java4all.test11;
import ;
import ;
import ;
/**
* Author: yunqing
* Date: 2018/9/19
* Description:任务处理逻辑
*/
public class MyCallable implements Callable{
private List<String> list ;
@Override
public Object call() throws Exception {
List<String> listReturn = new ArrayList<>();
//模拟对数据处理,然后返回
for(int i = 0;i < ();i++){
((i)+":处理时间:"+()+"---:处理线程:"+());
}
return listReturn;
}
public void setList(List<String> list) {
= list;
}
}