美文网首页
TensorFlow 实战Google深度学习框架(第2版)第七

TensorFlow 实战Google深度学习框架(第2版)第七

作者: emm_simon | 来源:发表于2020-03-11 16:26 被阅读0次

    第七章:图像数据处理

    * 7.1TF_Record输入数据格式
           * 7.1.1TF_Record格式介绍
           * 7.1.2TF_Record样例程序
    * 7.2图像数据处理
           * 7.2.1TensorFlow图像处理函数
           * 7.2.2图像预处理完整样例
    * 7.3多线程输入数据处理框架
           * 7.3.1队列与多线程
           * 7.3.2输入文件队列
           * 7.3.3组合训练数据(batching)
           * 7.3.4输入数据处理框架
    * 7.4数据集(DataSet)
           * 7.4.1数据集的基本使用方法
           * 7.4.2数据集的高层操作

    在第6章中详细介绍了卷积神经网络,并提到卷积神经网络给图像识别技术带来了突破性进展。这一章将从另外一个维度来进一步提升图像识别的精度以及训练的速度。
    * 喜欢摄影的读者都知道图像的亮度、对比度等属性对图像的影响是非常大的,相同物体在不同亮度、对比度下差别非常大。然而在很多图像识别问题中,这些因素都不应该影响最后的识别结果。所以本章将介绍如何对图像数据进行预处理使训练得到的神经网络模型尽可能小地被无关因素所影响。
    * 但与此同时,复杂的预处理过程可能导致训练效率的下降。为了减小预处理对于训练速度的影响,在本章中也将详细地介绍TensorFlow中多线程处理输入数据的解决方案。
    本章将根据数据预处理的先后顺序来组织不同的小节。

    -7.1- TF_Record输入数据格式

    本节,将介绍如何统一输入数据的格式,使得在之后系统中可以更加方便地处理。
    来自实际问题的数据往往有很多格式和属性,这一节将介绍的TFRecord格式可以统一不同的原始数据格式,井更加有效地管理不同的属性。
    TensorFlow 提供了一种统一的格式来存储数据, 这个格式就是TFRecord
    6.5节给出了一个程序来处理花朵分类的数据。在这个程序中,使用了一个从类别名称到所有数据列表的词典来维护图像和类别的关系。这种方式的可扩展性非常差,当数据来源更加复杂、每一个样例中的信息更加丰富之后,这种方式就很难有效地记录输入数据中的信息了。
    于是TensorFlow提供了TFRecord的格式来统一存储数据。
    这一节将介绍如何使用 TFRecord 来统一输入数据的格式。

    -7.1.1- TF_Record格式介绍

    TFRecord文件中的数据都是通过tf.train.ExampleProtocolBuffer的格式存储的。
    以下代码给出了 tf.train.Example 的定义:

    message Example { 
        Features features = 1;
    };
    
    message Features {
        map<string, Feature> feature = 1;
    };
    
    message Feature { 
        oneof kind {
            BytesList bytes_list = 1; 
            FloatList float_list = 2; 
            Int64List int64_list = 3;
        }
    };
    

    从以上代码可以看出tf.train.Example的数据结构是比较简洁的。tf.train.Example中包含了一个从属性名称到取值的字典。其中属性名称为一个字符串,属性的取值可以为字符串(BytesList)、实数列表(FloatList)或者整数列表(Int64List)。比如将一张解码前的图像存为一个字符串,图像所对应的类别编号存为整数列表。在7.1.2节中将给出一个使用TFRecord的具体样例 。

    -7.1.2- TF_Record样例程序

    本节将给出具体的样例程序来读写 TFRecord 文件。
    以下程序给出了如何将MNIST输入数据转化为TFRecord的格式

    import tensorflow as tf 
    from tensorflow.example.tutorials.mnist import input_data 
    import numpy as np 
    
    # 生成整数型的属性
    def _int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[vslue]))
    
    mnist = input_data.read_data_sets(
            "/path/to/mnist/data",
            dtype=tf.unit8,
            one_hot=True
        )
    
    images =  mnist.train.images 
    
    # 训练数据所对应的正确答案,可以作为一个属性保存在TFRecords中。
    labels =  mnist.train.labels 
    
    # 训练数据的图像分辨率,这可以作为Example中的一个属性。
    pixels = images.shape[1]
    num_examples = mnist.train.num_examples 
    
    # 输出TFRecord文件的地址
    filename = "/path/to/output.tfrecords"
    # 创建一个writer来写TFRecord文件
    

    -7.2- 图像数据处理

    本节,将介绍如何对图像数据进行预处理。
    这一节将列举TensorFlow支持的图像处理函数,并介绍如何使用这些处理方式来弱化与图像识别无关的因素
    在之前的几章中多次使用到了图像识别数据集。然而在之前的章节中都是直接使用图
    像原始的像素矩阵。这一节将介绍图像的预处理过程。
    通过对图像的预处理,可以尽量避免模型中受到无关因素的影响。在大部分图像识别问题中,通过图像预处理过程可以提高模型的准确率。

    -7.2.1- TensorFlow图像处理函数

    在本节中,将
    * 介绍 TensorFlow 提供的主要图像处理函数
    * 给出具体图像在处理前和处理后的变化让读者有一个直观的了解

    TensorFlow提供了几类图像处理函数,在本节中将一一介绍这些图像处理函数。

    图像编码处理

    在之前的章节中提到一张RGB色彩模式的图像可以看成一个三维矩阵,矩阵中的每一个数表示了图像上不同位置,不同颜色的亮度。然而图像在存储时并不是直接记录这些矩阵中的数字,而是记录经过压缩编码之后的结果。所以要将一张图像还原成一个三维矩阵,需要解码的过程。TensorFlow提供了对jpeg和png格式图像的编码/解码函数。
    以下代码示范了如何使用TensorFlow中对jpeg格式图像进行编码/解码。

    # matplotlib.pyplot是一个python的画图工具。
    # 在这一节中将使用这个工具来可视化经过TensorFlow处理的图像。
    import matplotlib.pyplot as plt 
    import tensorflow as tf 
    
    # 读取图像的原始数据
    img_raw_data = tf.gfile.FastGFile("/path/to/picture", 'r').read() 
    
    with tf.Session() as sess:
        # 对图像进行jpeg的格式解码从而得到图像对应的三维矩阵。
        # TensorFlow还提供了tf.image.decode_png()函数对png格式的图像进行解码。
        # 解码之后的结果作为一个张量,在使用它的取值之前需要明确调用运行的过程。
        img_data = tf.image.decode_jpeg(image_raw_data)
    
        print(img_data.eval())
        # 输出解码之后的三维矩阵,上面这一行代码将输出以下内容。
        """
            [
                [[165 160 138]
                ...,
                [105 140 50]]
    
                [[166 161 139]
                ...,
                [106 139 48]]
    
                ...,
    
                [[207 200 181]
                ..., 
                [106 81 50]]
            ]
        """
        
        # 使用pyplot工具可视化得到的图像。
        # 可以得到图7-1中展示的图像。
        plt.imshow(img_data.eval())
        plt.show()
    
        # 将表示一张图片的三维矩阵重新按照jpeg格式编码存入文件中。
        # 打开这张图像,可以得到和原始图像一样的图像。
        encoded_image = tf.image.encode_jpeg(img_data)
        with tf.gfile.GFile("/path/to/output", 'wb') as f:
            f.write(encoded_image.eval())
    
    图7-1

    图7-1显示了以上代码可视化输出的一张图像,在下面的篇幅中将继续使用这张图像介绍TensorFlow其他图像处理的函数。

    图像大小调整

    一般来说,网络上获取的图像大小是不固定,但神经网络输入节点的个数是固定的。所以在将图像的像素作为输入提供给神经网络之前,需要先将图像的大小统一。这就是图像大小调整需要完成的任务。
    图像大小调整有两种方式:
    * 第一种是通过算法使得新的图像尽量保存原始图像上的所有信息。TensorFlow提供了4种不同的方法,并且将它们封装到了tf.image.resize_images()函数。
    以下代码示范了如何使用这个函数:

    # 加载原始图像,定义会话等过程和图像处理中代码一致,
    # 在下面的样例中中就全部略去了,
    # 假设img_data是已经解码的图像。
    ...
    # 首先将图片数据转化为实数类型。
    # 这一步将0~255的像素值转化为0.0~1.0范围内的实数。
    # 大多数图像处理API支持整数和实数类型的输入。
    # 如果输入是整数类型,这些API会在内部将输入转化为实数后处理,再将输出转化为整数。
    # 如果有多个处理步骤,在整数和实数之间反复转化将导致精度损失,因此推荐在图像处理前将其转化为实数类型。
    # 下面的样例将略去这一步骤,假设img_data是经过此类型转化的图像。
    img_data = tf.image.convert_image_dtype(img_data, dtype=tf.float32)
    
    # 通过tf.image.resize_images()函数调整图像的大小。
    # 这个函数第一个参数为原始图像,
    # 第二个和第三个参数为调整后的图像的大小,
    # method参数给出了调整图像大小的算法。
    # 注意:如果输入数据是unit8格式,那么输出将是0~255之间的实数,不方便后续处理,
    # 本书将以在调整图像大小前,先转化为实数类型。
    resized = tf.image.resize_images(img_data, [300, 300], method=0)
    
    # 通过pyplot可视化的过程和图像编码处理中给出的代码一致,
    # 在以下代码中也将略去
    
    表7-1

    表7-1给出了tf.image.resize_images()函数的method参数取值对应的图像大小调整算法。



    图7-2

    图7-2对比了不同大小调整算法得到的结果。从图7-2中可以看出,不同算法调整出来的结果会有细微差别,但不会相差太远。

    * 除了将整张图像信息完整保存,TensorFlow还提供了API对图像进行裁剪或者填充。
    以下代码展示了通过tf.image.resize_image_with_crop_or_pad()函数来调整图像大小的功能:

    # 通过tf.image.resize_image_with_crop_or_pad()函数调整图像的大小。
    # 这个函数的第一个参数为原始图像,后面两个参数是调整后的目标图像大小。
    # 如果原始图像的尺寸大于目标图像,那么这个函数会自动截取原图像中剧中的部分(如图7-3(b)所示)。
    # 如果目标图像大于原始图像,这个函数会自动在原始图像的四周填充全0背景(如图7-3(c)所示)。
    # 因为原始图像的大小为1797x2673,所以下面的第一个命令会自动剪裁、而第二个命令会自动填充。
    croped = tf.image.resize_image_with_crop_or_pad(img_data, 1000, 1000)
    padded = tf.iamge.resize_image_with_crop_or_pad(img_data, 3000, 3000)
    
    图7-3

    TensorFlow还支持通过比例调整图像大小,以下代码给出了一个样例:

    # 通过tf.image.central_crop函数可以按比例剪裁图像。
    # 这个函数的第一个参数为原始图像,
    # 第二个参数为调整比例,这个比例需要是一个(0,1]的实数。
    # 图7-4(b)中显示了调整之后的图像。
    central_cropped = tf.image.central_crop(img_data, 0.5)
    
    图7-4

    上面介绍的图像剪裁函数都是截取或者填充图像中间的部分。
    TensorFlow也提供了tf.image.crop_to_bounding_box()函数和tf.image.pad_to_bounding_box()函数来剪裁或者填充给定区域的图像。这两个函数都要求给出的尺寸满足一定的要求,否则程序会报错。比如在使用tf.image.crop_to_bounding_box()函数时,TensorFlow要求提供的图像尺寸要大于目标尺寸,也就是要求原始图像能够剪裁出目标图像的大小。这里就不再给出每个函数的具体样例,有兴趣的读者可以自行参考TensorFlow的API文档。

    图像翻转

    TensorFlow提供了一些函数来支持对图像的翻转。
    以下代码实现了将图像上下翻转、 左右翻转以及沿对角线翻转的功能:

    # 将图像上下翻转,翻转后的效果见图7-5(b)
    flipped = tf.image.flip_up_down(img_data)
    
    # 将图像左右翻转,翻转后的效果见图7-5(c)
    flipped = tf.image.flip_left_right(img_data)
    
    # 将图像沿对角线翻转,翻转后的效果见图7-5(d)
    transposed = tf.image.transpose_image(img_data)
    
    图7-5

    在很多图像识别问题中,图像的翻转不会影响识别的结果。于是在训练图像识别的神经网络模型时,可以随机地翻转训练图像,这样训练得到的模型可以识别不同角度的实体。比如假设在训练数据中所有的猫头都是向右的,那么训练出来的模型就无法很好地识别猫头向左的猫。虽然这个问题可以通过收集更多的训练数据来解决,但是通过随机翻转训练图像的方式可以在零成本的情况下很大程度地缓解该问题。所以随机翻转训练图像是一种很常用的图像预处理方式。TensorFlow提供了方便的API完成随机图像翻转的过程。

    #  以50%概率上下翻转图像
    flipped = tf.image.random_flip_up_down(img_data)
    
    # 以50%概率左右翻转图像
    flipped = tf.image.random_flip_left_right(img_data)
    
    图像色彩调整

    和图像翻转类似,调整图像的亮度、对比度、饱和度和色相在很多图像识别应用中都不会影响识别结果。所以在训练神经网络模型时,可以随机调整训练图像的这些属性,从而使训练得到的模型尽可能小地受到无关因素的影响。TensorFlow提供了调整这些色彩相关属性的API。
    以下代码显示了如何修改图像的亮度:

    # 将图像的亮度-0.5,得到的图像效果如图7-6(b)所示。
    adjusted = tf.image.adjust_brightness(img_data, -0.5)
    
    # 色彩调整的API可能导致像素的实数值超出0.0~1.0的范围,
    # 因此在输出最终的图像之前,需要将其值截断在0.0~1.0的范围区间,
    # 否则不仅图像无法正常可视化,以此为输入的神经网络的训练质量也可能受到影响。
    
    # 如果对图像进行多项处理操作,那么这一截断过程应当在所有处理完成后进行。
    # 举例而言,假如对图像依次提高亮度和减少对比度,
    # 那么第二个操作可能将第一个操作生成的部分过亮的像素拉回到不超过1.0的范围内,
    # 因此在第一个操作后不应该立即截断,而是在最后截断
    # 下面的样例假设截断操作在最终可视化图像前进行
    adjusted = tf.clip_by_value(adjusted, 0.0, 1.0)
    
    # 将图像的亮度+0.5,得到的图像效果如图7-6(c)所示。
    adjusted = tf.image.adjust_brightness(img_data, 0.5)
    
    # 在[-max_delta, max_delta)的范围随机调整图像的亮度。
    adjusted = tf.image.random_brightness(img_data, max_delta)
    
    图7-6

    以下代码显示了如何调整图像的对比度:

    # 将图像的对比度减少到0.5倍,得到的图像效果如图7-7(b)所示。
    adjusted = tf.image.adjust_contrast(img_data, 0.5)
    
    # 将图像的对比度增加5倍,得到的图像效果如图7-7(c)所示。
    adjusted = tf.image.adjust_contrast(img_data, 5)
    
    # 在[lower, upper]的范围随机调整图像的对比度
    adjusted = tf.image.random_contrast(img_data, lower, upper)
    
    图7-7

    以下代码显示了如何调整图像的色相:

    # 下面4条命令分别将色相想家0.1、0.3、0.6和0.9,
    # 得到的效果分别在图7-8(a)、(b)、(c)、(d)中展示。
    adjusted = tf.image.adjust_hue(img_data, 0.1)
    adjusted = tf.image.adjust_hue(img_data, 0.3)
    adjusted = tf.image.adjust_hue(img_data, 0.6)
    adjusted = tf.image.adjust_hue(img_data, 0.9)
    
    # 在[-max_delta, max_delta]的范围内随机调整图像的色相
    adjusted = tf.iamge.random_hue(img_data, max_delta)
    
    图7-8

    以下代码显示了如何调整图像的饱和度:

    # 将图像的饱和度-5,得到的图像效果如图7-9(b)所示。
    adjusted = tf.image.adjust_saturation(img_data, -5)
    
    # 将图像的饱和度+5,得到的图像效果如图7-9(c)所示。
    adjusted = tf.image.adjust_saturation(img_data, 5)
    
    # 在[lower, upper]的范围内随机调整图像的饱和度
    adjusted = tf.image.random_saturation(img_data, lower, upper)
    
    图7-9

    除了调整图像的亮度、对比度、饱和度和色相,TensorFlow还提供API来完成图像标准化的过程。这个操作就是将图像上的亮度均值变化为0,方差变为1。
    以下代码实现了这个功能:

    # 将代表一张图像的三维矩阵中的数字均值变为0,方差变为1。
    # 调整后的图像如图7-10(b)所示。
    adjusted = tf.image.per_image_standardization(img_data)
    
    图7-10
    处理标注框

    在很多图像识别的数据集中,图像中需要关注的物体通常会被标注框圈出来。TensorFlow提供了一些工具来处理标注框。
    以下代码展示了如何通过tf.image.draw_bounding_boxes()函数在图像中加入标注框:

    # 将图像缩小一些,这样可视化能然难过标注框更加清楚。
    img_data = tf.image.resize_images(img_data, [180, 267], method=1)
    
    # tf.image.draw_bounding_boxes()函数要求图像矩阵中的数字为实数,
    # 所以需要先将图像矩阵转化为实数类型。
    # tf.image.draw_bounding_boxes()函数图像的输入是一个batch的数据,
    # 也就是多张图像组成的四维矩阵,
    # 所以需要将解码之后的图像矩阵加一维。
    batched = tf.expand_dims(
            tf.image.convert_image_dtype(img_data, tf.float32), 
            0 
        )
    
    # 给出每一张图像的所有标注框。
    # 一个标注框有4个数字,分别代表[Ymin, Xmin, Ymax, Xmax]。
    # 注意这里给出的数字都是图像的相对位置。
    # 比如在180x267的图像中,[0.35, 0.47, 0.5, 0.56]代表了从(63, 125)到(90, 150)的图像。
    boxes = tf.constant([[
            [0.05, 0.05, 0.9, 0.7], [0.35, 0.47, 0.5, 0.56]
        ]])
    
    # 图7-11显示了加入标注框的图像
    result = tf.image.draw_bounding_boxes(batched, boxes)
    
    图7-11

    和随机翻转图像、随机调整颜色类似,随机截取图像上有信息含量的部分也是一个提高模型健壮性(robustness)的一种方式。这样可以使训练得到的模型不受被识别物体大小的影响。
    以下程序中展示了如何通过tf.image.sample_distorted_bounding_box()函数来完成随机截取图像的过程。

    boxes = tf.constant([[
            [0.05, 0.05, 0.9, 0.7], [0.35, 0.47, 0.5, 0.56]
        ]])
    # 可以通过提供标注框的方式来告诉随机截取图像的算法哪些部分是「有信息的」。
    # min_object_covered=0.4表示截取部分至少包含某个标注框40%的内容。
    begin, size, bbox_for_draw = tf.image.sample_distorted_bounding_box(
            tf.shape(img_data), 
            bounding_boxes=boxes,
            min_object_covered=0.4
        )
    # 通过标注框可以随机截取得到的图像。
    # 得到的结果如图7-12左所示。
    batched = tf.expand_dims(
            tf.image.convert_image_dtype(img_data, tf.float32),
            0 
        )
    image_with_box = tf.image.draw_bounding_boxes(
            batched, 
            bbox_for_draw
        )
    # 截取随机出来的图像。
    # 得到的结果如图7-12右所示。
    # 因为算法带有随机成分,所以每次得到的结果会有所不同。
    distorted_image = tf.slice(img_data, begin, size)
    
    图7-12
    -7.2.2- 图像预处理完整样例

    在本节中将给出一个完整的图像预处理流程。

    在7.2.1节中详细讲解了TensorFlow提供的主要的图像处理函数。在解决真实的图像识别问题时,一般会同时使用多种处理方法。这一个节将给出一个完整的样例程序展示如何将不同的图像处理函数结合成一个完成的图像预处理流程。
    以下TensorFlow程序完成了从图像片段截取,到图像大小调整再到图像翻转及色彩调整的整个图像预处理过程:

    import tensorflow as tf 
    
    import numpy as np 
    import matplotlib.pyplot as plt
    
    # 给定一张图像,随机调整图像的色彩。
    # 因为调整亮度、对比度、饱和度和色相的顺序会影响最后得到的结果,
    # 所以可以定义多种不同的顺序。
    # 具体可以使用哪一种顺序可以在训练数据预处理时随机的选择一种。
    # 这样可以进一步降低无关因素对模型的影响。
    def distort_color(image, color_ordering=0):
        if color_ordering == 0 :
            image = tf.image.random_brightness(image, max_delta=32./255.)
            image = tf.iamge.random_saturation(image, lower=0.5, upper=1.5)
            image = tf.iamge.random_hue(image, max_delta=0.2)
            image = tf.image.random_contrast(iamge, lower=0.5, upper=1.5)
        elif color_ordering == 1:
            image = tf.iamge.random_saturation(image, lower=0.5, upper=1.5)
            image = tf.image.random_brightness(image, max_delta=32./255.)
            image = tf.image.random_contrast(image, lower=0.5, upper=1.5)
            image = tf.image.random_hue(image, max_delta=0.2)
        elif color_ordering == 2:
            # 还可以定义其他的排列,但在这里就不再一一列出
            ... 
        return tf.clip_by_value(image, 0.0, 1.0)
    
    # 给定一张解码后的图像、目标图像的尺寸以及图像上的标注框,
    # 此函数可以对给出的图像进行预处理。
    # 这个函数的输入是图像识别问题中原始的训练图像,
    # 而输出是神经网络模型的输入层。
    # 注意这里只处理模型的「训练数据」,对于「预测数据」一般不需要使用随机变换的步骤。
    def preprocess_for_train(image, height, width, bbox):
        # 如果没有提供标注框,则认为整个图像就是需要关注的部分。
        if bbox is None:
            bbox = tf.constant(
                [0.0, 0.0, 1.0, 1.0],
                dtype=tf.float32, 
                shape=[1, 1, 4]
            )
    
        # 转换图像张量的类型。
        if image.dtype != tf.float32:
            image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    
        # 随机截取图像,减小需要关注的物体大小对图像识别算法的影响。
        bbox_begin, bbox_size, _ = tf.image.sample_distorted_bounding_box(
                tf.shape(image), 
                bounding_boxes=bbox 
            )
        distorted_image = tf.slice(image, bbox_begin, bbox_size)
    
        # 将随机截图的图像调整为神经网络输入层的大小。
        # 大小调整的算法是随机选择的。
        distorted_image = tf.image.resize_images(
            distorted_image, 
            [height, width], 
            method=np.ranndom.randint(4)
        )
    
        # 随机左右翻转图像
        distorted_image = distort_color(
            distorted_image, 
            np.random.randint(2)
        )
    
        return distorted_image 
    
    image_raw_data = tf.gfile.FastGFile("/path/to/picture", "r").read()
    
    with tf.Session() as sess:
        img_data = tf.image.decode_jpeg(img_raw_data)
        boxes = tf.constant(
            [[
                [0.05, 0.05, 0.9, 0.7], 
                [0.35, 0.47, 0.5, 0.56]
            ]]
        )
        #  运行6次获得6种不同的图像,在图7-13展示了这些图像的效果。
        for i in range(6):
            # 将图像的尺寸调整为299x299
            result = preprocess_for_train(img_data, 299, 299, boxes)
            plt.imshow(result.eval())
            plt.show()
    
    图7-13

    运行以上程序,可以得到类似图7-13中所示的图像。这样就可以通过一张训练图像衍生出很多训练样本。通过将训练图像进行预处理,训练得到的神经网络模型可以识别不同大小、方位、色彩等方面的实体。

    -7.3- 多线程输入数据处理框架

    本节,将介绍TensorFlow利用队列进行多线程数据预处理流程。
    多线程用于加速数据预处理过程,防止复杂的图像处理函数降低训练的速度。
    将首先介绍TensorFlow中多线程和队列的概念,这是TensorFlow多线程数据预处理的基本组成部分。
    然后将具体介绍数据预处理流程中的每个部分,并将给出一个完整的多线程数据预处理流程图和程序框架。

    在 7.2 节中介绍了使用TensorFlow对图像数据进行预处理的方法。虽然使用这些图像数据预处理的方法可以减小无关因素对图像识别模型效果的影响,但这些复杂的预处理过程也会减慢整个训练过程。为了避免图像预处理成为神经网络模型训练效率的瓶颈,TensorFlow提供了一套多线程处理输入数据的框架。在本节中将详细介绍这个框架。


    图7-14

    图7-14总结了一个经典的输入数据处理的流程,在以下的各个小节中,将依次介绍这个流程的不同部分。

    -7.3.1- 队列与多线程

    本节将首先介绍TensorFlow中队列的概念。
    在TensorFlow中,队列不仅是一种数据结构,它更提供了多线程机制。
    队列也是TensorFlow多线程输入数据处理框架的基础。

    在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。其他的计算节点可以修改它们的状态。对于变量,可以通过赋值操作修改变量的取值。对于队列,修改队列状态的操作主要有EnqueueEnqueueManyDequeue
    以下程序展示了如何使用这些函数来操作一个队列:

    import tensorflow as tf 
    
    # 创建一个先进先出队列,指定队列中最多可以保存两个元素,并指定类行为整数。
    q = tf.FIFOQueue(2, "int32")
    
    # 使用enqueue_many()函数来初始化队列中的元素。
    # 和变量初始化类似,在使用队列之前需要明确调用这个初始化过程。
    init = q.enqueue_many(([0, 10],))
    
    # 使用Dequeue()函数将队列中的第一个元素出队列。这个元素的值将被存在变量x中。
    x = q.dequeue()
    
    # 将得到的值加1。 
    y = x + 1 
    
    # 将加1后的值再重新加入队列。
    q_inc = q.enqueue([y])
    
    with tf.Session() as sess:
        # 运行初始化队列操作。
        init.run()
        for _ in range(5):
            # 运行q_inc将执行数据出队列、出队列的元素+1、重新加入队列的整个过程
            v , _ = sess.run([x, q_inc])
            # 打印出队元素的取值
            print(v)
    
    """
    队列开始有[0, 10]两个元素,
    第一个出队的为0,加1之后再次入队,得到的队列为[10, 1];
    第二次出队的为10,加1之后再次入队,得到的队列为[1, 11];
    依次类推,最后得到的输出为:
    0
    10
    1
    11
    2
    """
    

    TensorFlow中提供了FIFOQueueRandomShuffleQueue两种队列。在以上程序中,己经展示了如何使用FIFOQueue,它的实现的是一个先进先出队列。RandomShuffleQueue会将队列中的元素打乱,每次出队列操作得到的是从当前队列所有元素中随机选择的一个。在训练神经网络时希望每次使用的训练数据尽量随机,RandomShuffleQueue就提供了这样的功能。

    在TensorFlow中,队列不仅仅是一种数据结构,还是异步计算张量取值的一个重要机制。比如多个线程可以同时向一个队列中写元素,或者同时读取一个队列中的元素。在后面的小节中将具体介绍TensorFlow是如何利用队列来实现多线程输入数据处理的。

    在本节之后的内容中将先介绍TensorFlow提供的辅助函数来更好地协同不同的线程。TensorFlow提供了tf.Coordinatortf.QueueRunner两个类来完成多线程协同的功能。

    * tf.Coordinator

    tf.Coordinator主要用于协同多个线程一起停止,并提供了should_stop()、request_stop()和join()三个函数。

    在启动线程之前,需要先声明一个tf.Coordinator类,并将这个类传入每一个创建的线程中。
    启动的线程需要一直查询tf.Coordinator类中提供的should_stop()函数,当这个函数的返回值为True时,则当前线程也需要退出。
    每一个启动的线程都可以通过调用request_stop()函数来通知其他线程退出。当某一个线程调用request_stop()函数之后,should_stop()函数的返回值将被设置为True,这样其他的线程就可以同时终止了。

    以下程序 展示了如何使用 tf.Coordinator:

    import tensorflow as tf 
    import numpy as np
    import threading 
    import time 
    
    # 线程中运行的程序,这个程序每隔1s判断是否需要停止,并打印自己的ID。
    def MyLoop(coord, worker_id):
        # 使用tf.Coordinator类提供的协同工具判断当前线程是否需要停止。
        while not coord.shold_stop():
            # 随机停止所有的线程。
            if np.random.rand() < 0.1 :
                print("Stoping from id: %d\n" % worker_id)
                coord.request_stop()
            else:
                # 打印当前线程的ID
                print("Working on id: %d\n" % worker_id)
            # 暂停1秒
            time.sleep(1)
    
    # 声明一个tf.train.Coordinator类来协同多个线程。
    coord = tf.train.Coordinator()
    # 声明创建5个线程。
    threads = [
            threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)
        ]
    
    # 启动所有的线程
    for t in threads: t.start()
    
    # 等待所有的线程退出
    coord.join(threads)
    

    运行以上程序,可以得到类似下面的结果:

    Working on id: 0 
    Working on id: 1 
    Working on id: 2 
    Working on id: 4
    Working on id: 3
    Working on id: 0
    Stoping from id: 4 
    Working on id: 1 
    

    当所有线程启动之后,每个线程会打印各自的ID,于是前面4行打印出了它们的ID。然后在暂停1秒之后,所有线程又开始第二遍打印ID。在这个时候有一个线程退出的条件达到,于是调用了coord.request_stop()函数来停止所有其他的线程。
    然而在打印"Stoping from id: 4"之后,可以看到有线程仍然在输出。这是因为这些线程已经执行完coord.should_stop()的判断,于是仍然会继续输出自己的ID。但在下一轮判断是否需要停止时将退出线程。于是在打印一次ID之后就不会再有输出了。

    * tf.QueueRunner

    tf.QueueRunner主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的tf.Coordinator类来统一管理。
    以下代码展示了如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操作:

    import tensorflow as tf 
    
    # 声明一个先进先出的队列,队列中最多100个元素,类型为实数。
    queue = tf.FIFOQueue(100, "float")
    
    # 定义队列的入队操作。
    enqueue_op = queue.enqueue([tf.random_normal([1])])
    
    # 使用tf.train.QueueRunner来创建多个线程运行队列的入队操作。
    # tf.train.QueueRunner的第一个参数来给出了被操作的队列
    # [enqueue_op]*5表示了需要启动5个线程,每个线程中运行的是enqueue_op操作。
    qr = tf.train.QueueRunner(queue, [enqueue_op]*5)
    
    # 将定义过的QueueRunner加入TensorFlow计算图上指定的集合。
    # tf.train.add_queue_runner函数没有指定集合,则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。
    # 下面的函数就是将刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS集合。
    tf.train.add_queue_runner(qr)
    
    # 定义出队操作。
    outer_tensor = queue.dequeue()
    
    with tf.Session() as sess:
        # 使用tf.train.Coordinator来协同启动的线程。
        coord = tf.train.Coordinator()
        
        # 使用tf.train.QueueRunner时,需要明确调用tf.train.start_queue_runners来启动所有线程。
        # 否则因为没有线程运行入队操作,当调用出队操作时,程序会一直等待入队操作被运行。
        # tf.train.start_queue_runners()函数会默认启动tf.GraphKeys.QUEUE_RUNNERS集合中所有的QueueRunner。
        # 因为这个函数只支持启动指定集合中的QueueRunner,所以一般来说tf.train.add_queue_runner()函数和tf.trainn.sart_queue_runners()函数会指定同一个集合。
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
        # 获取队列中的取值
        for _ in range(3):
            print(sess.run(out_tennsor)[0])
    
        # 使用tf.train.Coordinator来停止所有的线程
        coord.request_stop()
        coord.join(threads)
    
    """
    以上程序将启动5个线程来执行队列入队操作,其中每一个线程都是将随机数写入队列。
    于是在每次运行出队操作时,可以得到一个随机数。
    运行这段程序,剋已得到类似下面的结果:
    -0.315963 
    -1.06425 
    0.347479
    
    """
    
    -7.3.2- 输入文件队列

    本节中将介绍如何在TensorFlow中实现图7-14中的前三步。
    TensorFlow提供了tf.train.string_input_producer()函数来有效管理原始输入文件列表。
    本节中将重点介绍如何使用tf.train.string_input_producer()这个函数。
    图7-14 中数据预处理的部分已经在7.2节中有过详细介绍,本节不再重复。

    本节将介绍如何使用TensorFlow中的队列管理输入文件列表。
    在这一节中 ,假设所有的输入数据都己经整理成了TFRecord格式。虽然一个TFRecord文件中可以存储多个训练样例,但是当训练数据量较大时,可以将数据分成多个TFRecord文件来提高处理效率。
    TensorFlow提供了tf.train.match_filenames_once()函数来获取符合一个正则表达式的所有文件,得到的文件列表可以通过tf.train.string_input_producer()函数进行有效的管理。
    tf.train.string_input_producer()函数会使用初始化时提供的文件列表创建一个输入队列,输入队列中原始的元素为文件列表中的所有文件。如7.1节中的样例代码所示,创建好的输入队列可以作为文件读取函数的参数。每次调用文件读取函数时,该函数会先判断当前是否己有打开的文件可读,如果没有或者打开的文件己经读完,这个函数会从输入队列中出队一个文件并从这个文件中读取数据。
    通过设置shuffle参数,tf.train.string_input_producer()函数支持随机打乱文件列表中文件出队的顺序。当shuffle参数为True时,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机的。随机打乱文件顺序以及加入输入队列的过程会跑在一个单独的线程上,这样不会影响获取文件的速度。tf.train.string_input_producer()生成的输入队列可以同时被多个文件读取线程操作,而且输入队列会将队列中的文件均匀地分给不同的线程,不出现有些文件被处理过多次而有些文件还没有被处理过的情况。
    当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件列表中的文件全部重新加入队列。tf.train.string_input_producer()函数可以设置num_epochs参数来限制加载初始文件列表的最大轮数。当所有文件都己经被使用了设定的轮数后,如果继续尝试读取新的文件,输入队列会报OutOfRange的错误。在测试神经网络模型时,因为所有测试数据只需要使用一次,所以可以将num_epochs参数设置为1。这样在计算完一轮之后程序将自动停止。
    在展示tf.train.match_filenames_once()和tf.train.string_input_producer()函数的使用方法之前,下面先给出一个简单的程序来生成样例数据:

    import tensorflow as tf 
    
    # 创建TFRrecords文件的帮助函数
    def __int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
    
    # 模拟海量数据情况下将数据下入不同的文件。
    # num_shards定义了总共写入多少个文件。
    # instances_per_shard定义了每个文件中写多少个数据。
    num_shards = 2 
    instances_per_shard = 2 
    for i in range(num_shards):
        # 将数据分为多个文件时,可以将不同文件以类似0000n-of-000m的后缀区分。
        # 其中m表示了数据总共被存在了多少个文件中,n表示当前文件的编号。
        # 式样的方式既方便了通过正则表达式获取文件列表,又在文件名中加入了更多的信息。
        file_name = ('/path/to/data.tfrecords-%.5d-of-%.5d' % (i, num_shards))
        writer = tf.python_io.TFRecordWriter(filename)
        # 将数据封装成Example结构,并写入TFRecords文件。
        for j in range(instances_per_shard):
            # Example结构仅包含当前样例属于第几个文件,以及是当前文件的第几个样本。
            example = tf.train.Example(features=tf.train.Features(feature={
                'i': _innt64_feature(i),
                'j': _int64_features(j)
            }))
            writer.write(examples.SerializeToString())
        writer.close()
    

    程序运行之后,在指定的目录下将生成两个文件: /path/to/data.tfrecords-00000-of-00002/path/to/data.tfrecords-00001-of-00002。每一个文件中存储了两个样例。
    在生成了样例数据之后,以下代码展示了tf.train.match_filenames_once()函数和tf.train.string_input_producer()函数的使用方法:

    import tensorflow as tf 
    
    # 使用tf.train.match_filenames_once函数获取文件列表。
    files = tf.train.match_filenames_once("/path/to/data.tfrecords-*")
    
    # 通过tf.train.string_input_producer()函数创建输入队列,
    # 输入队列中的文件列表为tf.train.match_filenames_once()函数获取的文件列表。
    # 这里将shuffle参数设置为False来避免随机打乱读文件的顺序。
    # 但一般在解决真实问题时,会将shuffle参数设置为True。
    filename_queue = tf.train.string_input_producer(files, shuffle=False)
    
    # 如图7.1节中所示读取并解析一个样本。
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(
            serialized_example,
            features={
                'i': tf.FixedLenFeature([], tf.int64), 
                'j': tf.FixedLenFeature([], tf.int64)
            }
        )
    
    with tf.Session() as sess:
        # 虽然在本段程序中没有声明任何变量,
        # 但使用tf.train.match_filenames_once()函数需要初始化一些变量。
        tf.local_variables_initializer().run()
        """
        打印文件列表将得到以下结果:
        ['/path/to/data.tfrecords-00000-of-00002'
         '/path/to/data.tfrecords-00001-of-00002'
        ]
        """
        print(sess.run(files))
    
        # 声明tf.train.Cordinator类来协同不同线程,并启动线程。
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
        # 多次执行获取数据的操作
        for i in range(6):
            print(sess.run([features['i'], features['j']]))
        coord.request_stop()
        coord.join(threads)
    

    以上打印将输出:

    [0, 0]
    [0, 1]
    [1, 0]
    [1, 1]
    [0, 0]
    [0, 1]
    

    在不打乱文件列表的情况下,会依次读出样例数据中的每一个样例。而且当所有样例都被读完之后,程序会自动从头开始。
    如果限制num_epochs为1,那么程序将会报错:

    tensorflow.python.framework.errors.OutOfRangeError: FIFOQueue
    '_0_input_producer' is closed and has innsufficient elements (requested 1, current size 0)
    [
        [Node: ReaderRead = ReaderRead[
            _class=["loc:@TFRecordReader", "loc:@input_producer"],
            _device="/job:localhost/replica:0/task:0/cpu:0"](TFRecordReader, input_producer)
        ]
    ]
    
    -7.3.3- 组合训练数据(batching)

    本节中将介绍图7-14中的最后一个流程。
    这个流程将处理好的单个训练数据整理成训练数据batch,这些batch就可以作为神经网络的输入。
    本节将介绍tf.train.shuffle_batch_join()和tf.train.shuffle_batch()函数,并比较不同函数的多线程并行方式。

    在7.3.2节中己经介绍了如何从文件列表中读取单个样例,将这些单个样例通过7.2节中介绍的预处理方法进行处理,就可以得到提供给神经网络输入层的训练数据了。
    在第4章介绍过,将多个输入样例组织成一个batch可以提高模型训练的效率。所以在得到单个样例的预处理结果之后,还需要将它们组织成batch,然后再提供给神经网络的输入层。TensorFlow提供了tf.train.batch()tf.train.shuffle_batch()函数来将单个的样例组织成batch的形式输出。这两个函数都会生成一个队列,队列的入队操作是生成单个样例的方法,而每次出队得到的是一个batch的样例。它们唯一的区别在于是否会将数据顺序打乱。

    以下代码展示了这两个函数的使用方法:

    import tensorflow as tf 
    
    # 使用7.3.2节中的方法读取并解析得到样例。
    # 这里假设Example结构中i表示一个样例的example,比如一张图像的像素矩阵。
    # 而j表示该样例对应的标签。
    example, label = features['i'], features['j']
    
    # 一个batch中样例的个数。
    batch_size= 3 
    
    # 组合样例的队列中最多可以存储的样例个数。
    # 这个队列如果太大,那么需要占用很多内存资源;
    # 队列如果太小,那么出队操作可能会因为没有数据而被阻碍(block),从而导致训练效率降低。
    # 一般来说这个队列的大小会和每一个batch的大小相关,
    # 下面一行代码展示了设置队列大小的一种方式。
    capacity  = 1000 + 3*batch_size
    
    # 使用tf.train.batch()函数来组合样例。
    # [example, label]参数给出了需要组合的元素,一般example和label分别代表训练样本和这个样本对应的正确标签。
    # batch_size参数给出了每个batch中样例的个数。
    # capacity给出了队列的最大容量。
    # 当队列长度等于容量时,TensorFlow将暂停入队操作,而只是等待元素出队。
    # 当元素个数小于容量时,TensorFlow将自动重新启动入队操作。
    example_batch, label_batch = tf.train.batch(
            [example, label], 
            batch_size=batch_size, 
            capacity=capacity
        )
    
    with tf.Session() as sess:
        tf.initialize_all_variables().run()
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
        # 获取并打印组合之后的样例。
        # 在真实问题中,这个输出一般会作为神经网络的输出。
        for i in range(2):
            cur_example_batch, cur_label_batch = sess.run(
                    [example_batch, label_batch]
                )
            print(cur_example_batch, cur_label_batch)
            
        coord.request_stop()  
        coord.join(threads)
    
    """
    运行以上程序可以得到以下输出:
    [0 0 1] [0 1 0]
    [1 0 0] [1 0 1]
    从这个输出可以看到tf.train.batch()函数可以将单个的数据组织称3个一组batch。
    在example、lable中读到的数据依次为:
    example: 0, label: 0
    example: 0, label: 1
    example: 1, label: 0
    example: 1, label: 1
    这是因为tf.train.batch()函数不会随机打乱顺序,所以组合之后得到的数据组合成了上面给出的输出。
    """
    

    下面一段代码展示了tf.train.shuffle_batch()函数的使用方法。

    # 和tf.train.batch()的代码一样产生example和label。
    example, label = features["i"], features["j"]
    
    # 使用 tf.train.shuffle_batch()函数来组合样例。
    # tf.train.shuffle_batch()函数的参数大部分都和tf.train.batch()函数相似,
    # 但是min_after_dequeue参数是tf.train.shuffle_batch()函数特有的。
    # min_after_dequeue参数限制了出队时队列元素中元素的最少个数。
    # 当队列中元素太少时,随机打乱样例顺序的作用就不大了。
    # 所以tf.train.shuffle_btach()函数提供了限制出队时最少元素的个数来保证随机打乱顺序的作用。
    # 当出队函数被调用,但是队列中的元素不够时,出队操作将等待更多的元素入队才会完成。
    # 如果min_after_dequeue参数被设定,capacity也应该相应调整来满足性能需求。
    example_batch, label_batch = tf.train.shuffle_batch(
            [example, label], 
            batch_size=batch_size, 
            capacity=capacity,
            min_after_dequeue=30
        )
    
    # 和tf.train.batch()的样例代码一样打印example_batch, label_batch。 
    
    """
    运行以上代码可以得到以下输出:
    [0 1 1] [0 1 0]
    [1 0 0] [0 0 1]
    从数据可以看到,得到的样例顺序已经被打乱了
    """
    

    tf.train.batch()函数和tf.train.shuffle_batch()函数除了可以将单个训练数据整理成输入batch,也提供了并行化处理输入数据的方法。tf.train.batch()函数和tf.train.shuffle_batch()函数并行化的方式一样,所以在本节中仅以应用得更多的tf.train.shuffle_batch()函数为例。通过设置tf.train.shuffle_batch()函数中的num_threads参数,可以指定多个线程同时执行入队操作。tf.train.shuffle_batch()函数的入队操作就是数据读取及预处理的过程。当num_threads参数大于1时,多个线程会同时读取一个文件中的不同样例并进行预处理。如果需要多个线程处理不同文件中的样例时,可以使用tf.train.shuffle_batch_join()函数。此函数会从输入文件队列中获取不同的文件分配给不同的线程。一般来说,输入文件队列是通过7.3.2节中介绍的tf.train.tring_input_producer()函数生成的。这个函数会平均分配文件以保证不同文件中的数据会被尽量平均地使用。

    tf.train.shuffle_batch()函数和tf.train.shuffle_batch_join()函数都可以完成多线程并行的方式来进行数据预处理,但它们各有优劣。
    * 对于tf.train.shuffle_batch()函数,不同线程会读取同一个文件。如果一个文件中的样例比较相似(比如都属于同一个类别),那么神经网络的训练效果可能会受到影响。所以在使用tf.train.shuffle_batch()函数时,需要尽量将同一个TFRecords文件中的样例随机打乱。
    * 而使用tf.train.shuffle_batch_join()函数时,不同线程会读取不同文件。如果读取数据的线程比总文件数还大,那么多个线程可能会读到同一个文件中相近部分的数据。而且多个线程读取多个文件可能会导致过多的硬盘寻址,从而使得读取效率降低。不同的井行化方式各有所长,具体采用哪一种方法需要根据具体情况来确定。

    -7.3.4- 输入数据处理框架

    本节中将给出一个完整的TensorFlow程序来展示整个输入数据处理框架。
    在前面的小节中已经介绍了图 7-14所展示的流程图中的所有步骤。 在这一节将把这些步骤串成一个完成的 TensorFlow 来处理输入数据 。

    以下代码给出了这个完成的程序:

    import tensorflow as tf 
    
    # 创建文件列表,并通过文件列表创建输入文件队列。
    # 在调用输入数据处理流程前,需要统一所有原始数据的格式并将它们存储到TFRecord文件中。
    # 下面给出的文件列表应该包含所有提供训练数据的TFRecord文件。
    files = tf.train.match_filenames_once("/path/to/file_pattern-*")
    filename_queue = tf.train.string_input_producer(files, shuffle=False)
    
    # 使用类似7.1节中介绍的方法解析TFRecord文件里的数据。
    # 这里假设image中存储的是图像的原始数据,label为该样例所对应的标签。
    # height、width和channels给出了图片的维度。
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(
            serialized_example, 
            features={
                'image': tf.FixedLenFeature([], tf.string),
                'label': tf.FixedLenFeature([], tf.int64), 
                'height': tf.FixedLenFeature([]. tf.int64), 
                'width': tf.FixedLenFeature([], tf.int64), 
                'channels': tf.FixedLenFeature([], tf.int64), 
            }
        )
    image, label = features['image'], features['label']
    height, width = features['height'], features['width']
    channels = features['channels']
    
    # 从原始图像数据解析出像素矩阵,并根据图像尺寸还原图像。
    decoded_image = tf.decode_raw(image, tf.unit8)
    decoded_image.set_shape([height, width, channels])
    
    # 定义神经网络输入层图片的大小。
    image_size = 299
    
    # preprocess_for_train为7.2.2节中介绍的图像预处理程序。
    distorted_image = preprocess_for_train(
            decoded_image, 
            iamge_size, 
            image_size, 
            None
        )
    
    # 将处理后的图像和标签数据通过tf.train.shuffle_batch()函数整理成神经网络训练时需要的batch。
    min_after_dequeue = 10000
    batch_size = 100 
    capacity = min_after_dequeue + 3*batch_size 
    image_batch, label_batch = tf.train.shuffle_batch(
            [distorted_image, label], 
            batch_size=batch_size, 
            capacity=capacity,
            min_after_dequeue=min_after_dequeue
        )
    
    # 定义神经网络的结构以及优化过程。
    # image_batch可以作为输入提供给神经网络的输入层。
    # label_batch则提供输入batch中样例的正确答案。 
    learning_rate = 0.01 
    logit = inference(image_batch)
    loss = calc_loss(logit, label_batch)
    train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)
    
    # 声明会话并运行神经网络的优化过程。
    with tf.Session() as sess:
        # 神经网络训练准备工作。
        # 这些工作包括变量初始化、线程启动。
        sess.run(
            (tf.global_variables_initializer(),
            tf.local_variable_initializer()
            )
        )
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runner(
            sess=sess,
            coord=coord
        )
    
        # 神经网络训练过程
        TRAINING_ROUNDS = 5000
        for i in range(TRAINNING_ROUNNDS):
            sess.run(train_step)
    
        # 停止所有线程
        coord.request_stop()
        coord.join(threads)
    
    图7-15

    图7-15展示了以上代码中输入数据处理的整个流程。从图7-15中可以看出,输入数据处理的第一步为获取存储训练数据的文件列表。在图7-15中,这个文件列表为{A,B,C}。
    * 通过tf.train.string_input_producer()函数,可以选择性地将文件列表中文件的顺序打乱,并加入输入队列。因为是否打乱文件的顺序是可选的,所以在图7-15中通过虚线表示。tf.train.string_input_producer()函数会生成并维护一个输入文件队列,不同线程中的文件读取函数可以共享这个输入文件队列。
    * 在读取样例数据之后,需要将图像进行预处理。图像预处理的过程也会通过tf.train.shuffle_batch()提供的机制井行地跑在多个线程中。输入数据处理流程的最后通过tf.train.shuffle_batch()函数将处理好的单个输入样例整理成batch提供给神经网络的输入层。通过这种方式,可以有效地提高数据预处理的效率,避免数据预处理成为神经网络模型训练过程中的性能瓶颈。

    -7.4- 数据集(DataSet)

    本节,将介绍最新的数据集(Dataset)API
    数据集从Tensorflow1.3起成为官方推荐的数据输入框架,它使数据的输入和处理大大简化。

    上一节介绍了通过队列进行多线程输入的方法。除队列以外,TensorFlow还提供了一套更高层的数据处理框架。在新的框架中,每一个数据来源被抽象成一个“数据集”,开发者可以以数据集为基本对象,方便地进行batching随机打乱(shuffle)等操作。从1.3版本起,TensorFlow正式推荐使用数据集作为输入数据的首选框架。从1.4 版本起,数据集框架从tf.contrib.data迁移到tf.data,成为TensorFlow的核心组成部件。

    -7.4.1- 数据集的基本使用方法

    本小节将介绍数据集(Dataset)的基本使用方法
    包括从文件创建数据集、使用迭代器遍历数据集等 。

    在数据集框架中,每一个数据集代表一个数据来源:数据可能来自一个张量,一个TFRecord文件,一个文本文件,或者经过sharding的一系列文件,等等。由于训练数据通常无法全部写入内存中,从数据集中读取数据时需要使用一个迭代器( iterator)按顺序进行读取,这点与队列的dequeue()操作和Reader的read()操作相似。与队列相似,数据集也是计算图上的一个节点。

    下面先看一个简单的例子。这个例子从一个张量创建一个数据集,遍历这个数据集, 并对每个输入输出 y=x2 的值。

    import tensorflow as tf 
    
    # 从一个数组创建数据集。
    input_data = [1, 2, 3, 5, 8]
    dataset = tf.data.DataSet.from_tensor_slices(input_data)
    
    # 定义一个迭代用于遍历数据集。
    # 因为上面定义的数据集没有用placeholder作为输入参数,所以这里可以使用最简单的one_shot_iterator。
    iterator = dataset.make_one_iterator()
    
    # get_next()返回一个代表一个输入数据的张量,类似于队列的dequeue() .
    x = iterator.get_next()
    y = x * x
    
    with tf.Session() as sess:
        for i in range(len(input_data)):
            print(sess.run(y))
    
    """
    运行以上程序可以得到以下输出:
    1 
    4 
    9 
    25 
    64
    """
    

    从以上例子可以看到,利用数据集读取数据有三个基本步骤:
    * 1 定义数据集的构造方法
    这个例子使用了tf.data.DataSet.from_tensor_slices(),表明数据集是从一个张量中构建的。如果数据集是从文件中构建的,则需要相应调整不同的构造方法。

    * 2 定义遍历器
    这个例子使用最简单的one_shot_iterator来便利数据集,稍后会介绍更加灵活的initializable_iterator。

    * 3 使用get_next()方法从遍历器中读取张量,作为计算图其他部分的输入
    在真实项目中,训练数据通常是保存在硬盘上的。比如在自然语言处理的任务中,训练数据通常是以每一条数据的形式存在问本文件中,这时可以用TextLineDataset来更方便地读取数据:

    import tensorflow as tf 
    
    # 从文本文件创建数据集。
    # 假定每行文字是一个训练例子。
    # 注意这里可以提供多个文件。
    input_files = ["/path/to/input_file1", "/path/to/input_file2"]
    dataset = tf.data.TextLineDataset(input_files)
    
    # 定义迭代器用于遍历数据。
    iterator = dataset.make_one_shot_iterator()
    
    # 这里get_next()返回一个字符串类型的张量,代表文件中的一行。
    x = iterator.get_next()
    with tf.Session() as sess:
        for i in range(3):
            print(sess.run(x)) 
    

    在图像相关任务中,输入数据通常以TFRecord形式存储,这时可以用TFRrcordDataset来读取数据。与文本文件不同,每一个TFRecord都有自己不同的feature格式,因此在读取TFRecord时,需要提供一个parse()函数来解析所读取的TFRecord的数据格式。

    import tensorflow as tf 
    
    # 解析一个TFRecord的方法。
    # record是文件中读取的一个样例。
    # 7.1节中具体介绍了如何解析TFRecord样例。
    def parse(record):
        # 解析读入的一个样例。
        features = tf.parse_single_example(
            record, 
            features={
                'feat1': tf.FixedLenFeature([]. tf.int64), 
                'feat2': tf.FixledLenFeature([]. tf.int64)
            }
        )
        return features['feat1'], features['feat2']
    
    # 从TFRecord文件创建数据集。
    input_files = ["/path/to/input_file1", "/path/to/input_file2"]
    dataset = tf.data.TFRecordDataset(input_files)
    
    # map()函数表示对数据集中的每一条数据进行调用相应方法。
    # 使用TFRecordDataaset读出的是二进制的数据,这里需要通过map()来调用parser()对二进制数据进行解析。
    # 类似地,map()函数也可以用来完成其他的数据预处理工作。
    dataset = dataset.map(parser)
    
    # 定义遍历数据集的迭代器。
    iterator = dataset.make_one_shot_iterator()
    
    # feat1, feat2是parser()返回的一维int64型张量,可以作为输入用于进一步的计算。
    feat1, feat2 = iterator.get_next()
    
    with tf.Session() as sess:
        for i in range(10):
            f1, f2 = sess.run([feat1, feat2])
    

    以上例子使用了最简单的one_shot_iterator来遍历数据集。在使用one shot_iterator时, 数据集的所有参数必须已经确定,因此one_shot_iterator不需要特别的初始化过程。
    如果需要用placeholder来初始化数据集,那就需要用到initializable_iterator。以下代码给出了用initializable_iterator来动态初始化数据集的例子。

    import tensorflow as tf 
    # 解析一个TFRecord的方法。与上面的例子相同,不再重复。
    def parser(record):
        ...
    
    # 从TFRecord文件创建数据集,具体文件路径是一个placeholder,稍后再提供具体路径。
     input_files = tf.placeholder(tf.string)
    dataset = tf.data.TFRecordDataset(input_files)
    dataset = dataset.map(parser)
    
    # 定义遍历dataset的initializable_iterator。
    iterator = dataset.make_initializable_iterator()
    feat1, feat2 = iterator.get_next()
    
    with tf.Session() as sess:
        # 首先初始化iterator,并给出input_files的值。
        sess.run(
            iterator.initializer,
            feed_dict= {
                input_files: ["/path/to/input_file1", "/path/to/input_files2"]
            }
        )
    
        # 遍历所有数据一个epoch。
        # 当便利结束时,程序会跑出OutOfRangeError。
        while True:
            try:
                sess.run([feat1, feat2])
            except tf.error.OutOfRangeError:
                break 
    

    在上面的例子中,文件路径使用placeholderfeed_dict的方式传给数据集。使用这种方法,在实际项目中就不需要总是将参数写入计算图的定义,而可以使用程序参数的方式动态指定参数。
    另外注意到,上面例子中的循环体不是指定循环运行10次sess.run(),而是使用while(True)try-except的形式来将所有数据遍历一遍(即一个epoch)。这是因为在动态指定输入数据时,不同数据来源的数据量大小难以预知,而这个方法使我们不必提前知道数据量的精确大小。

    以上介绍的两种iterator足以满足大多数项目的需求。除这两种以外,TensorFlow还提供了reinitializable_iteratorfeedable_iterator两种更加灵活的迭代器。前者可以多次initialize用于遍历不同的数据来源,而后者可以用feed_dict的方式动态指定运行哪个iterator。此处不再多加介绍,感兴趣的读者可以参考Google提供的官方API文档。

    -7.4.2- 数据集的高层操作

    本小节中将介绍在数据集上的高层操作,
    并给出一个运用这些方法处理训练数据和测试数据的完整的例子。

    在上一小节中介绍了数据集的基础用法。在这一小节中,将介绍数据集框架提供的一些方便实用的高层API。
    在7.4.1小节中介绍过map方法来对TFRecord进行解析操作:

    dataset = dataset.map(parser)
    

    map是在数据集上进行操作的最常用的方法之一。在这里,map(parser)方法表示对数据集中的每一条数据调用参数中指定parser方法。对每一条数据进行处理后,map将处理后的数据包装成一个新的数据集返回。map函数非常灵活,可以用于对数据的任何预处理操作。例如在7.3.4小节中,在队列框架下曾使用如下方法来对数据进行预处理:

    distorted_image = preprocess_for_train(
        decoded_image, 
        image_size, 
        image_size,
        None
    )
    

    而在数据集框架中,可以通过map来对每一条数据调用preprocess_for_train方法:

    dataset = dataset.map(
        lambda x: preprocess_for_train(x, image_size, image_size, None)
    )
    

    在上面的代码中,lambda表达式的作用是将原来有4个参数的函数转化为只有1个参数的函数。preprocess_for_train函数的第一个参数decoded_image变成了lambda表达式中的x,这个参数就是原来函数中的参数decoded_image。preprocess_for_train函数中后3个参数都被换成了具体的数值。注意这里的image_size是一个变量,有具体取值,该值需要在程序的上文中给出。
    从表面上看,新的代码在长度上似乎并没有缩短,然而由于map方法返回的是一个新的数据集,可以直接继续调用其他高层操作。
    * 在上一节介绍的队列框架中,预处理、shuffle、batch等操作有的在队列上进行,有的在图片张量上进行,整个处理流程在处理队列和张量的代码片段中来回切换。
    * 而在数据集操作中,所有操作都在数据集上进行,这样的代码结构将非常的干净、简洁。
    7.3.3 小节介绍了队列框架下的tf.train.batch()和tf.train.shuffie_batch()方法 。
    在数据集框架中,shuffle和batch操作由两个方法独立实现:

    dataset = dataset.shuffle(buffer_size)  # 随机打乱顺序
    dataset = dataset.batch(batch_size)  # 将数据组合成batch
    

    其中:
    * shuffle()方法的参数在buffer_size等效于tf.train.shuffle_batch()的min_after_dequeue参数。shuffle算法在内部使用一个缓冲区保存buffer_size条数据,每读入一条数据时,从这个缓冲区中随机地选择一条数据进行输出。缓冲区的大小越大,随机的性能越好,但占用的内存也越多。
    * batch()方法的参数batch_size代表要输出的每个batch由多少条数据组成。如果数据集中包含多个张量,那么batch操作将对每一个张量分开进行。举例而言,如果数据集中的每一个数据(即iterator.get_next()的返回值)是image、label两个张量,其中image的维度是[300, 300],label的维度是[],batch_size是128,那么经过batch操作后的数据集的每一个输出将包含两个维度分别是[128, 300, 300]和[128]的张量。
    repeat()是另一个常用的操作方法。这个方法将数据集中的数据复制多份,其中每一份数据被称为一个epoch。

    dataset = dataset.repeat(N)  # 将数据集重复N份。
    

    需要指出的是,如果数据集在repeat前已经进行了shuffle操作,输出的给个epoch中随机shuffle的结果并不会相同。例如,如果输入数据是[2, 1, 3]。shuffle后输出的第一个epoch是[2,1,3],而第二个epoch则有可能是[3, 2, 1]。 repeat()map()shuffle()batch()等操作一样,都只是计算图中的一个计算节点。repeat()只代表重复相同的处理过程,并不会记录前一epoch的处理结果。
    除这些方法以外,数据集还提供了其他多种操作。例如,concatenate()将两个数据集顺序连接起来,take(N)从数据集中读取前N项数据,skip(N)在数据集中跳过前N项数据, flap_map()从多个数据集中轮流读取数据,等等,这里不再一一介绍,有需要的读者可以查询TensorFlow相关文档。

    以下例子将这些方法组合起来,使用数据集实现了7.3.4小节中的数据输入流程。与7.3.4小节中介绍的类似,该例子从文件中读取原始数据,进行预处理、shuffle、batching等操作,并通过repeat方法训练多个epoch。

    不同的是,以下例子在训练数据集之外,还另外读取了测试数据集,并对测试集和数据集进行了略微不同的预处理。在训练时,调用7.2.2小节中的preprocess_for_train()方法对图像进行随机反转等预处理操作;而在测试时,测试数据以原本的样子直接输入测试。

    import tensorflow as tf 
    
    # 列举输入文件。
    # 训练和测试使用不同的数据。
    train_files = tf.train.match_filenames_once("/path/to/train_file-*")
    test_files = tf.train.match_filenames_once("/path/to/test_file-*")
    
    # 定义parser方法从TFRecord中解析数据。
    # 这里假设image中存储的是图像的原始数据,label为该样例所对应的标签。
    # height、width和channels给出了图片的维度。
    def parser(record):
        feature =  tf.parse_single_example(
            record, 
            features={
                'image': tf.FixedLenFeature([], tf.string),
                'label': tf.FixedLenFeature([], tf.int64), 
                'height': tf.FixedLenFeature([], tf.int64), 
                'width': tf.FixedLenFeature([], tf.int64), 
                'channels': tf.FixedLenFeature([], tf.int64)
            }
        )
    
        # 从原始图像数据解析出像素矩阵,并根据图像尺寸还原图像。
        decoded_image = tf.decode_raw(features['image'], tf.uint8)
        decoded_image.set_shape(
            [features['height'], features['width'], features['channels']]
        )
        label = features['label']
    
        return decoded_image, label
    
    image_size = 299  # 定义神经网络输入层图片的大小。
    batch_size = 100  # 定义组合数据batch的大小。
    shuffle_buffer = 10000   # 定义随机打乱数据时buffer的大小。
    
    # 定义读取训练数据的数据集。
    dataset = tf.data.TFRecordDataset(train_files)
    dataset = dataset.map(parser)
    
    # 对数据依次进行预处理、shuffle和batching操作。
    # preprocess_for_train为7.2.2小节中介绍的图像预处理程序。
    # 因为上一个map得到的数据集中提供了decoded_image和label两个结果,
    # 所以这个map需要提供一个有2个参数的函数来处理数据。
    # 在下面的代码中,lambda中的image代表的就是第一个map返回的decode_iamge,label代表的就是第一个map返回的label。
    # 在这个lambda表达式中我们首先将decode_image传入preprocess_for_train来进一步对图像数据进行预处理。
    # 然后再将处理好的图像和label组成最终的输出。
    dataset = dataset.map(lambda image, label: 
            (preprocess_for_train(image, image_size, image_size, None), 
            label)
        )
    dataset = dataset.shuffle(shuffle_buffer).batch(batch_size)
    
    # 重复NUM_EPOCHS个epoch。
    # 在7.3.4小节中TRAINING_ROUNDS指定了训练的轮数,
    # 而这里指定了整个数据集的重复次数,它也间接的确定了训练的轮数。
    NUM_EPOCHS = 10 
    dataset = dataset.repeat(NUM_EPOCHS)
    
    # 定义数据集迭代器。
    # 虽然定义数据集时没有直接使用placeholder来提供文件地址,
    # 但是tf.train.match_filenames_once()方法得到的结果和与placeholder的机制类似,
    # 也需要初始化,所以这里使用的是initializable_iterator。
    iterator = dataset.make_initializable_iterator()
    image_batch, label_batch = iterator.get_next()
    
    # 定义神经网络的结构以及优化过程。
    # 这里与7.3.4小节相同。
    learning_rate = 0.01 
    logit = inference(image_batch) 
    loss = calc_loss(logit, label_batch)
    train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)
    
    # 定义测试用的Dataset。
    # 与训练时不同,测试数据的Dataset不需要经过随机翻转等预处理操作,
    # 也不需要打乱顺序和重复多个epoch。
    # 这里使用与训练数据相同的parser进行解析,调整分辨率到网络输入层大小,然后直接进行batching操作。
    test_dataset = tf.data.TFRecordDataset(test_files)
    test_dataset = test_dataset.map(parser).map(lambda image, label : (
            tf.image.resize_images(image, [image_size, image_size]), 
            label 
        )) 
    test_dataset = test_dataset.batch(batch_size)
    
    # 定义测试数据上的迭代器。
    test_iterator = test_dataset.make_initializable_iterator()
    test_image_batch, test_label_batch = test_iterator.get_next() 
    
    # 定义预测结果为logit值最大的分类。
    test_logit = inference(test_image_batch)
    predictions = tf.argmax(
        test_logit, 
        axis=-1, 
        output_type=tf.int32
    )
    
    # 声明会话并运行神经网络的优化过程。
    with tf.Session() as sess:
        # 初始化变量。
        sess.run((
            tf.global_variables_initializer(), 
            tf.local_variables_initializer()
        ))
    
        # 初始化训练数据的迭代器。
        sess.run(iterator.initializer)
    
        # 循环进行训练,直到数据集完成输入、抛出OutOfRangeError错误。
        while True:
            try:
                sess.run(train_step)
            except tf.errors.OutOfRangeError:
                break 
    
        # 初始化测试数据的迭代器。
        sess.run(test_iterator.initializer)
        # 获取预测结果。 
        test_results = [] 
        test_labels = []
        while True:
            try:
                pred, label = sess.run([predictions, test_label_batch]) 
                test_results.expend(pred)
                test_labels.extend(label)
            except tf.errors.OutOfRangeError:
                break 
    
    # 计算准确率。
    correct = [float(y == y_) for (y, y_) in zip(test_results, test_labels)]
    accuracy = sum(correct) / len(correct)
    print("Test accuracy is : ", accuracy) 
    
    # 在MNIST数据集上运行以上程序,可以得到类似下面的结果:
    # Test accuracy is : 0.9052
    

    相关文章

      网友评论

          本文标题:TensorFlow 实战Google深度学习框架(第2版)第七

          本文链接:https://www.haomeiwen.com/subject/oathjhtx.html