美文网首页tensorflow技术解析与实战——阅读笔记
tensorflow初探七之队列-Tensorflow技术解析与

tensorflow初探七之队列-Tensorflow技术解析与

作者: 欠我的都给我吐出来 | 来源:发表于2019-01-29 15:39 被阅读0次

    队列

    队列(queue)本身也是图中的一个节点,是一种有状态的节点,其他节点,如入队节点(enqueue)和出队节点(dequeue),可以修改它的内容。例如,入队节点可以把新元素插到队列末尾,出队节点可以把队列前面的元素删除。TensorFlow 中主要有两种队列,即 FIFOQueue 和 RandomShuffleQueue。

    • FIFOQueue 创建一个先入先出队列。例如,我们在训练一些语音、文字样本时,使用循环神经网络的网络结构,希望读入的训练样本是有序的,就要用 FIFOQueue。
    • RandomShuffleQueue 创建一个随机队列,在出队列时,是以随机的顺序产生元素的。例如,我们在训练一些图像样本时,使用 CNN 的网络结构,希望可以无序地读入训练样本,就要用RandomShuffleQueue,每次随机产生一个训练样本。RandomShuffleQueue 在 TensorFlow 使用异步计算时非常重要。因为 TensorFlow 的会话是支持多线程的,我们可以在主线程里执行训练操作,使用 RandomShuffleQueue 作为训练输入,开多个线程来准备训练样本,将样本压入队列后,主线程会从队列中每次取出 mini-batch 的样本进行训练。
    #  FIFOQueue 先进先出
    q=tf.FIFOQueue(3,"float")
    init=q.enqueue_many(([0.1,0.2,0.3],))
    
    x=q.dequeue()
    y=x+1
    q_inc=q.enqueue([y])
    
    with tf.Session() as sess:
        sess.run(init)
        for i in range(2):
            sess.run(q_inc)
        quelen=sess.run(q.size())
        for i in range(quelen):
            print(sess.run(q.dequeue()))
    
    #RandomShuffleQueue
    q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")
    with tf.Session() as sess:
        for i in range(0,10):
            sess.run(q.enqueue(i))
        for i in range(0,8):
            print(sess.run(q.dequeue()))
    
    #3.0
    #4.0
    #8.0
    #2.0
    #0.0
    #1.0
    #7.0
    #9.0
    

    注意到RandomShuffleQueue的参数有容量和最小长度。当队列长度等于最小值,执行出队操作以及队列长度等于最大值,执行入队操作时,会有阻断情况发生。只有当队列满足要求后,才能继续执行。可以通过设置绘画在运行时的等待时间来解除阻断。

    q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")
    with tf.Session() as sess:
        for i in range(0,10):
            sess.run(q.enqueue(i))
        for i in range(0,10):
            run_options = tf.RunOptions(timeout_in_ms = 10000) # 等待 10 秒
            try:
                print(sess.run(q.dequeue(), options=run_options))
            except tf.errors.DeadlineExceededError:
                print('out of range')
                break;
    

    如果入队操作是在主线程中进行,那么当入队产生阻断时,会影响后续的读数据以及训练操作。会话中可以运行多个线程,我们使用线程管理器 QueueRunner 创建一系列的新线程进行入队操作,让主线程继续使用数据,即训练网络和读取数据是异步的,主线程在训练网络,另一个线程在将数据从硬盘读入内存。

    队列管理器

    q=tf.FIFOQueue(1000,'float')
    counter=tf.Variable(0.0)
    incre_op=tf.assign_add(counter,tf.constant(1.0))
    enqueue_op=q.enqueue(counter)
    
    qr=tf.train.QueueRunner(q,enqueue_ops=[incre_op,enqueue_op]*1)
    
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        #启动入队操作
        enqueue_threads=qr.create_threads(sess,start=True)
        #主线程是取数据操作
        for i in range(10):
            print(sess.run(q.dequeue()))
    
    #3.0
    #11.0
    #69.0
    #73.0
    #90.0
    #97.0
    #107.0
    #226.0
    #266.0
    #274.0
    

    输出的队列也不是我们期待的自然数列,并且线程被阻断。这是因为加 1 操作和入队操作不同步,可能加 1 操作执行了很多次之后,才会进行一次入队操作。

    上述代码最后报异常,然后会话自动关闭。入队线程自顾自地执行,在需要的出队操作完成之后,程序没法结束,一直到超过队列的容量之后,会导致cancalledError。因此需要协调器来管理线程。

    协调器coordinator
    可以解决上面的入队线程不受控制的情况。

    q=tf.FIFOQueue(1000,'float')
    counter=tf.Variable(0.0)
    incre_op=tf.assign_add(counter,tf.constant(1.0))
    enqueue_op=q.enqueue(counter)
    
    qr=tf.train.QueueRunner(q,enqueue_ops=[incre_op,enqueue_op]*1)
    sess=tf.Session()
    sess.run(tf.global_variables_initializer())
    coord=tf.train.Coordinator() #协调器,协调线程间的关系可以视为一种信号量,用来做同步
    enqueue_threads = qr.create_threads(sess, coord = coord,start=True)
    
    for i in range(0,10):
        print(sess.run(q.dequeue()))
        
    coord.request_stop()# 通知其他线程关闭
    coord.join(enqueue_threads)# join 操作等待其他线程结束,其他所有线程关闭之后,这一函数才能返回
    
    #3.0
    #20.0
    #304.0
    #1118.0
    #1164.0
    #1242.0
    #1311.0
    #1320.0
    #1381.0
    #1387.0
    

    这个很奇怪,并没有按照书上说的关闭线程之后再执行出队操作,就会抛出 tf.errors.OutOfRange 错误。而且

    print(sess.run(q.size()))
    #每次都不一样,这次是170
    for i in range(0,10):
        print(sess.run(q.dequeue()))
    #在此实行上面的代码,得到10个1387
    

    相关文章

      网友评论

        本文标题:tensorflow初探七之队列-Tensorflow技术解析与

        本文链接:https://www.haomeiwen.com/subject/uupxsqtx.html