Java Executors:如何设置任务优先级?

时间:2021-05-10 23:08:27

Is there a possibility to set priority to tasks which are executed by Executors? I've found some statements in JCIP about it's possible but I cannot find any example and I cannot find anything related in docs.

是否有可能为Executors执行的任务设置优先级?我在JCIP中发现了一些关于它可能的陈述,但我找不到任何例子,我找不到任何与文档相关的内容。

From JCIP:

An execution policy specifies the "what, where, when, and how" of task execution, including:

执行策略指定任务执行的“内容,地点,时间和方式”,包括:

  • ...
  • In what order should tasks be executed (FIFO, LIFO, priority order)?
  • 应该以什么顺序执行任务(FIFO,LIFO,优先顺序)?

  • ...

UPD: I realized that I asked not exactly what I wanted to ask. What I really wanted is:

UPD:我意识到我不确切地问我想问什么。我真正想要的是:

How to use/emulate setting threads priority (i.e. what was thread.setPriority()) with executors framework?

如何使用/模拟设置线程优先级(即什么是thread.setPriority())与执行器框架?

7 个解决方案

#1


53  

Currently the only concrete implementations of the Executor interface are the ThreadPoolExecutor and the ScheduledThreadpoolExecutor

目前,Executor接口的唯一具体实现是ThreadPoolExecutor和ScheduledThreadpoolExecutor

Instead of using the utility / factory class Executors, you should create an instance using a constructor.

您应该使用构造函数创建实例,而不是使用实用程序/工厂类Executors。

You can pass a BlockingQueue to the constructors of the ThreadPoolExecutor.

您可以将BlockingQueue传递给ThreadPoolExecutor的构造函数。

One of the implementations of the BlockingQueue, the PriorityBlockingQueue lets you pass a Comparator to a constructor, that way enabling you to decide the order of execution.

作为BlockingQueue的一个实现,PriorityBlockingQueue允许您将Comparator传递给构造函数,这样您就可以决定执行的顺序。

#2


35  

The idea here is to use a PriorityBlockingQueue in the executor. For this:

这里的想法是在执行程序中使用PriorityBlockingQueue。为了这:

  • Create a comparator that would compare our futures.
  • 创建一个比较我们未来的比较器。

  • Create a proxy for the Future to hold a priority.
  • 为Future创建代理以保持优先级。

  • Override 'newTaskFor' in order to wrap every future in our proxy.
  • 覆盖'newTaskFor'以包装代理中的每个未来。

First you need to hold priority on your future:

首先,您需要优先考虑您的未来:

    class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }
}

Next you need to define comparator that would correctly sort the priority futures:

接下来,您需要定义比较器,以正确排序优先级期货:

class PriorityFutureComparator implements Comparator<Runnable> {
    public int compare(Runnable o1, Runnable o2) {
        if (o1 == null && o2 == null)
            return 0;
        else if (o1 == null)
            return -1;
        else if (o2 == null)
            return 1;
        else {
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();

            return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
        }
    }
}

Next let's assume we have a lengthy job like this:

接下来让我们假设我们有一个冗长的工作:

class LenthyJob implements Callable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

Then in order to execute these jobs in priority the code will look like:

然后,为了优先执行这些作业,代码将如下所示:

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 2;
        int qInitialSize = 10;

        ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
            }
        };

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

This is a lot of code but that's nearly the only way this can be accomplished.

这是很多代码,但这几乎是实现这一目标的唯一方法。

On my machine the output is like the following:

在我的机器上输出如下:

Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97

#3


3  

you can use ThreadPoolExecutor with Priority blocking queue How to implement PriorityBlockingQueue with ThreadPoolExecutor and custom tasks

您可以将ThreadPoolExecutor与优先级阻塞队列一起使用如何使用ThreadPoolExecutor和自定义任务实现PriorityBlockingQueue

#4


2  

You can specify a ThreadFactory in the ThreadPoolExecutor constructor (or Executors factory method). This allows you to provide threads of a given thread priority for the executor.

您可以在ThreadPoolExecutor构造函数(或Executors工厂方法)中指定ThreadFactory。这允许您为执行程序提供给定线程优先级的线程。

To get different thread priorities for different jobs, you'd need to send them to executors with different thread factories.

要为不同的作业获取不同的线程优先级,您需要将它们发送给具有不同线程工厂的执行程序。

#5


2  

You can implement your own ThreadFactory and set it within ThreadPoolExecutor like this:

你可以实现自己的ThreadFactory并在ThreadPoolExecutor中设置它,如下所示:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));

where my OpJobThreadFactory looks like the following:

我的OpJobThreadFactory如下所示:

public final static class OpJobThreadFactory implements ThreadFactory {
   private int priority;
   private boolean daemon;
   private final String namePrefix;
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final AtomicInteger threadNumber = new AtomicInteger(1);

   public OpJobThreadFactory(int priority) {
      this(priority, true);
   }

   public OpJobThreadFactory(int priority, boolean daemon) {
      this.priority = priority;
      this.daemon = daemon;
      namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
   }

   @Override
   public Thread newThread(Runnable r) {
      Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
      t.setDaemon(daemon);
      t.setPriority(priority);
      return t;
   }
}

#6


0  

Please be aware that setPriority(..) normally does not work under Linux. See the following links for the full details:

请注意,setPriority(..)通常在Linux下不起作用。有关完整详细信息,请参阅以下链接:

#7


0  

Just want to add my bit of contribution to this discussion. I've implemented this ReorderingThreadPoolExecutor for a very specific purpose, which is being able to explicitly bring to the front of the executor's BlockingQueue (in this case LinkedBlockingDeque) whenever I want and without having to deal with priorities (which can lead to deadlocks and are, anyway, fixed).

只是想为这次讨论增添一点贡献。我已经实现了这个ReorderingThreadPoolExecutor用于一个非常特定的目的,它能够在我想要的时候显式地将执行器的BlockingQueue(在本例中为LinkedBlockingDeque)带到前面,而不必处理优先级(这可能导致死锁并且是,无论如何,固定)。

I'm using this to manage (inside an Android app) the case where I have to download many images that are displayed in a long list view. Whenever the user scrolls down quickly, the executor queue gets flooded of image download requests: by moving the latest ones on the top of the queue, I have achieved much better performances in loading the images which are actually on the screen, delaying the download of the ones that will probably needed later. Note that I use an internal concurrent map key (which can be as simple as the image URL string) to add the tasks to the executor so that I can retrieve them later for the reordering.

我正在使用它来管理(在Android应用程序内部)我必须下载许多以长列表视图显示的图像的情况。每当用户快速向下滚动时,执行程序队列就会被图像下载请求淹没:通过移动队列顶部的最新文件,我在加载实际在屏幕上的图像方面取得了更好的性能,延迟了下载以后可能需要的那些。请注意,我使用内部并发映射键(可以像图像URL字符串一样简单)将任务添加到执行程序,以便稍后可以检索它们以进行重新排序。

There'd have been many other ways of doing the same and maybe it's overcomplicated, but it works fine and also Facebook in his Android SDK is doing something similar in its own working threads queue.

还有很多其他方法可以做同样的事情,也许它过于复杂,但它运行良好,Facebook的Android SDK也在其自己的工作线程队列中做类似的事情。

Feel free to have a look at the code and give me suggestions, it's inside an Android project but stripping a few logs and annotations would make the class pure Java 6.

随意查看代码并给我建议,它在Android项目中,但剥离一些日志和注释会使类纯Java 6。

#1


53  

Currently the only concrete implementations of the Executor interface are the ThreadPoolExecutor and the ScheduledThreadpoolExecutor

目前,Executor接口的唯一具体实现是ThreadPoolExecutor和ScheduledThreadpoolExecutor

Instead of using the utility / factory class Executors, you should create an instance using a constructor.

您应该使用构造函数创建实例,而不是使用实用程序/工厂类Executors。

You can pass a BlockingQueue to the constructors of the ThreadPoolExecutor.

您可以将BlockingQueue传递给ThreadPoolExecutor的构造函数。

One of the implementations of the BlockingQueue, the PriorityBlockingQueue lets you pass a Comparator to a constructor, that way enabling you to decide the order of execution.

作为BlockingQueue的一个实现,PriorityBlockingQueue允许您将Comparator传递给构造函数,这样您就可以决定执行的顺序。

#2


35  

The idea here is to use a PriorityBlockingQueue in the executor. For this:

这里的想法是在执行程序中使用PriorityBlockingQueue。为了这:

  • Create a comparator that would compare our futures.
  • 创建一个比较我们未来的比较器。

  • Create a proxy for the Future to hold a priority.
  • 为Future创建代理以保持优先级。

  • Override 'newTaskFor' in order to wrap every future in our proxy.
  • 覆盖'newTaskFor'以包装代理中的每个未来。

First you need to hold priority on your future:

首先,您需要优先考虑您的未来:

    class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }
}

Next you need to define comparator that would correctly sort the priority futures:

接下来,您需要定义比较器,以正确排序优先级期货:

class PriorityFutureComparator implements Comparator<Runnable> {
    public int compare(Runnable o1, Runnable o2) {
        if (o1 == null && o2 == null)
            return 0;
        else if (o1 == null)
            return -1;
        else if (o2 == null)
            return 1;
        else {
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();

            return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
        }
    }
}

Next let's assume we have a lengthy job like this:

接下来让我们假设我们有一个冗长的工作:

class LenthyJob implements Callable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

Then in order to execute these jobs in priority the code will look like:

然后,为了优先执行这些作业,代码将如下所示:

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 2;
        int qInitialSize = 10;

        ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
            }
        };

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

This is a lot of code but that's nearly the only way this can be accomplished.

这是很多代码,但这几乎是实现这一目标的唯一方法。

On my machine the output is like the following:

在我的机器上输出如下:

Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97

#3


3  

you can use ThreadPoolExecutor with Priority blocking queue How to implement PriorityBlockingQueue with ThreadPoolExecutor and custom tasks

您可以将ThreadPoolExecutor与优先级阻塞队列一起使用如何使用ThreadPoolExecutor和自定义任务实现PriorityBlockingQueue

#4


2  

You can specify a ThreadFactory in the ThreadPoolExecutor constructor (or Executors factory method). This allows you to provide threads of a given thread priority for the executor.

您可以在ThreadPoolExecutor构造函数(或Executors工厂方法)中指定ThreadFactory。这允许您为执行程序提供给定线程优先级的线程。

To get different thread priorities for different jobs, you'd need to send them to executors with different thread factories.

要为不同的作业获取不同的线程优先级,您需要将它们发送给具有不同线程工厂的执行程序。

#5


2  

You can implement your own ThreadFactory and set it within ThreadPoolExecutor like this:

你可以实现自己的ThreadFactory并在ThreadPoolExecutor中设置它,如下所示:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));

where my OpJobThreadFactory looks like the following:

我的OpJobThreadFactory如下所示:

public final static class OpJobThreadFactory implements ThreadFactory {
   private int priority;
   private boolean daemon;
   private final String namePrefix;
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final AtomicInteger threadNumber = new AtomicInteger(1);

   public OpJobThreadFactory(int priority) {
      this(priority, true);
   }

   public OpJobThreadFactory(int priority, boolean daemon) {
      this.priority = priority;
      this.daemon = daemon;
      namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
   }

   @Override
   public Thread newThread(Runnable r) {
      Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
      t.setDaemon(daemon);
      t.setPriority(priority);
      return t;
   }
}

#6


0  

Please be aware that setPriority(..) normally does not work under Linux. See the following links for the full details:

请注意,setPriority(..)通常在Linux下不起作用。有关完整详细信息,请参阅以下链接:

#7


0  

Just want to add my bit of contribution to this discussion. I've implemented this ReorderingThreadPoolExecutor for a very specific purpose, which is being able to explicitly bring to the front of the executor's BlockingQueue (in this case LinkedBlockingDeque) whenever I want and without having to deal with priorities (which can lead to deadlocks and are, anyway, fixed).

只是想为这次讨论增添一点贡献。我已经实现了这个ReorderingThreadPoolExecutor用于一个非常特定的目的,它能够在我想要的时候显式地将执行器的BlockingQueue(在本例中为LinkedBlockingDeque)带到前面,而不必处理优先级(这可能导致死锁并且是,无论如何,固定)。

I'm using this to manage (inside an Android app) the case where I have to download many images that are displayed in a long list view. Whenever the user scrolls down quickly, the executor queue gets flooded of image download requests: by moving the latest ones on the top of the queue, I have achieved much better performances in loading the images which are actually on the screen, delaying the download of the ones that will probably needed later. Note that I use an internal concurrent map key (which can be as simple as the image URL string) to add the tasks to the executor so that I can retrieve them later for the reordering.

我正在使用它来管理(在Android应用程序内部)我必须下载许多以长列表视图显示的图像的情况。每当用户快速向下滚动时,执行程序队列就会被图像下载请求淹没:通过移动队列顶部的最新文件,我在加载实际在屏幕上的图像方面取得了更好的性能,延迟了下载以后可能需要的那些。请注意,我使用内部并发映射键(可以像图像URL字符串一样简单)将任务添加到执行程序,以便稍后可以检索它们以进行重新排序。

There'd have been many other ways of doing the same and maybe it's overcomplicated, but it works fine and also Facebook in his Android SDK is doing something similar in its own working threads queue.

还有很多其他方法可以做同样的事情,也许它过于复杂,但它运行良好,Facebook的Android SDK也在其自己的工作线程队列中做类似的事情。

Feel free to have a look at the code and give me suggestions, it's inside an Android project but stripping a few logs and annotations would make the class pure Java 6.

随意查看代码并给我建议,它在Android项目中,但剥离一些日志和注释会使类纯Java 6。