Linux驱动--并发和竞争

时间:2022-11-07 23:35:29

概述:
并发和竞争,对于驱动来说,就是对临界区资源的保护。在面对多核CPU,很多进程同时运行,同时访问公共设备和数据,要保证这么多进程有序进行访问。

以下,将主要从用法上归纳总结linux驱动对并发和竞争处理提供的接口。

接口类别:

  • 旗标(semaphore 也可以说信号量)
  • Completions机制
  • 自旋锁(spinlock)
  • 顺序锁(seqlock)
  • 原子变量(atomic)
  • 原子位操作(bitops)
  • RCU(read-copy-update:读取-拷贝-更新)

各种接口类别应用的场景:

  1. 旗标(semaphore)
    保护临界区资源,同一时刻只能有一个进程获取旗标,可以访问保护的临界区,一个进程未释放旗标,另外的进程试图访问临界区,将不能获取旗标,会睡眠等待释放旗标,多个进程访问未释放旗标的临界区,这些进程将排队等待并睡眠,直到旗标释放,一个一个唤起。主要应用在进程上下文环境中,可以睡眠。不能用在不可睡眠的地方,如中断服务程序。

  2. Completions
    一个动作,需要等待另外一个动作完成,才能继续下去。这种场景用completions,虽然用旗标也可以做到,但是completions在这种场景要比旗标好,性能上和各种极端情况都要好。completions等待另外动作完成的进程,是不杀死的。completions典型应用是模块退出和线程退出时一起退出。

  3. 自旋锁(spinlock)
    自旋锁和旗标功能一样,保护临界区。自旋锁保护的临界区code是原子性的,在获取锁时会关闭中断。因此,自旋锁可以用在不可睡眠的地方,如中断服务程序,不可用在可以睡眠的方。自旋锁的原子性,要求临界区的code不能睡眠,否则会死锁。

  4. 顺序锁(seqlock)
    顺序锁(seqlock)是对读写锁的一种优化,提高了读锁和写锁的独立性。写锁不会被读锁阻塞,读锁也不会被写锁阻塞。写锁会被写锁阻塞。
    如果读执行单元在读操作期间,写执行单元已经发生了写操作,那么,读执行单元必须重新去读数据,以便确保读到的数据是完整的;这种锁在读写操作同时进行的概率比较小,性能是非常好的,而且它允许读写操作同时进行,因而更大地提高了并发性。
    顺序锁有一个限制:它必须要求被保护的共享资源中不能含有指针;因为写执行单元可能会使指针失效,当读执行单元如果正要访问该指针时,系统就会崩溃。
    更加详细的可以参考这篇博文: Linux并发控制——顺序锁(seqlock)

  5. 原子变量(atomic)
    原子变量,主要用于对保护的数据是一个简单的整数时用,用其他的又浪费(旗标,锁等对系统性能影响较大)。

  6. 原子位操作(bitops)
    与原子变量一样,对于位操作用bitops比较快,而且对系统影响小,还是原子性的。

  7. RCU
    RCU是用来应对读写场景,而且主要用来保护由指针指向的对象(而不保护指针本身),读不阻塞,读操作的优先权大于写操作(与seqlock相反),写的时候不阻塞读,写的时候发现在读,要等待读完成,才销毁指针指向旧地址空间数据,然后更新指针指向新的地址。
    应用在读者与写者场景,目前性能最好(旗标,自旋锁都有实现读者写者的专用接口,当然顺序锁也是应用在这个场景,但性能都不及RCU)。
    RCU可以参考如下博客:
    Linux并发控制——RCU
    linux内核 RCU机制详解
    Linux 2.6内核中新的锁机制–RCU

加锁原则:
(个人的理解,不当之处望大家指出)
优先考虑从上往下
1、能不加锁尽量不加锁,能用算法代替尽量用算法代替,锁的开销对系统效率影响大,来自网上的图片:
Linux驱动--并发和竞争

2、能用原子变量和原子位操作尽量用

3、加锁遵循下面规则:

  • 模糊的规则和顺序的规则,有资料这么说,看描述大体总结下:
    (1)同一进程,不能同一时刻,两次获取同一把锁,否则将导致死锁。
    (2)获取一把锁,必须释放后,才能获取其他锁,即尽量不要嵌套,否则,稍微不注意就死锁。
    (3)用锁时,一定要清晰明确。
    上面的规则要尽量遵循,当然锁是可以嵌套,但是嵌套逻辑复杂,除非很清楚。如遇到的,自旋锁嵌套,外层的锁,会被嵌套自旋锁,释放时,打开中断,导致外层锁不在具有原子性。

4、细-粗-粒度加锁
在考虑加锁时,尽量粗粒度。如一个驱动模块能用一把锁尽量用一把,绝对不用两把。

读者写者选择:
(也可以说是生产者消费者)
1、都有基于旗标和自旋锁实现的读者写者,都分别继承各种的优势,要根据场景来选择。多个进程读不阻塞,多个进程写会阻塞,在写的时候,有读,此时读会被阻塞
2、顺序锁用于保护非指针数据。基于上面,顺序锁的写不会阻塞读,发现被修改,会重读,其他都和上面相同。
3、RCU性能最好,尽量考虑用。读不阻塞,读操作的优先权大于写操作(与seqlock相反),写的时候不阻塞读,写的时候发现在读,要等待读完成,才销毁指针指向旧地址空间数据,然后更新指针指向新的地址。

总之,总的原则来说,可以不用锁就不用,非用不可,需根据场景来选择不同的锁,保证设计思路清晰。对于读者写者,读多写越少,各接口性能就越好,写越多性能越不好。

函数接口:

  1. 旗标(semaphore)
    定义头文件:
#include <linux/semaphore.h>

首先,定义和初始化:
静态的用宏:

#define DEFINE_SEMAPHORE(name) \
struct semaphore name = __SEMAPHORE_INITIALIZER(name, 1)

动态的先定义semaphore指针变量,然后调用inline void sema_init(struct semaphore *sem, int val),第2个参数一般为1。

函数:

extern void down(struct semaphore *sem);
extern int __must_check down_interruptible(struct semaphore *sem);
extern int __must_check down_killable(struct semaphore *sem);
extern int __must_check down_trylock(struct semaphore *sem);
extern int __must_check down_timeout(struct semaphore *sem, long jiffies);
extern void up(struct semaphore *sem);

down()—–加锁,得不到锁的进程,将在此处睡眠等待,直到等到锁,中途睡眠不可打断。得到锁,down()返回继续往下执行。
用法:

DEFINE_SEMAPHORE(sem)
void exampSem()
{
down(&sem);
...
...
...
up(&sem)
}

down_interruptible()—-加锁,得不到锁的进程,将在此睡眠,进程中途可被其他信号打断,打断后down_interruptible()将返回非0,进程继续运行。得到锁,down_interruptible()返回0.
用法:

DEFINE_SEMAPHORE(sem)
void exampSem()
{
if(down_interruptible(&sem))
return -ERESTARTSYS;
...
...
...
up(&sem)
}

down_killable()和down_interruptible()用法差不多,返回值也差不多,在打断信号类型上有要求。

down_trylock()用法和down_interruptible()差不多,返回值也差不多,区别在于,不等待不睡眠,得不到锁立即返回。

down_timeout()用法上和down_interruptible()差不多,返回值也差不多,却别在于,睡眠不可打断,睡眠时间受第2个参数jiffies限制,时间到,还没获得锁,返回非0,进程不访问临界区,返回进程继续运行。

up()解锁。

旗标实现的读者写者:

定义的头文件:

#include <linux/rwsem.h>

首先一样是定义rw_semaphore类型变量:
静态:

#define DECLARE_RWSEM(name) \
struct rw_semaphore name = __RWSEM_INITIALIZER(name)

动态用:

#define init_rwsem(sem) \
do { \
static struct lock_class_key __key; \
\
__init_rwsem((sem), #sem, &__key); \
} while (0)

然后调用函数,和旗标各个版本函数接口用法差不多:

void down_read(struct rw_semaphore *sem);
int down_read_trylock(struct rw_semaphore *sem);
void up_read(struct rw_semaphore *sem);
获得和释放对读者/写者旗标的读存取的函数.

void down_write(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem);
void up_write(struct rw_semaphore *sem);
void downgrade_write(struct rw_semaphore *sem);
管理对读者/写者旗标写存取的函数.

2. Completion
(1)头文件:

#include <linux/completion.h>

(2)初始化:
静态定义并初始化:

/**
* DECLARE_COMPLETION - declare and initialize a completion structure
* @work: identifier for the completion structure
*
* This macro declares and initializes a completion structure. Generally used
* for static declarations. You should use the _ONSTACK variant for automatic
* variables.
*/

#define DECLARE_COMPLETION(work) \
struct completion work = COMPLETION_INITIALIZER(work)

动态定义并初始化:
用init_completion(),如下:

struct completion comp;
init_completion(&comp);

(3)等待函数接口:

extern void wait_for_completion(struct completion *);
extern void wait_for_completion_io(struct completion *);
extern int wait_for_completion_interruptible(struct completion *x);
extern int wait_for_completion_killable(struct completion *x);
extern unsigned long wait_for_completion_timeout(struct completion *x,
unsigned long timeout);
extern unsigned long wait_for_completion_io_timeout(struct completion *x,
unsigned long timeout);
extern long wait_for_completion_interruptible_timeout(
struct completion *x, unsigned long timeout);
extern long wait_for_completion_killable_timeout(
struct completion *x, unsigned long timeout);
extern bool try_wait_for_completion(struct completion *x);
extern bool completion_done(struct completion *x);

有点多,从函数名字顾名思义,
wait_for_completion()等待通知完成,进程将在此睡眠等待。注意这个函数挂起的进程,是不可杀死的。

(4)通知完成函数:

extern void complete(struct completion *);
extern void complete_all(struct completion *);

等待在一个completion上的可能又很多进程,complete()只能一次唤起一个,
如果用complete_all()将一次唤醒等待在同一个completion上的全部进程,用这个函数通知后,再次使用,需要重新调用init_completion()初始化complation。

void complete_and_exit(struct completion *c, long retval);
通过调用 complete 来发出一个 completion 事件, 并且为当前线程调用 exit。

3. 自旋锁(spinlock)
(1)头文件:

#include <linux/spinlock.h>

(2)初始化:

struct spinlock_t splk;
spin_lock_init(&splk);

(3)加锁接口:

void spin_lock(spinlock_t *lock);
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);
void spin_lock_irq(spinlock_t *lock);
void spin_lock_bh(spinlock_t *lock);

spin_lock()—不禁止中断,只禁止内核抢占,小心使用,被打段后,再有其他函数来获取同一个锁,会导致死锁。
spin_lock_irq()—禁止所有中断,禁止内核抢占
spin_lock_irqsave()—在spin_lock_irq()基础上一防止改变中断状态,先保存在第2个参数中,退出时用spin_unlock_irqrestore()会恢复中断状态。
spin_lock_bh()—禁止软件中断,硬件其他中断不禁止,禁止内核抢占。

(4)对应释放锁的函数接口:

void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);

上锁和解锁接口对应使用。

(5)自旋锁实现的读者写者:
头文件:

#include <linux/rwlock.h>

初始化:

rwlock_t rwl;
rwlock_init(&rwl);

读者加锁解锁接口:

void read_lock(rwlock_t *lock);
void read_lock_irqsave(rwlock_t *lock, unsigned long flags);
void read_lock_irq(rwlock_t *lock);
void read_lock_bh(rwlock_t *lock);
void read_unlock(rwlock_t *lock);
void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
void read_unlock_irq(rwlock_t *lock);
void read_unlock_bh(rwlock_t *lock);

写者加锁解锁:

void write_lock(rwlock_t *lock);
void write_lock_irqsave(rwlock_t *lock, unsigned long flags);
void write_lock_irq(rwlock_t *lock);
void write_lock_bh(rwlock_t *lock);
int write_trylock(rwlock_t *lock);
void write_unlock(rwlock_t *lock);
void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
void write_unlock_irq(rwlock_t *lock);
void write_unlock_bh(rwlock_t *lock);

4、顺序锁(seqlock)

(1)头文件:

#include <linux/seqlock.h>

(2)定义或者初始化:
静态:

DEFINE_SEQLOCK(seql)
seqlock_init(&seql);

动态的:

seqlock_t seql;
seqlock_init(&seql);

获得seqlock保护的读:

unsigned int read_seqbegin(seqlock_t *lock);
int read_seqretry(seqlock_t *lock, unsigned int seq);

用法:

seqlock_t seql;
seqlock_init(&seql);
unsigned int seqSta;
do{
seqSta = read_seqbegin(&seql);
...
...
...
}while(read_seqretry(&seql,seqSta));

当读时,read_seqbegin(&seql)获取并保证在seqSta变量中,在完成读后,read_seqretry()比较seqSta,如发现不一样表示读期间有写,于是read_seqretry()返回非0,重新读,如果发现和传入的seqSta值一样,返回0,不再重新读取。

获取seqlock写保护:

void write_seqlock(seqlock_t *lock);
void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags);
void write_seqlock_irq(seqlock_t *lock);
void write_seqlock_bh(seqlock_t *lock);

seqlock写锁释放:

void write_sequnlock(seqlock_t *lock);
void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags);
void write_sequnlock_irq(seqlock_t *lock);
void write_sequnlock_bh(seqlock_t *lock);

seqlock写接口和spinloc差不多,各个对应版本意思也差不多。

5、原子变量
(1)头文件:

#include <asm/atomic.h>

(2)定义和初始化:

atomic_t v = ATOMIC_INIT(value);

(3)函数接口:
void atomic_set(atomic_t *v, int i);//将v初始化成i的值
int atomic_read(atomic_t *v);//读取v的值,返回值就是v的值
void atomic_add(int i, atomic_t *v);//v加上i的值
void atomic_sub(int i, atomic_t *v);//v减去i的值
void atomic_inc(atomic_t *v);//v的值加1
void atomic_dec(atomic_t *v);//v的值减1
int atomic_inc_and_test(atomic_t *v);//v加上1,并测试v的值是否为0,为0返回1,否则返回0
int atomic_dec_and_test(atomic_t *v);//v减1,并测试v的值是否为0,为0返回1,否则返回0
int atomic_sub_and_test(int i, atomic_t *v);//v减去i,并测试v的值是否为0,为0返回1,否则返回0
int atomic_add_negative(int i, atomic_t *v);//v加i,并测试v的值是否为负数,若是返回1,否则返回0
int atomic_add_return(int i, atomic_t *v);//v加上i,并返回结果
int atomic_sub_return(int i, atomic_t *v);//v减去i,并返回结果
int atomic_inc_return(atomic_t *v);//v加1,并返回结果
int atomic_dec_return(atomic_t *v);//v减1,并返回结果

6、位原子操作
(1)头文件:

#include <asm/bitops.h>

(2)定义和使用:

int a=0;
set_bit(3,&a)

将a的第3位置1,结果a等于8。

(3)接口:
void set_bit(nr, void *addr);//第nr位置1
void clear_bit(nr, void *addr);//第nr位清0
void change_bit(nr, void *addr);//第nr位取反
test_bit(nr, void *addr);//测试第nr位,为1则返回1,否则返回0
int test_and_set_bit(nr, void *addr);//获取第nr为值并返回这位置(0或1),然后将第nr位置1
int test_and_clear_bit(nr, void *addr);//获取第nr为值并返回这位置(0或1),然后将第nr清0
int test_and_change_bit(nr, void *addr);//获取第nr为值并返回这位置(0或1),然后将第nr取反

7、RCU(read-copy-update)
(1)头文件:

#include <linux/rcupdate.h>

(2)读保护获取和释放:

rcu_read_lock()
...
...
...
rcu_read_unlock

rcu_read_lock()会禁止抢占
rcu_read_unlock()会打开抢占
所以保护代码的临界区,不会被其它进程抢占(设想,中断服务程序调用,会导致?)
rcu_read_lock()和rcu_read_unlock所包含的code,不会阻塞读进程,在rcu_read_unlock()未执行完时,写者,是不能修改更新指针地址的。

(3)写者保护:

extern void call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *head));

call_rcu()调用到时,将func加到队列。
所有受rcu_read_lock()和rcu_read_unlock()保护的临界区释放,func才会被调用来释放更新指针指的数据。如临界区的rcu_read_lock()被调用,有rcu_read_unlock()还没有被调用,则func不会被调用,直到所有rcu_read_unlock()调用完成,func才能被调用。

用法:

void *p = NULL;
char *tp = NULL;
struct rcu_head eam;
spinlock_t sp;

void read(char *x)
{
int i=0;
rcu_read_lock();
tp = rcu_dereference(p);
for(i=0;i<200;i++)
{
*(x+i) = *(tp+i);
}
rcu_read_unlock();
}

void free(struct rcu_head *head)
{
kfree(tp);
}

void write(char *x)
{
spin_lock(&sp)
rcu_assign_pointer(p, x)
spin_unlock(&sp);
call_rcu(&ema,free);
}
void init()
{
spin_init_lock(&sp);
}

rcu_assign_pointer(p, x)将p的值安全修改成x,这个函数是原子性的;
rcu_dereference(p)将指针p的值给tp,也是原子性的,安全的。
当写完成,如果rcu_read_unlock()没被调用,free()不会被调用,只有rcu_read_unlock()被调用,free才会被执行。
这样就能并发的读写。

call_rcu可以用synchronize_rcu代替,synchronize_rcu阻塞写进程,如下:

void *p = NULL;
char *tp = NULL;
struct rcu_head eam;
spinlock_t sp;

void read(char *x)
{
int i=0;
rcu_read_lock();
tp = rcu_dereference(p);
for(i=0;i<200;i++)
{
*(x+i) = *(tp+i);
}
rcu_read_unlock();
}

void write(char *x)
{
spin_lock(&sp)
rcu_assign_pointer(p, x)
spin_unlock(&sp);
synchronize_rcu();
kfree(tp);
}
void init()
{
spin_init_lock(&sp);
}

进程会在synchronize_rcu()处等待,直到rcu_read_unlock()完成后,才继续执行下去。