What is the easiest way to parallelize a task in Java?

Time: 2021-06-04 04:30:19

Say I have a task like:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

What is the easiest way to parallelize each compute() (assuming they are already parallelizable)?

I do not need an answer that strictly matches the code above, just a general answer. But in case it helps: my tasks are IO-bound, this is for a Spring web application, and the tasks will be executed within an HTTP request.
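Since the tasks are IO-bound and run inside a request, one common Java 8+ shape is CompletableFuture.supplyAsync() on a dedicated, bounded pool, then join() on every future. This is a minimal sketch, not a Spring-specific recipe: compute() is a hypothetical stand-in that just sleeps, and the pool size of 10 is an arbitrary assumption. Passing an explicit executor matters because supplyAsync() without one uses the shared common ForkJoinPool, which is sized for CPU-bound work. In a web app the pool would typically be created once and shared (for example as an injected bean), not created per request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class IoBoundFanOut {
    // Hypothetical stand-in for the question's IO-bound compute(): it just sleeps.
    public static String compute(Object object) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return "result-" + object;
    }

    // Starts one async task per object on the given pool, then waits for all of them.
    public static List<String> computeAll(List<?> objects, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Object object : objects) {
            futures.add(CompletableFuture.supplyAsync(() -> compute(object), pool));
        }
        // join() blocks until each future completes; results keep the input order.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Pool size 10 is an arbitrary example for IO-bound work; tune it for your workload.
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Integer> objects = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                objects.add(i);
            }
            long start = System.currentTimeMillis();
            List<String> results = computeAll(objects, pool);
            System.out.println(results + " in " + (System.currentTimeMillis() - start) + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

With ten 100 ms tasks and ten threads, the elapsed time should be close to 100 ms rather than 1000 ms.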

8 solutions

#1 (48 votes)

I would recommend taking a look at ExecutorService.

In particular, something like this:

ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

Note that using newCachedThreadPool could be bad if objects is a big list. A cached thread pool could create a thread per task! You may want to use newFixedThreadPool(n) where n is something reasonable (like the number of cores you have, assuming compute() is CPU bound).
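For IO-bound tasks like the asker's, the number of cores is usually too small a value for n. As a rough guide, Java Concurrency in Practice gives the sizing heuristic below, where N_cpu is the number of cores (Runtime.getRuntime().availableProcessors()), U_cpu is the target CPU utilization between 0 and 1, and W/C is the ratio of time a task spends waiting to time it spends computing. The numbers in the second line are illustrative assumptions, not measurements:

```latex
N_{\text{threads}} = N_{\text{cpu}} \cdot U_{\text{cpu}} \cdot \left(1 + \frac{W}{C}\right)
% Assumed example: 4 cores, full utilization, tasks wait 90% of the time (W/C = 9)
N_{\text{threads}} = 4 \cdot 1 \cdot (1 + 9) = 40
```

For purely CPU-bound work W/C is close to 0, which brings the formula back to roughly the number of cores.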

Here's full code that actually runs:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other executors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                int wait = fr.get().wait; // get() rethrows any exception from compute()
                sum += wait;
                System.out.println(String.format("Task waited %d ms", wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
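On Java 8+, the same invokeAll() pattern can be written with lambdas instead of anonymous Callable classes. A minimal sketch, assuming a hypothetical compute() that doubles an Integer so the output is predictable; it also gathers the values into a List in input order, which is what the question's loop builds:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllLambdas {
    // Hypothetical compute(): doubles its input so the output is predictable.
    static Integer compute(Integer x) {
        return x * 2;
    }

    public static List<Integer> computeAll(List<Integer> objects, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer object : objects) {
                tasks.add(() -> compute(object)); // one Callable per element
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task is done; futures come back in task order.
            for (Future<Integer> f : exec.invokeAll(tasks)) {
                results.add(f.get()); // rethrows a task's exception as ExecutionException
            }
            return results;
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(computeAll(Arrays.asList(1, 2, 3, 4), 2)); // prints [2, 4, 6, 8]
    }
}
```

Creating and shutting down the pool inside computeAll() keeps the sketch self-contained; in a long-running application you would normally reuse one pool.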

#2 (2 votes)

For a more detailed answer, read Java Concurrency in Practice and use java.util.concurrent.

#3 (1 vote)

Here's something I use in my own projects:

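import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelTasks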
{
    private final Collection<Runnable> tasks = new ArrayList<Runnable>();

    public ParallelTasks()
    {
    }

    public void add(final Runnable task)
    {
        tasks.add(task);
    }

    public void go() throws InterruptedException
    {
        final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors());
        try
        {
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            for (final Runnable task : tasks)
                threads.execute(new Runnable() {
                    public void run()
                    {
                        try
                        {
                            task.run();
                        }
                        finally
                        {
                            latch.countDown();
                        }
                    }
                });
            latch.await();
        }
        finally
        {
            threads.shutdown();
        }
    }
}

// ...

public static void main(final String[] args) throws Exception
{
    ParallelTasks tasks = new ParallelTasks();
    final Runnable waitOneSecond = new Runnable() {
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }
    };
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    final long start = System.currentTimeMillis();
    tasks.go();
    System.err.println(System.currentTimeMillis() - start);
}

This prints a bit over 2000 (ms) on my dual-core machine: four one-second tasks on two threads take about two seconds.
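One caveat with the latch approach: if a task throws, execute() hands the exception to the pool thread's uncaught-exception handler, the finally block still counts the latch down, and go() returns normally, so the caller never learns about the failure. Below is a sketch of an alternative (my variant, not the answer author's code) that wraps each Runnable with Executors.callable() and uses invokeAll(), so a failure is rethrown from go() as an ExecutionException:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelTasksInvokeAll {
    private final Collection<Runnable> tasks = new ArrayList<>();

    public void add(Runnable task) {
        tasks.add(task);
    }

    // Runs all tasks and waits for them; the first failing task (in add order) is rethrown.
    public void go() throws InterruptedException, ExecutionException {
        ExecutorService threads =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<Callable<Object>> callables = new ArrayList<>();
            for (Runnable task : tasks) {
                callables.add(Executors.callable(task)); // wraps the Runnable; result is null
            }
            // invokeAll blocks until every task has finished, normally or not.
            for (Future<Object> f : threads.invokeAll(callables)) {
                f.get(); // throws ExecutionException if that task threw
            }
        } finally {
            threads.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ParallelTasksInvokeAll tasks = new ParallelTasksInvokeAll();
        tasks.add(() -> System.out.println("ok"));
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        try {
            tasks.go();
        } catch (ExecutionException e) {
            System.out.println("task failed: " + e.getCause());
        }
    }
}
```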

#4 (0 votes)

You can use ThreadPoolExecutor. Here is sample code: http://programmingexamples.wikidot.com/threadpoolexecutor (too long to include here)
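For reference, a minimal sketch of constructing a ThreadPoolExecutor directly, which is what the Executors factory methods wrap. The sizes (4 core threads, 8 max, 30 s keep-alive, a queue of 100) are arbitrary assumptions. CallerRunsPolicy makes the submitting thread run a task itself when the queue is full, which slows down submission instead of throwing RejectedExecutionException (the default AbortPolicy behaviour). Note that threads beyond the core size are only created once the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolExample {
    // 4 core threads, up to 8 under load, idle extra threads die after 30 s,
    // at most 100 queued tasks; when full, the submitting thread runs the task itself.
    public static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                4, 8, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newBoundedPool();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 500; i++) {
            pool.execute(done::incrementAndGet); // trivial task: count completions
        }
        pool.shutdown();                          // no new tasks; queued ones still run
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("completed " + done.get() + " tasks"); // prints completed 500 tasks
    }
}
```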

#5 (0 votes)

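Fork/Join's ParallelArray is one option. Note that ParallelArray was only available in the jsr166y/extra166y preview libraries and never shipped with the JDK; Java 8 parallel streams took over that role.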
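Since ParallelArray is not part of the JDK, here is a sketch of the same idea using the Fork/Join classes that did ship in Java 7 (ForkJoinPool and RecursiveTask): split the list in half until the pieces are small, compute the pieces in parallel, and concatenate the results in order. compute() here is a hypothetical stand-in that squares its input, and THRESHOLD is an arbitrary cut-off. Fork/Join is designed for CPU-bound divide-and-conquer work; for blocking IO like the asker's, a regular ExecutorService is usually a better fit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinMap {
    // Hypothetical compute(): squares its input.
    static int compute(int x) {
        return x * x;
    }

    // Splits the index range [from, to) in half until it is small, then computes sequentially.
    static class MapTask extends RecursiveTask<List<Integer>> {
        private static final int THRESHOLD = 4; // arbitrary cut-off for the sketch
        private final List<Integer> input;
        private final int from, to;

        MapTask(List<Integer> input, int from, int to) {
            this.input = input;
            this.from = from;
            this.to = to;
        }

        @Override
        protected List<Integer> compute() {
            if (to - from <= THRESHOLD) {
                List<Integer> out = new ArrayList<>();
                for (int i = from; i < to; i++) {
                    out.add(ForkJoinMap.compute(input.get(i))); // qualified: avoids this.compute()
                }
                return out;
            }
            int mid = (from + to) >>> 1;
            MapTask left = new MapTask(input, from, mid);
            left.fork();                                           // left half runs asynchronously
            List<Integer> right = new MapTask(input, mid, to).compute(); // right half runs here
            List<Integer> out = new ArrayList<>(left.join());      // wait for the left half
            out.addAll(right);                                     // concatenate in order
            return out;
        }
    }

    public static List<Integer> computeAll(List<Integer> input) {
        return ForkJoinPool.commonPool().invoke(new MapTask(input, 0, input.size()));
    }

    public static void main(String[] args) {
        // prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
        System.out.println(computeAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
    }
}
```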

#6 (0 votes)

You can also simply create a few threads yourself and collect their results:

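// Mythread: a Thread subclass whose run() calls compute(object) and keeps the value;
// getResult() is a hypothetical accessor for it.
List<Mythread> threads = new ArrayList<Mythread>();
for (Object object : objects) {
    Mythread t = new Mythread(object);
    t.start();               // start every thread before waiting on any
    threads.add(t);
}
for (Mythread t : threads) {
    t.join();                // wait until this thread's run() has finished
    list.add(t.getResult()); // get result, add result
}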

EDIT: I think the other solutions are cooler.
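A runnable version of the plain-thread idea, assuming a hypothetical compute() that returns a string's length: start one Thread per object, have each write into its own array slot, and then join() all of them. Starting every thread before joining any is what makes the work run in parallel. This creates one thread per element with no upper bound, which is why a pool is usually preferred for large lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PlainThreads {
    // Hypothetical compute(): returns the string length.
    static int compute(String s) {
        return s.length();
    }

    public static int[] computeAll(List<String> objects) throws InterruptedException {
        int[] results = new int[objects.size()];
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < objects.size(); i++) {
            final int index = i;
            // Each thread writes only its own slot, so no locking is needed.
            Thread t = new Thread(() -> results[index] = compute(objects.get(index)));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // join() also makes the thread's writes visible to this thread
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [1, 2, 3]
        System.out.println(Arrays.toString(computeAll(Arrays.asList("a", "bb", "ccc"))));
    }
}
```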

#7 (0 votes)

I too was going to mention an executor class. Here is some example code that you would put in that executor class:

    // Shared pool of 4 threads; it is never shut down, so it lives as long as the application.
    private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);

    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();

    public void addCallable(Callable<Object> callable) {
        this.callableList.add(callable);
    }

    public void clearCallables(){
        this.callableList.clear();
    }

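    // Runs every task and waits for them, but discards the results.
    // Note: getResult() calls invokeAll() again, so calling both runs each task twice.
    public void executeThreads(){
        try {
            threadLauncher.invokeAll(this.callableList);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }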

    public Object[] getResult() {

        List<Future<Object>> resultList = null;
        Object[] resultArray = null;
        try {

            resultList = threadLauncher.invokeAll(this.callableList);

            resultArray = new Object[resultList.size()];

            for (int i = 0; i < resultList.size(); i++) {
                resultArray[i] = resultList.get(i).get();
            }

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return resultArray;
    }

Then, to use it, call the executor class to populate it and run the tasks:

for (Object object : objects) {
    executor.addCallable(() -> compute(object)); // do this once for each task (Java 8 lambda)
}
Object[] results = executor.getResult();
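Two things to watch in the class above: executeThreads() throws its results away while getResult() calls invokeAll() again, so calling both runs every task twice; and exceptions are only printed. Below is a sketch of a generic variant (the name CallableBatch is mine, not from the answer) that runs the queued tasks once, returns typed results in insertion order, and lets failures propagate. The caller supplies the pool and is responsible for shutting it down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableBatch<T> {
    private final ExecutorService pool;
    private final List<Callable<T>> callables = new ArrayList<>();

    public CallableBatch(ExecutorService pool) {
        this.pool = pool; // the caller owns the pool and shuts it down
    }

    public void addCallable(Callable<T> callable) {
        callables.add(callable);
    }

    // Runs the queued tasks exactly once, clears the queue, and returns results in the
    // order the Callables were added. A task failure surfaces as ExecutionException.
    public List<T> getResults() throws InterruptedException, ExecutionException {
        List<T> results = new ArrayList<>();
        try {
            for (Future<T> f : pool.invokeAll(callables)) {
                results.add(f.get());
            }
        } finally {
            callables.clear();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CallableBatch<Integer> batch = new CallableBatch<>(pool);
            for (int x : Arrays.asList(1, 2, 3)) {
                batch.addCallable(() -> x * 10);
            }
            System.out.println(batch.getResults()); // prints [10, 20, 30]
        } finally {
            pool.shutdown();
        }
    }
}
```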

#8 (0 votes)

With Java 8 and later, you can create a stream and process it in parallel with parallelStream():

// requires import java.util.stream.Collectors
List<T> objects = ...;

List<Result> result = objects.parallelStream()
        .map(object -> compute(object))
        .collect(Collectors.toList());

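Note: compute() may run on several threads in any order, but because a List is an ordered source, collect(Collectors.toList()) still returns the results in the same order as the objects in the list. Ordering is only lost with operations such as forEach; use forEachOrdered if it matters.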
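A small check of the ordering behaviour, assuming a hypothetical compute() that negates an Integer. Also relevant for IO-bound work: parallelStream() runs on the common ForkJoinPool, whose parallelism defaults to one less than the number of available processors (configurable with the system property java.util.concurrent.ForkJoinPool.common.parallelism), so blocking tasks tie up threads that the whole JVM shares.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelStreamOrder {
    // Hypothetical compute(): negates its input.
    static Integer compute(Integer x) {
        return -x;
    }

    public static List<Integer> computeAll(List<Integer> objects) {
        return objects.parallelStream()
                .map(ParallelStreamOrder::compute) // may run on several threads, in any order...
                .collect(Collectors.toList());     // ...but the list keeps the source order
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
        List<Integer> expected =
                IntStream.rangeClosed(1, 20).map(x -> -x).boxed().collect(Collectors.toList());
        System.out.println(computeAll(input).equals(expected)); // prints true
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        // forEach on a parallel stream makes no ordering promise; forEachOrdered does.
        input.parallelStream().forEach(x -> System.out.print(x + " "));
        System.out.println();
    }
}
```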

Details on how to set the right number of threads are in this question: how-many-threads-are-spawned-in-parallelstream-in-java-8