Linux Kernel Atomics (1) - The spinlock Implementation

Date: 2022-04-27 13:56:43

The spinlock data structure, spinlock_t, is defined in the header linux/spinlock_types.h:

    typedef struct {
            raw_spinlock_t raw_lock;
    #ifdef CONFIG_GENERIC_LOCKBREAK
            unsigned int break_lock;
    #endif
    #ifdef CONFIG_DEBUG_SPINLOCK
            unsigned int magic, owner_cpu;
            void *owner;
    #endif
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
            struct lockdep_map dep_map;
    #endif
    } spinlock_t;

Setting the debug members aside, the core member is raw_lock, a structure whose definition depends on the processor architecture.

For example, on x86 it is defined as (arch/x86/include/asm/spinlock_types.h):

    typedef struct raw_spinlock {
            unsigned int slock;
    } raw_spinlock_t;

On ARM it is defined as (arch/arm/include/asm/spinlock_types.h):

    typedef struct {
            volatile unsigned int lock;
    } raw_spinlock_t;

On uniprocessor (UP) systems, raw_spinlock_t is defined as an empty struct (linux/spinlock_types_up.h):

   typedef struct { } raw_spinlock_t;

Whatever the architecture, the kernel decides whether the lock is currently held or free from the value of slock (or lock) in this structure, and acts accordingly (except on uniprocessors, where there is nothing to check).

The spinlock API is declared in linux/spinlock.h.

The idea behind a spinlock is to protect shared data structures in an SMP environment: while CPU-A is accessing (reading or writing) the shared data, no other CPU may access the same data, which keeps the access SMP-safe. Every thread must acquire the spin lock before touching the shared data; if the lock is held by another thread, the acquiring thread "spins", burning CPU cycles until the holder releases it. The advantage of a spin lock over a sleeping lock such as a semaphore is that it saves the overhead of two context switches (putting the waiter to sleep and waking it up again), so the further the expected wait falls below the cost of two context switches, the more the system gains from using a spin lock.
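As a minimal sketch of that pattern (my_lock and shared_count are hypothetical names, not from the kernel source):

    #include <linux/spinlock.h>

    static DEFINE_SPINLOCK(my_lock);        /* statically initialized, unlocked */
    static unsigned long shared_count;

    void bump_count(void)
    {
            spin_lock(&my_lock);            /* spin until we own the lock */
            shared_count++;                 /* critical section: SMP-safe */
            spin_unlock(&my_lock);          /* let the next CPU in */
    }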

Besides SMP-safety, a spin lock also has to handle two kinds of pseudo-concurrency, interrupts and preemption; that is, it must also be interrupt-safe and preempt-safe.

If an interrupt handler uses a spin lock because it accesses a shared variable, deadlock must be avoided. For example, thread A on CPU0 takes lock 1; between acquire and release, a softirq fires on CPU0 and its handler also tries to take the same lock. The lock holder, thread A, cannot be scheduled to release the lock until the handler exits, so CPU0 deadlocks. If the softirq instead fires on another CPU, say CPU1, there is no problem, because an interrupt on CPU1 does not interrupt lock holder A running on CPU0. So to be interrupt-safe, local interrupts on the current CPU must be disabled before the lock is taken.

The kernel document spinlocks.txt describes this:

    The reasons you mustn't use these versions if you have interrupts that
    play with the spinlock is that you can get deadlocks:

        spin_lock(&lock);
        ...
            <- interrupt comes in:
                spin_lock(&lock);

    where an interrupt tries to lock an already locked variable. This is ok if
    the other interrupt happens on another CPU, but it is _not_ ok if the
    interrupt happens on the same CPU that already holds the lock, because the
    lock will obviously never be released (because the interrupt is waiting
    for the lock, and the lock-holder is interrupted by the interrupt and will
    not continue until the interrupt has been processed).

    (This is also the reason why the irq-versions of the spinlocks only need
    to disable the _local_ interrupts - it's ok to use spinlocks in interrupts
    on other CPU's, because an interrupt on another CPU doesn't interrupt the
    CPU that holds the lock, so the lock-holder can continue and eventually
    releases the lock).

Then there is preempt-safety: if the lock holder could be preempted inside the critical section, another thread scheduled onto the same CPU might spin forever on a lock whose holder cannot run. This is why, as the implementations below show, every spin_lock variant starts by calling preempt_disable().
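When the data is also touched from interrupt context, the interrupt-safe lock variants are used; a sketch, assuming a hypothetical my_lock and my_irq_handler:

    #include <linux/interrupt.h>
    #include <linux/spinlock.h>

    static DEFINE_SPINLOCK(my_lock);
    static int shared_state;

    void update_from_process_context(void)
    {
            unsigned long flags;

            /* disable local interrupts and take the lock: an IRQ
             * arriving on this CPU can no longer deadlock on my_lock */
            spin_lock_irqsave(&my_lock, flags);
            shared_state++;
            spin_unlock_irqrestore(&my_lock, flags);
    }

    irqreturn_t my_irq_handler(int irq, void *dev)
    {
            /* local interrupts are already off in the handler,
             * so plain spin_lock suffices here */
            spin_lock(&my_lock);
            shared_state++;
            spin_unlock(&my_lock);
            return IRQ_HANDLED;
    }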

spin_lock_init

spin_lock_init is implemented as a macro that initializes a variable of type spinlock_t:

    # define __SPIN_LOCK_UNLOCKED(lockname) \
            (spinlock_t) { .raw_lock = __RAW_SPIN_LOCK_UNLOCKED, \
                           SPIN_DEP_MAP_INIT(lockname) }

    #define SPIN_LOCK_UNLOCKED      __SPIN_LOCK_UNLOCKED(old_style_spin_init)

    # define spin_lock_init(lock) \
            do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)

raw_lock is initialized with the macro __RAW_SPIN_LOCK_UNLOCKED, which, naturally, is architecture-specific as well. On x86 it is defined as:

   #define __RAW_SPIN_LOCK_UNLOCKED        { 0 }

In other words, on x86 the lock is in the UNLOCKED state when the unsigned integer slock is 0. (This { 0 } definition belongs to the 2.6.25+ ticket spinlock covered at the end of this article; for the older decl-based lock shown below, the unlocked value was 1.)
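A lock embedded in a dynamically allocated object is initialized at runtime with spin_lock_init; a sketch, where struct my_dev is a hypothetical example:

    #include <linux/slab.h>
    #include <linux/spinlock.h>

    struct my_dev {
            spinlock_t lock;
            int state;
    };

    struct my_dev *my_dev_alloc(void)
    {
            struct my_dev *dev = kzalloc(sizeof(*dev), GFP_KERNEL);

            if (!dev)
                    return NULL;
            spin_lock_init(&dev->lock);     /* *(lock) = SPIN_LOCK_UNLOCKED */
            return dev;
    }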

spin_lock

If we are certain that the protected shared data is never accessed from interrupt context, interrupt-safety can be ignored and the simpler, more efficient spin_lock used.

In a UP environment, spin_lock performs no real lock operation; it merely guarantees that the thread is not preempted while inside the critical section (the code between spin_lock and spin_unlock).

The UP implementation of spin_lock is in linux/spinlock_api_up.h:

#define _spin_lock(lock)                        __LOCK(lock)

    /*
     * In the UP-nondebug case there's no real locking going on, so the
     * only thing we have to do is to keep the preempt counts and irq
     * flags straight, to suppress compiler warnings of unused lock
     * variables, and to add the proper checker annotations:
     */
    #define __LOCK(lock) \
            do { preempt_disable(); __acquire(lock); (void)(lock); } while (0)

preempt_disable() prevents the thread from being preempted inside the critical section.

(void)(lock) evaluates the lock argument once, purely to silence the compiler's unused-variable warning.
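The same idiom works in ordinary C; a standalone sketch:

    #include <stdio.h>

    #define USE(x)  do { (void)(x); } while (0)

    int main(void)
    {
            int lock = 0;   /* never otherwise read */

            USE(lock);      /* without this, -Wall warns about an unused variable */
            printf("compiles cleanly with -Wall\n");
            return 0;
    }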

The SMP implementation of spin_lock is in spinlock_api_smp.h:

    static inline void __spin_lock(spinlock_t *lock)
    {
            preempt_disable();
            spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
            LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);
    }

    #define LOCK_CONTENDED(_lock, try, lock) \
            lock(_lock)

So the core operation is _raw_spin_lock(_lock):

 # define _raw_spin_lock(lock)           __raw_spin_lock(&(lock)->raw_lock)

__raw_spin_lock is the architecture-specific part. On x86 in kernel 2.6.24 (asm-x86/spinlock_64.h):

    static inline void __raw_spin_lock(raw_spinlock_t *lock)
    {
            asm volatile(
                    "\n1:\t"
                    LOCK_PREFIX " ; decl %0\n\t"
                    "jns 2f\n"
                    "3:\n"
                    "rep;nop\n\t"
                    "cmpl $0,%0\n\t"
                    "jle 3b\n\t"
                    "jmp 1b\n"
                    "2:\t" : "=m" (lock->slock) : : "memory");
    }

In essence, the spinlock decides whether the lock is free or busy by examining the value of lock->slock, so the decl and incl instructions that different CPUs execute on it must be atomic; otherwise several CPUs could each conclude the lock is free and enter the critical section together, or every CPU could see it as busy and the system would deadlock. On x86, the LOCK_PREFIX prefix guarantees that the operation on lock->slock is atomic.

For the implementation of LOCK_PREFIX, see http://wenku.baidu.com/view/13dbbe1fb7360b4c2e3f642b.html
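To see what the lock prefix buys, here is a standalone userspace sketch (not kernel code; build with gcc -O2 -pthread) that decrements a shared counter from two threads, once with a plain decl and once with lock; decl:

    #include <pthread.h>
    #include <stdio.h>

    static volatile int plain_ctr;
    static volatile int locked_ctr;

    static void *worker(void *arg)
    {
            int i;

            for (i = 0; i < 1000000; i++) {
                    /* non-atomic read-modify-write: updates can be lost */
                    asm volatile("decl %0" : "+m" (plain_ctr));
                    /* the lock prefix makes the whole decl atomic */
                    asm volatile("lock; decl %0" : "+m" (locked_ctr));
            }
            return NULL;
    }

    int main(void)
    {
            pthread_t t1, t2;

            pthread_create(&t1, NULL, worker, NULL);
            pthread_create(&t2, NULL, worker, NULL);
            pthread_join(t1, NULL);
            pthread_join(t2, NULL);
            /* locked_ctr is reliably -2000000; plain_ctr usually is not */
            printf("plain=%d locked=%d\n", plain_ctr, locked_ctr);
            return 0;
    }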

What is the 'rep;nop' instruction? Let's disassemble it and see:

    #include <stdio.h>

    static inline void rep_nop(void)
    {
            asm volatile("rep; nop" ::: "memory");
    }

    int main(void)
    {
            rep_nop();
            return 0;
    }

    [yzhang2@sles10sp3 ~]$ gcc -c asm.c
    [yzhang2@sles10sp3 ~]$ objdump -s -d asm.o
    ...
    0000000000000000 <rep_nop>:
       0:   55                      push   %rbp
       1:   48 89 e5                mov    %rsp,%rbp
       4:   f3 90                   pause
       6:   c9                      leaveq
       7:   c3                      retq

So 'rep;nop' is decoded as the 'pause' instruction, which tells the CPU that it is in a spin-wait loop, saving power and avoiding the memory-ordering penalty when the loop exits.

The concrete logic of the __raw_spin_lock code is:

decl %0 decrements lock->slock by 1. If the lock is free, the result is 0 and the sign flag is clear, so 'jns 2f' jumps to label 2 and the function returns holding the lock. If the lock is held, the result is negative, so we fall through to label 3: run 'rep;nop', then compare lock->slock with 0 again. If it has become positive, the holder has released the lock, and 'jmp 1b' jumps back to label 1 to retry the acquisition; otherwise ('jle 3b') we keep waiting.
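The same logic rendered as standalone C with GCC atomic builtins (a sketch, not the kernel's code; note that in this scheme slock is 1 when the lock is free):

    typedef struct { volatile int slock; } demo_spinlock_t;

    #define DEMO_SPIN_UNLOCKED { 1 }        /* 1 = free, <= 0 = held */

    static inline void demo_spin_lock(demo_spinlock_t *l)
    {
            for (;;) {
                    /* atomic "decl" that returns the new value */
                    if (__atomic_sub_fetch(&l->slock, 1, __ATOMIC_ACQUIRE) == 0)
                            return;                 /* was 1, now 0: we own it */
                    while (l->slock <= 0)           /* the cmpl/jle wait loop */
                            __builtin_ia32_pause(); /* rep;nop */
            }
    }

    static inline void demo_spin_unlock(demo_spinlock_t *l)
    {
            /* the kernel's unlock simply stores 1 back into slock */
            __atomic_store_n(&l->slock, 1, __ATOMIC_RELEASE);
    }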

But such a spinlock cannot guarantee fairness in the order in which CPUs obtain the lock, so the FIFO ticket spinlock was introduced in 2.6.25:

    static __always_inline void __ticket_spin_lock(raw_spinlock_t *lock)
    {
            int inc = 0x00010000;
            int tmp;

            asm volatile(LOCK_PREFIX "xaddl %0, %1\n"
                         "movzwl %w0, %2\n\t"
                         "shrl $16, %0\n\t"
                         "1:\t"
                         "cmpl %0, %2\n\t"
                         "je 2f\n\t"
                         "rep ; nop\n\t"
                         "movzwl %1, %2\n\t"
                         /* don't need lfence here, because loads are in-order */
                         "jmp 1b\n"
                         "2:"
                         : "+r" (inc), "+m" (lock->slock), "=&r" (tmp)
                         :
                         : "memory", "cc");
    }

After xaddl executes, %0 holds the old value of lock->slock, and lock->slock in memory holds (inc + old slock); that is, the next field of slock (its high 16 bits) has been incremented by 1.

movzwl copies the owner field of (lock->slock) into %2, and shrl leaves the next field of (lock->slock), our ticket number, in %0.

cmpl compares the next and owner fields of slock; if they are equal, we have the lock. If not, we busy-wait (rep;nop) and refresh owner in %2 with 'movzwl %1, %2', since spin_unlock simply increments the owner field, then jump back to 1 and compare again; when owner == next, the lock is ours.

In this way each CPU draws its own next ticket, and the lock is granted strictly in ticket order, which guarantees fairness among the CPUs.
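The ticket scheme written as standalone C with GCC builtins (a sketch mirroring the asm above, not the kernel's implementation; it assumes the little-endian x86 layout with owner in the low 16 bits and next in the high 16 bits). For example, starting from slock == 0, the first CPU draws ticket 0 and sees owner == 0, so it acquires at once; the second draws ticket 1 and spins until the first unlock bumps owner to 1.

    typedef struct { volatile unsigned int slock; } demo_ticket_lock_t;

    static inline void demo_ticket_lock(demo_ticket_lock_t *l)
    {
            /* xaddl: fetch the old slock and add 1 to the next field */
            unsigned int old = __atomic_fetch_add(&l->slock, 0x00010000,
                                                  __ATOMIC_ACQUIRE);
            unsigned short my_ticket = old >> 16;           /* shrl $16, %0 */

            /* spin until owner catches up with our ticket */
            while ((unsigned short)(l->slock & 0xffff) != my_ticket)
                    __builtin_ia32_pause();                 /* rep;nop */
    }

    static inline void demo_ticket_unlock(demo_ticket_lock_t *l)
    {
            /* bump only the 16-bit owner field, like the kernel's incw */
            __atomic_fetch_add((volatile unsigned short *)&l->slock, 1,
                               __ATOMIC_RELEASE);
    }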