深入探讨C++多线程性能优化
在现代软件开发中,多线程编程已成为提升应用程序性能和响应速度的关键技术之一。尤其在C++领域,多线程编程不仅能充分利用多核处理器的优势,还能显著提高计算密集型任务的效率。然而,多线程编程也带来了诸多挑战,特别是在性能优化方面。本文将深入探讨影响C++多线程性能的一些关键因素,比较锁机制与原子操作的性能。通过这些内容,希望能为开发者提供有价值的见解和实用的优化策略,助力于更高效的多线程编程实践。
先在开头给一个例子,你认为下面这段benchmark代码结果会是怎样的。这里的逻辑很简单,将0-20000按线程切成n片,每个线程在一个Set里查找这个数字存不存在,存在则计数+1。
#include <benchmark/benchmark.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
constexpr int kSetSize = 10000;
class MyBenchmark : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State& state) override {
std::call_once(flag, [this]() {
s = std::make_shared<std::unordered_set<int>>();
for (int i = 0; i < kSetSize; i++) {
s->insert(i);
}
});
}
std::shared_ptr<std::unordered_set<int>> GetSet() { return s; }
private:
std::shared_ptr<std::unordered_set<int>> s;
std::once_flag flag;
};
BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {
for (auto _ : state) {
int size_sum = kSetSize * 2;
int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
int sum = 0;
int start = state.thread_index() * size_per_thread;
int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
for (int i = start; i < end; i++) {
auto inst = GetSet();
if (inst->count(i) > 0) {
sum++;
}
}
}
}
// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);
BENCHMARK_MAIN();
跑出来的结果如下:
将任务切分成多个片段并交由多线程执行后,整体性能不仅没有提升,反而下降了,且性能与线程数成反比。那么,问题来了:导致这种结果的原因是什么?如何才能实现合理的并行执行,从而降低CPU的执行时间?在接下来的部分,笔者将为你揭示答案。
影响多线程性能的因素
笔者认为,影响多线程性能的主要因素有以下两个:
1.Lock Contention。
2.Cache Coherency。
Lock Contention对应使用锁来处理多线程同步问题的场景,而Cache Coherency则对应使用原子操作来处理多线程同步问题的场景。
Lock Contention
在多线程环境中,多个线程同时尝试获取同一个锁(Lock)时,会发生竞争现象,这就是所谓的锁竞争(Lock Contention)。锁竞争会导致线程或进程被阻塞,等待锁被释放,从而影响系统的性能和响应时间。大多数情况下,开发人员会选择使用锁来解决线程间的同步问题,因此锁竞争问题也变得广为人知且容易理解。由于锁的存在,位于临界区的代码在同一时刻只能由一个线程执行。因此,优化的思路就是尽量避免多个线程同时访问同一资源。常见的优化方向有两种:
1.减少临界区大小:临界区越小,这段代码的执行时间就越短,从而在整体程序运行时间中所占的比例也越小,冲突也就越少。
2.对共享资源进行分桶操作:每个线程只会在某个桶*问资源,理想情况下,每个线程都会访问不同的桶,这样就不会有冲突。
减少临界区大小需要开发者对自己的代码进行仔细思考,将不必要的操作放在临界区外,例如一些初始化和内存分配操作。
对共享资源进行分桶操作在工程实践中也非常常见。例如,LevelDB的LRUCache中,每个Key只会固定在一个桶上。如果hash函数足够优秀且数据分布足够随机,这种方法可以大大提高LRUCache的性能。
Cache Coherency
缓存一致性(Cache Coherency)是指在多处理器系统中,确保各个处理器的缓存中的数据保持一致的机制。由于现代计算机系统通常包含多个处理器,每个处理器都有自己的缓存(如L1、L2、L3缓存),因此在并发访问共享内存时,可能会出现缓存数据不一致的问题。缓存一致性协议旨在解决这些问题,确保所有处理器在访问共享内存时看到的是一致的数据。
当我们对一个共享变量进行写入操作时,实际上需要通过缓存一致性协议将该变量的更新同步到其他线程的缓存中,否则可能会读到不一致的值。实际上,这个同步过程的单位是一个缓存行(Cache Line),而且同步过程相对较慢,因为涉及到跨核通信。
由此引申出两个严重影响性能的现象:
1.Cache Ping-Pong。
2.False Sharing。
Cache Ping-Pong
缓存乒乓效应(Cache Ping-Pong)是指在多处理器系统中,多个处理器频繁地对同一个缓存行(Cache Line)进行读写操作,导致该缓存行在不同处理器的缓存之间频繁地来回传递。这种现象会导致系统性能下降,因为缓存行的频繁传递会引起大量的缓存一致性流量和处理器间通信开销。
讲到这里,其实就可以解释为什么开头那段代码会随着线程数量的增加而性能反而下降。代码中的变量 s 是一个共享资源,但它使用了 shared_ptr。在复制 shared_ptr 时,会引起引用计数的增加(计数+1),多个线程频繁对同一个缓存行进行读写操作,从而引发缓存乒乓效应,导致性能下降。最简单的修改方式就是去掉 shared_ptr,代码如下,同时还可以得到我们预期的结果,即CPU时间随着线程数的增加而降低:
#include <benchmark/benchmark.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
constexpr int kSetSize = 10000;
class MyBenchmark : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State& state) override {
std::call_once(flag, [this]() {
for (int i = 0; i < kSetSize; i++) {
s.insert(i);
}
});
}
const std::unordered_set<int>& GetSet() { return s; }
private:
std::unordered_set<int> s;
std::once_flag flag;
};
BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {
for (auto _ : state) {
int size_sum = kSetSize * 2;
int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
int sum = 0;
int start = state.thread_index() * size_per_thread;
int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
for (int i = start; i < end; i++) {
const auto& inst = GetSet();
if (inst.count(i) > 0) {
benchmark::DoNotOptimize(sum++);
}
}
}
}
// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);
BENCHMARK_MAIN();
False Sharing
伪共享(False Sharing)实际上是一种特殊的缓存乒乓效应(Cache Ping-Pong)。它指的是在多处理器系统中,多个处理器访问不同的数据,但这些数据恰好位于同一个缓存行中,导致该缓存行在不同处理器的缓存之间频繁传递。尽管处理器访问的是不同的数据,但由于它们共享同一个缓存行,仍然会引发缓存一致性流量,导致性能下降。
为了更好地理解这一现象,我们可以对上面的代码进行一些修改。假设我们使用一个 vector<atomic> 来记录不同线程的 sum 值,这样虽然不同线程修改的是不同的sum,但是还是在一个缓存行上。使用 atomic 是为了强制触发缓存一致性协议,否则操作系统可能会进行优化,不会立即将修改反映到主存。
#include <benchmark/benchmark.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
constexpr int kSetSize = 10000;
class MyBenchmark : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State& state) override {
std::call_once(flag, [this, &state]() {
for (int i = 0; i < kSetSize; i++) {
s.insert(i);
}
sums = std::vector<std::atomic<int>>(state.threads());
});
}
const std::unordered_set<int>& GetSet() { return s; }
std::vector<std::atomic<int>>& GetSums() { return sums; }
private:
std::unordered_set<int> s;
std::once_flag flag;
std::vector<std::atomic<int>> sums;
};
BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {
for (auto _ : state) {
int size_sum = kSetSize;
int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
int sum = 0;
int start = state.thread_index() * size_per_thread;
int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
for (int i = start; i < end; i++) {
const auto& inst = GetSet();
if (inst.count(i) > 0) {
benchmark::DoNotOptimize(GetSums()[state.thread_index()]++);
}
}
}
}
// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);
BENCHMARK_MAIN();
可以看到,尽管不同线程没有使用同一个变量,但由于 sums 里面的元素共享同一个缓存行(Cache Line),同样会导致性能急剧下降。
针对这种情况,只要我们将 sums 中的元素隔离,使它们不在同一个缓存行上,就不会引发这个问题。一般来说,缓存行的大小为64字节,我们可以使用一个类填充到64个字节来实现隔离。优化后的代码如下:
#include <benchmark/benchmark.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
constexpr int kSetSize = 10000;
struct alignas(64) PaddedCounter {
std::atomic<int> value{0};
char padding[64 - sizeof(std::atomic<int>)]; // 填充到缓存行大小
};
class MyBenchmark : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State& state) override {
std::call_once(flag, [this, &state]() {
for (int i = 0; i < kSetSize; i++) {
s.insert(i);
}
sums = std::vector<PaddedCounter>(state.threads());
});
}
const std::unordered_set<int>& GetSet() { return s; }
std::vector<PaddedCounter>& GetSums() { return sums; }
private:
std::unordered_set<int> s;
std::once_flag flag;
std::vector<PaddedCounter> sums;
};
BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {
for (auto _ : state) {
int size_sum = kSetSize;
int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
int sum = 0;
int start = state.thread_index() * size_per_thread;
int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
for (int i = start; i < end; i++) {
const auto& inst = GetSet();
if (inst.count(i) > 0) {
benchmark::DoNotOptimize(GetSums()[state.thread_index()].value++);
}
}
}
}
// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);
BENCHMARK_MAIN();
Lock VS Atomic
Lock Atomic Benchmark
很多人都认为锁(lock)比原子操作(atomic)要更慢,那么实际上真的是这样吗?下面我们通过两个测试来进行对比。
公平起见,我们将使用一个基于 atomic 变量实现的自旋锁(SpinLock)与 std::mutex 进行性能对比。自旋锁的实现摘自 Folly 库。其原理是使用一个 atomic 变量来标记是否被占用,并使用 acquire-release 内存序来保证临界区的正确性。在冲突过大时,自旋锁会使用 sleep 让出 CPU。代码如下:
#pragma once
#include <atomic>
#include <cstdint>
class Sleeper {
static const uint32_t kMaxActiveSpin = 4000;
uint32_t spin_count_;
public:
constexpr Sleeper() noexcept : spin_count_(0) {}
inline __attribute__((always_inline)) static void sleep() noexcept {
struct timespec ts = {0, 500000};
nanosleep(&ts, nullptr);
}
inline __attribute__((always_inline)) void wait() noexcept {
if (spin_count_ < kMaxActiveSpin) {
++spin_count_;
#ifdef __x86_64__
asm volatile("pause" ::: "memory");
#elif defined(__aarch64__)
asm volatile("yield" ::: "memory");
#else
// Fallback for other architectures
#endif
} else {
sleep();
}
}
};
class SpinLock {
enum { FREE = 0, LOCKED = 1 };
public:
constexpr SpinLock() : lock_(FREE) {}
inline __attribute__((always_inline)) bool try_lock() noexcept { return cas(FREE, LOCKED); }
inline __attribute__((always_inline)) void lock() noexcept {
Sleeper sleeper;
while (!try_lock()) {
do