posix 匿名信号量与互斥锁 示例生产者--消费者问题

时间:2021-10-26 16:50:49

一、posix 信号量

信号量的概念参见这里。前面也讲过system v 信号量,现在来说说posix 信号量。

system v 信号量只能用于进程间同步,而posix 信号量除了可以进程间同步,还可以线程间同步。system v 信号量每次PV操作可以是N,但Posix 信号量每次PV只能是1。除此之外,posix 信号量还有命名和匿名之分(man 7 sem_overview):

1、命名信号量

名字以/somename 形式分辨,只能有一个/ ,且总长不能超过NAME_MAX - 4(一般是251)。

需要用sem_open 函数创建或打开,PV操作分别是sem_wait 和 sem_post,可以使用sem_close 关闭,删除用sem_unlink。

命名信号量用于不共享内存的进程间同步(内核实现),类似system v 信号量。

2、匿名信号量

存放在一块共享内存中,如果是线程共享,这块区域可以是全局变量;如果是进程共享,可以是system v 共享内存(shmget 创建,shmat 映射),也可以是 posix 共享内存(shm_open 创建,mmap 映射)。

匿名信号量必须用sem_init 初始化,sem_init 函数其中一个参数pshared决定了线程共享还是进程共享,也可以用sem_post 和sem_wait 进行操作,在共享内存释放前,匿名信号量要先用sem_destroy 销毁。

有关这些函数的具体参数可以man 一下。

二、互斥锁

对于多线程的程序,访问冲突的问题是很普遍的,解决的办法是引入互斥锁(Mutex,MutualExclusive

Lock),获得锁的线程可以完成“读-修改-写”的操作,然后释放锁给其它线程,没有获得锁的线程只能等待而不能访问共享数据,这样“读-修改-写”三步操作组成一个原子操作,要么都执行,要么都不执行,不会执行到中间被打断,也不会在其它处理器上并行做这个操作。

Mutex用pthread_mutex_t类型的变量表示,pthread_mutex_init函数对Mutex做初始化,参数attr设定Mutex的属性,如果attr为NULL则表示缺省属性,具体看结构体:有人建议开发中总是设置
PTHREAD_MUTEX_RECURSIVE 属性,避免死锁。

// 互斥量属性: 同一线程可多次加锁
pthread_mutexattr_t m_attr;
pthread_mutexattr_init(&m_attr);
pthread_mutexattr_settype(&m_attr, PTHREAD_MUTEX_RECURSIVE);

 C++ Code 
1
2
3
4
5
6
7
8
9
10
 
struct pthread_mutexattr_t
{
    enum lock_type    // 使用pthread_mutexattr_settype来更改
    {
         PTHREAD_MUTEX_TIMED_NP [default]//当一个线程加锁后,其余请求锁的线程形成等待队列,在解锁后按优先级获得锁。
         PTHREAD_MUTEX_ADAPTIVE_NP       // 动作最简单的锁类型,解锁后所有线程重新竞争。
         PTHREAD_MUTEX_RECURSIVE_NP      // 允许同一线程对同一锁成功获得多次。当然也要解锁多次。其余线程在解锁时重新竞争。
         PTHREAD_MUTEX_ERRORCHECK_NP     // 若同一线程请求同一锁,返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP动作相同。
    } type;
} attr;

用pthread_mutex_init函数初始化的Mutex可以用pthread_mutex_destroy销毁。如果Mutex变量是静态分配的(全局变量或static变量),也可以用宏定义PTHREAD_MUTEX_INITIALIZER来初始化,相当于用pthread_mutex_init初始化并且attr参数为NULL。

一个线程可以调用pthread_mutex_lock获得Mutex,如果这时另一个线程已经调用pthread_mutex_lock获得了该Mutex,则当前线程需要挂起等待,直到另一个线程调用pthread_mutex_unlock释放Mutex,当前线程被唤醒,才能获得该Mutex并继续执行。

如果一个线程既想获得锁,又不想挂起等待,可以调用pthread_mutex_trylock,如果Mutex已经被另一个线程获得,这个函数会失败返回EBUSY,而不会使线程挂起等待。

上面的具体函数可以man 一下。

三、生产者消费者问题

生产者消费者问题概念参见这里。下面使用posix 信号量和互斥锁一起来演示:

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
 
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

#define CONSUMERS_COUNT 1
#define PRODUCERS_COUNT 1
#define BUFFSIZE 10

int g_buffer[BUFFSIZE];

unsigned short in = 0;
unsigned short out = 0;
unsigned short produce_id = 0;
unsigned short consume_id = 0;

sem_t g_sem_full;
sem_t g_sem_empty;
pthread_mutex_t g_mutex;

pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT];

void *consume(void *arg)
{
    int i;
    int num = (int)arg;
    while (1)
    {
        printf("%d wait buffer not empty\n", num);
        sem_wait(&g_sem_empty);
        pthread_mutex_lock(&g_mutex);

for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

if (i == out)
                printf("\t<--consume");

printf("\n");
        }
        consume_id = g_buffer[out];
        printf("%d begin consume product %d\n", num, consume_id);
        g_buffer[out] = -1;
        out = (out + 1) % BUFFSIZE;
        printf("%d end consume product %d\n", num, consume_id);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_full);
        sleep(1);
    }
    return NULL;
}

void *produce(void *arg)
{
    int num = (int)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not full\n", num);
        sem_wait(&g_sem_full);
        pthread_mutex_lock(&g_mutex);
        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

if (i == in)
                printf("\t<--produce");

printf("\n");
        }

printf("%d begin produce product %d\n", num, produce_id);
        g_buffer[in] = produce_id;
        in = (in + 1) % BUFFSIZE;
        printf("%d end produce product %d\n", num, produce_id++);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_empty);
        sleep(5);
    }
    return NULL;
}

int main(void)
{
    int i;
    for (i = 0; i < BUFFSIZE; i++)
        g_buffer[i] = -1;

sem_init(&g_sem_full, 0, BUFFSIZE);
    sem_init(&g_sem_empty, 0, 0);

pthread_mutex_init(&g_mutex, NULL);

for (i = 0; i < CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void *)i);

for (i = 0; i < PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)i);

for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);

sem_destroy(&g_sem_full);
    sem_destroy(&g_sem_empty);
    pthread_mutex_destroy(&g_mutex);

return 0;
}

这里的程序相比,程序逻辑没太大变化,只是用pthread_mutex_lock
替代了
sem_mutex,其次这里是演示线程间同步,现在上述程序生产者消费者各一个线程,但生产者睡眠时间是消费者的5倍,故消费者会经常阻塞在sem_wait(&g_sem_empty)
上面,因为缓冲区经常为空,可以将PRODUCTORS_COUNT
改成5,即有5个生产者线程和1个消费者线程,而且生产者睡眠时间还是消费者的5倍,从动态输出可以看出,基本上就动态平衡了,即5个生产者一下子生产了5份东西,消费者1s消费1份,刚好在生产者继续生产前消费完。

四、自旋锁和读写锁简介

(一)、自旋锁

自旋锁类似于互斥锁,它的性能比互斥锁更高。
自旋锁与互斥锁很重要的一个区别在于,线程在申请自旋锁的时候,线程不会被挂起,它处于忙等待的状态,一般用于等待时间比较短的情形。
pthread_spin_init
pthread_spin_destroy
pthread_spin_lock
pthread_spin_unlock

(二)、读写锁

1、只要没有线程持有给定的读写锁用于写,那么任意数目的线程可以持有读写锁用于读
2、仅当没有线程持有某个给定的读写锁用于读或用于写时,才能分配读写锁用于写
3、读写锁用于读称为共享锁,读写锁用于写称为排它锁
pthread_rwlock_init
pthread_rwlock_destroy
int pthread_rwlock_rdlock
int pthread_rwlock_wrlock
int pthread_rwlock_unlock

更多有关linux中的锁问题可以参考这篇文章 :《透过Linux内核看无锁编程》

http://www.ibm.com/developerworks/cn/linux/l-cn-lockfree/

透过 Linux 内核看无锁编程

非阻塞型同步 (Non-blocking Synchronization) 简介

如何正确有效的保护共享数据是编写并行程序必须面临的一个难题,通常的手段就是同步。同步可分为阻塞型同步(Blocking Synchronization)和非阻塞型同步( Non-blocking Synchronization)。

阻塞型同步是指当一个线程到达临界区时,因另外一个线程已经持有访问该共享数据的锁,从而不能获取锁资源而阻塞,直到另外一个线程释放锁。常见的同步原语有 mutex、semaphore 等。如果同步方案采用不当,就会造成死锁(deadlock),活锁(livelock)和优先级反转(priority inversion),以及效率低下等现象。

为了降低风险程度和提高程序运行效率,业界提出了不采用锁的同步方案,依照这种设计思路设计的算法称为非阻塞型算法,其本质特征就是停止一个线程的执行不会阻碍系统中其他执行实体的运行。

当今比较流行的 Non-blocking Synchronization 实现方案有三种:

  1. Wait-free

    Wait-free 是指任意线程的任何操作都可以在有限步之内结束,而不用关心其它线程的执行速度。 Wait-free 是基于 per-thread 的,可以认为是 starvation-free 的。非常遗憾的是实际情况并非如此,采用 Wait-free 的程序并不能保证 starvation-free,同时内存消耗也随线程数量而线性增长。目前只有极少数的非阻塞算法实现了这一点。

  2. Lock-free

    Lock-Free 是指能够确保执行它的所有线程中至少有一个能够继续往下执行。由于每个线程不是 starvation-free 的,即有些线程可能会被任意地延迟,然而在每一步都至少有一个线程能够往下执行,因此系统作为一个整体是在持续执行的,可以认为是 system-wide 的。所有 Wait-free 的算法都是 Lock-Free 的。

  3. Obstruction-free

    Obstruction-free 是指在任何时间点,一个孤立运行线程的每一个操作可以在有限步之内结束。只要没有竞争,线程就可以持续运行。一旦共享数据被修改,Obstruction-free 要求中止已经完成的部分操作,并进行回滚。 所有 Lock-Free 的算法都是 Obstruction-free 的。

综上所述,不难得出 Obstruction-free 是 Non-blocking synchronization 中性能最差的,而 Wait-free 性能是最好的,但实现难度也是最大的,因此 Lock-free 算法开始被重视,并广泛运用于当今正在运行的程序中,比如 linux 内核。

一般采用原子级的 read-modify-write 原语来实现 Lock-Free 算法,其中 LL 和 SC 是 Lock-Free 理论研究领域的理想原语,但实现这些原语需要 CPU 指令的支持,非常遗憾的是目前没有任何 CPU 直接实现了 SC 原语。根据此理论,业界在原子操作的基础上提出了著名的 CAS(Compare - And - Swap)操作来实现 Lock-Free 算法,Intel 实现了一条类似该操作的指令:cmpxchg8。

CAS 原语负责将某处内存地址的值(1 个字节)与一个期望值进行比较,如果相等,则将该内存地址处的值替换为新值,CAS 操作伪码描述如下:

清单 1. CAS 伪码
Bool CAS(T* addr, T expected, T newValue)
{
     if( *addr == expected )
    {
         *addr =  newValue;
          return true;
    }
    else
          return false;
}

在实际开发过程中,利用 CAS 进行同步,代码如下所示:

清单 2. CAS 实际操作
do{
       备份旧数据;
       基于旧数据构造新数据;
}while(!CAS( 内存地址,备份的旧数据,新数据 ))

就是指当两者进行比较时,如果相等,则证明共享数据没有被修改,替换成新值,然后继续往下运行;如果不相等,说明共享数据已经被修改,放弃已经所做的操作,然后重新执行刚才的操作。容易看出 CAS 操作是基于共享数据不会被修改的假设,采用了类似于数据库的 commit-retry 的模式。当同步冲突出现的机会很少时,这种假设能带来较大的性能提升。

加锁的层级

根据复杂程度、加锁粒度及运行速度,可以得出如下图所示的锁层级:

图 1. 加锁层级

posix 匿名信号量与互斥锁 示例生产者--消费者问题

其中标注为红色字体的方案为 Blocking synchronization,黑色字体为 Non-blocking synchronization。Lock-based 和 Lockless-based 两者之间的区别仅仅是加锁粒度的不同。图中最底层的方案就是大家经常使用的 mutex 和 semaphore 等方案,代码复杂度低,但运行效率也最低。

Linux 内核中的无锁分析

Linux 内核可能是当今最大最复杂的并行程序之一,它的并行主要来至于中断、内核抢占及 SMP 等。内核设计者们为了不断提高 Linux 内核的效率,从全局着眼,逐步废弃了大内核锁来降低锁的粒度;从细处下手,不断对局部代码进行优化,用无锁编程替代基于锁的方案,如 seqlock 及 RCU 等;不断减少锁冲突程度、降低等待时间,如 Double-checked locking 和原子锁等。

内核无锁第一层级 — 少锁

无论什么时候当临界区中的代码仅仅需要加锁一次,同时当其获取锁的时候必须是线程安全的,此时就可以利用 Double-checked Locking 模式来减少锁竞争和加锁载荷。目前 Double-checked Locking 已经广泛应用于单例 (Singleton) 模式中。内核设计者基于此思想,巧妙的将 Double-checked Locking 方法运用于内核代码中。

当一个进程已经僵死,即进程处于 TASK_ZOMBIE 状态,如果父进程调用 waitpid() 系统调用时,父进程需要为子进程做一些清理性的工作,代码如下所示:

清单 3. 少锁操作
984   static int wait_task_zombie(task_t *p, int noreap,
985           struct siginfo __user *infop,
986           int __user *stat_addr, struct rusage __user *ru)
987   {
          ……
1103       if (p->real_parent != p->parent) {
1104           write_lock_irq(&tasklist_lock);
1105           /* Double-check with lock held.  */
1106           if (p->real_parent != p->parent) {
1107               __ptrace_unlink(p);
1108               // TODO: is this safe?
1109               p->exit_state = EXIT_ZOMBIE;
                  ……
1120           }
1121           write_unlock_irq(&tasklist_lock);
1122      }
         ……
1127  }

如果将 write_lock_irq 放置于 1103 行之前,锁的范围过大,锁的负载也会加重,影响效率;如果将加锁的代码放到判断里面,且没有 1106 行的代码,程序会正确吗?在单核情况下是正确的,但在双核情况下问题就出现了。一个非主进程在一个 CPU 上运行,正准备调用 exit 退出,此时主进程在另外一个 CPU 上运行,在子进程调用 release_task 函数之前调用上述代码。子进程在 exit_notify 函数中,先持有读写锁 tasklist_lock,调用 forget_original_parent。主进程运行到 1104 处,由于此时子进程先持有该锁,所以父进程只好等待。在 forget_original_parent 函数中,如果该子进程还有子进程,则会调用 reparent_thread(),将执行 p->parent = p->real_parent; 语句,导致两者相等,等非主进程释放读写锁 tasklist_lock 时,另外一个 CPU 上的主进程被唤醒,一旦开始执行,继续运行将会导致 bug。

严格的说,Double-checked locking 不属于无锁编程的范畴,但由原来的每次加锁访问到大多数情况下无须加锁,就是一个巨大的进步。同时从这里也可以看出一点端倪,内核开发者为了降低锁冲突率,减少等待时间,提高运行效率,一直在持续不断的进行改进。

内核无锁第二层级 — 原子锁

原子操作可以保证指令以原子的方式执行——执行过程不被打断。内核提供了两组原子操作接口:一组针对于整数进行操作,另外一组针对于单独的位进行操作。内核中的原子操作通常是内联函数,一般是通过内嵌汇编指令来完成。对于一些简单的需求,例如全局统计、引用计数等等,可以归结为是对整数的原子计算。

内核无锁第三层级 — Lock-free

1. Lock-free 应用场景一 —— Spin Lock

Spin Lock 是一种轻量级的同步方法,一种非阻塞锁。当 lock 操作被阻塞时,并不是把自己挂到一个等待队列,而是死循环 CPU 空转等待其他线程释放锁。 Spin lock 锁实现代码如下:

清单 4. spin lock 实现代码
static inline void __preempt_spin_lock(spinlock_t *lock)
{
          ……
 do {
   preempt_enable();
   while (spin_is_locked(lock))
     cpu_relax();
   preempt_disable();
 } while (!_raw_spin_trylock(lock));
}
static inline int _raw_spin_trylock(spinlock_t *lock)
{
 char oldval;
 __asm__ __volatile__(
   "xchgb %b0,%1"
   :"=q" (oldval), "=m" (lock->lock)
   :"0" (0) : "memory");
 return oldval > 0;
}

汇编语言指令 xchgb 原子性的交换 8 位 oldval( 存 0) 和 lock->lock 的值,如果 oldval 为 1(lock 初始值为 1),则获取锁成功,反之,则继续循环,接着 relax 休息一会儿,然后继续周而复始,直到成功。

对于应用程序来说,希望任何时候都能获取到锁,也就是期望 lock->lock 为 1,那么用 CAS 原语来描述 _raw_spin_trylock(lock) 就是 CAS(lock->lock,1,0);

如果同步操作总是能在数条指令内完成,那么使用 Spin Lock 会比传统的 mutex lock 快一个数量级。Spin Lock 多用于多核系统中,适合于锁持有时间小于将一个线程阻塞和唤醒所需时间的场合。

pthread 库已经提供了对 spin lock 的支持,所以用户态程序也能很方便的使用 spin lock 了,需要包含 pthread.h 。在某些场景下,pthread_spin_lock 效率是 pthread_mutex_lock 效率的一倍多。美中不足的是,内核实现了读写 spin lock 锁,但 pthread 未能实现。

2. Lock -free 应用场景二 —— Seqlock

手表最主要最常用的功能是读时间,而不是校正时间,一旦后者成了最常用的功能,消费者肯定不会买账。计算机的时钟也是这个功能,修改时间是小概率事件,而读时间是经常发生的行为。以下代码摘自 2.4.34 内核:

清单 5. 2.4.34 seqlock 实现代码
443 void do_gettimeofday(struct timeval *tv)
444 {
              ……
448         read_lock_irqsave(&xtime_lock, flags);
              ……
455         sec = xtime.tv_sec;
456         usec += xtime.tv_usec;
457         read_unlock_irqrestore(&xtime_lock, flags);
              ……
466 }
468 void do_settimeofday(struct timeval *tv)
469 {
470         write_lock_irq(&xtime_lock);
              ……
490         write_unlock_irq(&xtime_lock);
491 }

不难发现获取时间和修改时间采用的是 spin lock 读写锁,读锁和写锁具有相同的优先级,只要读持有锁,写锁就必须等待,反之亦然。

Linux 2.6 内核中引入一种新型锁——顺序锁 (seqlock),它与 spin lock 读写锁非常相似,只是它为写者赋予了较高的优先级。也就是说,即使读者正在读的时候也允许写者继续运行。当存在多个读者和少数写者共享一把锁时,seqlock 便有了用武之地,因为 seqlock 对写者更有利,只要没有其他写者,写锁总能获取成功。根据 lock-free 和时钟功能的思想,内核开发者在 2.6 内核中,将上述读写锁修改成了顺序锁 seqlock,代码如下:

清单 6. 2.6.10 seqlock 实现代码
static inline unsigned read_seqbegin(const seqlock_t *sl)
{
 unsigned ret = sl->sequence;
 smp_rmb();
 return ret;
}
static inline int read_seqretry(const seqlock_t *sl, unsigned iv)
{
 smp_rmb();
 return (iv & 1) | (sl->sequence ^ iv);
}
static inline void write_seqlock(seqlock_t *sl)
{
 spin_lock(&sl->lock);
 ++sl->sequence;
 smp_wmb();     
}
void do_gettimeofday(struct timeval *tv)
{
 unsigned long seq;
 unsigned long usec, sec;
 unsigned long max_ntp_tick;
   ……
 do {
   unsigned long lost;
   seq = read_seqbegin(&xtime_lock);
     ……
   sec = xtime.tv_sec;
   usec += (xtime.tv_nsec / 1000);
 } while (read_seqretry(&xtime_lock, seq));
   ……
 tv->tv_sec = sec;
 tv->tv_usec = usec;
}
int do_settimeofday(struct timespec *tv)
{
   ……
 write_seqlock_irq(&xtime_lock);
   ……
 write_sequnlock_irq(&xtime_lock);
 clock_was_set();
 return 0;
}

Seqlock 实现原理是依赖一个序列计数器,当写者写入数据时,会得到一把锁,并且将序列值加 1。当读者读取数据之前和之后,该序列号都会被读取,如果读取的序列号值都相同,则表明写没有发生。反之,表明发生过写事件,则放弃已进行的操作,重新循环一次,直至成功。不难看出,do_gettimeofday 函数里面的 while 循环和接下来的两行赋值操作就是 CAS 操作。

采用顺序锁 seqlock 好处就是写者永远不会等待,缺点就是有些时候读者不得不反复多次读相同的数据直到它获得有效的副本。当要保护的临界区很小,很简单,频繁读取而写入很少发生(WRRM--- Write Rarely Read Mostly)且必须快速时,就可以使用 seqlock。但 seqlock 不能保护包含有指针的数据结构,因为当写者修改数据结构时,读者可能会访问一个无效的指针。

3. Lock -free 应用场景三 —— RCU

在 2.6 内核中,开发者还引入了一种新的无锁机制 -RCU(Read-Copy-Update),允许多个读者和写者并发执行。RCU 技术的核心是写操作分为写和更新两步,允许读操作在任何时候无阻碍的运行,换句话说,就是通过延迟写来提高同步性能。RCU 主要应用于 WRRM 场景,但它对可保护的数据结构做了一些限定:RCU 只保护被动态分配并通过指针引用的数据结构,同时读写控制路径不能有睡眠。以下数组动态增长代码摘自 2.4.34 内核:

清单 7. 2.4.34 RCU 实现代码

其中 ipc_lock 是读者,grow_ary 是写者,不论是读或者写,都需要加 spin lock 对被保护的数据结构进行访问。改变数组大小是小概率事件,而读取是大概率事件,同时被保护的数据结构是指针,满足 RCU 运用场景。以下代码摘自 2.6.10 内核:

清单 8. 2.6.10 RCU 实现代码
#define rcu_read_lock()    preempt_disable()
#define rcu_read_unlock()  preempt_enable()
#define rcu_assign_pointer(p, v)  ({ \
                   smp_wmb(); \
                   (p) = (v); \
                     })
struct kern_ipc_perm* ipc_lock(struct ipc_ids* ids, int id)
{
   ……
 rcu_read_lock();
 entries = rcu_dereference(ids->entries);
 if(lid >= entries->size) {
   rcu_read_unlock();
   return NULL;
 }
 out = entries->p[lid];
 if(out == NULL) {
   rcu_read_unlock();
   return NULL;
 }
   ……
 return out;
}
static int grow_ary(struct ipc_ids* ids, int newsize)
{
 struct ipc_id_ary* new;
 struct ipc_id_ary* old;
   ……
 new = ipc_rcu_alloc(sizeof(struct kern_ipc_perm *)*newsize +
         sizeof(struct ipc_id_ary));
 if(new == NULL)
   return size;
 new->size = newsize;
 memcpy(new->p, ids->entries->p, sizeof(struct kern_ipc_perm *)*size
              +sizeof(struct ipc_id_ary));
 for(i=size;i<newsize;i++) {
   new->p[i] = NULL;
 }
 old = ids->entries;
 /*
  * Use rcu_assign_pointer() to make sure the memcpyed contents
  * of the new array are visible before the new array becomes visible.
  */
 rcu_assign_pointer(ids->entries, new);
 ipc_rcu_putref(old);
 return newsize;
}

纵观整个流程,写者除内核屏障外,几乎没有一把锁。当写者需要更新数据结构时,首先复制该数据结构,申请 new 内存,然后对副本进行修改,调用 memcpy 将原数组的内容拷贝到 new 中,同时对扩大的那部分赋新值,修改完毕后,写者调用 rcu_assign_pointer 修改相关数据结构的指针,使之指向被修改后的新副本,整个写操作一气呵成,其中修改指针值的操作属于原子操作。在数据结构被写者修改后,需要调用内存屏障 smp_wmb,让其他 CPU 知晓已更新的指针值,否则会导致 SMP 环境下的 bug。当所有潜在的读者都执行完成后,调用 call_rcu 释放旧副本。同 Spin lock 一样,RCU 同步技术主要适用于 SMP 环境。

内核无锁第四层级 — 免锁

环形缓冲区是生产者和消费者模型中常用的数据结构。生产者将数据放入数组的尾端,而消费者从数组的另一端移走数据,当达到数组的尾部时,生产者绕回到数组的头部。

如果只有一个生产者和一个消费者,那么就可以做到免锁访问环形缓冲区(Ring Buffer)。写入索引只允许生产者访问并修改,只要写入者在更新索引之前将新的值保存到缓冲区中,则读者将始终看到一致的数据结构。同理,读取索引也只允许消费者访问并修改。

图 2. 环形缓冲区实现原理图

posix 匿名信号量与互斥锁 示例生产者--消费者问题

如图所示,当读者和写者指针相等时,表明缓冲区是空的,而只要写入指针在读取指针后面时,表明缓冲区已满。

清单 9. 2.6.10 环形缓冲区实现代码
/*
* __kfifo_put - puts some data into the FIFO, no locking version
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_put(struct kfifo *fifo,
      unsigned char *buffer, unsigned int len)
{
 unsigned int l;
 len = min(len, fifo->size - fifo->in + fifo->out);
 /* first put the data starting from fifo->in to buffer end */
 l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
 memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
 /* then put the rest (if any) at the beginning of the buffer */
 memcpy(fifo->buffer, buffer + l, len - l);
 fifo->in += len;
 return len;
}
/*
* __kfifo_get - gets some data from the FIFO, no locking version
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_get(struct kfifo *fifo,
    unsigned char *buffer, unsigned int len)
{
 unsigned int l;
 len = min(len, fifo->in - fifo->out);
 /* first get the data from fifo->out until the end of the buffer */
 l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
 memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
 /* then get the rest (if any) from the beginning of the buffer */
 memcpy(buffer + l, fifo->buffer, len - l);
 fifo->out += len;
 return len;
}

以上代码摘自 2.6.10 内核,通过代码的注释(斜体部分)可以看出,当只有一个消费者和一个生产者时,可以不用添加任何额外的锁,就能达到对共享数据的访问。

总结

通过对比 2.4 和 2.6 内核代码,不得不佩服内核开发者的智慧,为了提高内核性能,一直不断的进行各种优化,并将业界最新的 lock-free 理念运用到内核中。

在实际开发过程中,进行无锁设计时,首先进行场景分析,因为每种无锁方案都有特定的应用场景,接着根据场景分析进行数据结构的初步设计,然后根据先前的分析结果进行并发模型建模,最后在调整数据结构的设计,以便达到最优。

相关主题

  • Andrei Alexandrescu. 《 Lock-Free Data Structures--- Keeping threads moving while avoiding deadlock 》,《 Dr. Dobb's Journal 》, October 01, 2004。
  • 《 Non-blocking synchronization 》, http://en.wikipedia.org/wiki/Non-blocking_synchronization
  • Shameem Akhter and Jason Roberts. 李宝峰,富弘毅,李韬译 . 《多核程序设计技术》,电子工业出版社,2007。
  • Rebert Love,《 Linux Kernel Development,2rd Edition 》,机械工业出版社,2006。
  • Daniel P. Bovet,Marco Cesati,《 Understanding the Linux Kernel,3rd Edition 》,东南大学出版社,2006。
  • Jonatban Corbet 等,魏永明等译,《 Linux 设备驱动程序》,中国电力出版社,2006。
  • Gordon Fischer 等,《 The Linux Kernel Prime 》,机械工业出版社,2006。
  • developerWorks Linux 专区 寻找为 Linux 开发人员(包括 Linux 新手入门)准备的更多参考资料,查阅我们 最受欢迎的文章和教程
  • 在 developerWorks 上查阅所有 Linux 技巧Linux 教程