Linux并发与同步专题 (4) Mutex互斥量

时间:2022-06-09 09:15:44

关键词:mutex、MCS、OSQ。

Linux并发与同步专题 (1)原子操作和内存屏障

Linux并发与同步专题 (2)spinlock

Linux并发与同步专题 (3) 信号量

Linux并发与同步专题 (4) Mutex互斥量

Linux并发与同步专题 (5) 读写锁

Linux并发与同步专题 (6) RCU

Linux并发与同步专题 (7) 内存管理中的锁

Linux并发与同步专题 (8) 最新更新与展望

信号量是在并行处理环境中对多个处理器访问某个公共资源进行保护的机制,mutex用于互斥操作。

信号量的count初始化为1,down()/up()也可以实现类似mutex的作用,那为什么还要单独实现mutex机制呢?

mutex的语义相对于信号量要简单轻便一些,在锁争用激烈的测试场景下,mutex比信号量执行速度更快,可扩展性更好,另外mutex数据结构的定义比信号量小。

1. mutex数据结构

struct mutex数据结构用于描述mutex。

struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;----------------------------原子计数,1表示没人持有锁;0表示锁被持有;负数表示锁被持有且有人在等待队列中等待。
spinlock_t wait_lock;----------------------spinlock锁,用于保护wait_list睡眠等待队列。
struct list_head wait_list;--------------------用于管理所有在该mutex上睡眠的进程,没有成功获取锁的进程会睡眠在此链表上。
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
struct task_struct *owner;---------------------用于指向锁持有者的task_struct数据结构。
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */----用于实现MCS锁机制。
#endif...
};

2. MCS锁机制

mutex实现了自旋等待的机制,自旋等待机制的核心原理是当发现持有锁者正在临界区执行并且没有其他优先级高的进程要被调度(need_resched)时,那么当前进程坚信锁持有者会很快离开临界区并释放锁,因此睡眠与其睡眠等待不如乐观地自选等待,以减少睡眠唤醒的开销。

排队自旋锁的缺点:

在多处理器和NUMA系统中,排队自旋锁仍然存在一个比较严重的问题。

假设在一个锁争用激烈的系统中,所有自旋等待锁的线程都在同一个共享变量上自旋,申请和释放锁都在同一个变量上修改,由cache一致性原理导致参与自旋的CPU中的cacheline变得无效,即多个CPU上的cacheline反复失效,大大降低系统整体性能。

MCS算法可以解决自旋遇到的问题,显著减少CPU cacheline bouncing问题。

MCS算法的核心思想是每个锁的申请者只在本地CPU的变量上自旋,而不是全局变量。

2.1 MCS锁数据结构

struct optimistic_spin_node数据结构表示本地CPU上的节点,它可以组织成一个双向链表,包含next和prev指针。

/*
* An MCS like lock especially tailored for optimistic spinning for sleeping
* lock implementations (mutex, rwsem, etc).
*/
struct optimistic_spin_node {
struct optimistic_spin_node *next, *prev;-------------------next和prev指针可以组成一个双向链表。
int locked; /* 1 if lock acquired */------------------------表示加锁状态。
int cpu; /* encoded CPU # + 1 value */----------------------用于重新编码CPU编号,表示该node在那个CPU上。
}; struct optimistic_spin_queue {
/*
* Stores an encoded value of the CPU # of the tail node in the queue.
* If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.
*/
atomic_t tail;
};

struct optimistic_spin_node数据结构会定义成per-CPU变量,即每个CPU有一个node结构。

static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);

2.2 MCS锁初始化

MCS锁在osq_lock_init()函数中进行初始化。

#define OSQ_UNLOCKED_VAL (0)

/* Init macro and function. */
#define OSQ_LOCK_UNLOCKED { ATOMIC_INIT(OSQ_UNLOCKED_VAL) } static inline void osq_lock_init(struct optimistic_spin_queue *lock)
{
atomic_set(&lock->tail, OSQ_UNLOCKED_VAL);
}

2.3 osq_lock()/osq_unlock()

osq_lock()/osq_unlock()用于申请和释放MCS锁。

bool osq_lock(struct optimistic_spin_queue *lock)
{
struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);-----------node指向当前CPU的struct optimistic_spin_node节点。
struct optimistic_spin_node *prev, *next;
int curr = encode_cpu(smp_processor_id());-----------------------------表示当前CPU编号,0表示没有CPU,1表示CPU0,以此类推。
int old; node->locked = ;
node->next = NULL;
node->cpu = curr; old = atomic_xchg(&lock->tail, curr);-----------使用原子交换函数atomic_xchg()交换全局lock->tail和当前CPU号,如果lock->tail就只等于初始化OSQ_UNLOCKED_VAL,说明没有人持锁,那么让lock->tail等于当前CPU标号表示成功持锁。
if (old == OSQ_UNLOCKED_VAL)
return true; prev = decode_cpu(old);-------------------------之前获取锁失败,prev表示old指向的CPU所属节点的struct optimistic_spin_node数据结构。
node->prev = prev;
ACCESS_ONCE(prev->next) = node; while (!ACCESS_ONCE(node->locked)) {------------一直查询当前节点node->locked是否变成了1,因为前继节点prev释放锁时会把它的下一个节点中的locked成员置为1,然后才能成功释放锁。
if (need_resched())-------------------------在自旋等待过程中,如果有更高优先级进程抢占或者被调度器要求调度出去,那应该放弃自旋等待,退出MCS链表,跳转到unqueue标签处处理MCS链表删除节点的情况。
goto unqueue; cpu_relax_lowlatency();
}
return true; unqueue:
/*
* Step - A -- stabilize @prev
*
* Undo our @prev->next assignment; this will make @prev's
* unlock()/unqueue() wait for a next pointer since @lock points to us
* (or later).
*/ for (;;) {
if (prev->next == node &&
cmpxchg(&prev->next, node, NULL) == node)
break; /*
* We can only fail the cmpxchg() racing against an unlock(),
* in which case we should observe @node->locked becomming
* true.
*/
if (smp_load_acquire(&node->locked))
return true; cpu_relax_lowlatency(); /*
* Or we race against a concurrent unqueue()'s step-B, in which
* case its step-C will write us a new @node->prev pointer.
*/
prev = ACCESS_ONCE(node->prev);
} /*
* Step - B -- stabilize @next
*
* Similar to unlock(), wait for @node->next or move @lock from @node
* back to @prev.
*/ next = osq_wait_next(lock, node, prev);
if (!next)
return false; /*
* Step - C -- unlink
*
* @prev is stable because its still waiting for a new @prev->next
* pointer, @next is stable because our @node->next pointer is NULL and
* it will wait in Step-A.
*/ ACCESS_ONCE(next->prev) = prev;
ACCESS_ONCE(prev->next) = next; return false;
} #define smp_load_acquire(p) \
({ \
typeof(*p) ___p1 = ACCESS_ONCE(*p); \
compiletime_assert_atomic_type(*p); \
smp_mb(); \
___p1; \
}) /*
* Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
* Can return NULL in case we were the last queued and we updated @lock instead.
*/
static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
struct optimistic_spin_node *node,
struct optimistic_spin_node *prev)
{
struct optimistic_spin_node *next = NULL;
int curr = encode_cpu(smp_processor_id());
int old; /*
* If there is a prev node in queue, then the 'old' value will be
* the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
* we're currently last in queue, then the queue will then become empty.
*/
old = prev ? prev->cpu : OSQ_UNLOCKED_VAL; for (;;) {
if (atomic_read(&lock->tail) == curr &&
atomic_cmpxchg(&lock->tail, curr, old) == curr) {
/*
* We were the last queued, we moved @lock back. @prev
* will now observe @lock and will complete its
* unlock()/unqueue().
*/
break;
} /*
* We must xchg() the @node->next value, because if we were to
* leave it in, a concurrent unlock()/unqueue() from
* @node->next might complete Step-A and think its @prev is
* still valid.
*
* If the concurrent unlock()/unqueue() wins the race, we'll
* wait for either @lock to point to us, through its Step-B, or
* wait for a new @node->next from its Step-C.
*/
if (node->next) {
next = xchg(&node->next, NULL);
if (next)
break;
} cpu_relax_lowlatency();
} return next;
}
void osq_unlock(struct optimistic_spin_queue *lock)
{
struct optimistic_spin_node *node, *next;
int curr = encode_cpu(smp_processor_id()); /*
* Fast path for the uncontended case.
*/
if (likely(atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
return; /*
* Second most likely case.
*/
node = this_cpu_ptr(&osq_node);
next = xchg(&node->next, NULL);
if (next) {
ACCESS_ONCE(next->locked) = ;
return;
} next = osq_wait_next(lock, node, NULL);
if (next)
ACCESS_ONCE(next->locked) = ;
}

3. mutex锁的实现

3.1 mutex数据结构

struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
}; struct mutex_waiter {
struct list_head list;
struct task_struct *task;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
};

3.2 mutex初始化

mutex锁的初始化有两种方式,一种是静态使用DEFINE_MUTEX宏,另一种是在内核代码中动态使用mutex_init()函数。

#define __MUTEX_INITIALIZER(lockname) \
{ .count = ATOMIC_INIT() \
, .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
, .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
__DEBUG_MUTEX_INITIALIZER(lockname) \
__DEP_MAP_MUTEX_INITIALIZER(lockname) } #define DEFINE_MUTEX(mutexname) \
struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)
# define mutex_init(mutex) \
do { \
static struct lock_class_key __key; \
\
__mutex_init((mutex), #mutex, &__key); \
} while () void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_set(&lock->count, );
spin_lock_init(&lock->wait_lock);
INIT_LIST_HEAD(&lock->wait_list);
mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
osq_lock_init(&lock->osq);----------------------初始化MCS锁
#endif debug_mutex_init(lock, name, key);
}

3.3 mutex_lock()/mutex_unlock()

mutex_lock()申请mutex锁的快车道条件是count计数原子减1后等于0;如果count原子减1后小于0,说明该锁已经被人持有,那么要进入慢车道__mutex_lock_slowpath()。

void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
/*
* The locking fastpath is the 1->0 transition from
* 'unlocked' into 'locked' state.
*/__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
mutex_set_owner(lock);--------------------------------------------在成功持锁后要设置lock->owner指向当前进程的task_struct数据结构。
} static inline void
__mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_dec_return(count) < ))
fail_fn(count);
} __visible void __sched
__mutex_lock_slowpath(atomic_t *lock_count)
{
struct mutex *lock = container_of(lock_count, struct mutex, count); __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, ,
NULL, _RET_IP_, NULL, );
}

static inline void mutex_set_owner(struct mutex *lock)
{
  lock->owner = current;
}

 
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct task_struct *task = current;
struct mutex_waiter waiter;
unsigned long flags;
int ret; preempt_disable();---------------------------------------------关闭内核抢占。
mutex_acquire_nest(&lock->dep_map, subclass, , nest_lock, ip); if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
/* got the lock, yay! */
preempt_enable();-------------------------------------------恢复抢占。
return ;
} spin_lock_mutex(&lock->wait_lock, flags); /*
* Once more, try to acquire the lock. Only try-lock the mutex if
* it is unlocked to reduce unnecessary xchg() operations.
*/
if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, ) == ))----再尝试一次获取锁。
goto skip_wait; debug_mutex_lock_common(lock, &waiter);
debug_mutex_add_waiter(lock, &waiter, task_thread_info(task)); /* add waiting tasks to the end of the waitqueue (FIFO): */
list_add_tail(&waiter.list, &lock->wait_list);------------------------把waiter加入到mutex等待队列wait_list中,这里实现的是先进先出队列。
waiter.task = task; lock_contended(&lock->dep_map, ip); for (;;) {
if (atomic_read(&lock->count) >= &&
(atomic_xchg(&lock->count, -) == ))---------------------每次循环首先尝试是否可以获取锁,lock->count设置为-1,在后面代码中还会判断会判断等待队列中是否还有等待者。
break; if (unlikely(signal_pending_state(state, task))) {
ret = -EINTR;
goto err;-------------------------------------------------收到异常信号退出循环。
} if (use_ww_ctx && ww_ctx->acquired > ) {
ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
if (ret)
goto err;
} __set_task_state(task, state); /* didn't get the lock, go to sleep: */
spin_unlock_mutex(&lock->wait_lock, flags);
schedule_preempt_disabled();----------------------------------如果获取失败,调用schedule_preempt_disabled()让出CPU,当前进程进入睡眠状态。
spin_lock_mutex(&lock->wait_lock, flags);
}
__set_task_state(task, TASK_RUNNING);-----------------------------如果for循环成功获取锁而退出for循环,那么将设置当前进程为可运行状态TASK_EUNNING。 mutex_remove_waiter(lock, &waiter, current_thread_info());--------从lock->waiter_list中出列。
/* set it to 0 if there are no waiters left: */
if (likely(list_empty(&lock->wait_list)))
atomic_set(&lock->count, );----------------------------------如果等待队列中没有人在睡眠等待,那么把count设置为0。
debug_mutex_free_waiter(&waiter); skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
mutex_set_owner(lock); if (use_ww_ctx) {
struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
ww_mutex_set_context_slowpath(ww, ww_ctx);
} spin_unlock_mutex(&lock->wait_lock, flags);
preempt_enable();
return ;-------------------------------------------------------成功获取锁,设置owner为当前进程,打开内核抢占,返回0. err:
mutex_remove_waiter(lock, &waiter, task_thread_info(task));
spin_unlock_mutex(&lock->wait_lock, flags);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, , ip);
preempt_enable();
return ret;
} static bool mutex_optimistic_spin(struct mutex *lock,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct task_struct *task = current; if (!mutex_can_spin_on_owner(lock))----------mutex_can_spin_on_owner()返回0说明锁持有者并没有正在运行,不符合自旋等待机制的条件。自旋等待的条件是持有锁者正在临界区执行,自选等待才有价值。
goto done;
if (!osq_lock(&lock->osq))-------------------获取一个OSQ锁保护,因为接下来要自旋等待该锁尽快释放,不希望有其他人参与进来一起自旋等待,多人参与自旋等待会导致严重的CPU高速缓存颠簸。这里把所有在等待mutex的参与者放入OSQ锁队列中,只有队列的第一个等待者可以参与自旋等待。
goto done; while (true) {-----------------------------一直自旋并且判断锁持有者是否释放了锁。
struct task_struct *owner; if (use_ww_ctx && ww_ctx->acquired > ) {
struct ww_mutex *ww; ww = container_of(lock, struct ww_mutex, base);
if (ACCESS_ONCE(ww->ctx))
break;
} owner = ACCESS_ONCE(lock->owner);
if (owner && !mutex_spin_on_owner(lock, owner))------一直自旋等待锁持有者尽快释放锁,返回true表示释放锁。
break; /* Try to acquire the mutex if it is unlocked. */
if (mutex_try_to_acquire(lock)) {---------------------在只有这释放了锁之后,当前进程尝试去获取该锁。
lock_acquired(&lock->dep_map, ip); if (use_ww_ctx) {
struct ww_mutex *ww;
ww = container_of(lock, struct ww_mutex, base); ww_mutex_set_context_fastpath(ww, ww_ctx);
} mutex_set_owner(lock);----------------------------把lock->owner指向当前进程task_struct数据结构。
osq_unlock(&lock->osq);
return true;
} if (!owner && (need_resched() || rt_task(task)))-----owner为NULL,也有可能持有锁者在成功获取锁和设置owner间隙中被强占调度,或者如果当前是实时进程或者也要退出自旋等待。
break; cpu_relax_lowlatency();
} osq_unlock(&lock->osq);
done: if (need_resched()) {
__set_current_state(TASK_RUNNING);
schedule_preempt_disabled();
} return false;
} static inline int mutex_can_spin_on_owner(struct mutex *lock)
{
struct task_struct *owner;
int retval = ; if (need_resched())---------------------如果当前进程需要被调度,返回0。
return ; rcu_read_lock();-------------------------RCU读临界区包括owner指向的task_struct数据结构在读临界区内不会被释放。
owner = ACCESS_ONCE(lock->owner);
if (owner)
retval = owner->on_cpu;--------------owner指向持锁进程的task_struct数据结构,task_struct->on_cpu为1表示锁持有者正在运行,也就是正在临界区中执行,因为锁持有者释放该锁后lock->owner为NULL。
rcu_read_unlock();
/*
* if lock->owner is not set, the mutex owner may have just acquired
* it and not set the owner yet or the mutex has been released.
*/
return retval;
} static noinline
int mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
rcu_read_lock();
while (owner_running(lock, owner)) {-----判断锁的持有者lock->owner是否和owner相等,如果不等返回0;否则返回owner->on_cpu的值。owner_running()返回0,那么当前进程就没有必要在while循环里一直监视持有锁者的情况。
if (need_resched())------------------如果调度器需要调度其它进程,那么当前进程也只能*退出自旋等待。
break;
cpu_relax_lowlatency();
}
rcu_read_unlock(); return lock->owner == NULL;--------------当lock->owner为null时,表示持有者释放锁,返回true。
}
有两种情况导致退出自旋:
一是锁持有者释放了锁,即lock->owner不指向锁持有者或者锁持有者发生了变化;
二是锁持有者没有释放锁,但是锁持有者在临界区执行时被调度出去了,也就是睡眠了,即on_cpu=0。
这两种情况下,当前进程都应该积极主动退出自旋等待机制。

static inline bool owner_running(struct mutex *lock, struct task_struct *owner)
{
    if (lock->owner != owner)
        return false;

barrier();

return owner->on_cpu;
}

static inline bool mutex_try_to_acquire(struct mutex *lock)
{
return !mutex_is_locked(lock) &&
(atomic_cmpxchg(&lock->count, , ) == );-----------首先读取原子变量lock->count,判断是否为1,如果是1使用atomic_cmpxchg()函数把count设为0,成功获取锁。
} static inline int mutex_is_locked(struct mutex *lock)
{
return atomic_read(&lock->count) != ;
}

mutex_unlock()解除mutex锁,和加锁一样有快车道和慢车道之分。

解锁快车道时如果count原子加1后大于0,说明等待队列中没有人,那么就解锁成功;否则进入慢车道函数__mutex_unlock_slowpath()。

void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_MUTEXES
mutex_clear_owner(lock);------------清除lock->owner的指向。
#endif
__mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
} static inline void mutex_clear_owner(struct mutex *lock)
{
lock->owner = NULL;
} static inline void
__mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_inc_return(count) <= ))-------count加1后大于0,说明等待队列中没有人,解锁成功;小于等于0,说明等待队列中还有人,进入解锁慢车道。
fail_fn(count);
} __visible void
__mutex_unlock_slowpath(atomic_t *lock_count)
{
struct mutex *lock = container_of(lock_count, struct mutex, count); __mutex_unlock_common_slowpath(lock, );
}
__mutex_unlock_common_slowpath()是慢车道释放锁的函数,首先lock->count减1,然后查看等待队列是否为空,不为空则唤醒第一个waiter。
static inline void
__mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
unsigned long flags; if (__mutex_slowpath_needs_to_unlock())------------处于性能考虑,首先释放锁,然后取唤醒等待队列中的waiters。让其它进程可以抢占锁。
atomic_set(&lock->count, ); spin_lock_mutex(&lock->wait_lock, flags);
mutex_release(&lock->dep_map, nested, _RET_IP_);
debug_mutex_unlock(lock); if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_entry(lock->wait_list.next,
struct mutex_waiter, list);-----只唤醒等待队列中排在第一位的waiter。 debug_mutex_wake_waiter(lock, waiter); wake_up_process(waiter->task);
} spin_unlock_mutex(&lock->wait_lock, flags);
}

4. 小结

4.1 mutex的优点和使用注意点

mutex和信号量相比要高效的多:

  • mutex最先实现自旋等待机制
  • mutex在睡眠之前尝试获取锁
  • mutex实现MCS所来避免多个CPU争用锁而导致CPU高速缓存颠簸现象。

正因为mutex的简洁高效,mutex的使用场景比信号量更加严格:

  • 同一时刻只有一个线程可以持有mutex。
  • 只有锁持有者可以解锁。不能再一个进程中持有mutex,在另外一个进程中释放他。
  • 不允许递归地加锁和解锁。
  • 当进程持有mutex时,进程不可以退出。
  • mutex必须使用官方API来初始化。
  • mutex可以睡眠,所以不允许在中断处理程序或者中断下半部中使用,例如tasklet、定时器等。

4.2 如何选择spinlock、信号量、mutex?

  • 在中断上下文中毫不犹豫地使用spinlock。
  • 如果临界区可能睡眠、调度等动作,应避免选择spinlock。
  • mutex和信号量,优先选择mutex,除非mutex不适合上述限制场景。