I need to execute some amount of tasks 4 at a time, something like this:
我需要一次执行一些任务4,像这样:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
How can I get notified once all of them are complete? For now I can't think about anything better than setting some global task counter and decrease it at the end of every task, then monitor in infinite loop this counter to become 0; or get a list of Futures and in infinite loop monitor isDone for all of them. What are better solutions not involving infinite loops?
他们全部完成后,我怎样才能得到通知?现在我想不出比设置一些全局任务计数器并在每个任务结束时减少它,然后在无限循环中监视这个计数器变成0的情况了。或者得到一个未来的列表,并在无限循环监视器中为所有的人做。什么是更好的解决方案,而不是无限循环?
Thanks.
谢谢。
19 个解决方案
#1
344
Basically on an ExecutorService
you call shutdown()
and then awaitTermination()
:
基本上,在ExecutorService上,您调用shutdown(),然后是waittermination ():
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
#2
141
Use a CountDownLatch:
使用CountDownLatch:
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
and within your task (enclose in try / finally)
在你的任务中(试着/最后)
latch.countDown();
#3
74
ExecutorService.invokeAll()
does it for you.
ExecutorService.invokeAll()为您完成。
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
#4
41
You can use Lists of Futures, as well:
你也可以使用期货清单:
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));
then when you want to join on all of them, its essentially the equivalent of joining on each, (with the added benefit that it re-raises exceptions from child threads to the main):
然后,当您想要加入所有这些元素时,它本质上相当于加入每个元素,(附加的好处是,它将从子线程重新引发异常):
for(Future f: this.futures) { f.get(); }
Basically the trick is to call .get() on each Future one at a time, instead of infinite looping calling isDone() on (all or each). So you're guaranteed to "move on" through and past this block as soon as the last thread finishes. The caveat is that since the .get() call re-raises exceptions, if one of the threads dies, you would raise from this possibly before the other threads have finished to completion [to avoid this, you could add a catch ExecutionException
around the get call]. The other caveat is it keeps a reference to all threads so if they have thread local variables they won't get collected till after you get past this block (though you might be able to get around this, if it became a problem, by removing Future's off the ArrayList). If you wanted to know which Future "finishes first" you could use some something like https://*.com/a/31885029/32453
基本上,诀窍就是在每个将来一次调用.get(),而不是在(all或each)上进行无限循环调用isDone()。因此,在最后一个线程结束时,您将确保“继续”通过并通过这个块。需要注意的是,自从.get()调用重新引发异常时,如果其中一个线程死了,您可能会在其他线程完成完成之前从这个过程中提高(为了避免这一点,您可以在get调用周围添加一个catch ExecutionException)。另一个需要注意的是,它保留了对所有线程的引用,所以如果它们有线程局部变量,那么在您通过这个块之后,它们就不会被收集(尽管您可能会绕过这个问题,如果它成为一个问题,通过删除将来的ArrayList)。如果您想知道哪个未来“先完成”,您可以使用一些类似于https://*.com/a/31885029/32453的东西。
#5
23
Just my two cents. To overcome the requirement of CountDownLatch
to know the number of tasks beforehand, you could do it the old fashion way by using a simple Semaphore
.
只是我两美分。为了克服CountDownLatch的要求,预先知道任务的数量,可以使用简单的信号量来完成旧的流行方式。
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}
try {
s.aquire(numberOfTasks);
...
In your task just call s.release()
as you would latch.countDown();
在您的任务中,只需调用。release(),就像您将会挂起的一样。
#6
16
In Java8 you can do it with CompletableFuture:
在Java8中,您可以使用CompletableFuture实现:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
#7
12
The CyclicBarrier class in Java 5 and later is designed for this sort of thing.
Java 5和后来的循环屏障类是为这种东西而设计的。
#8
9
A bit late to the game but for the sake of completion...
有点晚了,但为了完成…
Instead of 'waiting' for all tasks to finish, you can think in terms of the Hollywood principle, "don't call me, I'll call you" - when I'm finished. I think the resulting code is more elegant...
你可以考虑好莱坞的原则:“不要给我打电话,我会打电话给你”,而不是“等待”完成所有任务。我认为生成的代码更加优雅……
Guava offers some interesting tools to accomplish this.
Guava提供了一些有趣的工具来实现这一点。
An example ::
一个例子:
Wrap an ExecutorService into a ListeningExecutorService ::
将一个执行服务包装成一个侦听器服务::
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Submit a collection of callables for execution ::
提交一组用于执行的callables::
for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});
Now the essential part:
现在基本部分:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
Attach a callback to the ListenableFuture, that you can use to be notified when all futures complete ::
将回调附加到ListenableFuture,当所有的期货完成时,您可以使用该回调::
Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
log.info("@@ finished processing {} elements", Iterables.size(result));
// do something with all the results
}
@Override
public void onFailure(Throwable t) {
log.info("@@ failed because of :: {}", t);
}
});
This also offers the advantage that you can collect all the results in one place once the processing is finished...
这也提供了一个优点,当处理完成后,您可以在一个地方收集所有的结果……
More information here
更多的信息在这里
#9
5
You could wrap your tasks in another runnable, that will send notifications:
您可以将任务打包到另一个runnable中,它将发送通知:
taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});
#10
3
I've just written a sample program that solves your problem. There was no concise implementation given, so I'll add one. While you can use executor.shutdown()
and executor.awaitTermination()
, it is not the best practice as the time taken by different threads would be unpredictable.
我刚刚写了一个示例程序来解决您的问题。没有给出简明的实现,所以我将添加一个。当您可以使用executor.shutdown()和executor.awaitend()时,它并不是最佳实践,因为不同线程所花费的时间是不可预测的。
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
System.out.println("Starting Thread "
+ Thread.currentThread().getId());
for (int i = 0; i < 1000000; i++) {
sum += i;
}
System.out.println("Stopping Thread "
+ Thread.currentThread().getId());
return sum;
}
});
}
try {
List<Future<Integer>> futures = es.invokeAll(tasks);
int flag = 0;
for (Future<Integer> f : futures) {
Integer res = f.get();
System.out.println("Sum: " + res);
if (!f.isDone())
flag = 1;
}
if (flag == 0)
System.out.println("SUCCESS");
else
System.out.println("FAILED");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
#11
3
Just to provide more alternatives here different to use latch/barriers. You can also get the partial results until all of them finish using CompletionService.
只是为了在这里提供更多的替代选择来使用门闩/屏障。您还可以获得部分结果,直到所有的结果都使用CompletionService。
From Java Concurrency in practice: "If you have a batch of computations to submit to an Executor and you want to retrieve their results as they become available, you could retain the Future associated with each task and repeatedly poll for completion by calling get with a timeout of zero. This is possible, but tedious. Fortunately there is a better way: a completion service."
在实践中,从Java并发性:“如果您有一批计算要提交给执行器,并且您希望在它们可用时检索它们的结果,那么您可以保留与每个任务相关联的未来,并通过调用get(超时为0)进行重复轮询。这是可能的,但是很乏味。幸运的是,有一个更好的方法:完成服务。
Here the implementation
在这里实现
public class TaskSubmiter {
private final ExecutorService executor;
TaskSubmiter(ExecutorService executor) { this.executor = executor; }
void doSomethingLarge(AnySourceClass source) {
final List<InterestedResult> info = doPartialAsyncProcess(source);
CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
for (final InterestedResult interestedResultItem : info)
completionService.submit(new Callable<PartialResult>() {
public PartialResult call() {
return InterestedResult.doAnOperationToGetPartialResult();
}
});
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<PartialResult> f = completionService.take();
PartialResult PartialResult = f.get();
processThisSegment(PartialResult);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
throw somethinghrowable(e.getCause());
}
}
}
#12
3
Follow one of below approaches.
遵循以下方法之一。
- Iterate through all Future tasks, returned from
submit
onExecutorService
and check the status with blocking callget()
onFuture
object as suggested byKiran
- 遍历所有将来的任务,从提交执行服务返回,并根据Kiran建议的对未来对象的阻塞调用get()来检查状态。
- Use
invokeAll()
on ExecutorService - ExecutorService使用invokeAll()
- CountDownLatch
- CountDownLatch
- ForkJoinPool or Executors.html#newWorkStealingPool
- ForkJoinPool或Executors.html # newWorkStealingPool
- Use
shutdown, awaitTermination, shutdownNow
APIs of ThreadPoolExecutor in proper sequence - 在适当的顺序下,使用关机,等待终止,关闭线程的api。
Related SE questions:
SE相关问题:
How is CountDownLatch used in Java Multithreading?
如何在Java多线程中使用countdown闩?
How to properly shutdown java ExecutorService
如何正确关闭java ExecutorService ?
#13
1
You could use your own subclass of ExecutorCompletionService to wrap taskExecutor
, and your own implementation of BlockingQueue to get informed when each task completes and perform whatever callback or other action you desire when the number of completed tasks reaches your desired goal.
您可以使用自己的ExecutorCompletionService子类来包装taskExecutor,以及您自己的BlockingQueue的实现,以便在每个任务完成并执行您希望的完成任务的数量达到您期望的目标时执行的任何回调或其他操作。
#14
0
you should use executorService.shutdown()
and executorService.awaitTermination
method.
您应该使用executorService.shutdown()和executorService。awaitTermination方法。
An example as follows :
下面是一个例子:
public class ScheduledThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
0, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}
}
#15
0
Java 8 - We can use stream API to process stream. Please see snippet below
Java 8 -我们可以使用流API来处理流。请看下面的代码片段
final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool
//alternatively to specify parallelism
new ForkJoinPool(15).submit(
() -> tasks.stream().parallel().forEach(Runnable::run)
).get();
#16
0
You could use this code:
您可以使用以下代码:
public class MyTask implements Runnable {
private CountDownLatch countDownLatch;
public MyTask(CountDownLatch countDownLatch {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
//Do somethings
//
this.countDownLatch.countDown();//important
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
#17
-1
This might help
这可能帮助
Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
try {
Log.i(LOG_TAG, "Waiting for executor to terminate...");
if (executor.isTerminated())
break;
if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException ignored) {}
}
#18
-1
You could call waitTillDone() on this Runner class:
你可以在这个跑步班上叫waitTillDone():
Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool
while(...) {
runner.run(new MyTask()); // here you submit your task
}
runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)
runner.shutdown(); // once you done you can shutdown the runner
You can reuse this class and call waitTillDone() as many times as you want to before calling shutdown(), plus your code is extremly simple. Also you don't have to know the number of tasks upfront.
您可以重复使用这个类,并在调用shutdown()之前多次调用waitTillDone(),加上您的代码非常简单。你也不必预先知道任务的数量。
To use it just add this gradle/maven compile 'com.github.matejtymes:javafixes:1.1.1'
dependency to your project.
要使用它,只需添加这个gradle/maven编译'com.github.matejtymes:javafixes:1.1.1'依赖于您的项目。
More details can be found here:
更多细节可以在这里找到:
https://github.com/MatejTymes/JavaFixes
https://github.com/MatejTymes/JavaFixes
http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
#19
-2
There is a method in executor getActiveCount()
- that gives the count of active threads.
在executor getActiveCount()中有一个方法,它给出了活动线程的计数。
After spanning the thread, we can check if the activeCount()
value is 0
. Once the value is zero, it is meant that there are no active threads currently running which means task is finished:
跨越线程后,我们可以检查activeCount()值是否为0。一旦值为零,就意味着当前没有正在运行的活动线程,这意味着任务已经完成:
while (true) {
if (executor.getActiveCount() == 0) {
//ur own piece of code
break;
}
}
#1
344
Basically on an ExecutorService
you call shutdown()
and then awaitTermination()
:
基本上,在ExecutorService上,您调用shutdown(),然后是waittermination ():
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
#2
141
Use a CountDownLatch:
使用CountDownLatch:
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
and within your task (enclose in try / finally)
在你的任务中(试着/最后)
latch.countDown();
#3
74
ExecutorService.invokeAll()
does it for you.
ExecutorService.invokeAll()为您完成。
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
#4
41
You can use Lists of Futures, as well:
你也可以使用期货清单:
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));
then when you want to join on all of them, its essentially the equivalent of joining on each, (with the added benefit that it re-raises exceptions from child threads to the main):
然后,当您想要加入所有这些元素时,它本质上相当于加入每个元素,(附加的好处是,它将从子线程重新引发异常):
for(Future f: this.futures) { f.get(); }
Basically the trick is to call .get() on each Future one at a time, instead of infinite looping calling isDone() on (all or each). So you're guaranteed to "move on" through and past this block as soon as the last thread finishes. The caveat is that since the .get() call re-raises exceptions, if one of the threads dies, you would raise from this possibly before the other threads have finished to completion [to avoid this, you could add a catch ExecutionException
around the get call]. The other caveat is it keeps a reference to all threads so if they have thread local variables they won't get collected till after you get past this block (though you might be able to get around this, if it became a problem, by removing Future's off the ArrayList). If you wanted to know which Future "finishes first" you could use some something like https://*.com/a/31885029/32453
基本上,诀窍就是在每个将来一次调用.get(),而不是在(all或each)上进行无限循环调用isDone()。因此,在最后一个线程结束时,您将确保“继续”通过并通过这个块。需要注意的是,自从.get()调用重新引发异常时,如果其中一个线程死了,您可能会在其他线程完成完成之前从这个过程中提高(为了避免这一点,您可以在get调用周围添加一个catch ExecutionException)。另一个需要注意的是,它保留了对所有线程的引用,所以如果它们有线程局部变量,那么在您通过这个块之后,它们就不会被收集(尽管您可能会绕过这个问题,如果它成为一个问题,通过删除将来的ArrayList)。如果您想知道哪个未来“先完成”,您可以使用一些类似于https://*.com/a/31885029/32453的东西。
#5
23
Just my two cents. To overcome the requirement of CountDownLatch
to know the number of tasks beforehand, you could do it the old fashion way by using a simple Semaphore
.
只是我两美分。为了克服CountDownLatch的要求,预先知道任务的数量,可以使用简单的信号量来完成旧的流行方式。
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}
try {
s.aquire(numberOfTasks);
...
In your task just call s.release()
as you would latch.countDown();
在您的任务中,只需调用。release(),就像您将会挂起的一样。
#6
16
In Java8 you can do it with CompletableFuture:
在Java8中,您可以使用CompletableFuture实现:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
#7
12
The CyclicBarrier class in Java 5 and later is designed for this sort of thing.
Java 5和后来的循环屏障类是为这种东西而设计的。
#8
9
A bit late to the game but for the sake of completion...
有点晚了,但为了完成…
Instead of 'waiting' for all tasks to finish, you can think in terms of the Hollywood principle, "don't call me, I'll call you" - when I'm finished. I think the resulting code is more elegant...
你可以考虑好莱坞的原则:“不要给我打电话,我会打电话给你”,而不是“等待”完成所有任务。我认为生成的代码更加优雅……
Guava offers some interesting tools to accomplish this.
Guava提供了一些有趣的工具来实现这一点。
An example ::
一个例子:
Wrap an ExecutorService into a ListeningExecutorService ::
将一个执行服务包装成一个侦听器服务::
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
Submit a collection of callables for execution ::
提交一组用于执行的callables::
for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});
Now the essential part:
现在基本部分:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
Attach a callback to the ListenableFuture, that you can use to be notified when all futures complete ::
将回调附加到ListenableFuture,当所有的期货完成时,您可以使用该回调::
Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
log.info("@@ finished processing {} elements", Iterables.size(result));
// do something with all the results
}
@Override
public void onFailure(Throwable t) {
log.info("@@ failed because of :: {}", t);
}
});
This also offers the advantage that you can collect all the results in one place once the processing is finished...
这也提供了一个优点,当处理完成后,您可以在一个地方收集所有的结果……
More information here
更多的信息在这里
#9
5
You could wrap your tasks in another runnable, that will send notifications:
您可以将任务打包到另一个runnable中,它将发送通知:
taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});
#10
3
I've just written a sample program that solves your problem. There was no concise implementation given, so I'll add one. While you can use executor.shutdown()
and executor.awaitTermination()
, it is not the best practice as the time taken by different threads would be unpredictable.
我刚刚写了一个示例程序来解决您的问题。没有给出简明的实现,所以我将添加一个。当您可以使用executor.shutdown()和executor.awaitend()时,它并不是最佳实践,因为不同线程所花费的时间是不可预测的。
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
System.out.println("Starting Thread "
+ Thread.currentThread().getId());
for (int i = 0; i < 1000000; i++) {
sum += i;
}
System.out.println("Stopping Thread "
+ Thread.currentThread().getId());
return sum;
}
});
}
try {
List<Future<Integer>> futures = es.invokeAll(tasks);
int flag = 0;
for (Future<Integer> f : futures) {
Integer res = f.get();
System.out.println("Sum: " + res);
if (!f.isDone())
flag = 1;
}
if (flag == 0)
System.out.println("SUCCESS");
else
System.out.println("FAILED");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
#11
3
Just to provide more alternatives here different to use latch/barriers. You can also get the partial results until all of them finish using CompletionService.
只是为了在这里提供更多的替代选择来使用门闩/屏障。您还可以获得部分结果,直到所有的结果都使用CompletionService。
From Java Concurrency in practice: "If you have a batch of computations to submit to an Executor and you want to retrieve their results as they become available, you could retain the Future associated with each task and repeatedly poll for completion by calling get with a timeout of zero. This is possible, but tedious. Fortunately there is a better way: a completion service."
在实践中,从Java并发性:“如果您有一批计算要提交给执行器,并且您希望在它们可用时检索它们的结果,那么您可以保留与每个任务相关联的未来,并通过调用get(超时为0)进行重复轮询。这是可能的,但是很乏味。幸运的是,有一个更好的方法:完成服务。
Here the implementation
在这里实现
public class TaskSubmiter {
private final ExecutorService executor;
TaskSubmiter(ExecutorService executor) { this.executor = executor; }
void doSomethingLarge(AnySourceClass source) {
final List<InterestedResult> info = doPartialAsyncProcess(source);
CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
for (final InterestedResult interestedResultItem : info)
completionService.submit(new Callable<PartialResult>() {
public PartialResult call() {
return InterestedResult.doAnOperationToGetPartialResult();
}
});
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<PartialResult> f = completionService.take();
PartialResult PartialResult = f.get();
processThisSegment(PartialResult);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
throw somethinghrowable(e.getCause());
}
}
}
#12
3
Follow one of below approaches.
遵循以下方法之一。
- Iterate through all Future tasks, returned from
submit
onExecutorService
and check the status with blocking callget()
onFuture
object as suggested byKiran
- 遍历所有将来的任务,从提交执行服务返回,并根据Kiran建议的对未来对象的阻塞调用get()来检查状态。
- Use
invokeAll()
on ExecutorService - ExecutorService使用invokeAll()
- CountDownLatch
- CountDownLatch
- ForkJoinPool or Executors.html#newWorkStealingPool
- ForkJoinPool或Executors.html # newWorkStealingPool
- Use
shutdown, awaitTermination, shutdownNow
APIs of ThreadPoolExecutor in proper sequence - 在适当的顺序下,使用关机,等待终止,关闭线程的api。
Related SE questions:
SE相关问题:
How is CountDownLatch used in Java Multithreading?
如何在Java多线程中使用countdown闩?
How to properly shutdown java ExecutorService
如何正确关闭java ExecutorService ?
#13
1
You could use your own subclass of ExecutorCompletionService to wrap taskExecutor
, and your own implementation of BlockingQueue to get informed when each task completes and perform whatever callback or other action you desire when the number of completed tasks reaches your desired goal.
您可以使用自己的ExecutorCompletionService子类来包装taskExecutor,以及您自己的BlockingQueue的实现,以便在每个任务完成并执行您希望的完成任务的数量达到您期望的目标时执行的任何回调或其他操作。
#14
0
you should use executorService.shutdown()
and executorService.awaitTermination
method.
您应该使用executorService.shutdown()和executorService。awaitTermination方法。
An example as follows :
下面是一个例子:
public class ScheduledThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
0, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}
}
#15
0
Java 8 - We can use stream API to process stream. Please see snippet below
Java 8 -我们可以使用流API来处理流。请看下面的代码片段
final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool
//alternatively to specify parallelism
new ForkJoinPool(15).submit(
() -> tasks.stream().parallel().forEach(Runnable::run)
).get();
#16
0
You could use this code:
您可以使用以下代码:
public class MyTask implements Runnable {
private CountDownLatch countDownLatch;
public MyTask(CountDownLatch countDownLatch {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
//Do somethings
//
this.countDownLatch.countDown();//important
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
#17
-1
This might help
这可能帮助
Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
try {
Log.i(LOG_TAG, "Waiting for executor to terminate...");
if (executor.isTerminated())
break;
if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException ignored) {}
}
#18
-1
You could call waitTillDone() on this Runner class:
你可以在这个跑步班上叫waitTillDone():
Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool
while(...) {
runner.run(new MyTask()); // here you submit your task
}
runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)
runner.shutdown(); // once you done you can shutdown the runner
You can reuse this class and call waitTillDone() as many times as you want to before calling shutdown(), plus your code is extremly simple. Also you don't have to know the number of tasks upfront.
您可以重复使用这个类,并在调用shutdown()之前多次调用waitTillDone(),加上您的代码非常简单。你也不必预先知道任务的数量。
To use it just add this gradle/maven compile 'com.github.matejtymes:javafixes:1.1.1'
dependency to your project.
要使用它,只需添加这个gradle/maven编译'com.github.matejtymes:javafixes:1.1.1'依赖于您的项目。
More details can be found here:
更多细节可以在这里找到:
https://github.com/MatejTymes/JavaFixes
https://github.com/MatejTymes/JavaFixes
http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
#19
-2
There is a method in executor getActiveCount()
- that gives the count of active threads.
在executor getActiveCount()中有一个方法,它给出了活动线程的计数。
After spanning the thread, we can check if the activeCount()
value is 0
. Once the value is zero, it is meant that there are no active threads currently running which means task is finished:
跨越线程后,我们可以检查activeCount()值是否为0。一旦值为零,就意味着当前没有正在运行的活动线程,这意味着任务已经完成:
while (true) {
if (executor.getActiveCount() == 0) {
//ur own piece of code
break;
}
}