Linux内核原子函数,自旋锁,信号量了解(转)

时间:2022-04-01 10:48:07

一.为什么内核需要同步方法
并发指的是多个执行单元同时,并行被执行,而并发的执行单元对共享资源(硬件资源和软件上的全局变量,静态变量等)的访问则很容易导致竞态。
主要竞态发生如下:
1.对称多处理器(SMP)多个CPU
 SMP是一种紧耦合,共享存储的系统模型,它的特点是多个CPU使用共同的系统总线,因此可访问共同的外设和存储器。
 
2.单CPU内进程与抢占它的进程
Linux2.6内核支持抢占调度,一个进程在内核执行的时候被另一高优先级的进程打断,进程与抢占它的进程访问共享资源的情况类似于SMP

3.中断(硬中断,软中断,Tasklet,底半部)与进程之间
中断可以打断正在执行的进程,如果中断处理程序访问进程正在访问的资源,则竞态也会发生。

此外,中断也有可能被新的更高级优先级中断中断,因此,多个中断之间本身也可能引起竞态。

那是不是就没有办法呢?当然不是,记住linux开源的力量是无穷的。
解决竞态问题的途径是保证对共享资源的互斥访问,互斥访问就是一个执行单元在访问的时候,其他执行单元禁止访问。
访问共享资源的代码区域成为临界区,临界区需要互斥机制加以保护。
中断屏蔽,原子操作,自旋锁和信号量等是linux互斥操作的途径。



 
二.内核的同步方法分类
1.中断屏蔽
中断屏蔽使得中断与进程之间的并发不再发生,linux内核进程调度等操作都依赖中断实现,内核抢占进程之间的并发就可以避免了。
但是,由于linux内核的异步I/O,进程调度等都依赖中断,所以长时间屏蔽中断很危险。
且中断屏蔽只能禁止本CPU内的中断,不能解决SMP引发的竞态。
因此中断屏蔽不是值得推荐的方法,它适宜与自旋锁联合使用。

2.原子操作
原子操作保证所有指令以原子方式执行(执行过程不能被中断)。
例:
线程1                                     线程2
increment(2->3)                           ...
...                                       increment(3->4)
两个操作不可能并发访问同一个变量,绝对不可能引起竞态。

2.1>针对原子整数操作只能对atomic_t类型的数据处理。
定义在<asm/atomic.h>  typedef struct { int counter; } atomic_t;
使用atomic_t而不是int类型确保编译器不对相应的值进行优化,使得原子操作接受正确的地址。强类型匹配。


原子整数操作API简介:

atomic_read(atomic_t * v);            该函数对原子类型的变量进行原子读操作,它返回原子类型的变量v的值。

atomic_set(atomic_t * v, int i);      该函数设置原子类型的变量v的值为i。

void atomic_add(int i, atomic_t *v);   该函数给原子类型的变量v增加值i。

atomic_sub(int i, atomic_t *v);        该函数从原子类型的变量v中减去i。

int atomic_sub_and_test(int i, atomic_t *v);
该函数从原子类型的变量v中减去i,并判断结果是否为0,如果为0,返回真,否则返回假。

void atomic_inc(atomic_t *v);          该函数对原子类型变量v原子地增加1。

void atomic_dec(atomic_t *v);          该函数对原子类型的变量v原子地减1。

int atomic_dec_and_test(atomic_t *v);
该函数对原子类型的变量v原子地减1,并判断结果是否为0,如果为0,返回真,否则返回假。

int atomic_inc_and_test(atomic_t *v);
该函数对原子类型的变量v原子地增加1,并判断结果是否为0,如果为0,返回真,否则返回假。

int atomic_add_negative(int i, atomic_t *v);
该函数对原子类型的变量v原子地增加I,并判断结果是否为负数,如果是,返回真,否则返回假。

注:原子整数的操作主要用途实现计数器。



2.2>原子位操作
位操作函数是对普通函数内存地址进行操作,它的参数是一个指针和一个位号。

原子位操作API简介:
void set_bit(int nr, void *addr)        原子设置addr所指的第nr位

void clear_bit(int nr, void *addr)      原子的清空所指对象的第nr位

void change_bit(nr, void *addr)         原子的翻转addr所指的第nr位

int test_bit(nr, void *addr)            原子的返回addr位所指对象nr位

int test_and_set_bit(nr, void *addr)    原子设置addr所指对象的第nr位,并返回原先的值

int test_and_clear_bit(nr, void *addr)  原子清空addr所指对象的第nr位,并返回原先的值

int test_and_change_bit(nr, void *addr)  原子翻转addr所指对象的第nr位,并返回原先的值

如:标志寄存器EFLSGS的系统标志,用于控制I/O访问,可屏蔽硬件中断。共32位,不同的位代表不同的信息,
对其中信息改变都是通过位操作实现的。


3.自旋锁

3.1>自旋锁
如果执行单元(一般针对线程)申请自旋锁已经被别的执行单元占用,申请者就一直循环在那里看是否该执行单元释放了自旋锁。
其作用是为了解决某项资源的互斥使用。虽然它的效率比互斥锁高,但是它也有些不足之处:
    1、自旋锁一直占用CPU,他在未获得锁的情况下,一直运行自旋,所以占用着CPU。所以自旋琐不应该长时间持有。
    2、在用自旋锁时有可能造成死锁,当递归调用时有可能造成死锁,调用有些其他函数也可能造成死锁,如 copy_to_user()、
    copy_from_user()、kmalloc()等。
    
自旋锁主要针对SMP,在单CPU中它仅仅设置内核抢占机制的是否启用的开关。 在内核不支持抢占的系统中,自旋锁退为空操作。
尽管自旋琐可以保证临界区不受别的CPU和本进程的抢占进程打扰,但执行临界区还受到中断和底半部(bh)的影响。所以与中断屏蔽
联系使用。

自旋锁的API:
spin_lock_init(spinlock_t *x);   //自旋锁在真正使用前必须先初始化
   
  获得自旋锁:spin_lock(x);   //只有在获得锁的情况下才返回,否则一直“自旋”
                           spin_trylock(x);  //如立即获得锁则返回真,否则立即返回假
      释放锁:spin_unlock(x);
   
结合以上有以下代码段:

    spinlock_t lock;        //定义一个自旋锁
    spin_lock_init(&lock);
    spin_lock(&lock);   
    临界区
    spin_unlock(&lock);   //释放锁
   

spin_lock_irqsave(lock, flags)
该宏获得自旋锁的同时把标志寄存器的值保存到变量flags中并失效本地中//断。相当于:spin_lock()+local_irq_save()
spin_unlock_irqrestore(lock, flags)
该宏释放自旋锁lock的同时,也恢复标志寄存器的值为变量flags保存的//值。它与spin_lock_irqsave配对使用。
相当于:spin_unlock()+local_irq_restore()

spin_lock_irq(lock)
该宏类似于spin_lock_irqsave,只是该宏不保存标志寄存器的值。相当         //于:spin_lock()+local_irq_disable()
spin_unlock_irq(lock)
该宏释放自旋锁lock的同时,也使能本地中断。它与spin_lock_irq配对应用。相当于: spin_unlock()+local_irq+enable()

spin_lock_bh(lock)
该宏在得到自旋锁的同时失效本地软中断。相当于:  //spin_lock()+local_bh_disable()
spin_unlock_bh(lock)
该宏释放自旋锁lock的同时,也使能本地的软中断。它与spin_lock_bh配对//使用。相当于:spin_unlock()+local_bh_enable()


3.2>读写自旋锁
读写自旋锁为读和写分别提供了不同的锁。一个或多个读任务并发的持有读写锁;用于写的锁最多只能被一个写任务持有,
此时不能并发的读操作。

注:读锁和写锁要完全分割在代码的分支中
read_lock(&mr_rwlock)
write_lock(&mr_rwlock)  将会带来死锁,因为写锁会不断自旋。
读写自旋锁相当于自旋锁的读的一种优化,具有自旋锁的特点,如:不宜长时间持有锁等。
关于读写自旋锁的API函数自己分析,其中读写各有自己的API函数。

3.3>顺序锁
顺序锁是对读写锁的一种优化,读执行单元绝不会被写执行单元阻塞。读执行单元可以在写执行单元对被顺序锁保护的共享资源
进行写操作时还可以继续读,而不必等待写执行单元完成写操作,写执行单元也不需要等待所有读执行单元完成读操作才进行写操作。

注:顺序锁要求被保护的共享资源不含有指针,因为写执行单元可能使指针失效,读执行单元正在访问该资源,导致Oops。

重要:读执行单元访问共享资源时,如果有写操作执行完成,读执行单元就需要重新进行读操作。
do{
  seqnum=read_seqbegin(&seqlock_a);
  //读执行代码
  ...
  }while(read_seqretry(&seqlock_a,seqnum));      //判断是否需要重读
 

3.4>RCU(读-拷贝-更新)
RCU(Read-Copy Update),对于被RCU保护的共享数据结构,读者不需要获得任何锁就可以访问它,但写者在访问它时首先拷贝一个副本,然后对副本进行修改,最后使用一个回调(callback)机制在适当的时机把指向原来数据的指针重新指向新的被修改的数据。这个时机就是所有引用该数据的CPU都退出对共享数据的操作。

RCU实际上是一种改进的rwlock,读者几乎没有什么同步开销,它不需要锁,不使用原子指令,而且在除alpha的所有架构上也不需要内存栅(Memory Barrier),因此不会导致锁竞争,内存延迟以及流水线停滞。不需要锁也使得使用更容易,因为死锁问题就不需要考虑了。写者的同步开销比较大,它需要延迟数据结构的释放,复制被修改的数据结构,它也必须使用某种锁机制同步并行的其它写者的修改操作。读者必须提供一个信号给写者以便写者能够确定数据可以被安全地释放或修改的时机。有一个专门的垃圾收集器来探测读者的信号,一旦所有的读者都已经发送信号告知它们都不在使用被RCU保护的数据结构,垃圾收集器就调用回调函数完成最后的数据释放或修改操作。 RCU与rwlock的不同之处是:它既允许多个读者同时访问被保护的数据,又允许多个读者和多个写者同时访问被保护的数据(注意:是否可以有多个写者并行访问取决于写者之间使用的同步机制),读者没有任何同步开销,而写者的同步开销则取决于使用的写者间同步机制。但RCU不能替代rwlock,因为如果写比较多时,对读者的性能提高不能弥补写者导致的损失。


读者在访问被RCU保护的共享数据期间不能被阻塞,也就说当读者在引用被RCU保护的共享数据期间,读者所在的CPU不能发生上下文切换,spinlock和rwlock都需要这样的前提。写者在访问被RCU保护的共享数据时不需要和读者竞争任何锁,只有在有多于一个写者的情况下需要获得某种锁以与其他写者同步。写者修改数据前首先拷贝一个被修改元素的副本,然后在副本上进行修改,修改完毕后它向垃圾回收器注册一个回调函数以便在所有读执行单元已经完成对临界区的访问进行修改操作。

RCU的API如下:

#define rcu_read_lock()         preempt_disable()
#define rcu_read_unlock()       preempt_enable()
读者在读取由RCU保护的共享数据时使用rcu_read_lock()标记它进入读端临界区

#define rcu_read_lock_bh()      local_bh_disable()
#define rcu_read_unlock_bh()    local_bh_enable()

rcu_read_lock_bh() 只在修改是通过 call_rcu_bh 进行的情况下使用,因为 call_rcu_bh将把 softirq 的执行完毕也认为是一个 quiescent state,因此如果修改是通过 call_rcu_bh 进行的,在进程上下文的读端临界区必须使用rcu_read_lock_bh()。

注:rcu_read_lock和rcu_read_lock_bh实质作用只是禁止内核的抢占调度。



synchronize_rcu()

该函数由RCU写端调用,它将阻塞写者,直到所有读执行单元完成对临界区的访问后,写者才可以继续下一步操
作。如果有多个RCU写端调用该函数,他们将在所有读执行单元完成对临界区的访问后全部被唤醒。
需要指出的是,函数synchronize_rcu 的实现实际上使用函数call_rcu。

函数详解:http://blog.chinaunix.net/u1/55599/showart_1102424.html


void fastcall call_rcu(struct rcu_head *head,
                                void (*func)(struct rcu_head *rcu))
struct rcu_head {
        struct rcu_head *next;
        void (*func)(struct rcu_head *head);
};
其中:fastcall加快访问速度.
函数call_rcu也由RCU写端调用,它不会使写者阻塞,因而可以在中断上下文或softirq使用。该函数将把函数func挂接到RCU回调函数链上,然后立即返回。一旦所有的CPU都已经完成端临界区操作,该函数将被调用来释放删除的将绝不在被应用的数据。参数head用于记录回调函数 func,一般该结构会作为被RCU保护的数据结构的一个字段,以便省去单独为该结构分配内存的操作。

函数详解:http://blog.chinaunix.net/u1/55599/showart_1101879.html

其中call_rcu_bh()函数的功能几乎与call_rcu()完全相同,唯一的差别就是它把软中断的完成也当作经历一个quiescent state(静默状态),
因此如果读执行单元使用了该函数,在进程的上下文的读执行单元必须使用rcu_read_lock_bh()。


RCU还增加了链表操作函数的RCU版本,这里就不做介绍了。
具体参考:http://blog.chinaunix.net/u1/55599/showart_1101095.html



看晕了没,举个例子对比下:


       static enum audit_state audit_filter_task(struct task_struct *tsk)
        {
                struct audit_entry *e;
                enum audit_state   state;
                rcu_read_lock();
                list_for_each_entry_rcu(e, &audit_tsklist, list) {
                        if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                                spin_lock(&e->lock);
                                if (e->deleted) {
                                        spin_unlock(&e->lock);
                                        rcu_read_unlock();
                                        return AUDIT_BUILD_CONTEXT;
                                }
                                rcu_read_unlock();
                                return state;
                        }
                }
                rcu_read_unlock();
                return AUDIT_BUILD_CONTEXT;
        }
        


注意,对于这种情况,每一个链表条目都需要一个spinlock保护,因为删除操作将修改条目的deleted标志。此外,该函数如果搜索到条目,返回时应当保持该条目的锁,因为只有这样,才能看到新的修改的数据,否则,仍然可能看到就的数据。

写端的删除操作将变成:

       static inline int audit_del_rule(struct audit_rule *rule,
                                         struct list_head *list)
        {
                struct audit_entry  *e;
                /* Do not use the _rcu iterator here, since this is the only
                 * deletion routine. */
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                spin_lock(&e->lock);
                                list_del_rcu(&e->list);
                                e->deleted = 1;
                                spin_unlock(&e->lock);
                                call_rcu(&e->rcu, audit_free_rule, e);
                                return 0;
                        }
                }
                return -EFAULT;         /* No matching rule */
        }


  4.信号量

4.1>信号量
信号量在1968年提出,后来慢慢形成琐机制。信号量支持两个原子操作P()和V()。

只有得到信号的进程才能执行代码。与自旋锁不同的是,当得不到信号量时,进程不会原地循环(占有CPU)而是进入睡眠(通过上下文切换)。
没有得到信号量进程推进一个等待队列中,然后让其睡眠。

注:本CPU的进程执行完成,会唤醒等待队列的进程(FIFO先进先出)

信号量在创建时需要设置一个初始值count,表示同时可以有几个任务可以访问该信号量保护的共享资源,初始值为1就变成互斥锁(Mutex),即同时只能有一个任务可以访问信号量保护的共享资源。

一个任务要想访问共享资源,首先必须得到信号量,获取信号量的操作将把信号量的值减1,若当前信号量的值为负数,表明无法获得信号量,该任务必须挂起在该信号量的等待队列等待该信号量可用;若当前信号量的值为非负数,表示可以获得信号量,因而可以立刻访问被该信号量保护的共享资源。

当任务访问完被信号量保护的共享资源后,必须释放信号量,释放信号量通过把信号量的值加1实现,如果信号量的值为非正数,表明有任务等待当前信号量,因此它也唤醒所有等待该信号量的任务。
  
 
由于信号量会导致睡眠,所以不能用在中断上下文。再者使用down()而进入睡眠的进程不能被信号打断。信号量允许任意多个执行单元持有该信号
量(设置count大于1),自旋锁只允许最多一个任务持有。

  
下面介绍信号主要的两个操作down()和up()

  97static inline void down(struct semaphore * sem)
  98{
  99        might_sleep();
 100        __asm__ __volatile__(
 101                "# atomic down operation/n/t"
 102                LOCK_PREFIX "decl %0/n/t"     /* --sem->count */
 103                "jns 2f/n"
 104                "/tlea %0,%%eax/n/t"
 105                "call __down_failed/n"
 106                "2:"
 107                :"+m" (sem->count)
 108                :
 109                :"memory","ax");
 110}
 
 
  64void __sched
  65__down_failed(struct semaphore *sem)
  66{
  67        struct task_struct *tsk = current;
  68        DECLARE_WAITQUEUE(wait, tsk);
  69
  70#ifdef CONFIG_DEBUG_SEMAPHORE
  71        printk("%s(%d): down failed(%p)/n",
  72               tsk->comm, tsk->pid, sem);
  73#endif
  74
  75        tsk->state = TASK_UNINTERRUPTIBLE;
  76        wmb();
  77        add_wait_queue_exclusive(&sem->wait, &wait);
  78
  79        /*
  80         * Try to get the semaphore.  If the count is > 0, then we've
  81         * got the semaphore; we decrement count and exit the loop.
  82         * If the count is 0 or negative, we set it to -1, indicating
  83         * that we are asleep, and then sleep.
  84         */
  85        while (__sem_update_count(sem, -1) <= 0) {
  86                schedule();
  87                set_task_state(tsk, TASK_UNINTERRUPTIBLE);
  88        }
  89        remove_wait_queue(&sem->wait, &wait);
  90        tsk->state = TASK_RUNNING;
  91
  92        /*
  93         * If there are any more sleepers, wake one of them up so
  94         * that it can either get the semaphore, or set count to -1
  95         * indicating that there are still processes sleeping.
  96         */
  97        wake_up(&sem->wait);
  98
  99#ifdef CONFIG_DEBUG_SEMAPHORE
 100        printk("%s(%d): down acquired(%p)/n",
 101               tsk->comm, tsk->pid, sem);
 102#endif
 103}
 


分析:
如果count.counter为1,则置count.counter为0,直接跳出函数。获得信号量

如果count.counter为0,count.counter被减为-1,之后执行__down_failed()函数。

__down_failed()函数首先将当前进程设置为不可中断状态(TASK_UNINTERRUPTIBLE)
然后将其添加进等待进程队列,接下来在whlie循环处试图获得信号量。如果count.counter大于0就获得了信号量,则不进入循环,
将当前进程从等待队列中删除,并设置其状态为可运行状态(TASK_RUNNING),最后唤醒等待该信号量的进程。
否则将count.counter设置为-1(count为负时只可能为-1,只判断是否有等待队列是否有进程,遵循FIFO),并进入whlie循环,
挂起当前进程,继续测试count.counter字段直到其大于0(即获得信号量)。

具体参考:http://blog.chinaunix.net/u2/73528/showart_1098606.html


注:在你占用信号量的时候不能占用锁。因为在你等待信号量可能会睡眠,而持有锁不允许睡眠。

4.2>完成量
在信号量中讲过,可以设置count的值为5时,可以实现5个进程访问该信号量。通常不会有问题,但有时会有问题。

虽然信号量可以用于多个进程实现同步,但往往可能会出现一些不好的结果。例如:当进程A分配了一个临时信号量变量,把它count设置大于1,并把其地址传递给进程B,然后在A之上调用down(),进程A打算一旦被唤醒就撤销给信号量。随后,运行在不同CPU上的进程B在同一个信号量上调用up()。然而,up()和down()的目前实现还允许这两个函数在同一个信号量上并发。因此,进程A可以被唤醒并撤销临时信号量,而进程B还在运行up()函数。结果up()可能试图访问一个不存在的数据结构。这样就会出现错误。为了防止发生这种错误就专门设计了completion机制专门用于同步。

完成量禁止多个进程持有一个信号量,完成量仅仅是信号量的一种简单解决方法。

具体API操作不做详解。

4.3>互斥体
互斥琐与自旋锁非常相似,唯一的不同的是:自旋锁在得不到锁的时候,占有CPU在不停的自旋。
而互斥体在得不到信号量的时候,让等待的进程进入睡眠。

这样是否就明白互斥体了呢,呵呵。

具体API操作不做详解。


三.内核同步方法的比较

可以从上面看出,现在互斥方法主要用自旋锁,信号量。下面就看看它们的不同点吧

1.自旋锁(一般是线程)提供一种简单的锁的实现方法,如果加锁时间不长那么最好代码不要睡眠,因为上下文切换耗时,耗的资源远远多于执行单元
在那自旋耗的资源(中断处理程序)。
一个任务试图获得信号量(一般是进程),信号量已经被占用,该任务被推向一个等待队列,然后让其睡眠(通过上下文切换)。这是CPU执行其他代码。
所以信号量使用于长时间持有信号的情况。

2.自旋锁可以用于中断,不能用于进程上下文(会引起死锁)。而信号量不允许使用在中断中,而可以用于进程上下文。

在使用信号量的大多数时候,往往需要和用户空间同步,你的代码需要睡眠。由于不受睡眠的影响,信号量比较容易点。如果需要自旋
锁和信号中作选择,应该根据锁持有时间长短来判断。



注:以上是本人对同步的总结欢迎指错