TensorFlow官方教程翻译2:线程和队列

作者: 马小李23 | 来源:发表于2017-08-01 11:49 被阅读67次

    原文地址https://www.tensorflow.org/programmersguide/threadingand_queues

    主要内容

    队列使用概述
    Coordinator
    QueueRunner
    处理异常

    队列对于使用TensorFlow来进行异步计算是一个强大的机制。
    像所有的TensorFlow中的东西一样,一个队列是一个TensorFlow图中的一个节点。这是个状态节点,像一个变量:其他节点可以修改它的内容。特别的来说,其他节点可以将新的元素加入队列,或者从队列中出队现存的元素。
    为了找点对于队列概念的感觉,让我们来考虑一个简单的例子。我们会创建一个“先进,先出”的队列(FIFOQueue),并且用0填充这个队列。然后,我们构建了一个图,出队一个元素,将这个元素加1,然后将这个元素放回到这个队列的尾部。于是,队列中的数字都在逐渐增加。

    Example1:Enqueue and Dequeue Op

    Enqueue,EnqueueMany和Dequeue都是特殊的节点。他们保存着指向队列的指针而不是一般的数值,这样使得它们能够修改队列。我们建议你将这些方法认为是和队列的方法类似。实际上,在Python的API中,他们是队列对象的方法(例如q.enqueue(…))。
    注意:队列的方法(例如q.enqueue())必须和队列本身运行在相同的硬件上。在创建这些操作的时候,不一致的硬件部署指令会被忽略。
    现在你应该对于队列有些感觉了,那让我们深入细节。

    队列使用概述

    例如像tf.FIFOQueue和tf.RandomShuffleQueue这样的队列,对于在图中异步计算张量而言,是非常重要的TensorFlow对象。
    举例来说,一个典型的输入架构就是使用RandomShuffleQueue来为训练模型准备输入:

    • 多线程准备训练数据,并将其压入队列。
    • 一个训练线程运行训练操作,这个操作从队列中出队小批量的数据。

    这个架构有很多好处,正如Reading data中所强调的那样,同时这篇文章也一些简化构建输入管道的函数的概述。
    TensorFlow的Session对象是多线程的,因此多线程可以很容易的使用相同的图,和并行的运行操作。但是实现一个如上所述的使用线程的Python程序,并不总是那么容易。所有的线程必须能够同时停止,异常需要被捕捉并通知(reported),而队列比如在线程停止的时候,被恰当的关闭。
    TensorFlow提供两个类来帮助上述任务的实现:tf.train.Coordinator和tf.train.QueueRunner。这两个类被设计为同时使用。Coordinator类帮助多线程同时停止,并向那些等待他们停止的程序报告异常。QueueRunner类被用来创建多个线程,这些线程用来协作在相同的队列中入队张量。

    Coordinator

    Coordinator类帮助多线程同时停止。 它的关键方法为:

    • tf.train.Coordinator.should_stop:如果线程应该停止,那么返回True
    • tf.train.Coordinator.request_stop:请求停止线程、
    • tf.train.Coordinator.join:等待,直到指定的线程已经停止

    你首先创建一个Coordinator对象,然后创建一些线程来使用Coordinator。通常,线程运行循环,而这个循环会在should_stop()返回为True的时候终止。
    任何线程可以决定这次计算应该终止。它仅仅需要调用requeststop(),然后其他线程会随着shouldstop()返回为True而终止。

    # Thread body: loop until the coordinator indicates a stop was requested.
    # If some condition becomes true, ask the coordinator to stop.
    def MyLoop(coord):
      while not coord.should_stop():
    ...do something...
    if ...some condition...:
      coord.request_stop()
    
    # Main thread: create a coordinator.
    coord = tf.train.Coordinator()
    
    # Create 10 threads that run 'MyLoop()'
    threads = [##threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]
    
    # Start the threads and wait for all of them to stop.
    for t in threads:
      t.start()
    coord.join(threads)
    

    显然,协调器能管理线程做不同的事情。他们并不需要像上述例子中一样,全都是做同样的事情。协调器同样支持捕捉和报告异常。参见tf.train.Coordinator文档获取更详细的信息。

    QueueRunner

    QueueRunner类创建一些线程,来反复的运行入队操作。这些线程可以使用一个协调器来同时终止。除此之外,一个队列运行器运行一个关系更为密切的线程,它在协调器报告异常的情况下,自动的关闭队列。
    你可以使用队列UN星期来实现上述的架构。
    首先构建一个图,使用TenorFlow的队列(比如tf.RandomShuffleQueue)来输入样本。然后添加操作来处理样本,并讲他们入队。最后,添加以出队元素开始的训练操作。

    example = ...ops to create one example...
    # Create a queue, and an op that enqueues examples one at a time in the queue.
    queue = tf.RandomShuffleQueue(...)
    enqueue_op = queue.enqueue(example)
    # Create a training graph that starts by dequeuing a batch of examples.
    inputs = queue.dequeue_many(batch_size)
    train_op = ...use 'inputs' to build the training part of the graph...  
    在Python的训练程序中,创建一个QueueRunner对象,会运行多个线程来处理和入队样本。创建一个Coordinator的对象,并用coordinator(协调器)来要求队列运行器来启动它的线程。编写训练的循环也可以使用coordinator(协调器)。
    # Create a queue runner that will run 4 threads in parallel to enqueue
    # examples.
    qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
    
    # Launch the graph.
    sess = tf.Session()
    # Create a coordinator, launch the queue runner threads.
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    # Run the training loop, controlling termination with the coordinator.
    for step in xrange(1000000):
        if coord.should_stop():
            break
        sess.run(train_op)
    # When done, ask the threads to stop.
    coord.request_stop()
    # And wait for them to actually do it.
    coord.join(enqueue_threads)  
    

    处理异常

    以队列运行器开始的线程不仅仅是运行入队操作。它们也会捕捉并处理由队列产生的异常,这些异常包括tf.errors.OutOfRangeError异常,这个异常被用来报道队列被关闭。
    使用协调器的训练程序必须在其主循环中,同样地捕捉并报道异常。
    这里是上面训练循环的增强版。

    try:
        for step in xrange(1000000):
            if coord.should_stop():
                break
            sess.run(train_op)
    except Exception, e:
        # Report exceptions to the coordinator.
        coord.request_stop(e)
    finally:
        # Terminate as usual. It is safe to call `coord.request_stop()` twice.
        coord.request_stop()
        coord.join(threads)
    

    相关文章

      网友评论

        本文标题:TensorFlow官方教程翻译2:线程和队列

        本文链接:https://www.haomeiwen.com/subject/mqkjlxtx.html