C++——并发编程

时间:2024-04-08 12:34:44

一、高级接口:async()和Future

1.1 async()和Future的第一个用例

  假设需要计算两个操作数的总和,而这两个操作数是两个函数的返回值。寻常加法如下:

    func1() + func2()

这意味着对操作数的处理是循序发生的。程序首先调用func1()然后调用func2(),或是颠倒过来(根据语言规则,这一顺序无法预期)。不论哪种情况,整体处理时间是func1()所花时间加上func2()所花时间,再加上计算总和所花的时间。

  几年来,使用多处理器的硬件几乎处处可见,我们因此可以将上述计算做的更好,可以尝试并行运行func1()和func2(),使其整体运行时间只需是“func1()和func2()运行时间中的较大者”加上计算总和的时间。

  下面是示范程序:

 #include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <random>
#include <exception> using namespace std; int doSomething(char c){
std::default_random_engine dre(c);
std::uniform_int_distribution<int> id(, ); for (int i=; i < ; ++i) {
this_thread::sleep_for(chrono::milliseconds(id(dre)));
cout.put(c).flush();
}
return c;
} int func1(){
return doSomething('.');
} int func2(){
return doSomething('+');
} int main(int argc, const char * argv[]){
cout << "func1()在后台运行,func2()在前台运行" << endl;
future<int> result1(async(func1));
int result2 = func2();
int result = result1.get() + result2;
cout << endl;
cout << "func1() + func2() 的结果:" << result << endl;
return ;
} 输出结果:
func1()在后台运行,func2()在前台运行
+.+++.++..+..++..+..
func1() + func2() 的结果:

  为了让事情视觉化,将func1()和func2()内的繁复处理简化为对doSomething()的调用,它不时打印一个被当作实参传递进去的字符并最终返回该字符的int值。所谓“不时”是借由一个“随机数生成器”(用以指定时间间隔)实现,其中std::this_thread::sleep()作为当前线程的暂停时间。注意,这需要一个独一无二的seed(种子)交给“随机数生成器”构造函数,此处采用被传入的字符c,确保产生不同的随机数序列。

  现在,不再这么调用

    int result = func1() + func2();

而改为调用:

    std::future<int> result1(std::async(func1));

    int result2 = func2();

    int result = result1.get() + result2;

首先使用std::async()尝试启动fucn1()于后台,并将结果赋给某个std::future object:

    std::future<int> result1(std::async(func1));

在这里,async()尝试将其获得的函数立即启动于一个分离线程内。因此概念上func1()在这里被启动了,不会造成main()停滞。基于两个原因,返回future object是必要的:

    1、它允许取得“传给async()的那个函数”的未来结果——也许是个返回值,也许会是个异常。这个future object已受到“被启动函数”返回类型的特化,如果启动的是一个无返回值的后台任务,这就回事std::future<void>。

    2、它必须存在,确保“目标函数”或快或慢终会被调用。注意先前说async()尝试启动目标函数。如果这样的事情没有发生,稍后就需要这个future object才能强迫启动之。

  因此,即使对启动于后台的那个函数的结果并不感兴趣,还是需要握有这个future object。

  为了能够在“启动及控制函数”处与“返回的future object”之间交换数据,二者都指向一个所谓的shared state。

  当然,可以(并且通常)使用auto来声明future object:

    auto result1(std::async(func1));

  接下来启动func2()于前台,这是个正常的同步化调用,于是程序在此停滞:

    int result2 = func2();

  如果先前的func1()成功地被async()启动并且尚未结束,现在func1()和func2()就是并行运作。

  接下来处理总和。这就是需要func1()成果的时刻。为了获得它,对先前返回的future object调用get():

    int result = result1.get() + result2;

  随着ger()被调用,以下三件事情之一会发生:

    1、如果func1()被async()启动于一个分离线程中并且已结束,就会立刻获得其结果。

    2、如果func1()被启动但并未结束,get()会引发停滞待func1()结束后获得结果。

    3、如果func1()尚未启动,会被强迫启动如同一个同步调用;get() 会停滞直至产生结果。

  这样的行为很重要,因为这确保了在单线程环境中,或是当async()无法启动新线程时(不论基于任何理由),程序仍能有效运作。

  调用async()并不保证传入的函数被启动和结束。如果有个线程处于可用状态,那么它的确会被启动,但如果不是这样(也许运行环境不支持多线程,或者也许当时无线程可用),这一调用会被推迟到明确说明需要结果或只是希望目标函数完成其任务。

  因此,

    std::future<int> result1(std::async(func1));

    result1.get()

的组合允许以某种方式优化程序:(1)如果可能,当main线程的下一个语句被处理时func1()被并行运行,(2)如果无法并行运行,那么func1()会在get()被调用时被循序调用。这就意味着无论如何都能在保证至少在get()执行后一定会调用func1()——不是异步就是同步。

  于是这个程序的输出结果又两种可能。如果async()成功启动func1(),输出结果可能如下:

    func1()在后台运行,func2()在前台运行

    +.+++.++..+..++..+..

    func1() + func2() 的结果:89

如果async()无法启动func1(),后者会在“func2()结束后且get()被调用时”执行起来,于是可能输出下面的结果:

    func1()在后台运行,func2()在前台运行

    ..........++++++++++

    func1() + func2() 的结果:89

  所以,根据第一个例子,可以定义让程序更快速的一般性做法:可以修改程序使它受益于并行处理(如果低层平台对此有所支持),但仍能够在单线程环境中正确运作。为了达到这个目标,必须:

    *#include<future>

    *传递某些可并行执行的函数,交给std::async()作为一个可调用对象。

    *将执行结果赋给一个future<ReturnType> object

    *需要被启动函数的执行结果时,或想确保该函数结束,就对future<>object调用get()。

  然而请注意,这只适用于不发生数据竞争的情况下。数据竞争指两个线程并发使用同一笔数据而导致不可预期的行为。

  注意,如果没有调用get()就不保证func1()一定会被调用。如果async()无法立刻启动它所接收到的函数,就会推迟调用,使得当程序“调用get()意欲明确索求目标函数的结果”(或调用wait())才被调用。如果没有那样一个明确请求,即使main()终止造成程序结束,也不会唤醒后台线程。

  也请注意,必须确保只在最必要时才索取“被async()启动”的那个函数的执行结果。例如下面的:

    std::future<int> result(std::async(func1));

    int result = func2() + result.get();   //func2()可能在func1()结束后才执行

  为了获得最佳效果,一般而言应该将调用async()和调用get()之间的距离最大化:早调而晚返回。

  如果传给async()的函数不返回任何东西,async()会产生一个future<void>,那是future<>的一个偏特化版,这种情况下get()返回“无物”:

    std::future<void> f(std::async(func));  //试图异步启动func

    //...

    f.get();     //等待func执行结束

  最后请注意,传给async()的东西可以是任何类型的callable object:可以是函数、成员函数、函数对象或lamdba。可以采用inline形式将“应该在专属线程中运行”的函数写成一个lamdba并传递之:

    std::async([]{...});

Launch(发射)策略

  也可以强迫async()绝不推迟目标函数的执行,只要明确传入一个launch策略用以指挥async(),告诉它当它被调用时应明确地以异步方式启动目标函数:

    std::future<long> result1 = std::async(std::launch::async, func1);

  如果异步调用在此处无法实现,程序会抛出一个std::system_error异常并带差错码resource_unavailable_try_again。

  有了这个async发射策略,就不必非得调用get()了,因为如果返回的future生命即将结束,这个程序会必会等待func1()结束。因此,如果不调用get(),当离开future object作用域时(此处指main()结束),程序会等待后台任务结束。尽管如此,程序结束前调用get()会让行为更加清晰。

  如果不将std::async(std::launch::async, ...)的结果赋值出去,调用者会在此处停滞等到目标函数结束,那就相当于一个完完全全的同步调用。

  于此类似,也可以强制延缓执行:以std::launch::deferred为发射策略传给async()。下面的做法允许延缓func1()直到对f调用get():

    std::future<...> f(std::async(std::launch::deferred, func1));

这保证func1()绝不会在没有调用get()(或wait())的情况下启动。这个策略的特别在于允许写出lazy evaluation(缓式求值)。例如:

    auto f1 = std::async(std::launch::deferred, task1);

    auto f2 = std::async(std::launch::deferred, task2);

    ...

    auto val = thisOrThatIsTheCase() ? f1.get() : f2.get();

  此外,明确申明deferred发射策略也许有助于在一个单线程环境中模拟async()的行为,或是简化调试——除非需要考虑race condition(竞争形势)。

处理异常

  目前讨论的是线程和后台任务成功执行的情况。然而万一出现异常将如何?

  好消息是:没有什么特别事情会发生;“对future调用get()”也能处理异常。事实上当get()被调用,且后台操作已经(或随后由于异常)终止,该异常不会在此线程内被处理,而是在此被传播出去。因此,欲处理后台操作所生的异常,只需要偕同get()做出“以同步方式调用该操作”所做的动作即可。

  现在总结async()接口和future如下:async()提供一种编程环境,让我们有机会并行启动某些“稍后(当get()被调用时)才会用到其结果”的动作。换句话说,如果有某个独立机能(函数)f,有可能受益于并行机制,做法是在需要调用f时改而把f传给async(),然后在需要f的结果时改为“对async()返回的future调用get()”。于是,拥有相同的行为,但是有机会获得较佳效率,因为f有可能并行运行——在f的执行结果被索取之前。

等待和轮询

  一个future<>只能被调用get()一次。在那之后future就处于无效的状态,而这种状态只能借由“对future调用valid()”来检查。此情况下对它的任何调用(析构除外)会导致不可预期的行为。

  但是future也提供一个接口,允许等待后台操作完成而不需要处理其结果。这个接口可被调用一次以上;也可以结合一个duration(时间段)或timepoint(时间点)以限制等待时间。

  只要对某个future调用wait(),就可以强制启动该future象征的线程并等待这一后台操作终止:

    std::future<...> f(std::async(func));

    ...

    f.wait();

  另外还有两个类似函数,但它们并不强制启动线程(如果线程未启动的话):

    1、使用wait_for()并给予一个时间段,就可以让“异步、运行中”的操作等待一段有限时间:

      std::future<...> f(std::async(func));

      ...

      f.wait_for(std::chrono::seconds(10));

    2、使用wait_until(),就可以等待直到到达某特定时间点:

      std::future<...> f(std::async(func));

      ....

      f.wait_until(std::system_clock::now()+std::chrono::seconds(1));

  不论wait_for()或wait_until()都返回以下三种东西之一:

    *std::future_status::deferred——如果async()延缓了操作而程序中又完全没有调用wait()或get()(那会强制启动)。这种情况下上述两个函数都会立刻返回。

    *std::future_status::timeout——如果某个操作被异步启动但尚未结束,而waiting又已逾期(对于给定的时间段而言)。  

    *std::future_status::ready——如果操作已完成。

  wait_for()或wati_until()特别让我们得以写出所谓的speculative execution(投机性运行)。举个例子,考虑这样的情景:必须在某个时间段内获得某一运算之尚堪可用的结果,而如果有精确结果更好。

 int quick();  //能够迅速得到结果,但是不够准确
int slow(); //能够得到准确结果,但是不够迅速 std::future<int> f; //f必须是一个全局变量,如果是一个本地变量,会因为函数的时间过短无法完成slow()而调用其析构函数,future析构函数会停滞直到异步操作结束 int best(){
auto pt = std::chrono::system_clock::now() + std::chorine::seconds();
f = std::async(std::launch::async, slow);
int guess = quick();
std::future_status s = f.wait_until(tp);
if(s == std::future_status::ready){
return f.get();
}
else{
return guess;
}
}

注意,future f不能是声明在best()内部的local对象,那样的话若时间太短以至于无法完成slow(),future的析构函数会停滞直到异步操作结束。

  如果传入一个zero时间段,或一个过去的时间点,就可以仅轮询是否有个后台任务已经启动,和/或是否它正在运行:

    future<...> f(async(func));

    ...

    //可以在线程没有结束期间做一些事情

    while ( f.wait_for(chrono::seconds(0)) != future_status::ready ){

      ...

    }

  然而请注意,如此循环有可能不会结束,因为(例如)在单线程环境中,这一调用将被推迟到get()被调用。因此,若非调用async()并以其第一实参指定发射策略为std::launch::async,就该明确检查是否wait_for()返回std::future_status::deferred:

 future<...> f(async(func));
if (f.wait_for(chrono::seconds()) != future_status::deferred){//确保异步线程的确已经启动而不是被推迟执行,不然将会陷入一个死循环      
while (f.wait_for(chrono::second()) != future_status::ready){
   ...
  }
}
...
auto r = f.get();

  引发无限循环的另一个可能原因是,运行次循环的线程完全占用处理器,其他线程无法获得丝毫时间来备妥future。这回巨幅降低程序速度。最简单的修正就是在循环内调用yield():

    std::this_thread::yield();

以及/或是睡眠一小段时间。

  关于时间段和时间点,它们可能成为wait_for()和wait_until()的实参。注意,当面对system_time调整时,wati_for()和wait_until()往往不同。

1.2 示例:等待两个Task

  下面的程序示范了前面提到的一些功能:

 #include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <random>
#include <exception> using namespace std; void doSomething(char c)
{
default_random_engine dre(c);
uniform_int_distribution<int> id(, );
for (int i = ; i < ; ++i) {
this_thread::sleep_for(chrono::milliseconds(id(dre)));
cout.put(c).flush();
}
} int main(int argc, const char * argv[]) {
cout << "异步启动两个操作:\n";
auto f1 = async([]{doSomething('.');});
auto f2 = async([]{doSomething('+');});
if (f1.wait_for(chrono::seconds()) != future_status::deferred ||
f2.wait_for(chrono::seconds()) != future_status::deferred
) {
while (f1.wait_for(chrono::seconds()) != future_status::ready &&
f2.wait_for(chrono::seconds()) != future_status::ready
) {
this_thread::yield();
}
}
cout.put('\n').flush();
try {
f1.get();
f2.get();
} catch (const exception& e) {
cout << "\nEXCEPTION: " << e.what() << endl;
}
cout << "\ndone" << endl;
return ;
}

  有个操作函数doSomething()不时打印一个被传为实参的字符。

  现在,借由async(),在后台启动doSomething()两次,打印两种不同的字符,使用不同的延迟时间,后者由相应的随机数序列产生:

    auto f1 = async([]{doSomething('.');});

    auto f2 = async([]{doSomething('+');});

  在多线程环境中,此时将同时运行起两个操作,不定时打印出不同的字符。

  接下来,轮询是否其中一个操作已完成:

    while (f1.wait_for(chrono::seconds(0)) != future_status::ready &&

        f2.wait_for(chrono::seconds(0)) != future_status::ready) {

        //...

        this_thread::yield();

    }

  然而万一async()被调用时上述两个task都未被在后台启动,这个循环将永远不会结束,所以必须先检查是否至少有一个操作未被推迟:

    if (f1.wait_for(chrono::seconds(0)) != future_status::deffered ||

      f2.wait_for(chrono::seconds(0)) != future_status::deffered){

      //...

    }

  另一种做法是,调用async()并给予发射策略std::launch::async。

  一旦至少有一个后台操作已完成,或两个操作都没有被启动,就写出一个newline字符,然后等待两个循环结束:

    f1.get();

    f2.get();

  这里使用get()处理可能发生任何异常。

  在一个多线程环境中,此程序可能有如下输出:

异步启动两个操作:
+.+++.++..+..++..+
..
done

注意,程序输出三种字符 '.'、'+'和newline,其次序没有任何保证。典型情况是,首先出现字符'.',它来自第一个被启动的操作,但是正如此处所见第一个出现的也可能是'+'。字符'.'和'+'可能混杂,但是也不一定。事实上,如果移除sleep_for()语句(它会在每次打印字符时强制推迟),第一循环会在首次切换至其他线程前全部做完,那么程序的输出比较可能像下面这样:

异步启动两个操作:
.+.+.+.+.+.+.+.+.+.+ done

  如果环境不支持多线程,这份输出还是会出现,因为这种情况下对doSomething()的两次调用将会借由对get()的调用而被同步调用。

  newline字符何时被打印?这同样不明确,有可能发生于任何其他字符被写出之前——如果两个后台任务被推迟“直至get()被调用”才执行,那么被推迟的任务将会“结束一个后才进行另一个”:

异步启动两个操作:

..........++++++++++
done

  唯一确定的是,newline绝不会在两个循环中的某一个完成前被打印出。甚至无法保证newline近邻于“序列之最末字符”之后,因为“循环之一结束后记录相应的future object”以及“该future被核值”可能需要花一些时间。基于这个原因,说不定会获得一份如下的输出,其中若干'+'字符被写在最后一个'.'之后且newline字符之前:

异步启动两个操作:
.+..+..+..+.+..++
+++
done

传递实参

  前一个例子示范了“传递实参给后台任务”的一种做法:使用一个lambda并让调用后台函数:

    auto f1 = std::async([]{doSomething('.');});

当然,也可以传递“在async()语句之前就已存在”的实参。一如以往,可以采用按值方式或按引用方式传递它们:
    char c = '@';

    auto f = std::async([=]{doSomething(c);});

由于定义capture为[=],因此传递给lambda的是c的拷贝及其所有其他的可以访问的对象,所以在lambda内可以传递那个c给doSomething()。

  然而另有其他方法可以传递实参给async(),因为async()提供了callable object的惯用接口。举个例子,如果传递function pointer作为第一实参传递给async(),则可以传递更多的实参,它们将成为被调用的那个函数的参数:

    char c = '@';

    auto f = std::async(doSomething, c);

也可以采用按引用方式传递实参,但这么做的风险是被传递值甚至在后台任务启动前就变得无效。这对于lambda及“直接被调用的函数”都适用:

    char c ='@';

    auto f = std::async([&]{doSomething(c);});

    char c = '@';

    auto f = std::async(doSomething, std::ref(c));

但如果能够控制实参寿命,使他超过后台认为的生命,就可以按引用传递实参。例如:
    void doSomething(cont char& c);

    ...

    char c = '@';

    auto f = std::async([&]{doSomething(c);});

    ...

    f.get();  //needs lifetime of c until here

但是,当心,如果“以按引用方式传递实参”只是为了可在另一个线程中改动它们,可能会轻易落入不明确行为之中。考虑下面的例子,在试图启动一个输出循环(于后台打印一个字符)后,改变该字符:

    void doSomething(const char& c);

    ...

    char c = '@';

    auto f = std::async([&]{doSomething(c);});

    ...

    c = '-';

    f.get();

首先,“这里”以及“doSomething()内”对c的处理,其次序无法预期。因此,该字符的变换可能发生在输出循环之前、之中或之后。更糟的是,在某一线程中改动c,在另一个线程中读取c,这是对同一对象的异步并发处理(所谓的data race),这将导致不可预期的行为,除非使用mutex或atomic保护并发处理动作。

  因此,如果使用async(),就因该以按值传递所有“用来处理目标函数”的必要object,使async()只需要使用局部拷贝。如果复制成本太高,就让那些object以const refrence的形式传递,且不使用mutable。

  也可以传给async()一个“指向成员函数”的pointer。这种情况下,位于该成员函数名称之后的第一个实参必须是一个reference或pointer,指向某个object,后者将调用该成员函数:

 #include <iostream>
#include <thread>
#include <future>
#include <chrono> using namespace std; class Person
{
public:
void show(const int value)
{
auto _value = value;
while (_value > ) {
this_thread::sleep_for(chrono::milliseconds());
cout << _value << endl;
--_value;
}
}
}; int main(int argc, const char * argv[]) {
Person per;
auto f = std::async(&Person::show, &per, );
if (f.wait_for(chrono::seconds()) != std::future_status::deferred) {
while (f.wait_for(chrono::seconds()) != std::future_status::ready) {
cout << "..." << endl;
this_thread::yield();
}
}
f.get();
return ;
}