linux 中断softirq tasklet

时间:2022-12-13 08:00:47

硬中断为什么不能休眠---

中断线程以及软中断解决了什么问题----

软中断cb函数是否允许相应本地中断,什么时候开启中断关闭中断----

什么是软中断上下文-------

什么是tasklet 和软中断区别------

1、中断线程以及软中断属于中断下半部机制;硬件中断会打断进程,异步执行,对于

重要的进程代码来说,希望硬件中断越短越好。所以硬件中断会把不重要以及等待处理数据

延时处理。同时 硬件中断会关闭中断不再响应外部中断,所以需要尽早结束中断。

2、数据结构

/* softirq mask and active fields moved to irq_cpustat_t in
* asm/hardirq.h to get better cache usage. KAO
*/

struct softirq_action
{
void (*action)(struct softirq_action *);
};
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;
NR_SOFTIRQS 是表示支持最大的软中断数量,
__cacheline_aligned_in_smp 次结构和cache line 对其。
typedef struct {
unsigned int __softirq_pending;
} ____cacheline_aligned irq_cpustat_t;
irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;

irq_stat 表示“”“软中断状态寄存器,使用softirq_pending字段表示”。irq_stat[NR_CPU]

每个cpu有一个软中断信息寄存器。

3、softirq 软中断注册函数以及软件中断唤醒触发函数

void open_softirq(int nr, void (*action)(struct softirq_action *))
{
softirq_vec[nr].action = action;
}
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
//注册tcpip协议收发包函数。软中断初始化通常实在系统启动的时候完成的,
//系统系统时时串行。不会有冲突发生
void raise_softirq(unsigned int nr)
{
unsigned long flags;

local_irq_save(flags);
raise_softirq_irqoff(nr);
local_irq_restore(flags);
}
/*保存寄存器的状态然后调用raise_softirq_irqoff 激活软中断*/


/*
* This function must run with irqs disabled!
*/
inline void raise_softirq_irqoff(unsigned int nr)
{
__raise_softirq_irqoff(nr);

/*
* If we're in an interrupt or softirq, we're done
* (this also catches softirq-disabled code). We will
* actually run the softirq once we return from
* the irq or softirq.
*
* Otherwise we wake up ksoftirqd to make sure we
* schedule the softirq soon.
*/
if (!in_interrupt())
wakeup_softirqd();
}
void __raise_softirq_irqoff(unsigned int nr)
{
trace_softirq_raise(nr);
or_softirq_pending(1UL << nr);
}
#define or_softirq_pending(x) this_cpu_or(irq_stat.__softirq_pending, (x))

#define hardirq_count() (preempt_count() & HARDIRQ_MASK)
#define softirq_count() (preempt_count() & SOFTIRQ_MASK)
#define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK \
| NMI_MASK))

/*
* Are we doing bottom half or hardware interrupt processing?
* Are we in a softirq context? Interrupt context?
* in_softirq - Are we currently processing softirq or have bh disabled?
* in_serving_softirq - Are we currently processing softirq?
*/
#define in_irq() (hardirq_count())
#define in_softirq() (softirq_count())
#define in_interrupt() (irq_count())
----in_softirq - Are we currently processing softirq or have bh disabled--
是否处于软中断或者关闭BH
----in_serving_softirq - Are we currently processing softirq?-----
是否处于软中断上下文

可知触发一个软中断逻辑是:设备本地cpu的irq_stat数据中的softirq_pending的成员第nrbit位,

nr表示软中断的序号。然后检查是否处于中断上下文(软中断+硬中断+NMI);如果不是,表示可以唤醒

软中断处理。ps:在中断返回,此cpu会检查sofirq_pending成员的比特位。如果softirq_pending不为0.

说明有pending的软中断需要处理。softirq_pending 位每cpu 变量,不需要考虑cpu的并发情况。也就是不需要

使用spinn_Lock防止cpu抢占

如上所述:中断退出的时候也会检查是否需要执行软中断处理函数。

/*
* Exit an interrupt context. Process softirqs if needed and possible:
*/
void irq_exit(void)
{
#ifndef __ARCH_IRQ_EXIT_IRQS_DISABLED
local_irq_disable();
#else
WARN_ON_ONCE(!irqs_disabled());
#endif

account_irq_exit_time(current);
preempt_count_sub(HARDIRQ_OFFSET);
if (!in_interrupt() && local_softirq_pending())
invoke_softirq();

tick_irq_exit();
rcu_irq_exit();
trace_hardirq_exit(); /* must be last! */
}

irq_exit  退出中断的时候,如果此时没有处于中断上下文以及本地cpu有softirq没有处理;就会

invoke_softirq中断函数。

如果本次硬件中断点发生软中断处理过程中,那么中断退出时会返回到中断上下文,这时是不允许重新调度,

软中断在一个cpu上总是串行执行的

1 static inline void invoke_softirq(void)
2 {
3 if (!force_irqthreads) {
4 #ifdef CONFIG_HAVE_IRQ_EXIT_ON_IRQ_STACK
5 /*
6 * We can safely execute softirq on the current stack if
7 * it is the irq stack, because it should be near empty
8 * at this stage.
9 */
10 __do_softirq();//执行软中断
11 #else
12 /*
13 * Otherwise, irq_exit() is called on the task stack that can
14 * be potentially deep already. So call softirq in its own stack
15 * to prevent from any overrun.
16 */
17 do_softirq_own_stack();
18 #endif
19 } else {//唤醒软中断线程
20 wakeup_softirqd();
21 }
22 }asmlinkage __visible void __do_softirq(void)
23 {
24 unsigned long end = jiffies + MAX_SOFTIRQ_TIME;
25 unsigned long old_flags = current->flags;
26 int max_restart = MAX_SOFTIRQ_RESTART;
27 struct softirq_action *h;
28 bool in_hardirq;
29 __u32 pending;
30 int softirq_bit;
31
32 /*
33 * Mask out PF_MEMALLOC s current task context is borrowed for the
34 * softirq. A softirq handled such as network RX might set PF_MEMALLOC
35 * again if the socket is related to swap
36 */
37 current->flags &= ~PF_MEMALLOC;
38
39 pending = local_softirq_pending();//获取本地cpu的softirq_pending值
40 account_irq_enter_time(current);
41
42 __local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);//增加perrmpt_count中的softirq计数,
// 表示关闭软中断
43 ----in_hardirq = lockdep_softirq_start();
44
45 restart:
46 /* Reset the pending bitmask before enabling irqs */
47 set_softirq_pending(0);//清楚软中断寄存器。
48
49 local_irq_enable();打开本地中断
50
51 h = softirq_vec;
52
53 while ((softirq_bit = ffs(pending))) {//一次执行软中断的回调函数
54 unsigned int vec_nr;
55 int prev_count;softirq_action
56
57 h += softirq_bit - 1;
58
59 vec_nr = h - softirq_vec;
60 prev_count = preempt_count();
61
62 kstat_incr_softirqs_this_cpu(vec_nr);
63
64 trace_softirq_entry(vec_nr);
65 h->action(h);
66 trace_softirq_exit(vec_nr);
67 if (unlikely(prev_count != preempt_count())) {
68 pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
69 vec_nr, softirq_to_name[vec_nr], h->action,
70 prev_count, preempt_count());
71 preempt_count_set(prev_count);
72 }
73 h++;
74 pending >>= softirq_bit;
75 }
76
77 rcu_bh_qs();
78 local_irq_disable();//关闭本地中断
79 //再次检查softpending是否又产生了中断,因为软中断执行过程中是开本地中断的,有可能在此过程中又开启了触发了
///软中断或者有raise_softirq。
80 pending = local_softirq_pending();
81 if (pending) {//不是检车到软中断就会继续执行软中断。也许使用内核线程来处理
82 if (time_before(jiffies, end) && !need_resched() &&
83 --max_restart)
84 goto restart;
85 //唤醒ksoftirq线程来处理软中断。
86 wakeup_softirqd();
87 }
88
89 lockdep_softirq_end(in_hardirq);
90 account_irq_exit_time(current);
91 __local_bh_enable(SOFTIRQ_OFFSET);//表示离开软中断上下文.
92 WARN_ON_ONCE(in_interrupt());
93 tsk_restore_flags(current, old_flags, PF_MEMALLOC);
94 }

 

 4、tasklet 

tasklet 是利用软中断实现的一种下半部机制。实际上是软中断的一个变种。

 

/* Tasklets --- multithreaded analogue of BHs.

Main feature differing them of generic softirqs: tasklet
is running only on one CPU simultaneously.

Main feature differing them of BHs: different tasklets
may be run simultaneously on different CPUs.

Properties:
* If tasklet_schedule() is called, then tasklet is guaranteed
to be executed on some cpu at least once after this.
* If the tasklet is already scheduled, but its execution is still not
started, it will be executed only once.
* If this tasklet is already running on another CPU (or schedule is called
from tasklet itself), it is rescheduled for later.
* Tasklet is strictly serialized wrt itself, but not
wrt another tasklets. If client needs some intertask synchronization,
he makes it with spinlocks.
*/

struct tasklet_struct
{
struct tasklet_struct *next;//多个tasklet串成一个链表
unsigned long state;//TASKLET_STATE_SCHED
atomic_t count;//0 表示tasklet 处于激活状态,非0 表示禁止,不允许执行
void (*func)(unsigned long);
unsigned long data;
};
enum
{
TASKLET_STATE_SCHED, /* 表示准备调度,正准备执行Tasklet is scheduled for execution */
TASKLET_STATE_RUN /* taklet 正在运行 Tasklet is running (SMP only) */
};

#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }

#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }

tasklet:调度函数:tasklet_schedule

static inline void tasklet_schedule(struct tasklet_struct *t)
{ //如果标志位为sched,说明已经被挂在到队列里面,如果不是则需要把tasklet加入链表中。
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
void __tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;

local_irq_save(flags);
t->next = NULL;
*__this_cpu_read(tasklet_vec.tail) = t;
__this_cpu_write(tasklet_vec.tail, &(t->next));
raise_softirq_irqoff(TASKLET_SOFTIRQ);//激活软中断 执行调度线程
local_irq_restore(flags);
}

/*
* This function must run with irqs disabled!
*/
inline void raise_softirq_irqoff(unsigned int nr)
{
__raise_softirq_irqoff(nr);

/*
* If we're in an interrupt or softirq, we're done
* (this also catches softirq-disabled code). We will
* actually run the softirq once we return from
* the irq or softirq.
*
* Otherwise we wake up ksoftirqd to make sure we
* schedule the softirq soon.
*/
if (!in_interrupt())
wakeup_softirqd();
}

1、tasklet并不是在tasklet_schedule后立即执行。因为tasklet 是基于软中断。需要等到软中断执行的时候

才会去执行tasklet。

2、一个tasklet挂在到队列中时会设置为TASKLET_STATE_SCHED,只要改tasklet还没有执行。即使

多次调用taklet_schedlue也不起作用。一旦tasklet加入到队列后,就必须在该cpu的软中断

上下文中执行,知道执行完并清除TASKLET_STATE_SCHED。才能有机会再别的cpu处理

static void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;

local_irq_disable();
list = __this_cpu_read(tasklet_vec.head);
__this_cpu_write(tasklet_vec.head, NULL);
__this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
local_irq_enable();

while (list) {
struct tasklet_struct *t = list;

list = list->next;

if (tasklet_trylock(t)) {
if (!atomic_read(&t->count)) {
if (!test_and_clear_bit(TASKLET_STATE_SCHED,
&t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}

local_irq_disable();
t->next = NULL;
*__this_cpu_read(tasklet_vec.tail) = t;
__this_cpu_write(tasklet_vec.tail, &(t->next));
__raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_enable();
}
}
static inline int tasklet_trylock(struct tasklet_struct *t)
{
return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}
static inline void tasklet_unlock(struct tasklet_struct *t)
{
smp_mb__before_atomic();
clear_bit(TASKLET_STATE_RUN, &(t)->state);
}
可知:1、tasklet的回调函数执行过程中是开启中断的,如果tasklet已经处于TASKLET_STATE_RUN,标志,这不会
获取到lock,那么tasklet队列将会该tasklet,不会执行。这样是为了保证同一个tasklet只能在一个cpu上执行
2、taskle_trylock:原地检查count 是否为0, 0 表示激活。tasklet_disable会原子增加count计数,
test_and_set_bit(TASKLET_STATE_RUN, &(t)->state) 原子读完count计数后,可能被别的内核路径
disable增加计数了,但是这只会影响下一次tasklet处理
3、先清除TASKLET_STATE_SCHED 标志,软后执行t->func 再才是清除TASKLET_STATE_RUN标志位。是为了
执行func 期间可以响应调度,以免丢失。
4、tasklet在其他cpu执行,taskle_trylock获取失败,此时会把当前该tasklet重新挂入当前cpu的
tasklet——vec链表中,等待下一次软中断触发。同时如果之前tasklet_disable也会将该tasklet处理忽略过
local_bh_disable以及local_bh_enable的使用:作用为关闭/打开软中断的函数,
static __always_inline void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)
{
preempt_count_add(cnt);
barrier();
}
#endif

static inline void local_bh_disable(void)
{
__local_bh_disable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);
}
static inline void local_bh_disable(void)
{
__local_bh_disable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);
}
static inline void local_bh_enable(void)
{
__local_bh_enable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);
}
void __local_bh_enable_ip(unsigned long ip, unsigned int cnt)
{
WARN_ON_ONCE(in_irq() || irqs_disabled());
#ifdef CONFIG_TRACE_IRQFLAGS
local_irq_disable();
#endif
/*
* Are softirqs going to be turned on now:
*/
if (softirq_count() == SOFTIRQ_DISABLE_OFFSET)
trace_softirqs_on(ip);
/*
* Keep preemption disabled until we are done with
* softirq processing:
*/
preempt_count_sub(cnt - 1);

if (unlikely(!in_interrupt() && local_softirq_pending())) {
/*
* Run softirq if any pending. And do it in its own stack
* as we may be calling this deep in a task call stack already.
*/
do_softirq();
}

preempt_count_dec();
#ifdef CONFIG_TRACE_IRQFLAGS
local_irq_enable();
#endif
preempt_check_resched();//后续就会打开抢占。然后检查高优先级的抢占任务。
}可知:local_bh_disable:实现就是在当前进程的preempt_count成员上加上计数SOFTIRQ_DISABLE_OFFSET
local_bh_enable:可知 硬件中断上下文或者 关中断的情况下不应爱调用enablesoft;
disable的时候是add SOFTIRQ_DISABLE_OFFSET 但是enable的时候是sub
(SOFTIRQ_DISABLE_OFFSET - 1);留1 表示不希望被抢占。关闭本地cpu抢占;调用do_soft的时候
不希望被高优先纪任务抢占,或者迁移到其他cpu 上。
比如processA 在CPU0 上,此时执行到unlikely(!in_interrupt() && local_softirq_pending()
发生中断。中断返回发生抢占,process A 在cpu1 上执行,但是cpu1 和cpu0 的pending寄存器不一样。
softirq可能不会执行。
if (unlikely(!in_interrupt() && local_softirq_pending())) {
/*
* Run softirq if any pending. And do it in its own stack
* as we may be calling this deep in a task call stack already.
*/
do_softirq();
}

在非中断上下文执行软中断。

后续就会打开抢占。然后检查高优先级的抢占任务。

 

all:

1、软中断是静态定义的类型,linux 内核不希望我们自己添加新的类型,如果要使用,可以使用tasklet 代替。

2、软中断回调函数实在开启中断环境下执行。

3、同一类型的软中断可以在多个cpu上执行,比如tasklet_softirq,多个cpu可以同时执行tasklet_schedule,多个cpu也能同时从中断处理返回,同时触发执行tasklet_sfotirq类型软中断

4、中断函数不能睡眠,睡眠的话如果发生调度,那么保存堆栈就不是中断上下文堆栈而是进程堆栈,不能恢复。

5、软中断实在硬件中断返回前执行检查是否需要执行软中断。然后才执行检查是否需要抢占当前进程

,软中断总是抢占进程上下文。

6、tasklet是串行执行的,一个tasklet在tasklet_schedule时会绑定一个cpu的tasklet_vec链表,

在该cpu 上执行完后才会和该cpu解除绑定关系。其中TASKLET_STATE_SCHED以及TASKLET_STATE_RUN 标志位构成了串行执行

 

eg:软中断总是抢占进程上下文,如果硬中断放回前夕一直都是在执行软中断,那么被打断的

进程上下文就得不到及时处理,影响性能。所有可以使用workqueue替代tasklet。

软中断上下文有:

1、下半部执行的软中断以及tasklet,irq_exit-----》invoke_softirq;

2、ksoftirq内核线程执行的软中断,当do_softirq执行时间太长就会唤醒ksoftirq内核线程,

或者在invoke_softirq中使能 了强制中断化线程fore_irqthreads。

3、进程上下文调用local_bh_enable 也会去执行软中断。

 

假设中断上下文发生休眠------------??

比如:在中断服务程序里面直接sechedule,中断上下文会通过current获取thread_info结构,

此时内核栈保存的是发生中断时进程栈的信息,没有中断上下文schedule时的信息,如果调度了,

那么就不会返回到该中断上下文了。同时中断源会一直屏蔽等待下去。

 

javascript:void(0)
有人 整理这篇总结:不错

​​Linux软中断、tasklet和工作队列​​

Linux内核中的软中断、tasklet和工作队列详解

引言

软中断、tasklet和工作队列并不是Linux内核中一直存在的机制,而是由更早版本的内核中的“下半部”(bottom half)演变而来。下半部的机制实际上包括五种,但2.6版本的内核中,下半部和任务队列的函数都消失了,只剩下了前三者。
介绍这三种下半部实现之前,有必要说一下上半部与下半部的区别。
上半部指的是中断处理程序,下半部则指的是一些虽然与中断有相关性但是可以延后执行的任务。举个例子:在网络传输中,网卡接收到数据包这个事件不一定需要马上被处理,适合用下半部去实现;但是用户敲击键盘这样的事件就必须马上被响应,应该用中断实现。
两者的主要区别在于:中断不能被相同类型的中断打断,而下半部依然可以被中断打断;中断对于时间非常敏感,而下半部基本上都是一些可以延迟的工作。由于二者的这种区别,所以对于一个工作是放在上半部还是放在下半部去执行,可以参考下面4条:

  1. 如果一个任务对时间非常敏感,将其放在中断处理程序中执行。
  2. 如果一个任务和硬件相关,将其放在中断处理程序中执行。
  3. 如果一个任务要保证不被其他中断(特别是相同的中断)打断,将其放在中断处理程序中执行。
  4. 其他所有任务,考虑放在下半部去执行。
    有写内核任务需要延后执行,因此才有的下半部,进而实现了三种实现下半部的方法。这就是本文要讨论的软中断tasklet工作队列

下表可以更直观的看到它们之间的关系。

linux 中断softirq tasklet

软中断

软中断作为下半部机制的代表,是随着SMP(share memory processor)的出现应运而生的,它也是tasklet实现的基础(tasklet实际上只是在软中断的基础上添加了一定的机制)。软中断一般是“可延迟函数”的总称,有时候也包括了tasklet(请读者在遇到的时候根据上下文推断是否包含tasklet)。它的出现就是因为要满足上面所提出的上半部和下半部的区别,使得对时间不敏感的任务延后执行,而且可以在多个CPU上并行执行,使得总的系统效率可以更高。它的特性包括:

  • 产生后并不是马上可以执行,必须要等待内核的调度才能执行。软中断不能被自己打断(即单个cpu上软中断不能嵌套执行),只能被硬件中断打断(上半部)。
  • 可以并发运行在多个CPU上(即使同一类型的也可以)。所以软中断必须设计为可重入的函数(允许多个CPU同时操作),因此也需要使用自旋锁来保其数据结构。

相关数据结构

  • 软中断描述符
    ​​​struct softirq_action{ void (*action)(struct softirq_action *);};​​​描述每一种类型的软中断,其中​​void(*action)​​是软中断触发时的执行函数。
  • 软中断全局数据和类型
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;  
enum
{
HI_SOFTIRQ=0, /*用于高优先级的tasklet*/
TIMER_SOFTIRQ, /*用于定时器的下半部*/
NET_TX_SOFTIRQ, /*用于网络层发包*/
NET_RX_SOFTIRQ, /*用于网络层收报*/
BLOCK_SOFTIRQ,
BLOCK_IOPOLL_SOFTIRQ,
TASKLET_SOFTIRQ, /*用于低优先级的tasklet*/
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ,
RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */
NR_SOFTIRQS
};

相关API

  • 注册软中断
void open_softirq(int nr, void (*action)(struct softirq_action *))

即注册对应类型的处理函数到全局数组softirq_vec中。例如网络发包对应类型为NET_TX_SOFTIRQ的处理函数net_tx_action.

  • 触发软中断
void raise_softirq(unsigned int nr)

实际上即以软中断类型nr作为偏移量置位每cpu变量irq_stat[cpu_id]的成员变量__softirq_pending,这也是同一类型软中断可以在多个cpu上并行运行的根本原因。

  • 软中断执行函数
do_softirq-->__do_softirq

执行软中断处理函数__do_softirq前首先要满足两个条件:
(1)不在中断中(硬中断、软中断和NMI) 。​​1​​(2)有软中断处于pending状态。
系统这么设计是为了避免软件中断在中断嵌套中被调用,并且达到在单个CPU上软件中断不能被重入的目的。对于ARM架构的CPU不存在中断嵌套中调用软件中断的问题,因为ARM架构的CPU在处理硬件中断的过程中是关闭掉中断的。只有在进入了软中断处理过程中之后才会开启硬件中断,如果在软件中断处理过程中有硬件中断嵌套,也不会再次调用软中断,because硬件中断是软件中断处理过程中再次进入的,此时preempt_count已经记录了软件中断!对于其它架构的CPU,有可能在触发调用软件中断前,也就是还在处理硬件中断的时候,就已经开启了硬件中断,可能会发生中断嵌套,在中断嵌套中是不允许调用软件中断处理的。Why?我的理解是,在发生中断嵌套的时候,表明这个时候是系统突发繁忙的时候,内核第一要务就是赶紧把中断中的事情处理完成,退出中断嵌套。避免多次嵌套,哪里有时间处理软件中断,所以把软件中断推迟到了所有中断处理完成的时候才能触发软件中断。

实现原理和实例

软中断的调度时机:

  1. do_irq完成I/O中断时调用irq_exit。
  2. 系统使用I/O APIC,在处理完本地时钟中断时。
  3. local_bh_enable,即开启本地软中断时。
  4. SMP系统中,cpu处理完被CALL_FUNCTION_VECTOR处理器间中断所触发的函数时。
  5. ksoftirqd/n线程被唤醒时。
    下面以从中断处理返回函数irq_exit中调用软中断为例详细说明。
    触发和初始化的的流程如图所示:

软中断处理流程

asmlinkage void __do_softirq(void)
{
struct softirq_action *h;
__u32 pending;
int max_restart = MAX_SOFTIRQ_RESTART;
int cpu;

pending = local_softirq_pending();
account_system_vtime(current);

__local_bh_disable((unsigned long)__builtin_return_address(0));
lockdep_softirq_enter();

cpu = smp_processor_id();
restart:
/* Reset the pending bitmask before enabling irqs */
set_softirq_pending(0);

local_irq_enable();

h = softirq_vec;

do {
if (pending & 1) {
int prev_count = preempt_count();
kstat_incr_softirqs_this_cpu(h - softirq_vec);

trace_softirq_entry(h, softirq_vec);
h->action(h);
trace_softirq_exit(h, softirq_vec);
if (unlikely(prev_count != preempt_count())) {
printk(KERN_ERR "huh, entered softirq %td %s %p"
"with preempt_count %08x,"
" exited with %08x?\n", h - softirq_vec,
softirq_to_name[h - softirq_vec],
h->action, prev_count, preempt_count());
preempt_count() = prev_count;
}

rcu_bh_qs(cpu);
}
h++;
pending >>= 1;
} while (pending);

local_irq_disable();

pending = local_softirq_pending();
if (pending && --max_restart)
goto restart;

if (pending)
wakeup_softirqd();

lockdep_softirq_exit();

account_system_vtime(current);
_local_bh_enable();
}
  1. 首先调用local_softirq_pending函数取得目前有哪些位存在软件中断。
  2. 调用__local_bh_disable关闭软中断,其实就是设置正在处理软件中断标记,在同一个CPU上使得不能重入__do_softirq函数。
  3. 重新设置软中断标记为0,set_softirq_pending重新设置软中断标记为0,这样在之后重新开启中断之后硬件中断中又可以设置软件中断位。
  4. 调用local_irq_enable,开启硬件中断。
  5. 之后在一个循环中,遍历pending标志的每一位,如果这一位设置就会调用软件中断的处理函数。在这个过程中硬件中断是开启的,随时可以打断软件中断。这样保证硬件中断不会丢失。
  6. 之后关闭硬件中断(local_irq_disable),查看是否又有软件中断处于pending状态,如果是,并且在本次调用__do_softirq函数过程中没有累计重复进入软件中断处理的次数超过max_restart=10次,就可以重新调用软件中断处理。如果超过了10次,就调用wakeup_softirqd()唤醒内核的一个进程来处理软件中断。设立10次的限制,也是为了避免影响系统响应时间。
  7. 调用_local_bh_enable开启软中断。

软中断内核线程

之前我们分析的触发软件中断的位置其实是中断上下文中,而在软中断的内核线程中实际已经是进程的上下文。
这里说的软中断上下文指的就是系统为每个CPU建立的ksoftirqd进程。
软中断的内核进程中主要有两个大循环,外层的循环处理有软件中断就处理,没有软件中断就休眠。内层的循环处理软件中断,每循环一次都试探一次是否过长时间占据了CPU,需要调度就释放CPU给其它进程。具体的操作在注释中做了解释。

set_current_state(TASK_INTERRUPTIBLE);
//外层大循环。
while (!kthread_should_stop()) {
preempt_disable();//禁止内核抢占,自己掌握cpu
if (!local_softirq_pending()) {
preempt_enable_no_resched();
//如果没有软中断在pending中就让出cpu
schedule();
//调度之后重新掌握cpu
preempt_disable();
}

__set_current_state(TASK_RUNNING);

while (local_softirq_pending()) {
/* Preempt disable stops cpu going offline.
If already offline, we'll be on wrong CPU:
don't process */
if (cpu_is_offline((long)__bind_cpu))
goto wait_to_die;
//有软中断则开始软中断调度
do_softirq();
//查看是否需要调度,避免一直占用cpu
preempt_enable_no_resched();
cond_resched();
preempt_disable();
rcu_sched_qs((long)__bind_cpu);
}
preempt_enable();
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;

wait_to_die:
preempt_enable();
/* Wait for kthread_stop */
set_current_state(TASK_INTERRUPTIBLE);
while (!kthread_should_stop()) {
schedule();
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;
tasklet

由于软中断必须使用可重入函数,这就导致设计上的复杂度变高,作为设备驱动程序的开发者来说,增加了负担。而如果某种应用并不需要在多个CPU上并行执行,那么软中断其实是没有必要的。因此诞生了弥补以上两个要求的tasklet。它具有以下特性:
a)一种特定类型的tasklet只能运行在一个CPU上,不能并行,只能串行执行。
b)多个不同类型的tasklet可以并行在多个CPU上。
c)软中断是静态分配的,在内核编译好之后,就不能改变。但tasklet就灵活许多,可以在运行时改变(比如添加模块时)。
tasklet是在两种软中断类型的基础上实现的,因此如果不需要软中断的并行特性,tasklet就是最好的选择。也就是说tasklet是软中断的一种特殊用法,即延迟情况下的串行执行

相关数据结构

  • tasklet描述符
struct tasklet_struct
{
struct tasklet_struct *next;//将多个tasklet链接成单向循环链表
unsigned long state;//TASKLET_STATE_SCHED(Tasklet is scheduled for execution) TASKLET_STATE_RUN(Tasklet is running (SMP only))
atomic_t count;//0:激活tasklet 非0:禁用tasklet
void (*func)(unsigned long); //用户自定义函数
unsigned long data; //函数入参
};
  • tasklet链表
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);//低优先级
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);//高优先级

相关API

  • 定义tasklet
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
//定义名字为name的非激活tasklet
#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
//定义名字为name的激活tasklet
void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data)
//动态初始化tasklet
  • tasklet操作
static inline void tasklet_disable(struct tasklet_struct *t)
//函数暂时禁止给定的tasklet被tasklet_schedule调度,直到这个tasklet被再次被enable;若这个tasklet当前在运行, 这个函数忙等待直到这个tasklet退出
static inline void tasklet_enable(struct tasklet_struct *t)
//使能一个之前被disable的tasklet;若这个tasklet已经被调度, 它会很快运行。tasklet_enable和tasklet_disable必须匹配调用, 因为内核跟踪每个tasklet的"禁止次数"
static inline void tasklet_schedule(struct tasklet_struct *t)
//调度 tasklet 执行,如果tasklet在运行中被调度, 它在完成后会再次运行; 这保证了在其他事件被处理当中发生的事件受到应有的注意. 这个做法也允许一个 tasklet 重新调度它自己
tasklet_hi_schedule(struct tasklet_struct *t)
//和tasklet_schedule类似,只是在更高优先级执行。当软中断处理运行时, 它处理高优先级 tasklet 在其他软中断之前,只有具有低响应周期要求的驱动才应使用这个函数, 可避免其他软件中断处理引入的附加周期.
tasklet_kill(struct tasklet_struct *t)
//确保了 tasklet 不会被再次调度来运行,通常当一个设备正被关闭或者模块卸载时被调用。如果 tasklet 正在运行, 这个函数等待直到它执行完毕。若 tasklet 重新调度它自己,则必须阻止在调用 tasklet_kill 前它重新调度它自己,如同使用 del_timer_sync

实现原理

  • 调度原理
static inline void tasklet_schedule(struct tasklet_struct *t)
{
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
void __tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;

local_irq_save(flags);
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);//加入低优先级列表
raise_softirq_irqoff(TASKLET_SOFTIRQ);//触发软中断
local_irq_restore(flags);
}
  • tasklet执行过程
    TASKLET_SOFTIRQ对应执行函数为tasklet_action,HI_SOFTIRQ为tasklet_hi_action,以tasklet_action为例说明,tasklet_hi_action大同小异。
static void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;

local_irq_disable();
list = __get_cpu_var(tasklet_vec).head;
__get_cpu_var(tasklet_vec).head = NULL;
__get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;//取得tasklet链表
local_irq_enable();

while (list) {
struct tasklet_struct *t = list;

list = list->next;

if (tasklet_trylock(t)) {
if (!atomic_read(&t->count)) {
//执行tasklet
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
//如果t->count的值不等于0,说明这个tasklet在调度之后,被disable掉了,所以会将tasklet结构体重新放回到tasklet_vec链表,并重新调度TASKLET_SOFTIRQ软中断,在之后enable这个tasklet之后重新再执行它
local_irq_disable();
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);
__raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_enable();
}
}

 

linux 中断softirq tasklet

工作队列

从上面的介绍看以看出,软中断运行在中断上下文中,因此不能阻塞和睡眠,而tasklet使用软中断实现,当然也不能阻塞和睡眠。但如果某延迟处理函数需要睡眠或者阻塞呢?没关系工作队列就可以如您所愿了。
把推后执行的任务叫做工作(work),描述它的数据结构为work_struct ,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct ,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events。
工作队列(work queue)是另外一种将工作推后执行的形式。工作队列可以把工作推后,交由一个内核线程去执行—这个下半部分总是会在进程上下文执行,但由于是内核线程,其不能访问用户空间。最重要特点的就是工作队列允许重新调度甚至是睡眠
通常,在工作队列和软中断/tasklet中作出选择非常容易。可使用以下规则:
- 如果推后执行的任务需要睡眠,那么只能选择工作队列。
- 如果推后执行的任务需要延时指定的时间再触发,那么使用工作队列,因为其可以利用timer延时(内核定时器实现)。
- 如果推后执行的任务需要在一个tick之内处理,则使用软中断或tasklet,因为其可以抢占普通进程和内核线程,同时不可睡眠。
- 如果推后执行的任务对延迟的时间没有任何要求,则使用工作队列,此时通常为无关紧要的任务。
实际上,工作队列的本质就是将工作交给内核线程处理,因此其可以用内核线程替换。但是内核线程的创建和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以我们也推荐使用工作队列。

相关数据结构

  • 正常工作结构体
struct work_struct {
atomic_long_t data; //传递给工作函数的参数
#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
struct list_head entry; //链表结构,链接同一工作队列上的工作。
work_func_t func; //工作函数,用户自定义实现
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
//工作队列执行函数的原型:
void (*work_func_t)(struct work_struct *work);
//该函数会由一个工作者线程执行,因此其在进程上下文中,可以睡眠也可以中断。但只能在内核中运行,无法访问用户空间。

 

  • 延迟工作结构体(延迟的实现是在调度时延迟插入相应的工作队列)
struct delayed_work {
struct work_struct work;
struct timer_list timer; //定时器,用于实现延迟处理
};

 

  • 工作队列结构体
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq; //指针数组,其每个元素为per-cpu的工作队列
struct list_head list;
const char *name;
int singlethread; //标记是否只创建一个工作者线程
int freezeable; /* Freeze threads during suspend */
int rt;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};

 

  • 每cpu工作队列(每cpu都对应一个工作者线程worker_thread)
struct cpu_workqueue_struct {
spinlock_t lock;
struct list_head worklist;
wait_queue_head_t more_work;
struct work_struct *current_work;
struct workqueue_struct *wq;
struct task_struct *thread;
} ____cacheline_aligned;

 

相关API

  • 缺省工作队列
静态创建 
DECLARE_WORK(name,function); //定义正常执行的工作项
DECLARE_DELAYED_WORK(name,function);//定义延后执行的工作项

动态创建
INIT_WORK(_work, _func) //创建正常执行的工作项
INIT_DELAYED_WORK(_work, _func)//创建延后执行的工作项

调度默认工作队列
int schedule_work(struct work_struct *work)

//对正常执行的工作进行调度,即把给定工作的处理函数提交给缺省的工作队列和工作者线程。工作者线程本质上是一个普通的内核线程,在默认情况下,每个CPU均有一个类型为“events”的工作者线程,当调用schedule_work时,这个工作者线程会被唤醒去执行工作链表上的所有工作。

系统默认的工作队列名称是:keventd_wq,默认的工作者线程叫:events/n,这里的n是处理器的编号,每个处理器对应一个线程。比如,单处理器的系统只有events/0这样一个线程。而双处理器的系统就会多一个events/1线程。
默认的工作队列和工作者线程由内核初始化时创建:
start_kernel()-->rest_init-->do_basic_setup-->init_workqueues

调度延迟工作
int schedule_delayed_work(struct delayed_work *dwork,unsigned long delay)

刷新缺省工作队列
void flush_scheduled_work(void)
//此函数会一直等待,直到队列中的所有工作都被执行。

取消延迟工作
static inline int cancel_delayed_work(struct delayed_work *work)
//flush_scheduled_work并不取消任何延迟执行的工作,因此,如果要取消延迟工作,应该调用cancel_delayed_work。

 

以上均是采用缺省工作者线程来实现工作队列,其优点是简单易用,缺点是如果缺省工作队列负载太重,执行效率会很低,这就需要我们创建自己的工作者线程和工作队列。

  • 自定义工作队列
create_workqueue(name) 
//宏定义 返回值为工作队列,name为工作线程名称。创建新的工作队列和相应的工作者线程,name用于该内核线程的命名。

int queue_work(struct workqueue_struct *wq, struct work_struct *work)
//类似于schedule_work,区别在于queue_work把给定工作提交给创建的工作队列wq而不是缺省队列。

int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay)
//调度延迟工作。

void flush_workqueue(struct workqueue_struct *wq)
//刷新指定工作队列。

void destroy_workqueue(struct workqueue_struct *wq)
//释放创建的工作队列。

 

实现原理

  1. 工作队列的组织结构
    即workqueue_struct、cpu_workqueue_struct与work_struct的关系。
    一个工作队列对应一个work_queue_struct,工作队列中每cpu的工作队列由cpu_workqueue_struct表示,而work_struct为其上的具体工作。
    关系如下图所示:
  2. linux 中断softirq tasklet

  3. 2.工作队列的工作过程
  4. linux 中断softirq tasklet


  5. 应用实例
    linux各个接口的状态(up/down)的消息需要通知netdev_chain上感兴趣的模块同时上报用户空间消息。这里使用的就是工作队列。
    具体流程图如下所示:
  6. linux 中断softirq tasklet




  1. 是否处于中断中在Linux中是通过preempt_count来判断的,具体如下: 在linux系统的进程数据结构里,有这么一个数据结构:
    #define preempt_count() (current_thread_info()->preempt_count)
    利用preempt_count可以表示是否处于中断处理或者软件中断处理过程中,如下所示:
    # define hardirq_count() (preempt_count() & HARDIRQ_MASK)
    #define softirq_count() (preempt_count() & SOFTIRQ_MASK)
    #define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK | NMI_MASK))
    #define in_irq() (hardirq_count())
    #define in_softirq() (softirq_count())
    #define in_interrupt() (irq_count())
  2. linux 中断softirq tasklet

  3. preempt_count的8~23位记录中断处理和软件中断处理过程的计数。如果有计数,表示系统在硬件中断或者软件中断处理过程中。 ↩


 

 

 

softirq(软中断)下半部中tasklet与workqueue的区别



 一、中断处理的tasklet(小任务)机制

中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。例如,假设一个数据块已经达到了网线,当中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。因此,内核把中断处理分为两部分:上半部(tophalf)和下半部(bottomhalf),上半部(就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理,

首先,一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。

下半部运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别。

但是,内核到底什时候执行下半部,以何种方式组织下半部?这就是我们要讨论的下半部实现机制,这种机制在内核的演变过程中不断得到改进,在以前的内核中,这个机制叫做bottomhalf(简称bh),在2.4以后的版本中有了新的发展和改进,改进的目标使下半部可以在多处理机上并行执行,并有助于驱动程序的开发者进行驱动程序的开发。下面主要介绍常用的小任务(Tasklet)机制及2.6内核中的工作队列机制。

小任务机制

这里的小任务是指对要推迟执行的函数进行组织的一种机制。其数据结构为tasklet_struct,每个结构代表一个独立的小任务,其定义如下:

 



[cpp] view plaincopy




查看文本打印


  1. struct tasklet_struct {    
  2. struct tasklet_struct *next; /*指向链表中的下一个结构*/    
  3. unsignedlong state; /* 小任务的状态*/    
  4. atomic_tcount; /* 引用计数器*/    
  5. void(*func) (unsigned long); /* 要调用的函数*/    
  6. unsignedlong data; /* 传递给函数的参数*/    
  7. };  




结构中的func域就是下半部中要推迟执行的函数,data是它唯一的参数。
State域的取值为TASKLET_STATE_SCHED或TASKLET_STATE_RUN。TASKLET_STATE_SCHED表示小任务已被调度,正准备投入运行,TASKLET_STATE_RUN表示小任务正在运行。TASKLET_STATE_RUN只有在多处理器系统上才使用,单处理器系统什么时候都清楚一个小任务是不是正在运行(它要么就是当前正在执行的代码,要么不是)。
Count域是小任务的引用计数器。如果它不为0,则小任务被禁止,不允许执行;只有当它为零,小任务才被激活,并且在被设置为挂起时,小任务才能够执行。
1. 声明和使用小任务大多数情况下,为了控制一个寻常的硬件设备,小任务机制是实现下半部的最佳选择。小任务可以动态创建,使用方便,执行起来也比较快。
我们既可以静态地创建小任务,也可以动态地创建它。选择那种方式取决于到底是想要对小任务进行直接引用还是一个间接引用。如果准备静态地创建一个小任务(也就是对它直接引用),使用下面两个宏中的一个:
DECLARE_TASKLET(name,func, data)
DECLARE_TASKLET_DISABLED(name,func, data)
这两个宏都能根据给定的名字静态地创建一个tasklet_struct结构。当该小任务被调度以后,给定的函数func会被执行,它的参数由data给出。这两个宏之间的区别在于引用计数器的初始值设置不同。第一个宏把创建的小任务的引用计数器设置为0,因此,该小任务处于激活状态。另一个把引用计数器设置为1,所以该小任务处于禁止状态。例如:
DECLARE_TASKLET(my_tasklet,my_tasklet_handler, dev);
这行代码其实等价于
structtasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0),
tasklet_handler,dev};
这样就创建了一个名为my_tasklet的小任务,其处理程序为tasklet_handler,并且已被激活。当处理程序被调用的时候,dev就会被传递给它。
2. 编写自己的小任务处理程序小任务处理程序必须符合如下的函数类型:
voidtasklet_handler(unsigned long data)
由于小任务不能睡眠,因此不能在小任务中使用信号量或者其它产生阻塞的函数。但是小任务运行时可以响应中断。
3. 调度自己的小任务通过调用tasklet_schedule()函数并传递给它相应的tasklt_struct指针,该小任务就会被调度以便适当的时候执行:
tasklet_schedule(&my_tasklet); /*把my_tasklet标记为挂起 */
在小任务被调度以后,只要有机会它就会尽可能早的运行。在它还没有得到运行机会之前,如果一个相同的小任务又被调度了,那么它仍然只会运行一次。
可以调用tasklet_disable()函数来禁止某个指定的小任务。如果该小任务当前正在执行,这个函数会等到它执行完毕再返回。调用tasklet_enable()函数可以激活一个小任务,如果希望把以DECLARE_TASKLET_DISABLED()创建的小任务激活,也得调用这个函数,如:
tasklet_disable(&my_tasklet); /*小任务现在被禁止,这个小任务不能运行*/
tasklet_enable(&my_tasklet); /* 小任务现在被激活*/
也可以调用tasklet_kill()函数从挂起的队列中去掉一个小任务。该函数的参数是一个指向某个小任务的tasklet_struct的长指针。在小任务重新调度它自身的时候,从挂起的队列中移去已调度的小任务会很有用。这个函数首先等待该小任务执行完毕,然后再将它移去。
4.tasklet的简单用法
下面是tasklet的一个简单应用,以模块的形成加载。

 



[cpp] view plaincopy




查看文本打印


  1. #include <linux module.h="">    
  2. #include<linux init.h="">    
  3. #include<linux fs.h="">    
  4. #include<linux kdev_t.h="">    
  5. #include <linux cdev.h="">    
  6. #include <linux kernel.h="">    
  7. #include<linux interrupt.h="">    
  8.     
  9. static struct t asklet_struct my_tasklet;    
  10.     
  11. static void tasklet_handler (unsigned longd ata)    
  12. {    
  13. printk(KERN_ALERT,"tasklet_handler is running./n");    
  14. }    
  15.     
  16. staticint __init test_init(void)    
  17. {    
  18. tasklet_init(&my_tasklet,tasklet_handler,0);    
  19. tasklet_schedule(&my_tasklet);    
  20. return0;    
  21. }    
  22.     
  23. static void __exit test_exit(void)    
  24. {    
  25. tasklet_kill(&tasklet);    
  26. printk(KERN_ALERT,"test_exit is running./n");    
  27. }    
  28. MODULE_LICENSE("GPL");    
  29.     
  30. module_init(test_init);    
  31. module_exit(test_exit);  
  32. </linux></linux></linux></linux></linux></linux></linux>  



从这个例子可以看出,所谓的小任务机制是为下半部函数的执行提供了一种执行机制,也就是说,推迟处理的事情是由tasklet_handler实现,何时执行,经由小任务机制封装后交给内核去处理。

 

二、中断处理的工作队列机制

工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。

那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。

 

  1. 工作、工作队列和工作者线程

如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events,自己也可以创建自己的工作者线程。

  1. 表示工作的数据结构

工作用<linux/workqueue.h>中定义的work_struct结构表示:

 



[cpp] view plaincopy




查看文本打印


  1. struct work_struct{    
  2. unsigned long pending; /* 这个工作正在等待处理吗?*/    
  3. struct list_head entry; /* 连接所有工作的链表 */    
  4. void (*func) (void *); /* 要执行的函数 */    
  5. void *data; /* 传递给函数的参数 */    
  6. void *wq_data; /* 内部使用 */    
  7. struct timer_list timer; /* 延迟的工作队列所用到的定时器 */    
  8. };  



这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。

3. 创建推后的工作

要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地建该结构:

DECLARE_WORK(name, void (*func) (void *), void *data);

这样就会静态地创建一个名为name,待执行函数为func,参数为data的work_struct结构。

同样,也可以在运行时通过指针创建一个工作:

INIT_WORK(struct work_struct *work, woid(*func) (void *), void *data);

这会动态地初始化一个由work指向的工作。

4. 工作队列中待执行的函数

工作队列待执行的函数原型是:

void work_handler(void *data)

这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。

5. 对工作进行调度

现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events工作线程,只需调用

schedule_work(&work);

work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。

有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:

schedule_delayed_work(&work, delay);

这时,&work指向的work_struct直到delay指定的时钟节拍用完以后才会执行。

6. 工作队列的简单应用

 



[cpp] view plaincopy




查看文本打印


  1. #include<linux module.h="">    
  2. #include<linux init.h="">    
  3. #include<linux workqueue.h="">    
  4.     
  5. staticstruct workqueue_struct *queue =NULL;    
  6. staticstruct work_struct work;    
  7.     
  8. staticvoid work_handler(struct work_struct*data)    
  9. {    
  10. printk(KERN_ALERT"work handler function./n");    
  11. }    
  12.     
  13. staticint __init test_init(void)    
  14. {    
  15. queue= create_singlethread_workqueue("helloworld"); /*创建一个单线程的工作队列*/    
  16. if(!queue)    
  17. goto err;    
  18.     
  19. INIT_WORK(&work, work_handler);    
  20. schedule_work(&work);/*schedule_work是添加到系统的events workqueue, 要添加到自己的workqueue, 应该使用queue_work, 故此处有误*/    
  21.     
  22. return 0;    
  23. err:    
  24. return-1;    
  25. }    
  26.     
  27. staticvoid __exit test_exit(void)    
  28. {    
  29. destroy_workqueue(queue);    
  30. }    
  31. MODULE_LICENSE("GPL");    
  32. module_init(test_init);    
  33. module_exit(test_exit);  
  34. </linux></linux></linux>  



tasklet与workqueue的区别和不同应用环境总结  

 

tasklet

Workqueue

处于atomic context,不能sleep

不处于atomic context,可以sleep

处于中断上下文,OS不可以进行进程调度

处于进程上下文,OS可以进行进程调度

运行调度它们的同一个CPU上

默认同一个CPU上

不能指定确定时间进行调度

不能指定确定时间进行调度或者指定至少延时一个确定时间后调度

只能交给ksoftirqd/0

可以提交给events/0,也可以提交给自定义的workqueue

Tasklet函数带参数

Work函数不带参数

Tasklet与workqueue的不同应用环境总结如下:

(1) 必须立即进行紧急处理的极少量任务放入在中断的顶半部中,此时屏蔽了与自己同类型的中断,由于任务量少,所以可以迅速不受打扰地处理完紧急任务。

(2) 需要较少时间的中等数量的急迫任务放在tasklet中。此时不会屏蔽任何中断(包括与自己的顶半部同类型的中断),所以不影响顶半部对紧急事务的处理;同时又不会进行用户进程调度,从而保证了自己急迫任务得以迅速完成。

(3) 需要较多时间且并不急迫(允许被操作系统剥夺运行权)的大量任务放在workqueue中。此时操作系统会尽量快速处理完这个任务,但如果任务量太大,期间操作系统也会有机会调度别的用户进程运行,从而保证不会因为这个任务需要运行时间将其它用户进程无法进行。

(4) 可能引起睡眠的任务放在workqueue中。因为在workqueue中睡眠是安全的。



 

 

软中断/tasklet/工作队列



软中断、tasklet和工作队列并不是Linux内核中一直存在的机制,而是由更早版本的内核中的“下半部”(bottom half)演变而来。下半部的机制实际上包括五种,但2.6版本的内核中,下半部和任务队列的函数都消失了,只剩下了前三者。本文重点在于介绍这三者之间的关系。(函数细节将不会在本文中出现,可以参考文献,点这里)

(1)上半部和下半部的区别
上半部指的是中断处理程序,下半部则指的是一些虽然与中断有相关性但是可以延后执行的任务。举个例子:在网络传输中,网卡接收到数据包这个事件不一定需要马上被处理,适合用下半部去实现;但是用户敲击键盘这样的事件就必须马上被响应,应该用中断实现。
两者的主要区别在于:中断不能被相同类型的中断打断,而下半部依然可以被中断打断;中断对于时间非常敏感,而下半部基本上都是一些可以延迟的工作。由于二者的这种区别,所以对于一个工作是放在上半部还是放在下半部去执行,可以参考下面四条:
a)如果一个任务对时间非常敏感,将其放在中断处理程序中执行。
b)如果一个任务和硬件相关,将其放在中断处理程序中执行。
c)如果一个任务要保证不被其他中断(特别是相同的中断)打断,将其放在中断处理程序中执行。
d)其他所有任务,考虑放在下半部去执行。

(2)为什么要使用软中断?
软中断作为下半部机制的代表,是随着SMP(share memory processor)的出现应运而生的,它也是tasklet实现的基础(tasklet实际上只是在软中断的基础上添加了一定的机制)。软中断一般是“可延迟函数”的总称,有时候也包括了tasklet(请读者在遇到的时候根据上下文推断是否包含tasklet)。它的出现就是因为要满足上面所提出的上半部和下半部的区别,使得对时间不敏感的任务延后执行,而且可以在多个CPU上并行执行,使得总的系统效率可以更高。它的特性包括:
a)产生后并不是马上可以执行,必须要等待内核的调度才能执行。软中断不能被自己打断,只能被硬件中断打断(上半部)。
b)可以并发运行在多个CPU上(即使同一类型的也可以)。所以软中断必须设计为可重入的函数(允许多个CPU同时操作),因此也需要使用自旋锁来保护其数据结构。

(3)为什么要使用tasklet?(tasklet和软中断的区别)
由于软中断必须使用可重入函数,这就导致设计上的复杂度变高,作为设备驱动程序的开发者来说,增加了负担。而如果某种应用并不需要在多个CPU上并行执行,那么软中断其实是没有必要的。因此诞生了弥补以上两个要求的tasklet。它具有以下特性:
a)一种特定类型的tasklet只能运行在一个CPU上,不能并行,只能串行执行。
b)多个不同类型的tasklet可以并行在多个CPU上。
c)软中断是静态分配的,在内核编译好之后,就不能改变。但tasklet就灵活许多,可以在运行时改变(比如添加模块时)。
tasklet是在两种软中断类型的基础上实现的,但是由于其特殊的实现机制(将在4.3节详细介绍),所以具有了这样不同于软中断的特性。而由于这种特性,所以降低了设备驱动程序开发者的负担,因此如果不需要软中断的并行特性,tasklet就是最好的选择。

(4)可延迟函数(软中断及tasklet)的使用
一般而言,在可延迟函数上可以执行四种操作:初始化/激活/执行/屏蔽。屏蔽我们这里不再叙述,前三个则比较重要。下面将软中断和tasklet的三个步骤分别进行对比介绍。

(4.1)初始化
初始化是指在可延迟函数准备就绪之前所做的所有工作。一般包括两个大步骤:首先是向内核声明这个可延迟函数,以备内核在需要的时候调用;然后就是调用相应的初始化函数,用函数指针等初始化相应的描述符。
如果是软中断则在内核初始化时进行,其描述符定义如下:

  struct softirq_action
          {
                   void (*action)(struct softirq_action *);
                   void*data;
          };

在\kernel\softirq.c文件中包括了32个描述符的数组static struct softirq_action softirq_vec[32];但实际上只有前6个已经被内核注册使用(包括tasklet使用的HI_SOFTIRQ/TASKLET_SOFTIRQ和网络协议栈使用的NET_TX_SOFTIRQ/NET_RX_SOFTIRQ,还有SCSI存储和系统计时器使用的两个),剩下的可以由内核开发者使用。需要使用函数:
         void open_softirq(int nr, void (*action)(struct softirq_action*), void *data)
初始化数组中索引为nr的那个元素。需要的参数当然就是action函数指针以及data。例如网络子系统就通过以下两个函数初始化软中断(net_tx_action/net_rx_action是两个函数):

    open_softirq(NET_TX_SOFTIRQ,net_tx_action);
     open_softirq(NET_RX_SOFTIRQ,net_rx_action);

这样初始化完成后实际上就完成了一个一一对应的关系:当内核中产生到NET_TX_SOFTIRQ软中断之后,就会调用net_tx_action这个函数。
tasklet则可以在运行时定义,例如加载模块时。定义方式有两种:
静态声明

DECLARE_TASKET(name, func, data)
DECLARE_TASKLET_DISABLED(name, func, data)

动态声明

void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data)

其参数分别为描述符,需要调用的函数和此函数的参数—必须是unsigned long类型。也需要用户自己写一个类似net_tx_action的函数指针func。初始化最终生成的结果就是一个实际的描述符,假设为my_tasklet(将在下面用到)。

(4.2)激活
激活标记一个可延迟函数为挂起(pending)状态,表示内核可以调用这个可延迟函数(即使在中断过程中也可以激活可延迟函数,只不过函数不会被马上执行);这种情况可以类比处于TASK_RUNNING状态的进程,处在这个状态的进程只是准备好了被CPU调度,但并不一定马上就会被调度。
软中断使用raise_softirq()函数激活,接收的参数就是上面初始化时用到的数组索引nr。
tasklet使用tasklet_schedule()激活,该函数接受tasklet的描述符作为参数,例如上面生成的my_tasklet:

tasklet_schedule(& my_tasklet)

(4.3)执行
执行就是内核运行可延迟函数的过程,但是执行只发生在某些特定的时刻(叫做检查点,具体有哪些检查点?详见《深入》p.177)。
每个CPU上都有一个32位的掩码__softirq_pending,表明此CPU上有哪些挂起(已被激活)的软中断。此掩码可以用local_softirq_pending()宏获得。所有的挂起的软中断需要用do_softirq()函数的一个循环来处理。
而对于tasklet,由于软中断初始化时,就已经通过下面的语句初始化了当遇到TASKLET_SOFTIRQ/HI_SOFTIRQ这两个软中断所需要执行的函数:

    open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);
     open_softirq(HI_SOFTIRQ, tasklet_hi_action, NULL);

因此,这两个软中断是要被区别对待的。tasklet_action和tasklet_hi_action内部实现就是为什么软中断和tasklet有不同的特性的原因(当然也因为二者的描述符不同,tasklet的描述符要比软中断的复杂,也就是说内核设计者自己多做了一部分限制的工作而减少了驱动程序开发者的工作)。

(5)为什么要使用工作队列work queue?(work queue和软中断的区别)
上面我们介绍的可延迟函数运行在中断上下文中(软中断的一个检查点就是do_IRQ退出的时候),于是导致了一些问题:软中断不能睡眠、不能阻塞。由于中断上下文出于内核态,没有进程切换,所以如果软中断一旦睡眠或者阻塞,将无法退出这种状态,导致内核会整个僵死。但可阻塞函数不能用在中断上下文中实现,必须要运行在进程上下文中,例如访问磁盘数据块的函数。因此,可阻塞函数不能用软中断来实现。但是它们往往又具有可延迟的特性。
因此在2.6版的内核中出现了在内核态运行的工作队列(替代了2.4内核中的任务队列)。它也具有一些可延迟函数的特点(需要被激活和延后执行),但是能够能够在不同的进程间切换,以完成不同的工作。




 


 










http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子