【后台开发拾遗】异步代码同步化

时间:2022-03-04 04:49:09

在当今的编程世界中,异步编程已经成为了一种习惯。传统的同步阻塞编程,虽然处理流程非常清晰,但是程序常常处于阻塞等待状态,CPU资源利用率低。而早期的异步编程,通过callback的方式进行回调处理,当回调嵌套开始多起来的时候,程序代码可读性变得非常差。
对于C++,协程和Future/Promise的出现,使得我们既可以实现异步编程,又可以将代码写得十分优美,看起来跟同步代码一般清晰。

本文再次回顾总结下Future/Promise的技术原理,并在最后给出一种编码方式供参考。

相关文章

关于Future/Promise,之前我已经陆陆续续介绍过一些了,本次不会过多的去介绍实现细节,而是从宏观的角度来剖析其原理。

C++11 多线程 future/promise简介

C++异步调用利器future/promise实现原理

总览

Promise主要的类关系如下:

【后台开发拾遗】异步代码同步化

由于Future< T >需要针对void进行特化,为避免过多重复的代码,把与特化无关的部分抽离出来形成FutureBase作为基类。

可以看到,Future< T > 和 Promise< T >都有一个同样类型的成员变量:

SharedPtr<detail::FutureObjectInterface<T> > m_future;

m_future是一个引用计数的智能指针。

Promise对象可保存T类型的值,通过Promise的getFuture()方法,产生一个Future对象,此时Promise对象和Future对象内的m_future指向同一个地址,因此Promise对象中保存的T类型的值可被future对象读取(可能在另一个线程中),这是promise提供的同步手段。

类FutureObjectInterface是一个虚类,为Future 和 Promise 提供了m_future的接口。其具体实现包括PromptFutureObject和FutureObject两个类。

Promise的FutureObjectInterface 是一个FutureObject。
Future的FutureObjectInterface有两种情况:直接用一个值来构造Future时(比如调用makeFuture来获取一个future)用的是PromptFutureObject,而其他情况(比如通过Promise获得的future)用的是FutureObject。比如对于使用立即数构造Future时,由于其值已经设定了,不需要等待promise填值进去,因此该future内部的PromptFutureObject是只读的,也就不需要加锁。如果还是使用FutureObject这个版本,将会在加锁解锁上做无用功。因此PromptFutureObject是针对这种场景进行优化的。而当Future 与Promise共享同一个m_future时,由于Future和Promise可能在不同线程中,因此可能同时读写,这里存在race condition,因此需要加锁。FutureObject正是一个加锁的版本。

设置回调——then

上一节我们已经知道,Future< T > 和 Promise< T >共享了值T的状态,假设我们有一个函数:

bool isBlack(string qqNumber);

该函数请求远程服务,来判断该qq号码是否在黑名单内。

那么我们可以如下进行异步调用:

// 1. 定义一个Promise:
promise::Promise< bool > promise;

// 2. 定义一个Callback对象,构造时接收一个Promise对象:
CallBack cb(promise);

// 3. 注册cb(当isBlack函数返回时,通过cb,调用promise的setValue方法对m_future内保存的bool值进行赋值)

// 4. 使用promise的getFuture方法产生一个Future< bool >对象,该Future对象与Promise 共享 m_future。
return promise.getFuture();

于是当isBlack调用返回时,我们通过future.get()即可获取其返回值。

get()属于阻塞函数,它会等待Promise调用setValue()对其承诺的值进行赋值后,方可把值取出来消费。

【后台开发拾遗】异步代码同步化
【后台开发拾遗】异步代码同步化

因此我们还需要向m_future注册一个回调函数,当setValue被调用时,调用该回调函数进行处理(通过get()获取数据等等)。回调函数的注册,由then来实现。

/**
* Register a callback which will be called once the future is satisfied. If an
* exception is thrown the callback will not be registered and then will not be called.
*
* \throws std::bad_alloc if memory is unavailable.
*/

template <typename R>
Future<typename detail::resolved_type<R>::type> then(const Callback<R(const Future&)>& callback) const
{
typedef typename detail::resolved_type<R>::type value_type;

if (!this->m_future)
{
throwException(FutureUninitializedException(__FILE__, __LINE__));
}

Promise<value_type> promise;
this->m_future->
registerCallback(bind(&detail::SequentialCallback<R, T>::template run<R>,
owned(new detail::SequentialCallback<R, T>(callback, promise))));
return promise.getFuture();
}

template <typename T>
struct resolved_type
{
typedef T type;
};

template <typename T>
struct resolved_type<Future<T> >
{
typedef T type;
};

then函数接收一个Callback对象,该对象的模板参数是一个返回值类型为R,参数类型为const Future&的函数。(该callback对象可由bind方法生成,后面会介绍)

then的处理思路如下:
1. 通过resolved_type,对回调函数的返回值做一层处理,得到value_type;
2. 定义Promise< value_type > promise;
3. 调用m_future的registerCallback方法,向m_future注册回调函数,并传入promise;
4. 通过promise.getFuture()返回一个future。

可以看到,then最终的返回结果是一个类型为Future< value_type >的future,其对应的promise将在then注册的回调函数处理结束之后被设置,设置的值为回调函数的返回值。利用这一点,我们可以实现链式调用:

a.then(cb1).then(cb2);

首先,向a注册回调cb1,当future a满足条件时,调用cb1进行处理。a.then(cb1)返回一个future,命名为b,则b.then(cb2),注册了回调函数cb2,当future b满足条件时,调用cb2进行处理。而b满足条件的时机为cb1被调用并返回之后。于是cb1和cb2串行执行。

注册回调的过程registerCallback如下:

// pre: `callback' should not throw an exception because it may be run in
// another thread at a later time so the exception can not be caught by the
// caller and is meaningless. And this object must be shared by some
// SharedPtr<FutureObject<T> > instance.
virtual void registerCallback(const CallbackType& callback)
{
PROMISE_ASSERT(callback);

TC_ThreadLock::Lock lock(m_monitor);
if (m_is_done)
{
lock.release();
try
{
callback(this->sharedFromThis());
}
catch (...)
{
// Ignore the exceptions thrown by the callback.
}
}
else
{
m_pending_callbacks.push_back(callback);
}
}

如果future已经满足条件,则直接调用回调函数进行处理,否则,将回调函数加入到m_pending_callbacks列表中,等待setValue之后调用。

注意,then会抛异常,因此代码中需要try/catch。

回调函数到回调对象的转换——bind

bind的功能:把一个具体函数,变成Callback对象。
在future/promise中,then方法接收的回调函数实际上是一个Callback对象,该对象通过bind来生成。

用法示例:

// 注册回调函数
Future<void> mainFuture;
mainFuture.then(bind(&handleGetData, context));

// 回调函数声明
static void handleGetData(Auto_Ptr<GetDataContext> context, const Future<void> future);

bind的实现我们这里不深究,只提几点需要注意的:

  1. 回调函数的最后一个参数是一个Future,比如上例中,当mainFuture中的值被设置时,回调函数handleGetData将被调用,此时mainFuture将作为其最后一个参数传递进来,这样子handle函数中才能将设置的值给取出来;

  2. 使用bind将回调函数转换为Callback对象时,不允许传递非const引用的参数。因为Callback的Invoked分发函数接收原参数,保存的是原参数的一份拷贝。如果想传递非const引用,通过放在智能指针中的context来传递。

  3. 不允许传递原始指针作为参数,因为可能造成内存泄漏。

需使用promise::unretained(), promise::owned()来显式指明内存管理者。
UnretainedWrapper指明调用者需要对绑定的原始指针所指内存进行管理。
OwnedWrapper指明callback需要对绑定的原始指针所指内存进行管理,也就是说,在Callback被析构时,该指针所指的内存也会被析构。

SharedWrapper则指明两者共享该对象。(shared(const SharedPtr< T >& p))

当bind的函数为类成员函数时,需要传递this指针作为第一个参数,这时就需要显式指明内存管理者。

并行处理

要实现并行化,需要两个助手:

  1. 类型tuple,它是一个类似pair类型的模板。pair类型是每个成员变量各自可以是任意类型,但是只能有俩个成员,而tuple与pair不同的是它可以有任意数量的成员,每个确定的tuple类型的成员数目是固定的。

  2. 函数whenAll:接收多个future作为参数,并返回一个组合的future(通过tuple组合),只有所有future都满足条件时,该组合future才满足条件。

下面以两个future为例。

/**
* Parallelly composite two or more futues. Only when all futures are satisfied is
* the composited future satisfied.
*/

template <typename T1, typename T2>
Future<Tuple<Future<T1>, Future<T2> > >
whenAll(const Future<T1>& future1, const Future<T2>& future2)
{
typedef Tuple<Future<T1>, Future<T2> > FutureAllValueType;
typedef Promise<FutureAllValueType> PromiseAll;
typedef detail::ParallelAllCallback<FutureAllValueType> WhenAllCallback;

PromiseAll promise_all;
SharedPtr<WhenAllCallback> future_callback(new WhenAllCallback(promise_all));
future1.then(promise::bind(&WhenAllCallback::template on_future<0>, promise::shared(future_callback)));
future2.then(promise::bind(&WhenAllCallback::template on_future<1>, promise::shared(future_callback)));
return promise_all.getFuture();
}

when_all的思路就是定义一个promise,然后对并行的几个future注册回调,当几个future满足条件时,就调用回调函数对promise中对应的部分进行设置,当所有future对应的部分都设置完成,该promise对应的future满足条件。

使用例子:

Future<void> future1 = promise_async_getData1();
Future<void> future2 = promise_async_getData2();

when_all(future1, future2).then(bind(&handleAll));

static void handleAll(Tuple<Future<void>, Future<void> > futureAll)
{
// do something
}

一个例子

下面我们以一个比较复杂的例子来总结future/promise的使用:

// 定义一个满足条件的mainFuture
Future<void> mainFuture = makeFuture();

// 当mainFuture满足条件时,调用getData1,并将其返回值赋值给getFuture1
Future<void> getFuture1 = mainFuture.then(bind(&getData1, context));
// 当mainFuture满足条件时,调用getData2,并将其返回值赋值给getFuture2
Future<void> getFuture2 = mainFuture.then(bind(&getData2, context));
// 当mainFuture满足条件时,调用getData3,并将其返回值赋值给getFuture3
Future<void> getFuture3 = mainFuture.then(bind(&getData3, context));

// 当getFuture1, getFuture2, getFuture3三者同时满足条件时,调用whenDone,并将其返回值赋值给mainFuture
mainFuture = whenAll(getFuture1, getFuture2, getFuture3).then(bind(
&whenDone<Tuple<Future<void>, Future<void>, Future<void> > >));

// 当新的mainFuture满足条件时(whenDone被调用之后),调用reduceData,并将其返回值赋值给mainFuture
mainFuture = mainFuture.bind(&reduceData, context);

// 当新的mainFuture满足条件时(reduceData被调用之后),调用setData1,并将其返回值赋值给setFuture1
Future<void> setFuture1 = mainFuture .then(bind(&setData1, context));
// 当新的mainFuture满足条件时(reduceData被调用之后),调用setData2,并将其返回值赋值给setFuture2
Future<void> setFuture2 = mainFuture .then(bind(&setData2, context));
// 当新的mainFuture满足条件时(reduceData被调用之后),调用setData3,并将其返回值赋值给setFuture3
Future<void> setFuture3 = mainFuture .then(bind(&setData3, context));

// 当setFuture1, setFuture2, setFuture3三者同时满足条件时,调用whenDone,并将其返回值赋值给mainFuture
mainFuture = whenAll(setFuture1, setFuture2, setFuture3).then(bind(
&whenDone<Tuple<Future<void>, Future<void>, Future<void> > >));

// 当新的mainFuture满足条件时(whenDone被调用之后),调用doResponse
mainFuture = mainFuture.bind(&doResponse, context);

其中getData/setData函数大致如下:

static Future<void> getData(Auto_prt<MyContext> context, const Future<void> &future)
{
return promise_async_getData().then(bind(&handleGetData, context));
}

static Future<void> handleGetData(Auto_prt<MyContext> context, const Future<void> &future)
{
// 对future.get()得到的值做一些处理
// ...

return makeFuture();
}

when_done:

template<typename T>
promise::Future<void> whenDone(const promise::Future<T> &) {
return promise::makeFuture();
}

一张图说明下调用流程:

【后台开发拾遗】异步代码同步化

首先,对mainFuture注册回调getData,3个getData是并行的。
而在getData的返回值中,又注册了回调handleGetData,因此只有handleGetData被调用结束之后,getFutureX才会满足条件。当3个getFutureX都满足条件时,调用whenDone使得mainFuture再次满足条件,于是reduceData被调用,只有reduceData返回之后,setData才能得到回调。

实际上,上面的代码(一系列的then),就是一个注册回调的过程。

通过future/promise和when_all,将并行化的处理流程写成串行的,代码十分清晰。