1、 简述
队列是一种先进先出(FIFO)的线性表数据结构,常见的操作如在表的尾部插入,在头部删除数据。队列的类型有链表结构、固定缓冲区结构等。常用的队列空间都是动态地从堆中申请,在数据量操作频繁的任务中,带来系统实时性和内存碎片等问题。
本文采用的是共享循环队列池,共享内存队列来解决进程间通信数据量大的场景。
队列长度计算公式:nCount = (rear - front + nSize) % nSize;
Rear:表示队列尾偏移量,该量放置在共享内存Head中,对所有共享进程可视化;
Front:表示当前进程的头偏移量,该量为进程私有变量,对当前进程可视化;
nSize:表示队列最大单元结构数目,该量放置在共享内存Head中,对所有共享进程可 视化。
2、 设计原则
该队列设计主要实现多进程之间共享数据,队列空间申请的大小可以自定义,队列每个单元结构可以通过模板进行构造。该队列创建完成之后,后面的进程直接映象即可。该队列为了防止非法操作,在映射时验证名称和结构单元的长度。
3、 设计代码
#ifndef _AESHAREQUEUE_H_
#define _AESHAREQUEUE_H_
#include "aefc/AEShareKey.inl"
#include "aefc/AEShareMemory.h"
#ifdef AE_QUEUE_EXPORTS
#define AE_QUEUE_ENTRY A_DECL_EXPORT
#else
#define AE_QUEUE_ENTRY A_DECL_IMPORT
#endif
//
// 循环队列类模版
//
const size_t _MAX_QUEUE_SIZE = 1024;
const int _NAME_QUENE_SIZE = 48;
#define QUEUE_T_NAME(x) (#x)
template <typename T>
class AE_QUEUE_ENTRY CAEShareQueue
{
struct queue_header
{
char name[_NAME_QUENE_SIZE];
unsigned int len;
size_t size;
size_t rear;
};
public:
CAEShareQueue();
virtual ~CAEShareQueue();
public:
bool bindQueue(char *name, size_t &size);
public:
size_t getSize() const;
int enqueue(const T value);
int dequeue(T *value);
private:
bool createQueue(char *name, size_t &size);
private:
T *pQueue_;
size_t front_;
private:
queue_header *header_;
void *shmaddr_;
void *shmHanlde_;
};
template <typename T>
CAEShareQueue<T>::CAEShareQueue()
: front_(0), pQueue_(NULL)
{
header_ = NULL;
shmaddr_ = shmHanlde_ = NULL;
}
template <typename T>
bool CAEShareQueue<T>::createQueue(char *name, size_t &size)
{
CAEShareMemory share;
assert(name != NULL);
int nCreateFlag = 0;
size = (size == 0 ? _MAX_QUEUE_SIZE : size);
int nTotalSize = ((sizeof(T) * size +sizeof(queue_header) + 7) / 8) * 8;
if (pQueue_ == NULL)
{
void *pData = (void *)share.CreateAndMapping(
AEQUEUE_SHM_DATA, nTotalSize, &nCreateFlag);
assert(pData != NULL);
// Init header information.
header_ = (queue_header *)pData;
if (nCreateFlag)
{
strncpy(header_->name, name, _NAME_QUENE_SIZE - 1);
header_->len = sizeof(T);
header_->size = size;
header_->rear = 0;
}
else
{
if (strcmp(header_->name, name) != 0 &&
header_->len != sizeof(T))
{
returnfalse;
}
size = header_->size;
}
shmaddr_ = pData;
shmHanlde_ = share.GetHandle();
// Init queue data pointer position.
pQueue_ = (T *)((char *)pData +sizeof(queue_header));
}
return true;
}
template <typename T>
bool CAEShareQueue<T>::bindQueue(char *name, size_t &size)
{
return createQueue(name, size);
}
template <typename T>
CAEShareQueue<T>::~CAEShareQueue()
{
CAEShareMemory shm;
shm.UnMapShareMemory(shmaddr_, shmHanlde_);
}
template <typename T>
size_t CAEShareQueue<T>::getSize() const
{
if (pQueue_)
{
size_t nSize = header_->size;
return (header_->rear - front_ + nSize) % nSize;
}
return 0;
}
template <typename T>
int CAEShareQueue<T>::enqueue(const T value)
{
if (pQueue_)
{
pQueue_[header_->rear] = value;
size_t nSize = header_->size;
header_->rear = (header_->rear + 1) % nSize;
return 1;
}
return 0;
}
template <typename T>
int CAEShareQueue<T>::dequeue(T *value)
{
if (getSize() <= 0 || !pQueue_)
return 0;
if (pQueue_)
{
*value = pQueue_[front_];
size_t nSize = header_->size;
front_ = (front_ + 1) % nSize;
return 1;
}
return 0;
}
#endif // _AESHAREQUEUE_H_
4、 封装与测试
1)、封装一个类为“CAEQueue”,实现对信号消息流读写操作头文件定义(Aequeue.h):
#ifndef _AEQUEUE_H_
#define _AEQUEUE_H_
#ifdef AE_QUEUE_EXPORTS
#define AE_QUEUE_ENTRY A_DECL_EXPORT
#else
#define AE_QUEUE_ENTRY A_DECL_IMPORT
#endif
const unsignedshort _MAX_BODY_LEN = 256;
struct signal_message
{
long StationId;
long DimsType;
long DimsId;
long status;
unsigned char label;
unsigned short len;
unsigned char stream[_MAX_BODY_LEN];
signal_message()
{
StationId = 0;
DimsType= 0;
DimsId = 0;
status = 0;
label = 0;
len = 0;
stream[0] = '\0';
};
};
class CAEQueuePrivate;
class AE_QUEUE_ENTRY CAEQueue
{
public:
CAEQueue();
virtual ~CAEQueue();
public:
int getSize();
int push(signal_message obj);
int pop(signal_message *pObj);
private:
CAEQueuePrivate *d;
};
#endif // _AEQUEUE_H_
2)、类的实现部分:
#include "stdafx.h"
#include "signal/aesharequeue.h"
#include "signal/aequeue.h"
class CAEQueuePrivate
{
public:
CAEShareQueue<signal_message> queue;
};
CAEQueue::CAEQueue() : d(new CAEQueuePrivate)
{
unsigned int nSize = 4048;
d->queue.bindQueue(QUEUE_T_NAME(signal_message), nSize);
}
CAEQueue::~CAEQueue()
{
delete d;
}
int CAEQueue::getSize()
{
return d->queue.getSize();
}
int CAEQueue::push(signal_message obj)
{
return d->queue.enqueue(obj);
}
int CAEQueue::pop(signal_message *pObj)
{
return d->queue.dequeue(pObj);
}
3)、写和读
Process1:
#include <iostream>
#include "signal/aequeue.h"
using namespace std;
int main (int argc,char *argv [])
{
CAEQueue queue;
//
// Main loop
//
cout << "Enter for 'p' to push message or 'x' for exit:\n";
char c;
do
{
cout << "> ";
cin >> c;
if (c == 'p')
{
signal_message obj;
obj.StationId = 1
obj.DimsType =2;
obj.DimsId = 3;
obj.status = 4;
queue.push(obj);
printf("StationId : %d, DimsType :%d, DimsId :%d, status :%d",
obj.StationId, obj.DimsType, obj.DimsId, obj.status);
}
}
while (cin.good() && c != 'x');
system("pause");
return 0;
}
Process2:
#include <iostream>
#include "signal/aequeue.h"
using namespace std;
int main (int argc,char *argv [])
{
CAEQueue queue;
//
// Main loop
//
cout << "Enter for 'p' to post message or 'x' for exit:\n";
char c;
do
{
cout << "> ";
cin >> c;
if (c == 'p')
{
signal_message obj;
if (queue.pop(&obj))
{
printf("StationId : %d, DimsType :%d, DimsId :%d, status :%d",
obj.StationId, obj.DimsType, obj.DimsId, obj.status);
}
}
}
while (cin.good() && c !='x');
system("pause");
return 0;
}
图4-1 读写流程
5、 总结
共享内存循环队列池适合不同进程模块任务的分担,写进程负责信号消息的入队写操作,而读进程负责消息的出列实现消息调度。共享队列需要考虑的问题,主要包括队列空间大小申请和读写速度的配匹,考虑任务的实时性,在并发控制中读和写操作没有考虑锁粒度相关问题。