TF2.0 - Data Pipelines

作者: Manfestain | 来源:发表于2019-12-26 17:08 被阅读0次

学习TF2.0时的一些总结,参考资官方文档https://www.tensorflow.org/guide/data#top_of_page

首先加载相关库

import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

tf.data.Dataset

tf.data中提供了一个tf.data.Dataset()抽象,它可以表示一系列元素(图像和对应的标签)。Dataset的创建必须从数据开始.

dataset = tf.data.Dataset.from_tensor_slices([9, 3, 1, 5, 3, 7, 5, 0])
for elem in dataset:   # 使用for循环
    print(elem.numpy())
it = iter(dataset)   # 使用iterator对象
print(next(it).numpy())

Dataset对象的element_spec属性可以查看每个元素的类型,返回一个tf.TypeSpec对象

dataset1 = tf.data.Dataset.from_tensor_slices(tf.random.uniform([4, 10]))
print(dataset1.element_spec)

Dataset对象支持任何结构,可以使用Dataset.map()Dataset.filter()对每个元素进行操作

for z in dataset1:
    print(z.numpy())

数据生成器

使用python的生成器作为数据源,Dataset.from_generator()将python生成器转化为tf.data.Dataset对象

def count(stop):
    i = 0
    while i < stop:
        yield i
        i += 1

for n in count(5):
    print(n)

from_generator()有三个参数需要注意:args是需要传给函数的参数,output_types是创建tf.Graph时需要的参数,output_shapes是返回的数据大小

ds_counter = tf.data.Dataset.from_generator(
    count, args=[25], output_types=tf.int32)
for count_batch in ds_counter.repeat().batch(7).take(5):   # 每个batch取7个,总共取5个batch
    print(count_batch.numpy())

一般情况下,最好将output_typesoutput_shape明确指定

def gen_series():
    i = 0
    while True:
        size = np.random.randint(0, 10)
        yield i, np.random.normal(size=(size,))
        i += 1

for i, series in gen_series():
    print(i, ":", series)
    if i > 5:
        break

ds_series = tf.data.Dataset.from_generator(
    gen_series,
    output_types=(tf.int32, tf.float32),
    output_shapes=((), (None,)))    # 明确指定outpu_types和output_shapes
print(ds_series)

当对一个变长的数据进行batch时,可以使用Dataset.padded_batch

ds_series_batch = ds_series.shuffle(
    20).padded_batch(10, padded_shapes=([], [None]))
ids, sequence_batch = next(iter(ds_series_batch))
print(ids.numpy())
print()
print(sequence_batch.numpy())

加载图像数据

在处理图像数据时,将preprocessing.image.ImageDataGenerator封装为一个tf.data.Dataset

flowers = tf.keras.utils.get_file(
    'flower_photos',
    'https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz',
    untar=True
)
img_gen = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255, rotation_range=20)
images, labels = next(img_gen.flow_from_directory(flowers))   # 使用iteration方式遍历
ds = tf.data.Dataset.from_generator(
    img_gen.flow_from_directory,
    args=[flowers],
    output_types=(tf.float32, tf.float32)
    output_shapes=([32, 256, 256, 3], [32, 5])
)

加载CSV数据

也可以将CSV类型的数据直接通过from_tensor_slices()生成tf.data.Dataset

titanic_file = tf.keras.utils.get_file(
    "train.csv",
    "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
df = pd.read_csv(titanic_file, index_col=None)
titanic_slices = tf.data.Dataset.from_tensor_slices(dict(df))   # 通过字典的形式加载数据
for feature_batch in titanic_slices.take(1):
    for key, value in feature_batch.items():
        print("  {!r:20s}: {}".format(key, value))

tf.data有一种更灵活的数据加载方式,experimental.make_csv_dataset()支持列级别的查询,batching和shuffling。

titanic_batchs = tf.data.experimental.make_csv_dataset(
    titanic_file,
    batch_size=4,
    label_name='survived'
)

for feature_batch, label_batch in titanic_batchs.take(1):
    print("'survived' : {}".format(label_batch))
    print("features:")
    for key, value in feature_batch.items():
        print("  {!r:20s}: {}".format(key, value))

同时可以使用select_columns参数选择特征的子集

titanic_batchs = tf.data.experimental.make_csv_dataset(
    titanic_file,
    batch_size=4,
    label_name='survived',
    # select_columns里一定要包含label_name
    select_columns=['class', 'fare', 'survived']
)

for feature_batch, label_batch in titanic_batchs.take(1):
    print("'survived: {}".format(label_batch))
    for key, value in feature_batch.items():
        print("  {!r:20s}: {}".format(key, value))

还有一个底层的类experimental.CsvDataset提供更精细的控制,这种方法不支持列的查询,同时必须指定每一列的数据类型

titanic_types = [tf.int32, tf.string, tf.float32, tf.int32,
                 tf.int32, tf.float32, tf.string, tf.string, tf.string, tf.string]
dataset = tf.data.experimental.CsvDataset(
    titanic_file, titanic_types, header=True)
for line in dataset.take(10):
  print([item.numpy() for item in line])

加载文本数据

文本加载,tf.data.TextLineDataset很容易从文本文件中提取数据,它将文件的每行变为一个str元素

file_paths = ['./DataSet/cowper.txt', './DataSet/derby.txt']
dataset = tf.data.TextLineDataset(file_paths)
for line in dataset.take(5):
    print(line.numpy())
print()

Dataset.interleave可以实现文件之间的行替换,cycle_length表示每次同时处理输入元素的数量,block_length表示每个输入元素要产生的连续元素的数量

files_ds = tf.data.Dataset.from_tensor_slices(file_paths)
lines_ds = files_ds.interleave(tf.data.TextLineDataset, cycle_length=3)
for i, line in enumerate(lines_ds.take(12)):
    if i % 4 == 0:
        print()
    print(line.numpy())

a = tf.data.Dataset.range(1, 6)
b = a.interleave(
    lambda x: tf.data.Dataset.from_tensors(x).repeat(6),
    cycle_length=3,
    block_length=4
)
for i in b:
    print(i.numpy())
print()

TextLineDataset默认加载文件的所有行,可以使用Dataset.skip()或Dataset.filter()用来移除

titanic_lines = tf.data.TextLineDataset(titanic_file)


def survived(line):
    return tf.not_equal(tf.strings.substr(line, 0, 1), "0")


survivors = titanic_lines.skip(1).filter(survived)   # filter()接受一个函数作为参数,
for line in survivors.take(10):
    print(line.numpy())

Dataset.shuffle()实现随机打乱数据,它维护了一个可变的缓冲区,并从缓冲区中随机选择下一个元素

lines = tf.data.TextLineDataset(titanic_file)
counter = tf.data.experimental.Counter()
dataset = tf.data.Dataset.zip((counter, lines))
dataset1 = dataset.shuffle(buffer_size=100)
dataset1 = dataset1.batch(20)   # 缓存大小为100,batch大小为20, 所以第一个batch的序号不会超过120
print(next(iter(dataset1))[0].numpy())

print()
dataset = tf.data.Dataset.range(1, 101)
shuffled = dataset.shuffle(buffer_size=10).batch(2).repeat(2)

for n in shuffled.skip(50).take(5):   # skip(k)跳过k个元素(每次都是随机跳过)
    print(n.numpy())
shuffle_report = [n.numpy().mean() for n in shuffled]
print(len(shuffle_report))
plt.plot(shuffle_report, label="shuffle().report()")
plt.ylabel('Mean item ID')
plt.legend()
plt.show()

Batching

Dataset.batch()可以将n个连续的元素堆叠到一个元素中,tf.stack()的功能与该函数相同

inc_dataset = tf.data.Dataset.range(100)
dec_dataset = tf.data.Dataset.range(0, -100, -1)
dataset = tf.data.Dataset.zip((inc_dataset, dec_dataset))
batched_dataset = dataset.batch(4)   # 每个batch取4个case
for batch in batched_dataset.take(5):   # 取5个batch
    print([arr.numpy() for arr in batch])

Dataset.batch()可能会造成不明确的batch大小,因为最后一个batch可能不足,drop_remainder可以忽略掉最后一个batch

batched_dataset = dataset.batch(7, drop_remainder=True)
print(batched_dataset)

针对大多数输入大小可变的模型,Dataset.padded_batch()可以对不同维度的大小进行改变

dataset = tf.data.Dataset.range(10)
dataset = dataset.map(lambda x: tf.fill([tf.cast(x, tf.int32)], x))   # tf.fill()将x扩展到第一个参数的维度,x必须是标量
dataset = dataset.padded_batch(4, padded_shapes=(None,), drop_remainder=True)
for batch in dataset.repeat().take(5):
    print(batch.numpy())
    print()

图片数据的解码和预处理

flowers = './DataSet/flower_photos/'
flowers_root = pathlib.Path(flowers)
for item in flowers_root.glob('*'):
    print(item.name)
list_ds = tf.data.Dataset.list_files(
    str(flowers_root/'*/*'))   # list_files()按照glob的方式匹配所有字符串

def parse_image(filename):
    parts = tf.strings.split(filename, '/')
    label = parts[-2]

    image = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, [64, 128])
    return image, label

file_path = next(iter(list_ds))
image, label = parse_image(file_path)

def show(image, label):
    plt.figure()
    plt.imshow(image)
    plt.title(label.numpy().decode('utf-8'))
    plt.axis('off')
    plt.show()

show(image, label)

可以通过Dataset.map()将方法作用域整个数据集

images_ds = list_ds.map(parse_image)
for image, label in images_ds.take(1):
    show(image, label)

可以添加其他已有的python逻辑操作:旋转,对称,反转等

def random_rotate_image(image):   # 旋转
    return ndimage.rotate(image, np.random.uniform(-30, 30), reshape=False)

image = random_rotate_image(image)
show(image, label)

也可以将tf.py_function()Dataset.map()结合使用

def tf_random_rotate_image(image, label):
    im_shape = image.shape
    [image, ] = tf.py_function(random_rotate_image, [image], [tf.float32])   # 将python封装为tensorflow函数,第二参数为python函数的参数列表
    image.set_shape(im_shape)
    return image, label

rot_ds = images_ds.map(tf_random_rotate_image)
for image, label in rot_ds.take(1):
    show(image, label)

针对时间类型的数据,可以很方便的按步长进行变换

time_data = tf.data.Dataset.range(10000)
batches = time_data.batch(10, drop_remainder=True)

def dense_1_step(batch):   # 按照step=1进行shift
    return batch[:-1], batch[1:]

predict_dense_1_step = batches.map(dense_1_step)

for features, label in predict_dense_1_step.take(5):
    print(features.numpy(), " => ", label.numpy())
print()

将每例数据的features和下一个数据的labels进行重叠

feature_length = 10
label_length = 5

features = time_data.batch(feature_length, drop_remainder=True)

labels = time_data.batch(feature_length).skip(1).map(lambda labels: labels[:-5])   
predict_5_steps = tf.data.Dataset.zip((features, labels))   # 使用Dataset.zip()来实现

for features, labels in predict_5_steps.take(5):
    print(features.numpy(), ' => ', labels.numpy())
print()

可以使用Dataset.window进行完全的控制,该方法返回一个Dataset

windwo_size = 5

windows = time_data.window(windwo_size, shift=1)

for sub_ds in windows.flat_map(lambda x: x).take(50):
    print(sub_ds.numpy())

可以将上述方法合在一起

def make_window_dataset(ds, window_size=5, shift=1, stride=1):
    windows = ds.window(window_size, shift=shift, stride=stride)

    def sub_to_batch(sub):
        return sub.batch(window_size, drop_remainder=True)

    windows = windows.flat_map(sub_to_batch)
    return windows

    
ds = make_window_dataset(time_data, window_size=10, shift=5, stride=3)
dense_labels_ds = ds.map(dense_1_step)

for features, labels in dense_labels_ds.take(10):
    print(features.numpy(), ' => ', labels.numpy())

重采样

相关文章

网友评论

    本文标题:TF2.0 - Data Pipelines

    本文链接:https://www.haomeiwen.com/subject/dambwctx.html