原文地址https://www.tensorflow.org/programmersguide/threadingand_queues
主要内容
队列对于使用TensorFlow来进行异步计算是一个强大的机制。
像所有的TensorFlow中的东西一样,一个队列是一个TensorFlow图中的一个节点。这是个状态节点,像一个变量:其他节点可以修改它的内容。特别的来说,其他节点可以将新的元素加入队列,或者从队列中出队现存的元素。
为了找点对于队列概念的感觉,让我们来考虑一个简单的例子。我们会创建一个“先进,先出”的队列(FIFOQueue),并且用0填充这个队列。然后,我们构建了一个图,出队一个元素,将这个元素加1,然后将这个元素放回到这个队列的尾部。于是,队列中的数字都在逐渐增加。
Enqueue,EnqueueMany和Dequeue都是特殊的节点。他们保存着指向队列的指针而不是一般的数值,这样使得它们能够修改队列。我们建议你将这些方法认为是和队列的方法类似。实际上,在Python的API中,他们是队列对象的方法(例如q.enqueue(…))。
注意:队列的方法(例如q.enqueue())必须和队列本身运行在相同的硬件上。在创建这些操作的时候,不一致的硬件部署指令会被忽略。
现在你应该对于队列有些感觉了,那让我们深入细节。
队列使用概述
例如像tf.FIFOQueue和tf.RandomShuffleQueue这样的队列,对于在图中异步计算张量而言,是非常重要的TensorFlow对象。
举例来说,一个典型的输入架构就是使用RandomShuffleQueue来为训练模型准备输入:
- 多线程准备训练数据,并将其压入队列。
- 一个训练线程运行训练操作,这个操作从队列中出队小批量的数据。
这个架构有很多好处,正如Reading data中所强调的那样,同时这篇文章也一些简化构建输入管道的函数的概述。
TensorFlow的Session对象是多线程的,因此多线程可以很容易的使用相同的图,和并行的运行操作。但是实现一个如上所述的使用线程的Python程序,并不总是那么容易。所有的线程必须能够同时停止,异常需要被捕捉并通知(reported),而队列比如在线程停止的时候,被恰当的关闭。
TensorFlow提供两个类来帮助上述任务的实现:tf.train.Coordinator和tf.train.QueueRunner。这两个类被设计为同时使用。Coordinator类帮助多线程同时停止,并向那些等待他们停止的程序报告异常。QueueRunner类被用来创建多个线程,这些线程用来协作在相同的队列中入队张量。
Coordinator
Coordinator类帮助多线程同时停止。 它的关键方法为:
- tf.train.Coordinator.should_stop:如果线程应该停止,那么返回True
- tf.train.Coordinator.request_stop:请求停止线程、
- tf.train.Coordinator.join:等待,直到指定的线程已经停止
你首先创建一个Coordinator对象,然后创建一些线程来使用Coordinator。通常,线程运行循环,而这个循环会在should_stop()返回为True的时候终止。
任何线程可以决定这次计算应该终止。它仅仅需要调用requeststop(),然后其他线程会随着shouldstop()返回为True而终止。
# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
while not coord.should_stop():
...do something...
if ...some condition...:
coord.request_stop()
# Main thread: create a coordinator.
coord = tf.train.Coordinator()
# Create 10 threads that run 'MyLoop()'
threads = [##threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]
# Start the threads and wait for all of them to stop.
for t in threads:
t.start()
coord.join(threads)
显然,协调器能管理线程做不同的事情。他们并不需要像上述例子中一样,全都是做同样的事情。协调器同样支持捕捉和报告异常。参见tf.train.Coordinator文档获取更详细的信息。
QueueRunner
QueueRunner类创建一些线程,来反复的运行入队操作。这些线程可以使用一个协调器来同时终止。除此之外,一个队列运行器运行一个关系更为密切的线程,它在协调器报告异常的情况下,自动的关闭队列。
你可以使用队列UN星期来实现上述的架构。
首先构建一个图,使用TenorFlow的队列(比如tf.RandomShuffleQueue)来输入样本。然后添加操作来处理样本,并讲他们入队。最后,添加以出队元素开始的训练操作。
example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...
在Python的训练程序中,创建一个QueueRunner对象,会运行多个线程来处理和入队样本。创建一个Coordinator的对象,并用coordinator(协调器)来要求队列运行器来启动它的线程。编写训练的循环也可以使用coordinator(协调器)。
# Create a queue runner that will run 4 threads in parallel to enqueue
# examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in xrange(1000000):
if coord.should_stop():
break
sess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(enqueue_threads)
处理异常
以队列运行器开始的线程不仅仅是运行入队操作。它们也会捕捉并处理由队列产生的异常,这些异常包括tf.errors.OutOfRangeError异常,这个异常被用来报道队列被关闭。
使用协调器的训练程序必须在其主循环中,同样地捕捉并报道异常。
这里是上面训练循环的增强版。
try:
for step in xrange(1000000):
if coord.should_stop():
break
sess.run(train_op)
except Exception, e:
# Report exceptions to the coordinator.
coord.request_stop(e)
finally:
# Terminate as usual. It is safe to call `coord.request_stop()` twice.
coord.request_stop()
coord.join(threads)
网友评论