C++ concurrency::task实现异步编程(Windows)

时间:2021-08-25 19:01:26

最近一直在看js、python、lua等脚本语言的异步编程,今天脑子一热突然想看看C++能否也有支持异步的相关类库,上网一搜还真的有

microsoft官方文档https://msdn.microsoft.com/library/windows/apps/Hh780559.aspx


主要使用task class 及其相关类型和函数,它们都包含在 concurrency 命名空间中且在 <ppltasks.h> 中定义。concurrency::task 类是一个通用类型,但是当使用 /ZW 编译器开关(对于 Windows 运行时应用和组件而言是必需的)时,任务类会封装 Windows 运行时异步类型,以便更容易地完成以下工作:
1.将多个异步和同步操作结合在一起
2.在任务链中处理异常
3.在任务链中执行取消
4.确保单个任务在相应的线程上下文或单元中运行



简单的一个小demo:

#include "stdafx.h"
#include <numeric>
#include <ppltasks.h>
#include <sstream>
#include <array>
#include <iostream>
#include <time.h>
using namespace Concurrency;
using namespace std;

int main()
{

array<task<int>, 3> tasks = {
task<int>([] { Sleep(3000); cout << "the f1 output\n"; Sleep(10000); return 100; }),
task<int>([] { Sleep(1000); cout << "the f2 output\n" ; Sleep(10000); return 10; }),
task<int>([] { Sleep(2000); cout << "the f3 output\n" ; Sleep(10000); return 1; })
};

//when_all().wait():容器里面所有的task都被执行后,才继续向下执行。
//when_any().wait():容器里第一个task完成之后,就继续向下执行。
auto TaskList = when_all(tasks.begin(), tasks.end())
.then([](vector<int> results)
{
cout << "The sum is "
<< accumulate(results.begin(), results.end(), 0)
<< endl;
});

time_t t1 = time(NULL);

cout << "begin\n";

TaskList.wait();

cout << "end\n";

time_t t2 = time(NULL);

cout << "the runtime length " << t2 - t1 << "s " << endl;

system("pause");
}


C++  concurrency::task实现异步编程(Windows)


我们定义了三个task,且每个return前都又10s的sleep来代表io操作,其结果总耗时为最长执行的f1(sleep(3000)+sleep(10000))。


官网文档相关说明:

通过任务使用异步操作

以下示例说明如何利用任务类来使用返回 IAsyncOperation 接口且其操作会生成一个值的async方法。以下是基本步骤:
调用 create_task 方法并将其传递到 IAsyncOperation^ 对象。
调用任务上的成员函数 task::then 并且提供一个将在异步操作完成时调用的 lambda。

#include <ppltasks.h>
using namespace concurrency;
using namespace Windows::Devices::Enumeration;
...
void App::TestAsync()
{
//Call the *Async method that starts the operation.
IAsyncOperation<DeviceInformationCollection^>^ deviceOp =
DeviceInformation::FindAllAsync();

// Explicit construction. (Not recommended)
// Pass the IAsyncOperation to a task constructor.
// task<DeviceInformationCollection^> deviceEnumTask(deviceOp);

// Recommended:
auto deviceEnumTask = create_task(deviceOp);

// Call the task’s .then member function, and provide
// the lambda to be invoked when the async operation completes.
deviceEnumTask.then( [this] (DeviceInformationCollection^ devices )
{
for(int i = 0; i < devices->Size; i++)
{
DeviceInformation^ di = devices->GetAt(i);
// Do something with di...
}
}); // end lambda
// Continue doing work or return...
}

由 task::then 函数创建并返回的任务称为延续。 用户提供的 lambda 输入参数(在此情况下)是任务操作在完成时产生的结果。它与你在直接使用 IAsyncOperation 接口时通过调用 IAsyncOperation::GetResults 检索到的值相同。
task::then 方法立即返回,并且其代理直至异步工作成功完成后才运行。 在本示例中,如果异步操作导致引发异常,或由于取消请求而以取消状态结束,则延续永远不会执行。 稍后,我们将介绍如何编写即使上一个任务被取消或失败也会执行的延续。
尽管你在本地堆栈上声明任务变量,但它仍然管理其生存期,这样在其所有操作完成并且对其的所有引用离开作用域之前都不会被删除(即使该方法在操作完成之前返回)。


创建任务链

在异步编程中,常见的做法是定义一个操作序列,也称作任务链,其中每个延续只有在前一个延续完成后才能执行。在某些情况下,上一个(或先行)任务会产生一个被延续接受为输入的值。通过使用 task::then 方法,你可以按照直观且简单的方式创建任务链;该方法返回一个 task<T>,其中 T 是 lambda 函数的返回类型。你可以将多个延续组合到一个任务链中: myTask.then(…).then(…).then(…);
当延续创建一个新的异步操作时,任务链尤其有用;此类任务称为异步任务。以下示例将介绍具有两个延续的任务链。初始任务获取一个现有文件的句柄,当该操作完成后,第一个延续会启动一个新的异步操作来删除该文件。当该操作完成后,第二个延续将运行,并且输出一条确认消息。

#include <ppltasks.h>
using namespace concurrency;
...
void App::DeleteWithTasks(String^ fileName)
{
using namespace Windows::Storage;
StorageFolder^ localFolder = ApplicationData::Current::LocalFolder;
auto getFileTask = create_task(localFolder->GetFileAsync(fileName));

getFileTask.then([](StorageFile^ storageFileSample) ->IAsyncAction^ {
return storageFileSample->DeleteAsync();
}).then([](void) {
OutputDebugString(L"File deleted.");
});
}

上一个示例说明了四个要点:
第一个延续将 IAsyncAction^ 对象转换为 task<void> 并返回 task。
第二个延续执行无错误处理,因此接受 void 而非 task<void> 作为输入。 它是一个基于值的延续。
第二个延续直至 DeleteAsync 操作完成后才执行。
因为第二个延续是基于值的,所以如果通过调用 DeleteAsync 启动的操作引发异常,则第二个延续根本不会执行。

注意  创建任务链只是使用 task 类组合异步操作的一种方式。还可以通过使用连接和选择运算符 && 和 || 来组合操作。有关更多信息,请参阅任务并行度(并发运行时)。


Lambda 函数返回类型和任务返回类型

在任务延续中,lambda 函数的返回类型包含在 task 对象中。如果该 lambda 返回 double,则延续任务的类型为 task<double>。 但是,任务对象的设计目的是为了不生成无需嵌套的返回类型。如果 lambda 返回 IAsyncOperation<SyndicationFeed^>^,则延续返回 task<SyndicationFeed^>,而不是 task<task<SyndicationFeed^>> 或 task<IAsyncOperation<SyndicationFeed^>^>^。此过程称为异步解包,并且它还确保延续内部的异步操作在调用下一个延续之前完成。
请注意,在上一个示例中,即使其 lambda 返回 IAsyncInfo 对象,该任务也仍然会返回 task<void>。下表总结了在 lambda 函数和封闭任务之间发生的类型转换:

C++  concurrency::task实现异步编程(Windows)

取消任务


为用户提供取消异步操作的选项通常是一个不错的方法。另外,在某些情况下,你可能必须以编程方式从任务链外部取消操作。尽管每个 *Async 返回类型都具有一个从 IAsyncInfo 继承的 Cancel 方法,但将其公开给外部方法是不可取的。在任务链中支持取消的首选方式是使用 cancellation_token_source 创建 cancellation_token,然后将该令牌传递给初始任务的构造函数。如果使用取消令牌创建异步任务,并且调用 cancellation_token_source::cancel,则该任务会自动对 IAsync* 操作调用 Cancel,并且将取消请求沿着其延续链向下传递。下面的伪代码演示了基本方法。

//Class member:
cancellation_token_source m_fileTaskTokenSource;

// Cancel button event handler:
m_fileTaskTokenSource.cancel();

// task chain
auto getFileTask2 = create_task(documentsFolder->GetFileAsync(fileName),
m_fileTaskTokenSource.get_token());
//getFileTask2.then ...

取消任务后,task_canceled 异常将沿着任务链向下传播。基于值的延续将不执行,但是基于任务的延续将在调用 task::get 时导致引发异常。如果存在错误处理延续,请确保它明确捕获 task_canceled 异常。(此异常不是派生自 Platform::Exception。)
取消是一种协作式操作。如果你的延续要执行一些长时间的工作,而不只是调用 Windows 运行时方法,则你需要负责定期检查取消令牌的状态,并且在其被取消后停止执行。在清理延续中分配的所有资源之后,请调用 cancel_current_task 以取消该任务并将取消向下传播到后跟的任何基于值的延续。下面是另外一个示例:你可以创建一个任务链表示 FileSavePicker 操作的结果。如果用户选择“取消”按钮,则不会调用 IAsyncInfo::Cancel 方法。相反,操作成功,但返回 nullptr。 延续可以测试输入参数,如果输入是 nullptr,则调用 cancel_current_task。
有关更多信息,请参阅 PPL 中的取消


在任务链中处理错误

如果希望让一个延续即使在先行被取消或引发异常的情况下也能够执行,请将该延续的 lambda 函数的输入指定为 task<TResult> 或 task<void>,使该延续成为基于任务的延续,但前提是先行任务的 lambda 返回 IAsyncAction^。
若要在任务链中处理错误和取消,则无需使每个延续成为基于任务的延续,也不用将每个可能引发异常的操作封装在 try…catch 块中。相反,你可以将基于任务的延续添加至任务链的末尾,并且在那里处理所有错误。任何异常(包括 task_canceled 异常)都将沿着任务链向下传播并绕过所有基于值的延续,因此你可以在基于任务的错误处理延续中处理这些异常。我们可以重写上一个示例以使用基于任务的错误处理延续:


#include <ppltasks.h>
void App::DeleteWithTasksHandleErrors(String^ fileName)
{
using namespace Windows::Storage;
using namespace concurrency;

StorageFolder^ documentsFolder = KnownFolders::DocumentsLibrary;
auto getFileTask = create_task(documentsFolder->GetFileAsync(fileName));

getFileTask.then([](StorageFile^ storageFileSample)
{
return storageFileSample->DeleteAsync();
})

.then([](task<void> t)
{

try
{
t.get();
// .get() didn't throw, so we succeeded.
OutputDebugString(L"File deleted.");
}
catch (Platform::COMException^ e)
{
//Example output: The system cannot find the specified file.
OutputDebugString(e->Message->Data());
}

});
}

在基于任务的延续中,我们调用成员函数 task::get 以获取任务结果。即使该操作是不产生任何结果的 IAsyncAction,我们仍然需要调用 task::get,因为 task::get 也会获取已经传输到该任务的所有异常。如果输入任务正在存储某个异常,则该异常将在调用 task::get 时被引发。 如果不调用 task::get,或者不在任务链的末尾使用基于任务的延续,或者不捕获所引发的异常类型,则当对该任务的所有引用都被删除后,会引发 unobserved_task_exception。
请只捕获你可以处理的异常。如果你的应用遇到无法恢复的错误,则最好让该应用崩溃,而不要让其继续在未知状态下运行。另外,一般情况下,不要尝试捕获 unobserved_task_exception 本身。 该异常主要用于诊断目的。当引发 unobserved_task_exception 时,通常表示代码中存在错误。 原因通常是应该处理的异常,或由代码中的某个其他错误导致的不可恢复的异常。


管理线程上下文

Windows 运行时应用的 UI 在单线程单元 (STA) 中运行。其 lambda 返回 IAsyncAction 或 IAsyncOperation 的任务具有单元意识。如果该任务是在 STA 中创建的,则默认情况下,除非你另外指定,否则该任务的所有要运行的延续也将在该 STA 中运行。 换句话说,整个任务链从父任务继承单元意识。该行为可帮助简化与 UI 控件的交互,这些 UI 控件只能从 STA 访问。
例如,在 Windows 运行时应用中,在任何表示 XAML 页面的类的成员函数中,你可以从 task::then 方法内部填充 ListBox 控件,而无需使用 Dispatcher 对象。

#include <ppltasks.h>
void App::SetFeedText()
{
using namespace Windows::Web::Syndication;
using namespace concurrency;
String^ url = "http://windowsteamblog.com/windows_phone/b/wmdev/atom.aspx";
SyndicationClient^ client = ref new SyndicationClient();
auto feedOp = client->RetrieveFeedAsync(ref new Uri(url));

create_task(feedOp).then([this] (SyndicationFeed^ feed)
{
m_TextBlock1->Text = feed->Title->Text;
});
}

如果任务不返回 IAsyncAction 或 IAsyncOperation,则它不具有单元意识,并且默认情况下,其延续在第一个可用的后台线程上运行。
你可以使用 task_continuation_context 的接受 task::then 的重载来覆盖任一种任务的默认线程上下文。例如,在某些情况下,在后台线程上计划具有单元意识的任务的延续或许是可取的。在此情况下,你可以传递 task_continuation_context::use_arbitrary 以便在多线程单元中的下一个可用线程上计划该任务的工作。这可以改善延续的性能,因为其工作不必与 UI 线程上发生的其他工作同步。
以下示例将演示何时指定 task_continuation_context::use_arbitrary 选项是有用的,还说明了默认延续上下文在同步非线程安全集合上的并发操作方面是多么有用。在此段代码中,我们遍历 RSS 源的 URL 列表,并为每个 URL 启动一个异步操作以检索源数据。 我们无法控制检索订阅的顺序,而我们其实并不关心。当每个 RetrieveFeedAsync 操作完成后,第一个延续接受 SyndicationFeed^ 对象并使用它来初始化应用定义的 FeedData^ 对象。因为上述每个操作都独立于其他操作,所以我们可以通过指定 task_continuation_context::use_arbitrary 延续上下文来提高运行速度。 但是,在初始化每个 FeedData 对象之后,我们必须将其添加至一个不是线程安全集合的 Vector 中。因此,我们要创建一个延续并且指定 task_continuation_context::use_current 以确保所有对 Append 的调用都发生在同样的应用程序单线程单元 (ASTA) 上下文中。由于 task_continuation_context::use_default 是默认上下文,因此我们无需进行明确指定,但是此处为了清楚起见而予以指定。


#include <ppltasks.h>
void App::InitDataSource(Vector<Object^>^ feedList, vector<wstring> urls)
{
using namespace concurrency;
SyndicationClient^ client = ref new SyndicationClient();

std::for_each(std::begin(urls), std::end(urls), [=,this] (std::wstring url)
{
// Create the async operation. feedOp is an
// IAsyncOperationWithProgress<SyndicationFeed^, RetrievalProgress>^
// but we don’t handle progress in this example.

auto feedUri = ref new Uri(ref new String(url.c_str()));
auto feedOp = client->RetrieveFeedAsync(feedUri);

// Create the task object and pass it the async operation.
// SyndicationFeed^ is the type of the return value
// that the feedOp operation will eventually produce.

// Then, initialize a FeedData object by using the feed info. Each
// operation is independent and does not have to happen on the
// UI thread. Therefore, we specify use_arbitrary.
create_task(feedOp).then([this] (SyndicationFeed^ feed) -> FeedData^
{
return GetFeedData(feed);
}, task_continuation_context::use_arbitrary())

// Append the initialized FeedData object to the list
// that is the data source for the items collection.
// This all has to happen on the same thread.
// By using the use_default context, we can append
// safely to the Vector without taking an explicit lock.
.then([feedList] (FeedData^ fd)
{
feedList->Append(fd);
OutputDebugString(fd->Title->Data());
}, task_continuation_context::use_default())

// The last continuation serves as an error handler. The
// call to get() will surface any exceptions that were raised
// at any point in the task chain.
.then( [this] (task<void> t)
{
try
{
t.get();
}
catch(Platform::InvalidArgumentException^ e)
{
//TODO handle error.
OutputDebugString(e->Message->Data());
}
}); //end task chain

}); //end std::for_each
}


嵌套任务(即在延续内部创建的新任务)不继承初始任务的单元意识。


处理进度更新

在操作完成之前,支持 IAsyncOperationWithProgress 或 IAsyncActionWithProgress 的方法会在操作执行过程中定期提供进度更新。进度报告独立于任务和延续概念。 你只需为对象的 Progress 属性提供委派。委派的一个典型用途是更新 UI 中的进度栏。