讲过前馈网络和卷积网络之后,那么就让我们来看看循环网络的实现。
相比前面提到过的网络类型,循环网络存在“记忆”,即它内部会保留过去的状态,并作为下次处理的输入的一部分。因此,在循环网络中,节点的输出如下:
output_t = activation(dot(W, input_t) + dot(U, state_t) + b)
其中,state_t = output_t,上一次的计算结果。将 RNN 概念图展开,得出下图(摘自《Deep.Learning.with.Python》):
data:image/s3,"s3://crabby-images/d638d/d638df0f1f4f0ec047ef79a0359cf6fe25008b63" alt=""
本文同样以 imdb 评论为例来进行说明。
使用 Keras 的做法
Keras 中提供了 RNN 的简单实现: SimpleRNN ,先看看如何用它来搭建模型。注意,这里面需要用到 Embedding 层。
import tensorflow as tf
from tensorflow.keras.datasets import imdb
from tensorflow.keras import preprocessing
max_features = 10000
maxlen = 500
# 加载数据
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
# 预处理数据,将数据分割成相等长度
x_train = preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = preprocessing.sequence.pad_sequences(x_test, maxlen=maxlen)
# 模型定义
model = tf.keras.models.Sequential([
tf.keras.layers.Embedding(max_features, 32, input_length=maxlen),
tf.keras.layers.SimpleRNN(32, return_sequences=True),
tf.keras.layers.SimpleRNN(32),
tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(optimizer='rmsprop',
loss='binary_crossentropy',
metrics=['accuracy'])
# 训练
model.fit(x_train, y_train, epochs=10, batch_size=128, validation_split=0.2)
# 预测
print(y_train[:3])
print(model.predict_classes(x_test[:3]).flatten())
对于 SimpleRNN,其输出可以有两种格式,由 return_sequences 控制:
- = True,返回 (batch_size, time_steps, output_features),即每一次的输出结果都会保存,当 SimpleRNN 后跟 SimpleRNN 时使用。
- = False,返回 (batch_size, output_features),即最后一次的输出,当 SimpleRNN 为最后一个 RNN 层时使用。
使用 TF 实现自定义结构
相比起 TF 的正统实现(layer + cell),本文做了相当的简化:只构建 SimpleRNN 层,主要的代码参考了《Deep.Learning.with.Python》。
同样,为了保证每篇的独立性,所有对象的代码都贴出来了,如果不想看之前重复的代码可以直接去看 MySimpleRNN
MyDense
同前。
class MyDense(Layer):
def __init__(self, units=32):
super(MyDense, self).__init__()
self.units = units
def build(self, input_shape):
self.w = self.add_weight(shape=(input_shape[-1], self.units),
initializer='random_normal', trainable=True)
self.b = self.add_weight(shape=(self.units,),
initializer='random_normal', trainable=True)
def call(self, inputs):
return tf.nn.sigmoid(tf.matmul(inputs, self.w) + self.b)
MyEmbedding
同前。
class MyEmbedding(Layer):
def __init__(self, input_unit, output_unit):
super(MyEmbedding, self).__init__()
self.input_unit = input_unit
self.output_unit = output_unit
def build(self, input_shape):
self.embedding = self.add_weight(shape=(self.input_unit, self.output_unit),
initializer='random_normal', trainable=True)
def call(self, inputs):
return tf.nn.embedding_lookup(self.embedding, inputs)
MySimpleRNN
自定义 SimpleRNN 层,注意:
- 继承 Layer 的规范,实现 build 和 call 。
- build 负责初始化状态矩阵、权重和偏置,来自 Layer 。注意它们的 shape:
- 权重:输入 x 输出
- 偏置:无
- 状态矩阵:输出 x 输出
- call 负责计算(内建循环,循环次数即时间步骤,来自第二维长度),来自 Layer 。
class MySimpleRNN(Layer):
def __init__(self, unit, return_sequences=False):
super(MySimpleRNN, self).__init__()
self.units = unit
self.return_sequences = return_sequences
def build(self, input_shape):
self.w = self.add_weight(shape=(input_shape[-1], self.units),
initializer='random_normal', trainable=True)
self.b = self.add_weight(shape=(self.units,),
initializer='random_normal', trainable=True)
self.u = self.add_weight(shape=(self.units, self.units),
initializer='random_normal', trainable=True)
def call(self, inputs):
state = tf.zeros((1, self.units))
outputs = []
for step in range(inputs.shape[1]):
output = tf.nn.tanh(tf.matmul(inputs[:, step, :], self.w) + tf.matmul(state, self.u) + self.b)
outputs.append([output])
state = output
# 注意 shape 的变化
return tf.transpose(tf.concat(outputs, 0), (1, 0, 2)) if self.return_sequences else output
MyModel
同前。
class MyModel(Layer):
def __init__(self, layers):
super(MyModel, self).__init__()
self.layers = layers
def call(self, inputs):
x = self.layers[0](inputs)
for layer in self.layers[1:-1]:
x = layer(x)
result = self.layers[-1](x)
return result
def train(self, x_train, y_train, epochs = 5):
loss = tf.keras.losses.BinaryCrossentropy()
optimizer = tf.keras.optimizers.RMSprop()
accuracy = tf.keras.metrics.Accuracy()
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.shuffle(buffer_size=1024).batch(64)
for epoch in range(epochs):
for step, (x, y) in enumerate(dataset):
with tf.GradientTape() as tape:
# Forward pass.
y_pred = model(x)
# Loss value for this batch.
loss_value = loss(y, y_pred)
# Get gradients of loss wrt the weights.
gradients = tape.gradient(loss_value, model.trainable_weights)
# Update the weights of our linear layer.
optimizer.apply_gradients(zip(gradients, model.trainable_weights))
# Update the running accuracy.
accuracy.update_state(y, tf.cast(y_pred >= 0.5, dtype=tf.int64))
print('Epoch:', epoch, ', Loss from last epoch: %.3f' % loss_value, ', Total running accuracy so far: %.3f' % accuracy.result(), end='\r')
print('\n')
看看效果吧:
# 定义
model = MyModel([
MyEmbedding(max_features, 32),
MySimpleRNN(32, True),
MySimpleRNN(32),
MyDense(1)
])
# 训练
model.train(x_train, y_train, 10)
# 预测
print(y_train[:20])
print(tf.cast(model(x_test[:20]) >= 0.5, dtype=tf.int64).numpy().flatten())
关于代码注意几点:
- MySimpleRNN 中输出的 shape 的变化,参考上面的定义。
- timestep 长度即为 Sequence 大小
- 尽量使用 tf 的操作来进行 shape 变化,否则容易出现“梯度不存在”的错误,参考错误信息如下:
Gradients does not exist for variables ...
data:image/s3,"s3://crabby-images/79ee6/79ee637b4baf5ea30e65c5d8ce48462d41047301" alt=""
网友评论