一、说明:在使用tensorflow的过程中,出现过程序不报错又不接下去执行的错误,后来分析了原因是tf的数据线程没有启动,导致数据流图没办法计算,整个程序就卡在哪里。
更深层次的原因是tensorflow的计算和数据读入是异步的,合理的方式是主线程进行模型的训练,然后开一个数据读入线程异步读入数据.tensorflow会在内存中维护一个队列,然后数据线程异步从磁盘中将样本推入队列当中。并且,因为tensorflow的训练和读数据是异步的,故即使当前没有数据进来,tensorflow也没办法报错,因为可能接下来会有数据进队列,所以,tensorflow就一直处于等待的状态
说明:我是在修改Tensorflow的源码ptb_word_lm.py的时候遇到上述的问题的。下面就该源码来解释说明这个问题:
tensorflow的reader.py文件:
"""Utilities for parsing PTB text files.""" #-*- coding:utf-8 -*- from __future__ import absolute_import from __future__ import division from __future__ import print_function import collections import os import tensorflow as tf #将文件中所有的word收集起来 def _read_words(filename): with tf.gfile.GFile(filename, "r") as f: return f.read().decode("utf-8").replace("\n", "<eos>").split() #将收集到的word映射到id def _build_vocab(filename): data = _read_words(filename) counter = collections.Counter(data) count_pairs = sorted(counter.items(), key=lambda x: (-x[1], x[0])) words, _ = list(zip(*count_pairs)) word_to_id = dict(zip(words, range(len(words)))) return word_to_id #使用训练集的word建立word的映射表 # def _file_to_word_ids(filename, word_to_id): data = _read_words(filename) return [word_to_id[word] for word in data if word in word_to_id] def ptb_raw_data(data_path=None): """Load PTB raw data from data directory "data_path". Reads PTB text files, converts strings to integer ids, and performs mini-batching of the inputs. The PTB dataset comes from Tomas Mikolov's webpage: http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz Args: data_path: string path to the directory where simple-examples.tgz has been extracted. Returns: tuple (train_data, valid_data, test_data, vocabulary) where each of the data objects can be passed to PTBIterator. """ train_path = os.path.join(data_path, "ptb.train.txt") valid_path = os.path.join(data_path, "ptb.valid.txt") test_path = os.path.join(data_path, "ptb.test.txt") word_to_id = _build_vocab(train_path) train_data = _file_to_word_ids(train_path, word_to_id) valid_data = _file_to_word_ids(valid_path, word_to_id) test_data = _file_to_word_ids(test_path, word_to_id) vocabulary = len(word_to_id) return train_data, valid_data, test_data, vocabulary def ptb_producer(raw_data, batch_size, num_steps, name=None): """Iterate on the raw PTB data. This chunks up raw_data into batches of examples and returns Tensors that are drawn from these batches. Args: raw_data: one of the raw data outputs from ptb_raw_data. batch_size: int, the batch size. num_steps: int, the number of unrolls. name: the name of this operation (optional). Returns: A pair of Tensors, each shaped [batch_size, num_steps]. The second element of the tuple is the same data time-shifted to the right by one. Raises: tf.errors.InvalidArgumentError: if batch_size or num_steps are too high. """ with tf.name_scope(name, "PTBProducer", [raw_data, batch_size, num_steps]): raw_data = tf.convert_to_tensor(raw_data, name="raw_data", dtype=tf.int32) data_len = tf.size(raw_data) batch_len = data_len // batch_size data = tf.reshape(raw_data[0 : batch_size * batch_len], [batch_size, batch_len]) epoch_size = (batch_len - 1) // num_steps assertion = tf.assert_positive( epoch_size, message="epoch_size == 0, decrease batch_size or num_steps") with tf.control_dependencies([assertion]): epoch_size = tf.identity(epoch_size, name="epoch_size") i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue() x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps]) y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps]) return x, y
说明:详解这个reader.py文件:
1、产生一个队列,里面的数是0到epoch_size-1.然后定义了一个出队操作,说明队列也是数据流图中的一个结点.使用了range_input_producer之后,会自动产生一个QueueRunner. A QueueRunner
for the Queue is added to the current Graph
'sQUEUE_RUNNER
collection.
i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()2、定义了切片操作,返回训练样本的x和y
x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps]) y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])3、具体使用说明:
在使用的过程中,只要每次迭代的时候,我们取一下x,y。那么,就会触发跟x,y相关联的操作,也即出队操作和切片操作,为我们生成数据.但是,通过队列的方式来读入数据都是一种多线程读入数据的方式,要在session当中将该线程开启,不然就会挂起。
二、分析错误的情况&相应的修改办法
1、错误的情况
#-*- coding:utf-8 -*- import numpy as np import tensorflow as tf from tensorflow.models.rnn.ptb import reader class PTBInput(object): """The input data.""" def __init__(self, config, data, name=None): self.batch_size = batch_size = config.batch_size self.num_steps = num_steps = config.num_steps #为何要进行-1操作 self.epoch_size = ((len(data) // batch_size) - 1) // num_steps self.input_data, self.targets = reader.ptb_producer( data, batch_size, num_steps, name=name) class SmallConfig(object): """Small config.""" init_scale = 0.1 learning_rate = 1.0 max_grad_norm = 5 num_layers = 2 num_steps = 20 hidden_size = 200 max_epoch = 4 max_max_epoch = 13 keep_prob = 1.0 lr_decay = 0.5 batch_size = 20 vocab_size = 10000 if __name__ == '__main__': config = SmallConfig() data_path = '/home/jdlu/jdluTensor/data/simple-examples/data' raw_data = reader.ptb_raw_data(data_path) train_data, valid_data, test_data, _ = raw_data train_input = PTBInput(config=config, data=train_data, name="TrainInput") print "end--------------------------------" #wrong,使用session就会出现读不出数据的错误,读不出数据,整个数据流图就无法计算,整个程序就处于挂起的状态 #使用session会出错 with tf.Session() as sess: for step in range(1): print sess.run(train_input.input_data)说明:在Session当中,没有启动数据读入线程。所以,sess.run(train_input.input_data)就是无数据可取,程序就处于一种挂起的状态。
2、解决方案
#-*- coding:utf-8 -*- import numpy as np import tensorflow as tf from tensorflow.models.rnn.ptb import reader class PTBInput(object): """The input data.""" def __init__(self, config, data, name=None): self.batch_size = batch_size = config.batch_size self.num_steps = num_steps = config.num_steps #为何要进行-1操作 self.epoch_size = ((len(data) // batch_size) - 1) // num_steps self.input_data, self.targets = reader.ptb_producer( data, batch_size, num_steps, name=name) class SmallConfig(object): """Small config.""" init_scale = 0.1 learning_rate = 1.0 max_grad_norm = 5 num_layers = 2 num_steps = 20 hidden_size = 200 max_epoch = 4 max_max_epoch = 13 keep_prob = 1.0 lr_decay = 0.5 batch_size = 20 vocab_size = 10000 if __name__ == '__main__': config = SmallConfig() data_path = '/home/jdlu/jdluTensor/data/simple-examples/data' raw_data = reader.ptb_raw_data(data_path) train_data, valid_data, test_data, _ = raw_data train_input = PTBInput(config=config, data=train_data, name="TrainInput") print "end--------------------------------" #right,使用Supervisor() #sv = tf.train.Supervisor() #with sv.managed_session() as sess: # for step in range(1): # print sess.run(train_input.input_data) #right # Create a session for running operations in the Graph. sess = tf.Session() # Start input enqueue threads. coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) # Run training steps or whatever try: for step in range(2): print sess.run(train_input.input_data) except Exception,e: #Report exceptions to the coordinator coord.request_stop(e) coord.request_stop() # Terminate as usual. It is innocuous to request stop twice. coord.join(threads) sess.close()
说明:使用tf.train.range_input_producer(epoch_size, shuffle=False),会默认将
QueueRunner
添加到全局图中,我们必须使用tf.train.start_queue_runners(sess=sess),去启动该线程。然后使用coord = tf.train.Coordinator()去做一些线程的同步工作。
3、解决方案:
#-*- coding:utf-8 -*- import numpy as np import tensorflow as tf from tensorflow.models.rnn.ptb import reader class PTBInput(object): """The input data.""" def __init__(self, config, data, name=None): self.batch_size = batch_size = config.batch_size self.num_steps = num_steps = config.num_steps #为何要进行-1操作 self.epoch_size = ((len(data) // batch_size) - 1) // num_steps self.input_data, self.targets = reader.ptb_producer( data, batch_size, num_steps, name=name) class SmallConfig(object): """Small config.""" init_scale = 0.1 learning_rate = 1.0 max_grad_norm = 5 num_layers = 2 num_steps = 20 hidden_size = 200 max_epoch = 4 max_max_epoch = 13 keep_prob = 1.0 lr_decay = 0.5 batch_size = 20 vocab_size = 10000 if __name__ == '__main__': config = SmallConfig() data_path = '/home/jdlu/jdluTensor/data/simple-examples/data' raw_data = reader.ptb_raw_data(data_path) train_data, valid_data, test_data, _ = raw_data train_input = PTBInput(config=config, data=train_data, name="TrainInput") print "end--------------------------------" #right,使用Supervisor() sv = tf.train.Supervisor() with sv.managed_session() as sess: for step in range(1): print sess.run(train_input.input_data)
说明:使用sv = tf.train.Supervisor()会比较方便,文档上说, The Supervisor is a small wrapper around a
Coordinator
, a
Saver
, and a
SessionManager
也即使用了Supervisor(),那么保存模型,线程同步的事情都不用我们去干涉了。