How do I implement task prioritization using an ExecutorService in Java 5?

Time: 2022-06-12 07:09:47

I am implementing a thread pooling mechanism in which I'd like to execute tasks of varying priorities. I'd like to have a nice mechanism whereby I can submit a high priority task to the service and have it be scheduled before other tasks. The priority of the task is an intrinsic property of the task itself (whether I express that task as a Callable or a Runnable is not important to me).

Now, superficially it looks like I could use a PriorityBlockingQueue as the task queue in my ThreadPoolExecutor, but that queue contains Runnable objects, which may or may not be the Runnable tasks I've submitted to it. Moreover, if I've submitted Callable tasks, it's not clear how this would ever map.
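To make the concern concrete, here is a small sketch (the class name WrappedTaskDemo is made up for illustration). It assumes the behavior of the JDK versions I know of: submit() wraps each Callable in a FutureTask, which is not Comparable, so a comparator-less PriorityBlockingQueue rejects it with a ClassCastException as soon as a task actually has to be queued.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WrappedTaskDemo {

    // Returns true if queueing a submitted task failed with ClassCastException.
    static boolean submitFailsToQueue() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
        Callable<String> job = new Callable<String>() {
            public String call() {
                return "done";
            }
        };
        try {
            pool.submit(job); // handed straight to the new worker thread, never queued
            pool.submit(job); // must be queued, but the wrapping FutureTask is not Comparable
            return false;
        } catch (ClassCastException e) {
            return true;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("queueing failed: " + submitFailsToQueue());
    }
}
```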

Is there a way to do this? I'd really rather not roll my own for this, since I'm far more likely to get it wrong that way.

(An aside; yes, I'm aware of the possibility of starvation for lower-priority jobs in something like this. Extra points (?!) for solutions that have a reasonable guarantee of fairness)

6 solutions

#1


8  

At first blush it would seem you could define an interface for your tasks that extends Runnable or Callable<T> and Comparable. Then wrap a ThreadPoolExecutor with a PriorityBlockingQueue as the queue, and only accept tasks that implement your interface.
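A minimal sketch of that first idea, assuming a hypothetical PrioritizedTask interface (not part of the JDK) and tasks handed over with execute() rather than submit(), so that the queue sees the original objects. The latch only exists to keep the single worker busy while the other tasks pile up in the queue.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ComparableTaskSketch {

    // Hypothetical interface: a Runnable that knows how to order itself.
    interface PrioritizedTask extends Runnable, Comparable<PrioritizedTask> {
        int priority(); // lower value runs earlier (an arbitrary choice for this sketch)
    }

    static PrioritizedTask task(final int priority, final String name, final List<String> log) {
        return new PrioritizedTask() {
            public int priority() { return priority; }
            public void run() { log.add(name); }
            public int compareTo(PrioritizedTask other) {
                return Integer.compare(priority, other.priority());
            }
        };
    }

    // Runs three tasks through a one-thread pool and returns the order they ran in.
    static List<String> runDemo() {
        final List<String> log = new CopyOnWriteArrayList<String>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
        final CountDownLatch gate = new CountDownLatch(1);
        // Occupy the single worker; this first task goes to the thread directly, not the queue.
        pool.execute(new Runnable() {
            public void run() {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        pool.execute(task(3, "low", log));
        pool.execute(task(1, "high", log));
        pool.execute(task(2, "medium", log));
        gate.countDown(); // release the worker; it now drains the queue by priority
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [high, medium, low]
    }
}
```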

Taking your comment into account, it looks like one option is to extend ThreadPoolExecutor, and override the submit() methods. Refer to AbstractExecutorService to see what the default ones look like; all they do is wrap the Runnable or Callable in a FutureTask and execute() it. I'd probably do this by writing a wrapper class that implements ExecutorService and delegates to an anonymous inner ThreadPoolExecutor. Wrap them in something that has your priority, so that your Comparator can get at it.

#2


16  

I have solved this problem in a reasonable fashion, and I'll describe it below for future reference to myself and anyone else who runs into this problem with the Java Concurrent libraries.

Using a PriorityBlockingQueue to hold tasks for later execution is indeed a step in the right direction. The problem is that the PriorityBlockingQueue must be generically instantiated to contain Runnable instances, and it is impossible to call compareTo (or similar) on the Runnable interface.

On to solving the problem. When creating the Executor, it must be given a PriorityBlockingQueue. The queue should also be given a custom Comparator so that it orders the tasks properly:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Now, a peek at CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
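         // Placeholder: return a negative, zero, or positive int according to how first and second should be ordered.
         return comparison;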
    }

}

Everything looks pretty straightforward up to this point. It gets a bit sticky here. Our next problem is to deal with the creation of FutureTasks by the Executor. In the Executor, we must override newTaskFor like so:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Where c is the Callable task that we're trying to execute. Now, let's have a peek at CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Notice the getTask method. We're gonna use that later to grab the original task out of this CustomFutureTask that we've created.

And finally, let's modify the original task that we were trying to execute:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(CustomTask task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

You can see that we implement Comparable in the task to delegate to the actual Comparator for MyType.

And there you have it, customized prioritization for an Executor using the Java libraries! It takes some bit of bending, but it's the cleanest that I've been able to come up with. I hope this is helpful to someone!
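The snippets above never show where getTask is used, and the queue is declared for Runnable while the comparator is declared for MyType. One possible way to tie the pieces together is sketched below. It is an assumption on my part, not necessarily what the answer intended: the queue's comparator receives the queued Runnable objects, casts them back to the custom FutureTask, and compares the wrapped tasks. All names (RankedTask, RankedFutureTask, BY_RANK) are hypothetical.

```java
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnwrappingComparatorSketch {

    // Hypothetical task type: a Callable that carries its own rank.
    static class RankedTask implements Callable<String>, Comparable<RankedTask> {
        final int rank; // lower rank is more urgent (assumption for this sketch)
        final String name;

        RankedTask(int rank, String name) {
            this.rank = rank;
            this.name = name;
        }

        public String call() {
            return name;
        }

        public int compareTo(RankedTask other) {
            return Integer.compare(rank, other.rank);
        }
    }

    // FutureTask that remembers the task it wraps, in the spirit of CustomFutureTask.
    static class RankedFutureTask<V> extends FutureTask<V> {
        private final RankedTask task;

        RankedFutureTask(Callable<V> callable) {
            super(callable);
            this.task = (RankedTask) callable; // only RankedTask instances are supported
        }

        RankedTask getTask() {
            return task;
        }
    }

    // The queue holds Runnables, so the comparator unwraps them via getTask().
    static final Comparator<Runnable> BY_RANK = new Comparator<Runnable>() {
        public int compare(Runnable a, Runnable b) {
            return ((RankedFutureTask<?>) a).getTask()
                    .compareTo(((RankedFutureTask<?>) b).getTask());
        }
    };

    static ThreadPoolExecutor newExecutor(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(11, BY_RANK)) {
            @Override
            protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
                return new RankedFutureTask<V>(c);
            }
        };
    }

    // Submits one task and waits for its result, to show the executor is usable end to end.
    static String submitAndGet(String name) {
        ThreadPoolExecutor pool = newExecutor(1);
        try {
            return pool.submit(new RankedTask(1, name)).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet("hello"));
    }
}
```

Note that this sketch only covers submit(Callable); tasks passed through execute() or submit(Runnable) would not be RankedFutureTask instances and would fail the cast in the comparator.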

#3


4  

You can use these helper classes:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

and

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

and this helper method:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

and then use it like this:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
        exec.shutdown(); // stop accepting work; queued jobs still run, then the JVM can exit
    }
}

#4


3  

I will try to explain this problem with fully functional code. But before diving into the code, I would like to explain PriorityBlockingQueue.

PriorityBlockingQueue: PriorityBlockingQueue is an implementation of BlockingQueue. It orders its elements by priority (their natural ordering, or a Comparator passed to the constructor), so the executor takes the highest-priority task from the queue first. If two tasks have the same priority, the queue makes no guarantee about which one comes out first, so we need to provide some custom logic if that order matters.
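As a quick illustration of the queue on its own, without any executor, the following sketch (the class name QueueOrderDemo is just for this example) adds integers out of order and drains them. With natural ordering, the smallest element is treated as the head of the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class QueueOrderDemo {

    // Adds 3, 1, 2 and returns the elements in the order poll() hands them out.
    static List<Integer> drain() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>();
        queue.add(3);
        queue.add(1);
        queue.add(2);

        List<Integer> out = new ArrayList<Integer>();
        Integer next;
        while ((next = queue.poll()) != null) { // poll() returns null once the queue is empty
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain()); // prints [1, 2, 3]
    }
}
```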

Now let's get straight into the code.

Driver class: This class creates an executor, which accepts tasks and later runs them. Here we create three tasks, with LOW, HIGH, and MEDIUM priority. We tell the executor to run a maximum of 1 thread and to use the PriorityBlockingQueue.

public static void main(String[] args) {

    /*
    Minimum number of threads that must be running : 0
    Maximum number of threads that can be created : 1
    How long an idle thread is kept alive (ms) : 1000
    Which queue to use : PriorityBlockingQueue
    */
    PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1,
        1000, TimeUnit.MILLISECONDS, queue);

    MyTask task = new MyTask(Priority.LOW, "Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH, "High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM, "Medium");
    executor.execute(new MyFutureTask(task));

}

MyTask class: MyTask implements Runnable and accepts the priority as a constructor argument. When this task runs, it prints a message and then puts the thread to sleep for 1 second.

public class MyTask implements Runnable {

  private Priority priority;
  private String name;

  public MyTask(Priority priority, String name) {
    this.priority = priority;
    this.name = name;
  }

  public int getPriority() {
    return priority.getValue();
  }

  public String getName() {
    return name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed " + getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask class: Since we are using PriorityBlockingQueue to hold our tasks, each task is wrapped in a FutureTask, and our FutureTask subclass must implement the Comparable interface. Its compareTo method compares the priorities of two tasks, so the queue hands the task with the highest priority to the executor first.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Priority class: A self-explanatory enum of priority levels. A lower value means a higher priority.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Now when we run this example, we get the following output:

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Even though we submitted the LOW priority task first and the HIGH priority task later, the HIGH priority task runs first: because we are using a PriorityBlockingQueue, whichever queued task has the highest priority is executed next. Note that this ordering applies only to tasks that are still waiting in the queue when the worker thread picks up its next task.

#5


1  

My solution preserves the submission order of tasks with the same priority. It's an improvement on this answer.

Task execution order is based on:

  1. Priority
  2. Submit order (within same priority)

Tester class:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Result:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

The first task to run is A1 because there was no higher-priority task in the queue when it was submitted. B tasks have priority 1, so they are executed earlier; A tasks have priority 0, so they are executed later. Within each priority, execution follows submission order: B1, B2, B3, ... A2, A3, A4 ...

The solution:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
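                    // Use compare() instead of subtraction to avoid int overflow and long-to-int truncation.
                    int priorityResult = Integer.compare(o2.getPriority(), o1.getPriority());
                    return priorityResult != 0 ? priorityResult
                            : Long.compare(o1.getInstanceCount(), o2.getInstanceCount());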
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

#6


0  

Would it be possible to have one ThreadPoolExecutor for each level of priority? A ThreadPoolExecutor can be instantiated with a ThreadFactory, and you could write your own ThreadFactory implementation to set the different priority levels.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
         return thread;
     }
 }
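A rough sketch of how the one-pool-per-level idea could be wired up. The Level enum, the class name, and the pool sizes are all assumptions made for illustration. Keep in mind that a thread priority is only a hint to the scheduler, so this approach does not guarantee that high-level tasks start before low-level ones.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class PerLevelExecutors {

    enum Level { HIGH, NORMAL, LOW } // hypothetical priority levels

    // Thread factory that gives every new thread the same fixed thread priority.
    static ThreadFactory withPriority(final int threadPriority) {
        return new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setPriority(threadPriority);
                return thread;
            }
        };
    }

    private final Map<Level, ExecutorService> pools =
            new EnumMap<Level, ExecutorService>(Level.class);

    PerLevelExecutors(int threadsPerLevel) {
        pools.put(Level.HIGH,
                Executors.newFixedThreadPool(threadsPerLevel, withPriority(Thread.MAX_PRIORITY)));
        pools.put(Level.NORMAL,
                Executors.newFixedThreadPool(threadsPerLevel, withPriority(Thread.NORM_PRIORITY)));
        pools.put(Level.LOW,
                Executors.newFixedThreadPool(threadsPerLevel, withPriority(Thread.MIN_PRIORITY)));
    }

    // Routes the task to the pool that belongs to its level.
    void execute(Level level, Runnable task) {
        pools.get(level).execute(task);
    }

    void shutdown() {
        for (ExecutorService pool : pools.values()) {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        PerLevelExecutors executors = new PerLevelExecutors(1);
        executors.execute(Level.HIGH, new Runnable() {
            public void run() {
                System.out.println("running with thread priority "
                        + Thread.currentThread().getPriority());
            }
        });
        executors.shutdown();
    }
}
```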

#1


8  

At first blush it would seem you could define an interface for your tasks that extends Runnable or Callable<T> and Comparable. Then wrap a ThreadPoolExecutor with a PriorityBlockingQueue as the queue, and only accept tasks that implement your interface.

乍一看,你似乎可以为你的任务定义一个扩展Runnable或Callable 和Comparable的接口。然后使用PriorityBlockingQueue作为队列包装ThreadPoolExecutor,并仅接受实现接口的任务。

Taking your comment into account, it looks like one option is to extend ThreadPoolExecutor, and override the submit() methods. Refer to AbstractExecutorService to see what the default ones look like; all they do is wrap the Runnable or Callable in a FutureTask and execute() it. I'd probably do this by writing a wrapper class that implements ExecutorService and delegates to an anonymous inner ThreadPoolExecutor. Wrap them in something that has your priority, so that your Comparator can get at it.

考虑到您的注释,看起来一个选项是扩展ThreadPoolExecutor,并覆盖submit()方法。请参阅AbstractExecutorService以查看默认的内容;他们所做的就是将Runnable或Callable包装在FutureTask中并执行()它。我可能通过编写一个实现ExecutorService的包装类并委托给一个匿名的内部ThreadPoolExecutor来实现这一点。将它们包裹在具有您优先级的东西中,以便您的比较器可以获得它。

#2


16  

I have solved this problem in a reasonable fashion, and I'll describe it below for future reference to myself and anyone else who runs into this problem with the Java Concurrent libraries.

我已经以合理的方式解决了这个问题,我将在下面对其进行描述,以便将来参考我自己以及使用Java Concurrent库遇到此问题的任何其他人。

Using a PriorityBlockingQueue as the means for holding onto tasks for later execution is indeed a movement in the correct direction. The problem is that the PriorityBlockingQueue must be generically instantiated to contain Runnable instances, and it is impossible to call compareTo (or similiar) on a Runnable interface.

使用PriorityBlockingQueue作为保留任务以供以后执行的手段确实是一个正确方向的运动。问题是PriorityBlockingQueue必须通常实例化以包含Runnable实例,并且无法在Runnable接口上调用compareTo(或类似)。

Onto solving the problem. When creating the Executor, it must be given a PriorityBlockingQueue. The queue should further be given a custom Comparator to do proper in place sorting:

解决问题。创建Executor时,必须给它一个PriorityBlockingQueue。队列应该进一步给定一个自定义比较器来进行适当的位置排序:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Now, a peek at CustomTaskComparator:

现在,看看CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Everything looking pretty straight forward up to this point. It gets a bit sticky here. Our next problem is to deal with the creation of FutureTasks from the Executor. In the Executor, we must override newTaskFor as so:

到目前为止,一切看起来都非常直截了当。这里有点粘。我们的下一个问题是处理Executor创建FutureTasks。在Executor中,我们必须覆盖newTaskFor:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Where c is the Callable task that we're trying to execute. Now, let's have a peek at CustomFutureTask:

其中c是我们尝试执行的Callable任务。现在,让我们来看看CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Notice the getTask method. We're gonna use that later to grab the original task out of this CustomFutureTask that we've created.

注意getTask方法。稍后我们将使用它来从我们创建的CustomFutureTask中获取原始任务。

And finally, let's modify the original task that we were trying to execute:

最后,让我们修改我们尝试执行的原始任务:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

You can see that we implement Comparable in the task to delegate to the actual Comparator for MyType.

您可以看到我们在任务中实现了Comparable,以委托给MyType的实际Comparator。

And there you have it, customized prioritization for an Executor using the Java libraries! It takes some bit of bending, but it's the cleanest that I've been able to come up with. I hope this is helpful to someone!

你有它,使用Java库为Executor定制优先级!它需要一些弯曲,但它是我能够想出的最干净的。我希望这对某人有帮助!

#3


4  

You can use these helper classes:

您可以使用这些帮助程序类:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

AND

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

AND this helper method:

和这个帮手方法:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

AND then use it like this:

然后像这样使用它:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

#4


3  

I will try to explain this problem with a fully functional code. But before diving into the code I would like to explain about PriorityBlockingQueue

我将尝试用功能齐全的代码解释这个问题。但在深入研究代码之前,我想解释一下PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue is an implementation of BlockingQueue. It accepts the tasks along with their priority and submits the task with the highest priority for execution first. If any two tasks have same priority, then we need to provide some custom logic to decide which task goes first.

PriorityBlockingQueue:PriorityBlockingQueue是BlockingQueue的实现。它接受任务及其优先级,并首先提交具有最高优先级的任务。如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定首先执行哪个任务。

Now lets get into the code straightaway.

现在让我们直接进入代码。

Driver class : This class creates an executor which accepts tasks and later submits them for execution. Here we create two tasks one with LOW priority and the other with HIGH priority. Here we tell the executor to run a MAX of 1 threads and use the PriorityBlockingQueue.

驱动程序类:此类创建一个执行程序,它接受任务并稍后提交它们以供执行。这里我们创建两个任务,一个具有LOW优先级,另一个具有HIGH优先级。在这里,我们告诉执行程序运行MAX的1个线程并使用PriorityBlockingQueue。

    public static void main(String[] args) {

        /*
        Core pool size (threads kept even when idle) : 0
        Maximum number of threads that can be created : 1
        Time an idle thread is kept alive before it is terminated : 1000 ms
        Which queue to use : PriorityBlockingQueue
        */
        BlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1,
                1000, TimeUnit.MILLISECONDS, queue);

        // Submitted in the order LOW, HIGH, MEDIUM; waiting tasks are ordered by priority.
        MyTask task = new MyTask(Priority.LOW, "Low");
        executor.execute(new MyFutureTask(task));
        task = new MyTask(Priority.HIGH, "High");
        executor.execute(new MyFutureTask(task));
        task = new MyTask(Priority.MEDIUM, "Medium");
        executor.execute(new MyFutureTask(task));

    }

MyTask class : MyTask implements Runnable and accepts priority as an argument in the constructor. When this task runs, it prints a message and then puts the thread to sleep for 1 second.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the pool can see the interruption.
      Thread.currentThread().interrupt();
    }
  }

}

MyFutureTask class : The elements of a PriorityBlockingQueue must be comparable with one another, so we wrap each task in a FutureTask subclass that implements the Comparable interface. Its compareTo method compares the priorities of two tasks, and the queue uses that ordering to hand out the task with the highest priority first.

 public class MyFutureTask extends FutureTask<Void>
      implements Comparable<MyFutureTask> {

    private final MyTask task;

    public MyFutureTask(MyTask task){
      // The task produces no result, so the future completes with null.
      super(task, null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      // A lower value means a higher priority; Integer.compare avoids overflow.
      return Integer.compare(task.getPriority(), another.task.getPriority());
    }
  }

Priority class : A simple enum of priority levels. A lower value means a higher priority, so HIGHEST (0) sorts ahead of LOWEST (4).

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  private final int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

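Now when we run this example, we typically get the following output (the exact order depends on all three tasks being queued before the worker thread takes its first one):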

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Even though we submitted the LOW priority task first and the HIGH priority task later, the HIGH priority task runs first: because we are using a PriorityBlockingQueue, the waiting task with the highest priority is always taken next.
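
Priority ordering only applies to tasks that are actually waiting in the queue, so the result depends on the tasks being queued before the worker picks one up. The following is a minimal sketch of one way to make that deterministic, assuming a single worker that is kept busy by a "gate" task while the others are submitted. RankedTask, runDemo and awaitQuietly are hypothetical names and are not part of the code above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DeterministicPriorityDemo {

    /** Hypothetical task type: a Runnable ordered by priority (lower value runs first). */
    static final class RankedTask implements Runnable, Comparable<RankedTask> {
        final int priority;
        final Runnable body;

        RankedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(RankedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Runs three tasks on one worker and returns the order in which they executed. */
    static List<String> runDemo() {
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());

        // With a free core thread, the first task goes straight to a new worker
        // (it is never queued) and blocks that worker until the gate opens.
        executor.execute(new RankedTask(0, () -> awaitQuietly(gate)));

        // The worker is busy, so these three all end up waiting in the queue.
        executor.execute(new RankedTask(3, () -> order.add("Low")));
        executor.execute(new RankedTask(1, () -> order.add("High")));
        executor.execute(new RankedTask(2, () -> order.add("Medium")));

        gate.countDown();     // release the worker
        executor.shutdown();  // queued tasks still run after shutdown()
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [High, Medium, Low]
    }
}
```

Because this sketch passes the tasks to execute() directly, they only need to be Comparable Runnables; no FutureTask wrapper is involved.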

#5


1  

My solution preserves the submission order of tasks that have the same priority. It's an improvement on this answer.

Task execution order is based on:

  1. Priority
  2. Submission order (within the same priority)

Tester class:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Result:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

The first task is A1 because nothing of higher priority was waiting when it was submitted, so it started immediately. B tasks have priority 1, so they are executed earlier; A tasks have priority 0, so they are executed later. Within each priority the execution order follows the submission order: B1, B2, B3, ... A2, A3, A4 ...
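
The two-level ordering (priority first, then submission order) can be sketched on a bare queue as follows. This is only an illustration under stated assumptions: Entry, SEQUENCE and drainOrder are hypothetical names, and a higher priority value is assumed to mean "run earlier", as in the result above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class FifoTieBreakDemo {

    // Global counter that stamps every entry with its creation order.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    /** Hypothetical queue element carrying a priority and a sequence number. */
    static final class Entry {
        final String name;
        final int priority;
        final long sequence = SEQUENCE.getAndIncrement();

        Entry(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Higher priority value first; equal priorities fall back to the lower sequence number. */
    static final Comparator<Entry> ORDER = (a, b) -> {
        int byPriority = Integer.compare(b.priority, a.priority);
        return byPriority != 0 ? byPriority : Long.compare(a.sequence, b.sequence);
    };

    /** Adds entries with mixed priorities and returns their names in dequeue order. */
    static List<String> drainOrder() {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>(10, ORDER);
        queue.add(new Entry("A2", 0));
        queue.add(new Entry("A3", 0));
        queue.add(new Entry("B1", 1));
        queue.add(new Entry("B2", 1));
        queue.add(new Entry("A4", 0));

        List<String> names = new ArrayList<>();
        Entry entry;
        while ((entry = queue.poll()) != null) {
            names.add(entry.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints [B1, B2, A2, A3, A4]
    }
}
```

Using Integer.compare and Long.compare rather than subtraction keeps the comparator correct even for extreme priority values or very large sequence numbers.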

The solution:

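import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PriorityExecutors {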

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
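                    // Higher priority value first; compare() avoids the overflow that subtraction risks.
                    int priorityResult = Integer.compare(o2.getPriority(), o1.getPriority());
                    // Same priority: the earlier submission (lower sequence number) goes first.
                    return priorityResult != 0 ? priorityResult
                            : Long.compare(o1.getInstanceCount(), o2.getInstanceCount());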
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

#6


0  

Would it be possible to have one ThreadPoolExecutor for each level of priority? A ThreadPoolExecutor can be instantiated with a ThreadFactory, and you could write your own ThreadFactory implementation to set the different priority levels.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
         return thread;
     }
 }
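
As a rough sketch of how the one-pool-per-priority idea might be wired up, the factory below takes the thread priority as a constructor argument so the same class can serve every level. PriorityThreadFactory, PerPriorityPools and priorityOfNewThread are hypothetical names, and the pool sizes are arbitrary. Keep in mind that thread priorities are generally treated as hints by the underlying scheduler, so this approach influences rather than guarantees the order in which tasks run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class PerPriorityPools {

    /** Hypothetical factory that creates threads with a fixed thread priority. */
    static final class PriorityThreadFactory implements ThreadFactory {
        private final int threadPriority;

        PriorityThreadFactory(int threadPriority) {
            this.threadPriority = threadPriority;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setPriority(threadPriority);
            return thread;
        }
    }

    /** Helper for checking which priority a freshly created thread ends up with. */
    static int priorityOfNewThread(int requested) {
        return new PriorityThreadFactory(requested).newThread(() -> { }).getPriority();
    }

    public static void main(String[] args) {
        // One pool per priority level; callers pick the pool that matches the task.
        ExecutorService highPool =
                Executors.newFixedThreadPool(2, new PriorityThreadFactory(Thread.MAX_PRIORITY));
        ExecutorService lowPool =
                Executors.newFixedThreadPool(2, new PriorityThreadFactory(Thread.MIN_PRIORITY));

        highPool.execute(() -> System.out.println(
                "urgent task, thread priority " + Thread.currentThread().getPriority()));
        lowPool.execute(() -> System.out.println(
                "background task, thread priority " + Thread.currentThread().getPriority()));

        highPool.shutdown();
        lowPool.shutdown();
    }
}
```

Unlike the queue-based answers, low-priority tasks here always have threads of their own, so they are less likely to be starved outright, at the cost of weaker ordering guarantees.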