TF2 Keras (4): Creating New Layers and Models via Subclassing

Author: 数科每日 | Published 2021-01-07 09:35

    These are study notes on the official documentation.


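    The Layer class

    The layer is a core abstraction in Keras. It encapsulates state (the connection weights) and a computation (the forward pass). Below is a densely connected (fully connected) layer.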

    import tensorflow as tf
    from tensorflow import keras


    class Linear(keras.layers.Layer):
        def __init__(self, units=32, input_dim=32):
            super(Linear, self).__init__()
            w_init = tf.random_normal_initializer()
            self.w = tf.Variable(
                initial_value=w_init(shape=(input_dim, units), dtype="float32"),
                trainable=True,
            )
            b_init = tf.zeros_initializer()
            self.b = tf.Variable(
                initial_value=b_init(shape=(units,), dtype="float32"), trainable=True
            )
    
        def call(self, inputs):
            return tf.matmul(inputs, self.w) + self.b
    

    Calling the layer defined above:

    x = tf.ones((2, 2))
    linear_layer = Linear(4, 2)
    y = linear_layer(x)
    print(y)
    

    Creating weights with add_weight()

    class Linear(keras.layers.Layer):
        def __init__(self, units=32, input_dim=32):
            super(Linear, self).__init__()
            self.w = self.add_weight(
                shape=(input_dim, units), initializer="random_normal", trainable=True
            )
            self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)
    
        def call(self, inputs):
            return tf.matmul(inputs, self.w) + self.b
    
    
    x = tf.ones((2, 2))
    linear_layer = Linear(4, 2)
    y = linear_layer(x)
    print(y)
    

    Non-trainable weights

    A layer can also hold non-trainable weights. Backpropagation does not update them, so the layer has to update them itself (here with assign_add()).

    class ComputeSum(keras.layers.Layer):
        def __init__(self, input_dim):
            super(ComputeSum, self).__init__()
            self.total = tf.Variable(initial_value=tf.zeros((input_dim,)), trainable=False)
    
        def call(self, inputs):
            self.total.assign_add(tf.reduce_sum(inputs, axis=0))
            return self.total
    
    
    x = tf.ones((2, 2))
    my_sum = ComputeSum(2)
    y = my_sum(x)
    print(y.numpy())
    y = my_sum(x)
    print(y.numpy())
    
    print("weights:", len(my_sum.weights))
    print("non-trainable weights:", len(my_sum.non_trainable_weights))
    
    # It's not included in the trainable weights:
    print("trainable_weights:", my_sum.trainable_weights)
    

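    Best practice: defer weight creation until the input shape is known

    As shown below, do not create the weights in __init__(); create them in build() instead. The first time the layer is called, __call__() automatically runs build(), so the weights are created at that point.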

    class Linear(keras.layers.Layer):
        def __init__(self, units=32):
            super(Linear, self).__init__()
            self.units = units
    
        def build(self, input_shape):
            self.w = self.add_weight(
                shape=(input_shape[-1], self.units),
                initializer="random_normal",
                trainable=True,
            )
            self.b = self.add_weight(
                shape=(self.units,), initializer="random_normal", trainable=True
            )
    
        def call(self, inputs):
            return tf.matmul(inputs, self.w) + self.b
    
    # At instantiation, we don't know on what inputs this is going to get called
    linear_layer = Linear(32)
    
    # The layer's weights are created dynamically the first time the layer is called
    y = linear_layer(x)
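
    The deferred creation described above can be observed directly. The following is a minimal sketch, assuming TensorFlow 2.x is installed; it repeats the Linear layer so that it runs on its own, and the input shape (2, 3) and units=4 are arbitrary choices made for illustration.

```python
import tensorflow as tf
from tensorflow import keras


class Linear(keras.layers.Layer):
    """Same deferred-build layer as above, repeated so the snippet is self-contained."""

    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        # Runs once, on the first call, when the input shape is known.
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b


layer = Linear(4)
weights_before = len(layer.weights)  # no weights exist yet

y = layer(tf.ones((2, 3)))  # the first call triggers build() with shape (2, 3)
weights_after = len(layer.weights)

print(weights_before, weights_after)
print(layer.w.shape, layer.b.shape, y.shape)
```

    The kernel's first dimension (3) is taken from the input, which is why the constructor no longer needs an input_dim argument.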
    

    Layers can be nested

    The outer layer tracks the weights of its inner layers. Sublayers are best created in __init__():

    # Let's assume we are reusing the Linear class
    # with a `build` method that we defined above.
    
    class MLPBlock(keras.layers.Layer):
        def __init__(self):
            super(MLPBlock, self).__init__()
            self.linear_1 = Linear(32)
            self.linear_2 = Linear(32)
            self.linear_3 = Linear(1)
    
        def call(self, inputs):
            x = self.linear_1(inputs)
            x = tf.nn.relu(x)
            x = self.linear_2(x)
            x = tf.nn.relu(x)
            return self.linear_3(x)
    
    
    mlp = MLPBlock()
    y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
    print("weights:", len(mlp.weights))
    print("trainable weights:", len(mlp.trainable_weights))
    

    The add_loss() method

    add_loss() can be used to build a dedicated regularization layer whose only job is to compute a regularization term.

    # A layer that creates an activity regularization loss
    class ActivityRegularizationLayer(keras.layers.Layer):
        def __init__(self, rate=1e-2):
            super(ActivityRegularizationLayer, self).__init__()
            self.rate = rate
    
        def call(self, inputs):
            self.add_loss(self.rate * tf.reduce_sum(inputs))
            return inputs
    
    class OuterLayer(keras.layers.Layer):
        def __init__(self):
            super(OuterLayer, self).__init__()
            self.activity_reg = ActivityRegularizationLayer(1e-2)
    
        def call(self, inputs):
            return self.activity_reg(inputs)
    
    
    layer = OuterLayer()
    assert len(layer.losses) == 0  # No losses yet since the layer has never been called
    
    _ = layer(tf.zeros((1, 1)))
    assert len(layer.losses) == 1  # We created one loss value
    
    # `layer.losses` gets reset at the start of each __call__
    _ = layer(tf.zeros((1, 1)))
    assert len(layer.losses) == 1  # This is the loss created during the call above
    

    Regularization losses from all sublayers are collected by the outermost layer

    class OuterLayerWithKernelRegularizer(keras.layers.Layer):
        def __init__(self):
            super(OuterLayerWithKernelRegularizer, self).__init__()
            self.dense = keras.layers.Dense(
                32, kernel_regularizer=tf.keras.regularizers.l2(1e-3)
            )
    
        def call(self, inputs):
            return self.dense(inputs)
    
    
    layer = OuterLayerWithKernelRegularizer()
    _ = layer(tf.zeros((1, 1)))
    
    # This is `1e-3 * sum(layer.dense.kernel ** 2)`,
    # created by the `kernel_regularizer` above.
    print(layer.losses)
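
    To check the claim in the comment above, the reported loss can be compared with a manual computation. This is a sketch assuming TensorFlow 2.x; it repeats the layer definition to stay self-contained and uses the class form keras.regularizers.L2, which should be equivalent to the l2 shortcut used above.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras


class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.dense = keras.layers.Dense(
            32, kernel_regularizer=keras.regularizers.L2(1e-3)
        )

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayerWithKernelRegularizer()
_ = layer(tf.zeros((1, 1)))

# Loss reported by the outer layer, collected from the inner Dense layer.
reported = float(layer.losses[0])

# The same quantity recomputed by hand from the kernel values.
kernel = layer.dense.kernel.numpy()
expected = 1e-3 * float(np.sum(kernel ** 2))

print(reported, expected)
```

    The two printed numbers should agree up to float32 rounding.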
    

    These extra losses are added to the total loss value during training

    # Instantiate an optimizer.
    optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    
    # Iterate over the batches of a dataset.
    # (`model` and `train_dataset` are assumed to be defined elsewhere.)
    for x_batch_train, y_batch_train in train_dataset:
      with tf.GradientTape() as tape:
        logits = model(x_batch_train)  # Logits for this minibatch
        # Loss value for this minibatch
        loss_value = loss_fn(y_batch_train, logits)
        # Add extra losses created during this forward pass:
        loss_value += sum(model.losses)
    
      grads = tape.gradient(loss_value, model.trainable_weights)
      optimizer.apply_gradients(zip(grads, model.trainable_weights))
    

    This involves writing your own training loop; for details, see the guide to writing a training loop from scratch.

    These losses also work with Keras's built-in fit():

    import numpy as np
    
    inputs = keras.Input(shape=(3,))
    outputs = ActivityRegularizationLayer()(inputs)
    model = keras.Model(inputs, outputs)
    
    # If there is a loss passed in `compile`, the regularization
    # losses get added to it
    model.compile(optimizer="adam", loss="mse")
    model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
    
    # It's also possible not to pass any loss in `compile`,
    # since the model already has a loss to minimize, via the `add_loss`
    # call during the forward pass!
    model.compile(optimizer="adam")
    model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
    

    The add_metric() method

    Like losses, metrics can also be defined and tracked by the layer itself, using add_metric().

    class LogisticEndpoint(keras.layers.Layer):
        def __init__(self, name=None):
            super(LogisticEndpoint, self).__init__(name=name)
            self.loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
            self.accuracy_fn = keras.metrics.BinaryAccuracy()
    
        def call(self, targets, logits, sample_weights=None):
            # Compute the training-time loss value and add it
            # to the layer using `self.add_loss()`.
            loss = self.loss_fn(targets, logits, sample_weights)
            self.add_loss(loss)
    
            # Log accuracy as a metric and add it
            # to the layer using `self.add_metric()`.
            acc = self.accuracy_fn(targets, logits, sample_weights)
            self.add_metric(acc, name="accuracy")
    
            # Return the inference-time prediction tensor (for `.predict()`).
            return tf.nn.softmax(logits)
    

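    The benefit of tracking a metric this way is that its value is easy to follow as training progresses; it is available through layer.metrics: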

    layer = LogisticEndpoint()
    
    targets = tf.ones((2, 2))
    logits = tf.ones((2, 2))
    y = layer(targets, logits)
    
    print("layer.metrics:", layer.metrics)
    print("current accuracy value:", float(layer.metrics[0].result()))
    

    The same works with fit():

    inputs = keras.Input(shape=(3,), name="inputs")
    targets = keras.Input(shape=(10,), name="targets")
    logits = keras.layers.Dense(10)(inputs)
    predictions = LogisticEndpoint(name="predictions")(targets, logits)
    
    model = keras.Model(inputs=[inputs, targets], outputs=predictions)
    model.compile(optimizer="adam")
    
    data = {
        "inputs": np.random.random((3, 3)),
        "targets": np.random.random((3, 10)),
    }
    model.fit(data)
    

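    Enabling serialization on layers

    If a Layer subclass implements get_config(), the layer can be serialized, which in turn makes it possible to save and reload a single layer on its own.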

    class Linear(keras.layers.Layer):
        def __init__(self, units=32):
            super(Linear, self).__init__()
            self.units = units
    
        def build(self, input_shape):
            self.w = self.add_weight(
                shape=(input_shape[-1], self.units),
                initializer="random_normal",
                trainable=True,
            )
            self.b = self.add_weight(
                shape=(self.units,), initializer="random_normal", trainable=True
            )
    
        def call(self, inputs):
            return tf.matmul(inputs, self.w) + self.b
    
        def get_config(self):
            return {"units": self.units}
    
    
    # Now you can recreate the layer from its config:
    layer = Linear(64)
    config = layer.get_config()
    print(config)
    new_layer = Linear.from_config(config)
    

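    If you plan to serialize a layer, have __init__() accept **kwargs and forward them to the parent class (the base Layer takes keyword arguments such as name and dtype), and include them in the config by extending the parent's get_config():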

    class Linear(keras.layers.Layer):
        def __init__(self, units=32, **kwargs):
            super(Linear, self).__init__(**kwargs)
            self.units = units
    
        def build(self, input_shape):
            self.w = self.add_weight(
                shape=(input_shape[-1], self.units),
                initializer="random_normal",
                trainable=True,
            )
            self.b = self.add_weight(
                shape=(self.units,), initializer="random_normal", trainable=True
            )
    
        def call(self, inputs):
            return tf.matmul(inputs, self.w) + self.b
    
        def get_config(self):
            config = super(Linear, self).get_config()
            config.update({"units": self.units})
            return config
    
    
    layer = Linear(64)
    config = layer.get_config()
    print(config)
    new_layer = Linear.from_config(config)
    

    To customize deserialization, override the from_config() class method. This is the base implementation:

    @classmethod
    def from_config(cls, config):
        return cls(**config)
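
    For context, here is a sketch of where such an override sits inside a layer class, assuming TensorFlow 2.x. The layer is a trimmed-down Linear with no call() method, since only the config round trip is shown, and the name "my_linear" is an arbitrary illustration.

```python
import tensorflow as tf
from tensorflow import keras


class Linear(keras.layers.Layer):
    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def get_config(self):
        config = super().get_config()
        config.update({"units": self.units})
        return config

    @classmethod
    def from_config(cls, config):
        # Same behavior as the default, written out explicitly; a real
        # override would adjust `config` here before building the layer.
        return cls(**config)


layer = Linear(64, name="my_linear")
new_layer = Linear.from_config(layer.get_config())
print(new_layer.name, new_layer.units)
```

    The rebuilt layer carries the same constructor arguments but none of the weights; those are saved and restored separately.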
    

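    The training argument in call()

    Some layers behave differently during training and inference, so call() needs to know which mode it is running in. Such layers take a boolean training argument: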

    class CustomDropout(keras.layers.Layer):
        def __init__(self, rate, **kwargs):
            super(CustomDropout, self).__init__(**kwargs)
            self.rate = rate
    
        def call(self, inputs, training=None):
            if training:
                return tf.nn.dropout(inputs, rate=self.rate)
            return inputs
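
    The effect of the flag is easiest to see by calling the layer in both modes. A minimal sketch, assuming TensorFlow 2.x; the layer is repeated so the snippet runs on its own, and rate=0.5 and the 4x4 input of ones are arbitrary choices.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras


class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs


dropout = CustomDropout(rate=0.5)
x = tf.ones((4, 4))

# Inference mode: the input passes through untouched.
y_infer = dropout(x, training=False).numpy()

# Training mode: each element is either zeroed or scaled by 1 / (1 - rate) = 2.
y_train = dropout(x, training=True).numpy()

print(y_infer)
print(y_train)
```

    Which elements get zeroed in training mode is random, so the second printout differs from run to run.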
    

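    The mask argument in call()

    The other special argument that call() can take is mask, a boolean tensor that tells a layer which input timesteps to skip (for example, padding in sequence data). For details, see "understanding padding and masking".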
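
    As a small illustration of what such a mask looks like, the sketch below asks a built-in Embedding layer for the mask it would generate. It assumes TensorFlow 2.x; the token IDs and layer sizes are made up for the example.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Token IDs for two sequences, right-padded with 0.
token_ids = tf.constant([[3, 5, 0, 0], [7, 2, 9, 0]])

# With mask_zero=True, the Embedding layer treats ID 0 as padding.
embedding = keras.layers.Embedding(input_dim=10, output_dim=4, mask_zero=True)

# compute_mask() returns the boolean mask the layer would generate:
# True marks real timesteps, False marks padding to be skipped.
mask = np.array(embedding.compute_mask(token_ids))
print(mask)
```

    A downstream layer that accepts a mask argument in call() receives a tensor of this form.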

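    The Model class

    For custom models, you generally use the Layer class to define the inner building blocks and the Model class as the outer wrapper. The Model class has the same API as Layer, with the following differences: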

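    • It exposes built-in training, evaluation, and prediction loops: model.fit(), model.evaluate(), model.predict()
    • It exposes its inner layers through model.layers
    • It exposes saving and serialization APIs: save(), save_weights() ...

    When should you use Model, and when Layer?
    If you need to call fit() or save() on the class, use Model; otherwise use Layer.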

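    # Schematic example: `ResNetBlock`, `num_classes`, `dataset`, and `filepath`
    # are placeholders assumed to be defined elsewhere.
    class ResNet(tf.keras.Model):
    
        def __init__(self):
            super(ResNet, self).__init__()
            self.block_1 = ResNetBlock()
            self.block_2 = ResNetBlock()
            self.global_pool = keras.layers.GlobalAveragePooling2D()
            self.classifier = keras.layers.Dense(num_classes)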
    
        def call(self, inputs):
            x = self.block_1(inputs)
            x = self.block_2(x)
            x = self.global_pool(x)
            return self.classifier(x)
    
    
    resnet = ResNet()
    dataset = ...
    resnet.fit(dataset, epochs=10)
    resnet.save(filepath)
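
    Since the ResNet snippet above is schematic, here is a small runnable sketch of the same idea, assuming TensorFlow 2.x. TinyClassifier, its layer sizes, and the random data are hypothetical stand-ins chosen only to exercise the Model-specific API.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras


class TinyClassifier(keras.Model):
    """Hypothetical two-layer model, used only to illustrate the Model API."""

    def __init__(self, num_classes=3):
        super().__init__()
        self.hidden = keras.layers.Dense(8, activation="relu")
        self.classifier = keras.layers.Dense(num_classes)

    def call(self, inputs):
        return self.classifier(self.hidden(inputs))


model = TinyClassifier()
model.compile(
    optimizer="adam",
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
)

# Random stand-in data: 16 samples, 4 features, integer labels in [0, 3).
x = np.random.random((16, 4)).astype("float32")
y = np.random.randint(0, 3, size=(16,))

# Built-in training and inference loops, available because this is a Model.
history = model.fit(x, y, epochs=1, batch_size=8, verbose=0)
predictions = model.predict(x, verbose=0)

# The inner layers are exposed via model.layers.
print([layer.name for layer in model.layers])
print(predictions.shape)
```

    Had TinyClassifier subclassed Layer instead, the forward pass would be identical, but compile(), fit(), and predict() would not be available.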
    

    An end-to-end example

    Here a variational autoencoder (VAE) shows how Layer and Model combine into an end-to-end model:

    from tensorflow.keras import layers
    
    
    class Sampling(layers.Layer):
        """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""
    
        def call(self, inputs):
            z_mean, z_log_var = inputs
            batch = tf.shape(z_mean)[0]
            dim = tf.shape(z_mean)[1]
            epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
            return z_mean + tf.exp(0.5 * z_log_var) * epsilon
    
    
    class Encoder(layers.Layer):
        """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""
    
        def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
            super(Encoder, self).__init__(name=name, **kwargs)
            self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
            self.dense_mean = layers.Dense(latent_dim)
            self.dense_log_var = layers.Dense(latent_dim)
            self.sampling = Sampling()
    
        def call(self, inputs):
            x = self.dense_proj(inputs)
            z_mean = self.dense_mean(x)
            z_log_var = self.dense_log_var(x)
            z = self.sampling((z_mean, z_log_var))
            return z_mean, z_log_var, z
    
    
    class Decoder(layers.Layer):
        """Converts z, the encoded digit vector, back into a readable digit."""
    
        def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
            super(Decoder, self).__init__(name=name, **kwargs)
            self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
            self.dense_output = layers.Dense(original_dim, activation="sigmoid")
    
        def call(self, inputs):
            x = self.dense_proj(inputs)
            return self.dense_output(x)
    
    
    class VariationalAutoEncoder(keras.Model):
        """Combines the encoder and decoder into an end-to-end model for training."""
    
        def __init__(
            self,
            original_dim,
            intermediate_dim=64,
            latent_dim=32,
            name="autoencoder",
            **kwargs
        ):
            super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
            self.original_dim = original_dim
            self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
            self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)
    
        def call(self, inputs):
            z_mean, z_log_var, z = self.encoder(inputs)
            reconstructed = self.decoder(z)
            # Add KL divergence regularization loss.
            kl_loss = -0.5 * tf.reduce_mean(
                z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
            )
            self.add_loss(kl_loss)
            return reconstructed
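
    The expression inside kl_loss is the closed-form KL divergence between a Gaussian with mean z_mean and log-variance z_log_var and a standard normal prior, evaluated per latent dimension; the model then averages it with tf.reduce_mean. The sketch below evaluates that per-dimension term in plain Python for a few hand-picked values. The helper name kl_term is introduced here for illustration only.

```python
import math


def kl_term(z_mean, z_log_var):
    """Per-dimension KL(N(z_mean, exp(z_log_var)) || N(0, 1))."""
    return -0.5 * (z_log_var - z_mean ** 2 - math.exp(z_log_var) + 1)


# The posterior already equals the prior: no penalty.
print(kl_term(0.0, 0.0))
# Shifting the mean away from 0 is penalized.
print(kl_term(1.0, 0.0))
# So is shrinking the variance below 1.
print(kl_term(0.0, -2.0))
```

    The term is zero only when the encoder's output matches the prior, which is what pulls the latent codes toward a standard normal distribution.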
    

    Training this model on MNIST with a custom training loop:

    original_dim = 784
    vae = VariationalAutoEncoder(original_dim, 64, 32)
    
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    mse_loss_fn = tf.keras.losses.MeanSquaredError()
    
    loss_metric = tf.keras.metrics.Mean()
    
    (x_train, _), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(60000, 784).astype("float32") / 255
    
    train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
    train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)
    
    epochs = 2
    
    # Iterate over epochs.
    for epoch in range(epochs):
        print("Start of epoch %d" % (epoch,))
    
        # Iterate over the batches of the dataset.
        for step, x_batch_train in enumerate(train_dataset):
            with tf.GradientTape() as tape:
                reconstructed = vae(x_batch_train)
                # Compute reconstruction loss
                loss = mse_loss_fn(x_batch_train, reconstructed)
                loss += sum(vae.losses)  # Add KLD regularization loss
    
            grads = tape.gradient(loss, vae.trainable_weights)
            optimizer.apply_gradients(zip(grads, vae.trainable_weights))
    
            loss_metric(loss)
    
            if step % 100 == 0:
                print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
    

    Because the VAE class subclasses Model, it has built-in training loops, so the training can also be written like this:

    vae = VariationalAutoEncoder(784, 64, 32)
    
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    
    vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
    vae.fit(x_train, x_train, epochs=2, batch_size=64)
    

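    Implementing it with the Functional API

    The example above was written in an object-oriented style. The same model can also be built in a procedural style with the Functional API: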

    original_dim = 784
    intermediate_dim = 64
    latent_dim = 32
    
    # Define encoder model.
    original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input")
    x = layers.Dense(intermediate_dim, activation="relu")(original_inputs)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()((z_mean, z_log_var))
    encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")
    
    # Define decoder model.
    latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling")
    x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs)
    outputs = layers.Dense(original_dim, activation="sigmoid")(x)
    decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")
    
    # Define VAE model.
    outputs = decoder(z)
    vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="vae")
    
    # Add KL divergence regularization loss.
    kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
    vae.add_loss(kl_loss)
    
    # Train.
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
    vae.fit(x_train, x_train, epochs=3, batch_size=64)
    

    This version also uses the Sampling layer. The form matters little: in practice you can mix both styles, and what counts is getting the job done.
