IPC之—消息队列

时间:2021-02-12 17:36:45

什么是消息队列

消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。每个数据块都被认为是有一个类型,接收者今晨会二手的数据块可以有不同的类型值。我们可以通过发送消息来避免命名管道的同步和阻塞问题。消息队列与管道不同的是,消息队列是基于消息的,而管道是基于字节流的,且消息队列的读取不一定是先入先出。消息队列与命名管道有一样的不足,就是每个消息队列的最大长度是有上限的,每个消息队列的总子节数是有上限的,系统上消息队列的总数也有一个上限。
如下图所示
IPC之—消息队列
内核为每个IPC对象维护一个数据结构(/usr/include/liux/ipc.h)
如下图
IPC之—消息队列

消息队列的特点

  1. 消息队列是消息的链表,具有特定的格式,存放在内存中并由消息队列标识符标识.
  2. 消息队列允许一个或多个进程向它写入与读取消息.
  3. 管道和命名管道都是通信数据都是先进先出的原则。
  4. 消息队列可以实现消息的随机查询,消息不一定要以先进先出的次序读取,也可以按消息的类型读取.比FIFO更有优势
  5. 生命周期随内核。
    消息队列的结构如下图所示
    IPC之—消息队列
    可以看到第一个条目就是IPC结构体,即是共有的,后面的都是消息队列所私有的成员,消息队列是用链表实现的。

创建消息队列

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflag)

创建如果成功,返回值将消息队列标识符(非负整数),否则返回-1。
key 可以认为是一个端口号,也可以由ftok函数生成。
msgflag :
IPC_CREAT: 如果IPC不存在,就创建一个,否则就打开操作。
IPC_EXCL:只有在共享内存不存在时,显得共享内存才建立,否则就产生错误
IPC_CREAT和IPC_EXCL一起使用,msgget()将返回一个新建的IPC标识符;如果该IPC资源已经存在,或者返回-1.
下面用代码实现创建一个消息队列,并用命令删除消息队列

#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#define PATHNAME "."//设置路径为当前目录
#define ID 0
int main()
{
key_t key = ftok(PATHNAME,ID);//创建key值
if(key<0)
{
perror("ftok error!");
return -1;
}
int msgid = msgget(key,IPC_CREAT|IPC_EXCL|0666);//创建消息队列,并把权限改为666
if(msgid<0)
{
perror("msgget error!\n");
return -2;
}
return msgid;
}

运行结果如下,
IPC之—消息队列

消息队列的删除

上面已经提到了,用命令删除一个消息队列,下面介绍一下如何用代码删除一个消息队列。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
//他不仅可以实现删除消息队列,还是控制型函数

第一个参数就是要删除的消息队列的ID,第二个参数是IPC_RMID,第三个参数不需要关心,直接设置为空就可。删除成功返回0,失败返回-1.

if(msgctl(msgid,IPC_RMID,NULL)<0)
{
printf("msgctl perror!\n");
return -3;
}
else
return 0;
//这几行代码只需要加入上面的代码创建好消息队列后,就可以验证。

设置消息队列的属性

这个和上面所用到的删除消息队列的函数是一样的。
msg系统调用对msgqid标识的消息队列执行cmd操作,系统定义了3中cmd操作:IPC_STAT,IPC_SET,IPC_RMID
IPC_STAT:用来获取消息队列对应的msgqid_ds数据结构,将其保存在 buf 指定的地址空间。
IPC_SET :用来设置消息队列中的属性,要设置的属性存在 buf 中
IPC_RMID:从内核中删除msgqid标识的消息队列。
既然消息队列是用来实现进程间通信的,那它必然就会有读和写功能。这里的读和写不同,system V是系统提供的第三方接口,和管道不一样,它的其中之一特点是可以实现双向通信。
读写所调用的函数是msgsnd()和msgrcv

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); //msgsnd将数据放到消息队列中
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); //msgrcv从队列中取⽤消息

msqid:消息队列的标识码
msgp:指向消息缓冲区的指针,此位置是来暂时存储发送和接收的消息,是一个用户可定义的通用结构
msgsz:消息的大小。
msgtyp:从消息队列内读取的消息形态。如果值为零,则表示消息队列中的所有消息都会被读取。
函数如果调用成功则返回0,失败则返回-1;

用消息队列实现进程间通信,代码如下

//comm.h
#ifndef _COMM_H_
#define _COMM_H_

#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/types.h>
#include<sys/msg.h>
#include<string.h>

#define FILENAME "."//代表但当前路径
#define ID 0x6666
#define SERVER_TYPE 1
#define CLIENT_TYPE 2

struct _msgbuf
{
long mtype;
char mtext[1024];
};
int creatMsgQueue();//消息队列的创建
int getMsgQueue();//获取消息队列
int destoryMsgQueue(int msgid);//消息队列的删除
int sendMsg(int msgid,int type,const char*msg);//发送消息
int recvMsg(int msgid,int type,char *out);//接收消息

#endif
//comm.c
#include"comm.h"
static int commMsgQueue(int flags)
{
key_t _key = ftok(FILENAME,ID);//获取key值
if(_key<0)
{
printf("ftok");
return -1;
}
int msgid = msgget(_key,flags);
if(msgid<0)
{
printf("msgget");
return -2;
}
return msgid;
}

int creatMsgQueue()
{
return commMsgQueue(IPC_CREAT | IPC_EXCL|0666);
}

int getMsgQueue()
{
return commMsgQueue(IPC_CREAT);
}
int destoryMsgQueue(int msgid)
{
if(msgctl(msgid,IPC_RMID,NULL)<0)
{
perror("msgctl");
return -1;
}
return 0;
}
int sendMsg(int msgid,int type,const char*msg)
{
struct _msgbuf _mb;
_mb.mtype = type;
strcpy(_mb.mtext,msg);
if(msgsnd(msgid,&_mb,sizeof(_mb.mtext),0)<0)
{
perror("msgsnd");
return -1;
}
return 0;
}
int recvMsg(int msgid,int type,char *out)
{
truct _msgbuf _mb;
if(msgrcv(msgid,&_mb,sizeof(_mb.mtext),type,0)<0)
{
perror("msgrcv");
return -1;
}
strcpy(out,_mb.mtext);
return 0;
}
//server.c
#include"comm.h"
int main()
{
int msgid = creatMsgQueue();//creat msg
char buf[1024];
while(1)
{
buf[0] = 0;
recvMsg(msgid,CLIENT_TYPE,buf);
printf("client say# %s\n",buf);
printf("please Enter# ");
fflush(stdout);//使上面那句话刷新到屏幕
ssize_t s = read(0,buf,sizeof(buf)-1);
if(s>0)
{
buf[s-1] = 0;//buyonghuanhang
sendMsg(msgid,SERVER_TYPE,buf);
}
}
destoryMsgQueue(msgid);
return 0;

}
//client.c
#include"comm.h"

int main()
{
int msgid = getMsgQueue();
char buf[1024];
while(1)
{
buf[0] = 0;
printf("client Enter#");
fflush(stdout);
ssize_t s = read(0,buf,sizeof(buf)-1);
if(s>0)
{
buf[s-1] = 0;
sendMsg(msgid,CLIENT_TYPE,buf);
}
recvMsg(msgid,SERVER_TYPE,buf);
printf("server say# %s\n",buf);
}
return 0;
}

运行结果如下,实现了消息队列之间的通信
IPC之—消息队列