原文地址:http://blog.csdn.net/chen19870707/article/details/41083183
CAS原子操作实现无锁及性能分析
Author:Echo Chen(陈斌)
Email:chenb19870707@gmail.com
Date:Nov 13th, 2014
最近在研究nginx的自旋锁的时候,又见到了GCC CAS原子操作,于是决定动手分析下CAS实现的无锁到底性能如何,网上关于CAS实现无锁的文章很多,但少有研究这种无锁的性能提升的文章,这里就以实验结果和我自己的理解逐步展开。
1.什么是CAS原子操作
在研究无锁之前,我们需要首先了解一下CAS原子操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。
大家应该还记得操作系统里面关于“原子操作”的概念,一个操作是原子的(atomic),如果这个操作所处的层(layer)的更高层不能发现其内部实现与结构。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。有了这个原子操作这个保证我们就可以实现无锁了。
CAS原子操作在*中的代码描述如下:
1: int compare_and_swap(int* reg, int oldval, int newval)2: {
3: ATOMIC();
4: int old_reg_val = *reg;5: if (old_reg_val == oldval)6: *reg = newval;
7: END_ATOMIC();
8: return old_reg_val;9: }
也就是检查内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。上面的代码总是返回old_reg_value,调用者如果需要知道是否更新成功还需要做进一步判断,为了方便,它可以变种为直接返回是否更新成功,如下:
1: bool compare_and_swap (int *accum, int *dest, int newval)2: {
3: if ( *accum == *dest ) {4: *dest = newval;
5: return true;6: }
7: return false;8: }
除了CAS还有以下原子操作:
- Fetch And Add,一般用来对变量做 +1 的原子操作。
1: << atomic >>
2: function FetchAndAdd(address location, int inc) {3: int value := *location4: *location := value + inc
5: return value6: }
Test-and-set,写值到某个内存位置并传回其旧值。汇编指令BST。1: #define LOCKED 1
2:
3: int TestAndSet(int* lockPtr) {4: int oldValue;5:
6: // Start of atomic segment7: // The following statements should be interpreted as pseudocode for8: // illustrative purposes only.9: // Traditional compilation of this code will not guarantee atomicity, the10: // use of shared memory (i.e. not-cached values), protection from compiler11: // optimization, or other required properties.12: oldValue = *lockPtr;
13: *lockPtr = LOCKED;
14: // End of atomic segment15:
16: return oldValue;17: }
- Test and Test-and-set,用来实现多核环境下互斥锁,
1: boolean locked := false // shared lock variable2: procedure EnterCritical() {
3: do {4: while (locked == true) skip // spin until lock seems free5: } while TestAndSet(locked) // actual atomic locking6: }
2.CAS 在各个平台下的实现
2.1 Linux GCC 支持的 CAS
GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins)
1: bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)2: type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
2.2 Windows支持的CAS
在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions)
1: InterlockedCompareExchange ( __inout LONG volatile *Target,2: __in LONG Exchange,
3: __in LONG Comperand);
2.3 C++ 11支持的CAS
C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library)
1: template< class T >2: bool atomic_compare_exchange_weak( std::atomic<T>* obj,3: T* expected, T desired );
4: template< class T >5: bool atomic_compare_exchange_weak( volatile std::atomic<T>* obj,6: T* expected, T desired );
3.CAS原子操作实现无锁的性能分析
这里由于只是比较性能,所以采用很简单的方式,创建10个线程并发执行,每个线程中循环对全局变量count进行++操作(i++),循环加2000000次,这必然会涉及到并发互斥操作,在同一台机器上分析 加普通互斥锁、CAS实现的无锁、Fetch And Add实现的无锁消耗的时间,然后进行分析。
3.2 加普通互斥锁代码
1: #include <stdio.h>
2: #include <stdlib.h>
3: #include <pthread.h>
4: #include <time.h>
5: #include "timer.h"
6:
7: pthread_mutex_t mutex_lock;
8: static volatile int count = 0;
9: void *test_func(void *arg)
10: {
11: int i = 0;
12: for(i = 0; i < 2000000; i++)
13: {
14: pthread_mutex_lock(&mutex_lock);
15: count++;
16: pthread_mutex_unlock(&mutex_lock);
17: }
18: return NULL;
19: }
20:
21: int main(int argc, const char *argv[])
22: {
23: Timer timer; // 为了计时,临时封装的一个类Timer。
24: timer.Start(); // 计时开始
25: pthread_mutex_init(&mutex_lock, NULL);
26: pthread_t thread_ids[10];
27: int i = 0;
28: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
29: {
30: pthread_create(&thread_ids[i], NULL, test_func, NULL);
31: }
32:
33: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
34: {
35: pthread_join(thread_ids[i], NULL);
36: }
37:
38: timer.Stop();// 计时结束
39: timer.Cost_time();// 打印花费时间
40: printf("结果:count = %d\n",count);
41:
42: return 0;
43: }
注:Timer类仅作统计时间用,其实现在文章最后给出。
3.2 CAS实现的无锁
1: #include <stdio.h>
2: #include <stdlib.h>
3: #include <pthread.h>
4: #include <unistd.h>
5: #include <time.h>
6: #include "timer.h"
7:
8: int mutex = 0;
9: int lock = 0;
10: int unlock = 1;
11:
12: static volatile int count = 0;
13: void *test_func(void *arg)
14: {
15: int i = 0;
16: for(i = 0; i < 2000000; i++)
17: {
18: while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
19: count++;
20: __sync_bool_compare_and_swap (&mutex, unlock, 0);
21: }
22: return NULL;
23: }
24:
25: int main(int argc, const char *argv[])
26: {
27: Timer timer;
28: timer.Start();
29: pthread_t thread_ids[10];
30: int i = 0;
31:
32: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
33: {
34: pthread_create(&thread_ids[i], NULL, test_func, NULL);
35: }
36:
37: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
38: {
39: pthread_join(thread_ids[i], NULL);
40: }
41:
42: timer.Stop();
43: timer.Cost_time();
44: printf("结果:count = %d\n",count);
45:
46: return 0;
47: }
48:
3.4 Fetch And Add 原子操作
1: #include <stdio.h>
2: #include <stdlib.h>
3: #include <pthread.h>
4: #include <unistd.h>
5: #include <time.h>
6: #include "timer.h"
7:
8: static volatile int count = 0;
9: void *test_func(void *arg)
10: {
11: int i = 0;
12: for(i = 0; i < 2000000; i++)
13: {
14: __sync_fetch_and_add(&count, 1);
15: }
16: return NULL;
17: }
18:
19: int main(int argc, const char *argv[])
20: {
21: Timer timer;
22: timer.Start();
23: pthread_t thread_ids[10];
24: int i = 0;
25:
26: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
27: pthread_create(&thread_ids[i], NULL, test_func, NULL);
28: }
29:
30: for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
31: pthread_join(thread_ids[i], NULL);
32: }
33:
34: timer.Stop();
35: timer.Cost_time();
36: printf("结果:count = %d\n",count);
37: return 0;
38: }
39:
4 实验结果和分析
在同一台机器上,各运行以上3份代码10次,并统计平均值,其结果如下:(单位微秒)
由此可见,无锁操作在性能上远远优于加锁操作,消耗时间仅为加锁操作的1/3左右,无锁编程方式确实能够比传统加锁方式效率高,经上面测试可以发现,可以快到3倍左右。所以在极力推荐在高并发程序中采用无锁编程的方式可以进一步提高程序效率。
5.时间统计类Timer
timer.h
1: #ifndef TIMER_H
2: #define TIMER_H
3:
4: #include <sys/time.h>
5: class Timer6: {
7: public:8: Timer();
9: // 开始计时时间10: void Start();11: // 终止计时时间12: void Stop();13: // 重新设定14: void Reset();15: // 耗时时间16: void Cost_time();17: private:18: struct timeval t1;19: struct timeval t2;20: bool b1,b2;21: };
22: #endiftimer.cpp
1: #include "timer.h"2: #include <stdio.h>
3:
4: Timer::Timer()
5: {
6: b1 = false;
7: b2 = false;
8: }
9: void Timer::Start()10: {
11: gettimeofday(&t1,NULL);
12: b1 = true;
13: b2 = false;
14: }
15:
16: void Timer::Stop()17: {
18: if (b1 == true)19: {
20: gettimeofday(&t2,NULL);
21: b2 = true;
22: }
23: }
24:
25: void Timer::Reset()26: {
27: b1 = false;
28: b2 = false;
29: }
30:
31: void Timer::Cost_time()32: {
33: if (b1 == false)34: {
35: printf("计时出错,应该先执行Start(),然后执行Stop(),再来执行Cost_time()");36: return ;37: }
38: else if (b2 == false)39: {
40: printf("计时出错,应该执行完Stop(),再来执行Cost_time()");41: return ;42: }
43: else44: {
45: int usec,sec;46: bool borrow = false;47: if (t2.tv_usec > t1.tv_usec)48: {
49: usec = t2.tv_usec - t1.tv_usec;
50: }
51: else52: {
53: borrow = true;
54: usec = t2.tv_usec+1000000 - t1.tv_usec;
55: }
56:
57: if (borrow)58: {
59: sec = t2.tv_sec-1 - t1.tv_sec;
60: }
61: else62: {
63: sec = t2.tv_sec - t1.tv_sec;
64: }
65: printf("花费时间:%d秒 %d微秒\n",sec,usec);66: }
67: }
68:
6.参考
-
Echo Chen:Blog.csdn.net/chen19870707
-
- 顶
- 踩
- 猜你在找
- 9楼 workrun2015-09-22 22:01发表
- 多谢分享,学习了~
- 8楼 luoyuyou2015-01-16 14:51发表
- 楼主文章不错~
- 7楼 bxps9992014-12-22 16:03发表
- 学习了
- 6楼 fubar2014-12-02 17:09发表
-
pthread mutex implementation also makes use of CAS under the hood. The major difference, with your test, is that pthread mutex would make a system call to block calling thread until the lock is available, while your CAS implementation makes a fixed-time sleep.
- Re: l0vehj2014-12-05 10:37发表
-
回复fubar:usleep is also a system call, is't it?
why the version which used usleep is much faster then the mutex one?
- Re: chen198707072014-12-04 19:45发表
- 回复fubar:Thanks for your remarks! I'll learn more about it.
- Re: TiGEr_zZ2014-12-04 18:26发表
-
这篇文章,包括很多人对无锁的理解都是非常错误的,这里其实使用了spin锁,实际上也是基于锁的。
另外fubar说的很对,CAS本身没什么神秘的到处都在用。无锁只是说基于CAS来实现,因为它被证明是充分必要条件,而其他原子操作要不不够,要不像ll/sc更强不被广泛支持,两者不是等同的概念。
- 5楼 scgywx2014-11-26 16:22发表
-
学习了,谢谢!
另外有个疑问,CAS实现的无锁,如果拿不到锁就sleep 100ms,,如果时间改小一点会不会有所提高呢?毕竟如果有一次拿到就浪费了100ms了。。- Re: 何以诚2014-12-18 22:09发表
- 回复scgywx:有一种极端情况:若是它能拿到锁的时候永远都在sleep咋办。。。
- Re: chen198707072014-11-26 17:37发表
- 回复scgywx:应该会,你可以测测,我没试
- 4楼 alaclp2014-11-25 09:38发表
-
能够用C++ 11特性, 别使用pthread, 写个线程安全的计数器
在linux下测试没找到pthread
- 3楼 zenny_chen2014-11-17 12:36发表
-
CAS没有LL-SC那么灵活强大
- Re: workrun2015-09-22 23:16发表
-
回复zenny_chen:LL-SC 很强大么,把 一个CAS操作分开来做,还要考虑到期间硬件中断和缓存行被修改的问题,貌似store-conditional 失败率更高
- Re: zenny_chen2015-09-23 20:44发表
-
回复workrun:LL-SC可不是将一个CAS分开来。LL-SC比起CAS来有两个优势:
LL/SC has two advantages over CAS when designing a load-store architecture: reads and writes are separate instructions, as required by the design philosophy (and pipeline architecture); and both instructions can be performed using only two registers (address and value), fitting naturally into LSA encoding schemes. CAS, on the other hand, requires three registers (address, old value, new value) and a dependency between the value read and the value written. x86, being a CISC architecture, does not have this constraint; though modern chips may well translate a CAS instruction into separate LL/SC micro-operations internally.- Re: workrun2015-09-24 13:07发表
-
回复zenny_chen:难道它不是把读写操作分开来的么,读取-监视-条件写,可以看一下这篇http://blogs.msdn.com/b/oldnewthing/archive/2013/09/13/10448736.aspx,
请求CPU监视一个内存地址依赖于CPU自己的实现。但要记住一件事情,CPU在同一时间只能监视一个内存地址,并且这个时间是很短暂的。如果你的代码被抢占了或者在load-link后有一个硬件中断到来,那么你的store-conditional将会失败,因为CPU因为硬件中断而分心了,完全忘记了你要求它监视的内存地址(即使CPU成功的记住了它,也不会记太久,因为硬件中断几乎都会执行自己的load-link指令,因此会替换成它自己要求监视的内存地址)。
另外,CPU可能会有点懒,在监视时并不监视内存地址,而是监视cache line,如果有人修改了一个不同的内存位置,但是刚好跟要被监视的内存地址在同一个cache line里,store-conditional操作也会失败,即使它事实上可以成功完成。ARM架构的CPU是太懒了,以至于任何向同一块2048字节写入的操作都会导致store-conditional失败。
- Re: lojunren2014-11-30 21:28发表
- 回复zenny_chen:X86系列的CPU是没有LL-SC指令的
- Re: chen198707072014-11-17 15:23发表
-
回复zenny_chen:我要研究下,谢谢共享知识
- Re: zenny_chen2014-12-01 10:33发表
-
回复chen19870707:呼呼,推荐阅读:http://en.wikipedia.org/wiki/Non-blocking_algorithm
其中,see also里的Load-Link/Store-Conditional就是LL-SC
- 2楼 丁国华2014-11-16 08:05发表
- 谢谢分享 学习了 `(*∩_∩*)′
- 1楼 soledadzz2014-11-14 17:59发表
- 您的文章已被推荐到博客首页和个人页侧边栏推荐文章,感谢您的分享。