C ++相当于Java的BlockingQueue

时间:2022-02-16 00:27:13

I'm in the process of porting some Java code over to C++, and one particular section makes use of a BlockingQueue to pass messages from many producers to a single consumer.

我正在将一些Java代码移植到C ++,一个特定的部分利用BlockingQueue将消息从许多生产者传递给单个消费者。

If you are not familiar with what a Java BlockingQueue is, it is just a queue that has a hard capacity, which exposes thread safe methods to put() and take() from the queue. put() blocks if the queue is full, and take() blocks if the queue is empty. Also, timeout-sensitive versions of these methods are supplied.

如果您不熟悉Java BlockingQueue是什么,它只是一个具有硬容量的队列,它将线程安全方法暴露给队列中的put()和take()。如果队列已满,则put()阻塞;如果队列为空,则使用take()块。此外,还提供了这些方法的超时敏感版本。

Timeouts are relevant to my use-case, so a recommendation that supplies those is ideal. If not, I can code up some myself.

超时与我的用例相关,因此提供这些超时的建议是理想的。如果没有,我可以自己编写代码。

I've googled around and quickly browsed the Boost libraries and I'm not finding anything like this. Maybe I'm blind here...but does anyone know of a good recommendation?

我已经google了一下,很快浏览了Boost库,我找不到这样的东西。也许我在这里失明了...但有人知道一个好推荐吗?

Thanks!

2 个解决方案

#1


35  

It isn't fixed size and it doesn't support timeouts but here is a simple implementation of a queue I had posted recently using C++ 2011 constructs:

它不是固定大小,它不支持超时,但这是我最近使用C ++ 2011构造发布的队列的简单实现:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

It should be trivial to extend and use a timed wait for popping. The main reason I haven't done it is that I'm not happy with the interface choices I have thought of so far.

扩展和使用定时等待弹出应该是微不足道的。我没有做到的主要原因是我对目前为止我想到的界面选择不满意。

#2


0  

Here's an example of a blocking queue with shutdown request feature:

以下是具有关闭请求功能的阻塞队列的示例:

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};

#1


35  

It isn't fixed size and it doesn't support timeouts but here is a simple implementation of a queue I had posted recently using C++ 2011 constructs:

它不是固定大小,它不支持超时,但这是我最近使用C ++ 2011构造发布的队列的简单实现:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

It should be trivial to extend and use a timed wait for popping. The main reason I haven't done it is that I'm not happy with the interface choices I have thought of so far.

扩展和使用定时等待弹出应该是微不足道的。我没有做到的主要原因是我对目前为止我想到的界面选择不满意。

#2


0  

Here's an example of a blocking queue with shutdown request feature:

以下是具有关闭请求功能的阻塞队列的示例:

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};