进程间通信——Posix消息队列

时间:2022-07-25 04:45:02

消息队列具有随内核的持续性,队列中的每个消息具有如下属性:一个unsigned int 优先级或一个long 类型;消息的数据部分长度;数据本身

#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */ ); // mq描述符/-1

int mq_close(mqd_t mqdes); // 0/-1
int mq_unlink(const char *name); // 0/-1

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, struct mq_attr *attr); // 0/-1

int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio); // 0/-1
int mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop); // bytes/-1

int mq_notify(mqd_t mqdes, const struct sigevent *notification); // 0/-1

mq_open函数的oflag参数,是O_RDONLY, O_WRONLY, O_RDWR之一,可能按位或上O_CREAT, O_EXCL, O_NONBLOCK
mq_unlink函数类似于unlink函数删除一个文件的机制,当一个消息队列的引用计数仍大于0时,其name就能删除,但是该队列的析构要到最后一个mq_close发生时才进行

struct mq_attr{
long mq_flags; // 0, O_NONBLOCK
long mq_maxmsg;
long mq_msgsize;
long mq_curmsgs;
};
创建一个新的队列时,给它指定mq_maxmsg, mq_msgsize属性,mq_open忽略另外另个成员;mq_setattr只使用mq_flags成员,已设置或清除非阻塞标志,另外3成员被忽略

每个消息有一个优先级,是一个小于MQ_PRIO_MAX的无符号整数,可以通过 getconf MQ_PRIO_MAX获得,mq_receive总是返回所指定队列中最高优先级的最早消息,其len参数的值不能小于mq_msgsize,否则返回EMSGSIZE。mq_send的prio参数是待发送消息的优先级,其值必须小于MQ_PRIO_MAX;若应用不必使用优先级不同的消息,指定值为0的优先级。

Posix消息队列允许异步事件通知,以告知何时有一个消息放置到了某个空消息队列中。通知方式有两种:产生一个信号;创建一个线程来执行一个指定的函数。

union sigval {
int sival_int;
void *sival_ptr;
};
struct sigevent {
int sigev_notify; // SIGEV_{NONE,SIGNAL,THREAD}
int sigev_signo;
union sigval sigev_value;
void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
};
异步事件通知的若干规则:

notification参数非空,该进程注册为接受该队列的通知;否则已存在的注册被撤销。任意时刻只有一个进程可以被注册为接受某个给定队列的通知;若当前队列为空且有一个进程被注册通知,只有在没有任何线程阻塞在该队列的recv调用的前提下通知才被发出,recv调用优先;通知发出后,注册被注销,如果想要的话需再次重新注册。

//非阻塞mq_receive的信号通知
#include"unpipc.h"

volatile sig_atomic_tmqflag;/* set nonzero by signal handler */
static voidsig_usr1(int);

int
main(int argc, char **argv)
{
mqd_tmqd;
void*buff;
ssize_tn;
sigset_tzeromask, newmask, oldmask;
struct mq_attrattr;
struct sigeventsigev;

if (argc != 2)
err_quit("usage: mqnotifysig3 <name>");

/* 4open queue, get attributes, allocate read buffer */
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);

Sigemptyset(&zeromask);/* no signals blocked */
Sigemptyset(&newmask);
Sigemptyset(&oldmask);
Sigaddset(&newmask, SIGUSR1);
/* 4establish signal handler, enable notification */
Signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);

for ( ; ; ) {
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);/* block SIGUSR1 */
while (mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0;/* reset flag */

Mq_notify(mqd, &sigev);/* reregister first */
while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
Sigprocmask(SIG_UNBLOCK, &newmask, NULL);/* unblock SIGUSR1 */
}
exit(0);
}

static void
sig_usr1(int signo)
{
mqflag = 1;
return;
}

//sigwait版本的信号通知
#include"unpipc.h"

int
main(int argc, char **argv)
{
intsigno;
mqd_tmqd;
void*buff;
ssize_tn;
sigset_tnewmask;
struct mq_attrattr;
struct sigeventsigev;

if (argc != 2)
err_quit("usage: mqnotifysig4 <name>");

/* 4open queue, get attributes, allocate read buffer */
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);

Sigemptyset(&newmask);
Sigaddset(&newmask, SIGUSR1);
Sigprocmask(SIG_BLOCK, &newmask, NULL);/* block SIGUSR1 */

/* 4establish signal handler, enable notification */
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);

for ( ; ; ) {
Sigwait(&newmask, &signo);
if (signo == SIGUSR1) {
Mq_notify(mqd, &sigev);/* reregister first */
while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
}
}
exit(0);
}
//select版本的信号通知#include"unpipc.h"intpipefd[2];static voidsig_usr1(int);/* $$.bp$$ */intmain(int argc, char **argv){intnfds;charc;fd_setrset;mqd_tmqd;void*buff;ssize_tn;struct mq_attrattr;struct sigeventsigev;if (argc != 2)err_quit("usage: mqnotifysig5 <name>");/* 4open queue, get attributes, allocate read buffer */mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);Mq_getattr(mqd, &attr);buff = Malloc(attr.mq_msgsize);Pipe(pipefd);/* 4establish signal handler, enable notification */Signal(SIGUSR1, sig_usr1);sigev.sigev_notify = SIGEV_SIGNAL;sigev.sigev_signo = SIGUSR1;Mq_notify(mqd, &sigev);FD_ZERO(&rset);for ( ; ; ) {FD_SET(pipefd[0], &rset);nfds = Select(pipefd[0] + 1, &rset, NULL, NULL, NULL);if (FD_ISSET(pipefd[0], &rset)) {Read(pipefd[0], &c, 1);Mq_notify(mqd, &sigev);/* reregister first */while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {printf("read %ld bytes\n", (long) n);}if (errno != EAGAIN)err_sys("mq_receive error");}}exit(0);}static voidsig_usr1(int signo){Write(pipefd[1], "", 1);/* one byte of 0 */return;}
//线程版本的信号通知#include"unpipc.h"mqd_tmqd;struct mq_attrattr;struct sigeventsigev;static voidnotify_thread(union sigval);/* our thread function */intmain(int argc, char **argv){if (argc != 2)err_quit("usage: mqnotifythread1 <name>");mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);Mq_getattr(mqd, &attr);sigev.sigev_notify = SIGEV_THREAD;sigev.sigev_value.sival_ptr = NULL;sigev.sigev_notify_function = notify_thread;sigev.sigev_notify_attributes = NULL;Mq_notify(mqd, &sigev);for ( ; ; )pause();/* each new thread does everything */exit(0);}static voidnotify_thread(union sigval arg){ssize_tn;void*buff;printf("notify_thread started\n");buff = Malloc(attr.mq_msgsize);Mq_notify(mqd, &sigev);/* reregister */while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {printf("read %ld bytes\n", (long) n);}if (errno != EAGAIN)err_sys("mq_receive error");free(buff);pthread_exit(NULL);}