多线程拓展--谷歌ListeningExecutorService的使用

时间:2025-03-19 11:20:12

由于普通的线程池,返回的Future,功能比较单一;Guava 定义了 ListenableFuture接口并继承了JDK concurrent包下的Future 接口,ListenableFuture 允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用。

1.使用方法如下:

1.创建线程池

2.装饰线程池

3.任务处理

4.回调函数处理

5.所有任务完成后处理

场景模拟:
导入一张1211条数据的Excel表格:

1.每条数据处理较慢
2.处理完后需要汇总数据
3.处理汇总成功的数据


2.代码示例如下:


2.1接口和调用

package com.java4all.test11;

import ;
import .*;
import ;
import ;
import ;

import ;
import ;
import ;
import ;
import ;
import ;


/**
 * Author: yunqing
 * Date: 2018/9/19
 * Description:
 */
@RestController
@RequestMapping(value = "testThread")
public class TestThread {


    /**线程池*/
    static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60,
            ,
            new LinkedBlockingQueue<>(200),
            new ()
    );

    /**
     * 数据处理
     * @return
     * @throws Exception
     */
    @RequestMapping(value = "parse",method = )
    public String parse() throws Exception{
        List<String> result = new ArrayList<>();
        List<String> list = new ArrayList<>();
        
        //模拟原始数据
        for(int i = 0; i < 1211;i ++){
            (i+"-");
            ("添加原始数据:"+i);
        }

        int size = 50;//切分粒度,每size条数据,切分一块,交由一条线程处理
        int countNum = 0;//当前处理到的位置
        int count = ()/size;//切分块数
        int threadNum = 0;//使用线程数
        if(count*size != ()){
            count ++;
        }

        final CountDownLatch countDownLatch = new CountDownLatch(count);

        //使用Guava的ListeningExecutorService装饰线程池
        ListeningExecutorService executorService = (threadPoolExecutor);

        while (countNum < count*size){
            //切割不同的数据块,分段处理
            threadNum ++;
            countNum += size;
            MyCallable myCallable = new MyCallable();
            ((
                    (countNum-size,() > countNum ? countNum : ())));

            ListenableFuture listenableFuture = (myCallable);

            //回调函数
            (listenableFuture, new FutureCallback<List<String>>() {
                //任务处理成功时执行
                @Override
                public void onSuccess(List<String> list) {
                    ();
                    ("第h次处理完成");
                    (list);
                }

                //任务处理失败时执行
                @Override
                public void onFailure(Throwable throwable) {
                    ();
                    ("处理失败:"+throwable);
                }
            });
            
        }

        //设置时间,超时了直接向下执行,不再阻塞
        (3,);

        ().forEach(s -> (s));
        ("------------结果处理完毕,返回完毕,使用线程数量:"+threadNum);

        return "处理完了";
    }
}

任务处理

package com.java4all.test11;

import ;
import ;
import ;

/**
 * Author: yunqing
 * Date: 2018/9/19
 * Description:任务处理逻辑
 */
public class MyCallable implements Callable{

    private List<String> list ;
    @Override
    public Object call() throws Exception {
        List<String> listReturn = new ArrayList<>();
        //模拟对数据处理,然后返回
        for(int i = 0;i < ();i++){
            ((i)+":处理时间:"+()+"---:处理线程:"+());
        }

        return listReturn;
    }

    public void setList(List<String> list) {
         = list;
    }
}