caffe源码学习(五) data layer

时间:2022-06-09 04:12:35

通过前面的学习,了解了protobufblobcpu和gpu数据管理基类Layer。在使用caffe时,我们首先在prototxt文件中定义数据层,可以参考官网教程。这样我们就可以通过数据层来读取和预处理我们指定格式的数据,并将其送入网络。接下来学习的目的:了解caffe是怎样实现这样的数据层的,对于自己的特殊数据能够写出自己的数据层。

参考官网列出的各类继承关系,逐一学习它们的作用。

1.源码
根据继承关系,首先来读BaseDataLayer类的源码。

base_data_layer.hpp

#ifndef CAFFE_DATA_LAYERS_HPP_
#define CAFFE_DATA_LAYERS_HPP_

#include <vector>

#include "caffe/blob.hpp"
// DataTransformer类实现了一些常用的数据预处理操作,如尺度变换,减均值,镜像变换等
#include "caffe/data_transformer.hpp"
// 涉及多线程
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/proto/caffe.pb.h"
// 与多线程有关
#include "caffe/util/blocking_queue.hpp"

namespace caffe {

/**
* @brief Provides base for data layers that feed blobs to the Net.
*
* TODO(dox): thorough documentation for Forward and proto params.
*/

// 这个类是data layers将blobs送入网络的基础
template <typename Dtype>
class BaseDataLayer : public Layer<Dtype> {
public:
// 构造函数,参数是在caffe.proto中定义的LayerParameter类的引用
explicit BaseDataLayer(const LayerParameter& param);
// LayerSetUp: implements common data layer setup functionality, and calls
// DataLayerSetUp to do special data layer setup for individual layer types.
// This method may not be overridden except by the BasePrefetchingDataLayer.
// 虚函数,实现了一般data layer的设置功能,并调用DataLayerSetUp来完成具体data layer的设置,
// 该方法除了BasePrefetchingDataLayer可以不被重写。
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
// Data layers should be shared by multiple solvers in parallel
// 数据层应该被分享
virtual inline bool ShareInParallel() const { return true; }
// 具体的data layer应该重写这个函数来完成特定层的设置
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
// Data layers have no bottoms, so reshaping is trivial.
// data layer没有bottoms,所以reshaping是不必要的
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}

// 由于这也是个基类,具体实现留给其子类
virtual void Backward_cpu(const vector<Blob<Dtype>*>& top,
const vector<bool>& propagate_down, const vector<Blob<Dtype>*>& bottom) {}
virtual void Backward_gpu(const vector<Blob<Dtype>*>& top,
const vector<bool>& propagate_down, const vector<Blob<Dtype>*>& bottom) {}

protected:
// 在caffe.proto中定义的参数类
TransformationParameter transform_param_;
// DataTransformer类的指针
shared_ptr<DataTransformer<Dtype> > data_transformer_;
// 是否有labels
bool output_labels_;
};

// Batch类,类里面是两个Blob类的变量data_和label_
template <typename Dtype>
class Batch {
public:
Blob<Dtype> data_, label_;
};

//BasePrefetchingDataLayer类,继承了BaseDataLayer和InternalThread
template <typename Dtype>
class BasePrefetchingDataLayer :
public BaseDataLayer<Dtype>, public InternalThread {
public:
// 构造函数,参数是在caffe.proto中定义的LayerParameter类的引用
explicit BasePrefetchingDataLayer(const LayerParameter& param);
// LayerSetUp: implements common data layer setup functionality, and calls
// DataLayerSetUp to do special data layer setup for individual layer types.
// This method may not be overridden.
// 虚函数,实现了一般data layer的设置功能,并调用DataLayerSetUp来完成具体data layer的设置,
// 该方法可以不被重写。
void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);

// Forward_cpu和Forward_gpu
virtual void Forward_cpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);

// Prefetches batches (asynchronously if to GPU memory)
// 提前获取batch的数量
static const int PREFETCH_COUNT = 3;

protected:
// 有关多线程
virtual void InternalThreadEntry();
// 纯虚函数,load batch,参数是Batch类指针
virtual void load_batch(Batch<Dtype>* batch) = 0;

// 成员变量,Batch类数组
Batch<Dtype> prefetch_[PREFETCH_COUNT];
// 有关多线程
BlockingQueue<Batch<Dtype>*> prefetch_free_;
BlockingQueue<Batch<Dtype>*> prefetch_full_;

// 转换过的Blob数据
Blob<Dtype> transformed_data_;
};

} // namespace caffe

#endif // CAFFE_DATA_LAYERS_HPP_

base_data_layer.cpp

#include <boost/thread.hpp>
#include <vector>

#include "caffe/blob.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/blocking_queue.hpp"

namespace caffe {

// 构造函数初始化,先用LayerParameter& param初始化父类Layer,
// 再用param.transform_param()初始化transform_param_
// 在caffe.proto中可以看到LayerParameter中的成员中有TransformationParameter
template <typename Dtype>
BaseDataLayer<Dtype>::BaseDataLayer(const LayerParameter& param)
: Layer<Dtype>(param),
transform_param_(param.transform_param()) {
}

// 根据层中的bottom和top来设置层
template <typename Dtype>
void BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
// 获取是否有label
if (top.size() == 1) {
output_labels_ = false;
} else {
output_labels_ = true;
}
// 新建DataTransformer类的shared_ptr指针,
// 用来预处理数据
data_transformer_.reset(
new DataTransformer<Dtype>(transform_param_, this->phase_));
data_transformer_->InitRand();
// The subclasses should setup the size of bottom and top
// 子类应该设置bottom和top的size
DataLayerSetUp(bottom, top);
}

// BasePrefetchingDataLayer构造函数,
// 应该是初始化PREFETCH_COUNT个线程
template <typename Dtype>
BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
const LayerParameter& param)
: BaseDataLayer<Dtype>(param),
prefetch_free_(), prefetch_full_() {
for (int i = 0; i < PREFETCH_COUNT; ++i) {
prefetch_free_.push(&prefetch_[i]);
}
}

template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
// 先调用父类BaseDataLayer的LayerSetUp
BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
// Before starting the prefetch thread, we make cpu_data and gpu_data
// calls so that the prefetch thread does not accidentally make simultaneous
// cudaMalloc calls when the main thread is running. In some GPUs this
// seems to cause failures if we do not so.
// 在开启prefetch线程之前,调用cpu_data和gpu_data,
// 这样主线程正在运行时,prefetch线程避免同时调用cudaMalloc,
// 这样做避免了某些gpu上出现错误
for (int i = 0; i < PREFETCH_COUNT; ++i) {
prefetch_[i].data_.mutable_cpu_data();
if (this->output_labels_) {
prefetch_[i].label_.mutable_cpu_data();
}
}
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
for (int i = 0; i < PREFETCH_COUNT; ++i) {
prefetch_[i].data_.mutable_gpu_data();
if (this->output_labels_) {
prefetch_[i].label_.mutable_gpu_data();
}
}
}
#endif
DLOG(INFO) << "Initializing prefetch";
this->data_transformer_->InitRand();
StartInternalThread();
DLOG(INFO) << "Prefetch initialized.";
}

// 如果有空闲线程,让该线程load data
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
#ifndef CPU_ONLY
cudaStream_t stream;
if (Caffe::mode() == Caffe::GPU) {
CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
}
#endif

try {
while (!must_stop()) {
Batch<Dtype>* batch = prefetch_free_.pop();
load_batch(batch);
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
batch->data_.data().get()->async_gpu_push(stream);
CUDA_CHECK(cudaStreamSynchronize(stream));
}
#endif
prefetch_full_.push(batch);
}
} catch (boost::thread_interrupted&) {
// Interrupted exception is expected on shutdown
}
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
CUDA_CHECK(cudaStreamDestroy(stream));
}
#endif
}

// 将预处理过的batch,送到top
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
// Reshape to loaded data.
top[0]->ReshapeLike(batch->data_);
// Copy the data
caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
top[0]->mutable_cpu_data());
DLOG(INFO) << "Prefetch copied";
if (this->output_labels_) {
// Reshape to loaded labels.
top[1]->ReshapeLike(batch->label_);
// Copy the labels.
caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
top[1]->mutable_cpu_data());
}

prefetch_free_.push(batch);
}

#ifdef CPU_ONLY
STUB_GPU_FORWARD(BasePrefetchingDataLayer, Forward);
#endif

INSTANTIATE_CLASS(BaseDataLayer);
INSTANTIATE_CLASS(BasePrefetchingDataLayer);

} // namespace caffe

记下来是DataLayer类

data_layer.hpp

#ifndef CAFFE_DATA_LAYER_HPP_
#define CAFFE_DATA_LAYER_HPP_

#include <vector>

#include "caffe/blob.hpp"
#include "caffe/data_reader.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/db.hpp"

namespace caffe {

// 继承了BasePrefetchingDataLayer
template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
public:
// 构造函数
explicit DataLayer(const LayerParameter& param);
// 析构函数
virtual ~DataLayer();
// setup函数
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
// DataLayer uses DataReader instead for sharing for parallelism
// DataLayer类不用共享而是用DataReader来实现并行
virtual inline bool ShareInParallel() const { return false; }
// 返回层的类型
virtual inline const char* type() const { return "Data"; }
// 返回bottom blobs的数量为0
virtual inline int ExactNumBottomBlobs() const { return 0; }
// 返回最小top blobs的数量为1
virtual inline int MinTopBlobs() const { return 1; }
// 返回最大top blobs的数量为2
virtual inline int MaxTopBlobs() const { return 2; }

protected:
// load batch
virtual void load_batch(Batch<Dtype>* batch);

// 读数据DataReader类
DataReader reader_;
};

} // namespace caffe

#endif // CAFFE_DATA_LAYER_HPP_

data_layer.cpp

#ifdef USE_OPENCV
#include <opencv2/core/core.hpp>
#endif // USE_OPENCV
#include <stdint.h>

#include <vector>

#include "caffe/data_transformer.hpp"
#include "caffe/layers/data_layer.hpp"
#include "caffe/util/benchmark.hpp"

namespace caffe {

// 用LayerParameter& param初始化DataReader reader_,LayerParameter中有一个optional DataParameter data_param = 107;
// 所以DataReader类需要DataParameter信息来读数据的
// DataReader为读数据的类
template <typename Dtype>
DataLayer<Dtype>::DataLayer(const LayerParameter& param)
: BasePrefetchingDataLayer<Dtype>(param),
reader_(param) {
}

template <typename Dtype>
DataLayer<Dtype>::~DataLayer() {
this->StopInternalThread();
}

// 在DataLayer类中实现了DataLayerSetUp来完成特定的设置
// 如前面所述主要完成top blobs的shape size设定
template <typename Dtype>
void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
// 获取batchsize
const int batch_size = this->layer_param_.data_param().batch_size();
// Read a data point, and use it to initialize the top blob.
// 获取读的数据指针,然后用它初始化top blob
// Datum是在caffe.prototxt中定义的,DataReader用LayerParameter初始化后(内含有DataParameter),
// 可以获取要读的数据信息,并返回Datum,后面在根据Datum来reshape
Datum& datum = *(reader_.full().peek());

// Use data_transformer to infer the expected blob shape from datum.
// 从datum中推断出top blob shape
vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
// tansformed_data_ reshape成top_shape
this->transformed_data_.Reshape(top_shape);
// Reshape top[0] and prefetch_data according to the batch_size.
// 更新top_shape中的batchsize,之前的到的vector<int> top_shape = this->data_transformer_->InferBlobShape(datum)
// 应该是1,这样得到一个batch的top blob shape,然将top[0]即存数据的blob reshape
top_shape[0] = batch_size;
top[0]->Reshape(top_shape);
// reshape每个线程的prefetch data
for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
this->prefetch_[i].data_.Reshape(top_shape);
}
LOG(INFO) << "output data size: " << top[0]->num() << ","
<< top[0]->channels() << "," << top[0]->height() << ","
<< top[0]->width();
// label
// 如果存在labels,reshape
if (this->output_labels_) {
vector<int> label_shape(1, batch_size);
top[1]->Reshape(label_shape);
for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
this->prefetch_[i].label_.Reshape(label_shape);
}
}
}

// This function is called on prefetch thread
// load_batch由prefetch thread调用
template<typename Dtype>
void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
CPUTimer batch_timer;
batch_timer.Start();
double read_time = 0;
double trans_time = 0;
CPUTimer timer;
CHECK(batch->data_.count());
CHECK(this->transformed_data_.count());

// Reshape according to the first datum of each batch
// on single input batches allows for inputs of varying dimension.
// 不太理解。。。
// 根据每个batch的第一个基准(datum)来reshape
// 不同batch允许不同的输入维数???
const int batch_size = this->layer_param_.data_param().batch_size();
Datum& datum = *(reader_.full().peek());
// Use data_transformer to infer the expected blob shape from datum.
// 用data_transformer从datum中推断blob shape
vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
// reshape transformed_data_
this->transformed_data_.Reshape(top_shape);
// Reshape batch according to the batch_size.
// 与上面类似,reshape batch
top_shape[0] = batch_size;
batch->data_.Reshape(top_shape);

// 得到batch中blobs的mutable指针top_data和top_label
Dtype* top_data = batch->data_.mutable_cpu_data();
Dtype* top_label = NULL; // suppress warnings about uninitialized variables

if (this->output_labels_) {
top_label = batch->label_.mutable_cpu_data();
}
// 下面load并处理一个batch数据
for (int item_id = 0; item_id < batch_size; ++item_id) {
timer.Start();
// get a datum
Datum& datum = *(reader_.full().pop("Waiting for data"));
read_time += timer.MicroSeconds();
timer.Start();
// Apply data transformations (mirror, scale, crop...)
int offset = batch->data_.offset(item_id);
this->transformed_data_.set_cpu_data(top_data + offset);
this->data_transformer_->Transform(datum, &(this->transformed_data_));
// Copy label.
if (this->output_labels_) {
top_label[item_id] = datum.label();
}
trans_time += timer.MicroSeconds();

reader_.free().push(const_cast<Datum*>(&datum));
}
timer.Stop();
batch_timer.Stop();
DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms.";
DLOG(INFO) << " Read time: " << read_time / 1000 << " ms.";
DLOG(INFO) << "Transform time: " << trans_time / 1000 << " ms.";
}

INSTANTIATE_CLASS(DataLayer);
REGISTER_LAYER_CLASS(Data);

} // namespace caffe

2.总结

到这里,已经基本了解caffe是怎样通过DataLayer来获取数据的了,接下来通过有关数据获取类的继承关系图简答总结一下。

caffe源码学习(五) data layer

目前,我们已经读完了图中最左边分支上的四类,其中,Layer是所有层的基类;BaseDataLayer继承了Layer的特性,又定义了有关获取数据的新的特性,是获取各种具体类型数据的基类;BasePrefetchingDataLayer是被多线程调用,并行的从磁盘中读数据,它又被具体类型的数据层所继承;DataLayer类获取的时lmdb或leveldb类型的数据,从caffe.proto定义中可以得知。那么剩下的类:MemoryDataLayer是从内存中获取数据,ImageDataLayer是从图像文件中获取数据,WindowDataLayer是在图像文件上滑窗获取数据。这样,后面需要时在去看剩下有关数据获取的几个类。

个人理解,如有错误,请指正。