1、概念
等待队列是一种实现阻塞和唤醒的内核机制,很早就作为一个基本的功能单位出现在Linux内核中,它以队列为基础数据结构,与进程调度机制紧密结合,能够用于实现内核中的异步事件通知机制。
2、数据结构
a、等待队列头
struct __wait_queue_head {
spinlock_t lock;
struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;
b、等待队列
struct __wait_queue {
unsigned int flags;
void *private;
wait_queue_func_t func;
struct list_head task_list;
};
typedef struct __wait_queue wait_queue_t;
3、等待队列头和等待队列的关系图
二、如何使用等待队列
1、使用步骤a、初始化等待队列头以及将条件置成假(condition = 0)。
b、在需要阻塞的地方调用wait_event()函数,使进程进入睡眠,将控制权释放给调度器。在wait_event()函数的后面需要将条件置成假(condition = 0)。
c、当条件满足时,在内核的另一处,先将条件置成真(condition = 1),然后调用wake_up()函数唤醒等待队列中的睡眠进程。
2、初始化用到的宏
a、定义一个等待队列头,并初始化其成员
#define DECLARE_WAIT_QUEUE_HEAD(name) \
wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
#define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \
.lock = __SPIN_LOCK_UNLOCKED(name.lock), \
.task_list = { &(name).task_list, &(name).task_list }}
b、定义一个等待队列,并初始化其成员,通常用以下两个宏。
#define DECLARE_WAITQUEUE(name, tsk) \
wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)
#define __WAITQUEUE_INITIALIZER(name, tsk) { \
.private = tsk, \
.func = default_wake_function, \
.task_list = { NULL, NULL } }
#define DEFINE_WAIT(name) \
DEFINE_WAIT_FUNC(name, autoremove_wake_function)
#define DEFINE_WAIT_FUNC(name, function) \
wait_queue_t name = { \
.private = current, \
.func = function, \
.task_list = LIST_HEAD_INIT((name).task_list), \
}
说明:使用等待队列时,调用现成的接口,一般只需要要到a,b这里贴出来是因为下面讲等待队列的原理时有用到。
3、一个简单的例子
#include <linux/kernel.h>注意:
#include <linux/init.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/kthread.h>
#include <linux/delay.h>
MODULE_AUTHOR("Jimmy");
MODULE_DESCRIPTION("wait queue example");
MODULE_LICENSE("GPL");
static int condition;
static struct task_struct *task_1;
static struct task_struct *task_2;
static struct task_struct *task_3;
DECLARE_WAIT_QUEUE_HEAD(wq);
static int thread_func_1(void *data)
{
msleep(100);//延时100ms,使得这个进程的等待队列插入在整个链表的头部,最先被唤醒。所以先打印这个进程!
wait_event_interruptible(wq, condition);
condition = 0;
printk(">>>>>this task 1\n");
do {
msleep(1000);
}while(!kthread_should_stop());
return 1;
}
static int thread_func_2(void *data)
{
wait_event_interruptible(wq, condition);
condition = 0;
printk(">>>>>this task 2\n");
do {
msleep(1000);
}while(!kthread_should_stop());
return 2;
}
static int thread_func_3(void *data)
{
msleep(2000);
printk(">>>>>this task 3\n");
condition = 1;
wake_up_interruptible(&wq);
msleep(2000);
condition = 1;
wake_up_interruptible(&wq);
do {
msleep(1000);
}while(!kthread_should_stop());
return 3;
}
static int __init mod_init(void)
{
condition = 0;
task_1 = kthread_run(thread_func_1, NULL, "thread%d", 1);
if (IS_ERR(task_1)) {
printk("******create thread 1 failed\n");
} else {
printk("======success create thread 1\n");
}
task_2 = kthread_run(thread_func_2, NULL, "thread%d", 2);
if (IS_ERR(task_2)) {
printk("******create thread 2 failed\n");
} else {
printk("======success create thread 2\n");
}
task_3 = kthread_run(thread_func_3, NULL, "thread%d", 3);
if (IS_ERR(task_2)) {
printk("******create thread 3 failed\n");
} else {
printk("======success create thread 3\n");
}
return 0;
}
static void __exit mod_exit(void)
{
int ret;
if (!IS_ERR(task_1)) {
ret = kthread_stop(task_1);
printk("<<<<<<<<task 1 exit, ret = %d\n", ret);
}
if (!IS_ERR(task_2)) {
ret = kthread_stop(task_2);
printk("<<<<<<<<task 2 exit, ret = %d\n", ret);
}
if (!IS_ERR(task_3)) {
ret = kthread_stop(task_3);
printk("<<<<<<<<task 3 exit, ret = %d\n", ret);
}
return;
}
module_init(mod_init);
module_exit(mod_exit);
1、要从wait_event()函数跳出,需要两个条件。1、condition = 1; 2、在内核的另一个地方调用了wake_up()函数。
2、wake_up()每次只能唤醒一个进程,而且是从队列头开始唤醒的,而wait_event()函数每次会将新建的等待队列插到队列头,因此最后调用wait_event()函数的进程先被唤醒!如果要唤醒某个特定的进程,没有现成的函数,按照本人理解,只能使用wake_up_all()函数唤醒所有进程,然后在通过条件condition来控制(每个进程使用不同的变量来控制,在wake_up_all()函数后只将要唤醒的进程的变量置成真)。
三、等待队列实现阻塞的过程及原理
1、wait_event() 等待事件发生,不可中断,condition条件为真,则退出,否则调用__wait_event()进入睡眠让出控制器
#define wait_event(wq, condition) \
do { \
if (condition) \
break; \
__wait_event(wq, condition); \
} while (0)
2、__wait_event() 设置进程属性,进入调度
#define __wait_event(wq, condition) \
do { \
DEFINE_WAIT(__wait); \
\
for (;;) { \
prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \ //TASK_UNINTERRUPTIBLE标明不可中断
if (condition) \ //唤醒的条件为真,则退出for循环
break; \
schedule(); \ //进入调度
} \
finish_wait(&wq, &__wait); \ //设置进程为TASK_RUNNING,并移除等待队列中的队列项
} while (0)
3、wake_up(x)
#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL)
4、__wake_up()
void __wake_up(wait_queue_head_t *q, unsigned int mode,int nr_exclusive, void *key)
{
unsigned long flags;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, 0, key); //-->__wake_up_common
spin_unlock_irqrestore(&q->lock, flags);
}
5、__wake_up_common()
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) { //从队列头开始遍历等待队列表
unsigned flags = curr->flags;
//调用其等待队列项的func方法,-->default_wake_function
if (curr->func(curr, mode, wake_flags, key) &&(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
6、default_wake_function()
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,void *key)
{
return try_to_wake_up(curr->private, mode, wake_flags); //-->try_to_wake_up
}
7、try_to_wake_up()
static int try_to_wake_up(struct task_struct *p, unsigned int state,int wake_flags)
{
int cpu, orig_cpu, this_cpu, success = 0;
unsigned long flags;
unsigned long en_flags = ENQUEUE_WAKEUP;
struct rq *rq;
this_cpu = get_cpu();
smp_wmb();
rq = task_rq_lock(p, &flags);
if (!(p->state & state))
goto out;
if (p->se.on_rq)
goto out_running;
cpu = task_cpu(p);
orig_cpu = cpu;
#ifdef CONFIG_SMP
if (unlikely(task_running(rq, p)))
goto out_activate;
if (task_contributes_to_load(p)) {
if (likely(cpu_online(orig_cpu)))
rq->nr_uninterruptible--;
else
this_rq()->nr_uninterruptible--;
}
p->state = TASK_WAKING;
if (p->sched_class->task_waking) {
p->sched_class->task_waking(rq, p);
en_flags |= ENQUEUE_WAKING;
}
cpu = select_task_rq(rq, p, SD_BALANCE_WAKE, wake_flags);
if (cpu != orig_cpu)
set_task_cpu(p, cpu);
__task_rq_unlock(rq);
rq = cpu_rq(cpu);
raw_spin_lock(&rq->lock);
WARN_ON(task_cpu(p) != cpu);
WARN_ON(p->state != TASK_WAKING);
#ifdef CONFIG_SCHEDSTATS
schedstat_inc(rq, ttwu_count);
if (cpu == this_cpu)
schedstat_inc(rq, ttwu_local);
else {
struct sched_domain *sd;
for_each_domain(this_cpu, sd) {
if (cpumask_test_cpu(cpu, sched_domain_span(sd))) {
schedstat_inc(sd, ttwu_wake_remote);
break;
}
}
}
#endif /* CONFIG_SCHEDSTATS */
out_activate:
#endif /* CONFIG_SMP */
ttwu_activate(p, rq, wake_flags & WF_SYNC, orig_cpu != cpu, cpu == this_cpu, en_flags); //最终调用activate_task激活等待进程
success = 1;
out_running:
ttwu_post_activation(p, rq, wake_flags, success);
out:
task_rq_unlock(rq, &flags);
put_cpu();
return success;
}
四、相关变种函数
wait_event(wq, condition) //等待事件,不可中断
wait_event_interruptible(wq, condition) //等待事件,可中断
wait_event_timeout(wq, condition, timeout) //超时等待,不可中断
wait_event_interruptible_timeout(wq, condition, timeout) //超时等待,可中断
wake_up(x) //唤醒一个进程,不可中断
wake_up_interruptible(x) //唤醒一个进程,可中断
#define wake_up_nr(x, nr) __wake_up(x, TASK_NORMAL, nr, NULL) //唤醒nr个进程,不可中断
#define wake_up_all(x) __wake_up(x, TASK_NORMAL, 0, NULL) //唤醒所有进程,不可中断
#define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL) //唤醒nr个进程,可中断
#define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL) //唤醒所有进程,可中断