一、什么是消息队列
消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值。我们可以通过发送消息来避免命名管道的同步和阻塞问题。消息队列与管道不同的是,消息队列是基于消息的,而管道是基于字节流的,且消息队列的读取不一定是先入先出。消息队列与命名管道有一样的不足,就是每个消息的最大长度是有上限的(MSGMAX),每个消息队列的总的字节数是有上限的(MSGMNB),系统上消息队列的总数也有一个上限(MSGMNI)。
二、消息队列结构
15 struct msqid_ds {三、函数
16 struct ipc_perm msg_perm;
17 struct msg *msg_first; /* first message on queue,unused */
18 struct msg *msg_last; /* last message in queue,unused */
19 __kernel_time_t msg_stime; /* last msgsnd time */
20 __kernel_time_t msg_rtime; /* last msgrcv time */
21 __kernel_time_t msg_ctime; /* last change time */
22 unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
23 unsigned long msg_lqbytes; /* ditto */
24 unsigned short msg_cbytes; /* current number of bytes on queue */
25 unsigned short msg_qnum; /* number of messages in queue */
26 unsigned short msg_qbytes; /* max number of bytes on queue */
27 __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
28 __kernel_ipc_pid_t msg_lrpid; /* last receive pid */
29 };
1.创建消息队列或者获得已有的消息队列
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
2.向消息队列读写消息
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
int msgflg);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);3.设置消息队列属性
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
示例:
comm.h
1 #pragma oncecomm.c
2 #include<stdio.h>
3 #include<stdlib.h>
4 #include <sys/types.h>
5 #include <sys/ipc.h>
6 #include <sys/msg.h>
7 #include<string.h>
8 #include<errno.h>
9
10 #define _PATH_NAME_ "/home/bob/bob/msg_queue/file"
11 #define _PROJ_ID_ 0x6666
12 #define _SIZE_ 1024
13
14 #define SERVER_T 1
15 #define CLIENT_T 2
16
17 typedef struct msgbuf
18 {
19 long mtype;
20 char mtext[1024];
21 }msg_t;
22
23 int creat_msg_queue();
24 int get_msg_queue();
25 void recv_msg(int msg_id,int type,char* out);
26 void send_msg(int msg_id,int type,const char* msg);
27 int destory_msg_queue(int msg_id);
1 #include"comm.h"server.c
2
3 static int comm_msg_queue(int flag)
4 {
5 key_t key=ftok(_PATH_NAME_,_PROJ_ID_);
6 if(key==-1)
7 {
8 perror("ftok");
9 return -2;
10 }
11 int msg_id=msgget(key,flag);
12 if(msg_id<0)
13 {
14 perror("msgget");
15 }
16 return msg_id;
17 }
18
19 int creat_msg_queue()
20 {
21 int flag=IPC_CREAT|IPC_EXCL|0664;
22 return comm_msg_queue(flag);
23 }
24
25 int get_msg_queue()
26 {
27 int flag=IPC_CREAT;
28 return comm_msg_queue(flag);
29 }
30 int destroy_msg_queue(int msg_id)
31 {
32 if(msgctl(msg_id,IPC_RMID,NULL)<0)
33 {
34 perror("msgctl");
35 return -1;
36 }
37 return 0;
38 }
39 void send_msg(int msg_id,int type,const char* msg)
40 {
41 msg_t data;
42 memset(data.mtext,'\0',sizeof(data.mtext));
43 data.mtype=type;
44 strcpy(data.mtext,msg);
45 if(msgsnd(msg_id,&data,sizeof(data.mtext),0)<0)
46 {
47 perror("msgsnd");
48 }
49 }
50 void recv_msg(int msg_id,int type,char* out)
51 {
52 msg_t data;
53 memset(data.mtext,'\0',sizeof(data.mtext));
54 data.mtype=type;
55 if(msgrcv(msg_id,&data,sizeof(data.mtext),type,IPC_NOWAIT)<0)
56 {
57 perror("msgrcv");
58 }
59 else
60 {
61 strcpy(out,data.mtext);
62 }
63 }
1 #include"comm.h"client.c
2
3 int main()
4 {
5 int msg_id=creat_msg_queue();
6 if(msg_id<0)
7 {
8 return -1;
9 }
10 int done=0;
11 char buf[_SIZE_];
12 while(!done)
13 {
14 memset(buf,'\0',sizeof(buf));
15 recv_msg(msg_id,CLIENT_T,buf);
16 printf("client:%s\n",buf);
17 printf("Please Enter: ");
18 fflush(stdout);
19 ssize_t _s=read(0,buf,sizeof(buf)-1);
20 if(_s>0)
21 {
22 buf[_s-1]='\0';
23 }
24 send_msg(msg_id,SERVER_T,buf);
25 }
26 destroy_msg_queue(msg_id);
27 return 0;
28 }
1 #include"comm.h"运行结果:
2
3 int main()
4 {
5 int msg_id=get_msg_queue();
6 int done=0;
7 char buf[_SIZE_];
8 while(!done)
9 {
10 printf("Please Enter:");
11 fflush(stdout);
12 ssize_t _s=read(0,buf,sizeof(buf)-1);
13 if(_s>0)
14 {
15 buf[_s-1]='\0';
16 }
17 send_msg(msg_id,CLIENT_T,buf);
18 memset(buf,'\0',sizeof(buf)-1);
19 recv_msg(msg_id,SERVER_T,buf);
20 printf("server:%s\n",buf);
21 }
22 destroy_msg_queue(msg_id);
23 return 0;
24 }