前边三节,我们讲了消息队列的创建、发送信息和接收消息,今天继续沿着应用程序路线看内核中的对消息队列的控制,首先是我们看一下应用程序中的界面函数
msgctl(msgid,IPC_RMID,0);
通过sys_ipc()系统调用中看到这样一句switch语句代码:
case MSGCTL: return sys_msgctl (first, second, (struct msqid_ds __user *) ptr);
|
可以看到是进入了sys_msgctl()函数并从此返回,我们追踪一下这个函数
第一参数不言而喻,朋友们通过前三节的阅读这里应该知道是消息队列的ID号,第二个参数我们需要说明一下,应用程序传递过来的是IPC_RMID
asmlinkage long sys_msgctl(int msqid, int cmd, struct msqid_ds __user *buf) { struct msg_queue *msq; int err, version; struct ipc_namespace *ns;
if (msqid < 0 || cmd < 0) return -EINVAL;
version = ipc_parse_version(&cmd); ns = current->nsproxy->ipc_ns;
switch (cmd) { case IPC_INFO: case MSG_INFO: { struct msginfo msginfo; int max_id;
if (!buf) return -EFAULT; /* * We must not return kernel stack data. * due to padding, it's not enough * to set all member fields. */ err = security_msg_queue_msgctl(NULL, cmd); if (err) return err;
memset(&msginfo, 0, sizeof(msginfo)); msginfo.msgmni = ns->msg_ctlmni; msginfo.msgmax = ns->msg_ctlmax; msginfo.msgmnb = ns->msg_ctlmnb; msginfo.msgssz = MSGSSZ; msginfo.msgseg = MSGSEG; down_read(&msg_ids(ns).rw_mutex); if (cmd == MSG_INFO) { msginfo.msgpool = msg_ids(ns).in_use; msginfo.msgmap = atomic_read(&ns->msg_hdrs); msginfo.msgtql = atomic_read(&ns->msg_bytes); } else { msginfo.msgmap = MSGMAP; msginfo.msgpool = MSGPOOL; msginfo.msgtql = MSGTQL; } max_id = ipc_get_maxid(&msg_ids(ns)); up_read(&msg_ids(ns).rw_mutex); if (copy_to_user(buf, &msginfo, sizeof(struct msginfo))) return -EFAULT; return (max_id < 0) ? 0 : max_id; } case MSG_STAT: /* msqid is an index rather than a msg queue id */ case IPC_STAT: { struct msqid64_ds tbuf; int success_return;
if (!buf) return -EFAULT;
if (cmd == MSG_STAT) { msq = msg_lock(ns, msqid); if (IS_ERR(msq)) return PTR_ERR(msq); success_return = msq->q_perm.id; } else { msq = msg_lock_check(ns, msqid); if (IS_ERR(msq)) return PTR_ERR(msq); success_return = 0; } err = -EACCES; if (ipcperms(&msq->q_perm, S_IRUGO)) goto out_unlock;
err = security_msg_queue_msgctl(msq, cmd); if (err) goto out_unlock;
memset(&tbuf, 0, sizeof(tbuf));
kernel_to_ipc64_perm(&msq->q_perm, &tbuf.msg_perm); tbuf.msg_stime = msq->q_stime; tbuf.msg_rtime = msq->q_rtime; tbuf.msg_ctime = msq->q_ctime; tbuf.msg_cbytes = msq->q_cbytes; tbuf.msg_qnum = msq->q_qnum; tbuf.msg_qbytes = msq->q_qbytes; tbuf.msg_lspid = msq->q_lspid; tbuf.msg_lrpid = msq->q_lrpid; msg_unlock(msq); if (copy_msqid_to_user(buf, &tbuf, version)) return -EFAULT; return success_return; } case IPC_SET: case IPC_RMID: err = msgctl_down(ns, msqid, cmd, buf, version); return err; default: return -EINVAL; }
out_unlock: msg_unlock(msq); return err; }
|
在分析这段代码之前我们要看一下参数cmd
/* * Control commands used with semctl, msgctl and shmctl * see also specific commands in sem.h, msg.h and shm.h */ #define IPC_RMID 0 /* remove resource */ #define IPC_SET 1 /* set ipc_perm options */ #define IPC_STAT 2 /* get ipc_perm options */ #define IPC_INFO 3 /* see ipcs */
|
这些命令代码不仅是为消息队列所使用的,还为整个IPC通讯所使用,包括信号量和共享内存,后边二种我们以后进行实践和分析,另外,也还有消息队列专用的命令
/* ipcs ctl commands */ #define MSG_STAT 11 #define MSG_INFO 12
|
我们还是那种方式在代码中去理解这些宏的作用,不过聪明的读者可能从定义的字面上能看出其作用,为了避免死记硬背,死搬硬套的学习方法,我们就从实践中记忆吧
这里还有第三个参数struct msqid_ds ,它是为了保持兼容以前的代码所用的,我们暂且放一放,避免跑了主题,可以看到在我们的应用程序传递下来的这个参数是空指针。
好了我们进入函数内部看一下,因为使用的命令代码是IPC_RMID,所以会进入这段case语句
case IPC_RMID: err = msgctl_down(ns, msqid, cmd, buf, version); return err;
|
static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd, struct msqid_ds __user *buf, int version) { struct kern_ipc_perm *ipcp; struct msqid64_ds msqid64; struct msg_queue *msq; int err;
if (cmd == IPC_SET) { if (copy_msqid_from_user(&msqid64, buf, version)) return -EFAULT; }
ipcp = ipcctl_pre_down(&msg_ids(ns), msqid, cmd, &msqid64.msg_perm, msqid64.msg_qbytes); if (IS_ERR(ipcp)) return PTR_ERR(ipcp);
msq = container_of(ipcp, struct msg_queue, q_perm);
err = security_msg_queue_msgctl(msq, cmd); if (err) goto out_unlock;
switch (cmd) { case IPC_RMID: freeque(ns, ipcp); goto out_up; case IPC_SET: if (msqid64.msg_qbytes > ns->msg_ctlmnb && !capable(CAP_SYS_RESOURCE)) { err = -EPERM; goto out_unlock; }
msq->q_qbytes = msqid64.msg_qbytes;
ipc_update_perm(&msqid64.msg_perm, ipcp); msq->q_ctime = get_seconds(); /* sleeping receivers might be excluded by * stricter permissions. */ expunge_all(msq, -EAGAIN); /* sleeping senders might be able to send * due to a larger queue size. */ ss_wakeup(&msq->q_senders, 0); break; default: err = -EINVAL; } out_unlock: msg_unlock(msq); out_up: up_write(&msg_ids(ns).rw_mutex); return err; }
|
上面这段代码的重要作用就是删除消息队列,清除消息,我们重点看里面的几个调用的函数,其余的代码很简单,有基础的朋友可以看的懂,抓住重点来看,直接进入
static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp) { struct list_head *tmp; struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
expunge_all(msq, -EIDRM); ss_wakeup(&msq->q_senders, 1); msg_rmid(ns, msq); msg_unlock(msq);
tmp = msq->q_messages.next; while (tmp != &msq->q_messages) { struct msg_msg *msg = list_entry(tmp, struct msg_msg, m_list);
tmp = tmp->next; atomic_dec(&ns->msg_hdrs); free_msg(msg); } atomic_sub(msq->q_cbytes, &ns->msg_bytes); security_msg_queue_free(msq); ipc_rcu_putref(msq); }
|
首先是expunge_all()函数
static void expunge_all(struct msg_queue *msq, int res) { struct list_head *tmp;
tmp = msq->q_receivers.next; while (tmp != &msq->q_receivers) { struct msg_receiver *msr;
msr = list_entry(tmp, struct msg_receiver, r_list); tmp = tmp->next; msr->r_msg = NULL; wake_up_process(msr->r_tsk); smp_mb(); msr->r_msg = ERR_PTR(res); } }
|
expunge_all是擦去的意思,这个函数的的作用就是循环从消息队列中的等待接收的进程开始,依次唤醒他们让他们返回,也就是消息的主体*开始了,等待消息的客户请回吧。里面的smp与多处理器smp机制有关,这里不用关心,以后我们会接触它。然后执行下一个函数
static void ss_wakeup(struct list_head *h, int kill) { struct list_head *tmp;
tmp = h->next; while (tmp != h) { struct msg_sender *mss;
mss = list_entry(tmp, struct msg_sender, list); tmp = tmp->next; if (kill) mss->list.next = NULL; wake_up_process(mss->tsk); } }
|
清除所有在消除队列中的等待发送的进程,唤醒他们*。下面执行
static inline void msg_rmid(struct ipc_namespace *ns, struct msg_queue *s) { ipc_rmid(&msg_ids(ns), &s->q_perm); }
|
void ipc_rmid(struct ipc_ids *ids, struct kern_ipc_perm *ipcp) { int lid = ipcid_to_idx(ipcp->id);
idr_remove(&ids->ipcs_idr, lid);
ids->in_use--;
ipcp->deleted = 1;
return; }
|
这里重点的函数就是idr_remove
void idr_remove(struct idr *idp, int id) { struct idr_layer *p;
/* Mask off upper bits we don't use for the search. */ id &= MAX_ID_MASK;
sub_remove(idp, (idp->layers - 1) * IDR_BITS, id); if (idp->top && idp->top->count == 1 && (idp->layers > 1) && idp->top->ary[0]) { // We can drop a layer
p = idp->top->ary[0]; idp->top->bitmap = idp->top->count = 0; free_layer(idp, idp->top); idp->top = p; --idp->layers; } while (idp->id_free_cnt >= IDR_FREE_MAX) { p = alloc_layer(idp); kmem_cache_free(idr_layer_cache, p); } return;
|
我们看到他释放了我们以前建立消息队列时分配的idr_layer结构,这里我们需要提醒一下,删除队列时唤醒等待接收和发送的进程他们将要执行什么路线,我们回忆一下前二节的内容,接收和发送的进程在唤醒后也就是从shedule()返回到他们各自的代码中时,就会再次对所在的消息队列加锁,如果加锁失败他们此时就会退出接收和发送代码部分退出了,关于加锁rcu机制我们以后重点在讲锁的部分进行介绍,至于其余的代码都很简单了,朋友们可以根据这条主线举一反三的阅读一下。下一节我们探讨共享内存的实践和内核。