Linux网络编程学习(九) ----- 消息队列(第四章)

时间:2023-12-21 15:21:14

1、System V IPC

System V中引入的几种新的进程间通信方式,消息队列,信号量和共享内存,统称为System V IPC,其具体实例在内核中是以对象的形式出现的,称为IPC 对象,每个IPC对象在内核中都有一个唯一的标识符。这个标识符的唯一性在同一类IPC中成立,不同IPC的标识符可以相同。IPC对象三个知识点差不多,有一个数据结构,还有各自的使用的函数,这里只以消息队列为例,信号量和共享内存大家自己去看咯。

IPC对象在程序中是使用关键字key来访问,服务端和客户端用的关键字必须一致,同时要保证key的产生唯一性,需要通过更改pathname和proj的组合来解决key的产生冲突问题,函数声明是key_t ftok ( char *pathname, char proj );

2、消息队列

系统内核中保存的一个用来保存消息的队列,并不是简单的先进先出操作,可以控制消息用更加灵活的方式流动。

3、消息队列相关的数据结构

1)ipc_perm --- 保存每个IPC对象权限信息,头文件linux/ipc.h

struct ipc_perm
{
key_t key;
ushort uid; /* owner euid and egid */
ushort gid;
ushort cuid; /* creator euid and egid */
ushort cgid;
ushort mode; /* access modes see mode flags below */
ushort seq; /* slot usage sequence number *//*IPC对象使用频率信息*/
};

2)msgbuf ----自己定义消息的数据类型,但对长度做了限制,最大为4056字节,头文件linux/msg.h

/* message buffer for msgsnd and msgrcv calls */
struct msgbuf
{
long mtype; /* type of message *//*区分不同消息数据类型*/
char mtext[1]; /* message text *//*可以是任意的数据类型*/
};

  比如下面例子中,mtext对应着request_id和结构体info

struct my_msgbuf
{
long mtype; /* Message type */
long request_id; /* Request identifier */
struct client info; /* Client information structure */
};

3)msg --- 消息队列以消息链表形式出现,完成消息链表每个节点结构定义就是msg结构

/* one msg structure for each message */
struct msg
{
struct msg *msg_next; /* next message on queue */
long msg_type;
char *msg_spot; /* message text address */
time_t msg_stime; /* msgsnd time */
short msg_ts; /* message text size */
};
/*
msg_next 成员是指向消息链表中下一个节点的指针,依靠它对整个消息链表进行访问。
msg_type 和msgbuf 中mtype 成员的意义是一样的。
msg_spot 成员指针指出了消息内容(就是msgbuf 结构中的mtext)在内存中的位置。
msg_ts 成员指出了消息内容的长度。
*/

4)msgqid_ds --- 保存消息队列对象有关数据

/* one msqid structure for each queue on the system */
struct msqid_ds
{
struct ipc_perm msg_perm;
struct msg *msg_first; /* first message on queue */
struct msg *msg_last; /* last message in queue */
__kernel_time_t msg_stime; /* last msgsnd time */
__kernel_time_t msg_rtime; /* last msgrcv time */
__kernel_time_t msg_ctime; /* last change time */
struct wait_queue *wwait;
struct wait_queue *rwait;
unsigned short msg_cbytes; /* current number of bytes on queue */
unsigned short msg_qnum; /* number of messages in queue */
unsigned short msg_qbytes; /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};
/*
msg_perm 成员保存了消息队列的存取权限以及其他一些信息(见上面关于ipc_perm
结构的介绍)。
msg_first 成员指针保存了消息队列(链表)中第一个成员的地址。
msg_last 成员指针保存了消息队列中最后一个成员的地址。
msg_stime 成员保存了最近一次队列接受消息的时间。
msg_rtime 成员保存了最近一次从队列中取出消息的时间。
msg_ctime 成员保存了最近一次队列发生改动的时间(见后面的章节)。
wwait 和rwait 是指向系统内部等待队列的指针。
msg_cbytes 成员保存着队列总共占用内存的字节数。
msg_qnum 成员保存着队列里保存的消息数目。
msg_qbytes 成员保存着队列所占用内存的最大字节数。
msg_lspid 成员保存着最近一次向队列发送消息的进程的pid。
msg_lrpid 成员保存着最近一次从队列中取出消息的进程的pid。
*/

  

4、消息队列有关的函数

1)msgget()----int msgget(key_t key, int msgflg)

msgget()函数的第一个参数是消息队列对象的关键字(key),函数将它与已有的消息队列对象的关键字进行比较来判断消息队列对象是否已经创建。而函数进行的具体操作是由第二个参数,msgflg 控制的。它可以取下面的几个值:
IPC_CREAT :
如果消息队列对象不存在,则创建之,否则则进行打开操作;
IPC_EXCL:
和IPC_CREAT 一起使用(用”|”连接),如果消息对象不存在则创建之,否则产生一个
错误并返回。

2)msgsnd() --- int msgsnd ( int msqid, struct msgbuf *msgp, int msgsz, int msgflg )

传给msgsnd()函数的第一个参数msqid 是消息队列对象的标识符(由msgget()函数得到),第二个参数msgp 指向要发送的消息所在的内存,第三个参数msgsz 是要发送信息的长度(字节数),可以用以下的公式计算:
msgsz = sizeof(struct mymsgbuf) - sizeof(long);
第四个参数是控制函数行为的标志,可以取以下的值:
0,忽略标志位;
IPC_NOWAIT,如果消息队列已满,消息将不被写入队列,控制权返回调用函数的线程。如果不指定这个参数,线程将被阻塞直到消息被可以被写入。

示例:用ftok函数产生关键字,然后通过open_queue()获得消息队列的标识符,最后用send_message()函数将消息发送到消息队列

int open_queue( key_t keyval )
{
int qid;
if((qid = msgget( keyval, IPC_CREAT | 0660 )) == -1)
{
return(-1);
}
return(qid);
}
int send_message( int qid, struct mymsgbuf *qbuf )
{
int result, length;
/* The length is essentially the size of the structure minus sizeof(mtype) */
length = sizeof(struct mymsgbuf) - sizeof(long);
if((result = msgsnd( qid, qbuf, length, 0)) == -1)
{
return(-1);
}
return(result);
} main()
{
int qid;
key_t msgkey;
struct mymsgbuf
{
long mtype; /* Message type */
int request; /* Work request number */
double salary; /* Employee's salary */
} msg;
/* Generate our IPC key value */
msgkey = ftok(".", 'm');
/* Open/create the queue */
if(( qid = open_queue( msgkey)) == -1)
{
perror("open_queue");
exit(1);
}
/* Load up the message with arbitrary test data */
msg.mtype = 1; /* Message type must be a positive number! */
msg.request = 1; /* Data element #1 */
msg.salary = 1000.00; /* Data element #2 (my yearly salary!) */
/* Bombs away! */
if((send_message( qid, &msg )) == -1)
{
perror("send_message");
exit(1);
}
}

3)msgrcv() --- int msgrcv ( int msqid, struct msgbuf *msgp, int msgsz, long mtype,int msgflg )

第四个参数mtype指定了函数从队列中所取的消息的类型。函数将从队列中搜索类型与之匹配的消息并将之返回。不过这里有一个例外。如果mtype 的值是零的话,函数将不做类型检查而自动返回队列中的最旧的消息。
第五个参数依然是是控制函数行为的标志,取值可以是:
0,表示忽略;
IPC_NOWAIT,如果消息队列为空,则返回一个ENOMSG,并将控制权交回调用函数的进程。如果不指定这个参数,那么进程将被阻塞直到函数可以从队列中得到符合条件的消息为止。如果一个client 正在等待消息的时候队列被删除,EIDRM 就会被返回。如果进程在阻塞等待过程中收到了系统的中断信号,EINTR 就会被返回。
MSG_NOERROR,如果函数取得的消息长度大于msgsz,将只返回msgsz 长度的信息,剩下的部分被丢弃了。如果不指定这个参数,E2BIG 将被返回,而消息则留在队列中不被取出。
当消息从队列内取出后,相应的消息就从队列中删除了

int read_message( int qid, long type, struct mymsgbuf *qbuf )
{
int result, length;
/* The length is essentially the size of the structure minus sizeof(mtype) */
length = sizeof(struct mymsgbuf) - sizeof(long);
if((result = msgrcv( qid, qbuf, length, type, 0)) == -1)
{
return(-1);
}
return(result);
}

4)msgctl() ---- int msgctl ( int msgqid, int cmd, struct msqid_ds *buf )

函数的第一个参数msgqid 是消息队列对象的标识符。
第二个参数是函数要对消息队列进行的操作,它可以是:
IPC_STAT
取出系统保存的消息队列的msqid_ds 数据,并将其存入参数buf 指向的msqid_ds 结构中。
IPC_SET
设定消息队列的msqid_ds 数据中的msg_perm 成员。设定的值由buf 指向的msqid_ds结构给出。
IPC_EMID
将队列从系统内核中删除。
这三个命令的功能都是明显的,所以就不多解释了。唯一需要强调的是在IPC_STAT命令中队列的msqid_ds 数据中唯一能被设定的只有msg_perm 成员,其是ipc_perm 类型的数据。而ipc_perm 中能被修改的只有mode,pid 和uid 成员。其他的都是只能由系统来设定的。

//IPC_STAT 的例子
int get_queue_ds( int qid, struct msgqid_ds *qbuf )
{
if( msgctl( qid, IPC_STAT, qbuf) == -1)
{
return(-1);
}
return(0);
}
//IPC_SET 的例子:
int change_queue_mode( int qid, char *mode )
{
struct msqid_ds tmpbuf;
/* Retrieve a current copy of the internal data structure */
get_queue_ds( qid, &tmpbuf);
/* Change the permissions using an old trick */
sscanf(mode, "%ho", &tmpbuf.msg_perm.mode);
/* Update the internal data structure */
if( msgctl( qid, IPC_SET, &tmpbuf) == -1)
{
return(-1);
}
return(0);
}
//IPC_RMID 的例子:
int remove_queue( int qid )
{
if( msgctl( qid, IPC_RMID, 0) == -1)
{
return(-1);
}
return(0);
}