目录
- 传统同步方案的缺点
- folly/Synchronized.h 简单使用
- Synchronized的模板参数
- withLock()/withRLock()/withWLock() —— 更易用的加锁方式
- 升级锁
- ulock()和 withULockPtr()
- Timed Locking
- Synchronized 与 std::condition_variable
- acquireLocked() —— 同时锁多个数据
- 使用一把锁,锁多个数据
- struct
- std::tuple
- Benchmark
folly/Synchronized.h 提供了一种更简单、更不容易出错的同步机制,可以用来替代传统 C++标准库中使用较复杂、较容易出错的同步机制。
传统同步方案的缺点
一般是将需要同步的数据和锁一一配对,即 —— associate mutexes with data, not code :
class RequestHandler {
...
std::mutex requestMutex_;
RequestQueue requestQueue_;
processRequest(const Request& request);
};
void RequestHandler::processRequest(const Request& request) {
std::lock_guard<std::mutex> lg(requestMutex_);
requestQueue_.push_back(request);
}
然而,操作这些数据成员,开发人员必须注意,正确的获取锁、获取正确的锁。
一些常见的错误包括:
- 操作数据之前没有获取锁。
- 获取了不配对的锁,这个锁不是用来锁这个数据的。
- 获取了读锁,但是试图去修改数据。
- 获取了写锁,但是对数据只有 const access.
一般在使用时,需要提醒开发人员:“别忘了 xxxx”,那一般都会出错,比如 new 的对象别忘了 delete : )
folly/Synchronized.h 简单使用
上面的代码可以用 folly/Synchronized.h 重写为:
class RequestHandler {
folly::Synchronized<RequestQueue> requestQueue_;
processRequest(const Request& request);
};
void RequestHandler::processRequest(const Request& request) {
requestQueue_.wlock()->push_back(request);
}
为什么 folly/Synchronized.h 更加有效呢?
- 与传统使用方式不同,这里锁和数据是结合成了一个对象 —— requestQueue_。传统方案中,需要寻找锁和数据的配对关系。
- 几乎不可能在不获取锁的情况下,去操作数据,还是因为它们被封装成了一个对象。传统方案加不加锁全靠自觉。
- 在 push_back 后,锁立即被释放。
如果在临界区有多个操作,那么可以使用如下方法:
{
auto lockedQueue = requestQueue_.wlock();
lockedQueue->push_back(request1);
lockedQueue->push_back(request2);
}
wlock 返回一个 LockedPtr 对象,这个对象可以被理解为指向数据成员的指针。只有这个对象存在,那么锁就会被锁住,所以最好为这个对象显示定义一个 scope.
更好的方式,是使用 lambdas :
void RequestHandler::processRequest(const Request& request) {
requestQueue_.withWLock([&](auto& queue "&") {
// withWLock() automatically holds the lock for the
// duration of this lambda function
queue.push_back(request);
});
}
使用 withWLock 配合 lambdas 强制定义了一个 scope,更清晰。
Synchronized的模板参数
Synchronized 有两个模板参数,数据类型和锁类型:
template <class T, class Mutex = SharedMutex>
如果不指定第二个模板参数,默认是 folly::SharedMutex。只要被 folly::LockTraits 支持的都可以使用,比如 std::mutex、std::recursive_mutex、std::timed_mutex,。std::recursive_timed_mutex、folly::SharedMutex、folly::RWSpinLock、folly::SpinLock.
根据锁类型的不同,Synchronized 会提供不同的 API:
- 共享锁和升级锁:如果存在 lock_shared()成员函数,Synchronized 会提供 wlock(),rlock(),ulock()三个方法来获取不同的锁类型。其中,rlock()只提供对数据成员 const access.
- 排他锁:lock()
withLock()/withRLock()/withWLock() —— 更易用的加锁方式
withLock()在上面提到过了,可以用来替代 lock()。在持有锁的期间,执行一个 lambda 或者 function. withRLock()/withWLock()同理可以替代 rlock()/wlock().
我们再详细说一下这种方式的好处。下面的函数将 vector 里的所有元素都 double:
auto locked = vec.lock();
for (int& n : *locked) {
n *= 2;
}
使用 lock()/wlock()/rlock()的一个重要注意事项:一个指向数据的指针或者引用,它的生命周期一定不要比 LockedPtr 对象长(lock()/wlock()/rlock()的返回值类型)。 如果我们将上面的例子这样写就会出问题:
// No. NO. NO!
for (int& n : *vec.wlock()) {
n *= 2;
}
vec.wlock()返回的 LockPtr 对象在 range iterators 建立后就销毁了(详细解释见 Range-based for loop Temporary range expression 小节),range iterators 指向了 vector data,但此时锁已经被释放。想想如果要 debug 这种问题,会用多少时间
这时 withLock()/withRLock()/withWLock()的好处就体现出来了,锁会在 for loop 期间一直持有:
vec.withLock([](auto& data "") {
for (int& n : data) {
n *= 2;
}
});
withLock 定义为(withRLock/withWLock 类似):
/**
* Invoke a function while holding the lock.
*
* A reference to the datum will be passed into the function as its only
* argument.
*
* This can be used with a lambda argument for easily defining small critical
* sections in the code. For example:
*
* auto value = obj.withLock([](auto& data "") {
* data.doStuff();
* return data.getValue();
* });
*/
template <class Function>
auto withLock(Function&& function) {
return function(*lock());
}
template <class Function>
auto withLock(Function&& function) const {
return function(*lock());
}
升级锁
ulock()和 withULockPtr()
Synchronized 还支持升级锁。升级锁与共享锁可以共存,但是与排它锁互斥。
/**
* An enum to describe the "level" of a mutex. The supported levels are
* Unique - a normal mutex that supports only exclusive locking
* Shared - a shared mutex which has shared locking and unlocking functions;
* Upgrade - a mutex that has all the methods of the two above along with
* support for upgradable locking
*/
enum class MutexLevel { UNIQUE, SHARED, UPGRADE };
升级锁解决的问题是:先对数据进行读操作,然后根据一定的条件会进行写操作。
升级锁可以通过 uclock()或者 withULockPtr()获得:
{
// only const access allowed to the underlying object when an upgrade lock
// is acquired
auto ulock = vec.ulock();
auto newSize = ulock->size();
}
auto newSize = vec.withULockPtr([](auto ulock "") {
// only const access allowed to the underlying object when an upgrade lock
// is acquired
return ulock->size();
});
通过下面的函数可以进行升级或者降级:
- moveFromUpgradeToWrite()
- moveFromWriteToUpgrade()
- moveFromWriteToRead() // withWLockPtr()获得的 wlock 可以调用此函数降级为 rlock
- moveFromUpgradeToRead()
调用这些函数的 LockedPtr 会被设置为 invalid null state,并返回另一个锁住特定锁的 LockedPtr。这些操作都是原子性的,中间不会出现 unlocked 状态。
比如现在有一个 cache,数据结构为 unordered_map,需求是先检查对应的 key 是否在 unordered_map 中,如果在则返回对应的 value,不在则初始化 value 为 0:
folly::Synchronized<std::unordered_map<int64_t, int64_t>> cache;
int64_t res = cache.withULockPtr([key,value](auto ulock "key,value") {
int64_t cache_value;
auto iter = ulock->find(key);
if (iter != ulock->end()) {
cache_value = iter->second;
} else {
cache_value = 0;
// ulock is now null
auto wlock = ulock.moveFromUpgradeToWrite();
(*wlock)[key] = cache_value;
}
return cache_value;
});
Timed Locking
如果初始化 Synchronized 的锁类型支持时间,lock()/wlock()/rlock()可以传入一个类型为 std::chrono::duration 的参数:
void fun(Synchronized<vector<string>>& vec) {
{
auto locked = vec.lock(10ms);
if (!locked) {
throw std::runtime_error("failed to acquire lock");
}
locked->push_back("hello");
locked->push_back("world");
}
LOG(INFO) << "successfully added greeting";
}
Synchronized 与 std::condition_variable
如果 Synchronized 的锁类型是 std::mutex,那么可以和 std::condition_variable 配合使用。
Synchronized<vector<string>, std::mutex> vec;
std::condition_variable emptySignal;
// Assuming some other thread will put data on vec and signal
// emptySignal, we can then wait on it as follows:
auto locked = vec.lock();
emptySignal.wait(locked.getUniqueLock(),
[&] { return !locked->empty(); });
getUniqueLock()返回一个 std::unique_lockstd::mutex的引用。但是不推荐这么使用,因为这绕过了 Synchronized 的 API,可以直接操作对应的锁:
/**
* Get a reference to the std::unique_lock.
*
* This is provided so that callers can use Synchronized<T, std::mutex>
* with a std::condition_variable.
*
* While this API could be used to bypass the normal Synchronized APIs and
* manually interact with the underlying unique_lock, this is strongly
* discouraged.
*/
std::unique_lock<std::mutex>& getUniqueLock() { return lock_; }
acquireLocked() —— 同时锁多个数据
假如需要将一个 vector 的数据拷贝到另一个 vector,wlock()可能会实现需求:
void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
auto lockedA = a.wlock();
auto lockedB = b.wlock();
... use lockedA and lockedB ...
}
但是如果一个线程调用 fun(x,y),另一个线程调用 func(y,x),就很有可能出现死锁。经典的解决方式是,所有的线程以同样的顺序获取锁。许多库的实现是通过比较锁地址的大小来决定加锁顺序:
void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
auto ret = folly::acquireLocked(a, b);
auto& lockedA = std::get<0>(ret);
auto& lockedB = std::get<1>(ret);
... use lockedA and lockedB ...
}
// 实现:通过比较锁地址的大小
/**
* Acquire locks for multiple Synchronized<T> objects, in a deadlock-safe
* manner.
*
* The locks are acquired in order from lowest address to highest address.
* (Note that this is not necessarily the same algorithm used by std::lock().)
* For parameters that are const and support shared locks, a read lock is
* acquired. Otherwise an exclusive lock is acquired.
*
* use lock() with folly::wlock(), folly::rlock() and folly::ulock() for
* arbitrary locking without causing a deadlock (as much as possible), with the
* same effects as std::lock()
*/
template <class Sync1, class Sync2>
std::tuple<detail::LockedPtrType<Sync1>, detail::LockedPtrType<Sync2>>
acquireLocked(Sync1& l1, Sync2& l2) {
if (static_cast<const void*>(&l1) < static_cast<const void*>(&l2)) {
auto p1 = l1.contextualLock();
auto p2 = l2.contextualLock();
return std::make_tuple(std::move(p1), std::move(p2));
} else {
auto p2 = l2.contextualLock();
auto p1 = l1.contextualLock();
return std::make_tuple(std::move(p1), std::move(p2));
}
}
C++17 引入了 structured binding syntax,可以使代码更简单:
void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
auto [lockedA, lockedB] = folly::acquireLocked(a, b);
... use lockedA and lockedB ...
}
acquireLockedPair()返回 std::pair,在不支持 C++17 的编译器情况下,使用也很方便。
使用一把锁,锁多个数据
比如一个 bidirectional map,需要同时操作。一般有两个方案:
Struct
class Server {
struct BiMap {
map<int, string> direct;
map<string, int> inverse;
};
Synchronized<BiMap> bimap_;
...
};
...
bimap_.withLock([](auto& locked "") {
locked.direct[0] = "zero";
locked.inverse["zero"] = 0;
});
std::tuple
class Server {
Synchronized<tuple<map<int, string>, map<string, int>>> bimap_;
...
};
...
bimap_.withLock([](auto& locked "") {
get<0>(locked)[0] = "zero";
get<1>(locked)["zero"] = 0;
});
Benchmark
下篇文章写一下 Synchronized 的基本实现 :)
参考资料:
(完)
朋友们可以关注下我的公众号,获得最及时的更新: