Linux进程间通信-消息队列(mqueue)

时间:2023-12-09 21:33:43

前面两篇文章分解介绍了匿名管道和命名管道方式的进程间通信,本文将介绍Linux消息队列(posix)的通信机制和特点。

1、消息队列

消息队列的实现分为两种,一种为System V的消息队列,一种是Posix消息队列;这篇文章将主要围绕Posix消息队列介绍;

消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反。Posix消息队列与System V消息队列的区别如下:

(1) 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。

(2)当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似的机制。

2、消息队列的基本操作

2.1 打开一个消息队列

#include    <mqueue.h>

typedef int mqd_t;

mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);

返回: 成功时为消息队列描述字,出错时为-1。

功能: 创建一个新的消息队列或打开一个已存在的消息的队列。

2.2 关闭一个消息队列

#include    <mqueue.h>

int mq_close(mqd_t mqdes);

返回: 成功时为0,出错时为-1。

功能: 关闭已打开的消息队列。

2.3 删除一个消息队列

#include    <mqueue.h>

int mq_unlink(const char *name)

返回: 成功时为0,出错时为-1

功能: 从系统中删除消息队列。

这三个函数操作的代码如下:

#include <mqueue.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h> int main(int argc, char* argv[])
{
int flag = O_RDWR | O_CREAT | O_EXCL;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open("/mq_test", flag, mode,NULL);
if (mqid == -)
{
printf("mqueue create failed!\n");
return ;
}
else
{
printf("mqueue create success!\n");
}
mq_close(mqid); return ;
}
#include <mqueue.h>
#include <unistd.h> int main(int argc, char* argv[])
{
mq_unlink("/mq_test");
return ;
}

注意:编译posix mqueue时,要连接运行时库(runtime library),既-lrt选项,运行结果如下:

Linux进程间通信-消息队列(mqueue)

关于mqueue更多详细内容可以使用:man mq_overview命令查看,里面有一条需要注意的是,Linux下的Posix消息队列是在vfs中创建的,可以用

mount -t mqueue none /dev/mqueue

将消息队列挂在在/dev/mqueue目录下,便于查看。

2.4 mq_close()和mq_unlink()

mq_close()的功能是关闭消息队列的文件描述符,但消息队列并不从系统中删除,要删除一个消息队列,必须调用mq_unlink();这与文件系统的unlink()机制是一样的。

3、消息队列的属性

#include    <mqueue.h>

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);

均返回:成功时为0, 出错时为-1

每个消息队列有四个属性:
struct mq_attr
{
    long mq_flags;        /* message queue flag : 0, O_NONBLOCK */
    long mq_maxmsg;  /* max number of messages allowed on queue*/
    long mq_msgsize;  /* max size of a message (in bytes)*/
    long mq_curmsgs;  /* number of messages currently on queue */
};

4、消息收发

#include    <mqueue.h>

int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

返回:成功时为0,出错为-1

ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);

返回:成功时为消息中的字节数,出错为-1

mqsend代码如下:

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h> int main(int argc, char* argv[])
{
int flag = O_RDWR;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);
if (mqid == -)
{
printf("open mqueue failed!\n");
return ;
} char *buf = "hello, i am sender!";
mq_send(mqid,buf,strlen(buf),);
mq_close(mqid); return ;
}

mqrecv代码如下:

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h> int main(int argc, char* argv[])
{
int flag = O_RDWR;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);
if (mqid == -)
{
printf("open mqueue failed!\n");
return ;
} struct mq_attr attr;
mq_getattr(mqid,&attr);
char buf[] = {};
int priority = ;
mq_receive(mqid,buf,attr.mq_msgsize,&priority);
printf("%s\n",buf);
mq_close(mqid);
return ;
}

运行结果如下:

Linux进程间通信-消息队列(mqueue)

首先我们运行三次send,然后运行四次recv,可见recv的前三次是可以收到消息队列里的三个消息的,当运行第四次的时,系统消息队列里为空,recv就会阻塞;关于非阻塞式mqueue见下文。

5、mq_notify函数

如前文介绍,poxis消息队列运行异步通知,以告知何时有一个消息放置到某个消息队列中,这种通知有两种方式可以选择:

(1)产生一个信号

(2)创建一个线程来执行一个指定的函数

这种通知通过mq_notify() 函数建立。该函数为指定的消息消息队列建立或删除异步事件通知,

#include <mqueue.h>

int mq_notify(mqd_t mqdes, const struct sigevent* notification);

(1)如果notification参数为非空,那么当前进程希望在有一个消息到达所指定的先前为空的对列时得到通知。

(2)如果notification参数为空,而且当前进程被注册为接收指定队列的通知,那么已存在的注册将被撤销。

(3)任意时刻只有一个进程可以被注册为接收某个给定队列的通知。

(4)当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。即说明,在mq_receive调用中的阻塞比任何通知的注册都优先。

(5)当前通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册。

sigevent结构如下:

union sigval{
int sival_int; /*integer value*/
void *sival_ptr; /*pointer value*/
}; struct sigevent{
int sigev_notify; /*SIGEV_{NONE, SIGNAL, THREAD}*/
int sigev_signo; /*signal number if SIGEV_SIGNAL*/ union sigval sigev_value; void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
};

5.1 mq_notify() 使用信号处理程序

一个正确的使用非阻塞mq_receive的信号通知的例子:

#include <unistd.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h> void sig_usr1(int );
volatile sig_atomic_t mqflag; int main(int argc, char* argv[])
{
mqd_t mqid = ;
void *buff;
struct mq_attr attr;
struct sigevent sigev;
sigset_t zeromask,newmask,oldmask; int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);  // 非阻塞式打开mqueue
mq_getattr(mqid,&attr);
buff = malloc(attr.mq_msgsize); sigemptyset(&zeromask);
sigemptyset(&newmask);
sigemptyset(&oldmask);
sigaddset(&newmask,SIGUSR1);    // 初始化信号集
signal(SIGUSR1,sig_usr1);      // 信号处理程序
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
int n = mq_notify(mqid,&sigev);  // 启用通知 for (;;)
{
sigprocmask(SIG_BLOCK,&newmask,&oldmask);
while(mqflag == )
sigsuspend(&zeromask); mqflag = ;
ssize_t n;
mq_notify(mqid, &sigev);    // 重新注册
while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=)
printf("SIGUSR1 received, read %ld bytes.\n",(long)n);  //读取消息
if(errno != EAGAIN)
printf("mq_receive error\n");
sigprocmask(SIG_UNBLOCK,&newmask,NULL);
}
return ;
} void sig_usr1(int signo)
{
mqflag = ;
}

这里为什么使用的是非阻塞式mq_receive,为什么不在信号处理程序中打印接收到的字符请参阅《unp 第二卷》

5.2 mq_notify() 使用线程处理程序

异步事件通知的另一种方式是把sigev_notify设置成SIGEV_THREAD,这会创建一个新线程,该线程调用由sigev_notify_function指定的函数,所用的参数由sigev_value指定,新线程的属性由sigev_notify_attributes指定,要指定线程的默认属性的话,传空指针。新线程是作为脱离线程创建的。

#include <unistd.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h> mqd_t mqid = ;
struct mq_attr attr;
struct sigevent sigev; static void notify_thread(union sigval); int main(int argc, char* argv[])
{ int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);
mq_getattr(mqid,&attr); sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_attributes = NULL;
int n = mq_notify(mqid,&sigev); for (;;)
pause();
return ;
} static void notify_thread(union sigval arg)
{
ssize_t n;
char* buff;
printf("notify_thread_started!\n");
buff = malloc(attr.mq_msgsize);
mq_notify(mqid, &sigev);
while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=)
printf("SIGUSR1 received, read %ld bytes.\n",(long)n);
if(errno != EAGAIN)
printf("mq_receive error\n");
free(buff);
pthread_exit();
}