分析:当数据量很大时,入队操作从硬盘中读取数据,放入内存中,主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个线程,实现异步读取。
队列管理器
虽然叫队列管理器,但是其作用是创建线程
队列管理器
import tensorflow as tf
#模拟异步子线程存入样本,主线程读取样本
#1. 定义一个队列, 1000
Q = tf.FIFOQueue(1000, tf.float32)
#2. 定义子线程要做的事情 循环值 +1 放入队列当中
var = tf.Variable(0.0)
#实现一个自增 tf.assign_add
data = tf.assign_add(var, tf.constant(1.0)) #注意,这里的data是一个op
en_q = Q.enqueue(data)
#3. 定义队列管理器op:指定子线程要做的事情
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q]*2) #用两个线程执行en_q这个op
#初始化变量的op
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
#初始化变量
sess.run(init_op)
#真正开启子线程
qr.create_threads(sess, start=True) #在这里即开启子线程
#主线程,不断取数据
for i in range(300):
print(sess.run(Q.dequeue()))
注意:以上代码会报tensorflow.python.framework.errors_impl.CancelledError: Enqueue operation was cancelled
异常
这是因为主线程已经结束,所以Session已经被释放,子线程就没有了资源
线程协调器
import tensorflow as tf
#模拟异步子线程存入样本,主线程读取样本
#1. 定义一个队列, 1000
Q = tf.FIFOQueue(1000, tf.float32)
#2. 定义子线程要做的事情 循环值 +1 放入队列当中
var = tf.Variable(0.0)
#实现一个自增 tf.assign_add
data = tf.assign_add(var, tf.constant(1.0)) #注意,这里的data是一个op
en_q = Q.enqueue(data)
#3. 定义队列管理器op:指定子线程要做的事情
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q]*2) #用两个线程执行en_q这个op
#初始化变量的op
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
#初始化变量
sess.run(init_op)
#开启线程管理器
coord = tf.train.Coordinator()
#真正开启子线程
threads = qr.create_threads(sess,coord=coord, start=True) #在这里即开启子线程。并告诉了子线程“你们的线程协调器是哪个”,让子线程受到其管理
#主线程,不断取数据
for i in range(300):
print(sess.run(Q.dequeue()))
#回收
coord.request_stop()
coord.join(threads=threads)
网友评论