执行服务,如何等待所有任务完成

时间:2022-03-28 20:56:11

What is the simplest way to to wait for all tasks of ExecutorService to finish? My task is primarily computational, so I just want to run a large number of jobs - one on each core. Right now my setup looks like this:

要等待执行服务的所有任务完成,最简单的方法是什么?我的任务主要是计算,所以我只想运行大量的工作——每个核心都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask implements runnable. This appears to execute the tasks correctly, but the code crashes on wait() with IllegalMonitorStateException. This is odd, because I played around with some toy examples and it appeared to work.

ComputeDTask实现runnable。这似乎可以正确地执行任务,但是代码在wait()上崩溃,出现了IllegalMonitorStateException异常。这很奇怪,因为我尝试了一些玩具的例子,它似乎是有效的。

uniquePhrases contains several tens of thousands of elements. Should I be using another method? I am looking for something as simple as possible

单片语包含成千上万的元素。我应该用别的方法吗?我在寻找尽可能简单的东西

16 个解决方案

#1


180  

The simplest approach is to use ExecutorService.invokeAll() which does what you want in a one-liner. In your parlance, you'll need to modify or wrap ComputeDTask to implement Callable<>, which can give you quite a bit more flexibility. Probably in your app there is a meaningful implementation of Callable.call(), but here's a way to wrap it if not using Executors.callable().

最简单的方法是使用ExecutorService.invokeAll(),它在一行中执行您想要的操作。用您的话说,您将需要修改或包装ComputeDTask来实现可调用的<>,这将使您具有更大的灵活性。可能在您的应用程序中有一个有意义的Callable.call()实现,但是如果不使用Executors.callable(),这里有一种方法来包装它。

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

As others have pointed out, you could use the timeout version of invokeAll() if appropriate. In this example, answers is going to contain a bunch of Futures which will return nulls (see definition of Executors.callable(). Probably what you want to do is a slight refactoring so you can get a useful answer back, or a reference to the underlying ComputeDTask, but I can't tell from your example.

正如其他人指出的,如果合适,可以使用invokeAll()的超时版本。在本例中,答案将包含一系列将返回null的期货(参见Executors.callable()的定义)。也许你想做的是稍微重构,这样你就可以得到一个有用的答案,或者一个对底层ComputeDTask的引用,但是我不能从你的例子中看出。

If it isn't clear, note that invokeAll() will not return until all the tasks are completed. (i.e., all the Futures in your answers collection will report .isDone() if asked.) This avoids all the manual shutdown, awaitTermination, etc... and allows you to reuse this ExecutorService neatly for multiple cycles, if desired.

如果不清楚,请注意invokeAll()在所有任务完成之前不会返回。(即。,如有要求,你所收集的答案中的所有期货都会报告。isdone()。这避免了所有手动关机、等待终止等。如果需要,还允许您在多个循环中灵活地重用这个ExecutorService。

There are a few related questions on SO:

关于这个有几个相关的问题:

None of these are strictly on-point for your question, but they do provide a bit of color about how folks think Executor/ExecutorService ought to be used.

这些都不是你要问的问题,但它们确实提供了一些人们认为应该如何使用Executor/ExecutorService的颜色。

#2


48  

If you want to wait for all tasks to complete, use the shutdown method instead of wait. Then follow it with awaitTermination.

如果您希望等待所有任务完成,请使用shutdown方法而不是等待。然后跟在后面等待结束。

Also, you can use Runtime.availableProcessors to get the number of hardware threads so you can initialize your threadpool properly.

此外,还可以使用运行时。可以使用的处理器来获取硬件线程的数量,以便您可以正确地初始化线程池。

#3


43  

If waiting for all tasks in the ExecutorService to finish isn't precisely your goal, but rather waiting until a specific batch of tasks has completed, you can use a CompletionService — specifically, an ExecutorCompletionService.

如果等待ExecutorService中的所有任务完成并不是您的目标,而是等待特定的任务完成,那么您可以使用CompletionService—具体地说,ExecutorCompletionService。

The idea is to create an ExecutorCompletionService wrapping your Executor, submit some known number of tasks through the CompletionService, then draw that same number of results from the completion queue using either take() (which blocks) or poll() (which does not). Once you've drawn all the expected results corresponding to the tasks you submitted, you know they're all done.

我们的想法是创建一个ExecutorCompletionService包装您的Executor,通过CompletionService提交一些已知数量的任务,然后使用take()或poll()从完成队列中提取相同数量的结果。一旦你画出了与你提交的任务相对应的所有预期结果,你就知道它们都完成了。

Let me state this one more time, because it's not obvious from the interface: You must know how many things you put into the CompletionService in order to know how many things to try to draw out. This matters especially with the take() method: call it one time too many and it will block your calling thread until some other thread submits another job to the same CompletionService.

让我再说一次,因为从界面上看不明显:你必须知道你在完成服务中放入了多少东西,以便知道有多少事情需要尝试。这对take()方法尤其重要:多次调用它,它将阻塞您的调用线程,直到其他线程将另一个作业提交给相同的CompletionService。

There are some examples showing how to use CompletionService in the book Java Concurrency in Practice.

在实践中,有一些示例展示了如何在《Java并发》一书中使用CompletionService。

#4


9  

If you want to wait for the executor service to finish executing, call shutdown() and then, awaitTermination(units, unitType), e.g. awaitTermination(1, MINUTE). The ExecutorService does not block on it's own monitor, so you can't use wait etc.

如果您希望等待执行器服务完成执行,请调用shutdown(),然后调用awaitTermination(unit, unitType),例如awaitTermination(1分钟)。ExecutorService不阻塞它自己的监视器,所以您不能使用wait等。

#5


6  

You could wait jobs to finish on a certain interval:

你可以等待乔布斯在某段时间内完成:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

Or you could use ExecutorService.submit(Runnable) and collect the Future objects that it returns and call get() on each in turn to wait for them to finish.

或者您可以使用ExecutorService.submit(Runnable)并收集它返回的未来对象,并依次对每个对象调用get(),以等待它们完成。

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException is extremely important to handle properly. It is what lets you or the users of your library terminate a long process safely.

InterruptedException异常重要。它使您或库的用户能够安全地终止一个长进程。

#6


5  

Just use

只使用

latch = new CountDownLatch(noThreads)

In each thread

在每个线程

latch.countDown();

and as barrier

和障碍

latch.await();

#7


4  

Root cause for IllegalMonitorStateException:

IllegalMonitorStateException根本原因:

Thrown to indicate that a thread has attempted to wait on an object's monitor or to notify other threads waiting on an object's monitor without owning the specified monitor.

抛出以指示线程尝试等待对象的监视器,或通知等待对象监视器的其他线程,而不拥有指定的监视器。

From your code, you have just called wait() on ExecutorService without owning the lock.

从您的代码中,您刚刚在ExecutorService上调用了wait(),但没有锁。

Below code will fix IllegalMonitorStateException

下面的代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

Follow one of below approaches to wait for completion of all tasks, which have been submitted to ExecutorService.

按照以下方法之一等待所有任务的完成,这些任务已经提交给ExecutorService。

  1. Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object

    遍历从ExecutorService上提交的所有未来任务,并检查在Future对象上的阻塞调用get()的状态

  2. Using invokeAll on ExecutorService

    使用invokeAll ExecutorService

  3. Using CountDownLatch

    使用CountDownLatch

  4. Using ForkJoinPool or newWorkStealingPool of Executors(since java 8)

    使用ForkJoinPool或newWorkStealingPool的执行者(从java 8开始)

  5. Shutdown the pool as recommended in oracle documentation page

    按照oracle文档页面中的建议关闭池

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change

    如果您希望在使用选项5而不是选项1到4时优雅地等待所有任务完成,请更改

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    to

    a while(condition) which checks for every 1 minute.

    每一分钟检查一次的时间(条件)。

#8


3  

I also have the situation that I have a set of documents to be crawled. I start with an initial "seed" document which should be processed, that document contains links to other documents which should also be processed, and so on.

我还有一套文件要爬。我从一个应该被处理的初始“种子”文档开始,该文档包含到其他应该被处理的文档的链接,等等。

In my main program, I just want to write something like the following, where Crawler controls a bunch of threads.

在我的主程序中,我只想编写如下内容,其中爬虫控制了一些线程。

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

The same situation would happen if I wanted to navigate a tree; i would pop in the root node, the processor for each node would add children to the queue as necessary, and a bunch of threads would process all the nodes in the tree, until there were no more.

如果我想在一棵树上导航,同样的情况也会发生;我将弹出根节点,每个节点的处理器将根据需要向队列添加子节点,一堆线程将处理树中的所有节点,直到没有其他节点为止。

I couldn't find anything in the JVM which I thought was a bit surprising. So I wrote a class AutoStopThreadPool which one can either use directly or subclass to add methods suitable for the domain, e.g. schedule(Document). Hope it helps!

我在JVM中找不到任何东西,我觉得有点奇怪。因此,我编写了一个类AutoStopThreadPool,它可以直接使用或子类来添加适合域的方法,例如schedule(Document)。希望它可以帮助!

AutoStopThreadPool Javadoc | Download

AutoStopThreadPool Javadoc |下载

#9


2  

Add all threads in collection and submit it using invokeAll. If you can use invokeAll method of ExecutorService, JVM won’t proceed to next line until all threads are complete.

添加集合中的所有线程并使用invokeAll提交它。如果您可以使用ExecutorService的invokeAll方法,那么在所有线程都完成之前,JVM不会继续到下一行。

Here there is a good example: invokeAll via ExecutorService

这里有一个很好的例子:通过ExecutorService开发票

#10


1  

Submit your tasks into the Runner and then wait calling the method waitTillDone() like this:

把你的任务提交给跑步者,然后等着叫方法waitTillDone():

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

To use it add this gradle/maven dependency: 'com.github.matejtymes:javafixes:1.0'

要使用它,需要添加这个gradle/maven依赖项:'com.github.matejtymes:javafix:1.0'

For more details look here: https://github.com/MatejTymes/JavaFixes or here: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

更多细节请看这里:https://github.com/MatejTymes/JavaFixes或者这里:http://matejtymes.blogspot.com/2016/04/executor- notify -you-when-task.html

#11


1  

You can use ExecutorService.invokeAll method, It will execute all task and wait till all threads finished their task.

您可以使用ExecutorService。invokeAll方法,它将执行所有任务,并等待所有线程完成任务。

Here is complete javadoc

这是完整的javadoc

You can also user overloaded version of this method to specify the timeout.

您还可以使用该方法的重载版本来指定超时。

Here is sample code with ExecutorService.invokeAll

下面是ExecutorService.invokeAll的示例代码

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}

#12


0  

A simple alternative to this is to use threads along with join. Refer : Joining Threads

一个简单的替代方法是使用线程和连接。参考:加入线程

#13


0  

I will just wait for the executor to terminate with a specified timeout that you think it is suitable for the tasks to complete.

我将等待执行程序以指定的超时结束,您认为它适合完成任务。

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }

#14


0  

Sounds like you need ForkJoinPool and use the global pool to execute tasks.

听起来您需要ForkJoinPool并使用全局池执行任务。

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

The beauty is in pool.awaitQuiescence where the method will block utilize the caller's thread to execute its tasks and then return when it is really empty.

美在池中。当方法阻塞时,使用调用者的线程执行它的任务,然后当它真的为空时返回。

#15


-1  

You can use a ThreadPoolExecutor with a pool size set to Runtime.getRuntime().availableProcessors() and .execute() it all the tasks you want to execute, then call tpe.shutdown() and then wait in a while(!tpe.terminated()) { /* waiting for all tasks to complete */} which blocks for all the submitted tasks to complete. where tpe is a reference to your ThreadPoolExecutor instance.

您可以使用一个具有池大小的ThreadPoolExecutor,设置为Runtime.getRuntime(). availableprocessor()和.execute(),它将执行您想要执行的所有任务,然后调用tpe.shutdown(),然后在一段时间内等待(!tpe.终止()){/*等待所有任务完成*/},以完成所有提交的任务。其中tpe是对ThreadPoolExecutor实例的引用。

Or if it is more appropriate use an ExecutorCompletionService A CompletionService that uses a supplied Executor to execute tasks. This class arranges that submitted tasks are, upon completion, placed on a queue accessible using take. The class is lightweight enough to be suitable for transient use when processing groups of tasks.

或者,如果使用ExecutorCompletionService更合适,则使用提供的Executor执行任务的CompletionService。这个类会安排提交的任务在完成后,放置在可访问的队列中。该类足够轻量,适合在处理任务组时临时使用。

NOTE: This solutions does not block like ExecutorService.invokeAll() does while waiting on all the jobs to finish. So while the simplest it is also the most limited as it will block the main thread of the application. Not good if you want status about what is going on, or to do other things while these jobs are running, like post processing the results or producing and incremental aggregated result.

注意:此解决方案不像ExecutorService.invokeAll()那样在等待所有作业完成时阻塞。因此,尽管最简单的方法也是最有限的,因为它将阻塞应用程序的主线程。如果您想了解正在发生的事情的状态,或者想在这些作业运行时做其他事情,比如对结果进行后处理,或者生成和增量聚合结果,那么这样做就不太好。

#16


-1  

Sorry for late answer but you can use the following to force the threadPoolTaskExecutor to wait until all active and pending tasks been processed.

很抱歉,我的回答迟了,但是您可以使用下面的命令来强制threadPoolTaskExecutor等到所有的活动任务和挂起任务都被处理之后再执行。

ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) AppContext.getBean("threadPoolTaskExecutor");

//Code here to execute the runnable example:

//执行可运行示例的代码:

while(iterator.hasNext())
{
threadPoolTaskExecutor.execute(new Runnable() ... );
}

    while(threadPoolTaskExecutor.getActiveCount() > 0)
    {
        //Hold until finish
    }

    threadPoolTaskExecutor.shutdown();

Make sure that ThreadPoolTaskExecutor bean is defined with scope="Prototype". Important!

确保使用scope="Prototype"定义了ThreadPoolTaskExecutor bean。重要!

#1


180  

The simplest approach is to use ExecutorService.invokeAll() which does what you want in a one-liner. In your parlance, you'll need to modify or wrap ComputeDTask to implement Callable<>, which can give you quite a bit more flexibility. Probably in your app there is a meaningful implementation of Callable.call(), but here's a way to wrap it if not using Executors.callable().

最简单的方法是使用ExecutorService.invokeAll(),它在一行中执行您想要的操作。用您的话说,您将需要修改或包装ComputeDTask来实现可调用的<>,这将使您具有更大的灵活性。可能在您的应用程序中有一个有意义的Callable.call()实现,但是如果不使用Executors.callable(),这里有一种方法来包装它。

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

As others have pointed out, you could use the timeout version of invokeAll() if appropriate. In this example, answers is going to contain a bunch of Futures which will return nulls (see definition of Executors.callable(). Probably what you want to do is a slight refactoring so you can get a useful answer back, or a reference to the underlying ComputeDTask, but I can't tell from your example.

正如其他人指出的,如果合适,可以使用invokeAll()的超时版本。在本例中,答案将包含一系列将返回null的期货(参见Executors.callable()的定义)。也许你想做的是稍微重构,这样你就可以得到一个有用的答案,或者一个对底层ComputeDTask的引用,但是我不能从你的例子中看出。

If it isn't clear, note that invokeAll() will not return until all the tasks are completed. (i.e., all the Futures in your answers collection will report .isDone() if asked.) This avoids all the manual shutdown, awaitTermination, etc... and allows you to reuse this ExecutorService neatly for multiple cycles, if desired.

如果不清楚,请注意invokeAll()在所有任务完成之前不会返回。(即。,如有要求,你所收集的答案中的所有期货都会报告。isdone()。这避免了所有手动关机、等待终止等。如果需要,还允许您在多个循环中灵活地重用这个ExecutorService。

There are a few related questions on SO:

关于这个有几个相关的问题:

None of these are strictly on-point for your question, but they do provide a bit of color about how folks think Executor/ExecutorService ought to be used.

这些都不是你要问的问题,但它们确实提供了一些人们认为应该如何使用Executor/ExecutorService的颜色。

#2


48  

If you want to wait for all tasks to complete, use the shutdown method instead of wait. Then follow it with awaitTermination.

如果您希望等待所有任务完成,请使用shutdown方法而不是等待。然后跟在后面等待结束。

Also, you can use Runtime.availableProcessors to get the number of hardware threads so you can initialize your threadpool properly.

此外,还可以使用运行时。可以使用的处理器来获取硬件线程的数量,以便您可以正确地初始化线程池。

#3


43  

If waiting for all tasks in the ExecutorService to finish isn't precisely your goal, but rather waiting until a specific batch of tasks has completed, you can use a CompletionService — specifically, an ExecutorCompletionService.

如果等待ExecutorService中的所有任务完成并不是您的目标,而是等待特定的任务完成,那么您可以使用CompletionService—具体地说,ExecutorCompletionService。

The idea is to create an ExecutorCompletionService wrapping your Executor, submit some known number of tasks through the CompletionService, then draw that same number of results from the completion queue using either take() (which blocks) or poll() (which does not). Once you've drawn all the expected results corresponding to the tasks you submitted, you know they're all done.

我们的想法是创建一个ExecutorCompletionService包装您的Executor,通过CompletionService提交一些已知数量的任务,然后使用take()或poll()从完成队列中提取相同数量的结果。一旦你画出了与你提交的任务相对应的所有预期结果,你就知道它们都完成了。

Let me state this one more time, because it's not obvious from the interface: You must know how many things you put into the CompletionService in order to know how many things to try to draw out. This matters especially with the take() method: call it one time too many and it will block your calling thread until some other thread submits another job to the same CompletionService.

让我再说一次,因为从界面上看不明显:你必须知道你在完成服务中放入了多少东西,以便知道有多少事情需要尝试。这对take()方法尤其重要:多次调用它,它将阻塞您的调用线程,直到其他线程将另一个作业提交给相同的CompletionService。

There are some examples showing how to use CompletionService in the book Java Concurrency in Practice.

在实践中,有一些示例展示了如何在《Java并发》一书中使用CompletionService。

#4


9  

If you want to wait for the executor service to finish executing, call shutdown() and then, awaitTermination(units, unitType), e.g. awaitTermination(1, MINUTE). The ExecutorService does not block on it's own monitor, so you can't use wait etc.

如果您希望等待执行器服务完成执行,请调用shutdown(),然后调用awaitTermination(unit, unitType),例如awaitTermination(1分钟)。ExecutorService不阻塞它自己的监视器,所以您不能使用wait等。

#5


6  

You could wait jobs to finish on a certain interval:

你可以等待乔布斯在某段时间内完成:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

Or you could use ExecutorService.submit(Runnable) and collect the Future objects that it returns and call get() on each in turn to wait for them to finish.

或者您可以使用ExecutorService.submit(Runnable)并收集它返回的未来对象,并依次对每个对象调用get(),以等待它们完成。

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException is extremely important to handle properly. It is what lets you or the users of your library terminate a long process safely.

InterruptedException异常重要。它使您或库的用户能够安全地终止一个长进程。

#6


5  

Just use

只使用

latch = new CountDownLatch(noThreads)

In each thread

在每个线程

latch.countDown();

and as barrier

和障碍

latch.await();

#7


4  

Root cause for IllegalMonitorStateException:

IllegalMonitorStateException根本原因:

Thrown to indicate that a thread has attempted to wait on an object's monitor or to notify other threads waiting on an object's monitor without owning the specified monitor.

抛出以指示线程尝试等待对象的监视器,或通知等待对象监视器的其他线程,而不拥有指定的监视器。

From your code, you have just called wait() on ExecutorService without owning the lock.

从您的代码中,您刚刚在ExecutorService上调用了wait(),但没有锁。

Below code will fix IllegalMonitorStateException

下面的代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

Follow one of below approaches to wait for completion of all tasks, which have been submitted to ExecutorService.

按照以下方法之一等待所有任务的完成,这些任务已经提交给ExecutorService。

  1. Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object

    遍历从ExecutorService上提交的所有未来任务,并检查在Future对象上的阻塞调用get()的状态

  2. Using invokeAll on ExecutorService

    使用invokeAll ExecutorService

  3. Using CountDownLatch

    使用CountDownLatch

  4. Using ForkJoinPool or newWorkStealingPool of Executors(since java 8)

    使用ForkJoinPool或newWorkStealingPool的执行者(从java 8开始)

  5. Shutdown the pool as recommended in oracle documentation page

    按照oracle文档页面中的建议关闭池

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change

    如果您希望在使用选项5而不是选项1到4时优雅地等待所有任务完成,请更改

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    to

    a while(condition) which checks for every 1 minute.

    每一分钟检查一次的时间(条件)。

#8


3  

I also have the situation that I have a set of documents to be crawled. I start with an initial "seed" document which should be processed, that document contains links to other documents which should also be processed, and so on.

我还有一套文件要爬。我从一个应该被处理的初始“种子”文档开始,该文档包含到其他应该被处理的文档的链接,等等。

In my main program, I just want to write something like the following, where Crawler controls a bunch of threads.

在我的主程序中,我只想编写如下内容,其中爬虫控制了一些线程。

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

The same situation would happen if I wanted to navigate a tree; i would pop in the root node, the processor for each node would add children to the queue as necessary, and a bunch of threads would process all the nodes in the tree, until there were no more.

如果我想在一棵树上导航,同样的情况也会发生;我将弹出根节点,每个节点的处理器将根据需要向队列添加子节点,一堆线程将处理树中的所有节点,直到没有其他节点为止。

I couldn't find anything in the JVM which I thought was a bit surprising. So I wrote a class AutoStopThreadPool which one can either use directly or subclass to add methods suitable for the domain, e.g. schedule(Document). Hope it helps!

我在JVM中找不到任何东西,我觉得有点奇怪。因此,我编写了一个类AutoStopThreadPool,它可以直接使用或子类来添加适合域的方法,例如schedule(Document)。希望它可以帮助!

AutoStopThreadPool Javadoc | Download

AutoStopThreadPool Javadoc |下载

#9


2  

Add all threads in collection and submit it using invokeAll. If you can use invokeAll method of ExecutorService, JVM won’t proceed to next line until all threads are complete.

添加集合中的所有线程并使用invokeAll提交它。如果您可以使用ExecutorService的invokeAll方法,那么在所有线程都完成之前,JVM不会继续到下一行。

Here there is a good example: invokeAll via ExecutorService

这里有一个很好的例子:通过ExecutorService开发票

#10


1  

Submit your tasks into the Runner and then wait calling the method waitTillDone() like this:

把你的任务提交给跑步者,然后等着叫方法waitTillDone():

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

To use it add this gradle/maven dependency: 'com.github.matejtymes:javafixes:1.0'

要使用它,需要添加这个gradle/maven依赖项:'com.github.matejtymes:javafix:1.0'

For more details look here: https://github.com/MatejTymes/JavaFixes or here: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

更多细节请看这里:https://github.com/MatejTymes/JavaFixes或者这里:http://matejtymes.blogspot.com/2016/04/executor- notify -you-when-task.html

#11


1  

You can use ExecutorService.invokeAll method, It will execute all task and wait till all threads finished their task.

您可以使用ExecutorService。invokeAll方法,它将执行所有任务,并等待所有线程完成任务。

Here is complete javadoc

这是完整的javadoc

You can also user overloaded version of this method to specify the timeout.

您还可以使用该方法的重载版本来指定超时。

Here is sample code with ExecutorService.invokeAll

下面是ExecutorService.invokeAll的示例代码

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}

#12


0  

A simple alternative to this is to use threads along with join. Refer : Joining Threads

一个简单的替代方法是使用线程和连接。参考:加入线程

#13


0  

I will just wait for the executor to terminate with a specified timeout that you think it is suitable for the tasks to complete.

我将等待执行程序以指定的超时结束,您认为它适合完成任务。

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }

#14


0  

Sounds like you need ForkJoinPool and use the global pool to execute tasks.

听起来您需要ForkJoinPool并使用全局池执行任务。

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

The beauty is in pool.awaitQuiescence where the method will block utilize the caller's thread to execute its tasks and then return when it is really empty.

美在池中。当方法阻塞时,使用调用者的线程执行它的任务,然后当它真的为空时返回。

#15


-1  

You can use a ThreadPoolExecutor with a pool size set to Runtime.getRuntime().availableProcessors() and .execute() it all the tasks you want to execute, then call tpe.shutdown() and then wait in a while(!tpe.terminated()) { /* waiting for all tasks to complete */} which blocks for all the submitted tasks to complete. where tpe is a reference to your ThreadPoolExecutor instance.

您可以使用一个具有池大小的ThreadPoolExecutor,设置为Runtime.getRuntime(). availableprocessor()和.execute(),它将执行您想要执行的所有任务,然后调用tpe.shutdown(),然后在一段时间内等待(!tpe.终止()){/*等待所有任务完成*/},以完成所有提交的任务。其中tpe是对ThreadPoolExecutor实例的引用。

Or if it is more appropriate use an ExecutorCompletionService A CompletionService that uses a supplied Executor to execute tasks. This class arranges that submitted tasks are, upon completion, placed on a queue accessible using take. The class is lightweight enough to be suitable for transient use when processing groups of tasks.

或者,如果使用ExecutorCompletionService更合适,则使用提供的Executor执行任务的CompletionService。这个类会安排提交的任务在完成后,放置在可访问的队列中。该类足够轻量,适合在处理任务组时临时使用。

NOTE: This solutions does not block like ExecutorService.invokeAll() does while waiting on all the jobs to finish. So while the simplest it is also the most limited as it will block the main thread of the application. Not good if you want status about what is going on, or to do other things while these jobs are running, like post processing the results or producing and incremental aggregated result.

注意:此解决方案不像ExecutorService.invokeAll()那样在等待所有作业完成时阻塞。因此,尽管最简单的方法也是最有限的,因为它将阻塞应用程序的主线程。如果您想了解正在发生的事情的状态,或者想在这些作业运行时做其他事情,比如对结果进行后处理,或者生成和增量聚合结果,那么这样做就不太好。

#16


-1  

Sorry for late answer but you can use the following to force the threadPoolTaskExecutor to wait until all active and pending tasks been processed.

很抱歉,我的回答迟了,但是您可以使用下面的命令来强制threadPoolTaskExecutor等到所有的活动任务和挂起任务都被处理之后再执行。

ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) AppContext.getBean("threadPoolTaskExecutor");

//Code here to execute the runnable example:

//执行可运行示例的代码:

while(iterator.hasNext())
{
threadPoolTaskExecutor.execute(new Runnable() ... );
}

    while(threadPoolTaskExecutor.getActiveCount() > 0)
    {
        //Hold until finish
    }

    threadPoolTaskExecutor.shutdown();

Make sure that ThreadPoolTaskExecutor bean is defined with scope="Prototype". Important!

确保使用scope="Prototype"定义了ThreadPoolTaskExecutor bean。重要!