中断处理机制是操作系统必不可少的部分,但中断处理本身有一定局限性,包括:
①中断处理程序以异步方式执行,它有可能会打断其他重要代码(甚至中断处理代码)的执行,因此为了避免被打断的代码停止时间过长,中断处理程序应该执行的越快越好。
②如果当前中断处理程序正在执行,最好情况下,该中断线会被屏蔽,最坏情况下(设置了IRQF_DISABLED),当前处理器上所有其他中断都会被屏蔽,禁止中断后硬件与操作系统无法通信,因此,中断处理程序执行越快越好。
③中断处理程序往往需要对硬件进行操作,所以它们通常有很高的时限要求。
④中断处理程序不在进程上下文中运行,所以它们不能阻塞。
操作系统必须有一个快速、异步、简单的机制负责对硬件做出迅速响应并完成对时间要求严格的操作。中断处理程序很适合实现这些功能,但是对于那些对时间要求宽松,耗时的工作,由于中断处理的局限,适合放在中断处理退出后的某个合适时机执行,这就是中断处理下半部。
1.下半部
1.1 下半部的任务:就是执行与中断处理密切相关但中断处理程序本身不执行的工作。
中断处理的上半部,下半部没有严格区分规则(理论上中断处理完成的工作越少越好,我们期望中断处理程序尽可能快地返回),但还是有一些借鉴:
①如果一个任务对时间非常敏感,将其放在中断处理程序中执行。
②如果一个任务和硬件相关,将其放在中断处理程序中执行。
③如果一个任务要求不被其他中断(特别是相同中断)打断,将其放在中断处理程序中执行
④其他所有任务,考虑放在下半部执行。
1.2 下半部执行时间?
下半部会在中断退出后,某个合适的时机执行,并没有明确指定执行时间。通常中断处理程序一返回就会马上运行下半部。
下半部执行的关键在于当它们运行时,允许响应所有的中断。
1.3下半部的环境
①BH(bottomhalf)、任务队列,早期的下半部机制,因不够灵活,性能瓶颈,现以弃用。
②软件中断(softirqs):一组静态定义的下半部接口,有32个,可以在所有处理器上同时执行(两个类型相同也可以),软中断必须在编译期间就进行静态注册,性能非常高。
③tasklet:tasklet实际上是通过软中断实现的,两个不同类型的tasklet可以在不同的处理器上同时执行,但类型相同的tasklet不能同时时行。
tasklet可以动态注册.
④内核定时器:内核定时器可以把操作推迟到某个确定的时间段之后运行。
当前,Linux内核有三种机制可以用来实现中断处理下半部:软中断、tasklet和工作队列。
2.软件中断
1.1 软件中断的实现:
软件中断在编译期间静态分配,用softirq_action结构表示,在
点击(此处)折叠或打开
-
struct softirq_action
-
{
- void (*action)(struct softirq_action *);
-
};
- Kernel/softirq.c中定义了一个包含32个该结构体的数组
- static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;
每注册一个软中断,占据该数组的一项,最多可能32个软中断,目前只用了9个。
(1)软中断处理程序
原型是:
void softirq_handler(struct softirq_action*)
当内核运行一个软中断处理程序时,它就会执行这个action函数,参数指向softirq_action本身。以如下形式调用软中断处理函数:
my_softirq->action(my_softirq);
一个软中断不会抢占另外一个软中断,实际上,唯一可以抢占软中断的是中断处理程序。其他软中断(包括同类型)可以在其他处理器上同时执行。
(2)执行软中断
一个注册的软中断必须被标记后才会执行(触发软中断),通常中断处理程序会在返回前标记它的软中断,使其在稍后被执行。在下列地方,待处理的软中断会被检查和执行:
*从一个硬件中断代码处返回时
*在ksoftirqd内核线程中
*在那些显示检查和执行待处理的软中断代码中,如网络子系统中
无论用什么办法唤醒,软中断都要在do_softirq()中执行,该函数循环遍历softirq_vec[],被标记的软中断处理函数被逐个执行。
1.2 软件中断的使用:
软中断保留给系统中对时间要求最严格以及最重要的下半部使用,目前只有网络和SCSI直接使用软中断。内核定时器和tasklet都是由软中断实现的,所以一般优先使用tasklet(对加锁要求不高)。
(1)分配索引
必须根据希望赋予的优先级来决定加入软件中断的位置,索引号小的软中断会先执行。
(2)注册处理程序
在运行时,通过open_softirq()注册软中断处理程序,该函数参数有两个:索引号和处理函数。
点击(此处)折叠或打开
-
void open_softirq(int nr, void (*action)(struct softirq_action *))
-
{
-
softirq_vec[nr].action = action;
- }
实际上就是把处理函数指针添加到软中断数组中。
点击(此处)折叠或打开
-
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
- open_softirq(NET_RX_SOFTIRQ, net_rx_action);
软中断处理程序执行时,允许响应中断,但它自己不能休眠。当前处理器上的软中断被禁止。
其他处理器仍可以执行新的软中断(包括同类型中断再次被触发),所以要注意共享数据的锁保护问题。
引入软中断,主要是因其可扩展性,若不需要扩展至多个处理器,就用tasklet。
(3) 触发你的软中断
通过open_softirq()注册之后,新的软中断处理程序可以运行。 raise_softirq()将一个软中断设置为挂起状态,在下次调用do_softirq()函数时运行。比如网络子系统:
raise_softirq(NET_TX_SOFTIRQ);//触发NET_TX_SOFTIRQ软中断
在触发软中断之前先要禁止中断,触发后再恢复原来状态。
如果中断本来就已经被禁止,可以用另一个函数来触发,这回带来一些优化效果
点击(此处)折叠或打开
-
/*
-
* interrupts must already be
-
*/
- raise_softirq_irqoff(NET_TX_SOFTIRQ)
在中断处理程序触发软中断是最常见形式,ISR执行硬件设备的相关操作,然后触发相应的软中断,退出;内核在执行完ISR后,马上就会调用do_softirq()函数。
3.tasklet
tasklet是利用软中断实现的一种下半部机制,它接口更简单,锁保护也要求较低。
3.1 tasklet的实现 :
tasklet是通过软中断实现的,它由两类软中断代表:HI_SOFTIRQ和TASKLET_SOFTIRQ。两者唯一区别是HI_SOFTIRQ会先于TASKLET_SOFTIRQ执行。
(1) tasklet由tasklet_struct结构表示,每个结构体单独代表一个tasklet,在
点击(此处)折叠或打开
-
struct tasklet_struct {
-
struct tasklet_struct *next; /* next tasklet in the list */
-
unsigned long state; /* state of the tasklet */
-
atomic_t count; /* reference counter */
-
void (*func)(unsigned long); /* tasklet handler function */
-
unsigned long data; /* argument to the tasklet function */
- };
func:是tasklet的处理程序;data是其唯一参数
state状态有:TASKLET_STATE_SCHED,TASKLET_STATE_RUN表明tasklet正在运行(只在多处理器上会用来做优化);
count: 是tasklet引用计数器,如果不为0,tasklet被禁止,不允许执行;只有count为0,tasklet才被激活。
(2)调度tasklet
已调度的tasklet(等同于被触发的软中断),存放在两个单处理器数据结构:tasklet_vec和tasklet_hi_vec(高优先级的tasklet)。
tasklet_stuct结构体构成的链表,链表中的每个tasklet_struct代表一个不同的tasklet。
tasklet由tasklet_schedule()和tasklet_hi_schedule()函数进行调度,接受一个指向tasklet_strcut结构的指针作为参数, tasklet_schedule()的过程:
①检查tasklet的状态是否为TASKLET_STATE_SCHED, 如果是,说明该tasklet已被调度过,立即返回;
②调用__tasklet_schedule();
③保存中断状态,然后禁止本地中断。保证触发的过程,处理器数据不会乱;
④把需要调度的tasklet加到每个处理器一个的tasklet_vec链表或tasklet_hi_vec链表的表头;
⑤唤起TASKLET_SOFTIRQ()或HI_SOFTIRQ()软中断;
⑥恢复中断到原状态并返回;
tasklet的核心执行函数tasklet_action()和 tasklet_hi_action(),主要完成以下任务:
①禁止中断(没必要保存中断状态,中断总是被激活的),并检索tasklet_vec或tasklet_hi_vec链表
②将当前处理器上的该链表设置为NULL,达到清空的效果
③允许响应中断;
④循环遍历获得链表上的每一个待处理的tasklet;
⑤如果是多处理器系统,检查tasklet的state,若为TASKLET_STATE_RUN说明该tasklet正在某个处理器上运行,就不执行,直接跳到下一个taskle处理(同一时刻,整个系统同类型tasklet只能执行一次)
⑥如果当前tasklet没有执行,就设置状态为TASKLET_STATE_RUN.
⑦检查count值是否为0,确保tasklet没有被禁止,若禁止,跳到下一个tasklet;
⑧明确这个tasklet没有在其他地方执行,并且被我们设置为执行状态,并且引用计数为0,现在就可以执行tasklet的处理程序。
⑨tasklet执行完毕,清除tasklet的state域的TASK_STATE_RUN状态
⑩重复执行下一个tasklet,直到处理完所有待处理的tasklet;
这个函数确保同一时间,只有一个给定类别的tasklet会被执行。
3.2 tasklet的使用
(1)申明tasklet:可以静态也可以动态创建
点击(此处)折叠或打开
-
静态创建
-
DECLARE_TASKLET(name, func, data) //引用计数设置为0,tasklet激活
-
DECLARE_TASKLET_DISABLED(name, func, data);//引用计数设置为1,tasklet禁止
-
动态创建:先申明一个tasklet_struct类型变量t,然后调用
- void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data)
(2)tasklet处理程序:原型
void tasklet_handler(unsigned long data);
tasklet是由软中断实现的,所以也不能睡眠,即不能在tasklet中使用信号量或其他阻塞式的函数。
(3)调度tasklet
①通过调用tasklet_schedule()函数并传递给他相应的tasklet_struct指针,该tasklet就被调度以便执行:
tasklet_schedule(&my_tasklet); //把my_tasklet标记为挂起
在tasklet被调度以后,只要有机会就会尽早运行,在它还没运行之前如果相同的tasklet又被调度了,那么它仍然只会运行一次。如果是tasklet已经在某个CPU上运行了,那么这个新的tasklet会被重新调度并再次运行,
②禁止或是能某个指定的tasklet
点击(此处)折叠或打开
-
tasklet_disable(&my_tasklet); //若my_tasklet正在执行,等待它执行完毕再返回
-
tasklet_disable_nosync(&my_tasklet); //不等待正在执行的tasklet执行完毕,直接返回
-
tasklet_ensable(&my_tasklet); //激活my_tasklet
- //通过DECLARE_TASKLET_DISABLED()创建的tasklet, 可以用tasklet_enable()激活
(4) ksoftirad
当软中断频繁触发自己时,若及时响应软中断,用户进程会处于饥饿状态;若不立即响应,当中断不频繁时,软中断得不到及时执行;
为了折中,Linux的实现是,不会立即处理重新触发的软中断,而是当大量软中断出现时,内核会唤醒一组内核线程来处理这些负载。叫做ksoftirqd/n(n是对于CPU编号)线程。Nice值是19,这能避免跟其他重要任务抢夺资源,但最终肯定会被执行。当系统空闲时,软中断处理会非常迅速(仅存的内核线程肯定会马上调度)。
Ksoftirqd会执行下面的死循环
点击(此处)折叠或打开
-
for (;;) {
-
if (!softirq_pending(cpu))
-
schedule();
-
set_current_state(TASK_RUNNING);
-
while (softirq_pending(cpu)) {
-
do_softirq();
-
if (need_resched())
-
schedule();
-
}
-
set_current_state(TASK_INTERRUPTIBLE);
- }
在ksoftirq中,只要有待处理的软中断(softirq_pending()探测发现),ksoftirq就会调用do_softirq()去处理它们。当执行完所需操作之后,内核线程将自己设置为TASK_INTERRUPTIBLE状态。
只要do_softirq()函数发现已经执行过的内核线程重新触发了自己,ksoftirqd线程就会被唤醒;
4.工作队列
工作队列可以把工作推后,交由一个内核线程去执行-->这个下半部分总在进程上下文中。实际上工作队列可以用内核线程替换,但由于内核开发者非常反对创建新的内核线程,所以推荐用工作队列。
工作队列可以睡眠,当需要大量内存,或执行阻塞式I/O操作时,非常有用。
4.1 工作队列的实现
工作队列子系统是一个用于创建内核线程的接口,它创建的进程负责执行由内核其他部分排到队列里的任务,她创建的这些内核线程叫做工作者线程(worker thread). 工作队列可以让驱动程序创建一个专门的工作者线程来处理需要推后的工作。不过,工作队列子系统提供了一个缺省的工作者线程来处理这些工作。这个缺省的工作者线程叫events/n(n表示处理器编号),每个处理器对应一个events线程。许多内核驱动程序把它们的下半部交给缺省的工作者线程去做,当然也可以建立一个新的工作者线程。
(1)表示工作者线程的数据结构
点击(此处)折叠或打开
-
struct workqueue_struct {
-
struct cpu_workqueue_struct cpu_wq[NR_CPUS];
-
struct list_head list;
-
const char *name;
-
int singlethread;
-
int freezeable;
-
int rt;
- };
结构内的cpu_workqueue_struct结构组成的数组cpu_wq[]. 数组每一项对应系统中的一个处理器,每个处理器对应一个工作者线程,每个工作者线程对应一个cpu_workqueue_struct结构体。
点击(此处)折叠或打开
-
struct cpu_workqueue_struct {
-
spinlock_t lock; /* lock protecting this structure */
-
struct list_head worklist; /* list of work */
-
wait_queue_head_t more_work;
-
struct work_struct *current_struct;
-
struct workqueue_struct *wq; /* associated workqueue_struct */
-
task_t *thread; /* associated thread */
- };
每个工作者线程类型关联一个自己的workqueue_struct。
(2)表示工作的数据结构
工作用
点击(此处)折叠或打开
-
struct work_struct {
-
atomic_long_t data;
-
struct list_head entry;
-
work_func_t func;
- };
所有工作的结构体链接成一个链表,每个处理器上的每种类型的队列都对应这样一个链表。
所有工作者线程都是普通内核线程实现的,都要执行worker_thread()函数
worker_thread() 函数的核心流程
点击(此处)折叠或打开
-
for (;;) {
-
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
-
if (list_empty(&cwq->worklist))
-
schedule();
-
finish_wait(&cwq->more_work, &wait);
-
run_workqueue(cwq);
- }
该函数完成以下功能:
①线程将自己设置为休眠状态(TASK_INTERRUPTIBLE), 并把自己加入到等待队列中。
②如果工作队列是空的,线程调用schedule()进入睡眠
③如果链表有对象,线程不会睡眠,而是将自己设置成TASK_RUNNING, 脱离等待队列。
④ 如果链表非空,调用run_workqueue()函数来执行被推后的工作。
run_workqueue()函数主要完成
点击(此处)折叠或打开
-
while (!list_empty(&cwq->worklist)) {
-
struct work_struct *work;
-
work_func_t f;
-
void *data;
-
work = list_entry(cwq->worklist.next, struct work_struct, entry);
-
f = work->func;
-
list_del_init(cwq->worklist.next);
-
work_clear_pending(work);
-
f(work);
- }
该函数循环遍历链表上每个待处理的工作,执行链表每个节点的workqueue_struct中的func成员函数:
①当链表不为空时,选取下一个结点对象
②获取我们希望执行的函数func及其参数data
③把该节点从链表中取下来,将待处理标志位pending清零
④调用工作函数
⑤重复执行,知道处理完链表中所有待处理工作函数。
(3) 工作队列实现机制的总结
位于最高层的是工作者线程,系统允许有多种类型的工作者线程存在。对于指定的一个类型,具有一个workqueue_struct 结构(包含该类型的所有工作者线程),它给每个CPU分配一个cpu_workqueue_struct结构。
工作结构处于最底层,该结构包含一个函数指针,这个指向的函数就是推后执行的具体任务,工作会被提交给某个具体的工作者线程,然后这个工作者线程会被唤醒并执行这些排好的工作。
4.2工作队列的使用
(1)创建推后的工作
静态创建:DECLARE_WORK(name,void (*func)(void *), void *data);
创建一个名为name, 处理函数为func, 参数为data的work_struct结构体。
在运行时创建:INIT_WORK(structwork_struct *work, void (*func)(void *), void *data);
(2)工作队列处理函数
void work_handle(void *data);
这个函数会由一个工作者线程执行,运行在进程上下文中,允许响应中断,并且不持有任何锁。可以睡眠,但是不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常只有在发生系统调用时,内核会代表用户空间的进程运行,此时它才能访问用户空间。
(3)对工作进行调度
把给定的工作处理函数提交给缺省的events工作线程
schedule_work(&my_work);
my_work马上就会被调度,一旦所在处理器的工作线程被唤醒,它就会被执行。
schedule_delayed_work(&my_work, delay);//& my_work指向的work_struct知道delay指定的时钟节拍后才会执行。
(4)刷新操作
void flush_scheduled_work(void);
该函数一直等待,直到队列中所有对象都被执行完以后才返回,等待时处于休眠。但是不取消任何延迟执行的工作, 即由schedule_delayed_work()调度的工作,如果其延迟时间未结束,它并不会因为调用flush_scheduled_work()而被刷新掉,取消延迟执行工作应该调用
int cancel_delayed_work(struct work_struct*work);
这个函数取消任何与work_struct相关的关起工作。
(5)创建新的工作队列
创建一个新的工作队列,会在每个处理器上都创建一个工作者线程,所以只有在明确了必须靠自己的一套线程来提高性能的情况下,再创建自己的工作队列。
点击(此处)折叠或打开
-
①创建一个新的工作队列和与之相关的工作者线程,调用函数原型
-
struct workqueue_struct *create_workqueue(const char *name);
-
struct workqueue_struct * my_workqueue;
-
my_workqueue = create_workqueue(“my_wq”)
-
-
②调度工作
-
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
- int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)
-
-
③刷新工作队列
- flush_workqueue(struct workqueue_struct *wq)
5.下半部机制的选择
2.6版内核对于下半部有三个选择:软中断、tasklet和工作队列。
软中断:速度最快,但提供的执行序列化保障最少,这就严格要求确保各项数据的安全。不能休眠,同类别的软中断可以在多处理器上同时执行。
tasklet: 用软中断实现,同类别的tasklet不能在多处理器上同时执行。
工作队列:用内核线程实现,处于进程上下文,可以睡眠,开销最大。
6.下半部之间的加锁
如果进程上下文和一个下半部共享数据,在访问这些数据之前,必须禁止下半部的处理并得到锁的使用权,这是为了本地和SMP的保护并且防止死锁的出现。
如果一个中断上下文和一个下半部共享数据,在访问数据之前,必须禁止中断并得到锁的使用权。这是为了本地和SMP并且防止死锁出现。
任何工作队列中被共享的数据也需要使用锁机制。
7.禁止下半部
需要禁止所有的下半部处理,也就是禁止所有软中断和tasklet,可以调用
void local_bh_disable(void);
允许下半部处理
void local_bh_enable(void);
这两个是成对使用的,比如第一次调用local_bh_disable(),本地软中断就被禁止了,再调用一次,本次处理依然是禁止;相应的只有第二次local_bh_enable(),软中断才会被激活。函数也是通过preempt_count为每个进程维护一个计数器的,当计数器为0时,下半部才能被处理。
这些函数并不能禁止工作队列的执行,因为工作队列在进程上下文中执行,不会涉及异步执行的问题。