《C++那些事》之生产消费模型玩出花了

时间:2020-12-06 01:10:52

Arrow 生产消费玩出花了

1.背景

最近在看Arrow代码,看到这一块蒙圈了,直接上代码,看一段Arrow:

finished_ =
Loop([this, executor, options] {
  std::unique_lock<std::mutex> lock(mutex_);
  int total_batches = batch_count_++;
  if (stop_requested_) {
    return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
  }
  lock.unlock();

  return generator_().Then(
      [=](const util::optional<ExecBatch>& maybe_batch) -> ControlFlow<int> {
        std::unique_lock<std::mutex> lock(mutex_);
        if (IsIterationEnd(maybe_batch) || stop_requested_) {
          stop_requested_ = true;
          return Break(total_batches);
        }
        lock.unlock();
        ExecBatch batch = std::move(*maybe_batch);

        if (executor) {
          auto status =
              task_group_.AddTask([this, executor, batch]() -> Result<Future<>> {
                return executor->Submit([=]() {
                  outputs_[0]->InputReceived(this, std::move(batch));
                  return Status::OK();
                });
              });
          if (!status.ok()) {
            outputs_[0]->ErrorReceived(this, std::move(status));
            return Break(total_batches);
          }
        } else {
          outputs_[0]->InputReceived(this, std::move(batch));
        }
        return Continue();
      },
      [=](const Status& error) -> ControlFlow<int> {
        // NB: ErrorReceived is independent of InputFinished, but
        // ErrorReceived will usually prompt StopProducing which will
        // prompt InputFinished. ErrorReceived may still be called from a
        // node which was requested to stop (indeed, the request to stop
        // may prompt an error).
        std::unique_lock<std::mutex> lock(mutex_);
        stop_requested_ = true;
        lock.unlock();
        outputs_[0]->ErrorReceived(this, error);
        return Break(total_batches);
      },
      options);
}).Then([&](int total_batches) {
  outputs_[0]->InputFinished(this, total_batches);
  return task_group_.End();
});

不知道有几个人能看懂这一块逻辑!

本节的目标是简单讲讲这里面的逻辑是什么,涉及到哪些,为了理解期间,我们将上面代码简化:

finish_ = Loop([this, executor, options] { 
 return generator_().Then(on_success, on_failure, options);
}).Then(on_success);

这下看起来贼简单。

1.深入细节

先来看看Loop,我们简化一下原有代码之后,代码变为:

  • 创建一个Future对象

  • 调用iterate,并获取返回的future

  • 使用iterate与空的future构造一个回调对象,其中break_fut会存储每次迭代的结果。

Callback有两个成员:

  • iterate

  • break_fut

template <typename Iterate,
          typename Control = typename detail::result_of_t<Iterate()>::ValueType,
          typename BreakValueType = typename Control::value_type>
Future<BreakValueType> Loop(Iterate iterate) {
  // remove Callback
  auto break_fut = Future<BreakValueType>::Make();
  auto control_fut = iterate();
  control_fut.AddCallback(Callback{std::move(iterate), break_fut});

  return break_fut;
}

再看看外层的.Then(),只传入了一个lambda,实现的是最终处理了多少batch数据,结束任务组,作为下面on_success参数,添加到了Future的callback数组当中。

Loop([this, executor, options] {}).Then([&](int total_batches) {
  outputs_[0]->InputFinished(this, total_batches);
  return task_group_.End();
});


template <typename OnSuccess, typename OnFailure = PassthruOnFailure<OnSuccess>,
            typename OnComplete = ThenOnComplete<OnSuccess, OnFailure>,
          typename ContinuedFuture = typename OnComplete::ContinuedFuture>
ContinuedFuture Then(OnSuccess on_success, OnFailure on_failure = {},
                     CallbackOptions options = CallbackOptions::Defaults()) const {
  auto next = ContinuedFuture::Make();
  AddCallback(OnComplete{std::forward<OnSuccess>(on_success),
                         std::forward<OnFailure>(on_failure), next},
              options);
  return next;
}

接下来看看看Loop的iterator,里面做了什么。入参是一个匿名的lambda,返回值是一个future对象,源代码比较复杂,这里进行了简化。

[this, executor, options] {
  std::unique_lock<std::mutex> lock(mutex_);  // 获取锁

  int total_batches = batch_count_++;  // 统计生成的batch数量

  if (stop_requested_) {  // 如果有停止请求,则停止生成数据
    return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
  }
  lock.unlock();  // 释放锁
  
  // this is a dummy result for iterate 
  return Future<BreakValueType>;
}

继续看看返回值,这里我继续进行了简化。

return generator_().Then(on_success, on_failure, options);

先来看generator_,这个是一个AsyncGenerator<util::optional<ExecBatch>>,而AsyncGenerator又是个Future:

template <typename T>
using AsyncGenerator = std::function<Future<T>()>;

所以这里可以改为:

return std::function<Future<T>()>().Then(on_success, on_failure, options);

进一步的:

// 生产数据
Future<util::optional<ExecBatch>> f = std::function<Future<util::optional<ExecBatch>>()>();
// 接受数据,做下一步工作
return f.Then(on_success, on_failure, options);

这下代码可以看懂了。

Then里面呢继续看看入参:

  • on_success,成功回调

将生产者的数据作为lambda参数,传递进来进行处理,逻辑参考注释。

[=](const util::optional<ExecBatch>& maybe_batch) -> ControlFlow<int> {
  std::unique_lock<std::mutex> lock(mutex_);  // 获取锁

  if (IsIterationEnd(maybe_batch) || stop_requested_) {  // 判断是否需要停止生成数据
    stop_requested_ = true;  // 更新stop_requested_为true,通知生成数据的循环停止
    return Break(total_batches);  // 返回当前生成batch的数量
  }
  lock.unlock();  // 释放锁

  ExecBatch batch = std::move(*maybe_batch);  // 移动batch到本地变量

  if (executor) {  // 如果存在executor,则提交task_group_.AddTask任务
    auto status =
        task_group_.AddTask([this, executor, batch]() -> Result<Future<>> {
          return executor->Submit([=]() {
            outputs_[0]->InputReceived(this, std::move(batch));  // 将数据传递给下游节点
            return Status::OK();
          });
        });
    if (!status.ok()) {  // 如果task_group_.AddTask失败,则返回当前生成batch的数量
      outputs_[0]->ErrorReceived(this, std::move(status));
      return Break(total_batches);
    }
  } else {  // 如果不存在executor,则直接将数据传递给下游节点
    outputs_[0]->InputReceived(this, std::move(batch));
  }
  return Continue();  // 结束,继续处理下一个batch
}
  • on_failure,失败回调

[=](const Status& error) -> ControlFlow<int> {  // 处理生成数据过程中的错误
  std::unique_lock<std::mutex> lock(mutex_);  // 获取锁
  stop_requested_ = true;  // 更新stop_requested_为true,通知生成数据的循环停止
  lock.unlock();  // 释放锁
  outputs_[0]->ErrorReceived(this, error);  // 将错误信息传递给下游节点
  return Break(total_batches);  // 返回当前生成batch的数量
}
  • options, 选项设置

2.总结

简单来说,这一块的逻辑是先调用generator()获取到数据,随后调用Then去消费数据,当然这两步是异步的,所以里面会涉及到加锁、任务组等内容。

finish_ = Loop([this, executor, options] { 
 return generator_().Then(on_success, on_failure, options);
}).Then(on_success);

里面还有特别多的细节,需要我们去理解,例如Future怎么实现,与C++新特性中的有何区别,optional与新特性的实现有何区别,异步任务是怎么处理的等等。