C++中对共享数据的存取在并发条件下可能会引起data race的undifined行为,需要限制并发程序以某种特定的顺序执行,有两种方式:使用mutex保护共享数据,原子操作:针对原子类型操作要不一步完成,要么不做,不可能出现操作一半被切换CPU,这样防止由于多线程指令交叉执行带来的可能错误。非原子操作下,某个线程可能看见的是一个其它线程操作未完成的数据。
1 关于bool的原子化
1.1 std::atomic_flag是一个bool原子类型有两个状态:set(flag=true) 和 clear(flag=false),必须被ATOMIC_FLAG_INIT初始化此时flag为clear状态,相当于静态初始化。一旦atomic_flag初始化后只有三个操作:test_and_set,clear,析构,均是原子化操作。atomic_flag::test_and_set检查flag是否被设置,若被设置直接返回true,若没有设置则设置flag为true后再返回false。atomic_clear()清楚flag标志即flag=false。不支持拷贝、赋值等操作,这和所有atomic类型一样,因为两个原子类型之间操作不能保证原子化。atomic_flag的可操作性不强导致其应用局限性,还不如atomic<bool>。
使用atomic_flag作为简单的自旋锁例子:本线程可以对flag设置了就跳出循环,避免使用mutex导致线程阻塞
#include <iostream> // std::cout
#include <atomic> // std::atomic_flag
#include <thread> // std::thread
#include <vector> // std::vector
#include <sstream> // std::stringstream
std::atomic_flag lock_stream = ATOMIC_FLAG_INIT;//flag处于clear状态,没有被设置过
std::stringstream stream;
void append_number(int x) {
while (lock_stream.test_and_set()) {}//检查并设置是个原子操作,如以前没有设置过则退出循环,
//每个线程都等待前面一个线程将lock_stream状态清楚后跳出循环
stream << "thread #" << x << '\n';
lock_stream.clear();}
int main (){
std::vector<std::thread> threads;
for (int i=1; i<=10; ++i)
threads.push_back(std::thread(append_number,i));
for (auto& th : threads) th.join(); std::cout << stream.str(); return 0;
}
采用class封装可以用于lock_guard或unique_lock,但是最好不要将此用于任何竞态条件下,这是一个busy loop!
class spinlock_mutex
{
std::atomic_flag flag;
public:
spinlock_mutex():
flag(ATOMIC_FLAG_INIT){}
void lock()
{
while(flag.test_and_set(std::memory_order_acquire));
}
void unlock()
{
flag.clear(std::memory_order_release);
}
};
2 atomic<T>模板类,生成一个T类型的原子对象,并提供了系列原子操作函数。其中T是trivially copyable type满足:要么全部定义了拷贝/移动/赋值函数,要么全部没定义;没有虚成员;基类或其它任何非static成员都是trivally copyable。典型的内置类型bool、int等属于trivally copyable。再如class triviall{public: int x};也是。T能够被memcpy、memcmp函数使用,从而支持compare/exchange系列函数。有一条规则:不要在保护数据中通过用户自定义类型T通过参数指针或引用使得共享数据超出保护的作用域。atomic<T>编译器通常会使用一个内部锁保护,而如果用户自定义类型T通过参数指针或引用可能产生死锁。总之限制T可以更利于原子指令。注意某些原子操作可能会失败,比如atomic<float>、atomic<double>在compare_exchange_strong()时和expected相等但是内置的值表示形式不同于expected,还是返回false,没有原子算术操作针对浮点数;同理一些用户自定义的类型T由于内存的不同表示形式导致memcmp失败,从而使得一些相等的值仍返回false。
atomic<T>的成员函数:
template < class T > struct atomic {
bool is_lock_free() const volatile;//判断atomic<T>中的T对象是否为lock free的,若是返回true。lock free(锁无关)指多个线程并发访问T不会出现data race,任何线程在任何时刻都可以不受限制的访问T
bool is_lock_free() const;
atomic() = default;//默认构造函数,T未初始化,可能后面被atomic_init(atomic<T>* obj,T val )函数初始化
constexpr atomic(T val);//T由val初始化
atomic(const atomic &) = delete;//禁止拷贝
atomic & operator=(const atomic &) = delete;//atomic对象间的相互赋值被禁止,但是可以显示转换再赋值,如atomic<int> a=static_cast<int>(b)这里假设atomic<int> b
atomic & operator=(const atomic &) volatile = delete;//atomic间不能赋值
T operator=(T val) volatile;//可以通过T类型对atomic赋值,如:atomic<int> a;a=10;
T operator=(T val);
operator T() const volatile;//读取被封装的T类型值,是个类型转换操作,默认内存序是memory_order_seq需要其它内存序则调用load
operator T() const;//如:atomic<int> a,a==0或者cout<<a<<endl都使用了类型转换函数
//以下函数可以指定内存序memory_order
T exchange(T val, memory_order = memory_order_seq_cst) volatile;//将T的值置为val,并返回原来T的值
T exchange(T val, memory_order = memory_order_seq_cst);
void store(T val, memory_order = memory_order_seq_cst) volatile;//将T值设为val
void store(T val, memory_order = memory_order_seq_cst);
T load(memory_order = memory_order_seq_cst) const volatile;//访问T值
T load(memory_order = memory_order_seq_cst) const;
bool compare_exchange_weak(T& expected, T val, memory_order = memory_order_seq_cst) volatile;//该函数直接比较原子对象所封装的值与参数 expected 的物理内容,所以某些情况下,对象的比较操作在使用 operator==() 判断时相等,但 compare_exchange_weak 判断时却可能失败,因为对象底层的物理内容中可能存在位对齐或其他逻辑表示相同但是物理表示不同的值(比如 true 和 2 或 3,它们在逻辑上都表示"真",但在物理上两者的表示并不相同)。可以虚假的返回false(和expected相同)。若本atomic的T值和expected相同则用val值替换本atomic的T值,返回true;若不同则用本atomic的T值替换expected,返回false。
bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst);
bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst) volatile;//
与compare_exchange_weak 不同, strong版本的 compare-and-exchange 操作不允许(spuriously 地)返回 false,即原子对象所封装的值与参数 expected 的物理内容相同,比较操作一定会为 true。不过在某些平台下,如果算法本身需要循环操作来做检查, compare_exchange_weak 的性能会更好。因此对于某些不需要采用循环操作的算法而言, 通常采用compare_exchange_strong 更好
bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst);
};
cplusplus给出的例子之一:
// atomic::compare_exchange_weak example:
#include <iostream> // std::cout
#include <atomic> // std::atomic
#include <thread> // std::thread
#include <vector> // std::vector
// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);
void append (int val) { // append an element to the list
Node* newNode = new Node {val,list_head};
// next is the same as: list_head = newNode, but in a thread-safe way:
while (!list_head.compare_exchange_weak(newNode->next,newNode)) {}
// (with newNode->next updated accordingly if some other thread just appended another node)
}
int main ()
{
// spawn 10 threads to fill the linked list:
std::vector<std::thread> threads;
for (int i=0; i<10; ++i) threads.push_back(std::thread(append,i));
for (auto& th : threads) th.join();
// print contents:
for (Node* it = list_head; it!=nullptr; it=it->next)
std::cout << ' ' << it->value;
std::cout << '\n';
// cleanup:
Node* it; while (it=list_head) {list_head=it->next; delete it;}
return 0;
}
程序输出:
9 8 7 6 5 4 3 2 1 0
3 std::atomic针对整数和指针的特化:
不能像传统那样拷贝和赋值,可以通过内置成员函数load(),store(),exchange()完成赋值,支持复合赋值运算,自增自减运算,还有特有的fetch系列函数
整型特化:
specializations | additional member functions |
---|---|
char extended integral types (if any) |
atomic::fetch_add atomic::fetch_sub atomic::fetch_and atomic::fetch_or atomic::fetch_xor atomic::operator++ atomic::operator-- operator (comp. assign.) |
指针特化:
specializations | additional member functions |
---|---|
U* (for any type U) |
atomic::fetch_add atomic::fetch_sub atomic::operator++ atomic::operator-- operator (comp. assign.) |
T fetch_add (T val, memory_order sync = memory_order_seq_cst) noexcept;//整型
T fetch_add (ptrdiff_t val, memory_order sync = memory_order_seq_cst) noexcept;//指针
将原子对象的封装值加 val,并返回原子对象的旧值(适用于整形和指针类型的 std::atomic 特化版本),整个过程是原子的
T fetch_and (T val, memory_order sync = memory_order_seq_cst) noexcept;//将原子对象的封装值按位与 val,并返回原子对象的旧值(只适用于整型的 std::atomic 特化版本),整个过程是原子的。
T fetch_or (T val, memory_order sync = memory_order_seq_cst) noexcept;//将原子对象的封装值按位或 val,并返回原子对象的旧值(只适用于整型的 std::atomic 特化版本),整个过程是原子的。
fetch_xor (T val, memory_order sync = memory_order_seq_cst) noexcept;//将原子对象的封装值按位异或 val,并返回原子对象的旧值(只适用于整型的 std::atomic 特化版本),整个过程是原子的。
operator++
pre-increment (1)
T operator++() volatile noexcept;
T operator++() noexcept;
post-increment (2)
T operator++ (int) volatile noexcept;
T operator++ (int) noexcept;
自增运算符重载, 第一种形式 (1) 返回自增后的值(即前缀++),第二种形式(2) 返回自增前的值(即后缀++),适用于整形和指针类型的 std::atomic 特化版本。
operator--
自减运算符重载, 第一种形式 (1) 返回自减后的值(即前缀--),第二种形式(2) 返回自减前的值(即后缀--),适用于整形和指针类型的 std::atomic 特化版本。
atomic::operator (comp. assign.)
复合赋值运算符重载,主要包含以下形式:
if T is integral (1)
T operator+= (T val) volatile noexcept;
T operator+= (T val) noexcept;
T operator-= (T val) volatile noexcept;
T operator-= (T val) noexcept;
T operator&= (T val) volatile noexcept;
T operator&= (T val) noexcept;
T operator|= (T val) volatile noexcept;
T operator|= (T val) noexcept;
T operator^= (T val) volatile noexcept;
T operator^= (T val) noexcept;
if T is pointer (2)
T operator+= (ptrdiff_t val) volatile noexcept;
T operator+= (ptrdiff_t val) noexcept;
T operator-= (ptrdiff_t val) volatile noexcept;
T operator-= (ptrdiff_t val) noexcept;
以上各个 operator 都会有对应的 fetch_* 操作,详细见下表:
操作符成员函数支持类型
复合赋值等价于整型指针类型其他类型
+atomic::operator+=atomic::fetch_add是是否
-atomic::operator-=atomic::fetch_sub是是否
&atomic::operator&=atomic::fetch_and是否否
|atomic::operator|=atomic::fetch_or是否否
^atomic::operator^=atomic::fetch_xor是否否
4 C风格的atomic类型及其操作,有点繁杂这里不赘述了,参看:点击打开链接和点击打开链接。前面成员函数前缀atomic_形成原子函数,函数的第一个参数必须是原子类型,如:
atomic_store (volatile atomic<T>* obj, T val)如果你需要显式指定内存序,应该使用atomic_store_explicit。所以前缀atomic_表示c风格的原子*函数,后缀_explicit指定内存序。
5 atomic<bool>,atomic<bool>同样不可以赋值(这里指两个atomic<bool>间的赋值)、拷贝,但是其可以直接初始化,如:atomic<bool> flag(false); flag=true。atomic_flag::test_and_set()被atomic::exchange()替代,更多操作见atomic<T>。
std::atomic<bool> b;compare_exchange_weak/strong函数是保证在比较和交换执行下原子化,但是此函数可能与expected值相等的情形下atomic的T值没有替换为val,这时atomic值未变且返回false,compare_exchange_weak可能失败,特别是线程数多于CPU核心数时compare-exchange这个指令序列可能CPU不能保证原子化,所以经常在循环中:
bool x=b.load(std::memory_order_acquire);
b.store(true);
x=b.exchange(false,std::memory_order_acq_rel);//更改为false并返回原来的值
bool expected=false;
extern atomic<bool> b; // set somewhere else
while(!b.compare_exchange_weak(expected,true) && !expected);
compare_exchange_strong可以保证当atomic不等于expected时返回false,不需要循环保护。 std::atomic_flag是lock free的,但是atomic<bool>不一定是lock free的,可以用atomic<T>::is_lock_free()判断。 一个例子的代码片段:读者写者
#include <vector>
#include <atomic>
#include <iostream>
std::vector<int> data;
std::atomic<bool> data_ready(false);
void reader_thread()
{
while(!data_ready.load())
{
std::this_thread::sleep(std::milliseconds(1));
}
std::cout<<”The answer=”<<data[0]<<”\n”;// 1
}
void writer_thread()
{
data.push_back(42);//2 由于1和2处发生了data race,所以需要线程同步,可以使用mutex,此处使用atomic<bool>强制线程间有个顺序关系
data_ready=true;
}
6 std::atomic<T*> 指针原子化类型,和atomic<bool>一样,其也是不可复制拷贝的,拥有is_lock_free,load,store,exchange,compare_exchange_weak/strong等成员函数,只不过内在成员换为指针类型T*。还提供了指针算术操作,fetch_add()、fetch_sub()内存地址的加减(都是原子操作),+=和-=两个复合赋值操作符,++和--的自增运算符。例如:std::atomic<Foo*> x; x+=3; 表示指向第四个Foo*并返回这个Foo*;x.fetch_add(3)表示x地址向后移动3个,但是返回原来的Foo*。
class Foo{};
Foo some_array[5];
std::atomic<Foo*> p(some_array);
Foo* x=p.fetch_add(2);
assert(x==some_array);
assert(p.load()==&some_array[2]);
x=(p-=1);
assert(x==&some_array[1]);
assert(p.load()==&some_array[1]);
7 整数原子类型(如std::atomic<long long>)和其它atomic一样,均有load,store,exchange,fetch_add,fetch_sub等成员函数,但是整数有自己的一些操作符: fetch_and(), fetch_or(),+=, -=, &=, |=,fetch_xor(),^=,++,--。和atomic<T*>一样,fetch_系列函数返回的是旧值,复合赋值运算返回的是新值。整数原子类型没有乘法、除法、位移操作,因为整数原子类型通常用于计数或者位标记。
8 注意std::shared_ptr<T>是原子类型,所以使用shared_ptr是线程安全的。C风格的*原子函数也适用于shared_ptr。
std::shared_ptr<my_data> p;
void process_global_data()
{
std::shared_ptr<my_data> local=std::atomic_load(&p);
process_data(local);
}
void update_global_data()
{
std::shared_ptr<my_data> local(new my_data);
std::atomic_store(&p,local);
}