本文代码基于linux内核4.19.195
之前写了rcu、rcu nocb的文章,感觉还差个srcu就完整了,现在补齐一下。
SRCU(Sleepable RCU)是rcu的一个变体,顾名思义,就是在rcu的读临界区中允许睡眠,而rcu在读临界区中是不运许睡眠的。许多文章详细的介绍了srcu的使用方法和局限,我们跳过这部分,直奔其原理。
对比rcu的数据结构,srcu也有相应的数据结构。
一个srcu_struct,代表一个逻辑srcu子系统。这个结构,在srcu相关函数中需要经常被调用到,而在rcu中基本没有使用,这主要是内核需要区分多种srcu,且每个子系统可能会定义自己的srcu;而rcu只有那几种,内核代码里早就写好了,从而在api中就封装掉了,开发者无需关注。
读者通过调用srcu_read_lock和srcu_read_unlock进出srcu读临界区
/*
* Counts the new reader in the appropriate per-CPU element of the
* srcu_struct.
* Returns an index that must be passed to the matching srcu_read_unlock().
*/
int __srcu_read_lock(struct srcu_struct *sp)
{
int idx;
idx = READ_ONCE(sp->srcu_idx) & 0x1;
this_cpu_inc(sp->sda->srcu_lock_count[idx]);/* 在对应的宽限期增加锁计数。cpu变量per_cpu_ref->c[idx]加1 */
smp_mb(); /* B */ /* Avoid leaking the critical section. */
return idx;//需要返回index
}
EXPORT_SYMBOL_GPL(__srcu_read_lock);
/**
* srcu_read_lock - register a new reader for an SRCU-protected structure.
* @sp: srcu_struct in which to register the new reader.
*
* Enter an SRCU read-side critical section. Note that SRCU read-side
* critical sections may be nested. However, it is illegal to
* call anything that waits on an SRCU grace period for the same
* srcu_struct, whether directly or indirectly. Please note that
* one way to indirectly wait on an SRCU grace period is to acquire
* a mutex that is held elsewhere while calling synchronize_srcu() or
* synchronize_srcu_expedited().
*
* Note that srcu_read_lock() and the matching srcu_read_unlock() must
* occur in the same context, for example, it is illegal to invoke
* srcu_read_unlock() in an irq handler if the matching srcu_read_lock()
* was invoked in process context.
*/
//读者进入临界区
static inline int srcu_read_lock(struct srcu_struct *sp) __acquires(sp)
{
int retval;
retval = __srcu_read_lock(sp);
rcu_lock_acquire(&(sp)->dep_map);
return retval;
}
/**
* srcu_read_unlock - unregister a old reader from an SRCU-protected structure.
* @sp: srcu_struct in which to unregister the old reader.
* @idx: return value from corresponding srcu_read_lock().
*
* Exit an SRCU read-side critical section.
*/
static inline void srcu_read_unlock(struct srcu_struct *sp, int idx)
__releases(sp)
{
rcu_lock_release(&(sp)->dep_map);
__srcu_read_unlock(sp, idx);
}
/*
* Removes the count for the old reader from the appropriate per-CPU
* element of the srcu_struct. Note that this may well be a different
* CPU than that which was incremented by the corresponding srcu_read_lock().
*/
//read unlock并没有修改srcu_idx,也就是说,一次宽限期内可以进入无数次读临界区,直到process_srcu修改srcu_idx
void __srcu_read_unlock(struct srcu_struct *sp, int idx)
{
smp_mb(); /* C */ /* Avoid leaking the critical section. */
this_cpu_inc(sp->sda->srcu_unlock_count[idx]);/* cpu变量per_cpu_ref->c[idx]减1 */
}
EXPORT_SYMBOL_GPL(__srcu_read_unlock);
可以看到,srcu_read_lock函数是有返回值的,且该返回值需要传递给srcu_read_unlock函数,这是srcu的特别之处,内核需要知道当前代码是准备进入哪个srcu的临界区。
进入临界区的原理很简单,获取当前的srcu的idx,对相关percpu的atomic变量进行原子加一操作;而离开临界区的原理也类似,根据srcu_read_lock的返回值,对相关percpu的另一atomic变量进行原子加一操作。这样只要将这两组percpu的atomic变量相加,只要和一样,就说明没有人在临界区里面了。
真的是这样吗?
首先为什么是percpu变量的和?
这是因为一个进程可能在cpu A上进的临界区,然后在cpu B上离开的临界区,这就导致记录进出临界区的变量在单个cpu上不一定是相等的。
那么,内核是如何检测当前这两组percpu的atomic变量的和是否相等呢?具体来说,每个核上可能会有进程在不断的进出srcu临界区,如何保证这个检测动作的原子性呢?要知道,多个数(cpu数量那么多)的求和,除非加锁,才能做好同步,但是很明显,srcu_read_lock函数和srcu_read_unlock函数都没有加锁动作,这是怎么回事?
回答这个问题,要看srcu是怎么处理一个宽限期的。
rcu的宽限期统计使用软中断实现,srcu使用工作队列实现宽限期统计,由函数process_srcu完成,其中函数srcu_advance_state完成了执行当前宽限期的状态机的工作。
call_srcu函数中,会根据情况调用srcu_funnel_gp_start,从而唤醒process_srcu相关的work,触发process_srcu的调用
/*
* This is the work-queue function that handles SRCU grace periods.
*/
static void process_srcu(struct work_struct *work)
{
struct srcu_struct *sp;
sp = container_of(work, struct srcu_struct, work.work);
srcu_advance_state(sp);
srcu_reschedule(sp, srcu_get_delay(sp)); //确定是否需要启动下一个宽限期
}
/*
* Core SRCU state machine. Push state bits of ->srcu_gp_seq
* to SRCU_STATE_SCAN2, and invoke srcu_gp_end() when scan has
* completed in that state.
*/
static void srcu_advance_state(struct srcu_struct *sp)
{
int idx;
mutex_lock(&sp->srcu_gp_mutex);
/*
* Because readers might be delayed for an extended period after
* fetching ->srcu_idx for their index, at any point in time there
* might well be readers using both idx=0 and idx=1. We therefore
* need to wait for readers to clear from both index values before
* invoking a callback.
*
* The load-acquire ensures that we see the accesses performed
* by the prior grace period.
*/
idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
if (idx == SRCU_STATE_IDLE) { //如果宽限期处于空闲状态
spin_lock_irq_rcu_node(sp);
if (ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed)) { //如果宽限期没有注册回调函数
WARN_ON_ONCE(rcu_seq_state(sp->srcu_gp_seq));
spin_unlock_irq_rcu_node(sp);
mutex_unlock(&sp->srcu_gp_mutex);
return;
}
idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
if (idx == SRCU_STATE_IDLE)
srcu_gp_start(sp); //启动宽限期N
spin_unlock_irq_rcu_node(sp);
if (idx != SRCU_STATE_IDLE) {
mutex_unlock(&sp->srcu_gp_mutex);
return; /* Someone else started the grace period. */
}
}
if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
idx = 1 ^ (sp->srcu_idx & 1);
if (!try_check_zero(sp, idx, 1)) {//是否所有读者都退出了宽限期(N-1)的临界区
mutex_unlock(&sp->srcu_gp_mutex);
return; /* readers present, retry later. */
}
srcu_flip(sp); //切换当前读者数组索引 rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2); //切换状态到SRCU_STATE_SCAN2
}
if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) {
/*
* SRCU read-side critical sections are normally short,
* so check at least twice in quick succession after a flip.
*/
idx = 1 ^ (sp->srcu_idx & 1);
if (!try_check_zero(sp, idx, 2)) {//是否所有读者都退出了宽限期N的临界区
mutex_unlock(&sp->srcu_gp_mutex);
return; /* readers present, retry later. */
}
srcu_gp_end(sp); /* Releases ->srcu_gp_mutex. */ //结束宽限期N
}
}
从代码中可以看到,SRCU会经历三个状态:
SRCU_STATE_IDLE->SRCU_STATE_SCAN1->SRCU_STATE_SCAN2
一开始的时候,srcu初始化完毕会处于SRCU_STATE_IDLE状态。当有第一个人调用call_srcu的时候,会触发process_srcu的运行,进而调用srcu_gp_start启动当前宽限期N,将srcu的状态修改为SRCU_STATE_SCAN1后,process_srcu退出。
当再次调用到process_srcu时(这里还没弄明白是怎么被调用的,记个TODO后续学习),发现是SRCU_STATE_SCAN1状态,则调用try_check_zero判断前一个宽限期(N-1)是否完成,判断方法就是前面说的两组percpu变量的和。特别注意,这里的index取dx = 1 ^ (sp->srcu_idx & 1);
如果判断成功,即前一个宽限期的所有进程都退出了临界区,则调用srcu_flip,切换当前读者数组索引,也就是修改srcu_idx,这就是同步的关键。为什么这么说呢,因为调用了srcu_flip之后,srcu_read_lock返回的index,就和调用srcu_flip前不一样了,也就意味着,后续的srcu_read_lock,增加的atomic变量,和原先的srcu_read_lock增加的atomic变量,不再是一个变量了,这就能保证宽限期N的同步工作。
接下来,srcu进入SRCU_STATE_SCAN2状态,只需要等待try_check_zero返回成功,即可结束宽限期N。