TF2.0 - Data Pipelines

作者: Manfestain | 来源:发表于2019-12-26 17:08 被阅读0次

    学习TF2.0时的一些总结,参考资官方文档https://www.tensorflow.org/guide/data#top_of_page

    首先加载相关库

    import tensorflow as tf
    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    

    tf.data.Dataset

    tf.data中提供了一个tf.data.Dataset()抽象,它可以表示一系列元素(图像和对应的标签)。Dataset的创建必须从数据开始.

    dataset = tf.data.Dataset.from_tensor_slices([9, 3, 1, 5, 3, 7, 5, 0])
    for elem in dataset:   # 使用for循环
        print(elem.numpy())
    it = iter(dataset)   # 使用iterator对象
    print(next(it).numpy())
    

    Dataset对象的element_spec属性可以查看每个元素的类型,返回一个tf.TypeSpec对象

    dataset1 = tf.data.Dataset.from_tensor_slices(tf.random.uniform([4, 10]))
    print(dataset1.element_spec)
    

    Dataset对象支持任何结构,可以使用Dataset.map()Dataset.filter()对每个元素进行操作

    for z in dataset1:
        print(z.numpy())
    

    数据生成器

    使用python的生成器作为数据源,Dataset.from_generator()将python生成器转化为tf.data.Dataset对象

    def count(stop):
        i = 0
        while i < stop:
            yield i
            i += 1
    
    for n in count(5):
        print(n)
    

    from_generator()有三个参数需要注意:args是需要传给函数的参数,output_types是创建tf.Graph时需要的参数,output_shapes是返回的数据大小

    ds_counter = tf.data.Dataset.from_generator(
        count, args=[25], output_types=tf.int32)
    for count_batch in ds_counter.repeat().batch(7).take(5):   # 每个batch取7个,总共取5个batch
        print(count_batch.numpy())
    

    一般情况下,最好将output_typesoutput_shape明确指定

    def gen_series():
        i = 0
        while True:
            size = np.random.randint(0, 10)
            yield i, np.random.normal(size=(size,))
            i += 1
    
    for i, series in gen_series():
        print(i, ":", series)
        if i > 5:
            break
    
    ds_series = tf.data.Dataset.from_generator(
        gen_series,
        output_types=(tf.int32, tf.float32),
        output_shapes=((), (None,)))    # 明确指定outpu_types和output_shapes
    print(ds_series)
    

    当对一个变长的数据进行batch时,可以使用Dataset.padded_batch

    ds_series_batch = ds_series.shuffle(
        20).padded_batch(10, padded_shapes=([], [None]))
    ids, sequence_batch = next(iter(ds_series_batch))
    print(ids.numpy())
    print()
    print(sequence_batch.numpy())
    

    加载图像数据

    在处理图像数据时,将preprocessing.image.ImageDataGenerator封装为一个tf.data.Dataset

    flowers = tf.keras.utils.get_file(
        'flower_photos',
        'https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz',
        untar=True
    )
    img_gen = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255, rotation_range=20)
    images, labels = next(img_gen.flow_from_directory(flowers))   # 使用iteration方式遍历
    ds = tf.data.Dataset.from_generator(
        img_gen.flow_from_directory,
        args=[flowers],
        output_types=(tf.float32, tf.float32)
        output_shapes=([32, 256, 256, 3], [32, 5])
    )
    

    加载CSV数据

    也可以将CSV类型的数据直接通过from_tensor_slices()生成tf.data.Dataset

    titanic_file = tf.keras.utils.get_file(
        "train.csv",
        "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
    df = pd.read_csv(titanic_file, index_col=None)
    titanic_slices = tf.data.Dataset.from_tensor_slices(dict(df))   # 通过字典的形式加载数据
    for feature_batch in titanic_slices.take(1):
        for key, value in feature_batch.items():
            print("  {!r:20s}: {}".format(key, value))
    

    tf.data有一种更灵活的数据加载方式,experimental.make_csv_dataset()支持列级别的查询,batching和shuffling。

    titanic_batchs = tf.data.experimental.make_csv_dataset(
        titanic_file,
        batch_size=4,
        label_name='survived'
    )
    
    for feature_batch, label_batch in titanic_batchs.take(1):
        print("'survived' : {}".format(label_batch))
        print("features:")
        for key, value in feature_batch.items():
            print("  {!r:20s}: {}".format(key, value))
    

    同时可以使用select_columns参数选择特征的子集

    titanic_batchs = tf.data.experimental.make_csv_dataset(
        titanic_file,
        batch_size=4,
        label_name='survived',
        # select_columns里一定要包含label_name
        select_columns=['class', 'fare', 'survived']
    )
    
    for feature_batch, label_batch in titanic_batchs.take(1):
        print("'survived: {}".format(label_batch))
        for key, value in feature_batch.items():
            print("  {!r:20s}: {}".format(key, value))
    

    还有一个底层的类experimental.CsvDataset提供更精细的控制,这种方法不支持列的查询,同时必须指定每一列的数据类型

    titanic_types = [tf.int32, tf.string, tf.float32, tf.int32,
                     tf.int32, tf.float32, tf.string, tf.string, tf.string, tf.string]
    dataset = tf.data.experimental.CsvDataset(
        titanic_file, titanic_types, header=True)
    for line in dataset.take(10):
      print([item.numpy() for item in line])
    

    加载文本数据

    文本加载,tf.data.TextLineDataset很容易从文本文件中提取数据,它将文件的每行变为一个str元素

    file_paths = ['./DataSet/cowper.txt', './DataSet/derby.txt']
    dataset = tf.data.TextLineDataset(file_paths)
    for line in dataset.take(5):
        print(line.numpy())
    print()
    

    Dataset.interleave可以实现文件之间的行替换,cycle_length表示每次同时处理输入元素的数量,block_length表示每个输入元素要产生的连续元素的数量

    files_ds = tf.data.Dataset.from_tensor_slices(file_paths)
    lines_ds = files_ds.interleave(tf.data.TextLineDataset, cycle_length=3)
    for i, line in enumerate(lines_ds.take(12)):
        if i % 4 == 0:
            print()
        print(line.numpy())
    
    a = tf.data.Dataset.range(1, 6)
    b = a.interleave(
        lambda x: tf.data.Dataset.from_tensors(x).repeat(6),
        cycle_length=3,
        block_length=4
    )
    for i in b:
        print(i.numpy())
    print()
    

    TextLineDataset默认加载文件的所有行,可以使用Dataset.skip()或Dataset.filter()用来移除

    titanic_lines = tf.data.TextLineDataset(titanic_file)
    
    
    def survived(line):
        return tf.not_equal(tf.strings.substr(line, 0, 1), "0")
    
    
    survivors = titanic_lines.skip(1).filter(survived)   # filter()接受一个函数作为参数,
    for line in survivors.take(10):
        print(line.numpy())
    

    Dataset.shuffle()实现随机打乱数据,它维护了一个可变的缓冲区,并从缓冲区中随机选择下一个元素

    lines = tf.data.TextLineDataset(titanic_file)
    counter = tf.data.experimental.Counter()
    dataset = tf.data.Dataset.zip((counter, lines))
    dataset1 = dataset.shuffle(buffer_size=100)
    dataset1 = dataset1.batch(20)   # 缓存大小为100,batch大小为20, 所以第一个batch的序号不会超过120
    print(next(iter(dataset1))[0].numpy())
    
    print()
    dataset = tf.data.Dataset.range(1, 101)
    shuffled = dataset.shuffle(buffer_size=10).batch(2).repeat(2)
    
    for n in shuffled.skip(50).take(5):   # skip(k)跳过k个元素(每次都是随机跳过)
        print(n.numpy())
    shuffle_report = [n.numpy().mean() for n in shuffled]
    print(len(shuffle_report))
    plt.plot(shuffle_report, label="shuffle().report()")
    plt.ylabel('Mean item ID')
    plt.legend()
    plt.show()
    

    Batching

    Dataset.batch()可以将n个连续的元素堆叠到一个元素中,tf.stack()的功能与该函数相同

    inc_dataset = tf.data.Dataset.range(100)
    dec_dataset = tf.data.Dataset.range(0, -100, -1)
    dataset = tf.data.Dataset.zip((inc_dataset, dec_dataset))
    batched_dataset = dataset.batch(4)   # 每个batch取4个case
    for batch in batched_dataset.take(5):   # 取5个batch
        print([arr.numpy() for arr in batch])
    

    Dataset.batch()可能会造成不明确的batch大小,因为最后一个batch可能不足,drop_remainder可以忽略掉最后一个batch

    batched_dataset = dataset.batch(7, drop_remainder=True)
    print(batched_dataset)
    

    针对大多数输入大小可变的模型,Dataset.padded_batch()可以对不同维度的大小进行改变

    dataset = tf.data.Dataset.range(10)
    dataset = dataset.map(lambda x: tf.fill([tf.cast(x, tf.int32)], x))   # tf.fill()将x扩展到第一个参数的维度,x必须是标量
    dataset = dataset.padded_batch(4, padded_shapes=(None,), drop_remainder=True)
    for batch in dataset.repeat().take(5):
        print(batch.numpy())
        print()
    

    图片数据的解码和预处理

    flowers = './DataSet/flower_photos/'
    flowers_root = pathlib.Path(flowers)
    for item in flowers_root.glob('*'):
        print(item.name)
    list_ds = tf.data.Dataset.list_files(
        str(flowers_root/'*/*'))   # list_files()按照glob的方式匹配所有字符串
    
    def parse_image(filename):
        parts = tf.strings.split(filename, '/')
        label = parts[-2]
    
        image = tf.io.read_file(filename)
        image = tf.image.decode_jpeg(image)
        image = tf.image.convert_image_dtype(image, tf.float32)
        image = tf.image.resize(image, [64, 128])
        return image, label
    
    file_path = next(iter(list_ds))
    image, label = parse_image(file_path)
    
    def show(image, label):
        plt.figure()
        plt.imshow(image)
        plt.title(label.numpy().decode('utf-8'))
        plt.axis('off')
        plt.show()
    
    show(image, label)
    

    可以通过Dataset.map()将方法作用域整个数据集

    images_ds = list_ds.map(parse_image)
    for image, label in images_ds.take(1):
        show(image, label)
    

    可以添加其他已有的python逻辑操作:旋转,对称,反转等

    def random_rotate_image(image):   # 旋转
        return ndimage.rotate(image, np.random.uniform(-30, 30), reshape=False)
    
    image = random_rotate_image(image)
    show(image, label)
    

    也可以将tf.py_function()Dataset.map()结合使用

    def tf_random_rotate_image(image, label):
        im_shape = image.shape
        [image, ] = tf.py_function(random_rotate_image, [image], [tf.float32])   # 将python封装为tensorflow函数,第二参数为python函数的参数列表
        image.set_shape(im_shape)
        return image, label
    
    rot_ds = images_ds.map(tf_random_rotate_image)
    for image, label in rot_ds.take(1):
        show(image, label)
    

    针对时间类型的数据,可以很方便的按步长进行变换

    time_data = tf.data.Dataset.range(10000)
    batches = time_data.batch(10, drop_remainder=True)
    
    def dense_1_step(batch):   # 按照step=1进行shift
        return batch[:-1], batch[1:]
    
    predict_dense_1_step = batches.map(dense_1_step)
    
    for features, label in predict_dense_1_step.take(5):
        print(features.numpy(), " => ", label.numpy())
    print()
    

    将每例数据的features和下一个数据的labels进行重叠

    feature_length = 10
    label_length = 5
    
    features = time_data.batch(feature_length, drop_remainder=True)
    
    labels = time_data.batch(feature_length).skip(1).map(lambda labels: labels[:-5])   
    predict_5_steps = tf.data.Dataset.zip((features, labels))   # 使用Dataset.zip()来实现
    
    for features, labels in predict_5_steps.take(5):
        print(features.numpy(), ' => ', labels.numpy())
    print()
    

    可以使用Dataset.window进行完全的控制,该方法返回一个Dataset

    windwo_size = 5
    
    windows = time_data.window(windwo_size, shift=1)
    
    for sub_ds in windows.flat_map(lambda x: x).take(50):
        print(sub_ds.numpy())
    

    可以将上述方法合在一起

    def make_window_dataset(ds, window_size=5, shift=1, stride=1):
        windows = ds.window(window_size, shift=shift, stride=stride)
    
        def sub_to_batch(sub):
            return sub.batch(window_size, drop_remainder=True)
    
        windows = windows.flat_map(sub_to_batch)
        return windows
    
        
    ds = make_window_dataset(time_data, window_size=10, shift=5, stride=3)
    dense_labels_ds = ds.map(dense_1_step)
    
    for features, labels in dense_labels_ds.take(10):
        print(features.numpy(), ' => ', labels.numpy())
    

    重采样

    相关文章

      网友评论

        本文标题:TF2.0 - Data Pipelines

        本文链接:https://www.haomeiwen.com/subject/dambwctx.html