《c++并发教程实战》之第3章 线程共享
3.1 共享问题
只读数据无问题。数据改动可能引发问题。
3.1.1 条件竞争
线程执行各自操作的结果取决于执行的相对次序。
3.1.2 防止恶性条件竞争
-
锁
采取保护措施包装数据结构,确保不变量被破坏时,中间状态只对执行改动的线程可见。 -
无锁
修改数据结构的设计及其不变量,由一连串不开拆分的改动完成数据变更,每个改动都维持不变量不被破坏。 -
修改数据结构当作事务(transaction,类似数据库,本书不考虑)处理。
3.2 互斥保护共享数据
访问数据结构钱,锁住互斥;访问结束后,解锁互斥。
3.2.1 使用互斥
3.1.cpp
#include <list>
#include <mutex>
#include <algorithm>
#include <iostream>
std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}
int main()
{
add_to_list(42);
std::cout << "contains(1)=" << list_contains(1) << ", contains(42)=" << list_contains(42) << std::endl;
return 0;
}
3.2.2 组织和编排代码保护共享数据
不得向锁所在的作用域之外传递指针和引用,指向受保护的共享数据,无论是通过函数返回值将它们保存到对外可见的内存,还是将它们作为参数传递给使用者提供的函数。
3.2.cpp
#include <string>
#include <mutex>
class some_data
{
int a;
std::string b;
public:
void do_something() {}
};
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template <typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data);//向malicious_function函数传递了some_data引用
}
};
some_data *unprotected;
void malicious_function(some_data &protected_data)
{
unprotected = &protected_data;
}
data_wrapper x;
void foo()
{
x.process_data(malicious_function);//unprotected指向了x中data
unprotected->do_something();//不可控
}
int main()
{
foo();
return 0;
}
3.2.3 接口固有的条件竞争
3.3.cpp
#include <deque>
#include <cstddef>
template <typename T, typename Container = std::deque<T>>
class stack
{
public:
explicit stack(const Container &);
explicit stack(Container && = Container());
template <class Alloc> explicit stack(const Alloc &);
template <class Alloc> stack(const Container &, const Alloc &);
template <class Alloc> stack(Container &&, const Alloc &);
template <class Alloc> stack(stack &&, const Alloc &);
bool empty() const;
size_t size() const;
T &top();
T const &top() const;
void push(T const &);
void push(T &&);
void pop();
void swap(stack &&);
template<class... Args> void emplace(Args&&... args);
};
int main()
{
return 0;
}
stact<int> s;
if(!s.empty()) { //1
int const value = s.top(); //2
s.pop(); //3
do_something(value);
}
多线程情况下,1,2,3之间交替运行会出错。
消除条件竞争的方法:
- 传入引用
std::vector<int> result;
some_stack.pop(result);
短处:构造result需要代价,栈容器存储类别要可赋值。
-
提供不抛出异常的拷贝构造函数或移动构造函数
-
返回指针,指向弹出的元素
指针可*的复制,建议使用std::shared_ptr。
- 结合方法1和方法2,或结合方法1和方法3
template <typename T>
class stack {
public:
bool pop(T &t) {
lock_guard<mutex> l(m);
if(data.empty())
return false;
t = std::move(data.top()); //T需要移动构造函数
data.pop();
return true;
}
private:
deque<T> data;
mutex m;
}
- 线程安全的栈容器类
简要定义
3.4.cpp
#include <exception>
#include <memory>
struct empty_stack : std::exception
{
const char *what() const throw();
};
template <typename T>
class threadsafe_stack
{
public:
threadsafe_stack();
threadsafe_stack(const threadsafe_stack &);
threadsafe_stack &operator=(const threadsafe_stack &) = delete;
void push(T new_value);
std::shared_ptr<T> pop();
void pop(T &value);
bool empty() const;
};
int main()
{
return 0;
}
详尽定义
3.5.cpp
#include <exception>
#include <stack>
#include <mutex>
#include <memory>
#include <iostream>
using namespace std;
struct empty_stack : std::exception
{
const char *what() const throw()
{
return "empty stack";
}
};
template <typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack &other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}
threadsafe_stack &operator=(const threadsafe_stack &) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty())
throw empty_stack();
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T &value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty())
throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
int main()
{
threadsafe_stack<int> si;
si.push(5);
si.push(6);
auto data = si.pop();
cout << *data << endl;
if (!si.empty())
{
int x;
si.pop(x);
cout << x << endl;
}
//error
//si.pop();
return 0;
}
3.2.4 死锁问题和解决方法
为了某项操作而对多个互斥加锁,都锁住一个互斥,等着给另一个互斥加锁,双方苦等对方解锁互斥,形成死锁。
常见解决方法,始终按相同顺序对两个互斥加锁;同时锁住多个互斥。
3.6.cpp
#include <mutex>
class some_big_object
{
};
void swap(some_big_object &lhs, some_big_object &rhs)
{
}
class X
{
private:
some_big_object some_detail;
mutable std::mutex m;
public:
X(some_big_object const &sd) : some_detail(sd) {}
friend void swap(X &lhs, X &rhs)
{
if (&lhs == &rhs)
return;
// c++11
std::lock(lhs.m, rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
// c++17
// std::scoped_lock guard(lhs.m, rhs.m);
// std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);
swap(lhs.some_detail, rhs.some_detail);
}
};
int main()
{
return 0;
}
3.2.5 防范死锁的补充原则
只要另一线程有可能正在等待当前线程,那么当前线程千万不能反过来等待它。
- 避免嵌套锁
已持有锁,则不用试图获取第二个锁(每个线程最多一个锁)。需获取多个锁,调用std::lock()函数,一次同时获取全部锁。
- 持锁后,避免调用用户接口
用户程序接口可能随意操作,包括试图获取锁。
-
固定顺序获取锁
-
按层级加锁
3.7.cpp
#include <mutex>
class hierarchical_mutex
{
public:
explicit hierarchical_mutex(unsigned level)
{
}
void lock()
{
}
void unlock()
{
}
};
hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(5000);
int do_low_level_stuff()
{
return 42;
}
int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}
void high_level_stuff(int some_param)
{
}
void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
high_level_stuff(low_level_func());
}
//锁住高层high_level_mutex,后锁住低层low_level_mutex
//先调用低层函数low_level_func,后调用高层函数high_level_stuff(int)
void thread_a()
{
high_level_func();
}
hierarchical_mutex other_mutex(100);
void do_other_stuff()
{
}
void other_stuff()
{
high_level_func();
do_other_stuff();
}
//无视规则,先锁住最低层other_mutex
void thread_b()
{
std::lock_guard<hierarchical_mutex> lk(other_mutex);
other_stuff();
}
int main()
{
return 0;
}
3.8.cpp
#include <mutex>
#include <stdexcept>
#include <climits>
class hierarchical_mutex
{
std::mutex internal_mutex;
unsigned long const hierarchy_value;
//临时保存线程值this_thread_hierarchy_value
unsigned long previous_hierarchy_value;
//thread_local线程值
static thread_local unsigned long this_thread_hierarchy_value;
//未按照从大到小的层级加锁时,会报错
void check_for_hierarchy_violation()
{
if (this_thread_hierarchy_value <= hierarchy_value)
{
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value()
{
previous_hierarchy_value = this_thread_hierarchy_value;
this_thread_hierarchy_value = hierarchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value) : hierarchy_value(value),
previous_hierarchy_value(0)
{
}
void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}
void unlock()
{
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if (!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
int main()
{
hierarchical_mutex m1(42);
hierarchical_mutex m2(2000);
return 0;
}
- 将准则推广到操作以外
死锁现象并不单单因加锁操作而发生,任何同步机制导致的循环等待都会导致死锁出现。
3.2.6 std::unique_lock<>灵活加锁
可完全替代std::lock_guard(优先使用lock_guard),有性能损失。
mutex mtx;
mtx.lock();
//std::adopt_lock表明mtx已调用lock(),lock构造函数不再调用lock()
std::unique_lock<std::mutex> lock(mtx, std::adopt_lock);
mutex mtx;
//std::defer_lock表明lock构造函数不调用lock(),后面会调用
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
3.9.cpp
#include <mutex>
class some_big_object
{
};
void swap(some_big_object &lhs, some_big_object &rhs)
{
}
class X
{
private:
some_big_object some_detail;
mutable std::mutex m;
public:
X(some_big_object const &sd) : some_detail(sd) {}
friend void swap(X &lhs, X &rhs)
{
if (&lhs == &rhs)
return;
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
std::lock(lock_a, lock_b);
swap(lhs.some_detail, rhs.some_detail);
}
};
int main()
{
return 0;
}
3.2.7 不同作用域间转移互斥归属权
互斥归属权可在多个std::unique_lock实例间转移。
std::unique_lock<std::mutex> get_lock(){
extern std::mutex some_mutex:
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}
void process_data(){
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}
3.2.8 按合适粒度加锁
仅仅在访问共享数据期间才锁住互斥,让数据处理尽可能不用锁保护。
持锁期间,避免耗时操作,如读写文件。
void get_and_process_data(){
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process = get_next_data_chunk();
my_lock.unlock();
result_type result = process(data_to_process);
my_lock.lock();
write_result(data_to_process, result);
}
3.10.cpp
#include <mutex>
class Y
{
private:
int some_detail;
mutable std::mutex m;
int get_detail() const
{
std::lock_guard<std::mutex> lock_a(m);
return some_detail;
}
public:
Y(int sd) : some_detail(sd) {}
friend bool operator==(Y const &lhs, Y const &rhs)
{
if (&lhs == &rhs)
return true;
int const lhs_value = lhs.get_detail();
int const rhs_value = rhs.get_detail();
return lhs_value == rhs_value;
}
};
int main()
{
return 0;
}
3.3 其它工具保护共享数据
共享数据仅初始化(创建)过程中受到保护,之后无需保护(只读)。
3.3.1 初始化过程中保护共享数据
创建共享数据开销不菲(建立数据库连接、分配大量内存等),等到必要时(使用)才创建,延迟初始化(lazy initialization),常见于单线程。
std::shared_ptr<some_resource> resource_ptr;
void foo() {
if(!resource_ptr){
resource_ptr.reset(new some_resource());
}
resource_ptr->do_something();
}
互斥实现线程安全的延迟初始化
3.11.cpp
#include <memory>
#include <mutex>
struct some_resource
{
void do_something()
{
}
};
std::shared_ptr<some_resource> resource_ptr = nullptr;
std::mutex resource_mutex;
void foo()
{
resource_mutex.lock();
if (!resource_ptr)
resource_ptr.reset(new some_resource);
resource_mutex.unlock();
resource_ptr->do_something();
}
//双重检验锁定模式(double-checked locking pattern)
void undefined_behaviour_with_double_checked_locking()
{
if (!resource_ptr)
{
std::lock_guard<std::mutex> lk(resource_mutex);
if (!resource_ptr)
resource_ptr.reset(new some_resource);
}
resource_ptr->do_something();
}
int main()
{
foo();
undefined_behaviour_with_double_checked_locking();
return 0;
}
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;
void init_resource(){
resource_ptr.reset(new some_resource);
}
void foo(){
std::call_once(resource_flag, init_resource);
resource_ptr->do_something();
}
once_flag和call_once实现延迟初始化
3.12.cpp
#include <mutex>
struct connection_info
{
};
struct data_packet
{
};
struct connection_handle
{
void send_data(data_packet const &)
{
}
data_packet receive_data()
{
return data_packet();
}
};
struct remote_connection_manager
{
connection_handle open(connection_info const &)
{
return connection_handle();
}
} connection_manager;
class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection()
{
connection = connection_manager.open(connection_details);
}
public:
X(connection_info const &connection_details_) : connection_details(connection_details_)
{
}
void send_data(data_packet const &data)
{
std::call_once(connection_init_flag, &X::open_connection, this);
connection.send_data(data);
}
data_packet receive_data()
{
std::call_once(connection_init_flag, &X::open_connection, this);
return connection.receive_data();
}
};
int main()
{
X x(connection_info{});
x.send_data(data_packet{});
auto data = x.receive_data();
return 0;
}
代替std::call_once()
class my_class{
};
my_class& get_my_class_instance(){
static my_class instance;
return instance;
}
3.3.2 保护甚少更新的数据结构(读写锁)
C++11无,C++14增加std::shared_timed_mutex,C++17增加std::shared_mutex。
读锁(共享锁)std::shared_lockstd::shared_mutex
写锁(排他锁)std::lock_guardstd::shared_mutex和std::unique_guardstd::shared_mutex
3.13.cpp
#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>
class dns_entry
{
};
class dns_cache
{
std::map<std::string, dns_entry> entries;
std::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const &domain)
{
std::shared_lock<std::shared_mutex> lk(entry_mutex);
std::map<std::string, dns_entry>::const_iterator const it = entries.find(domain);
return (it == entries.end()) ? dns_entry() : it->second;
}
void update_or_add_entry(std::string const &domain, dns_entry const &dns_details)
{
std::lock_guard<std::shared_mutex> lk(entry_mutex);
entries[domain] = dns_details;
}
};
int main()
{
return 0;
}
3.3.3 递归加锁
互斥的同一实例多次加锁,一般是设计需要修改。
std::lock_guard<std::recursive_mutex>