C++ 对多线程/并发的支持(上)

时间:2022-02-16 14:06:40

前言:

本文翻译自 C++ 之父 Bjarne Stroustrup 的 C++ 之旅( A Tour of C++ )一书的第 13 章 Concurrency。作者用短短数十页,带你一窥现代 C++ 对并发/多线程的支持。原文地址:现代 C++ 对多线程/并发的支持(上) -- 节选自 C++ 之父的 《 A Tour of C++ 》 水平有限,有条件的建议直接阅读原版书籍。

1、 并发介绍

并发,即同时执行多个任务,常用来提高吞吐量(通过利用多处理器进行同一个计算)或者改善响应性(等待回复的时候,允许程序的其他部分继续执行)。所有现代语言都支持并发。C++ 标准库提供了可移植、类型安全的并发支持,经过 20 多年的发展,几乎被所有现代硬件所支持。标准库提供的主要是系统级的并发支持,而非复杂的、更高层次的并发模型;其他库可以基于标准库,提供更高级别的并发支持。

C++ 提供了适当的内存模型(memory model)和一组原子操作(atomic operation),以支持在同一地址空间内并发执行多个线程。原子操作使得无锁编程成为可能。内存模型保证了在避免数据竞争(data races,不受控地同时访问可变数据)的前提下,一切按照预期工作。

本章将给出标准库对并发的主要支持示例:threadmutexlock()packaged_task 以及 future。这些特征直接基于操作系统构建,相较于操作系统原生支持,不会带来性能损失,也不保证会有显著的性能提升。

那为什么要用标准库而非操作系统的并发?可移植性。

不要把并发当作灵丹妙药:如果顺序执行可以搞定,通常顺序会比并发更简单、更快速!

2、 任务和线程

如果一个计算有可能(potentially)和另一个计算并发执行,我们称之为任务(task)。线程是任务的系统级表示。任务可以通过构造一个 std::thread 来启动,任务作为参数。

  • 任务是一个函数或者函数对象。
  • 任务是一个函数或者函数对象。
  • 任务是一个函数或者函数对象。
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void f();              // 函数
 
struct F {             // 函数对象
    void operator()()  // F 的调用操作符
};
 
void user()
{
    thread t1 {f};     // f() 在另一个线程中执行
    thread t2 {F()};   // F()() 在另一个线程中执行
 
    t1.join();  // 等待 t1
    t2.join();  // 等待 t2
}

join() 确保线程完成后才退出 user() ,“join 线程”的意思是“等待线程结束”。

一个程序的线程共享同一地址空间。线程不同于进程,进程通常不直接共享数据。线程间可以通过共享对象(shared object)通信,这类通信一般用锁或其他机制控制,以避免数据竞争。

编写并发任务可能会非常棘手,假如上述例子中的 f 和 F 实现如下:

?
1
2
3
4
5
void f() {cout << "Hello ";}
 
struct F {
    void operator()() {cout << "Parallel World!\n";}
};

这里有个严重的错误:f 和 F() 都用到了 cout 对象,却没有任何形式的同步。这会导致输出的结果不可预测,多次执行的结果可能会得到不同的结果:因为两个任务的执行顺序是未定义的。程序可能产生诡异的输出,比如:

?
1
PaHerallllel o World!

定义一个并发程序中的任务时,我们的目标是保持任务之间完全独立。最简单的方法就是把并发任务看作是一个恰巧可以和调用者同时运行的函数:我们只要传递参数、取回结果,保证该过程中没有使用共享数据(没有数据竞争)即可。

3、传递参数

一般来说,任务需要处理一些数据。我们可以通过参数传递数据(或者数据的指针或引用)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void f(vector<double>& v); // 处理 v 的函数
 
struct F {                 // 处理 v 的函数对象
    vector<double>& v;
    F(vector<double>& vv) : v(vv) {}
    void operator()();
};
 
int main()
{
    vector<double> some_vec{1,2,3,4,5,6,7,8,9};
    vector<double> vec2{10,11,12,13,14};
 
    thread t1{f,ref(some_vec)}; // f(some_vec) 在另一个线程中执行
    thread t2{F{vec2}};         // F{vec2}() 在另一个线程中执行
 
    t1.join();
    t2.join();
}

F{vec2} 在 F 中保存了参数 vector 的引用。F 现在可以使用这个 vector。但愿在 F 执行时,没有其他任务访问 vec2。如果通过值传递 vec2 则可以消除这个隐患。

t1 通过 {f,ref(some_vec)} 初始化,用到了 thread 的可变参数模板构造,可以接受任意序列的参数。ref() 是来自 <functional> 的类型函数。为了让可变参数模板把 some_vec 当作一个引用而非对象,ref() 不能省略。编译器检查第一个参数可以通过其后面的参数调用,并构建必要的函数对象,传递给线程。如果 F::operator()() 和 f() 执行了相同的算法,两个任务的处理几乎是等同的:两种情况下,都各自构建了一个函数对象,让 thread 去执行。

可变参数模板需要用 ref()、cref() 传递引用

4、返回结果

3 的例子中,我传了一个非 const 的引用。只有在希望任务修改引用数据时我才这么做。这是一种很常见的获取返回结果的方式,但这么做并不能清晰、明确地向他人传达你的意图。稍好一点的方式是通过 const 引用传递输入数据,通过另外单独的参数传递储存结果的指针。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void f(const vector<double>& v, double *res); // 从 v 获取输入; 结果存入 *res
 
class F {
public:
    F(const vector<double>& vv, double *p) : v(vv), res(p) {}
    void operator()();  // 结果保存到 *res
 
private:
    const vector<double>& v;  // 输入源
    double *p;                // 输出地址
};
 
int main()
{
    vector<double> some_vec;
    vector<double> vec2;
 
    double res1;
    double res2;
 
    thread t1{f,cref(some_vec),&res1}; // f(some_vec,&res1) 在另一个线程中执行
    thread t2{F{vec2,&res2}};          // F{vec2,&res2}() 在另一个线程中执行
 
    t1.join();
    t2.join();
}

这么做没问题,也很常见。但我不觉得通过参数传递返回结果有多优雅,我会在 13.7.1 节再次讨论这个话题。

通过参数(出参)传递结果并不优雅

5、共享数据

有时任务需要共享数据,这种情况下,对共享数据的访问需要进行同步,同一时刻只能有一个任务访问数据(但是多任务同时读取不变量是没有问题的)。我们要考虑如何保证在同一时刻最多只有一个任务能够访问一组对象。

解决这个问题需要通过 mutex(mutual exclusion object,互斥对象)。thread 通过 lock() 获取 mutex

?
1
2
3
4
5
6
7
8
int shared_data;
mutex m;          // 用于控制 shared_data 的 mutex
 
void f()
{
    unique_lock<mutex> lck{m};  // 获取 mutex
    shared_data += 7;           // 操作共享数据
}   // 离开 f() 作用域,隐式自动释放 mutex

unique_lock 的构造函数通过调用 m.lock() 获取 mutex。如果另一个线程已经获取这个 mutex,当前线程等待(阻塞)直到另一个线程(通过 m.unlock( ) )释放该 mutex。当 mutex 释放,等待该 mutex 的线程恢复执行(唤醒)。互斥、锁在 <mutex> 头文件中。

共享数据和 mutex 之间的关联需要自行约定:程序员需要知道哪个 mutex 对应哪个数据。这样很容易出错,但是我们可以通过一些方式使得他们之间的关联更清晰明确:

?
1
2
3
4
class Record {
public:
    mutex rm;
};

不难猜到,对于一个 Record 对象 rec,在访问 rec 其他数据之前,你应该先获取 rec.rm。最好通过注释或者良好的命名让读者清楚地知道 mutex 和数据的关联。

有时执行某些操作需要同时访问多个资源,有可能导致死锁。例如,thread1 已经获取了 mutex1,然后尝试获取 mutex2;与此同时,thread2 已经获取 mutex2,尝试获取 mutex1。在这种情况下,两个任务都无法进行下去。为解决这一问题,标准库支持同时获取多个锁:

?
1
2
3
4
5
6
7
8
9
void f()
{
    unique_lock<mutex> lck1{m1,defer_lock};  // defer_lock:不立即获取 mutex
    unique_lock<mutex> lck2{m2,defer_lock};
    unique_lock<mutex> lck3{m3,defer_lock};
 
    lock(lck1,lck2,lck3);                    // 尝试获取所有锁
    // 操作共享数据
}   // 离开 f() 作用域,隐式自动释放所有 mutexes

lock() 只有在获取参数里所有的 mutex 之后才会继续执行,并且在其持有 mutex 期间,不会阻塞(go to sleep)。每个 unique_lock 的析构会确保离开作用域时,自动释放所有的 mutex

通过共享数据通信是相对底层的操作。编程人员要设计一套机制,弄清楚哪些任务完成了哪些工作,还有哪些未完成。从这个角度看, 使用共享数据不如直接调用函数、返回结果。另一方面,有些人认为共享数据比拷贝参数和返回值效率更高。这个观点可能在涉及大量数据的时候成立,但是 locking unlocking 也是相对耗时的操作。不仅如此,现代计算机很擅长拷贝数据,尤其是像 vector 这种元素连续存储的结构。所以,不要仅仅因为“效率”而选用共享数据进行通信,除非你真正实际测量过。

6、等待事件

有时线程需要等待外部事件,比如另一个线程完成了任务或者经过了一段时间。最简单的事件是时间。借助 <chrono>,可以写出:

?
1
2
3
4
5
6
7
using namespace std::chrono;
 
auto t0 = high_resolution_clock::now();
this_thread::sleep_for(milliseconds{20});
auto t1 = high_resolution_clock::now();
 
cout << duration_cast<nanoseconds>(t1-t0).count() << " nanoseconds passed\n";

注意,我甚至没有启动一个线程;默认情况下,this_thread 指当前唯一的线程。我用 duration_cast 把时间单位转成了我想要的 nanoseconds

condition_variable 提供了对通过外部事件通信的支持,允许一个线程等待另一个线程,比如等待另一个线程(完成某个工作,然后)触发一个事件/条件。

condition_variable 支持很多优雅、高效的共享形式,但也可能会很棘手。考虑一个经典的生产者-消费者例子,两个线程通过一个队列传递消息:

?
1
2
3
4
5
6
class Message { /**/ }; // 通信的对象
 
queue<Message> q;       // 消息队列
condition_variable cv;  // 传递事件的变量
mutex m;                // locking 机制
queue、condition_variable 以及 mutex 由标准库提供。

消费者读取并处理 Message

?
1
2
3
4
5
6
7
8
9
10
11
12
void consumer()
{
    while(true){
        unique_lock<mutex> lck{m}; // 获取 mutex m
        cv.wait(lck);              // 先释放 lck,等待事件/条件唤醒
                                   // 唤醒时再次重新获得 lck
        auto m = q.front();        // 从队列中取出 Message m
        q.pop();
        lck.unlock();              // 后续处理消息不再操作队列 q,提前释放 lck
        // 处理 m
    }
}

这里我显式地用 unique_lock<mutex> 保护 queue condition_variable 上的操作。condition_variable 上的 cv.wait(lck) 会释放参数中的锁 lck,直到等待结束(队列非空),然后再次获取 lck。

相应的生产者代码:

?
1
2
3
4
5
6
7
8
9
10
void producer()
{
    while(true) {
        Message m;
        // 填充 m
        unique_lock<mutex> lck{m}; // 保护操作
        q.push(m);
        cv.notify_one();           // 通知/唤醒等待中的 condition_variable
    } // 作用域结束自动释放锁
}

到目前为止,不论是 threadmutexlock 还是 condition_variable,都还是低层次的抽象。接下来我们马上就能看到 C++ 对并发的高级抽象支持。

7、通信任务

标准库还在头文件 <future> 中提供了一些机制,能够让程序员在更高的任务的概念层次上工作,而不是直接使用低层的线程、锁:

  • future promise:用于从另一个线程中返回一个值
  • packaged_task:帮助启动任务,封装了 future promise,并且建立两者之间的关联
  • async():像调用一个函数那样启动一个任务。形式最简单,但也最强大!

到此这篇关于C++ 对多线程/并发的支持(上)的文章就介绍到这了,更多相关C++ 对多线程并发的支持内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/tengzijian/p/a-tour-of-cpp-modern-cpp-concurrency-1.html