线程队列与IO操作
记录,成为更好的自己
1. 队列和线程
2. 文件读取
3. 图片处理
1. 队列和线程
IO操作相对于cpu的计算来说,速度较慢。
现在要读2G的文件,一次性读取数据,消耗内存。一次性进行训练。关键的问题在于速度慢,训练的模型都在等数据输入到模型中。
- tensorflow如何读取?
- 多线程,并行的执行任务。
- 队列
- 文件的改善(tfrecords)
-
队列与队列管理器
-
Tensorflow队列
在训练样本的时候,希望读入的训练样本时读取的数据是有序的
- tf.FIFOQueue 先进先出队列,按顺序出队列
- tf.RandomShuffleQueue随机出队列
-
FIFOQueue(capacity,dtypes, name="fifo_queue")
创建一个以先进先出的顺序对元素进行排队的队列
- capacity:整数。可能存储在此队列中的元素数量的上限。
- dtypes:Dtype对象列表。长度dtypes必须等于每个队列元素中的张量数,dtype的类型形状,决定了后面进队列元素形状。
- method:
- dequeue(name=None)
- enqueue(vals, name=None)
- enqueue_many(vals,name=None):vals列表或元祖,返回一个进队列操作
- size(name=None)
-
完成一个出队列、+1、入队列操作(同步操作)
# 模拟一下同步,先处理数据,然后才能取数据 #tensorflow中,运行操作有依赖性 # 1. 首先定义队列 Q = tf.FIFOQueue(3, tf.float32) # 放一些数据 enq_many = Q.enqueue_many([[0.1,0.2,0.3],]) # 2.定义一些处理数据的过程,取数据的过程,取数据,+1,入队列 out_q = Q.dequeue() data = out_q + 1 en_q = Q.enqueue(data) with tf.Session() as sess: # 初始化队列 sess.run(enq_many) # 处理数据 for i in range(100): sess.run(en_q) # 训练数据 for i in range(Q.size().eval()): print(sess.run(Q.dequeue()))
分析:当数据量很大时,入队操作从硬盘中读取数据,放入内存中,主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个线程,实现异步读取。
-
队列管理器
其实就是创建线程
- tf.train.QueueRunner(queue, enqueue_ops=None)
创建一个QueueRunner- queue:一个队列
- enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程
- create_threads(sess, coord=None, start=False)
创建线程来运行给定会话的入队操作- start:布尔值。如果True启动线程,如果为False调用者必须调用start()启动线程
- coord : 线程协调器,后面线程管理需要用到
- return:线程的实例
实现异步操作,通过队列管理器来实现变量加1,入队,主线程出队列的操作。
- tf.train.QueueRunner(queue, enqueue_ops=None)
# 模拟异步,子线程存入,主线程读取 # 1.定义一个队列 Q = tf.FIFOQueue(1000, tf.float32) # 2.定义要做的事情,循环值+1,放入队列 var = tf.Variable(0.0) # 实现一个变量自增 data = tf.assign_add(var, tf.constant(1.0)) en_q = Q.enqueue(data) # 3. 定义队列管理器op,指定多少个子线程,子线程该干什么事情 qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q]*2) # 初始化变量的op init_op = tf.global_variables_initializer() # 开启会话 with tf.Session() as sess: # 初始化变量 sess.run(init_op) # 真正开启子线程 threads = qr.create_threads(sess, start=True) # 主线程,不断读取数据训练 for i in range(300): print(sess.run(Q.dequeue()))
运行后结果会有一个这样的错误:
tensorflow.python.framework.errors_impl.CancelledError: Enqueue operation was cancelled [[Node: fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](fifo_queue, AssignAdd)]]
分析:这时候有一个问题就是,入队自顾自的去执行,在需要的出队操作完成之后,程序没法结束。需要一个实现线程间的同步,终止其他线程。
-
-
线程协调器
通过线程协调器来解决如上问题。主线程已经结束了,子线程还在做。
- tf.train.Coordinator()
- 线程协调员,实现一个简单的机制来协调一组线程的终止。
- request_stop()
- should_stop()
- join(threads=None, stop_grace_period_secs=120)等待线程终止
- return:线程协调员实例
with tf.Session() as sess: # 初始化变量 sess.run(init_op) # 开启线程管理器 coord = tf.train.Coordinator() # 真正开启子线程 threads = qr.create_threads(sess, coord=coord, start=True) # 主线程,不断读取数据训练 for i in range(300): print(sess.run(Q.dequeue())) # 回收线程 coord.request_stop() coord.join(threads)
网友评论