【原创】《Linux设备驱动程序》学习之循序渐进 --- 并发和竟态
第五章 --- 并发和竟态
让我们快速看一段 scull 内存管理代码. 在写逻辑的深处, scull 必须决定它请求的内存是否已经分配. 处理这个任务的代码是:
if (!dptr->data[s_pos]) {
dptr->data[s_pos] = kmalloc(quantum, GFP_KERNEL);
if (!dptr->data[s_pos])
goto out;
}
假设有 2 个进程( 我们会称它们为"A"和"B" ) 独立地试图写入同一个 schull 设备的相同偏移. 每个进程同时到达上面片段的第一行的 if 测试. 如果被测试的指针是 NULL, 每个进程都会决定分配内存, 并且每个都会复制结果指针给 dptr->datat[s_pos]. 因为 2 个进程都在赋值给同一个位置, 显然只有一个赋值可以成功. 当然, 发生的是第 2 个完成赋值的进程将"胜出". 如果进程 A 先赋值, 它的赋值将被进程 B 覆盖. 在此, scull 将完全忘记 A 分配的内存; 它只有指向 B 的内存的指针. A 所分配的指针, 因此, 将被丢掉并且不再返回给系统.
事情的这个顺序是一个竞争情况的演示. 竞争情况是对共享数据的无控制存取的结果. 当错误的存取模式发生了, 产生了不希望的东西. 对于这里讨论的竞争情况, 结果是内存泄漏.
在现代 Linux 系统, 有非常多的并发源, 并且因此而来的可能竞争情况. 多个用户空间进程在运行, 它们可能以令人惊讶的方式组合存取你的代码. SMP 系统能够同时在不同处理器上执行你的代码. 内核代码是可抢占的; 你的驱动代码可能在任何时间失去处理器, 代替它的进程可能也在你的驱动中运行. 设备中断是能够导致你的代码并发执行的异步事件. 内核也提供各种延迟代码执行的机制, 例如 workqueue, tasklet, 以及定时器, 这些能够使你的代码在任何时间以一种与当前进程在做的事情无关的方式运行. 在现代支持热插拔的世界中, 你的设备可能在你使用它们的时候轻易地消失.
因此第一个经验法则是在你设计驱动时在任何可能的时候记住避免共享的资源. 如果没有并发存取, 就没有竞争情况. 因此小心编写的内核代码应当有最小的共享. 这个想法的最明显应用是避免使用全局变量. 如果你将一个资源放在多个执行线路能够找到它的地方, 应当有足够的理由.
事实是, 然而, 这样的共享常常是需要的. 硬件资源是, 由于它们的特性, 共享的, 软件资源也必须常常共享给多个线程. 也要记住全局变量远远不是共享数据的唯一方式; 任何时候你的代码传递一个指针给内核的其他部分, 潜在地它创造了一个新的共享情形. 共享是生活的事实.
这是资源共享的硬规则: 任何时候一个硬件或软件资源被超出一个单个执行线程共享, 并且可能存在一个线程看到那个资源的不一致时, 你必须明确地管理对那个资源的存取.
信号量
信号量在计算机科学中是一个被很好理解的概念. 在它的核心, 一个信号量是一个单个整型值, 结合有一对函数, 典型地称为 P 和 V. 一个想进入临界区的进程将在相关信号量上调用 P; 如果信号量的值大于零, 这个值递减 1 并且进程继续. 相反, 如果信号量的值是 0 ( 或更小 ), 进程必须等待直到别人释放信号量. 解锁一个信号量通过调用 V 完成; 这个函数递增信号量的值, 并且, 如果需要, 唤醒等待的进程.
当信号量用作互斥 -- 阻止多个进程同时在同一个临界区内运行 -- 它们的值将初始化为 1. 这样的信号量在任何给定时间只能由一个单个进程或者线程持有. 以这种模式使用的信号量有时称为一个互斥锁, 就是, 当然, "互斥"的缩写. 几乎所有在 Linux 内核中发现的信号量都是用作互斥.
为使用信号量, 内核代码必须包含 <asm/semaphore.h>. 相关的类型是 struct semaphore; 实际信号量可以用几种方法来声明和初始化.具体方法参见书中113页内容。
读者/写者信号量
信号量为所有调用者进行互斥, 不管每个线程可能想做什么. 然而, 很多任务分为 2 种清楚的类型: 只需要读取被保护的数据结构的类型, 和必须做改变的类型. 允许多个并发读者常常是可能的, 只要没有人试图做任何改变. 这样做能够显著提高性能; 只读的任务可以并行进行它们的工作而不必等待其他读者退出临界区.Linux 内核为这种情况提供一个特殊的信号量类型称为 rwsem (或者" reader/writer semaphore"). rwsem 在驱动中的使用相对较少, 但是有时它们有用. 使用 rwsem 的代码必须包含 <linux/rwsem.h>. 读者写者信号量 的相关数据类型是 struct rw_semaphore。具体方法参见书中116页内容。
completion
在 2.4.7 内核中增加了 "completion" 接口. completion 是任务使用的一个轻量级机制: 允许一个线程告诉另一个线程工作已经完成. 为使用 completion, 你的代码必须包含 <linux/completion.h>. 一个 completion 可被创建。具体方法参见书中117页内容。自旋锁
对于互斥, 信号量是一个有用的工具, 但是它们不是内核提供的唯一这样的工具. 相反, 大部分加锁是由一种称为自旋锁的机制来实现. 不象信号量, 自旋锁可用在不能睡眠的代码中, 例如中断处理. 当正确地使用了, 通常自旋锁提供了比信号量更高的性能. 然而, 它们确实带来对它们用法的一套不同的限制.自旋锁概念上简单. 一个自旋锁是一个互斥设备, 只能有 2 个值:"上锁"和"解锁". 它常常实现为一个整数值中的一个单个位. 想获取一个特殊锁的代码测试相关的位. 如果锁是可用的, 这个"上锁"位被置位并且代码继续进入临界区. 相反, 如果这个锁已经被别人获得, 代码进入一个紧凑的循环中反复检查这个锁, 直到它变为可用. 这个循环就是自旋锁的"自旋"部分.
自旋锁原语要求的包含文件是 <linux/spinlock.h>. 一个实际的锁有类型 spinlock_t. 象任何其他数据结构, 一个 自旋锁必须初始化. 这个初始化可以在编译时完成,具体方法参见书中119页内容。
注意所有的自旋锁等待是, 由于它们的特性, 不可中断的. 一旦你调用 spin_lock, 你将自旋直到锁变为可用.
应用到自旋锁的核心规则是任何代码必须, 在持有自旋锁时, 是原子性的. 它不能睡眠; 事实上, 它不能因为任何原因放弃处理器, 除了服务中断(并且有时即便此时也不行)
内核抢占的情况由自旋锁代码自己处理. 内核代码持有一个自旋锁的任何时间, 抢占在相关处理器上被禁止. 即便单处理器系统必须以这种方式禁止抢占以避免竞争情况. 这就是为什么需要正确的加锁, 即便你从不期望你的代码在多处理器机器上运行.
关于自旋锁使用的最后一个重要规则是自旋锁必须一直是尽可能短时间的持有.为避免产生这类问题, 重视使你的锁持有时间越短越好.
实际上有 4 个函数可以加锁一个自旋锁:
void spin_lock(spinlock_t *lock);
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);
void spin_lock_irq(spinlock_t *lock);
void spin_lock_bh(spinlock_t *lock
也有 4 个方法来释放一个自旋锁; 你用的那个必须对应你用来获取锁的函数.
void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
还有一套非阻塞的自旋锁操作:
int spin_trylock(spinlock_t *lock);
int spin_trylock_bh(spinlock_t *lock);
这些函数成功时返回非零( 获得了锁 ), 否则 0. 没有"try"版本来禁止中断.
读者/写者自旋锁
内核提供了一个自旋锁的读者/写者形式, 直接模仿我们在本章前面见到的读者/写者信号量. 这些锁允许任何数目的读者同时进入临界区, 但是写者必须是排他的存取. 读者写者锁有一个类型 rwlock_t, 在 <linux/spinlokc.h> 中定义.锁陷阱
一个正确的加锁机制需要清晰和明确的规则. 当你创建一个可以被并发存取的资源时, 你应当定义哪个锁将控制存取. 加锁应当真正在开始处进行; 事后更改会是难的事情. 开始时花费的时间常常在调试时获得回报.当你编写你的代码, 你会毫无疑问遇到几个函数需要存取通过一个特定锁保护的结构. 在此, 你必须小心: 如果一个函数需要一个锁并且接着调用另一个函数也试图请求这个锁, 你的代码死锁. 不论信号量还是自旋锁都不允许一个持锁者第 2 次请求锁; 如果你试图这样做, 系统将挂起.
这个问题的解决方法常常是简单的: 当多个锁必须获得时, 它们应当一直以同样顺序获得. 只要遵照这个惯例, 象上面描述的简单死锁能够避免. 然而, 遵照加锁顺序规则是做比说难. 非常少见这样的规则真正在任何地方被写下. 常常你能做的最好的是看看别的代码如何做的.
一些经验规则能帮上忙. 如果你必须获得一个对你的代码来说的本地锁(假如, 一个设备锁), 以及一个属于内核更中心部分的锁, 先获取你的. 如果你有一个信号量和自旋锁的组合, 你必须, 当然, 先获得信号量; 在持有一个自旋锁时调用 down (可能睡眠) 是一个严重的错误. 但是最重要的, 尽力避免需要多于一个锁的情况.
原子变量
内核提供了一个原子整数类型称为 atomic_t, 定义在 <asm/atomic.h>. 一个 atomic_t 持有一个 int 值在所有支持的体系上. 但是, 因为这个类型在某些处理器上的工作方式, 整个整数范围可能不是都可用的; 因此, 你不应当指望一个 atomic_t 持有多于 24 位.具体方法参见书中127页内容。你还应当记住, atomic_t 值只在当被置疑的量真正是原子的时候才起作用. 需要多个 atomic_t 变量的操作仍然需要某种其他种类的加锁.
位操作
atomic_t 类型在进行整数算术时是不错的. 但是, 它无法工作的好, 当你需要以原子方式操作单个位时. 为此, 内核提供了一套函数来原子地修改或测试单个位. 因为整个操作在单步内发生, 没有中断(或者其他处理器)能干扰. 原子位操作非常快, 因为它们使用单个机器指令来进行操作, 而在任何时候低层平台做的时候不用禁止中断. 函数是体系依赖的并且在 <asm/bitops.h> 中声明. 它们保证是原子的, 即便在 SMP 计算机上, 并且对于跨处理器保持一致是有用的.seqlock 锁
要保护的资源小, 简单, 并且常常被存取, 并且很少写存取但是必须要快.seqlock 在这种情况下工作。seqlock 通常不能用在保护包含指针的数据结构, 因为读者可能跟随着一个无效指针而写者在改变数据结构. seqlock 定义在 <linux/seqlock.h>.读取-拷贝-更新
读取-拷贝-更新(RCU) 是一个高级的互斥方法, 能够有高效率在合适的情况下. 它在驱动中的使用很少但是不是没人知道, 因此这里值得快速浏览下. 那些感兴趣 RCU 算法的完整细节的人可以在由它的创建者出版的白皮书中找到( http://www.rdrop.com/users/paulmck/rclock/intro/rclock_intro.html).RCU 对它所保护的数据结构设置了不少限制. 它对经常读而极少写的情况做了优化. 被保护的资源应当通过指针来存取, 并且所有对这些资源的引用必须由原子代码持有.使用 RCU 的代码应当包含 <linux/rcupdate.h>. 具体方法参见书中131页内容。
快速参考
#include <asm/semaphore.h>定义信号量和其上操作的包含文件.
DECLARE_MUTEX(name);
DECLARE_MUTEX_LOCKED(name);
2 个宏定义, 用来声明和初始化一个在互斥模式下使用的信号量.
void init_MUTEX(struct semaphore *sem);
void init_MUTEX_LOCKED(struct semaphore *sem);
这 2 函数用来在运行时初始化一个信号量.
void down(struct semaphore *sem);
int down_interruptible(struct semaphore *sem);
int down_trylock(struct semaphore *sem);
void up(struct semaphore *sem);
加锁和解锁信号量. down 使调用进程进入不可打断睡眠, 如果需要;
down_interruptible, 相反, 可以被信号打断. down_trylock 不睡眠; 相
反, 它立刻返回如果信号量不可用. 加锁信号量的代码必须最终使用 up 解锁
它.
struct rw_semaphore;
init_rwsem(struct rw_semaphore *sem);
信号量的读者/写者版本和初始化它的函数.
void down_read(struct rw_semaphore *sem);
int down_read_trylock(struct rw_semaphore *sem);
void up_read(struct rw_semaphore *sem);
获得和释放对读者/写者信号量的读存取的函数.
void down_write(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem);
void up_write(struct rw_semaphore *sem);
void downgrade_write(struct rw_semaphore *sem);
管理对读者/写者信号量写存取的函数.
#include <linux/completion.h>
DECLARE_COMPLETION(name);
init_completion(struct completion *c);
INIT_COMPLETION(struct completion c);
描述 Linux completion 机制的包含文件, 已经初始化 completion 的正
常方法. INIT_COMPLETION 应当只用来重新初始化一个之前已经使用过的
completion.
void wait_for_completion(struct completion *c);
等待一个 completion 事件发出.
void complete(struct completion *c);
void complete_all(struct completion *c);
发出一个 completion 事件. completion 唤醒, 最多, 一个等待着的线程,
而 complete_all 唤醒全部等待者.
void complete_and_exit(struct completion *c, long retval);
通过调用 complete 来发出一个 completion 事件, 并且为当前线程调用
exit.
#include <linux/spinlock.h>
spinlock_t lock = SPIN_LOCK_UNLOCKED;
spin_lock_init(spinlock_t *lock);
定义自旋锁接口的包含文件, 以及初始化锁的 2 个方法.
void spin_lock(spinlock_t *lock);
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);
void spin_lock_irq(spinlock_t *lock);
void spin_lock_bh(spinlock_t *lock);
加锁一个自旋锁的各种方法, 并且, 可能地, 禁止中断.
int spin_trylock(spinlock_t *lock);
int spin_trylock_bh(spinlock_t *lock);
上面函数的非自旋版本; 在获取锁失败时返回 0, 否则非零.
void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
释放一个自旋锁的相应方法.
rwlock_t lock = RW_LOCK_UNLOCKED
rwlock_init(rwlock_t *lock);
初始化读者/写者锁的 2 个方法.
void read_lock(rwlock_t *lock);
void read_lock_irqsave(rwlock_t *lock, unsigned long flags);
void read_lock_irq(rwlock_t *lock);
void read_lock_bh(rwlock_t *lock);
获得一个读者/写者锁的读存取的函数.
void read_unlock(rwlock_t *lock);
void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
void read_unlock_irq(rwlock_t *lock);
void read_unlock_bh(rwlock_t *lock);
释放一个读者/写者自旋锁的读存取.
void write_lock(rwlock_t *lock);
void write_lock_irqsave(rwlock_t *lock, unsigned long flags);
void write_lock_irq(rwlock_t *lock);
void write_lock_bh(rwlock_t *lock);
获得一个读者/写者锁的写存取的函数.
void write_unlock(rwlock_t *lock);
void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
void write_unlock_irq(rwlock_t *lock);
void write_unlock_bh(rwlock_t *lock);
释放一个读者/写者自旋锁的写存取的函数.
#include <asm/atomic.h>
atomic_t v = ATOMIC_INIT(value);
void atomic_set(atomic_t *v, int i);
int atomic_read(atomic_t *v);
void atomic_add(int i, atomic_t *v);
void atomic_sub(int i, atomic_t *v);
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v);
int atomic_inc_and_test(atomic_t *v);
int atomic_dec_and_test(atomic_t *v);
int atomic_sub_and_test(int i, atomic_t *v);
int atomic_add_negative(int i, atomic_t *v);
int atomic_add_return(int i, atomic_t *v);
int atomic_sub_return(int i, atomic_t *v);
int atomic_inc_return(atomic_t *v);
int atomic_dec_return(atomic_t *v);
原子地存取整数变量. atomic_t 变量必须只通过这些函数存取.
#include <asm/bitops.h>
void set_bit(nr, void *addr);
void clear_bit(nr, void *addr);
void change_bit(nr, void *addr);
test_bit(nr, void *addr);
int test_and_set_bit(nr, void *addr);
int test_and_clear_bit(nr, void *addr);
int test_and_change_bit(nr, void *addr);
原子地存取位值; 它们可用做标志或者锁变量. 使用这些函数阻止任何与
并发存取这个位相关的竞争情况.
#include <linux/seqlock.h>
seqlock_t lock = SEQLOCK_UNLOCKED;
seqlock_init(seqlock_t *lock);
定义 seqlock 的包含文件, 已经初始化它们的 2 个方法.
unsigned int read_seqbegin(seqlock_t *lock);
unsigned int read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags);
int read_seqretry(seqlock_t *lock, unsigned int seq);
int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags);
获得一个 seqlock-保护 的资源的读权限的函数.
void write_seqlock(seqlock_t *lock);
void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags);
void write_seqlock_irq(seqlock_t *lock);
void write_seqlock_bh(seqlock_t *lock);
获取一个 seqlock-保护的资源的写权限的函数.
void write_sequnlock(seqlock_t *lock);
void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags);
void write_sequnlock_irq(seqlock_t *lock);
void write_sequnlock_bh(seqlock_t *lock);
释放一个 seqlock-保护的资源的写权限的函数.
#include <linux/rcupdate.h>
需要使用读取-拷贝-更新(RCU)机制的包含文件.
void rcu_read_lock;
void rcu_read_unlock;
获取对由 RCU 保护的资源的原子读权限的宏定义.
void call_rcu(struct rcu_head *head, void (*func)(void *arg), void *arg);
安排一个回调在所有处理器已经被调度以及一个 RCU-保护的资源可用被
安全的释放之后运行.