The Caffe I/O Module

In this post we study Caffe's I/O module, the part of the framework that deals with data.

Before running the Caffe examples, we first need to convert the raw data into LMDB format. During training, the data layer (DataLayer) then continuously reads data from the LMDB and feeds it to the downstream convolution, pooling, and other computation layers. Since everything builds on it, the efficiency of the Caffe I/O module directly affects overall training throughput.
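What does "converting to LMDB" look like in code? Below is a minimal sketch using Caffe's own db wrapper (caffe/util/db.hpp); the path train_lmdb and the dummy 3x28x28 image are placeholders, not part of the original post:

#include <string>
#include <boost/scoped_ptr.hpp>
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/db.hpp"

using namespace caffe;

int main() {
  // Create a new LMDB and write one serialized Datum into it.
  boost::scoped_ptr<db::DB> db(db::GetDB("lmdb"));
  db->Open("train_lmdb", db::NEW);          // placeholder path
  boost::scoped_ptr<db::Transaction> txn(db->NewTransaction());

  Datum datum;                              // one image plus its label
  datum.set_channels(3);
  datum.set_height(28);
  datum.set_width(28);
  datum.set_label(0);
  datum.set_data(std::string(3 * 28 * 28, '\0'));  // dummy pixel bytes

  std::string value;
  datum.SerializeToString(&value);
  txn->Put("00000000", value);              // keys are typically zero-padded indices
  txn->Commit();
  db->Close();
  return 0;
}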

The Data Layer

Caffe's data layer (DataLayer) is a subclass of Layer. Besides reading from LMDB and LEVELDB, Caffe can also read raw image files directly (ImageDataLayer).
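The reading side mirrors the writing sketch above: a cursor walks the database and each value deserializes back into a Datum. A minimal sketch, reusing the includes from the previous snippet and again with a placeholder path:

// Iterate over every entry of an existing LMDB with a db::Cursor.
boost::scoped_ptr<db::DB> db(db::GetDB("lmdb"));
db->Open("train_lmdb", db::READ);           // placeholder path
boost::scoped_ptr<db::Cursor> cursor(db->NewCursor());
while (cursor->valid()) {
  Datum datum;
  datum.ParseFromString(cursor->value());   // cursor->key() gives the index string
  // ... hand the datum to the transformer / batch assembly ...
  cursor->Next();
}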

Data Structure Description

The description of the relevant data structures can be found in caffe.proto:

message DataParameter {
  // DB type used for the input data
  enum DB {
    LEVELDB = 0;    // use LEVELDB
    LMDB = 1;       // use LMDB
  }
  // Specify the data source. (path to the source data)
  optional string source = 1;
  // Specify the batch size. (number of images in one batch)
  optional uint32 batch_size = 4;
  // The rand_skip variable is for the data layer to skip a few data points
  // to avoid all asynchronous sgd clients to start at the same point. The skip
  // point would be set as rand_skip * rand(0,1). Note that rand_skip should not
  // be larger than the number of keys in the database.
  // DEPRECATED. Each solver accesses a different subset of the database.
  // randomly skip a few entries at startup; the number skipped is rand_skip * rand(0, 1)
  optional uint32 rand_skip = 7 [default = 0];
  // DB backend to use for the input data; defaults to LEVELDB
  optional DB backend = 8 [default = LEVELDB];
  // DEPRECATED. See TransformationParameter. For data pre-processing, we can do
  // simple scaling and subtracting the data mean, if provided. Note that the
  // mean subtraction is always carried out before scaling.
  // scale, mean_file, crop_size, and mirror are legacy parameters that have moved to TransformationParameter
  optional float scale = 2 [default = 1];
  optional string mean_file = 3;
  // DEPRECATED. See TransformationParameter. Specify if we would like to randomly
  // crop an image.
  optional uint32 crop_size = 5 [default = 0];
  // DEPRECATED. See TransformationParameter. Specify if we want to randomly mirror
  // data.
  optional bool mirror = 6 [default = false];
  // force decoding of encoded images to 3-channel color
  optional bool force_encoded_color = 9 [default = false];
  // Prefetch queue (Increase if data feeding bandwidth varies, within the
  // limit of device memory for GPU training)
  // prefetch queue (number of batches preloaded into host memory; default is 4 batches)
  optional uint32 prefetch = 10 [default = 4];
}
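These fields map one-to-one onto the generated C++ protobuf accessors. As a hedged illustration (the layer name, path, and values below are made up), a data_param block can be built and inspected like this:

#include "caffe/proto/caffe.pb.h"

int main() {
  caffe::LayerParameter layer_param;
  layer_param.set_name("data");             // hypothetical layer name
  layer_param.set_type("Data");
  caffe::DataParameter* data_param = layer_param.mutable_data_param();
  data_param->set_source("train_lmdb");     // placeholder path
  data_param->set_batch_size(64);
  data_param->set_backend(caffe::DataParameter_DB_LMDB);
  // Unset fields fall back to their defaults from caffe.proto:
  // prefetch() returns 4, rand_skip() returns 0.
  return data_param->prefetch() == 4 ? 0 : 1;
}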

Implementation of the Data Layer

The data layer classes are declared in include/caffe/layers/base_data_layer.hpp; include this header if you need to use these layers on their own.

namespace caffe {

/**
 * @brief Provides base for data layers that feed blobs to the Net.
 *
 * TODO(dox): thorough documentation for Forward and proto params.
 */
template <typename Dtype>
class BaseDataLayer : public Layer<Dtype> {
 public:
 // explicit constructor
  explicit BaseDataLayer(const LayerParameter& param);
  // LayerSetUp: implements common data layer setup functionality, and calls
  // DataLayerSetUp to do special data layer setup for individual layer types.
  // This method may not be overridden except by the BasePrefetchingDataLayer.
  // Layer setup: implements the common data layer setup, then calls DataLayerSetUp for layer-type-specific configuration
  virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {}
  // Data layers have no bottoms, so reshaping is trivial.
  virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {}
  // the backward pass is a no-op for data layers
  virtual void Backward_cpu(const vector<Blob<Dtype>*>& top,
      const vector<bool>& propagate_down, const vector<Blob<Dtype>*>& bottom) {}
  virtual void Backward_gpu(const vector<Blob<Dtype>*>& top,
      const vector<bool>& propagate_down, const vector<Blob<Dtype>*>& bottom) {}

 protected:
  // parameters for the data pre-processing transformer
  TransformationParameter transform_param_;
  // the data pre-processing transformer
  shared_ptr<DataTransformer<Dtype> > data_transformer_;
  // whether to output label data
  bool output_labels_;
};
// A batch of data; holds the output of the data layer
template <typename Dtype>
class Batch {
 public:
  // two blobs: data_ holds the image data, label_ holds the labels
  Blob<Dtype> data_, label_;
};

// Data layer with prefetching; derives from both BaseDataLayer and InternalThread
template <typename Dtype>
class BasePrefetchingDataLayer :
    public BaseDataLayer<Dtype>, public InternalThread {
 public:
  // explicit constructor
  explicit BasePrefetchingDataLayer(const LayerParameter& param);
  // LayerSetUp: implements common data layer setup functionality, and calls
  // DataLayerSetUp to do special data layer setup for individual layer types.
  // This method may not be overridden.
  // layer setup function
  void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  // forward pass
  virtual void Forward_cpu(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);
  virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top);

 protected:
  virtual void InternalThreadEntry();   // entry point of the internal prefetch thread
  virtual void load_batch(Batch<Dtype>* batch) = 0; // load one batch; pure virtual, implemented by subclasses

  vector<shared_ptr<Batch<Dtype> > > prefetch_; // prefetch buffers
  BlockingQueue<Batch<Dtype>*> prefetch_free_;  // queue of free (empty) batches
  BlockingQueue<Batch<Dtype>*> prefetch_full_;  // queue of loaded batches
  Batch<Dtype>* prefetch_current_;  // the batch currently exposed through the top blobs

  Blob<Dtype> transformed_data_;    // the transformed data
};
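The only pure virtual function here is load_batch, so a concrete subclass mainly has to shape its buffers and fill each batch. Here is a hypothetical minimal subclass, ToyDataLayer, that outputs a constant tensor instead of reading from a real source; the class and its 3x28x28 geometry are made up for illustration and are not part of Caffe:

#include <vector>
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/util/math_functions.hpp"

namespace caffe {

// Hypothetical example: a prefetching data layer with constant-valued data.
template <typename Dtype>
class ToyDataLayer : public BasePrefetchingDataLayer<Dtype> {
 public:
  explicit ToyDataLayer(const LayerParameter& param)
      : BasePrefetchingDataLayer<Dtype>(param) {}
  virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
    // Shape the top blob and every prefetch buffer once, up front.
    vector<int> shape(4);
    shape[0] = this->layer_param_.data_param().batch_size();
    shape[1] = 3; shape[2] = 28; shape[3] = 28;   // made-up image geometry
    top[0]->Reshape(shape);
    for (int i = 0; i < this->prefetch_.size(); ++i) {
      this->prefetch_[i]->data_.Reshape(shape);
    }
  }
 protected:
  // Runs on the internal thread, once per free Batch taken from the queue.
  virtual void load_batch(Batch<Dtype>* batch) {
    caffe_set(batch->data_.count(), Dtype(1),
              batch->data_.mutable_cpu_data());
  }
};

}  // namespace caffe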


The implementation of these classes lives in src/caffe/layers/base_data_layer.cpp:

#include <boost/thread.hpp>
#include <vector>

#include "caffe/blob.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/blocking_queue.hpp"

namespace caffe {
// Constructor: initializes the base Layer and the transformer parameters
template <typename Dtype>
BaseDataLayer<Dtype>::BaseDataLayer(const LayerParameter& param)
    : Layer<Dtype>(param),
      transform_param_(param.transform_param()) {
}
// BaseDataLayer setup
template <typename Dtype>
void BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
  if (top.size() == 1) {    // with one top blob, output data only; with two, output data and label
    output_labels_ = false;
  } else {
    output_labels_ = true;
  }
  // create the data transformer object
  data_transformer_.reset(
      new DataTransformer<Dtype>(transform_param_, this->phase_));
  data_transformer_->InitRand();    // seed the random number generator
  // The subclasses should setup the size of bottom and top
  // subclasses are responsible for shaping the top blobs
  DataLayerSetUp(bottom, top);
}
// BasePrefetchingDataLayer constructor
template <typename Dtype>
BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
    const LayerParameter& param)
    : BaseDataLayer<Dtype>(param),
      prefetch_(param.data_param().prefetch()),
      prefetch_free_(), prefetch_full_(), prefetch_current_() {
  for (int i = 0; i < prefetch_.size(); ++i) {
    prefetch_[i].reset(new Batch<Dtype>());
    prefetch_free_.push(prefetch_[i].get());    // put every Batch object into the free queue
  }
}
// BasePrefetchingDataLayer setup
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  BaseDataLayer<Dtype>::LayerSetUp(bottom, top);

  // Before starting the prefetch thread, we make cpu_data and gpu_data
  // calls so that the prefetch thread does not accidentally make simultaneous
  // cudaMalloc calls when the main thread is running. In some GPUs this
  // seems to cause failures if we do not so.
  for (int i = 0; i < prefetch_.size(); ++i) {
    prefetch_[i]->data_.mutable_cpu_data();
    if (this->output_labels_) {
      prefetch_[i]->label_.mutable_cpu_data();
    }
  }
  // the GPU path is compiled only when CPU_ONLY is not defined
#ifndef CPU_ONLY    
  if (Caffe::mode() == Caffe::GPU) {
    for (int i = 0; i < prefetch_.size(); ++i) {
      prefetch_[i]->data_.mutable_gpu_data();
      if (this->output_labels_) {
        prefetch_[i]->label_.mutable_gpu_data();    // same as above, for the labels
      }
    }
  }
#endif
  DLOG(INFO) << "Initializing prefetch";
  this->data_transformer_->InitRand();
  StartInternalThread();    // start the internal prefetch thread
  DLOG(INFO) << "Prefetch initialized.";
}
// entry point of the internal thread
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
// create a non-blocking CUDA stream
#ifndef CPU_ONLY
  cudaStream_t stream;
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
  }
#endif

  try {
    while (!must_stop()) {  // keep loading batches until asked to stop
      Batch<Dtype>* batch = prefetch_free_.pop();   // take a free Batch
      load_batch(batch);    // fill it with one batch of data
#ifndef CPU_ONLY
      if (Caffe::mode() == Caffe::GPU) {
        batch->data_.data().get()->async_gpu_push(stream);
        if (this->output_labels_) {
          batch->label_.data().get()->async_gpu_push(stream);
        }
        CUDA_CHECK(cudaStreamSynchronize(stream));  // wait for the async copy to the GPU to finish
      }
#endif
      prefetch_full_.push(batch);   // push it into the queue of loaded batches
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown; it ends the while loop
  }
#ifndef CPU_ONLY
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamDestroy(stream));  // destroy the CUDA stream
  }
#endif
}
// forward pass on the CPU
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  // recycle the previous batch, then take a loaded Batch from the full queue
  if (prefetch_current_) {
    prefetch_free_.push(prefetch_current_);
  }
  prefetch_current_ = prefetch_full_.pop("Waiting for data");
  // Reshape to loaded data. (the top blob takes the shape of the Batch)
  top[0]->ReshapeLike(prefetch_current_->data_);
  // hand the data pointer to the top blob
  top[0]->set_cpu_data(prefetch_current_->data_.mutable_cpu_data());
  if (this->output_labels_) {
    // Reshape to loaded labels. (same for the labels)
    top[1]->ReshapeLike(prefetch_current_->label_);
    top[1]->set_cpu_data(prefetch_current_->label_.mutable_cpu_data());
  }
}

#ifdef CPU_ONLY
STUB_GPU_FORWARD(BasePrefetchingDataLayer, Forward);
#endif

INSTANTIATE_CLASS(BaseDataLayer);
INSTANTIATE_CLASS(BasePrefetchingDataLayer);

}  // namespace caffe
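Stripped of the CUDA and transformer details, prefetch_free_/prefetch_full_ form a classic producer-consumer double buffer: InternalThreadEntry is the producer, Forward_cpu the consumer. The following hypothetical sketch (ToyPrefetcher is made up; it uses Batch<float> buffers because Caffe explicitly instantiates BlockingQueue for that type) models just this pattern on top of Caffe's InternalThread and BlockingQueue:

#include <boost/thread.hpp>
#include "caffe/internal_thread.hpp"
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/util/blocking_queue.hpp"

// A stripped-down model of the prefetching scheme: the internal thread
// (producer) pops free buffers, fills them, and pushes them to the full
// queue; Consume (the stand-in for Forward_cpu) pops a full buffer and
// recycles the previous one.
class ToyPrefetcher : public caffe::InternalThread {
 public:
  ToyPrefetcher() : current_(NULL) {
    for (int i = 0; i < 4; ++i) free_.push(&buffers_[i]);  // like prefetch = 4
    StartInternalThread();
  }
  ~ToyPrefetcher() { StopInternalThread(); }
  caffe::Batch<float>* Consume() {               // plays the role of Forward_cpu
    if (current_) free_.push(current_);          // recycle the previous batch
    current_ = full_.pop("Waiting for data");    // may block on the producer
    return current_;
  }
 protected:
  virtual void InternalThreadEntry() {           // the producer loop
    try {
      while (!must_stop()) {
        caffe::Batch<float>* b = free_.pop();    // take a free buffer
        // "load_batch" would fill b->data_ (and b->label_) here.
        full_.push(b);                           // publish the loaded buffer
      }
    } catch (boost::thread_interrupted&) {}      // expected on shutdown
  }
 private:
  caffe::Batch<float> buffers_[4];
  caffe::Batch<float>* current_;
  caffe::BlockingQueue<caffe::Batch<float>*> free_, full_;
};

Because pop blocks, the consumer automatically waits when loading is slow, and the producer stalls once all prefetch buffers are in flight; this is exactly the flow control the real layer relies on.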