美文网首页
线程队列与IO操作(一)

线程队列与IO操作(一)

作者: Daily_Note | 来源:发表于2018-11-19 20:31 被阅读0次

    线程队列与IO操作

    记录,成为更好的自己

    1. 队列和线程

    2. 文件读取

    3. 图片处理


    1. 队列和线程

    IO操作相对于cpu的计算来说,速度较慢。

    现在要读2G的文件,一次性读取数据,消耗内存。一次性进行训练。关键的问题在于速度慢,训练的模型都在等数据输入到模型中。

    • tensorflow如何读取?
      • 多线程,并行的执行任务。
      • 队列
      • 文件的改善(tfrecords)
    1. 队列与队列管理器

      1. Tensorflow队列

        在训练样本的时候,希望读入的训练样本时读取的数据是有序的

        • tf.FIFOQueue 先进先出队列,按顺序出队列
        • tf.RandomShuffleQueue随机出队列
      • FIFOQueue(capacity,dtypes, name="fifo_queue")

        创建一个以先进先出的顺序对元素进行排队的队列

        • capacity:整数。可能存储在此队列中的元素数量的上限。
        • dtypes:Dtype对象列表。长度dtypes必须等于每个队列元素中的张量数,dtype的类型形状,决定了后面进队列元素形状。
        • method:
        • dequeue(name=None)
        • enqueue(vals, name=None)
        • enqueue_many(vals,name=None):vals列表或元祖,返回一个进队列操作
        • size(name=None)
      • 完成一个出队列、+1、入队列操作(同步操作)

      # 模拟一下同步,先处理数据,然后才能取数据
      #tensorflow中,运行操作有依赖性
      
      # 1. 首先定义队列
      Q = tf.FIFOQueue(3, tf.float32)
      
      # 放一些数据
      enq_many = Q.enqueue_many([[0.1,0.2,0.3],])
      
      # 2.定义一些处理数据的过程,取数据的过程,取数据,+1,入队列
      out_q = Q.dequeue()
      data = out_q + 1
      en_q = Q.enqueue(data)
      
      with tf.Session() as sess:
          # 初始化队列
          sess.run(enq_many)
      
          # 处理数据
          for i in range(100):
              sess.run(en_q)
          # 训练数据
          for i in range(Q.size().eval()):
              print(sess.run(Q.dequeue()))
      

      分析:当数据量很大时,入队操作从硬盘中读取数据,放入内存中,主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个线程,实现异步读取。


      1. 队列管理器

        其实就是创建线程

        • tf.train.QueueRunner(queue, enqueue_ops=None)
          创建一个QueueRunner
          • queue:一个队列
          • enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程
          • create_threads(sess, coord=None, start=False)
            创建线程来运行给定会话的入队操作
            • start:布尔值。如果True启动线程,如果为False调用者必须调用start()启动线程
            • coord : 线程协调器,后面线程管理需要用到
            • return:线程的实例

        实现异步操作,通过队列管理器来实现变量加1,入队,主线程出队列的操作。

      # 模拟异步,子线程存入,主线程读取
      # 1.定义一个队列
      Q = tf.FIFOQueue(1000, tf.float32)
      
      # 2.定义要做的事情,循环值+1,放入队列
      var = tf.Variable(0.0)
      
      # 实现一个变量自增
      data = tf.assign_add(var, tf.constant(1.0))
      en_q = Q.enqueue(data)
      
      # 3. 定义队列管理器op,指定多少个子线程,子线程该干什么事情
      qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q]*2)
      # 初始化变量的op
      init_op = tf.global_variables_initializer()
      
      # 开启会话
      with tf.Session() as sess:
          # 初始化变量
          sess.run(init_op)
      
          # 真正开启子线程
          threads = qr.create_threads(sess, start=True)
      
          # 主线程,不断读取数据训练
          for i in range(300):
              print(sess.run(Q.dequeue()))
      

      运行后结果会有一个这样的错误:

      tensorflow.python.framework.errors_impl.CancelledError: Enqueue operation was cancelled
       [[Node: fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](fifo_queue, AssignAdd)]]
      
      

      分析:这时候有一个问题就是,入队自顾自的去执行,在需要的出队操作完成之后,程序没法结束。需要一个实现线程间的同步,终止其他线程。

    2. 线程协调器

      通过线程协调器来解决如上问题。主线程已经结束了,子线程还在做。

      • tf.train.Coordinator()
      • 线程协调员,实现一个简单的机制来协调一组线程的终止。
        • request_stop()
        • should_stop()
        • join(threads=None, stop_grace_period_secs=120)等待线程终止
        • return:线程协调员实例
      with tf.Session() as sess:
          # 初始化变量
          sess.run(init_op)
      
          # 开启线程管理器
          coord = tf.train.Coordinator()
          # 真正开启子线程
          threads = qr.create_threads(sess, coord=coord, start=True)
      
          # 主线程,不断读取数据训练
          for i in range(300):
              print(sess.run(Q.dequeue()))
          # 回收线程
          coord.request_stop()
          coord.join(threads)
      

    相关文章

      网友评论

          本文标题:线程队列与IO操作(一)

          本文链接:https://www.haomeiwen.com/subject/botzfqtx.html