I/O多路复用 SELECT POLL -- 内核实现

时间:2022-03-01 16:28:43

等待队列

先补充个基础知识――等待队列

认识

定义

wait_queue_head_t wait_queue;

初始化

init_waitqueue_head(&wait_queue);

等待

wait_event(queue, condition)   等待某个条件而进入睡眠

wait_event_interruptible(queue, condition)  等待某个条件而进入睡眠并允许信号中断睡眠

wait_event_timeout(queue, condition,timeout) 等待某个条件而进入睡眠 最多等待timeout时间

wait_event_interruptible_timeout(queue, condition,timeout)

唤醒

void wake_up(wait_queue_head_t *queue);   唤醒阻塞在该等待队列上的进程

void wake_up_interruptible(wait_queue_head_t *queue);

使用

假设你的设备驱动程序在中断中接收数据,为用户空间提供读取的操作。

你可以这样处理:

1、为简单说明,不考虑同步。

read()

{

            If(len > 0)

                    Read...

                    Return len;

    }else {

                Return 0;

    }

}

Irq_handler()

{

       Recv...

Add Len

    }

这是一种非阻塞的实现

2、

Read()

{

          If(wait_event_interruptible(wait_queue, len > 0)) {

              Return error;

}

Read...

Return len;

}

Irq_handler()

{

        recv

        Add len

        wake_up_interruptible(&wait_queue);

}

利用等待队列实现的阻塞方式,无数据会把自己放到等待队列中进入睡眠,当数据到来发生中断时,在中断中唤醒睡眠中等待队列上的进程进行处理。当然阻塞其实是和睡眠无关的,这里你无数据可以忙等,但睡眠是更优雅的方式。

进一步分析

wait_event

跟进wait_event(queue, condition)会发现他定义了一个wait_queue_t __wait {.private = current, .func = autoremove_wake_function, },然后将__wait放到了等待队列queue中,即放到了queue的task_list链表中。

接下来设置当前进程的状态为TASK_UNINTERRUPTIBLE,并调用schedule(),调度并切换到一个新的进程开始运行。

设置为TASK_UNINTERRUPTIBLE的进程,不会再被系统调度执行,会一直死在这里。到此,该进程让出了CPU不再执行,可以认为他进入了睡眠。

wake_up

跟进wake_up(queue),他其实遍历queue的task_list链表,对每个结点(wait_queue_t类型),调用其func函数。

而此时queue里面应该放着wait_event时放入的__wait,于是wake_up调用了__wait->func函数,__wait->func即autoremove_wake_function函数。

跟进autoremove_wake_function,发现函数里面调用了try_to_wake_up,其参数就是__wait中赋予的current值,这样就实现了在其他进程或中断中,唤醒之前睡眠的进程。

try_to_wake_up中的处理比较复杂,不再继续跟了,我们可以确定try_to_wake_up将之前睡眠的进程状态设为TASK_RUNING,这样之前的进程就可以继续被调度执行了,即被唤醒了。

执行完try_to_wake_up后,将__wait从queue中删除,wake_up的工作就完成了。

再次回到wait_event

之前我们知道,进程在调用schedule后就睡了,然后被其他进程或者中断wake_up唤醒了,那么进程唤醒后应该继续在schedule后继续执行。

继续跟进,schedule返回后,会首先判断条件condition是否成立,如果不成立,再次定义__wait,然后添加到等待队列,schedule睡眠。如果成立,那么wait_event执行完成,进程等待的条件满足,可以继续处理了。

wait_event_timeout

wait_event_timeout与wait_event的不同是wait_event调用的是schedule,而wait_event_timeout调用的是schedule_timeout。

schedule_timeout里面又调用了schedule,但在调用之前,他定义了一个定时器,定时器在指定的timeout超时时,调用wake_up_process,进而调用try_to_wake_up唤醒进程。也就是说wait_event_timetou除了依赖于其他进程或中断唤醒自己,本身还有个定时器可以唤醒自己。

select

我们知道select同时可以监视多个描述符,只要任一个有事件,就可以直接返回处理。如果都没有事件则select睡眠等待,并且任一一个描述符有事件就可以唤醒select。其实现是基于等待队列的。原理简单的讲就是每个描述符都对应一个等待队列,每个描述符对应的驱动都提供一个poll方法。Select调用描述符的poll方法,检查是否有事件,当没有事件时,定义一个wait_queut_t的对象,放到描述符的等待队列中。当select检查到没有事件进入睡眠后,任一个描述符有事件,执行唤醒等待队列的操作就可以唤醒select。

Select的系统调用sys_select,在fs/select.c中(linux 2.6.27内核),其调用路径为sys_select -> core_sys_select -> do_select。接下来我们看下slect系统调用的具体实现,代码比较多,只捡重点的部分看,其他细节有时间再研究。

用户空间在使用select时,会定义fd_set类型的变量,对应于不同的事件有readset、writeset、exceptset,其实他们都是unsigned long类型的数组,数组中的每一位标识一个fd,我们常用的FD_SET(fd, set),是将set中的数组的第fd位设为1。我们关心fd的那几个事件,就将相应的set的第fd位置一,传给内核,通知内核帮我监视,有情况告诉我。通过看内核对fd_set的定义,可以看出fd_set是一个1024位的数组,也就是最多支持1024个fd,如果需要支持更多的fd,需要修改代码重新编译内核了。

内核空间中,core_sys_select函数首先定义了一个long类型的数组,如果fd个数多,数组不够,他会调用kmalloc,动态申请一个数组。数组的使用分为六块,如下图所示,每块其实都是一个小的fd_set,只是fd_set是固定长度(1024位,注意是位不是字节)的数组,但这里每块的长度是和真实的fd的个数有关的。

I/O多路复用  SELECT  POLL  -- 内核实现

接下来core_sys_select调用get_fd_set将用户空间传递的readset、writeset、exceptset拷贝到in、out、ex中,然后调用do_select,将这个大数组传给他。do_select通过in、out、ex里面的位标识,得到要监视哪些fd,监视哪些事件(read、write、except),将监视的结果记录到res_in、res_out、res_ex中。返回到core_sys_select,程序调用set_fd_set将res_in、res_out、res_ex中的结果,拷贝到用户空间。select系统调用返回,就获得事件处理了。

上一步提到了do_select,我们进一步研究研究他。

首先设置当前进程状态 set_current_state(TASK_INTERRUPTIBLE);(这块我还不是很了解,内核没有抢占吗,如果设置状态后,切换出去了,岂不永远都切不回来了,一是此时还没添加唤醒的处理,不会有其他进程唤醒他,二是CPU不会调度TASK_INTERRUPTIBLE状态的进程执行。那么这里是没有内核抢占还是设置了TASK_INTERRUPTILBE的进程不会没抢占?)

然后循环扫描的in、out、ex中的信息(哪些fd关心read事件、哪些fd关心write事件、哪些fd关心except事件),调用具体的fd的驱动相关的poll函数获取fd的事件的状态,根据返回的状态,将结果设置到res_in、res_out、res_ex。其实很简单,如果in中的第n位为一,标识fd=n的描述符关心read事件,在调用fd=n对应的驱动的poll之后,如果有read事件,则将res_in中的n位置一。

(cond_resched这个函数是做什么的?)

在处理完一轮后(处理完了in、out、ex中的请求),如果fd请求的事件发生了,则返回,如果都没有发生则调用schedule_timeout,进入睡眠,等待事件到来时被唤醒。

好,我们看看,do_select是怎样在有事件时被唤醒的。在这之前,我们先想想如果我们自己来做,如何利用等待队列实现。大体思路,我们应该定义一个等待队列wait_queue_head_t queue,select在没有事件时,定义一个wait_queue_t的对象wait放到queue中,然后调度schedule进入睡眠。在驱动中,当事件到来时,遍历等待在queue的wait并唤醒。其实内核实现就是这个思路,支持阻塞IO的驱动实现中,通常会定义三个等待队列,对应于read、write、except,select调用到poll中时,如果没有事件,会定义一个wait_queue_t的wait放到等待队列中,当驱动检查到事件发生时,会唤醒睡在等待队列上的进程。

接下来看看select在睡之前做了哪些准备工作,怎样将wait加入到等待队列中的。

先了解一下do_select中使用的一个数据结构

struct poll_wqueues {

  poll_table pt;

  struct poll_table_page *table;

  struct task_struct *polling_task;

  int triggered;

  int error;

  int inline_index;

  struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];

}

do_select声明了这样类型的一个对象table,然后初始化其成员polling_task = current, pt->qproc = __pollwait。

接下来在调用各fd对应的驱动的poll时,将table.pt(poll_table类型)作为参数传入。

我们知道各个驱动模块实现的各自的poll函数中,如果自己没有read、write、except事件,会调用poll_wait函数,参数wait_address是驱动中声明的等待队列,p是调用poll时传入的table.pt。以下是poll_wait的实现:

static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)

{

  p->qproc(file, wait_address, p);

}

可以看到poll_wait中调用的p->qproc就是之前初始化poll_wqueue时,指定的__pollwait函数。

static void __pollwait(struct file *filp,wait_queue_head_t *wait_address, poll_table *p);

struct poll_table_entry {

  struct file *filp;

  unsigned long key;

  wait_queue_t wait;

  wait_queue_head_t *wait_address;

}

__pollwait中首先获取一个poll_table_entry类型的变量entry,获取其实是在poll_wqueue的inline_entries中拿的。然后初始化entry,entry->file = file;entry->key = p->key;entry->wait.func = pollwake,最后将entry->wait添加到等待队列wait_address中。

所有的准备工作做好了,如果没有事件产生,do_select调度schedule进入了睡眠。

唤醒一般在中断或者软中断中处理的,一般在检查到事件到来时,驱动中会调用wake_up函数,参数为驱动中定义的等待队列。

追踪wake_up函数,最终调用了__wake_up_common,在这个函数中,遍历wait_queue_head_t中的结点,每个结点是wait_queue_t类型,调用每个结点的func指针指向的函数。前面我们知道func指针指向了pollwake,pollwake最终通过调用try_wake_up唤醒了进程。

pollwake->__pollwake->default_wake_function->try_to_wake_up

wait_queue_t中记录了要被唤醒的进程的task_struct结构,因此通过以上系列调用,最终实现了睡眠进程的唤醒。

POLL

poll与select的流程基本一致,其调用路径为sys_poll->do_sys_poll->do_poll->do_pollfd

do_sys_poll将用户空间的pollfd拷贝到内核空间,初始化poll_wqueues table对象,其使用与select相同。调用do_poll,取得需监视的fd的状态,然后将状态拷贝到用户空间,返回。

do_poll与do_select类似,查询事件,没事件睡眠。只是do_poll中使用pollfd,do_select使用long类型中的每一位记录状态。

do_pollfd实现对poll的调用,然后将状态记录到pollfd中。

我们看看select与poll的不同

select使用fd_set记录要检查的描述符,该结构本身是1024位,也就限制了最多只能检测1024个描述符。

poll使用pollfd结构的数组,检测多少个描述符,就传递多大的数组就可以了。

struct pollfd {

    int fd;

    short events;

    short revents;

};

select使用的fd_set记录输入输出,每次返回后,返回的结果就把系统调用时传入的信息给覆盖掉了,因此每次调用select都需要给fd_set赋值。

poll使用pollfd结构,events记录要检测的事件,revents记录结果,pollfd初始化一次就可以了,以后每次poll调用不需要重新初始化pollfd。

不知不觉写这么多了,epoll的探究再开一片吧。

由于也是边查资料边看代码边整理,是一个学习的过程,思路有点跳跃不连贯,欢迎拍砖,接下来我会再次整理,屡屡思路。