如何在take()上中断阻塞的阻塞队列?

时间:2022-06-02 20:57:07

I have a class that takes objects from a BlockingQueue and processes them by calling take() in a continuous loop. At some point I know that no more objects will be added to the queue. How do I interrupt the take() method so that it stops blocking?

我有一个类,它从一个BlockingQueue获取对象,并通过在一个连续循环中调用take()来处理它们。在某些时候,我知道不再向队列添加任何对象。如何中断take()方法,使其停止阻塞?

Here's the class that processes the objects:

这是处理对象的类:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

And here's the method that uses this class to process objects:

这里是使用这个类来处理对象的方法:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}

5 个解决方案

#1


62  

If interrupting the thread is not an option, another is to place a "marker" or "command" object on the queue that would be recognized as such by MyObjHandler and break out of the loop.

如果中断线程不是一种选项,另一种方法是在队列中放置一个“标记”或“命令”对象,MyObjHandler就会识别这个对象并跳出循环。

#2


12  

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

However, if you do this, the thread might be interrupted while there are still items in the queue, waiting to be processed. You might want to consider using poll instead of take, which will allow the processing thread to timeout and terminate when it has waited for a while with no new input.

但是,如果这样做,在队列中仍然有项目等待处理时,线程可能会被中断。您可能想要考虑使用轮询而不是take,这将允许处理线程在没有新输入的情况下等待一段时间后超时并终止。

#3


12  

Very late but Hope this helps other too as I faced the similar problem and used the poll approach suggested by erickson above with some minor changes,

很晚了,但是希望这对其他人也有帮助,因为我遇到了类似的问题,使用了上面erickson建议的投票方法,并做了一些小小的改变,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

This solved both the problems

这就解决了这两个问题

  1. Flagged the BlockingQueue so that it knows it has not to wait more for elements
  2. 标记块队列,以便它知道不必等待更多的元素
  3. Did not interrupted in between so that processing blocks terminates only when all the items in queue are processed and there are no items remaining to be added
  4. 是否在处理队列中的所有项且没有项需要添加时才终止处理块

#4


1  

Interrupt the thread:

中断线程:

thread.interrupt()

#5


0  

Or don't interrupt, its nasty.

或者别插嘴,这很讨厌。

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. when producer puts object to the queue, call queue.notify(), if it ends, call queue.done()
  2. 当生成器将对象放入队列时,调用queue.notify(),如果结束,则调用queue.done()
  3. loop while (!queue.isDone() || !queue.isEmpty())
  4. 循环while (!queue.isDone() || !queue.isEmpty()
  5. test take() return value for null
  6. test take()返回空值

#1


62  

If interrupting the thread is not an option, another is to place a "marker" or "command" object on the queue that would be recognized as such by MyObjHandler and break out of the loop.

如果中断线程不是一种选项,另一种方法是在队列中放置一个“标记”或“命令”对象,MyObjHandler就会识别这个对象并跳出循环。

#2


12  

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

However, if you do this, the thread might be interrupted while there are still items in the queue, waiting to be processed. You might want to consider using poll instead of take, which will allow the processing thread to timeout and terminate when it has waited for a while with no new input.

但是,如果这样做,在队列中仍然有项目等待处理时,线程可能会被中断。您可能想要考虑使用轮询而不是take,这将允许处理线程在没有新输入的情况下等待一段时间后超时并终止。

#3


12  

Very late but Hope this helps other too as I faced the similar problem and used the poll approach suggested by erickson above with some minor changes,

很晚了,但是希望这对其他人也有帮助,因为我遇到了类似的问题,使用了上面erickson建议的投票方法,并做了一些小小的改变,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

This solved both the problems

这就解决了这两个问题

  1. Flagged the BlockingQueue so that it knows it has not to wait more for elements
  2. 标记块队列,以便它知道不必等待更多的元素
  3. Did not interrupted in between so that processing blocks terminates only when all the items in queue are processed and there are no items remaining to be added
  4. 是否在处理队列中的所有项且没有项需要添加时才终止处理块

#4


1  

Interrupt the thread:

中断线程:

thread.interrupt()

#5


0  

Or don't interrupt, its nasty.

或者别插嘴,这很讨厌。

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. when producer puts object to the queue, call queue.notify(), if it ends, call queue.done()
  2. 当生成器将对象放入队列时,调用queue.notify(),如果结束,则调用queue.done()
  3. loop while (!queue.isDone() || !queue.isEmpty())
  4. 循环while (!queue.isDone() || !queue.isEmpty()
  5. test take() return value for null
  6. test take()返回空值