1消息队列(报文队列)实践到内核消息队列的创建

时间:2021-02-02 05:33:47
我是无名小卒,一直想写一些关于内核方面的资料,学习内核很久了,市面上的内核书我都读过了,无法对任何一本书加以总结,因为他就象linux的内核一样在不断更新和升级,针对2.6内核现在市面上非常缺少相关内核的分析资料情况,当然,也有不少网友写了一些关于2.6内核的博客文章,我也看过,但是写的不够深刻具体,总是在内核的过程上粗略的一笔带过,因此我下决心只要有空闲时间就写一些日志来与大家分享,很多书籍和博客都是直接剖析内核的,我读过这些书后发现,这种学习方法虽然快,但是效果不太好,特别是我读了几遍后还是对很多知识点和结构记忆不深,对初学者来说更是枯燥无味的过程,使很多朋友放弃了研读内核代码的兴趣,确实如此,就象缺一张好的导游地图一样,如果我们目的明确,但是缺乏了指路的地图那无非是在大森林里迷了路一样,再高的旅游兴趣也荡然无存,因此,我想要是有一本书或者资料能够在实践中逐步深入到内核该有多好,那样能够使我们在即看到效果的时候导游到如何产生这样效果的内核中将会是一件非常有趣的事情,肯定能够轻松地掌握全部想要的知识,目前市面上有这样的书,但是评价不怎么好,所以我想根据多年读内核的书来整理和书写这类的文章,希望对有兴趣的朋友起到导游地图的效果。这些文章有可能是来自大家所熟悉的资料中也有可能来自互联网,总之,多多益善,我不会在文中注明具体出处,也希望原作者勿怪,我们的目标是大家共同进步。请注意本文使用的是基于2.6.26的内核,这是个人认为应该稳定使用的版本。   这节我们开始消息队列(也有称报文队列)的实践到内核,我是无名小卒,很多书上讲到的消息队列都很枯燥,我尽量写的易读易懂,避免书者之风,我们的目标是深入到内核的细节中去,从实践的应用上追踪到内核的实现。   消息队列的概念相信大家能从字面上看的出,他的优点资料非常全面,不介绍了我们直接进入主题,看代码 我们在实验中写二个程序一个是msg1.c是接受消息队列程序,另一个是msg2.c是发送消息队列用的,在这二个程序中可以创建新的消息队列,但是在接受的程序中在完成接受后我们删除消息队列。 下面是msg1.c接受消息队列的程序代码:  

#include
<unistd.h>
#include
<
stdio.h>
#include
<
stdlib.h>
#include
<
string.h>

#include
<
sys/types.h>
#include
<
sys/ipc.h>
#include
<
sys/msg.h>

struct my_msg_st{
long int my_msg_type;
char some_text[BUFSIZ];
};
int main()
{
int running=1;
int msgid;
struct my_msg_st some_data;
long int msg_to_receive=0;

下面我们要建立一个消息队列,这个函数就是msgget()
msgid=msgget((key_t)1234,0666|IPC_CREAT);
if(msgid==-1){
fprintf(stderr,"msgget failed/n");
exit(EXIT_FAILURE);
}

while(running){
if(msgrcv(msgid,(void*)&some_data,BUFSIZ,msg_to_receive,0)==-1){
fprintf(stderr,"msgrcv failed/n");
exit(EXIT_FAILURE);
}
上面的while循环中是通过msgrcv()函数从消息队列中接受消息直到没有消息为止,我们继续往下看
printf("you wrote: %s/n",some_data.some_text);
if(strncmp(some_data.some_text,"end",3)==0){
running=0;
}
}
if(msgctl(msgid,IPC_RMID,0)==-1){
fprintf(stderr,"msgctl failed /n");
exit(EXIT_FAILURE);
}
上面是通过msgctl()函数删除消息队列
exit(EXIT_SUCCESS);
}


  第二个是发送消息队列的程序代码,这个代码相对于上面是小了许多,在下面的程序main()中,删除了msg_to_receive,并且用buffer[BUFSIZ]替换了。下面的程序代码将发送文字给接受消息队列程序,具体的msg2.c的代码如下:

#include
<unistd.h>
#include
<
stdio.h>
#include
<
stdlib.h>
#include
<
string.h>

#include
<
sys/types.h>
#include
<
sys/ipc.h>
#include
<
sys/msg.h>

struct my_msg_st{
long int my_msg_type;
char some_text[BUFSIZ];
};
int main()
{
int running=1;
int msgid;
struct my_msg_st some_data;
char buffer[BUFSIZ];
上面就是我们删除了msg_to_receive,并且用buffer[BUFSIZ]替换了

msgid=msgget((key_t)1234,0666|IPC_CREAT);
尽管上面也用了msgget()函数但是这个函数有查询功能就是有相同名称的消息队列就不自创建,深入内核时会看到这个作用。
if(msgid==-1){
fprintf(stderr,"msgget failed/n");
exit(EXIT_FAILURE);
}

while(running){
printf("enter some text:");
fgets(buffer,BUFSIZ,stdin);
some_data.my_msg_type=1;
strcpy(some_data.some_text,buffer);
上面是我们在控制台上让用户输入一些文字并复制声明的结构变量some_data的some_text,它是一个char 类型的数组,我们继续往下看

if(msgsnd(msgid,(void*)&some_data,BUFSIZ,0)==-1){
fprintf(stderr,"msgsnd failed /n");
exit(EXIT_FAILURE);
}
我们通过msgsnd()函数将some_data做为消息发送出给接收消息队列的程序
if(strncmp(some_data.some_text,"end",3)==0){
running=0;
}
}
exit(EXIT_SUCCESS);
}



至于上面其余的代码相信有C语言基础的朋友都能看懂,如果你还是看不明白,需要补习一下C语言的基础了,下面编译程序
$gcc -o msg2 msg2.c
$gcc -o msg1 msg1.c
启动发送消息队列程序并输入下面的文字
$./msg2 
Enter some text:hello Enter some text:Are you ready? Enter some text:end 启动接收消息队列程序自动显示出文字
$./msg1
You wrote:hello
You wrote:Are you ready? You wrote:end $ 我们可以从上面的二个程序中看到分别使用了四个消息队列的函数,我是无名小卒,我们要追踪的内核版本是2.6.26,这个版本的内核是我写文章时最新的,也是我打算一段时间稳定使用的版本: msgget((key_t)1234,0666|IPC_CREAT); msgsnd(msgid,(void*)&some_data,BUFSIZ,0); msgrcv(msgid,(void*)&some_data,BUFSIZ,msg_to_receive,0); msgctl(msgid,IPC_RMID,0);   下面我们分别追踪他们到内核,了解到底内核是如何处理的,首先是   msgget((key_t)1234,0666|IPC_CREAT);   上面这些函数全部是能过系统调用ipc()进入内核的,所以msgget()当然不能例外,系统调用我们在这里就不谈了,将在中断的实践部分再进行详述,大家只需要知道我们现在已经通过系统调用进入了内核,上面四个消息队列的统一入口是sys_ipc(),他的代码在2.6.26版本内核中arch/x86/kernel/sys_i386_32.c文件中第106行开始:    

asmlinkage
int
sys_ipc (uint call,
int first,
int
second,
            int third,
void __user *ptr,
long fifth)
{

上面asmlinkage是一个宏标识,他的作用是说明后边的函数在C语言和汇编的交互过程中要通过堆栈传递参数,更多的linux宏标识知识请参考这里http://www.diybl.com/course/6_system/linux/Linuxjs/200896/139512.html
    int version, ret;

    version = call
>
> 16;
/* hack for backward compatibility */
    call &= 0xffff;

    switch (call)
{
    case SEMOP:
        return sys_semtimedop
(
first, (struct sembuf __user
*)ptr, second,
NULL);
    case SEMTIMEDOP:
        return sys_semtimedop(first,
(struct sembuf __user
*)ptr, second,
                    (const
struct timespec __user
*)fifth);

    case SEMGET:
        return sys_semget
(
first, second, third);
    case SEMCTL:
{
        union semun fourth;
        if (!ptr)
            return
-
EINVAL;
        if (get_user(fourth.__pad,
(void __user
* __user *) ptr))
            return
-
EFAULT;
        return sys_semctl
(
first, second, third, fourth);
    }

    case MSGSND:
        return sys_msgsnd
(
first, (struct msgbuf __user
*) ptr,

                 second, third);
    case MSGRCV:
        switch (version)
{
        case 0:
{
            struct ipc_kludge tmp;
            if (!ptr)
                return
-
EINVAL;
            
            if (copy_from_user(&tmp,
                     (struct ipc_kludge __user
*) ptr,

                     sizeof
(
tmp)))
                return
-
EFAULT;
            return sys_msgrcv
(
first, tmp.msgp, second,
                     tmp.msgtyp, third);
        }
        default:
            return sys_msgrcv
(
first,
                     (struct msgbuf __user
*) ptr,
                     second, fifth, third);
        }
    case MSGGET:
        return sys_msgget
(
(key_t) first, second);
    case MSGCTL:
        return sys_msgctl
(
first, second,
(struct msqid_ds __user
*) ptr);

    case SHMAT:
        switch (version)
{
        default:
{
            ulong raddr;
            ret = do_shmat
(
first, (char __user
*) ptr, second,
&raddr);
            if (ret)
                return ret;
            return put_user
(
raddr, (ulong __user
*) third);
        }
        case 1:    /* iBCS2 emulator entry point */
            if (!segment_eq(get_fs(),
get_ds()))
                return
-
EINVAL;
            /* The "(ulong *) third" is valid _only_ because of the kernel segment thing */
            return do_shmat
(
first, (char __user
*) ptr, second,
(ulong *) third);
        }
    case SHMDT:

        return sys_shmdt
(
(char __user
*)ptr);
    case SHMGET:
        return sys_shmget
(
first, second, third);
    case SHMCTL:
        return sys_shmctl
(
first, second,
                 (struct shmid_ds __user
*) ptr);
    default:
        return -ENOSYS;
    }
}

sys_ipc()函数的代码比较容易懂,从中我们也可以看出进程间通信的信号量还有共享内存也是通过个系统调用入口进入的,我们先不管其他的,重点看我们关于消息队列的部分,不言而喻,msgget()进入sys_ipc()时,肯定会走到这里

一、消息队列的建立

case MSGGET:
        return sys_msgget
(
(key_t) first, second);

至于上面的参数call是如何设置的,那应该是在C语言中的库函数时过渡到linux系统调用时自动设置好的,我们先不要关心这个过程,只要顺着内核进入抓关键,现在我们看到在这里调用了一个sys_msgget()函数,而这个函数的参数与上面msgget()的对照一下会发现完全相同的个数,我们追踪进入sys_msgget()去看一下,先列出这个函数,这个函数位于ipc/msg.c中第312行:

asmlinkage
long
sys_msgget(key_t key,
int msgflg)
{
    struct ipc_namespace
*
ns;
    struct ipc_ops msg_ops;
    struct ipc_params msg_params;

    ns = current->nsproxy->ipc_ns;

    msg_ops.getnew
=
newque;
    msg_ops.associate
=
msg_security;
    msg_ops.more_checks
=
NULL;

    msg_params.key
=
key;
    msg_params.flg
=
msgflg;

    return ipcget(ns,
&msg_ids(ns),
&msg_ops,
&
msg_params);
}

我们先来看一下这二个参数,我建议朋友们在学习内核的函数时一定要注意函数的参数,包括他们的类型,还有就是函数的返回类型,这些对阅读内核源码非常必要,尤其是我们在追踪一个函数的调用过程时,参数更是环节中的纽带,第一个参数是key_t key,它是一个key_t类型,内核中定义它为int,但是我们从msgget()的参数界面上

msgget((key_t)1234,0666|IPC_CREAT)

第一个参数是转换为key_t类型,这是为了保持与进入sys_msgget()函数保持参数对正,我们可看出传递进来的key这个值是1234,在这里我们要强调一下,这个key的作用是为了区分消息队列(有的称为报文队列,以下我们统称消息队列)用的,朋友们都知道文件系统中的文件区分是用文件号来进行的,所以消息队列使用一个键值意义上等同于文件号,但是他并没有使用文件号。

第二个参数是int msgflg,类型是int,参数名称msgflg意为message flag,很清晰,代表着消息队列的一些标记设置,在这里我们看到传递进来的是0666|IPC_CREAT,采用的是八进制我们从上面的sys_msgget()中看到这二个函数使用在

msg_params.key
= key;
msg_params.flg = msgflg;

struct ipc_params msg_params;可以看出这是一个进程间通信时传递参数的一个数据结构,另外二个数据结构变量struct ipc_namespace*ns和struct ipc_ops msg_ops我们在应用到时再看这二个结构,具体这三个结构的作用我们追踪中自然能看明白,也有的朋友可以从字面上猜出个大概,我们的学习方法是从应用中理解他们的作用和概念,现在我们只需要知道sys_msgget()这个函数一转眼就把msgget()传来的参数转交给了ipcget()函数。

static
inline int ipcget(struct ipc_namespace
*ns,
struct
ipc_ids *ids,
            struct ipc_ops
*
ops, struct ipc_params
*params)
{
    if (params->key
== IPC_PRIVATE)
        return ipcget_new(ns, ids, ops, params);
    else
        return ipcget_public(ns, ids, ops, params);
}

这个函数在ipc/util.h文件的804行,我是无名小卒,我们使用的这个内核版本是2.6.26,当前最新的内核版本。从上面的ipcget()的参数中我们可看出有一个没有在上面函数中出现的结构struct ipc_ids*ids,还是要看上面传递下来的参数,这个对应的参数是&msg_ids(ns), 我们得说明一下这个参数是从何而来

#define msg_ids(ns)    (*((ns)->ids[IPC_MSG_IDS]))

struct ipc_namespace {
...
 struct ipc_ids *ids[3];

...
};

#define IPC_SEM_IDS 0
#define IPC_MSG_IDS 1
#define IPC_SHM_IDS 2

它实际上最终是指向ipc_namespace结构变量nc中的ids,我们看到ids结构数组有三个元素,分别是代表着信号量、消息队列和共享内存,这里的参数&msg_ids(ns)是取得数组中的消息队列元素,它是一个ipc_ids数据结构

struct ipc_ids
{
    int in_use;
    unsigned short seq;
    unsigned short seq_max;
    struct rw_semaphore rw_mutex;
    struct idr ipcs_idr;
};

这个数据结构中的ipcs_idr指向另一个数据结构,朋友们不要担心“这么多数据结构我们能记得住吗?”我们只需要先有一个印象,知道有这么个结构就行了,等到下面追踪到时至少可以有点似曾相识的感觉,那时就会明白他的具体作用了,我们看一下这个idr结构,它肯定也是一个关键

struct idr
{
    struct idr_layer
*
top;
    struct idr_layer
*
id_free;
    int         layers;
    int         id_free_cnt;
    spinlock_t     lock;
};

结构中还有二个数据结构,不要害怕,数据结构也是一些数据的容器而已,车到山前必有路,先了解一下为好

struct idr_layer
{
    unsigned long         bitmap;
/* A zero bit means "space here" */
    struct idr_layer    *ary[1<<IDR_BITS];
    int            
count
;    
/* When zero, we can release it */

};

看不懂他的作用,不要紧先放着,等到用到时再回来看,自然知道作用了

回到ipcget()函数中,我们可以看到if (params->key== IPC_PRIVATE),params参数是上面我们传递进来的key参数,这里IPC_PRIVATE是0,他的使用是判断是否是建立私用的消息队列也就是自己发送自己接收的队列,我们看到传递进来的参数是1234,肯定不是私有队列。虽然我们的追踪应该进入ipcget_public()这个函数,但是,为了理清很多数据结构的关系我们需要说明ipcget_new()这条线索来先掌握几个数据结构的关系,这个函数在Util.c的300行处:

/**
 *    ipcget_new    -    create a new ipc object
 *    @ns: namespace
 *    @ids: IPC identifer set
 *    @ops: the actual creation routine to call
 *    @params: its parameters
 *
 *    This routine is called by sys_msgget, sys_semget() and sys_shmget()
 *    when the key is IPC_PRIVATE.
 */

static int ipcget_new(struct ipc_namespace
*ns,
struct
ipc_ids *ids,
        struct ipc_ops
*
ops, struct ipc_params
*params)
{
    int err;
retry:
    err = idr_pre_get(&ids->ipcs_idr, GFP_KERNEL);

    if (!err)
        return -ENOMEM;

    down_write(&ids->rw_mutex);
    err = ops->getnew(ns, params);
    up_write(&ids->rw_mutex);

    if (err
==
-
EAGAIN)
        goto retry;

    return err;
}

/**
 * idr_pre_get - reserver resources for idr allocation
 * @idp:    idr handle
 * @gfp_mask:    memory allocation flags
 *
 * This function should be called prior to locking and calling the
 * following function. It preallocates enough memory to satisfy
 * the worst possible allocation.
 *
 * If the system is REALLY out of memory this function returns 0,
 * otherwise 1.
 */

int idr_pre_get(struct idr
*idp, gfp_t gfp_mask)
{
    while (idp->id_free_cnt
< IDR_FREE_MAX)
{
        struct idr_layer
*
new;
        new = kmem_cache_alloc(idr_layer_cache, gfp_mask);
        if (new
==
NULL
)
            return
(
0);
        free_layer(idp,
new);
    }
    return 1;
}

代码注释中也说明了这个是一个检测idr分配的数量函数,如果小于固定的值IDR_FREE_MAX就会按照做为参数传递下来的gfp_mask在专用的slab缓存中分配idr_layer,然后通过free_layer()函数让idp的id_free空闲指针指向它,这个函数在根目录中lib下的idr.c的61处,相信读者可以自己看明白

/* only called when idp->lock is held */
static void __free_layer(struct idr
*idp,
struct
idr_layer *p)
{
    p->ary[0]
= idp->id_free;
    idp->id_free
= p;
    idp->id_free_cnt++;
}

static void free_layer(struct idr
*idp,
struct
idr_layer *p)
{
    unsigned long flags;

    
/*
     * Depends on the return element being zeroed.
     */

    spin_lock_irqsave(&idp->lock, flags);
    __free_layer(idp, p);
    spin_unlock_irqrestore(&idp->lock, flags);
}

回到ipcget_new()函数中我们看到下面通过ops->getnew()调用作为参数传下来的msg_ops,它是一个struct ipc_ops结构变量,另外我们忽略了关于信号量的细节,这些内容将在其他的追踪章节专门叙述,这里我们看一下是在sys_msgget()函数中的320行处

msg_ops.getnew = newque;

所以在这里其实是调用的newque这个函数,我们跟进看一下,这个函数在ipc目录下的msg.c的180行:

/**
 * newque - Create a new msg queue
 * @ns: namespace
 * @params: ptr to the structure that contains the key and msgflg
 *
 * Called with msg_ids.rw_mutex held (writer)
 */

static int newque(struct ipc_namespace
*ns,
struct
ipc_params *params)
{
    struct msg_queue
*
msq;
    int id, retval;
    key_t key = params->key;
    int msgflg = params->flg;

    msq = ipc_rcu_alloc(sizeof(*msq));
    if (!msq)
        return -ENOMEM;

    msq->q_perm.mode
= msgflg & S_IRWXUGO;
    msq->q_perm.key
= key;

    msq->q_perm.security
= NULL;
    retval = security_msg_queue_alloc(msq);
    if (retval)
{
        ipc_rcu_putref(msq);
        return retval;
    }

    
/*
     * ipc_addid() locks msq
     */

    id = ipc_addid(&msg_ids(ns),
&msq->q_perm, ns->msg_ctlmni);
    if (id
<
0) {
        security_msg_queue_free(msq);
        ipc_rcu_putref(msq);
        return id;
    }

    msq->q_stime
= msq->q_rtime
= 0;
    msq->q_ctime
= get_seconds();
    msq->q_cbytes
= msq->q_qnum
= 0;
    msq->q_qbytes
= ns->msg_ctlmnb;
    msq->q_lspid
= msq->q_lrpid
= 0;
    INIT_LIST_HEAD(&msq->q_messages);
    INIT_LIST_HEAD(&msq->q_receivers);
    INIT_LIST_HEAD(&msq->q_senders);

    msg_unlock(msq);

    return msq->q_perm.id;
}

这个函数是创建消息队列的关键函数,首先通过ipc_rcu_alloc()分配一个msg_queue结构空间,security_msg_queue_alloc和ipc_rcu_putref函数都是2.6内核新添加的一种新的RCU锁机制(Read-Copy Update),我们先跳过,有兴趣的朋友可以先参考这篇,我是无名小卒,喜欢的朋友可以看http://www.ibm.com/developerworks/cn/linux/l-rcu/文章,我们把重点围绕在消息队列的创建主线上,我们需要先看一下msg_queue这个数据结构

/* one msq_queue structure for each present queue on the system */
struct msg_queue {
    struct kern_ipc_perm q_perm;
    time_t q_stime;            /* last msgsnd time */
    time_t q_rtime;            /* last msgrcv time */
    time_t q_ctime;            /* last change time */
    unsigned long q_cbytes;        /* current number of bytes on queue */
    unsigned long q_qnum;        /* number of messages in queue */
    unsigned long q_qbytes;        /* max number of bytes on queue */
    pid_t q_lspid;            /* pid of last msgsnd */
    pid_t q_lrpid;            /* last receive pid */

    struct list_head q_messages;
    struct list_head q_receivers;
    struct list_head q_senders;
};

linux系统中每一个消息队列都有这么一个数据结构来表示,结构中的第一个变量是一个struct kern_ipc_perm ,在以前的2.4内核中这个是至关重要的一个结构变量,它是多个消息队列的连接头

/* used by in-kernel data structures */
struct kern_ipc_perm
{
    spinlock_t    lock;
    int        deleted;
    int        id;
    key_t        key;
    uid_t        uid;
    gid_t        gid;
    uid_t        cuid;
    gid_t        cgid;
    mode_t        mode;
    unsigned long    seq;
    void        *security;
};

先大体看一下这个结构中的变量,可以从声明的名称上猜出个大概主要是装载着键值和进程的一些信息及权限等内容,我们先知道这些在newque()函数中进一步调用ipc_addid()

/**
 *    ipc_addid     -    add an IPC identifier
 *    @ids: IPC identifier set
 *    @new: new IPC permission set
 *    @size: limit for the number of used ids
 *
 *    Add an entry 'new' to the IPC ids idr. The permissions object is
 *    initialised and the first free entry is set up and the id assigned
 *    is returned. The 'new' entry is returned in a locked state on success.
 *    On failure the entry is not locked and a negative err-code is returned.
 *
 *    Called with ipc_ids.rw_mutex held as a writer.
 */

 
int ipc_addid(struct ipc_ids* ids,
struct kern_ipc_perm*
new,
int
size)
{
    int id, err;

    if (size
> IPCMNI)
        size = IPCMNI;

    if (ids->in_use
>= size)
        return -ENOSPC;

    err = idr_get_new(&ids->ipcs_idr,
new,
&
id);
    if (err)
        return err;

    ids->in_use++;

    new->cuid
= new->uid
= current->euid;
    new->gid
= new->cgid
= current->egid;

    new->seq
= ids->seq++;
    if(ids->seq
> ids->seq_max)
        ids->seq
= 0;

    new->id
= ipc_buildid(id,
new->seq);
    spin_lock_init(&new->lock);
    new->deleted
= 0;
    rcu_read_lock();
    spin_lock(&new->lock);
    return id;
}

这个函数总的来说就是为消息队列分配一个标识符,首先在idr_get_new()函数中分配一个新的idr结构项

int idr_get_new(struct idr
*idp,
void
*ptr,
int *id)
{
    int rv;

    rv = idr_get_new_above_int(idp, ptr, 0);
    
/*
     * This is a cheap hack until the IDR code can be fixed to
     * return proper error values.
     */

    if (rv
<
0) {
        if (rv
==
-
1)
            return
-
EAGAIN;
        else /* Will be -3 */
            return
-
ENOSPC;
    }
    *id = rv;
    return 0;
}

在这里我们不得不说一下idr机制了,idr在linux内核中指的就是整数ID管理机制,从本质上来说,这就是一种将整数ID号和特定指针关联在一起的机制。这个机制最早是在2003年2月加入内核的,当时是作为POSIX定时器的一个补丁。现在,在内核的很多地方都可以找到idr的身影。采用radix树实现,可以很方便地将整数和指针关联起来,并且具有很高的搜索效率。

我们来看是如何通过idr_get_new_above_int建立新的idr的,这个代码在lib/idr.c的236行处

static
int idr_get_new_above_int(struct idr
*idp,
void
*ptr,
int starting_id)
{
    struct idr_layer
*
pa[MAX_LEVEL];
    int id;

    id = idr_get_empty_slot(idp, starting_id, pa);
    if (id
>
= 0)
{
        
/*
         * Successfully found an empty slot. Install the user
         * pointer and mark the slot full.
         */

        pa[0]->ary[id
& IDR_MASK]
= (struct idr_layer
*)ptr;
        pa[0]->count++;
        idr_mark_full(pa, id);
    }

    return id;
}

进一步调用idr_get_empty_slot()函数

static
int idr_get_empty_slot(struct idr
*idp,
int
starting_id,
             struct idr_layer
*
*pa)
{
    struct idr_layer
*
p, *new;
    int layers, v, id;
    unsigned long flags;

    id = starting_id;
build_up:
    p = idp->top;
    layers = idp->layers;
    if (unlikely(!p))
{
        if (!(p
= alloc_layer(idp)))
            return
-
1;
        layers = 1;
    }
    
/*
     * Add a new layer to the top of the tree if the requested
     * id is larger than the currently allocated space.
     */

    while ((layers
< (MAX_LEVEL
- 1))
&&
(
id >=
(1 <<
(layers*IDR_BITS))))
{
        layers++;
        if (!p->count)
            continue;
        if (!(new
= alloc_layer(idp)))
{
            
/*
             * The allocation failed. If we built part of
             * the structure tear it down.
             */

            spin_lock_irqsave(&idp->lock, flags);
            for (new
= p; p
&
& p != idp->top;
new = p)
{
                p = p->ary[0];
                new->ary[0]
= NULL;
                new->bitmap
= new->count
= 0;
                __free_layer(idp,
new);
            }
            spin_unlock_irqrestore(&idp->lock, flags);
            return
-
1;
        }
        new->ary[0]
= p;
        new->count
= 1;
        if (p->bitmap
== IDR_FULL)
            __set_bit(0,
&new->bitmap);
        p = new;
    }
    idp->top
= p;
    idp->layers
= layers;
    v = sub_alloc(idp,
&id, pa);
    if (v
=
= -2)
        goto build_up;
    return(v);
}

这个函数中我们可以看到关键的函数alloc_layer()它实际上是取得idr_pre_get()函数中的建立的空闲的idr_layer

static
struct idr_layer *alloc_layer(struct idr
*idp)
{
    struct idr_layer
*
p;
    unsigned long flags;

    spin_lock_irqsave(&idp->lock, flags);
    if ((p
= idp->id_free))
{
        idp->id_free
= p->ary[0];
        idp->id_free_cnt--;
        p->ary[0]
= NULL;
    }
    spin_unlock_irqrestore(&idp->lock, flags);
    return(p);
}

只不过在前边我们看到的idr_pre_get()函数中代码中 

   p->ary[0]= idp->id_free;
    idp->id_free= p;
    idp->id_free_cnt++;

将建立的空闲的idr_layer放到idr中的id_free指向处,也就是说他将在下一次的idr分配时用掉,在alloc_layer()函数中我们可以看到它用掉的是前一次的空闲idr_layer。为idr指定了一些idr_layer及数目后进一步使用sub_alloc()对idr_layer进行分配,这个函数比较大,但是目的很明确就是基于struct idr_layer *pa这个队列数组元素的分配,有兴趣的朋友可以看一下这个函数,我们不做分析了,总之,回到idr_get_new_above_int时,我们可以想象idr数据结构已经分配好了,再继续往下看关键的一句

  /*
   * Successfully found an empty slot.  Install the user
   * pointer and mark the slot full.
   */
  pa[0]->ary[id & IDR_MASK] = (struct idr_layer *)ptr;

结合注释部分可以看到这里就是把我们的消息队列头struct kern_ipc_perm指针转化为idr_layer类型的指针存放到pa的结构队列数组中。是只使用IDR_MASK做为掩码保证不超过数组的个数即1<<IDR_BITS,IDR_BITS定义为6。即ary数组包括64个idr_layer结构元素。在上面的分析过程中我们对idr的机制只是一个简要的描述,我是无名小卒,如果有兴趣的朋友可以参考http://blog.csdn.net/onlyzhangqin/archive/2008/06/06/2518081.aspx文章,从中可以看出它是为了解决linux中的一些关于超过32个结构数组队列而产生的一种新的算法机制。到此我们看一下idr_get_new()已经建立了一个idr结构并且已经与消息队列通过idr_layer和kern_ipc_perm结构指针的转换指向实现了对接,可以说这是整个创建的关键,回到ipc_addid()函数中是对消息队列头kern_ipc_perm的一些设置,再回到newque()函数后进一步对新立的消息队列进行设置。读者可以自行阅读那些设置。无非是对一些进程信息和创建时间及大小等内容的设置,但是关键有三个队列必须一提:

    INIT_LIST_HEAD(&msq->q_messages);
    INIT_LIST_HEAD(&msq->q_receivers);
    INIT_LIST_HEAD(&msq->q_senders);

初始化了三个关键的队列头。至此,我们应该可看出newque实际上就是建立一个消息队列并将ipc_namespace结构数组中的消息队列ipc_ids 中的idr下的idr_layer与消息队列头挂上钩。有点绕,但是我们可以理解上述那些数据结构的作用了。确实看应用要比直白的看数据要容易的多。

回到ipcget()函数中,假如我们不是建立私有的消息队列,那么就会走ipcget_public()这条路线,而这正是我们追踪的路线,上面的说明是打下了一个铺垫,我们来看ipcget_public这个函数,他在ipc/util.c中的364行

/**
 *    ipcget_public    -    get an ipc object or create a new one
 *    @ns: namespace
 *    @ids: IPC identifer set
 *    @ops: the actual creation routine to call
 *    @params: its parameters
 *
 *    This routine is called by sys_msgget, sys_semget() and sys_shmget()
 *    when the key is not IPC_PRIVATE.
 *    It adds a new entry if the key is not found and does some permission
 * / security checkings if the key is found.
 *
 *    On success, the ipc id is returned.
 */

static int ipcget_public(struct ipc_namespace
*ns,
struct
ipc_ids *ids,
        struct ipc_ops
*
ops, struct ipc_params
*params)
{
    struct kern_ipc_perm
*
ipcp;
    int flg = params->flg;
    int err;
retry:
    err = idr_pre_get(&ids->ipcs_idr, GFP_KERNEL);

    /*
     * Take the lock as a writer since we are potentially going to add
     * a new entry + read locks are not "upgradable"
     */

    down_write(&ids->rw_mutex);
    ipcp = ipc_findkey(ids, params->key);
    if (ipcp
==
NULL
) {
        /* key not used */
        if (!(flg
& IPC_CREAT))
            err =
-
ENOENT;
        else if
(!err)
            err =
-
ENOMEM;
        else
            err = ops->getnew(ns, params);
    } else
{

        /* ipc object has been locked by ipc_findkey() */

        if (flg
& IPC_CREAT && flg
& IPC_EXCL)
            err =
-
EEXIST;
        else {
            err = 0;
            if (ops->more_checks)
                err = ops->more_checks(ipcp, params);
            if (!err)
                /*
                 * ipc_check_perms returns the IPC id on
                 * success
                 */

                err = ipc_check_perms(ipcp, ops, params);
        }
        ipc_unlock(ipcp);
    }
    up_write(&ids->rw_mutex);

    if (err
==
-
EAGAIN)
        goto retry;

    return err;
}

idr_pre_get()我们已经在前边看到过了,主要是创建一个idr结构中的idr_layer,

继续往下看一个关键的函数ipc_findkey()

/**
 *    ipc_findkey    -    find a key in an ipc identifier set    
 *    @ids: Identifier set
 *    @key: The key to find
 *    
 *    Requires ipc_ids.rw_mutex locked.
 *    Returns the LOCKED pointer to the ipc structure if found or NULL
 *    if not.
 *    If key is found ipc points to the owning ipc structure
 */

 
static struct kern_ipc_perm
*ipc_findkey(struct ipc_ids
*ids, key_t key)
{
    struct kern_ipc_perm
*
ipc;
    int next_id;
    int total;

    for (total
= 0, next_id
= 0; total
<
ids->in_use; next_id++)
{
        ipc = idr_find(&ids->ipcs_idr, next_id);

        if (ipc
==
NULL
)
            continue;

        if (ipc->key
!= key)
{
            total++;
            continue;
        }

        ipc_lock_by_ptr(ipc);
        return ipc;
    }

    return NULL;
}

期间又调用了另一个函数idr_find()

/**
 * idr_find - return pointer for given id
 * @idp: idr handle
 * @id: lookup key
 *
 * Return the pointer given the id it has been registered with. A %NULL
 * return indicates that @id is not valid or you passed %NULL in
 * idr_get_new().
 *
 * The caller must serialize idr_find() vs idr_get_new() and idr_remove().
 */

void *idr_find(struct idr
*idp,
int
id)
{
    int n;
    struct idr_layer
*
p;

    n = idp->layers
* IDR_BITS;
    p = idp->top;

    /* Mask off upper bits we don't use for the search. */
    id &= MAX_ID_MASK;

    if (id
>
= (1
<< n))
        return NULL;

    while (n
> 0 && p)
{
        n -= IDR_BITS;
        p = p->ary[(id
>> n)
& IDR_MASK];
    }
    return((void
*)p);
}

我们可以看到idr_find()函数在ids中的idr下寻找指定的idr_layer 指针,并返回给kern_ipc_perm指针。关键的ary数组我们在上面的newque()函数中看到过了,这里的意思就是说如果能找到消息队列头的话就使kern_ipc_perm 结构变量指针*ipcp指向他,如果没有找到就要根据IPC_CREAT标志来决定是否创建消息队列,在这里我们看到追踪传递下的标志中有IPC_CREAT标志,所以如果没有存在就会在这里进入ops->getnew(ns, params),而这里我们可以看到是与newque()函数走的一条路线,我们就不用细说了,接着ipc_check_perms()函数对消息队列的权限进行检验,这个函数很简单我们不看了。从上面的代码中我们可以追踪看出如何从应用程序到内核建立起的消息队列。今天时间关系,暂且写至此,明天继续