多线程计数器——原子操作

时间:2021-10-15 08:05:40

众所周知,多线程下计数存在着计数不正确的问题。这个问题的根源在于多个线程对同一个变量可以同时访问(修改)。这样就造成了修改后的结果不一致。

         首先在这里先强调一点,volatile 关键字并不能提供多线程安全访问。因为有volatie修饰的变量,每次操作时遵循下面动作: 

从内存取值 ---> 放入寄存器 ---> 操作 --->写回内存 

这几个步骤不是原子的操作在任意两个步骤之间都可能被其他的线程中断,所以不是线程安全。详细内容参见

http://blog.csdn.net/littlefang/article/details/6103038

LINUX

         多线程访问同一个变量,大多数程序员最先想到的就是利用锁制造处一片线程安全的区域在该区域内,做线程安全的操作。

         

<span style="white-space:pre"></span>{
Locktem(&mutx);
++cunt;
}

要知道如上的代码执行花费的时间至少是只做++cunt操作的10倍。这样就造成了代码的效率降低。

通过以上的小例子看出加锁操作并不是一种高效的线程计数机制。转而就要向操作系统寻求帮助了。在linux操作系统中只要包含<asm/atomic.h><asm/bitops.h>就可以实现内核级的原子操作,完全能满足我们的需求。

以下内容引自 http://www.linuxidc.com/Linux/2011-06/37402.htm

但是在Linux2.6.18之后,删除了<asm/atomic.h><asm/bitops.h>GCC提供了内置的原子操作函数,更适合用户态的程序使用。现在atomic.h在内核头文件中,不在gcc默认搜索路径下,即使像下面这样强行指定路径,还是会出现编译错误。

 

#include</usr/src/linux-headers-2.6.27-7/include/asm-x86/atomic.h> 

gcc4.1.2提供了__sync_*系列的built-in函数,用于提供加减和逻辑运算的原子操作。

 

可以对1,2,48字节长度的数值类型或指针进行原子操作,其声明如下

 

type __sync_fetch_and_add (type *ptr, type value, ...)  

type __sync_fetch_and_sub (type *ptr, type value, ...)  

type __sync_fetch_and_or (type *ptr, type value, ...)  

type __sync_fetch_and_and (type *ptr, type value, ...)  

type __sync_fetch_and_xor (type *ptr, type value, ...)  

type __sync_fetch_and_nand (type *ptr, type value,...)  

          { tmp =*ptr; *ptr op= value; return tmp; }  

          { tmp =*ptr; *ptr = ~tmp & value; return tmp; }  // nand  

 

type __sync_add_and_fetch (type *ptr, type value, ...)  

type __sync_sub_and_fetch (type *ptr, type value, ...)  

type __sync_or_and_fetch (type *ptr, type value, ...)  

type __sync_and_and_fetch (type *ptr, type value, ...)  

type __sync_xor_and_fetch (type *ptr, type value, ...)  

type __sync_nand_and_fetch (type *ptr, type value,...)  

          { *ptr op=value; return *ptr; }  

          { *ptr =~*ptr & value; return *ptr; }   //nand 

这两组函数的区别在于第一组返回更新前的值,第二组返回更新后的值,下面的示例引自这里http://www.linuxidc.com/Linux/2011-06/37403.htm

 


对于使用atomic.h的老代码,可以通过宏定义的方式,移植到高内核版本的linux系统上,例如

 

#define atomic_inc(x) __sync_add_and_fetch((x),1)  

#define atomic_dec(x) __sync_sub_and_fetch((x),1)  

#define atomic_add(x,y) __sync_add_and_fetch((x),(y))  

#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) 

 

以上内容引自 http://www.linuxidc.com/Linux/2011-06/37402.htm

 

#include <stdio.h>  
#include <pthread.h>
#include <stdlib.h>

static int count = 0;

void *test_func(void *arg)
{
int i=0;
for(i=0;i<20000;++i){
__sync_fetch_and_add(&count,1);
}
returnNULL;
}

int main(int argc, const char *argv[])
{
pthread_tid[20];
int i =0;

for(i=0;i<20;++i){
pthread_create(&id[i],NULL,test_func,NULL);
}

for(i=0;i<20;++i){
pthread_join(id[i],NULL);
}

printf("%d\n",count);
return0;
}

基于GCC编译器,我们可以自己定义实现一个多线程安全的原子计数器。实现代码如下

my_stomic.hpp

#ifndefMY_ATOMIC_H
#defineMY_ATOMIC_H

class my_atmic
{
public:
volatile int m_coun;
explicit my_atmic(int cunn)
:m_coun(cunn)
{}
~my_atmic()
{}
my_atmic& operator++()
{
__sync_fetch_and_add(&m_coun,1);
return *this;
}
my_atmic& operator--()
{
__sync_fetch_and_sub(&m_coun,1);
return *this;
}
my_atmic operator ++(int)
{
my_atmic ret(this->m_coun);
++*this;
return ret;
}
my_atmic operator--(int)
{
my_atmic ret(this->m_coun);
--*this;
return ret;
}
my_atmic& operator+=(constmy_atmic& m)
{
__sync_fetch_and_add(&m_coun,m.m_coun);
return *this;
}
my_atmic& operator +=(int m)
{
__sync_fetch_and_add(&m_coun,m);
return *this;
}
my_atmic& operator-=(constmy_atmic& m)
{
__sync_fetch_and_sub(&m_coun,m.m_coun);
return *this;
}
my_atmic& operator -=(int m)
{
__sync_fetch_and_sub(&m_coun,m);
return *this;
}
my_atmic& operator=(const my_atmic&m)
{
this->m_coun = m.m_coun;
return *this;
}
my_atmic& operator=(int m)
{
this->m_coun = m;
return *this;
}
private:
my_atmic(){}
my_atmic(const my_atmic& );

};
#endif //MY_ATOMIC_H


 

测试代码 main.cpp

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<getopt.h>
#include <pthread.h>



#include"my_atomic.h"
intthread_num=100;

typedef struct_Count
{
unsigned int m;
unsigned int n;
} Count;
typedef struct_Thread
{
pthread_t thread_id;
Count count;
} Thread;

Thread* thread;


void*atomic_start( void* count);

my_atmic y(0);

int z = 0;
int main(intargc, char** argv)
{

thread=(Thread*)malloc(thread_num*sizeof(Thread));
memset(thread,0,thread_num*sizeof(Thread));

int i;
for(i=0;i<thread_num;i++)
{
if(pthread_create(&((thread+i)->thread_id),NULL,atomic_start,&((thread+i)->count)))
{
printf("threadcreate faild: %d",i--);
}
}

for(int i=0;i<100;i++)
{
printf("%d\n",y.m_coun);
usleep(1);
}

return 0;
}



void*atomic_start( void* count)
{
pthread_t thread_id=pthread_self();
pthread_detach(thread_id);

int i;
for(i=1000;i>0;i--)
{
//atomic_inc(&x);
//y++;
//y--;
//++y;
//--y;
y+=2;
//++z;
}
return NULL;
}


WINDOWS

         Windows下也提供了原子操作,在网上我找到一篇讲的非常好的文章,就不再罗说了。直接引用过来。

         以下转自http://blog.csdn.net/asce1885/article/details/5729912

所谓原子访问,指的是一个线程在访问某个资源的同时能够保证没有其他线程会在同一时刻访问同一资源。Interlocked系列函数提供了这样的操作。所有这些函数会以原子方式来操控一个值。

Interlocked函数的工作原理取决于代码运行的CPU平台,如果是x86系列CPU,那么Interlocked函数会在总线上维持一个硬件信号,这个信号会阻止其他CPU访问同一个内存地址。我们必须确保传给这些函数的变量地址是经过对齐的,否则这些函数可能会失败。C运行库提供了一个_aligned_malloc函数,我们可以使用这个函数来分配一块对齐过的内存:

void * _aligned_malloc(

   size_t size,  //要分配的字节数

   size_t alignment //要对齐到的字节边界,传给alignment的值必须是2的整数幂次方

);

Interlocked函数的另一个需要注意的点是它们执行得很快。调用一次Interlocked函数通常只占用几个CPU周期(通常小于50),而且不需要在用户模式和内核模式之间进行切换(这个切换通常需要占用1000个CPU周期以上)。

 

1)原子加减操作InterlockedExchangeAdd函数原型如下:

LONG __cdecl InterlockedExchangeAdd( //对32位值进行操作

 __inout  LONG volatile *Addend, //需要递增的变量地址

 __in     LONG Value //增量值,可为负值表示减法

);

 

LONGLONG __cdecl InterlockedExchangeAdd64(//对64位值进行操作

 __inout  LONGLONG volatile*Addend,

 __in     LONGLONG Value

);

 

2)InterlockedExchange函数用于原子地将32位整数设为指定的值:

LONG __cdecl InterlockedExchange(

 __inout  LONG volatile *Target, //指向要替换的32位值的指针

 __in     LONG Value //替换的值

);

返回值是指向原先的32位整数值。

 

InterlockedExchangePointer函数原子地用于替换地址值:

PVOID __cdecl InterlockedExchangePointer(

 __inout  PVOID volatile *Target,//指向要替换的地址值的指针

 __in     PVOID Value //替换的地址值

);

返回值是原来的地址值。

对32位应用程序来说,以上两个函数都用一个32位值替换另一个32位值,但对64位应用程序来说,InterlockedExchange替换的是32位值,而InterlockedExchangePointer替换的是64位值。

 

当然,还有一个函数InterlockedExchange64专门用来原子地操作64位值的:

LONGLONG __cdecl InterlockedExchange64(

 __inout  LONGLONG volatile*Target,

 __in     LONGLONG Value

);

 

在实现旋转锁时,InterlockedExchange函数极其有用:

//标识一个共享资源是否正在被使用的全局变量

BOOL g_fResourceInUse = FALSE;

...

void ASCEFunc()

{

        //等待访问共享资源

        while(InterlockedExchange(&g_fResourceInUse, TRUE) == TRUE)

                   sleep(0);

        //访问共享资源

        ...

        //结束访问

        InterlockedExchange(&g_fResourceInUse, FALSE);

}

注意,在使用这项技术时要小心,因为旋转锁会耗费CPU时间。特别是在单CPU机器上应该避免使用旋转锁,如果一个线程不停地循环,那么这会浪费宝贵的CPU时间,而且会阻止其他线程改变该锁的值。

 

3)函数InterlockedCompareExchange函数和InterlockedCompareExchangePointer函数原型如下:

LONG __cdecl InterlockedCompareExchange(

 __inout  LONG volatile*Destination, //当前值

 __in     LONG Exchange, //

 __in     LONG Comparand //比较值

);

PVOID __cdeclInterlockedCompareExchangePointer(

 __inout  PVOID volatile*Destination,

 __in     PVOID Exchange,

 __in     PVOID Comparand

);

这两个函数以原子方式执行一个测试和设置操作。对32位应用程序来说,这两个函数都对32位值进行操作;在64位应用程序中,InterlockedCompareExchange对32位值进行操作而InterlockedCompareExchangePointer对64位值进行操作。函数会将当前值(Destination指向的)与参数Comparand进行比较,如果两个值相同,那么函数会将*Destination修改为Exchange参数指定的值。若不等,则*Destination保持不变。函数会返回*Destination原来的值。所有这些操作都是一个原子执行单元来完成的。

当然,这两个函数的64位版本是:

LONGLONG __cdeclInterlockedCompareExchange64(

 __inout  LONGLONG volatile*Destination,

 __in     LONGLONG Exchange,

 __in     LONGLONG Comparand

);

 

4)Interlocked单向链表函数

InitializeSListHead函数用于创建一个空的单向链表栈:

void WINAPI InitializeSListHead(

 __inout  PSLIST_HEADER ListHead

);

 

InterlockedPushEntrySList函数在栈顶添加一个元素:

PSLIST_ENTRY WINAPIInterlockedPushEntrySList(

 __inout  PSLIST_HEADER ListHead,

 __inout  PSLIST_ENTRY ListEntry

);

 

InterlockedPopEntrySList函数移除位于栈顶的元素并将其返回:

PSLIST_ENTRY WINAPIInterlockedPopEntrySList(

 __inout  PSLIST_HEADER ListHead

);

 

InterlockedFlushSList函数用于清空单向链表栈:

PSLIST_ENTRY WINAPI InterlockedFlushSList(

 __inout  PSLIST_HEADER ListHead

);

 

QueryDepthSList函数用于返回栈中元素的数量:

USHORT WINAPI QueryDepthSList(

 __in  PSLIST_HEADER ListHead

);

 

单向链表栈中元素的结构是:

typedef struct _SLIST_ENTRY {

 struct _SLIST_ENTRY *Next;

} SLIST_ENTRY, *PSLIST_ENTRY;

注意:所有单向链表栈中的元素必须以MEMORY_ALLOCATION_ALIGNMENT方式对齐,使用_aligned_malloc函数即可。

 

实例如下:

#include <windows.h>

#include <malloc.h>

#include <stdio.h>

 

// Structure to be used for a list item;the first member is the

// SLIST_ENTRY structure, and additionalmembers are used for data.

// Here, the data is simply a signature fortesting purposes.

 

 

typedef struct _PROGRAM_ITEM {

   SLIST_ENTRY ItemEntry;

   ULONG Signature;

} PROGRAM_ITEM, *PPROGRAM_ITEM;

 

int main( )

{

   ULONG Count;

   PSLIST_ENTRY pFirstEntry, pListEntry;

   PSLIST_HEADER pListHead;

   PPROGRAM_ITEM pProgramItem;

 

   // Initialize the list header to a MEMORY_ALLOCATION_ALIGNMENT boundary.

   pListHead = (PSLIST_HEADER)_aligned_malloc(sizeof(SLIST_HEADER),

      MEMORY_ALLOCATION_ALIGNMENT);

   if( NULL == pListHead )

    {

       printf("Memory allocation failed./n");

       return -1;

    }

   InitializeSListHead(pListHead);

 

   // Insert 10 items into the list.

   for( Count = 1; Count <= 10; Count += 1 )

    {

       pProgramItem = (PPROGRAM_ITEM)_aligned_malloc(sizeof(PROGRAM_ITEM),

           MEMORY_ALLOCATION_ALIGNMENT);

       if( NULL == pProgramItem )

       {

           printf("Memory allocation failed./n");

           return -1;

        }

       pProgramItem->Signature = Count;

       pFirstEntry = InterlockedPushEntrySList(pListHead,

                      &(pProgramItem->ItemEntry));

    }

 

   // Remove 10 items from the list and display the signature.

   for( Count = 10; Count >= 1; Count -= 1 )

    {

       pListEntry = InterlockedPopEntrySList(pListHead);

 

       if( NULL == pListEntry )

       {

           printf("List is empty./n");

           return -1;

       }

 

       pProgramItem = (PPROGRAM_ITEM)pListEntry;

       printf("Signature is %d/n", pProgramItem->Signature);

 

   // This example assumes that the SLIST_ENTRY structure is the

   // first member of the structure. If your structure does not

   // follow this convention, you must compute the starting address

   // of the structure before calling the free function.

 

       _aligned_free(pListEntry);

    }

 

   // Flush the list and verify that the items are gone.

   pListEntry = InterlockedFlushSList(pListHead);

   pFirstEntry = InterlockedPopEntrySList(pListHead);

   if (pFirstEntry != NULL)

    {

       printf("Error: List is not empty./n");

       return -1;

    }

 

   _aligned_free(pListHead);

 

   return 1;

}

  以上转自http://blog.csdn.net/asce1885/article/details/5729912

C++11

多线程编程经常需要操作共享的内存,在读/写过程中会导致竞争条件。

例如:

int  counter = 0;

............

++counter;  //因为++counter不时原子操作,多个线程中出现此操作时不是线程安全的。

 

应该用:

atomic<int>   counter(0);  //等效于atomic_int   counter(0);

............

++counter;  //此时多个线程执行++counter是一个原子操作,是线程安全的。

例:

void  func( std::atomic<int>&  counter)

{  

for( int   i=0;  i<1000;   ++i )    

++counter;

}int  main()

{  std::atomic<int>   counter(0);  

std::vector<std::thread>  threads;  

for( int i=0;   i<10;   ++i )    //线程参数总是值传递,若要传递引用,须加std::ref()。(头文件<functional>中)    

threads.push_back( std::thread{ func, std::ref(counter)} );  

for( auto&   t  : threads )    

t.join();    //调用join,如果线程未结束,则main函数阻塞于此。  std::count<<"Result="<<counter<<std::endl;  

return 0;

}/*join的调用会导致调用线程阻塞于此,若不希望调用线程阻塞,但又想知道被调线程是否结束,应当用其它方式,例如消息...*/