4--消息队列(报文队列)实践到内核--消息队列的控制

时间:2022-07-07 05:33:01
前边三节,我们讲了消息队列的创建、发送信息和接收消息,今天继续沿着应用程序路线看内核中的对消息队列的控制,首先是我们看一下应用程序中的界面函数
msgctl(msgid,IPC_RMID,0);
通过sys_ipc()系统调用中看到这样一句switch语句代码:
 

    case MSGCTL:
        return sys_msgctl (first, second, (struct msqid_ds __user *) ptr);

可以看到是进入了sys_msgctl()函数并从此返回,我们追踪一下这个函数

第一参数不言而喻,朋友们通过前三节的阅读这里应该知道是消息队列的ID号,第二个参数我们需要说明一下,应用程序传递过来的是IPC_RMID

asmlinkage long sys_msgctl(int msqid, int cmd, struct msqid_ds __user *buf)
{
    struct msg_queue *msq;
    int err, version;
    struct ipc_namespace *ns;

    if (msqid < 0 || cmd < 0)
        return -EINVAL;

    version = ipc_parse_version(&cmd);
    ns = current->nsproxy->ipc_ns;

    switch (cmd) {
    case IPC_INFO:
    case MSG_INFO:
    {
        struct msginfo msginfo;
        int max_id;

        if (!buf)
            return -EFAULT;
        /*
         * We must not return kernel stack data.
         * due to padding, it's not enough
         * to set all member fields.
         */

        err = security_msg_queue_msgctl(NULL, cmd);
        if (err)
            return err;

        memset(&msginfo, 0, sizeof(msginfo));
        msginfo.msgmni = ns->msg_ctlmni;
        msginfo.msgmax = ns->msg_ctlmax;
        msginfo.msgmnb = ns->msg_ctlmnb;
        msginfo.msgssz = MSGSSZ;
        msginfo.msgseg = MSGSEG;
        down_read(&msg_ids(ns).rw_mutex);
        if (cmd == MSG_INFO) {
            msginfo.msgpool = msg_ids(ns).in_use;
            msginfo.msgmap = atomic_read(&ns->msg_hdrs);
            msginfo.msgtql = atomic_read(&ns->msg_bytes);
        } else {
            msginfo.msgmap = MSGMAP;
            msginfo.msgpool = MSGPOOL;
            msginfo.msgtql = MSGTQL;
        }
        max_id = ipc_get_maxid(&msg_ids(ns));
        up_read(&msg_ids(ns).rw_mutex);
        if (copy_to_user(buf, &msginfo, sizeof(struct msginfo)))
            return -EFAULT;
        return (max_id < 0) ? 0 : max_id;
    }
    case MSG_STAT:    /* msqid is an index rather than a msg queue id */
    case IPC_STAT:
    {
        struct msqid64_ds tbuf;
        int success_return;

        if (!buf)
            return -EFAULT;

        if (cmd == MSG_STAT) {
            msq = msg_lock(ns, msqid);
            if (IS_ERR(msq))
                return PTR_ERR(msq);
            success_return = msq->q_perm.id;
        } else {
            msq = msg_lock_check(ns, msqid);
            if (IS_ERR(msq))
                return PTR_ERR(msq);
            success_return = 0;
        }
        err = -EACCES;
        if (ipcperms(&msq->q_perm, S_IRUGO))
            goto out_unlock;

        err = security_msg_queue_msgctl(msq, cmd);
        if (err)
            goto out_unlock;

        memset(&tbuf, 0, sizeof(tbuf));

        kernel_to_ipc64_perm(&msq->q_perm, &tbuf.msg_perm);
        tbuf.msg_stime = msq->q_stime;
        tbuf.msg_rtime = msq->q_rtime;
        tbuf.msg_ctime = msq->q_ctime;
        tbuf.msg_cbytes = msq->q_cbytes;
        tbuf.msg_qnum = msq->q_qnum;
        tbuf.msg_qbytes = msq->q_qbytes;
        tbuf.msg_lspid = msq->q_lspid;
        tbuf.msg_lrpid = msq->q_lrpid;
        msg_unlock(msq);
        if (copy_msqid_to_user(buf, &tbuf, version))
            return -EFAULT;
        return success_return;
    }
    case IPC_SET:
    case IPC_RMID:
        err = msgctl_down(ns, msqid, cmd, buf, version);
        return err;
    default:
        return -EINVAL;
    }

out_unlock:
    msg_unlock(msq);
    return err;
}

在分析这段代码之前我们要看一下参数cmd

/*
 * Control commands used with semctl, msgctl and shmctl
 * see also specific commands in sem.h, msg.h and shm.h
 */

#define IPC_RMID 0 /* remove resource */
#define IPC_SET 1 /* set ipc_perm options */
#define IPC_STAT 2 /* get ipc_perm options */
#define IPC_INFO 3 /* see ipcs */

这些命令代码不仅是为消息队列所使用的,还为整个IPC通讯所使用,包括信号量和共享内存,后边二种我们以后进行实践和分析,另外,也还有消息队列专用的命令

/* ipcs ctl commands */
#define MSG_STAT 11
#define MSG_INFO 12

我们还是那种方式在代码中去理解这些宏的作用,不过聪明的读者可能从定义的字面上能看出其作用,为了避免死记硬背,死搬硬套的学习方法,我们就从实践中记忆吧

这里还有第三个参数struct msqid_ds ,它是为了保持兼容以前的代码所用的,我们暂且放一放,避免跑了主题,可以看到在我们的应用程序传递下来的这个参数是空指针。

好了我们进入函数内部看一下,因为使用的命令代码是IPC_RMID,所以会进入这段case语句

    case IPC_RMID:
        err = msgctl_down(ns, msqid, cmd, buf, version);
        return err;

 

static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
         struct msqid_ds __user *buf, int version)
{
    struct kern_ipc_perm *ipcp;
    struct msqid64_ds msqid64;
    struct msg_queue *msq;
    int err;

    if (cmd == IPC_SET) {
        if (copy_msqid_from_user(&msqid64, buf, version))
            return -EFAULT;
    }

    ipcp = ipcctl_pre_down(&msg_ids(ns), msqid, cmd,
             &msqid64.msg_perm, msqid64.msg_qbytes);
    if (IS_ERR(ipcp))
        return PTR_ERR(ipcp);

    msq = container_of(ipcp, struct msg_queue, q_perm);

    err = security_msg_queue_msgctl(msq, cmd);
    if (err)
        goto out_unlock;

    switch (cmd) {
    case IPC_RMID:
        freeque(ns, ipcp);
        goto out_up;
    case IPC_SET:
        if (msqid64.msg_qbytes > ns->msg_ctlmnb &&
         !capable(CAP_SYS_RESOURCE)) {
            err = -EPERM;
            goto out_unlock;
        }

        msq->q_qbytes = msqid64.msg_qbytes;

        ipc_update_perm(&msqid64.msg_perm, ipcp);
        msq->q_ctime = get_seconds();
        /* sleeping receivers might be excluded by
         * stricter permissions.
         */

        expunge_all(msq, -EAGAIN);
        /* sleeping senders might be able to send
         * due to a larger queue size.
         */

        ss_wakeup(&msq->q_senders, 0);
        break;
    default:
        err = -EINVAL;
    }
out_unlock:
    msg_unlock(msq);
out_up:
    up_write(&msg_ids(ns).rw_mutex);
    return err;
}

上面这段代码的重要作用就是删除消息队列,清除消息,我们重点看里面的几个调用的函数,其余的代码很简单,有基础的朋友可以看的懂,抓住重点来看,直接进入

static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
{
    struct list_head *tmp;
    struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);

    expunge_all(msq, -EIDRM);
    ss_wakeup(&msq->q_senders, 1);
    msg_rmid(ns, msq);
    msg_unlock(msq);

    tmp = msq->q_messages.next;
    while (tmp != &msq->q_messages) {
        struct msg_msg *msg = list_entry(tmp, struct msg_msg, m_list);

        tmp = tmp->next;
        atomic_dec(&ns->msg_hdrs);
        free_msg(msg);
    }
    atomic_sub(msq->q_cbytes, &ns->msg_bytes);
    security_msg_queue_free(msq);
    ipc_rcu_putref(msq);
}

首先是expunge_all()函数

static void expunge_all(struct msg_queue *msq, int res)
{
    struct list_head *tmp;

    tmp = msq->q_receivers.next;
    while (tmp != &msq->q_receivers) {
        struct msg_receiver *msr;

        msr = list_entry(tmp, struct msg_receiver, r_list);
        tmp = tmp->next;
        msr->r_msg = NULL;
        wake_up_process(msr->r_tsk);
        smp_mb();
        msr->r_msg = ERR_PTR(res);
    }
}

expunge_all是擦去的意思,这个函数的的作用就是循环从消息队列中的等待接收的进程开始,依次唤醒他们让他们返回,也就是消息的主体*开始了,等待消息的客户请回吧。里面的smp与多处理器smp机制有关,这里不用关心,以后我们会接触它。然后执行下一个函数

static void ss_wakeup(struct list_head *h, int kill)
{
    struct list_head *tmp;

    tmp = h->next;
    while (tmp != h) {
        struct msg_sender *mss;

        mss = list_entry(tmp, struct msg_sender, list);
        tmp = tmp->next;
        if (kill)
            mss->list.next = NULL;
        wake_up_process(mss->tsk);
    }
}

清除所有在消除队列中的等待发送的进程,唤醒他们*。下面执行

static inline void msg_rmid(struct ipc_namespace *ns, struct msg_queue *s)
{
    ipc_rmid(&msg_ids(ns), &s->q_perm);
}

void ipc_rmid(struct ipc_ids *ids, struct kern_ipc_perm *ipcp)
{
    int lid = ipcid_to_idx(ipcp->id);

    idr_remove(&ids->ipcs_idr, lid);

    ids->in_use--;

    ipcp->deleted = 1;

    return;
}

这里重点的函数就是idr_remove

void idr_remove(struct idr *idp, int id)
{
    struct idr_layer *p;

    /* Mask off upper bits we don't use for the search. */
    id &= MAX_ID_MASK;

    sub_remove(idp, (idp->layers - 1) * IDR_BITS, id);
    if (idp->top && idp->top->count == 1 && (idp->layers > 1) &&
     idp->top->ary[0]) { // We can drop a layer


        p = idp->top->ary[0];
        idp->top->bitmap = idp->top->count = 0;
        free_layer(idp, idp->top);
        idp->top = p;
        --idp->layers;
    }
    while (idp->id_free_cnt >= IDR_FREE_MAX) {
        p = alloc_layer(idp);
        kmem_cache_free(idr_layer_cache, p);
    }
    return;

我们看到他释放了我们以前建立消息队列时分配的idr_layer结构,这里我们需要提醒一下,删除队列时唤醒等待接收和发送的进程他们将要执行什么路线,我们回忆一下前二节的内容,接收和发送的进程在唤醒后也就是从shedule()返回到他们各自的代码中时,就会再次对所在的消息队列加锁,如果加锁失败他们此时就会退出接收和发送代码部分退出了,关于加锁rcu机制我们以后重点在讲锁的部分进行介绍,至于其余的代码都很简单了,朋友们可以根据这条主线举一反三的阅读一下。下一节我们探讨共享内存的实践和内核。