美文网首页
TensorFlow 多线程输入

TensorFlow 多线程输入

作者: youyuge | 来源:发表于2020-06-07 23:52 被阅读0次

    https://zhuanlan.zhihu.com/p/27238630
    https://www.jianshu.com/p/686b6cecc5ea
    https://zhuanlan.zhihu.com/p/98084488

    一、tf中的Queue,QueueRunner与Coordinate

    https://zhuanlan.zhihu.com/p/31361295
    https://www.jianshu.com/p/d063804fb272

    其实概念只有三个:

    • Queue是TF队列和缓存机制的实现
    • QueueRunner是TF中对操作Queue的线程的封装
    • Coordinator是TF中用来协调线程运行的工具

    一句话概括流程就是:

    • Queue->(构建图阶段)创建队列;
    • QueueRunner->(构建图阶段)创建线程进行入队操作;
    • tf.train.start_queue_runners()->(执行图阶段)填充队列;
    • tf.train.Coordinator() 在线程出错时关闭之。

    1.1 Queue

    根据实现的方式不同,分成具体的几种类型,例如:

    • tf.FIFOQueue 按入列顺序出列的队列
    • tf.RandomShuffleQueue 随机顺序出列的队列

    队列机制的TensorFlow中实现多线程数据输入的基础与核心。

    import tensorflow as tf
     
     #创建一个先进先出队列,指定队列最多可以保存两个元素,并指定类型为整数
     q  = tf.FIFOQueue(2, "int32")
        
     #使用enqueue_many函数来初始化队列中的元素。和变量初始化类似,在使用的队列之前
     #需要明确的调用这个初始化过程
     init  =q.enqueue_many(([0,10],))
     
     #使用Dequeue函数将队列中个的第一个元素出队列。这个元素的值被储存在变量x中
     x = q.dequeue()
     #将得到的值加1
    y = x+1
    ​
    #将加1后的值在重新加入队列
    q_inc = q.enqueue([y])
    ​
    with tf.Session() as  sess:
        #运行初始化队列的操作
        init.run()
        for _ in range(5):
            #运行q_inc将执行数据出队列,出队的元素+1、重新加入队列的整个过程
            v, - = sess.run([x, q_inc])
            #打印出队列元素的取值
            print(v)
    

    1.2 QueueRunner

    Tensorflow的计算主要在使用CPU/GPU和内存,而数据读取涉及磁盘操作,速度远低于前者操作。因此通常会使用多个线程读取数据,然后使用一个线程消费数据。QueueRunner就是来管理这些读写队列的线程的。

    QueueRunner需要与Queue一起使用(这名字已经注定了它和Queue脱不开干系),但并不一定必须使用Coordinator。

    import tensorflow as tf
    ​
    #声明一个先进先出的队列(FIFQUEUE),队列最多100个元素,类型为实数
    queue = tf.FIFOQueue(100, "float")
    #定义队列的入队操作
    enqueue_op = queue.enqueue([tf.random_normal([1]) ])
    ​
    #使用tf.train.QueueRunner来创建多个线程运行队列的入队操作
    #tf.train.QueueRunner的第一个参数给出了被操作的队列。[enqueue_op]*5
    #表示需要5个线程,每个线程中运行的是enqueue_op操作
    qr  = tf.train.QueueRunner(queue, [enqueue_op]*5)
    ​
    #将定义的QueueRunner加入TensorFlow计算图上指定的集合
    #tf.train.add_queue_runner函数没有指定的集合
    #则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。
    #下面的函数就是刚刚定义的qr加入默认的tf.GraphKey.QUEUE_RUNNERS集合
    tf.train.add_queue_runner(qr)
    #定义出队操作
    out_tensor = queue.dequeue()
    ​
    with tf.Session() as sess:
        #使用tf.train.Coordinator来协同启动的线程
        coord = tf.train.Coordinator()
        #使用tf.train.QueueRunner时,需要明确调用tf.train.start_queue_runners来启动所有线程。
        #否则因为没有线程运行入队操作,当调用出队操作时程序会一直的等待入队操作被运行。
        #tf.train.start_queue_runners函数会默认启动
        #tf.GraphKeys.QUEUE_RUNNERS集合中所有的QueueRunner。
        #因为这个函数只是支持启动指定集合的QueueRunner,
        #所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个集合
    ​
        thread= tf.train.start_queue_runners(sess = sess, coord = coord)
        
        #获取队列的取值
        for _ in range(3):print(sess.run(out_tensor)[0])
            
        # 使用tf.train.Coordinator来停止所有的线程
        coord.request_stop()
        coord.join(threads)
        
       "以上将启动5个线程来执行队列入队的操作,其中每一个线程都是将随机数写入队列。于是在每次运行出队操作时,可以得到一个随机数"
    

    使用Queue与QueueRunner有三种方式:

    1. 上面使用了自定义创建tf.train.QueueRunner配合tf.train.start_queue_runners启动。

      • 自定义的QueueRunner一定要手动调用tf.train.add_queue_runner将其加入到tf.GraphKeys.QUEUE_RUNNERS中,这样之后tf.train.start_queue_runners才能去启动collection中的QueueRunner。
    2. 或者,可以自定义创建tf.train.QueueRunner,session中手动调用QueueRunner.create_threads()方法创建线程,运行入队的op。

      create_threads():Create threads to run the enqueue ops for the given session.

    3. 使用tf封装好的tf.train.string_input_producer()等方法,该方法自动创建相应的Queue,并会自动调用QueueRunner进行封装,并添加进collection。之后调用tf.train.start_queue_runners启动即可。

      Returns:
      A queue with the output strings. A QueueRunner for the Queue
      is added to the current Graph's QUEUE_RUNNER collection.

    二、输入文件队列

    tensorflow使用文件名队列+内存队列双队列的形式读入文件,可以很好地管理epoch。
    https://zhuanlan.zhihu.com/p/27238630

    对于文件名队列,我们使用tf.train.string_input_producer函数。这个函数需要传入一个文件名list,系统会自动将它转为一个文件名队列。

    def string_input_producer(string_tensor,
                              num_epochs=None,
                              shuffle=True,
                              seed=None,
                              capacity=32,
                              shared_name=None,
                              name=None,
                              cancel_op=None):
      """Output strings (e.g. filenames) to a queue for an input pipeline.
    
      Note: if `num_epochs` is not `None`, this function creates local counter
      `epochs`. Use `local_variables_initializer()` to initialize local variables.
    
      Returns:
        A queue with the output strings.  A `QueueRunner` for the Queue
        is added to the current `Graph`'s `QUEUE_RUNNER` collection.
    

    其中tf.train.string_input_producer重要的参数:

    • num_epochs:代表了原始数据集的输入在文件名(当然不一定得是文件名)队列中的重复次数,若为None则默认无限循环。若设置5则表示数据集只会重复输入5次,若继续想取出数据则报错OutOfRange error。若不为None,session中必须先调用local_variables_initializer()初始化局部变量,即这个epoch计数器,局部变量代表此变量不会被Saver持久化,只是本次运行临时的变量。
    • shuffle:shuffle是指在一个epoch内文件的顺序是否被打乱。
    • capacity:Queue的容量,默认32。

    在tensorflow中,内存队列不需要我们自己建立,我们只需要使用reader对象从文件名队列中读取数据就可以了,具体实现可以参考下面的实战代码。

    2.1 输入文件队列代码实战

    假设所有的输入数据都已经整理成了TFRecord格式。

    虽然一个TFRecord文件中可以储存多个训练样例,但是当训练数据数量较大时,可以将数据分成多个TFRecord文件来提高处理效率
    tf.train.match_filenames_once函数获取一个正则表达式的所有文件,本质这个函数就是返回一个符合条件的local variable本地变量。因此同样需要local_variables_initializer()初始化局部变量。

    tf.train.string_input_producer函数会使用初始化时提供的文件列表创建一个输入队列

    1. 设置shuffle参数,支持随机打乱文件列表出队的顺序,shuffle为True,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机的。(随机打乱文件顺序以及即入输入队列的过程会跑在单独的线程)
    2. 输入队列会将队列中的文件均匀地分给不同的线程,不会出现处理重复的现象
    3. 当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件列表中的文件全部重新加入队列
    4. 设置num_epochs参数来限制记载初始文件列表的最大轮数。所有都使用后,继续读取新文件则会,ERORR:OutOfRange的错误

    生成TFRecords代码:

    import tensorflow as tf
    ​
    #创建TFRecord文件的帮助函数
    def _int64_feature(value):
        return tf.train.Feature(int64_list = tf.train.Int64List(value = [value]))
    ​
    #模拟海量数据情况下将写入不同的文件。num_shards定义写入多少文件
    #instances_per_shard定义每个文件中有多少个数据
    num_shards = 2
    instances_per_shard = 2
    for i in range(num_shards):
        #以0000n-of-0000m的后缀区分,其中m表示多少个文件,n表示编号
        filename = ('/path/to/data.tfrecords-%.5d-of-%.5d' % (i, num_shards))
        writer = tf.python_io.TFRecordWriter(filename)
        #将数据分装成Example结构并写入TFRecord文件
        for j in range (instances_per_shard):
            writer = =tf.python_io.TFRecordWriter(filename)
            #将数据封装成Example结构并写入TFRecord文件
            for j in range(instance_per_shard):
                #Example结构仅包含当前样例属于第几个文件以及是第几个样本
                example = tf.train.Example(features = tf.train.Feactures(feature={
                    'i ': _int64_feature(i),
                    'j ': _int64_feature(j)
                }))
                writer.write(example.SerializeToString())
            writer.close()
           #指定目录下生成两个文件00000和00001 
    

    使用文件名队列,来读取内容:

    import tensorflow as tf
    ​
    #使用tf.train.match_input_producer函数创建输入队列,输入队列中的文件列表为
    #tf.train.match_filename_once函数获取的文件列表。
    #这里的shuffle设置为False来避免打乱文件顺序,但在实际中一般设置为TRUE
    ​
    filename_queue = tf.train.string_input_producer(files, shuffle = False)
    ​
    #
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(
        serialized_example,
        features  ={
            ' i ':tf.FixedLenFeature([] , tf.int64),
            ' j ':tf.FixedLenFeature([] , tf.int64),
        }   
    )
    ​
    with tf.Session() as sess:
        #虽然在本段程序中并没有声明任何向量,但是使用tf.train.match_filenames_once函数时
        #需要初始化一些变量。这里string_input_producer没有指定epoch_num因此不需要局部变量。
        tf.local_variable_initializer().run()
        print(sess.run(files))
       
        #声明tf.train.Coordinator类协同不同程序,并启动线程
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    ​
        #多次执行获取数据的操作
        for i in range(6):
            print(sess.run([features['i'], features['j']]))
       
        coord.request_stop()
        coord.join(threads)
    ​
    #在不打乱文件列表下,会依次读出样例数据中的每一个样例
    

    相关文章

      网友评论

          本文标题:TensorFlow 多线程输入

          本文链接:https://www.haomeiwen.com/subject/egkctktx.html