select/poll源码分析——I/O复用函数总结(一)

时间:2022-11-11 00:16:10

转载请注明出处:http://blog.csdn.net/linxdcn/article/details/72983933


1 概述

Linux内核一旦发现进程指定的一个或多个IO条件就绪,它就通知进程。这个能力称为IO复用。

IO复用的函数主要由select、poll和epoll,本文主要介绍select和poll函数。

poll和select的实现基本上是一致的,只是传递参数有所不同,他们的基本流程如下:

  1. 复制用户数据到内核空间
  2. 估计超时时间
  3. 遍历每个文件并调用f_op->poll 取得文件当前就绪状态, 如果前面遍历的文件都没有就绪,向文件插入wait_queue节点
  4. 遍历完成后检查状态:
    a). 如果已经有就绪的文件转到5;
    b). 如果有信号产生,重启poll或select(转到 1或3);
    c). 否则挂起进程等待超时或唤醒,超时或被唤醒后再次遍历所有文件取得每个文件的就绪状态
  5. 将所有文件的就绪状态复制到用户空间
  6. 清理申请的资源

2 select源码分析

select函数的主要调用流程如下:

  • syscall的select:处理时间参数,调用core_sys_select核心函数
  • core_sys_select:处理三个fd_set文件描述符,调用do_select
  • do_select:select最主要的工作在此完成。在合适的时机把自己挂起等待,调用file_operations的poll

2.1 select

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
struct timespec64 end_time, *to = NULL;
struct timeval tv;
int ret;

// 1 判断是否有设置超时时间,如果超时发出信号
if (tvp) {
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;

to = &end_time;
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}

// 2 调用core_sys_select完成主要工作
ret = core_sys_select(n, inp, outp, exp, to);
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);

return ret;
}

主要步骤如下:

  1. 判断是否有设置超时时间,如果超时发出信号
  2. 调用core_sys_select完成主要工作

2.2 core_sys_select

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec64 *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
size_t size, alloc_size;
struct fdtable *fdt;
// 在栈中先申请一小块stack_fds空间存储
// SELECT_STACK_ALLOC 定义为256
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

ret = -EINVAL;
if (n < 0)
goto out_nofds;

// 获取当前进程的文件描述符表
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;

// 如果stack_fds数组的大小不能容纳下所有的fd_set,就用kmalloc重新分配一个大数组。
// 然后将位图平均分成份,并初始化fds结构
// 1 为传入的文件描述符和结果文件描述符bitmap申请分配空间
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
/* Not enough space in on-stack array; must use kmalloc */
ret = -ENOMEM;
if (size > (SIZE_MAX / 6))
goto out_nofds;

alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;

// 2 调用get_fd_set从用户空间拷贝了fd_set,只是copy_from_user的包装
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;

// 把结果fd_set清零
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);

// 3 调用do_select真正完成select任务
ret = do_select(n, &fds, end_time);

if (ret < 0)
goto out;
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}

// 4 把结果集,拷贝回用户空间
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;

out:
if (bits != stack_fds)
kvfree(bits);
out_nofds:
return ret;
}

主要步骤:

  1. 为传入的文件描述符和结果文件描述符bitmap申请分配空间
  2. 调用get_fd_set从用户空间拷贝了fd_set,这个函数只是copy_from_user的包装
  3. 调用do_select真正完成select任务
  4. 把结果集通过set_fd_set拷贝回用户空间

2.3 do_select

// 真正的select在此,遍历了所有的fd,调用对应的xxx_poll函数
// n是最大描述符+1,描述符0,1,2...n-1均被检测
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
u64 slack = 0;
unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;

// 1 根据位图检查用户打开的fd, 要求对应fd必须打开, 并且返回最大的fd
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();

if (retval < 0)
return retval;
n = retval;

// 2 将当前进程放入自已的等待队列table, 并将该等待队列加入到该测试表wait
poll_initwait(&table);
wait = &table.pt;
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait->_qproc = NULL;
timed_out = 1;
}

if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);

retval = 0;

// 自旋
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;

inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

// 3 遍历所有的fd
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;

// 检查当前的 slot 中的描述符
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;

// 没有需要监听的描述符, 下一个slot
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}

for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
if (i >= n)
break;

// 不需要监听描述符 i
if (!(bit & all_bits))
continue;

// 取得文件结构
f = fdget(i);
if (f.file) {
const struct file_operations *f_op;

// 没有 f_op 的话就认为一直处于就绪状态
f_op = f.file->f_op;
mask = DEFAULT_POLLMASK;
// 4 获得到file结构指针,并调用file->f_op->poll
// 注意第三个参数是等待队列,在poll成功后会将本进程唤醒执行
if (f_op->poll) {
// 设置等待事件的掩码
wait_key_set(wait, in, out,
bit, busy_flag);

// 获取当前的就绪状态, 并添加到文件的对应等待队列中
mask = (*f_op->poll)(f.file, wait);
}
// 释放file结构指针
fdput(f);

// 5 根据poll的结果设置状态,要返回select出来的fd数目
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
/* got something, stop busy polling */
if (retval) {
can_busy_loop = false;
busy_flag = 0;

/*
* only remember a returned
* POLL_BUSY_LOOP if we asked for it
*/

} else if (busy_flag & mask)
can_busy_loop = true;

}
}
// 6 根据poll的结果写回到输出位图里
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait->_qproc = NULL;
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}

/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;

/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/

if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}

if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
}

poll_freewait(&table);

return retval;
}

主要步骤:

  1. 根据位图检查用户打开的fd, 要求对应fd必须打开, 并且返回最大的fd,传入的n会被修正
  2. 将当前进程放入自已的等待队列table, 并将该等待队列加入到该测试表wait
  3. 遍历所有的fd
  4. 获得到对应的file结构指针,并调用file->f_op->poll,在poll成功后会将本进程唤醒执行
  5. 根据poll的结果设置状态,要返回select出来的fd数目
  6. 把poll的结果写回到输出位图里

3 poll源码分析

poll函数的主要调用流程如下:

  • syscall的poll:处理时间参数,调用do_sys_poll核心函数
  • do_sys_poll:处理poll_list文件描述符链表,调用do_poll
  • do_poll:poll最主要的工作在此完成。调用do_pollfd,在合适的时机把自己挂起等待。

3.1 poll

// poll 使用的结构体 
struct pollfd {
int fd; // 描述符
short events; // 关注的事件掩码
short revents; // 返回的事件掩码
};
// 用链表来管理pollfd
struct poll_list {
struct poll_list *next;
int len;
struct pollfd entries[0];
};

SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,
int, timeout_msecs)
{
struct timespec64 end_time, *to = NULL;
int ret;

if (timeout_msecs >= 0) {
to = &end_time;
// 将相对超时时间msec 转化为绝对时间
poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,
NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));
}

// 调用do_sys_poll处理
ret = do_sys_poll(ufds, nfds, to);

// do_sys_poll 被信号中断, 重新调用, 对使用者来说 poll 是不会被信号中断的
if (ret == -EINTR) {
struct restart_block *restart_block;

restart_block = &current->restart_block;
restart_block->fn = do_restart_poll;
restart_block->poll.ufds = ufds;
restart_block->poll.nfds = nfds;

if (timeout_msecs >= 0) {
restart_block->poll.tv_sec = end_time.tv_sec;
restart_block->poll.tv_nsec = end_time.tv_nsec;
restart_block->poll.has_timeout = 1;
} else
restart_block->poll.has_timeout = 0;

// ERESTART_RESTARTBLOCK 不会返回给用户进程,
// 而是会被系统捕获, 然后调用 do_restart_poll
ret = -ERESTART_RESTARTBLOCK;
}
return ret;
}

3.2 do_sys_poll

static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,
struct timespec64 *end_time)
{
struct poll_wqueues table;
int err = -EFAULT, fdcount, len, size;
// 首先使用栈上的空间,节约内存,加速访问
long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
struct poll_list *const head = (struct poll_list *)stack_pps;
// 由于采用链表保存pollfd,所以监听的文件描述符不受限制
struct poll_list *walk = head;
unsigned long todo = nfds;

// 文件描述符数量超过当前进程限制
if (nfds > rlimit(RLIMIT_NOFILE))
return -EINVAL;

// 复制用户空间数据到内核
len = min_t(unsigned int, nfds, N_STACK_PPS);
for (;;) {
walk->next = NULL;
walk->len = len;
if (!len)
break;

// 复制到当前的 entries
if (copy_from_user(walk->entries, ufds + nfds-todo,
sizeof(struct pollfd) * walk->len))
goto out_fds;

todo -= walk->len;
if (!todo)
break;

// 1 当申请的栈上空间不足,在堆上申请剩余的poll_list空间
len = min(todo, POLLFD_PER_PAGE);
size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
walk = walk->next = kmalloc(size, GFP_KERNEL);
if (!walk) {
err = -ENOMEM;
goto out_fds;
}
}

// 2 初始化 poll_wqueues 结构, 设置函数指针_qproc 为__pollwait
poll_initwait(&table);
// 3 调用do_poll
fdcount = do_poll(head, &table, end_time);
// 4 从文件wait queue 中移除对应的节点, 释放entry
poll_freewait(&table);

// 5 复制结果到用户空间
for (walk = head; walk; walk = walk->next) {
struct pollfd *fds = walk->entries;
int j;

for (j = 0; j < walk->len; j++, ufds++)
if (__put_user(fds[j].revents, &ufds->revents))
goto out_fds;
}

err = fdcount;
out_fds:
// 释放申请的内存
walk = head->next;
while (walk) {
struct poll_list *pos = walk;
walk = walk->next;
kfree(pos);
}

return err;
}

主要步骤:

  1. 当申请的栈上空间不足,在堆上申请剩余的poll_list空间
  2. 初始化 poll_wqueues 结构, 设置函数指针_qproc为__pollwait
  3. 调用do_poll
  4. 从文件wait queue 中移除对应的节点, 释放entry
  5. 复制结果到用户空间

3.3 do_poll

static int do_poll(struct poll_list *list, struct poll_wqueues *wait,
struct timespec64 *end_time)
{
poll_table* pt = &wait->pt;
ktime_t expire, *to = NULL;
int timed_out = 0, count = 0;
u64 slack = 0;
unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;

// 已经超时,直接遍历所有文件描述符, 然后返回
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
pt->_qproc = NULL;
timed_out = 1;
}

if (end_time && !timed_out)
// 估计进程等待时间,纳秒
slack = select_estimate_accuracy(end_time);

// 1 遍历文件,为每个文件的等待队列添加唤醒函数(pollwake)
for (;;) {
struct poll_list *walk;
bool can_busy_loop = false;

for (walk = list; walk != NULL; walk = walk->next) {
struct pollfd * pfd, * pfd_end;

pfd = walk->entries;
pfd_end = pfd + walk->len;
for (; pfd != pfd_end; pfd++) {
// do_pollfd 会向文件对应的wait queue 中添加节点
// 和回调函数(如果 pt 不为空)
// 并检查当前文件状态并设置返回的掩码
if (do_pollfd(pfd, pt, &can_busy_loop,
busy_flag)) {
// 该文件已经准备好了.
// 不需要向后面文件的wait queue 中添加唤醒函数了.
count++;
pt->_qproc = NULL;
/* found something, stop busy polling */
busy_flag = 0;
can_busy_loop = false;
}
}
}
// 下次循环的时候不需要向文件的wait queue 中添加节点,
// 因为前面的循环已经把该添加的都添加了
pt->_qproc = NULL;

// 第一次遍历没有发现ready的文件
if (!count) {
count = wait->error;
if (signal_pending(current))
count = -EINTR;
}

// 有ready的文件或已经超时
if (count || timed_out)
break;

if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;

/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/

if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}

// 2 等待事件就绪, 如果有事件发生或超时,就再循 环一遍,取得事件状态掩码并计数,
// 注意此次循环中, 文件 wait queue 中的节点依然存在
if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
timed_out = 1;
}
return count;
}

主要步骤:

  1. 遍历文件,为每个文件的等待队列添加唤醒函数(pollwake)
  2. 等待事件就绪, 如果有事件发生或超时,就再循 环一遍,取得事件状态掩码并计数

4 总结

(1)select/poll执行效率低

select/poll函数的执行效率相对于epoll函数低很多,主要有两方面的原因:

  • 每次调用select/poll都需要将所有数据(select是fd_set,poll是poll_fd)用用户空间传入到内核空间,然后结果又需要从内核空间传回用户空间。
  • 每次调用select/poll都需要在内核中遍历所有传入的文件描述符,并处理(调用file的poll)

上述两个原因在fd较多时,带来严重的性能下降。

(2)select支持最大文件描述符数量为1024,而poll没有限制

fd_set结构定义

typedef struct
{
#ifdef__USE_XOPEN
__fd_maskfds_bits[__FD_SETSIZE/__NFDBITS];
#define__FDS_BITS(set)((set)->fds_bits)
#else
__fd_mask__fds_bits[__FD_SETSIZE/__NFDBITS];
#define__FDS_BITS(set)((set)->__fds_bits)
#endif
}fd_set;

实际上是一long类型的数组,每一个long元素是一个bitmap,每一位都能与一打开的文件句柄对应,由于数组大小有限,最大支持1024个文件描述符。

而poll采用了pollfd的链表poll_list来存储,所有没有大小限制。

(3)select传入的fd_set是值-结果参数,返回时,传入的参数fd_set是最终的结果;而poll则不是,要测试的成员由events成员指定,返回结果在revents成员


select/poll源码分析——I/O复用函数总结(一)


转载请注明出处:http://blog.csdn.net/linxdcn/article/details/72983933