等待队列的简单使用

时间:2022-08-27 07:54:25
一、什么是等待队列
    1、概念
        等待队列是一种实现阻塞和唤醒的内核机制,很早就作为一个基本的功能单位出现在Linux内核中,它以队列为基础数据结构,与进程调度机制紧密结合,能够用于实现内核中的异步事件通知机制。
    
    2、数据结构    
        a、等待队列头
        struct __wait_queue_head {  
            spinlock_t lock;  
            struct list_head task_list;  
        };  
        typedef struct __wait_queue_head wait_queue_head_t;     
        
        b、等待队列
        struct __wait_queue {  
            unsigned int flags;  
            void *private;  
            wait_queue_func_t func;  
            struct list_head task_list;  
        };  
        typedef struct __wait_queue wait_queue_t;
        
     3、等待队列头和等待队列的关系图
      等待队列的简单使用


二、如何使用等待队列

    1、使用步骤
        a、初始化等待队列头以及将条件置成假(condition = 0)。
        b、在需要阻塞的地方调用wait_event()函数,使进程进入睡眠,将控制权释放给调度器。在wait_event()函数的后面需要将条件置成假(condition = 0)。
        c、当条件满足时,在内核的另一处,先将条件置成真(condition = 1),然后调用wake_up()函数唤醒等待队列中的睡眠进程。
        
    2、初始化用到的宏
        a、定义一个等待队列头,并初始化其成员
            #define DECLARE_WAIT_QUEUE_HEAD(name)               \  
                wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)

            #define __WAIT_QUEUE_HEAD_INITIALIZER(name) {       \  
                .lock   = __SPIN_LOCK_UNLOCKED(name.lock),      \  
                .task_list  = { &(name).task_list, &(name).task_list }}  
        
        b、定义一个等待队列,并初始化其成员,通常用以下两个宏。
            #define DECLARE_WAITQUEUE(name, tsk)                \  
                wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)
                
            #define __WAITQUEUE_INITIALIZER(name, tsk) {        \  
                .private    = tsk,                              \  
                .func   = default_wake_function,                \  
                .task_list  = { NULL, NULL } }    
                
            #define DEFINE_WAIT(name)                           \
                DEFINE_WAIT_FUNC(name, autoremove_wake_function)
                
            #define DEFINE_WAIT_FUNC(name, function)            \
            wait_queue_t name = {                               \
                .private = current,                             \
                .func = function,                               \
                .task_list = LIST_HEAD_INIT((name).task_list),  \
            }

         说明:使用等待队列时,调用现成的接口,一般只需要要到a,b这里贴出来是因为下面讲等待队列的原理时有用到。
                                          
    3、一个简单的例子  
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/kthread.h>
#include <linux/delay.h>

MODULE_AUTHOR("Jimmy");
MODULE_DESCRIPTION("wait queue example");
MODULE_LICENSE("GPL");

static int condition;
static struct task_struct *task_1;
static struct task_struct *task_2;
static struct task_struct *task_3;

DECLARE_WAIT_QUEUE_HEAD(wq);


static int thread_func_1(void *data)
{
msleep(100);//延时100ms,使得这个进程的等待队列插入在整个链表的头部,最先被唤醒。所以先打印这个进程!

wait_event_interruptible(wq, condition);
condition = 0;
printk(">>>>>this task 1\n");

do {
msleep(1000);
}while(!kthread_should_stop());

return 1;
}

static int thread_func_2(void *data)
{

wait_event_interruptible(wq, condition);
condition = 0;
printk(">>>>>this task 2\n");

do {
msleep(1000);
}while(!kthread_should_stop());

return 2;
}

static int thread_func_3(void *data)
{
msleep(2000);
printk(">>>>>this task 3\n");
condition = 1;
wake_up_interruptible(&wq);

msleep(2000);
condition = 1;
wake_up_interruptible(&wq);

do {
msleep(1000);
}while(!kthread_should_stop());

return 3;
}

static int __init mod_init(void)
{
condition = 0;

task_1 = kthread_run(thread_func_1, NULL, "thread%d", 1);
if (IS_ERR(task_1)) {
printk("******create thread 1 failed\n");
} else {
printk("======success create thread 1\n");
}

task_2 = kthread_run(thread_func_2, NULL, "thread%d", 2);
if (IS_ERR(task_2)) {
printk("******create thread 2 failed\n");
} else {
printk("======success create thread 2\n");
}

task_3 = kthread_run(thread_func_3, NULL, "thread%d", 3);
if (IS_ERR(task_2)) {
printk("******create thread 3 failed\n");
} else {
printk("======success create thread 3\n");
}

return 0;
}

static void __exit mod_exit(void)
{
int ret;

if (!IS_ERR(task_1)) {
ret = kthread_stop(task_1);
printk("<<<<<<<<task 1 exit, ret = %d\n", ret);
}

if (!IS_ERR(task_2)) {
ret = kthread_stop(task_2);
printk("<<<<<<<<task 2 exit, ret = %d\n", ret);
}

if (!IS_ERR(task_3)) {
ret = kthread_stop(task_3);
printk("<<<<<<<<task 3 exit, ret = %d\n", ret);
}

return;
}

module_init(mod_init);
module_exit(mod_exit);
   
       注意:
        1、要从wait_event()函数跳出,需要两个条件。1、condition = 1; 2、在内核的另一个地方调用了wake_up()函数。
        2、wake_up()每次只能唤醒一个进程,而且是从队列头开始唤醒的,而wait_event()函数每次会将新建的等待队列插到队列头,因此最后调用wait_event()函数的进程先被唤醒!如果要唤醒某个特定的进程,没有现成的函数,按照本人理解,只能使用wake_up_all()函数唤醒所有进程,然后在通过条件condition来控制(每个进程使用不同的变量来控制,在wake_up_all()函数后只将要唤醒的进程的变量置成真)。
       
三、等待队列实现阻塞的过程及原理       
    1、wait_event() 等待事件发生,不可中断,condition条件为真,则退出,否则调用__wait_event()进入睡眠让出控制器
    
        #define wait_event(wq, condition)   \  
        do {                                \  
            if (condition)                  \    
                break;                      \  
            __wait_event(wq, condition);    \  
        } while (0)
    
    2、__wait_event() 设置进程属性,进入调度
    
    #define __wait_event(wq, condition)     \  
    do {                                    \  
        DEFINE_WAIT(__wait);                \  
                                            \  
        for (;;) {                          \  
            prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE);    \   //TASK_UNINTERRUPTIBLE标明不可中断  
            if (condition)                                          \   //唤醒的条件为真,则退出for循环  
                break;                                              \  
            schedule();                                             \   //进入调度  
        }                                                           \  
        finish_wait(&wq, &__wait);                                  \   //设置进程为TASK_RUNNING,并移除等待队列中的队列项  
    } while (0)
    
    
    3、wake_up(x)
        #define wake_up(x)   __wake_up(x, TASK_NORMAL, 1, NULL)  
    
    4、__wake_up()
        void __wake_up(wait_queue_head_t *q, unsigned int mode,int nr_exclusive, void *key)  
        {  
            unsigned long flags;  
          
            spin_lock_irqsave(&q->lock, flags);  
            __wake_up_common(q, mode, nr_exclusive, 0, key);    //-->__wake_up_common  
            spin_unlock_irqrestore(&q->lock, flags);  
        }
    
    5、__wake_up_common()
        static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,int nr_exclusive, int wake_flags, void *key)  
        {  
            wait_queue_t *curr, *next;  
          
            list_for_each_entry_safe(curr, next, &q->task_list, task_list) { //从队列头开始遍历等待队列表
                unsigned flags = curr->flags;      
                //调用其等待队列项的func方法,-->default_wake_function  
                if (curr->func(curr, mode, wake_flags, key) &&(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)  
                    break;  
            }  
        }    
    
    6、default_wake_function()    
        int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,void *key)  
        {  
            return try_to_wake_up(curr->private, mode, wake_flags);  //-->try_to_wake_up  
        }
        
    7、try_to_wake_up()
        static int try_to_wake_up(struct task_struct *p, unsigned int state,int wake_flags)  
        {  
            int cpu, orig_cpu, this_cpu, success = 0;  
            unsigned long flags;  
            unsigned long en_flags = ENQUEUE_WAKEUP;  
            struct rq *rq;  
            this_cpu = get_cpu();  
          
            smp_wmb();  
            rq = task_rq_lock(p, &flags);  
            if (!(p->state & state))  
                goto out;  
          
            if (p->se.on_rq)  
                goto out_running;  
          
            cpu = task_cpu(p);  
            orig_cpu = cpu;  
          
        #ifdef CONFIG_SMP  
            if (unlikely(task_running(rq, p)))  
                goto out_activate;  
          
            if (task_contributes_to_load(p)) {  
                if (likely(cpu_online(orig_cpu)))  
                    rq->nr_uninterruptible--;  
                else  
                    this_rq()->nr_uninterruptible--;  
            }  
            p->state = TASK_WAKING;  
          
            if (p->sched_class->task_waking) {  
                p->sched_class->task_waking(rq, p);  
                en_flags |= ENQUEUE_WAKING;  
            }  
          
            cpu = select_task_rq(rq, p, SD_BALANCE_WAKE, wake_flags);  
            if (cpu != orig_cpu)  
                set_task_cpu(p, cpu);  
            __task_rq_unlock(rq);  
          
            rq = cpu_rq(cpu);  
            raw_spin_lock(&rq->lock);  
          
            WARN_ON(task_cpu(p) != cpu);  
            WARN_ON(p->state != TASK_WAKING);  
          
        #ifdef CONFIG_SCHEDSTATS  
            schedstat_inc(rq, ttwu_count);  
            if (cpu == this_cpu)  
                schedstat_inc(rq, ttwu_local);  
            else {  
                struct sched_domain *sd;  
                for_each_domain(this_cpu, sd) {  
                    if (cpumask_test_cpu(cpu, sched_domain_span(sd))) {  
                        schedstat_inc(sd, ttwu_wake_remote);  
                        break;  
                    }  
                }  
            }  
        #endif /* CONFIG_SCHEDSTATS */  
          
        out_activate:  
        #endif /* CONFIG_SMP */  
            ttwu_activate(p, rq, wake_flags & WF_SYNC, orig_cpu != cpu, cpu == this_cpu, en_flags); //最终调用activate_task激活等待进程  
            success = 1;  
        out_running:  
            ttwu_post_activation(p, rq, wake_flags, success);  
        out:  
            task_rq_unlock(rq, &flags);  
            put_cpu();  
          
            return success;  
        }      
    
  四、相关变种函数
    wait_event(wq, condition)                                       //等待事件,不可中断   
    wait_event_interruptible(wq, condition)                         //等待事件,可中断         
    wait_event_timeout(wq, condition, timeout)                      //超时等待,不可中断  
    wait_event_interruptible_timeout(wq, condition, timeout)        //超时等待,可中断    
 
    wake_up(x)                                                     //唤醒一个进程,不可中断  
    wake_up_interruptible(x)                                       //唤醒一个进程,可中断  
    
    #define wake_up_nr(x, nr)       __wake_up(x, TASK_NORMAL, nr, NULL)  //唤醒nr个进程,不可中断
    #define wake_up_all(x)          __wake_up(x, TASK_NORMAL, 0, NULL)   //唤醒所有进程,不可中断

    #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL)  //唤醒nr个进程,可中断
    #define wake_up_interruptible_all(x)    __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)   //唤醒所有进程,可中断