后端程序员之路 41、BlockingQueue

时间:2021-08-22 12:16:46

BlockingQueue,阻塞队列,常用于实现生产者和消费者模型
特点:
1、队列为空时,取操作会等到队列有数据
2、队列满时,存操作会等到队列可用

基于C++11的阻塞队列简单实现 - Cynric 的博客 - 博客频道 - CSDN.NET
http://blog.csdn.net/cywosp/article/details/9157379


参考java的阻塞队列实现,还可以有以下细节:
1、ArrayBlockingQueue
用数组来存队列里的数据,可以避免用链表时额外的node对象创建销毁开销
2、LinkedBlockingQueue
最常用,队列可以有无限容量,但是生产速度过快会爆掉内存
3、DelayQueue
node只有延迟时间到了才能取到,存操作不会阻塞
4、PriorityBlockingQueue
类似DelayQueue,传入Compator来决定优先级
5、SynchronousQueue
没有缓冲的等待队列

BlockingQueue - - ITeye技术网站
http://wsmajunfeng.iteye.com/blog/1629354

最终实现的简单BlockingQueue:

#include <semaphore.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <list>

template<class T> class BlockingQueue {
public:
    std::list<T> _queue;
    size_t _curr_size;
    pthread_mutex_t _lock;
    sem_t _consumer_sem;
    sem_t _producer_sem;
    int _max_size;

public:
    BlockingQueue(int max_size) {
        pthread_mutex_init(&_lock, NULL);
        sem_init(&_consumer_sem, 0, 0);
        _max_size = max_size;
        _curr_size = 0;
        if (max_size > 0) {
            sem_init(&_producer_sem, 0, max_size);
        }
    }

    ~BlockingQueue() {
        pthread_mutex_destroy(&_lock);
        sem_destroy(&_consumer_sem);
        if (_max_size > 0) {
            sem_destroy(&_producer_sem);
        }
    }

    void push(const T& value) {
        if (_max_size > 0) {
            sem_wait(&_producer_sem);
        }

        pthread_mutex_lock(&_lock);
        _queue.push_back(value);
        ++_curr_size;
        sem_post(&_consumer_sem);
        pthread_mutex_unlock(&_lock);
    }

    void batch_push(const std::vector<T>& values) {
        for (uint32_t i = 0; i < values.size(); ++i) {
            push( values[i] );
        }
    }
    
    T take() {
        sem_wait(&_consumer_sem);
        pthread_mutex_lock(&_lock);
        T value = _queue.front();
        _queue.pop_front();
        --_curr_size;
        if (_max_size > 0) {
            sem_post(&_producer_sem);
        }
        pthread_mutex_unlock(&_lock);
        return value;
    }

    bool try_take(T& out) {
        int ret = sem_trywait(&_consumer_sem);
        if (ret == -1 && errno == EAGAIN) {
            return false;
        }

        pthread_mutex_lock(&_lock);
        out = _queue.front();
        _queue.pop_front();
        --_curr_size;
        if (_max_size > 0) {
            sem_post(&_producer_sem);
        }
        pthread_mutex_unlock(&_lock);
        return true;
    }

    size_t size() {
        return _curr_size;
    }
};