云端TensorFlow读取数据IO的高效方式

时间:2024-05-17 09:08:02

低效的IO方式

最近通过观察PAI平台上TensoFlow用户的运行情况,发现大家在数据IO这方面还是有比较大的困惑,主要是因为很多同学没有很好的理解本地执行TensorFlow代码和分布式云端执行TensorFlow的区别。本地读取数据是server端直接从client端获得graph进行计算,而云端服务server在获得graph之后还需要将计算下发到各个worker处理(具体原理可以参考视频教程-Tensorflow高级篇:https://tianchi.aliyun.com/competition/new_articleDetail.html)。 
云端TensorFlow读取数据IO的高效方式

本文通过读取一个简单的CSV文件为例,帮助大家快速了解如何使用TensorFlow高效的读取数据。CSV文件如下:

1,1,1,1,1
2,2,2,2,2
3,3,3,3,3

首先我们来看下大家容易产生问题的几个地方。

1.不建议用python本地读取文件的方式

PAI支持Python的自带IO方式,但是需要将数据源和代码打包上传的方式使用,这种读取方式是将数据写入内存之后再计算,效率比较低,不建议使用。范例代码如下:

import csv
csv_reader=csv.reader(open('csvtest.csv'))
for row in csv_reader:
print(row)

2.尽量不要用第三方库的读取文件方法

很多同学使用第三方库的一些数据IO的方式进行数据读取,比如TFLearn、Panda的数据IO方式,这些方法很多都是通过封装python的读取方式实现的,所以在PAI平台使用的时候也会造成效率低下问题。

3.尽量不要用preload的方式读取文件

很多人在用PAI的服务的时候表示GPU并没有比本地的CPU速度快的明显,主要问题可能就出在数据IO这块。preload的方式是先把数据全部都读到内存中,然后再通过session计算,比如feed的读取方式。这样要先进行数据读取,再计算,不同步造成性能浪费,同时因为内存限制也无法支持大数据量的计算。举个例子:假设我们的硬盘中有一个图片数据集0001.jpg,0002.jpg,0003.jpg……我们只需要把它们读取到内存中,然后提供给GPU或是CPU进行计算就可以了。这听起来很容易,但事实远没有那么简单。事实上,我们必须要把数据先读入后才能进行计算,假设读入用时0.1s,计算用时0.9s,那么就意味着每过1s,GPU都会有0.1s无事可做,这就大大降低了运算的效率。 
云端TensorFlow读取数据IO的高效方式

下面我们看下高效的读取方式。

高效的IO方式

高效的TensorFlow读取方式是将数据读取转换成OP,通过session run的方式拉去数据。另外,读取线程源源不断地将文件系统中的图片读入到一个内存的队列中,而负责计算的是另一个线程,计算需要数据时,直接从内存队列中取就可以了。这样就可以解决GPU因为IO而空闲的问题!

云端TensorFlow读取数据IO的高效方式

下面我们看下代码,如何在PAI平台通过OP的方式读取数据:

import argparse
import tensorflow as tf
import os
FLAGS=None
def main(_):
dirname = os.path.join(FLAGS.buckets, "csvtest.csv")
reader=tf.TextLineReader()
filename_queue=tf.train.string_input_producer([dirname])
key,value=reader.read(filename_queue)
record_defaults=[[''],[''],[''],[''],['']]
d1, d2, d3, d4, d5= tf.decode_csv(value, record_defaults, ',') init=tf.initialize_all_variables() with tf.Session() as sess:
sess.run(init)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess,coord=coord)
for i in range(4):
print(sess.run(d2))
coord.request_stop()
coord.join(threads) if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--buckets', type=str, default='',
help='input data path')
parser.add_argument('--checkpointDir', type=str, default='',
help='output model path')
FLAGS, _ = parser.parse_known_args()
tf.app.run(main=main)
  • dirname:OSS文件路径,可以是数组,方便下一阶段shuffle
  • reader:TF内置各种reader API,可以根据需求选用
  • tf.train.string_input_producer:将文件生成队列
  • tf.decode_csv:是一个splite功能的OP,可以拿到每一行的特定参数
  • 通过OP获取数据,在session中需要tf.train.Coordinator()和tf.train.start_queue_runners(sess=sess,coord=coord)

在代码中,我们的输入是3行5个字段:

1,1,1,1,1
2,2,2,2,2
3,3,3,3,3

我们循环输出4次,打印出第2个字段。结果如图:

云端TensorFlow读取数据IO的高效方式

输出结果也证明了数据结构是成队列。

其它