How to wait for a ThreadPoolExecutor to finish

Date: 2022-03-28 20:55:41

My Question: How to execute a bunch of threaded objects on a ThreadPoolExecutor and wait for them all to finish before moving on?

I'm new to ThreadPoolExecutor. So this code is a test to learn how it works. Right now I don't even fill the BlockingQueue with the objects because I don't understand how to start the queue without calling execute() with another RunnableObject. Anyway, right now I just call awaitTermination() but I think I'm still missing something. Any tips would be great! Thanks.

public void testThreadPoolExecutor() throws InterruptedException {
  int limit = 20;
  BlockingQueue q = new ArrayBlockingQueue(limit);
  ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
  for (int i = 0; i < limit; i++) {
    ex.execute(new RunnableObject(i + 1));
  }
  ex.awaitTermination(2, TimeUnit.SECONDS);
  System.out.println("finished");
}

The RunnableObject class:

package playground;

public class RunnableObject implements Runnable {

  private final int id;

  public RunnableObject(int id) {
    this.id = id;
  }

  @Override
  public void run() {
    System.out.println("ID: " + id + " started");
    try {
      Thread.sleep(2354);
    } catch (InterruptedException ignore) {
    }
    System.out.println("ID: " + id + " ended");
  }
}

6 answers

#1


47  

You should call shutdown() and then loop on awaitTermination:

ExecutorService threads;
// ...
// Tell threads to finish off.
threads.shutdown();
// Wait for everything to finish.
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) {
  log.info("Awaiting completion of threads.");
}
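
For reference, here is a self-contained sketch of the same pattern. The class name AwaitAllDemo, the pool size of 4, and the 50 ms sleep are assumptions chosen for illustration, not part of the original question.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitAllDemo {

    // Runs taskCount short tasks on a small pool, waits for all of them,
    // and returns how many ran to completion.
    static int runAndWait(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // size is arbitrary
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50); // stand-in for real work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown(); // no new tasks accepted; queued tasks still run
        try {
            // awaitTermination returns false on timeout, so keep asking.
            while (!pool.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                System.out.println("Awaiting completion of threads.");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();                 // give up: cancel remaining work
            Thread.currentThread().interrupt(); // preserve the interrupt
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("finished, completed = " + runAndWait(20));
    }
}
```

The short timeout only controls how often the progress message is printed; the loop exits as soon as the pool reports termination.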

#2


4  

Your issue seems to be that you are not calling shutdown after you have submitted all of the jobs to your pool. Without shutdown() your awaitTermination will always return false.

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}
// you are missing this line!!
ex.shutdown();
ex.awaitTermination(2, TimeUnit.SECONDS);

You can also do something like the following to wait for all your jobs to finish:

List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (int i = 0; i < limit; i++) {
  futures.add(ex.submit(new RunnableObject(i + 1), (Object)null));
}
for (Future<Object> future : futures) {
   // this joins with the submitted job
   future.get();
}
...
// still need to shutdown at the end
ex.shutdown();

Also, because each task sleeps for 2354 milliseconds but you only wait 2 seconds for the jobs to terminate, awaitTermination will always return false.

Lastly, it sounds like you are worried about creating a new ThreadPoolExecutor and would rather reuse the first one. Don't be. The GC overhead is going to be minimal compared to any code that you write to detect whether the jobs are finished.

To quote from the javadocs, ThreadPoolExecutor.shutdown():

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.

In the ThreadPoolExecutor.awaitTermination(...) method, it is waiting for the state of the executor to go to TERMINATED. But first the state must go to SHUTDOWN if shutdown() is called or STOP if shutdownNow() is called.
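
A small sketch that makes this state dependency visible: awaitTermination times out while the pool has not been shut down, even though its only task finished long ago, and succeeds once shutdown() has been called. The class name ShutdownStateDemo and the timeouts are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownStateDemo {

    // Returns { result before shutdown(), result after shutdown() }.
    static boolean[] observe() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { }); // a task that finishes immediately
        try {
            // The pool is still running, so it cannot reach TERMINATED:
            // this call waits out the full timeout and returns false.
            boolean before = pool.awaitTermination(200, TimeUnit.MILLISECONDS);

            pool.shutdown(); // the pool can now wind down
            boolean after = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] { before, after };
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("before shutdown: " + r[0] + ", after shutdown: " + r[1]);
    }
}
```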

#3


3  

It's nothing to do with the executor itself. Just use the interface's java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>). It will block until all the Callables are finished.

Executors are meant to be long-lived, outlasting any single group of tasks. shutdown() is for when the application is finished and cleaning up.
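
A minimal sketch of the invokeAll approach, assuming the work can be expressed as Callables. The class name InvokeAllDemo and the squaring tasks are made up for illustration. The pool is passed in and left running, so it can be reused for later batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {

    // Submits n tasks in one batch and blocks until every one has finished.
    static List<Integer> squares(ExecutorService pool, int n) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            final int id = i;
            tasks.add(() -> id * id); // placeholder for real work
        }
        try {
            List<Integer> results = new ArrayList<>();
            // invokeAll returns only once all tasks are done; the futures
            // come back in the same order as the task list.
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(squares(pool, 3));
            System.out.println(squares(pool, 5)); // same pool, second batch
        } finally {
            pool.shutdown(); // only when the application is done with it
        }
    }
}
```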

#4


2  

Here's a variant on the accepted answer that handles retries if/when InterruptedException is thrown:

executor.shutdown();

boolean isWait = true;

while (isWait)
{
    try
    {             
        isWait = !executor.awaitTermination(10, TimeUnit.SECONDS);
        if (isWait)
        {
            log.info("Awaiting completion of bulk callback threads.");
        }
    } catch (InterruptedException e) {
        log.debug("Interrupted while awaiting completion of callback threads - trying again...");
    }
}
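
One caveat with retrying this way is that the interrupt is swallowed, so callers never learn it happened. A possible refinement, sketched here under the assumption that the wait really must not be abandoned, is to remember the interrupt and restore the thread's interrupt flag once the pool has terminated. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class UninterruptibleAwait {

    // Waits until the executor has terminated, retrying on interruption.
    // The caller must already have called shutdown() or shutdownNow().
    static void awaitUninterruptibly(ExecutorService executor) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    if (executor.awaitTermination(10, TimeUnit.SECONDS)) {
                        return;
                    }
                } catch (InterruptedException e) {
                    interrupted = true; // remember it and keep waiting
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // hand the interrupt back
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(() -> System.out.println("working"));
        executor.shutdown();
        awaitUninterruptibly(executor);
        System.out.println("terminated: " + executor.isTerminated());
    }
}
```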

#5


1  

Another approach is to use CompletionService, which is very useful if you need to process each task's result as it completes:

// run 3 tasks at a time
final int numParallelThreads = 3;

// newFixedThreadPool is used for convenience; a ThreadPoolExecutor works as well
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

int numTaskToStart = 15;

for(int i=0; i<numTaskToStart ; i++){
    //task class that implements Callable<String> (or something you need)
    MyTask mt = new MyTask();

    completionService.submit(mt);
}

executor.shutdown(); // no more tasks can be queued after this

try {
    for (int t = 0; t < numTaskToStart ; t++) {
        Future<String> f = completionService.take();
        String result = f.get();
        // ... something to do ...
    }
} catch (InterruptedException e) {
    // interrupt the running tasks; shutdownNow() returns the tasks that never started
    executor.shutdownNow();
} catch (ExecutionException e) {
    // ... something to catch ...
}
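
Since MyTask is not shown above, here is a self-contained sketch of the same idea with a trivial lambda standing in for it. The class name CompletionOrderDemo and the "result-N" strings are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Submits taskCount tasks and gathers their results in completion order.
    static List<String> collect(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService =
                new ExecutorCompletionService<>(executor);

        for (int i = 0; i < taskCount; i++) {
            final int id = i + 1;
            completionService.submit(() -> "result-" + id); // stand-in for MyTask
        }
        executor.shutdown(); // already submitted tasks still run

        List<String> results = new ArrayList<>();
        try {
            for (int t = 0; t < taskCount; t++) {
                // take() blocks until the next finished task is available
                results.add(completionService.take().get());
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(collect(15));
    }
}
```

Taking exactly as many results as tasks were submitted is what makes the loop double as a wait for the whole batch.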

#6


-1  

Try this:

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}

Lines to be added:

ex.shutdown();
ex.awaitTermination(timeout, unit);
