tensorflow的数据读取
tensorflow在读取像imagenet这种大量图像数据,不能一次性load进内存时有几个坑,Mark一记,以助后来者。关于多GPU和分布式,本文只讨论数据并行方式,即每个GPU上面运行一个网络,称为tower。
* 数据格式TFRecord
* 数据读取第一层封装——Queue,Coordinator,QueueRunner
* 常用封装——tf.train.xxx_input_producer,tf.reader,tf.train.batch
1. 创建TFRecord
TFRecord本身是一个集合,每条记录是一个tf.train.Example(一种protocol buffer),包含有多个字段(称为feature),TFRecord提供的格式一般包含几个重要的feature,比如代表大小的image/height、image/width,代表类别的image/class/label、image/class/text,代表数据的image/encoded等等。
创建TFRecord文件就是按照feature名把相应字段填充,接口为tf.python_io.TFRecordWriter和tf.train.Example,典型代码如下:
writer = tf.python_io.TFRecordWriter(output_file)
example = tf.train.Example(features=tf.train.Features(feature={
'image/height': _int64_feature(height),
'image/width': _int64_feature(width),
'image/colorspace': _bytes_feature(colorspace),
'image/channels': _int64_feature(channels),
'image/class/label': _int64_feature(label),
'image/class/text': _bytes_feature(text),
'image/format': _bytes_feature(image_format),
'image/filename': _bytes_feature(os.path.basename(filename)),
'image/encoded': _bytes_feature(image_buffer)}))
writer.write(example.SerializeToString())
读取则是读出相应字段,接口为 tf.TFRecordReader和tf.parse_single_example。他们本身都是op,需要在session中执行,无法过程式执行。对于jpg、png等压缩图片格式,读出image/encoded相应数据后需要用tf.image.decode_jpeg,tf.image.decode_png等op来进行解码,解析TFRecord的典型代码如下(tf.TFRecordReader的代码见下节):
feature_map = {
'image/encoded': tf.FixedLenFeature([], dtype=tf.string,
default_value=''),
'image/class/label': tf.FixedLenFeature([1], dtype=tf.int64,
default_value=-1),
'image/class/text': tf.FixedLenFeature([], dtype=tf.string,
default_value='')}
features = tf.parse_single_example(example_serialized, feature_map)
label = tf.cast(features['image/class/label'], dtype=tf.int32)
image = tf.image.decode_jpeg(features['image/encoded'])
需要特别注意的是,TF中一般一个TFRecord文件会写入多个example,这样可以把原始文件个数减少很多,从而减轻filename Queue(见下节)的负担。读取的方法见下节。
代码导读
tensorflow的model zoo中的inception模型示例了imagenet的读取方法。
建立TFRecord文件集只需要略微修改build_image_data或者build_imagenet_data 就好了,他们的主体部分实现了多线程读取图像、转换和输出,前提是图像以dir/label/xxx.jpg 的命名规则存放。关键代码就是每幅图像创建tf.train.Example以及多次调用tf.python_io.TFRecordWriter的write方法写入磁盘。
读取的代码在image_processing中,imagenet_train和imagnet_distributed_train都是调用了这个文件的
2. 单tower读取流程
如前所述,典型情况下,在目录dir下有若干TFRecord文件,每个文件有若干记录,每个记录是一幅图像,可以是编码后的也可以是raw data。
流程的起点,tf读取dir下的所有TFRecord文件名并放在内存,整个系统有三个队列缓冲:FilnameQueue,ExampleQueue,BatchQueue。
那么在训练和测试模型的时候,数据读取的流程就是:
* 把文件名shuffle并放入FilenameQueue
* 多线程,每个线程选择一个文件(从FilenameQueue中dequeue),并从该文件中读取一条记录放到ExampleQueue
* 选择一条记录(从ExampleQueue中dequeue),把若干记录合并成一个batch放到BatchQueue中
* 从BatchQueue中取数据给模型训练或测试
前两个操作如下图所示,注意到在用多个reader的时候一般是多线程的,每个线程有一个reader读取图像,还有一个enqueue的op把处理后的图像放进example enqueue
tf.train.XXXbatchXXX()封装了从一个op–examplequeue.dequeue到读出batchsize个example的过程。在这个调用中,tf创建一个新的BatchQueue,并创建多个线程用来读取ExampleQueue并填到BatchQueue中,典型代码如下:
filename_queue = tf.train.string_input_producer(data_files,shuffle=True,capacity=16)
examples_queue = tf.RandomShuffleQueue(...)
for _ in range(num_readers):
reader = tf.TFRecordReader()
_, value = reader.read(filename_queue)
enqueue_ops.append(examples_queue.enqueue([value]))
runner = tf.train.queue_runner.QueueRunner(examples_queue, enqueue_ops)
tf.train.queue_runner.add_queue_runner(runner)
example_serialized = examples_queue.dequeue()
for thread_id in range(num_preprocess_threads):
image_buffer, label_index, ... = parse_example_proto(example_serialized)
image = image_preprocessing(image_buffer, ...)
images_and_labels.append([image, label_index])
images, label_index_batch = tf.train.batch_join(images_and_labels,...)
此处有诸多值得推敲的细节:
* reader.read(filename_queue)方法在读取文件的时候每次读出一个记录,并在一个文件的所有记录都被读出过之后,filename_queue才会dequeue
* 创建num_readers个reader和examples_queue.enqueue(),并放进QueueRunner中之后,一旦执行tf.train.start_queue_runners(),QueueRunner将会创建num_readers个线程独立执行read和enqueue
* images_and_labels是一个长度为num_preprocess_threads的list,而tf.train.batch_join会在执行时,为每个元素创建一个线程来获取其数据,他们会共用examples_queue.dequeue() ,而独立执行解析和image_preprocessing。
* 把example_serialized = examples_queue.dequeue()放在下面的循环中不会更好,因为这样每个线程虽然可以独立dequeue,但是ExampleQueue本身会在dequeue的时候加锁以保证数据一致性,所以效率并不能提升。
* images并不是真正的数据,只是定义了计算流图,而要想拿到真正的数据需要在session中调用eval方法:
img = tf.placeholder(...)
loss = xxmodel.loss(img,...)
with tf.Session():
tf.initialize_all_variables().run()
tf.train.start_queue_runners()
imagedata = images.eval()
print loss.eval(feed_dict={img:imagedata,...})
或者直接输入到模型中训练或测试:
loss = xxmodel.loss(images)
with tf.Session():
tf.initialize_all_variables().run()
tf.train.start_queue_runners()
print loss.eval()
以上代码是整个数据读取pipeline的核心,计算流图的时候回溯执行,有以下几个步骤:
* tf.train.start_queue_runners()被执行之后,每个线程都要监测自己enqueue的queue是不是满的,不满的话就会执行enqueue
* 计算loss的时候需要往前回溯整个网络,一直到网络的输入,就是从BatchQueue获取的那个Tensor,获取它意味着BatchQueue执行dequeue,从而使得BatchQueue不满
* 负责从ExampleQueue获取、加工数据的线程会把EampleQueue中的数据读出,填到BatchQueue,使得ExampleQueue不满
* 负责填充ExampleQueue的线程会调用TFRecordReader.read()方法来获得硬盘文件的数据。如果该文件中的记录已经全都被读出过了,则使FilenameQueue出队,不满,否则不变
* 如果FilenameQueue不满的情况发生,则自动填充文件名。
注意就读取任务而言,单机上单GPU(单tower)、多GPU(tower)是有区别的,而是否运行于多机集群环境,事实上并不重要,关键还是每台机器上面是多GPU还是单GPU。因为集群中ps并不读取图像数据,而每个worker独立读取各自的文件,并不相互影响。多tower的读取留待下篇,与tensorflow的分布式并行一起讨论。
3. 低层封装
以上介绍了高层API接口,本节其底层原理及接口,下节在此基础上重新看待上述高层接口。本节介绍的接口主要用于理解,以笔者的经验,实际使用中高层接口已经足够了。本节主要解释:
- tf.train.queue_runner.QueueRunner的多线程原理
- tf.train.batch_join的多线程原理
- tf.TFRecordReader读取细节
QueueRunner
典型的多线程FIFO缓冲,
Coordinator的核心实体是一个python标准库里的threading Event Object,然后封装了一些操作方法来同步各个线程一起stop。所以要在创建线程的时候把Coordinator传进去用,这样的用法需要显式开辟和管理线程,会比较麻烦,而这些线程主要用来进行enqueue,所以tf提供了QueueRunner来管理这些用于enqueue的线程。
QueueRunner在初始化时需要提供queue和一个包含enqueue操作的list,然后调用create_thread,提供Coordinator来创建和同步线程。也可以不显式调用create_thread,可以使用tf.train.start_queue_runner()一次性启动所有runner,该函数内部会自己维护一个coordinatior,从而简化了编程。
QueueRunner的线程就是在loop执行enqueue,而enqueue时如果queue已经满了就直接返回,而dequeue时候如果queue空了则抛出异常OutOfRange,见FIFOQueue的源代码。
注意,QueueRunner的enqueue一般需要dequeue前一个queue,所以QueueRunner的enqueue依赖于前一个queue的dequeue操作,即QueueRunner的每个线程不仅对本queue执行了enqueue一般也会对前一个queue执行dequeue
整个过程就是,初始化的时候在主线程中建立各个queue,然后通过Runner和Coordinator建立相应的enqueue子线程。启动session后,子线程start,填满queue之后的loop实际上无操作,然后主线程执行train或者inference,反向遍历网络中的op,到达数据读取那个op,即对example batch queue的dequeue操作,就会产生连锁反应使得各个queue中的entry出队,一直在loop的子线程就会马上读取entry并enqueue进去。
batch_join()
tf.train中的batch族函数有四个,调用他们时都会创建一个Queue和QueueRunner,就像上节所描述的那样,只是做了封装。
tf.train.batch_join()会把tensors_list中的每个元素都开一个线程来单独获取数据,并把这些得到的数据按照batch_size大小组成新的张量放进Queue,然后再调用Queue的dequeue来返回。
reader.read()
一个重要的问题是,shuffle读取batch的时候如何在一个epoch内不重复?一个epoch之后queue是不是就空了?
第二个问题很好回答,因为调用tf.train.string_input_producer的时候会创建相应的QueueRunner来持续填充FilenameQueue。在一开始,producer会存储下文件名集合,它内部有epoch的概念:epoch的大小也就是集合元素数目,每次enqueue都是顺序从集合中取值,并且会计数,够一个epoch之后会把集合shffule,这样就既保证了一个epoch内的随机性,又保证了一个epoch之内所有样本全部遍历一遍且不重复。
代码导读
在Coordinator的源码中可以看出,其主要接口join、request_stop、should_stop都是对自身成员变量_stop_event的操作。
tf的queue系统是用C++实现的