这里我们讨论学习Caffe的I/O模块,即与数据打交道的模块。
我们在运行Caffe例程前,首先需要将原始数据转换为LMDB格式,训练网络时则需要由数据读取层(DataLayer)不断地从LMDB读取数据,送入后续卷积、下采样 等计算层。作为基础,Caffe I/O模块的效率直接影响到处理效果。
数据读取层
Caffe数据读取层(DataLayer)是Layer的派生类。除了读取LMDB、LEVELDB之外,也可以从原始图像直接读取(ImageDataLayer)。
数据结构描述
我们在caffe.proto
中可以找到,关于数据结构的描述
message DataParameter {
//输入数据使用的DB类型
enum DB {
LEVELDB = 0; //使用 LEVELDB
LMDB = 1; //使用 LMDB
}
// Specify the data source.(源数据的路径)
optional string source = 1;
// Specify the batch size.( 一个批量数据包含的图片数)
optional uint32 batch_size = 4;
// The rand_skip variable is for the data layer to skip a few data points
// to avoid all asynchronous sgd clients to start at the same point. The skip
// point would be set as rand_skip * rand(0,1). Note that rand_skip should not
// be larger than the number of keys in the database.
// DEPRECATED. Each solver accesses a different subset of the database.
//随机跳过若干图片,跳跃数为rand_skip * rand(0, 1)
optional uint32 rand_skip = 7 [default = 0];
//默认输入数据使用DB类型,默认为LEVELDB
optional DB backend = 8 [default = LEVELDB];
// DEPRECATED. See TransformationParameter. For data pre-processing, we can do
// simple scaling and subtracting the data mean, if provided. Note that the
// mean subtraction is always carried out before scaling.
//scale、mean_file、crop_size、mirror 均为旧版参数,现已转移到 TransformationParameter
optional float scale = 2 [default = 1];
optional string mean_file = 3;
// DEPRECATED. See TransformationParameter. Specify if we would like to randomly
// crop an image.
optional uint32 crop_size = 5 [default = 0];
// DEPRECATED. See TransformationParameter. Specify if we want to randomly mirror
// data.
optional bool mirror = 6 [default = false];
//强制编码图像为三通道彩色图像
optional bool force_encoded_color = 9 [default = false];
// Prefetch queue (Increase if data feeding bandwidth varies, within the
// limit of device memory for GPU training)
//预取队列(预先放到主机内存中的批量数.默认为4个Batch)
optional uint32 prefetch = 10 [default = 4];
}
数据读取层的实现
数据读取层声明位于include/caffe/layers/base_data_layers.hpp
中,如果需要单独使用该层,则应包含这个头文件.
namespace caffe {
/**
* @brief Provides base for data layers that feed blobs to the Net.
*
* TODO(dox): thorough documentation for Forward and proto params.
*/
template <typename Dtype>
class BaseDataLayer : public Layer<Dtype> {
public:
//显式构造函数
explicit BaseDataLayer(const LayerParameter& param);
// LayerSetUp: implements common data layer setup functionality, and calls
// DataLayerSetUp to do special data layer setup for individual layer types.
// This method may not be overridden except by the BasePrefetchingDataLayer.
//层配置,实现通用层配置功能,之后调用DataLayerSetUp进行数据读取层的特别配置
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
// Data layers have no bottoms, so reshaping is trivial.(数据读取没有Bottom Blob,变形操作很简单 )
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
//反向传播函数不需要做任何操作
virtual void Backward_cpu(const vector<Blob<Dtype>*>& top,
const vector<bool>& propagate_down, const vector<Blob<Dtype>*>& bottom) {}
virtual void Backward_gpu(const vector<Blob<Dtype>*>& top,
const vector<bool>& propagate_down, const vector<Blob<Dtype>*>& bottom) {}
protected:
//数据预处理变换器参数
TransformationParameter transform_param_;
//数据预处理变换器
shared_ptr<DataTransformer<Dtype> > data_transformer_;
//是否输出标签数据
bool output_labels_;
};
//批量数据,用于存放数据读取层输出
template <typename Dtype>
class Batch {
public:
//包含两个Blob: data_用于存放图片数据,label_用于存放标签
Blob<Dtype> data_, label_;
};
//带预取功能的数据读取派生于BaseDataLayer和InternalThread
template <typename Dtype>
class BasePrefetchingDataLayer :
public BaseDataLayer<Dtype>, public InternalThread {
public:
//显式构造函数
explicit BasePrefetchingDataLayer(const LayerParameter& param);
// LayerSetUp: implements common data layer setup functionality, and calls
// DataLayerSetUp to do special data layer setup for individual layer types.
// This method may not be overridden.
//层设置函数
void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
//前向传播
virtual void Forward_cpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
protected:
virtual void InternalThreadEntry(); //内部线程入口
virtual void load_batch(Batch<Dtype>* batch) = 0; //载入批量数据,纯虚函数
vector<shared_ptr<Batch<Dtype> > > prefetch_; //预取Buffer
BlockingQueue<Batch<Dtype>*> prefetch_free_; //空闲Batch队列
BlockingQueue<Batch<Dtype>*> prefetch_full_; //已加载Batch队列
Batch<Dtype>* prefetch_current_; //当前Batch(猜的)
Blob<Dtype> transformed_data_; //变换后的数据
};
数据读取层的实现位于src/caffe/layers/base_data_layer.cpp
中,内容如下:
#include <boost/thread.hpp>
#include <vector>
#include "caffe/blob.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/blocking_queue.hpp"
namespace caffe {
//构造函数,初始化Layer参数、数据变换器参数
template <typename Dtype>
BaseDataLayer<Dtype>::BaseDataLayer(const LayerParameter& param)
: Layer<Dtype>(param),
transform_param_(param.transform_param()) {
}
//BaseDataLayer层设置
template <typename Dtype>
void BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
if (top.size() == 1) { //判断输出Blob个数,若为1只输出data,若为2则输出data和label
output_labels_ = false;
} else {
output_labels_ = true;
}
//初始化数据变化器对象
data_transformer_.reset(
new DataTransformer<Dtype>(transform_param_, this->phase_));
data_transformer_->InitRand(); //生成随机数种子
// The subclasses should setup the size of bottom and top
//子类负责设置Top Blob形状
DataLayerSetUp(bottom, top);
}
//BasePrefetchingDataLayer 构造函数
template <typename Dtype>
BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
const LayerParameter& param)
: BaseDataLayer<Dtype>(param),
prefetch_(param.data_param().prefetch()),
prefetch_free_(), prefetch_full_(), prefetch_current_() {
for (int i = 0; i < prefetch_.size(); ++i) {
prefetch_[i].reset(new Batch<Dtype>());
prefetch_free_.push(prefetch_[i].get()); //将Batch对象都放入空闲队列中
}
}
//BasePrefetchingDataLayer层配置函数
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
// Before starting the prefetch thread, we make cpu_data and gpu_data
// calls so that the prefetch thread does not accidentally make simultaneous
// cudaMalloc calls when the main thread is running. In some GPUs this
// seems to cause failures if we do not so.
//在幵启数据预取线程前,通过调用Blob相应函数先进行cudaMalloc,避免在多线程情况下同时进行cudaMalloc.会导致CUDA API调用失败
for (int i = 0; i < prefetch_.size(); ++i) {
prefetch_[i]->data_.mutable_cpu_data();
if (this->output_labels_) {
prefetch_[i]->label_.mutable_cpu_data();
}
}
//如果编译选项没有CPU_ONLY,则需要编译GPU代码
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
for (int i = 0; i < prefetch_.size(); ++i) {
prefetch_[i]->data_.mutable_gpu_data();
if (this->output_labels_) {
prefetch_[i]->label_.mutable_gpu_data(); //功能同上
}
}
}
#endif
DLOG(INFO) << "Initializing prefetch";
this->data_transformer_->InitRand();
StartInternalThread(); //开启内部预取线程
DLOG(INFO) << "Prefetch initialized.";
}
//内部线程入口
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
//创建CUDA Stream,非阻塞类型
#ifndef CPU_ONLY
cudaStream_t stream;
if (Caffe::mode() == Caffe::GPU) {
CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
}
#endif
try {
while (!must_stop()) { //循环载入批量数据
Batch<Dtype>* batch = prefetch_free_.pop(); //拿到一个空闲Batch
load_batch(batch); //载入批量数据
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
batch->data_.data().get()->async_gpu_push(stream);
if (this->output_labels_) {
batch->label_.data().get()->async_gpu_push(stream);
}
CUDA_CHECK(cudaStreamSynchronize(stream));//同步到GPU
}
#endif
prefetch_full_.push(batch); //加入到带负载的Batch队列中
}
} catch (boost::thread_interrupted&) {
// Interrupted exception is expected on shutdown(捕获到异常,退出while循环)
}
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
CUDA_CHECK(cudaStreamDestroy(stream)); //销毁CUDA Stream
}
#endif
}
//前向传波函数
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
//从带负载的Batch队列中取出一个Batch对象
if (prefetch_current_) {
prefetch_free_.push(prefetch_current_);
}
prefetch_current_ = prefetch_full_.pop("Waiting for data");
// Reshape to loaded data.(Top Blob根据Batch形状进行变形)
top[0]->ReshapeLike(prefetch_current_->data_);
//将数据放到Top Blob中
top[0]->set_cpu_data(prefetch_current_->data_.mutable_cpu_data());
if (this->output_labels_) {
// Reshape to loaded labels.(同上)
top[1]->ReshapeLike(prefetch_current_->label_);
top[1]->set_cpu_data(prefetch_current_->label_.mutable_cpu_data());
}
}
#ifdef CPU_ONLY
STUB_GPU_FORWARD(BasePrefetchingDataLayer, Forward);
#endif
INSTANTIATE_CLASS(BaseDataLayer);
INSTANTIATE_CLASS(BasePrefetchingDataLayer);
} // namespace caffe