共享内存循环形队列池设计

时间:2021-03-13 17:39:25

1、 简述
队列是一种先进先出(FIFO)的线性表数据结构,常见的操作如在表的尾部插入,在头部删除数据。队列的类型有链表结构、固定缓冲区结构等。常用的队列空间都是动态地从堆中申请,在数据量操作频繁的任务中,带来系统实时性和内存碎片等问题。
本文采用的是共享循环队列池,共享内存队列来解决进程间通信数据量大的场景。
共享内存循环形队列池设计

队列长度计算公式:nCount = (rear - front + nSize) % nSize;
Rear:表示队列尾偏移量,该量放置在共享内存Head中,对所有共享进程可视化;
Front:表示当前进程的头偏移量,该量为进程私有变量,对当前进程可视化;
nSize:表示队列最大单元结构数目,该量放置在共享内存Head中,对所有共享进程可 视化。
2、 设计原则
该队列设计主要实现多进程之间共享数据,队列空间申请的大小可以自定义,队列每个单元结构可以通过模板进行构造。该队列创建完成之后,后面的进程直接映象即可。该队列为了防止非法操作,在映射时验证名称和结构单元的长度。
3、 设计代码

#ifndef _AESHAREQUEUE_H_
#define _AESHAREQUEUE_H_

#include "aefc/AEShareKey.inl"
#include "aefc/AEShareMemory.h"

#ifdef AE_QUEUE_EXPORTS
#define AE_QUEUE_ENTRY A_DECL_EXPORT
#else
#define AE_QUEUE_ENTRY A_DECL_IMPORT
#endif

//
// 循环队列类模版
// 
const size_t _MAX_QUEUE_SIZE = 1024;
const int   _NAME_QUENE_SIZE = 48;

#define QUEUE_T_NAME(x) (#x)

template <typename T>
class AE_QUEUE_ENTRY CAEShareQueue
{
         struct queue_header
         {
                   char name[_NAME_QUENE_SIZE];
                   unsigned int len;
                   size_t size;
                   size_t rear;
         };
public:
         CAEShareQueue();

    virtual ~CAEShareQueue();

public:

         bool bindQueue(char *name, size_t &size);

public:     
    size_t getSize() const;

    int enqueue(const T value);

    int dequeue(T *value);

private:
         bool createQueue(char *name, size_t &size);

private:
    T               *pQueue_;
    size_t                     front_;

private:
         queue_header *header_;
         void             *shmaddr_;
         void             *shmHanlde_;
};

template <typename T>
CAEShareQueue<T>::CAEShareQueue()
         : front_(0), pQueue_(NULL)
{
         header_ = NULL;
         shmaddr_ = shmHanlde_ = NULL;
}

template <typename T>
bool CAEShareQueue<T>::createQueue(char *name, size_t &size)
{      
         CAEShareMemory share;
         assert(name != NULL);
         int nCreateFlag = 0;
         size = (size == 0 ? _MAX_QUEUE_SIZE : size);

         int nTotalSize = ((sizeof(T) * size +sizeof(queue_header) + 7) / 8) * 8;
         if (pQueue_ == NULL)
         {                                  
                   void *pData = (void *)share.CreateAndMapping(
                            AEQUEUE_SHM_DATA, nTotalSize, &nCreateFlag);

                   assert(pData != NULL);

                   // Init header information. 
                   header_ = (queue_header *)pData;
                   if (nCreateFlag)
                   {                
                            strncpy(header_->name, name, _NAME_QUENE_SIZE - 1);
                            header_->len = sizeof(T);
                            header_->size = size;
                            header_->rear = 0;
                   }
                   else
                   {
                            if (strcmp(header_->name, name) != 0 &&
                                     header_->len != sizeof(T))
                            {
                                     returnfalse;
                            }                                                     
                            size = header_->size;
                   }

                   shmaddr_ = pData;
                   shmHanlde_ = share.GetHandle();

                   // Init queue data pointer position.
                   pQueue_ = (T *)((char *)pData +sizeof(queue_header));         
         }
         return true;
}

template <typename T>
bool CAEShareQueue<T>::bindQueue(char *name, size_t &size)
{
         return createQueue(name, size);
}

template <typename T>
CAEShareQueue<T>::~CAEShareQueue()
{
         CAEShareMemory shm;
         shm.UnMapShareMemory(shmaddr_, shmHanlde_);
}

template <typename T>    
size_t CAEShareQueue<T>::getSize() const
{
         if (pQueue_)
         {
                   size_t nSize = header_->size;
                   return (header_->rear - front_ + nSize) % nSize;
         }
         return 0;
}

template <typename T>
int CAEShareQueue<T>::enqueue(const T value)
{
         if (pQueue_)
         {
                   pQueue_[header_->rear] = value;    
                   size_t nSize = header_->size;
                   header_->rear = (header_->rear + 1) % nSize;
                   return 1;
         }
         return 0;
}

template <typename T>
int CAEShareQueue<T>::dequeue(T *value)
{
         if (getSize() <= 0 || !pQueue_)
                   return 0;
         if (pQueue_)
         {
                   *value = pQueue_[front_];
                   size_t nSize = header_->size;
                   front_ = (front_ + 1) % nSize;
                   return 1;
         }
         return 0;
}
#endif // _AESHAREQUEUE_H_

4、 封装与测试
1)、封装一个类为“CAEQueue”,实现对信号消息流读写操作头文件定义(Aequeue.h):

#ifndef _AEQUEUE_H_
#define _AEQUEUE_H_

#ifdef AE_QUEUE_EXPORTS
#define AE_QUEUE_ENTRY A_DECL_EXPORT
#else
#define AE_QUEUE_ENTRY A_DECL_IMPORT
#endif

const unsignedshort _MAX_BODY_LEN = 256;

struct signal_message
{
         long StationId;
         long DimsType;
         long DimsId;
         long status;
         unsigned char label;
         unsigned short len;
         unsigned char stream[_MAX_BODY_LEN];

         signal_message()
         {
                   StationId  = 0;
                   DimsType= 0;
                   DimsId    = 0;
                   status     = 0;
                   label       = 0;
                   len         = 0;
                   stream[0] = '\0';
         };
};

class CAEQueuePrivate;

class AE_QUEUE_ENTRY CAEQueue
{
public:
         CAEQueue();

         virtual ~CAEQueue();

public:
         int getSize();

         int push(signal_message obj);

         int pop(signal_message *pObj);

private:
         CAEQueuePrivate *d;
};

#endif // _AEQUEUE_H_

2)、类的实现部分:

#include "stdafx.h"
#include "signal/aesharequeue.h"
#include "signal/aequeue.h"

class CAEQueuePrivate
{
public:
         CAEShareQueue<signal_message> queue;
};


CAEQueue::CAEQueue() : d(new CAEQueuePrivate)
{
         unsigned int nSize = 4048;
         d->queue.bindQueue(QUEUE_T_NAME(signal_message), nSize);  
}

CAEQueue::~CAEQueue()
{
         delete d;
}

int CAEQueue::getSize()
{
         return d->queue.getSize();
}

int CAEQueue::push(signal_message obj)
{
         return d->queue.enqueue(obj);
}

int CAEQueue::pop(signal_message *pObj)
{
         return d->queue.dequeue(pObj);
}

3)、写和读
Process1:

#include <iostream>
#include "signal/aequeue.h"

using namespace std;

int main (int argc,char *argv [])
{
     CAEQueue queue;
    //
    // Main loop
    //
    cout << "Enter for 'p' to push message or 'x' for exit:\n";
    char c;
    do
    {
        cout << "> ";
        cin >> c;
        if (c == 'p')
            {
                      signal_message obj;
                      obj.StationId = 1
                      obj.DimsType =2;
                      obj.DimsId = 3;
                      obj.status = 4;
                      queue.push(obj);
                      printf("StationId : %d, DimsType :%d, DimsId :%d, status :%d",
                              obj.StationId, obj.DimsType, obj.DimsId, obj.status);
             }
    }
    while (cin.good() && c != 'x');
    system("pause");
    return 0;
}

Process2:

#include <iostream>
#include "signal/aequeue.h"

using namespace std;

int main (int argc,char *argv [])
{
   CAEQueue queue;
    //
    // Main loop
    //
    cout << "Enter for 'p' to post message or 'x' for exit:\n";
    char c;
    do
    {
        cout << "> ";
        cin >> c;
        if (c == 'p')
            {
                      signal_message obj;
                      if (queue.pop(&obj))
                      {
                          printf("StationId : %d, DimsType :%d, DimsId :%d, status :%d",
                                  obj.StationId, obj.DimsType, obj.DimsId, obj.status);
                       }
             }
    }
    while (cin.good() && c !='x');
    system("pause");
    return 0;
} 

共享内存循环形队列池设计

图4-1 读写流程
5、 总结
共享内存循环队列池适合不同进程模块任务的分担,写进程负责信号消息的入队写操作,而读进程负责消息的出列实现消息调度。共享队列需要考虑的问题,主要包括队列空间大小申请和读写速度的配匹,考虑任务的实时性,在并发控制中读和写操作没有考虑锁粒度相关问题。