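Arrow Takes Producer-Consumer to Another Level
1. Background
I have been reading the Arrow source code recently, and this part left me completely lost. Let's go straight to the code. Here is a snippet from Arrow: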
finished_ =
    Loop([this, executor, options] {
      std::unique_lock<std::mutex> lock(mutex_);
      int total_batches = batch_count_++;
      if (stop_requested_) {
        return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
      }
      lock.unlock();
      return generator_().Then(
          [=](const util::optional<ExecBatch>& maybe_batch) -> ControlFlow<int> {
            std::unique_lock<std::mutex> lock(mutex_);
            if (IsIterationEnd(maybe_batch) || stop_requested_) {
              stop_requested_ = true;
              return Break(total_batches);
            }
            lock.unlock();
            ExecBatch batch = std::move(*maybe_batch);
            if (executor) {
              auto status =
                  task_group_.AddTask([this, executor, batch]() -> Result<Future<>> {
                    return executor->Submit([=]() {
                      outputs_[0]->InputReceived(this, std::move(batch));
                      return Status::OK();
                    });
                  });
              if (!status.ok()) {
                outputs_[0]->ErrorReceived(this, std::move(status));
                return Break(total_batches);
              }
            } else {
              outputs_[0]->InputReceived(this, std::move(batch));
            }
            return Continue();
          },
          [=](const Status& error) -> ControlFlow<int> {
            // NB: ErrorReceived is independent of InputFinished, but
            // ErrorReceived will usually prompt StopProducing which will
            // prompt InputFinished. ErrorReceived may still be called from a
            // node which was requested to stop (indeed, the request to stop
            // may prompt an error).
            std::unique_lock<std::mutex> lock(mutex_);
            stop_requested_ = true;
            lock.unlock();
            outputs_[0]->ErrorReceived(this, error);
            return Break(total_batches);
          },
          options);
    }).Then([&](int total_batches) {
      outputs_[0]->InputFinished(this, total_batches);
      return task_group_.End();
    });
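I wonder how many people can follow this logic at first glance!
The goal of this section is to walk through what this code does and what it involves. To make it easier to follow, we first simplify the code above:
finished_ = Loop([this, executor, options] {
  return generator_().Then(on_success, on_failure, options);
}).Then(on_success);
Now it looks much simpler.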
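2. Diving into the details
Let's start with Loop. After simplifying the original code, it does three things:
Create a Future object, break_fut.
Call iterate and take the future it returns.
Build a Callback from iterate and the still-unfinished break_fut, and attach it to the future returned by iterate. break_fut is the future that Loop hands back to the caller; it is completed with the value carried by Break once the loop stops, rather than on every iteration.
Callback has two members:
iterate
break_fut
The simplified code is: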
template <typename Iterate,
          typename Control = typename detail::result_of_t<Iterate()>::ValueType,
          typename BreakValueType = typename Control::value_type>
Future<BreakValueType> Loop(Iterate iterate) {
  // The definition of the Callback struct is omitted here
  auto break_fut = Future<BreakValueType>::Make();
  auto control_fut = iterate();
  control_fut.AddCallback(Callback{std::move(iterate), break_fut});
  return break_fut;
}
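To make the role of Callback more concrete, here is a minimal single-threaded sketch of the same idea. ToyFuture, Control, Iterate, ToyLoop and CountTo are hypothetical names invented for this illustration, and Control is modelled as a plain std::optional<int>. This is not Arrow's implementation: it ignores errors and thread safety, and it recurses once per iteration when every future is already finished.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in for arrow::Future<T>.
template <typename T>
class ToyFuture {
 public:
  static ToyFuture Make() { return ToyFuture(); }
  static ToyFuture MakeFinished(T value) {
    ToyFuture fut;
    fut.MarkFinished(std::move(value));
    return fut;
  }
  bool is_finished() const { return state_->finished; }
  const T& result() const {
    assert(is_finished());
    return state_->value;
  }
  // Run the callback immediately if the value is ready, otherwise store it.
  void AddCallback(std::function<void(const T&)> cb) {
    auto state = state_;
    if (state->finished) {
      cb(state->value);
    } else {
      state->callbacks.push_back(std::move(cb));
    }
  }
  // Store the value and run every callback that was waiting for it.
  void MarkFinished(T value) {
    auto state = state_;  // keep the shared state alive while callbacks run
    assert(!state->finished);
    state->value = std::move(value);
    state->finished = true;
    auto callbacks = std::move(state->callbacks);
    for (auto& cb : callbacks) cb(state->value);
  }

 private:
  struct State {
    bool finished = false;
    T value{};
    std::vector<std::function<void(const T&)>> callbacks;
  };
  std::shared_ptr<State> state_ = std::make_shared<State>();
};

using Control = std::optional<int>;  // empty: continue, value: break with it
using Iterate = std::function<ToyFuture<Control>()>;

struct Callback {
  void operator()(const Control& control) {
    if (control.has_value()) {  // Break: complete the future returned by ToyLoop
      break_fut.MarkFinished(*control);
      return;
    }
    // Continue: start the next iteration and attach a copy of this callback.
    ToyFuture<Control> control_fut = iterate();
    control_fut.AddCallback(Callback{iterate, break_fut});
  }
  Iterate iterate;
  ToyFuture<int> break_fut;
};

ToyFuture<int> ToyLoop(Iterate iterate) {
  auto break_fut = ToyFuture<int>::Make();
  auto control_fut = iterate();
  control_fut.AddCallback(Callback{std::move(iterate), break_fut});
  return break_fut;
}

// Runs `limit` iterations that continue, then one that breaks with the count.
int CountTo(int limit) {
  auto counter = std::make_shared<int>(0);
  ToyFuture<int> done = ToyLoop([counter, limit]() {
    if (*counter == limit) {
      return ToyFuture<Control>::MakeFinished(Control(*counter));  // Break
    }
    ++*counter;
    return ToyFuture<Control>::MakeFinished(Control());  // Continue
  });
  return done.result();
}
```

Calling CountTo(3) invokes the iterate lambda four times: three calls that continue and a final one that breaks with the value 3, which is what the future returned by ToyLoop then holds.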
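Now look at the outer .Then(). It receives a single lambda, which reports how many batches were processed in total and then ends the task group. This lambda is passed as the on_success parameter of Then, shown below, and is added to the Future's list of callbacks.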
Loop([this, executor, options] {}).Then([&](int total_batches) {
  outputs_[0]->InputFinished(this, total_batches);
  return task_group_.End();
});
template <typename OnSuccess, typename OnFailure = PassthruOnFailure<OnSuccess>,
          typename OnComplete = ThenOnComplete<OnSuccess, OnFailure>,
          typename ContinuedFuture = typename OnComplete::ContinuedFuture>
ContinuedFuture Then(OnSuccess on_success, OnFailure on_failure = {},
                     CallbackOptions options = CallbackOptions::Defaults()) const {
  auto next = ContinuedFuture::Make();
  AddCallback(OnComplete{std::forward<OnSuccess>(on_success),
                         std::forward<OnFailure>(on_failure), next},
              options);
  return next;
}
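The pattern in Then (create a new future, register a callback that completes it, return it) can be tried out in isolation. The following is only a sketch under simplifying assumptions: MiniFuture and DoubleAndFormat are hypothetical names, only the success path is modelled (no on_failure, no options), and everything runs on one thread.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical single-threaded future; only the success path is modelled.
template <typename T>
class MiniFuture {
 public:
  bool is_finished() const { return state_->finished; }
  const T& result() const {
    assert(is_finished());
    return state_->value;
  }
  // Run the callback immediately if the value is ready, otherwise store it.
  void AddCallback(std::function<void(const T&)> cb) const {
    auto state = state_;
    if (state->finished) {
      cb(state->value);
    } else {
      state->callbacks.push_back(std::move(cb));
    }
  }
  // Store the value and run every callback that was waiting for it.
  void MarkFinished(T value) const {
    auto state = state_;  // keep the shared state alive while callbacks run
    assert(!state->finished);
    state->value = std::move(value);
    state->finished = true;
    auto callbacks = std::move(state->callbacks);
    for (auto& cb : callbacks) cb(state->value);
  }
  // Returns a new future that completes with on_success(value).
  template <typename OnSuccess,
            typename U = std::invoke_result_t<OnSuccess&, const T&>>
  MiniFuture<U> Then(OnSuccess on_success) const {
    MiniFuture<U> next;
    AddCallback([on_success, next](const T& value) mutable {
      next.MarkFinished(on_success(value));
    });
    return next;
  }

 private:
  struct State {
    bool finished = false;
    T value{};
    std::vector<std::function<void(const T&)>> callbacks;
  };
  std::shared_ptr<State> state_ = std::make_shared<State>();
};

// Chains two continuations and only then completes the source future.
std::string DoubleAndFormat(int input) {
  MiniFuture<int> source;
  MiniFuture<std::string> text =
      source.Then([](const int& n) { return n * 2; })
          .Then([](const int& n) { return std::to_string(n); });
  assert(!text.is_finished());  // nothing runs until the source completes
  source.MarkFinished(input);   // triggers both continuations in order
  return text.result();
}
```

In this sketch each Then returns immediately; the work happens later, when MarkFinished walks the stored callbacks. That is the same reason the outer .Then() in the Arrow snippet can be attached before any batch has been produced.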
Next, let's look at what Loop's iterate argument does. It is an anonymous lambda that returns a future. The original code is fairly complex, so it is simplified here.
[this, executor, options] {
  std::unique_lock<std::mutex> lock(mutex_);  // acquire the lock
  int total_batches = batch_count_++;         // count the batches produced so far
  if (stop_requested_) {                      // stop producing if a stop was requested
    return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
  }
  lock.unlock();  // release the lock
  // The callbacks are elided here; the result is a Future<ControlFlow<int>>
  return generator_().Then(/* ... */);
}
Now for the return value, which I have simplified further:
return generator_().Then(on_success, on_failure, options);
Start with generator_. It is an AsyncGenerator<util::optional<ExecBatch>>, and an AsyncGenerator is a function object that returns a Future each time it is called:
template <typename T>
using AsyncGenerator = std::function<Future<T>()>;
So, in terms of types, the return statement reads as:
return /* std::function<Future<T>()> */ generator_() /* yields a Future<T> */.Then(on_success, on_failure, options);
Going one step further:
// Produce the data
Future<util::optional<ExecBatch>> f = generator_();
// Receive the data and do the next step of the work
return f.Then(on_success, on_failure, options);
Now the code is easy to read.
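The "function object that returns a future" shape is easy to experiment with outside Arrow. The sketch below uses a hypothetical ReadyFuture (a future that is always already finished) in place of arrow::Future, and assumes that the end of the stream is signalled by an empty optional, as the IsIterationEnd(maybe_batch) check suggests. ToyAsyncGenerator, MakeRangeGenerator and SumAll are names invented for this example.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <optional>

// Hypothetical future that is always already finished; stands in for arrow::Future<T>.
template <typename T>
struct ReadyFuture {
  T value;
  const T& result() const { return value; }
};

// Same shape as the AsyncGenerator alias, with ReadyFuture instead of Future.
template <typename T>
using ToyAsyncGenerator = std::function<ReadyFuture<T>()>;

// Yields 0, 1, ..., count - 1 and then an empty optional on every later call.
ToyAsyncGenerator<std::optional<int>> MakeRangeGenerator(int count) {
  assert(count >= 0);
  auto next = std::make_shared<int>(0);
  return [next, count]() -> ReadyFuture<std::optional<int>> {
    if (*next < count) {
      return {std::optional<int>((*next)++)};
    }
    return {std::nullopt};  // end of the stream
  };
}

// Calls the generator repeatedly and sums the items until the end marker.
int SumAll(const ToyAsyncGenerator<std::optional<int>>& generator) {
  int sum = 0;
  while (true) {
    ReadyFuture<std::optional<int>> fut = generator();  // one call, one future
    const std::optional<int>& maybe_item = fut.result();
    if (!maybe_item.has_value()) break;
    sum += *maybe_item;
  }
  return sum;
}
```

SumAll consumes the generator with a blocking while loop, which only works because ReadyFuture is never pending. With a real asynchronous future the next call has to be chained from a callback instead, which is the job Loop and Then do in the Arrow code.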
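Next, the arguments of Then:
on_success, the success callback
The data from the producer is passed in as the lambda's parameter for processing; see the comments for the logic.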
[=](const util::optional<ExecBatch>& maybe_batch) -> ControlFlow<int> {
  std::unique_lock<std::mutex> lock(mutex_);  // acquire the lock
  if (IsIterationEnd(maybe_batch) || stop_requested_) {  // should production stop?
    stop_requested_ = true;       // set stop_requested_ so that the producing loop stops
    return Break(total_batches);  // return the number of batches produced so far
  }
  lock.unlock();                              // release the lock
  ExecBatch batch = std::move(*maybe_batch);  // take the batch into a local variable
  if (executor) {  // with an executor, submit the work through task_group_.AddTask
    auto status =
        task_group_.AddTask([this, executor, batch]() -> Result<Future<>> {
          return executor->Submit([=]() {
            outputs_[0]->InputReceived(this, std::move(batch));  // pass the data downstream
            return Status::OK();
          });
        });
    if (!status.ok()) {  // if AddTask failed, report the error downstream and break
      outputs_[0]->ErrorReceived(this, std::move(status));
      return Break(total_batches);
    }
  } else {  // without an executor, pass the data downstream directly
    outputs_[0]->InputReceived(this, std::move(batch));
  }
  return Continue();  // done with this batch; move on to the next one
}
on_failure, the failure callback
[=](const Status& error) -> ControlFlow<int> {  // handle an error raised while producing data
  std::unique_lock<std::mutex> lock(mutex_);   // acquire the lock
  stop_requested_ = true;                      // set stop_requested_ so that the producing loop stops
  lock.unlock();                               // release the lock
  outputs_[0]->ErrorReceived(this, error);     // pass the error downstream
  return Break(total_batches);                 // return the number of batches produced so far
}
options, the callback options (the CallbackOptions parameter of Then)
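Both callbacks return a ControlFlow<int>, built with either Break(total_batches) or Continue(). One way to picture this, as an assumption for illustration and not a copy of Arrow's definitions, is an optional whose value means "break with this result" and whose empty state means "continue". ToyControlFlow, ToyBreak, ToyContinue and DecideNextStep are hypothetical names; DecideNextStep mirrors only the stop decision of on_success, with an int standing in for ExecBatch.

```cpp
#include <cassert>
#include <optional>
#include <utility>

// Assumption: control flow modelled as an optional break value.
template <typename T>
using ToyControlFlow = std::optional<T>;

// A value means "stop the loop and report this result".
template <typename T>
ToyControlFlow<T> ToyBreak(T break_value) {
  return ToyControlFlow<T>(std::move(break_value));
}

// Converts to an empty optional of any type, meaning "run another iteration".
struct ToyContinue {
  template <typename T>
  operator std::optional<T>() const {
    return std::nullopt;
  }
};

// Mirrors the stop decision in on_success: break at the end of the stream
// or when a stop was requested, otherwise continue.
ToyControlFlow<int> DecideNextStep(const std::optional<int>& maybe_batch,
                                   bool& stop_requested, int total_batches) {
  assert(total_batches >= 0);
  if (!maybe_batch.has_value() || stop_requested) {
    stop_requested = true;
    return ToyBreak(total_batches);
  }
  // The real callback would hand the batch to the downstream node here.
  return ToyContinue();
}
```

With this model the loop driver only needs to test has_value() on each result to decide whether to call iterate again or to complete the final future.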
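3. Summary
In short, the logic here is to call generator_() to obtain data and then call Then to consume it. Both steps are asynchronous, which is why locks, task groups and similar machinery are involved.
finished_ = Loop([this, executor, options] {
  return generator_().Then(on_success, on_failure, options);
}).Then(on_success);
There are still many details left to understand, for example how Future is implemented and how it differs from the one in the C++ standard library, how Arrow's optional differs from the standard one, how asynchronous tasks are handled, and so on.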