TensorFlow Implementation (11): The Attention Mechanism

Author: 致Great | Published 2018-06-11 17:10

    Principles

    More resources:
    https://distill.pub/2016/augmented-rnns/#attentional-interfaces
    https://www.cnblogs.com/shixiangwan/p/7573589.html#top
    http://baijiahao.baidu.com/s?id=1587926245504773589&wfr=spider&for=pc
    https://talbaumel.github.io/blog/attention/
    浅谈Attention-based Model【原理篇】

    Paper Reading

    Hierarchical Attention Networks for Document Classification
    This paper describes how to build a text classifier on top of the attention mechanism.

    Suppose we have a collection of news documents belonging to three classes: military, sports, and entertainment. A document D contains L sentences s_i (i indexes the sentences of D), and each sentence s_i contains T_i words. w_it denotes the t-th word of the i-th sentence, with t ∈ [1, T_i].

    Word Encoder
    ① Given a sentence s_i, e.g. "The superstar is walking in the street", represented as the word sequence [w_i1, w_i2, ..., w_i7], we use a word embedding matrix W_e to map each word to a vector: x_it = W_e · w_it.


    ② A bidirectional GRU then encodes the whole sentence, producing for each word w_it a forward hidden state h_it^fwd = GRU_fwd(x_it) (reading the sentence from w_i1 to w_iT) and a backward hidden state h_it^bwd = GRU_bwd(x_it) (reading it from w_iT to w_i1).

    The final hidden vector of each word is the concatenation of the forward and backward hidden states: h_it = [h_it^fwd ; h_it^bwd].
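
    For concreteness, here is a minimal TF 1.x sketch of the word-encoder step (my own illustration, not code from the paper or the original post; vocab_size, embed_dim and hidden_size are assumed values): embed the word ids of a batch of sentences, run a bidirectional GRU, and concatenate the two directions.

    import tensorflow as tf
    from tensorflow.contrib.rnn import GRUCell
    from tensorflow.python.ops.rnn import bidirectional_dynamic_rnn

    vocab_size, embed_dim, hidden_size = 10000, 100, 150   # illustrative sizes
    word_ids = tf.placeholder(tf.int32, [None, None])      # (batch, T) word indices

    # x_it = W_e · w_it : look up an embedding vector for every word
    W_e = tf.Variable(tf.random_uniform([vocab_size, embed_dim], -1.0, 1.0))
    x = tf.nn.embedding_lookup(W_e, word_ids)               # (batch, T, embed_dim)

    # Forward and backward GRU passes over the words of each sentence
    (h_fwd, h_bwd), _ = bidirectional_dynamic_rnn(
        GRUCell(hidden_size), GRUCell(hidden_size), x, dtype=tf.float32)

    # h_it = [h_it^fwd ; h_it^bwd]
    h = tf.concat([h_fwd, h_bwd], axis=2)                   # (batch, T, 2*hidden_size)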

    Word Attention:
    Given a sentence, not all of its words contribute equally to its meaning; in the example above, "The" and "is" carry little information. We therefore use attention to pick out the important words and give them larger weights (a small NumPy sketch after step ③ works through these steps numerically).
    ① Feed h_it through a one-layer MLP to obtain a hidden representation u_it: u_it = tanh(W_w · h_it + b_w)

    ② Normalize the importance scores with a softmax over a word-level context vector u_w: α_it = exp(u_it^T · u_w) / Σ_t exp(u_it^T · u_w)

    ③ Compute the sentence vector:
    Multiply each word's hidden vector h_it by its weight α_it and sum over the words: s_i = Σ_t α_it · h_it
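
    A minimal NumPy sketch of the three steps for a single sentence (shapes and parameter values are assumptions for illustration, independent of the TensorFlow code below):

    import numpy as np

    T, D, A = 7, 300, 50                        # words, hidden size (2x GRU), attention size
    h = np.random.randn(T, D)                   # h_it from the bidirectional GRU
    W_w, b_w = np.random.randn(D, A), np.zeros(A)
    u_w = np.random.randn(A)                    # word-level context vector

    u = np.tanh(h @ W_w + b_w)                  # step ①: u_it = tanh(W_w · h_it + b_w), shape (T, A)
    scores = u @ u_w                            # u_it^T · u_w, shape (T,)
    alpha = np.exp(scores) / np.exp(scores).sum()   # step ②: softmax weights α_it
    s = (alpha[:, None] * h).sum(axis=0)        # step ③: sentence vector s_i, shape (D,)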

    Code Implementation

    attention.py

    import tensorflow as tf
    
    
    def attention(inputs, attention_size, time_major=False, return_alphas=False):
        """Attention layer over RNN outputs, following the word-attention formulation
        of Yang et al. (2016).

        `inputs` is either a (B, T, D) tensor or the (forward, backward) tuple produced
        by a bidirectional RNN; returns a (B, D) weighted sum over the timesteps and,
        optionally, the (B, T) attention weights.
        """
        if isinstance(inputs, tuple):
            # In case of Bi-RNN, concatenate the forward and the backward RNN outputs.
            inputs = tf.concat(inputs, 2)
    
        if time_major:
            # (T,B,D) => (B,T,D)
            inputs = tf.transpose(inputs, [1, 0, 2])
    
        hidden_size = inputs.shape[2].value  # D value - hidden size of the RNN layer
    
        # Trainable parameters
        w_omega = tf.Variable(tf.random_normal([hidden_size, attention_size], stddev=0.1))
        b_omega = tf.Variable(tf.random_normal([attention_size], stddev=0.1))
        u_omega = tf.Variable(tf.random_normal([attention_size], stddev=0.1))
    
        with tf.name_scope('v'):
            # Applying fully connected layer with non-linear activation to each of the B*T timestamps;
            #  the shape of `v` is (B,T,D)*(D,A)=(B,T,A), where A=attention_size
            v = tf.tanh(tf.tensordot(inputs, w_omega, axes=1) + b_omega)
    
        # For each of the timestamps its vector of size A from `v` is reduced with `u` vector
        vu = tf.tensordot(v, u_omega, axes=1, name='vu')  # (B,T) shape
        alphas = tf.nn.softmax(vu, name='alphas')         # (B,T) shape
    
        # Output of (Bi-)RNN is reduced with attention vector; the result has (B,D) shape
        output = tf.reduce_sum(inputs * tf.expand_dims(alphas, -1), 1)
    
        if not return_alphas:
            return output
        else:
            return output, alphas
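
    As a quick sanity check (my own snippet, not part of the original repository), attention() can be fed the (forward, backward) tuple returned by a bidirectional RNN, and the static shapes confirm the reduction from (B, T, D) to (B, D):

    # Assumed sizes: batch 32, 250 timesteps, GRU size 150, attention size 50
    fw = tf.zeros([32, 250, 150])
    bw = tf.zeros([32, 250, 150])
    out, alphas = attention((fw, bw), attention_size=50, return_alphas=True)
    # out has static shape (32, 300); alphas has static shape (32, 250)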
    
    

    train.py

    
    from __future__ import print_function, division
    
    import numpy as np
    import tensorflow as tf
    from keras.datasets import imdb
    from tensorflow.contrib.rnn import GRUCell
    from tensorflow.python.ops.rnn import bidirectional_dynamic_rnn as bi_rnn
    from tqdm import tqdm
    
    from attention import attention
    from utils import get_vocabulary_size, fit_in_vocabulary, zero_pad, batch_generator
    
    NUM_WORDS = 10000
    INDEX_FROM = 3
    SEQUENCE_LENGTH = 250
    EMBEDDING_DIM = 100
    HIDDEN_SIZE = 150
    ATTENTION_SIZE = 50
    KEEP_PROB = 0.8
    BATCH_SIZE = 256
    NUM_EPOCHS = 3  # Model overfits easily without pre-trained word embeddings, so train for only a few epochs
    DELTA = 0.5
    MODEL_PATH = './model'
    
    # Load the data set
    (X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=NUM_WORDS, index_from=INDEX_FROM)
    
    # Sequences pre-processing
    vocabulary_size = get_vocabulary_size(X_train)
    X_test = fit_in_vocabulary(X_test, vocabulary_size)
    X_train = zero_pad(X_train, SEQUENCE_LENGTH)
    X_test = zero_pad(X_test, SEQUENCE_LENGTH)
    
    # Different placeholders
    with tf.name_scope('Inputs'):
        batch_ph = tf.placeholder(tf.int32, [None, SEQUENCE_LENGTH], name='batch_ph')
        target_ph = tf.placeholder(tf.float32, [None], name='target_ph')
        seq_len_ph = tf.placeholder(tf.int32, [None], name='seq_len_ph')
        keep_prob_ph = tf.placeholder(tf.float32, name='keep_prob_ph')
    
    # Embedding layer
    with tf.name_scope('Embedding_layer'):
        embeddings_var = tf.Variable(tf.random_uniform([vocabulary_size, EMBEDDING_DIM], -1.0, 1.0), trainable=True)
        tf.summary.histogram('embeddings_var', embeddings_var)
        batch_embedded = tf.nn.embedding_lookup(embeddings_var, batch_ph)
    
    # (Bi-)RNN layer(-s)
    rnn_outputs, _ = bi_rnn(GRUCell(HIDDEN_SIZE), GRUCell(HIDDEN_SIZE),
                            inputs=batch_embedded, sequence_length=seq_len_ph, dtype=tf.float32)
    tf.summary.histogram('RNN_outputs', rnn_outputs)
    
    # Attention layer
    with tf.name_scope('Attention_layer'):
        attention_output, alphas = attention(rnn_outputs, ATTENTION_SIZE, return_alphas=True)
        tf.summary.histogram('alphas', alphas)
    
    # Dropout
    drop = tf.nn.dropout(attention_output, keep_prob_ph)
    
    # Fully connected layer
    with tf.name_scope('Fully_connected_layer'):
        W = tf.Variable(tf.truncated_normal([HIDDEN_SIZE * 2, 1], stddev=0.1))  # Hidden size is multiplied by 2 for Bi-RNN
        b = tf.Variable(tf.constant(0., shape=[1]))
        y_hat = tf.nn.xw_plus_b(drop, W, b)
        y_hat = tf.squeeze(y_hat)
        tf.summary.histogram('W', W)
    
    with tf.name_scope('Metrics'):
        # Cross-entropy loss and optimizer initialization
        loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y_hat, labels=target_ph))
        tf.summary.scalar('loss', loss)
        optimizer = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(loss)
    
        # Accuracy metric
        accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.round(tf.sigmoid(y_hat)), target_ph), tf.float32))
        tf.summary.scalar('accuracy', accuracy)
    
    merged = tf.summary.merge_all()
    
    # Batch generators
    train_batch_generator = batch_generator(X_train, y_train, BATCH_SIZE)
    test_batch_generator = batch_generator(X_test, y_test, BATCH_SIZE)
    
    train_writer = tf.summary.FileWriter('./logdir/train', accuracy.graph)
    test_writer = tf.summary.FileWriter('./logdir/test', accuracy.graph)
    
    session_conf = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))
    
    saver = tf.train.Saver()
    
    if __name__ == "__main__":
        with tf.Session(config=session_conf) as sess:
            sess.run(tf.global_variables_initializer())
            print("Start learning...")
            for epoch in range(NUM_EPOCHS):
                loss_train = 0
                loss_test = 0
                accuracy_train = 0
                accuracy_test = 0
    
                print("epoch: {}\t".format(epoch), end="")
    
                # Training
                num_batches = X_train.shape[0] // BATCH_SIZE
                for b in tqdm(range(num_batches)):
                    x_batch, y_batch = next(train_batch_generator)
                    seq_len = np.array([list(x).index(0) + 1 for x in x_batch])  # actual lengths of sequences
                    loss_tr, acc, _, summary = sess.run([loss, accuracy, optimizer, merged],
                                                        feed_dict={batch_ph: x_batch,
                                                                   target_ph: y_batch,
                                                                   seq_len_ph: seq_len,
                                                                   keep_prob_ph: KEEP_PROB})
                    accuracy_train += acc
                    loss_train = loss_tr * DELTA + loss_train * (1 - DELTA)
                    train_writer.add_summary(summary, b + num_batches * epoch)
                accuracy_train /= num_batches
    
                # Testing
                num_batches = X_test.shape[0] // BATCH_SIZE
                for b in tqdm(range(num_batches)):
                    x_batch, y_batch = next(test_batch_generator)
                    seq_len = np.array([list(x).index(0) + 1 for x in x_batch])  # actual lengths of sequences
                    loss_test_batch, acc, summary = sess.run([loss, accuracy, merged],
                                                             feed_dict={batch_ph: x_batch,
                                                                        target_ph: y_batch,
                                                                        seq_len_ph: seq_len,
                                                                        keep_prob_ph: 1.0})
                    accuracy_test += acc
                    loss_test += loss_test_batch
                    test_writer.add_summary(summary, b + num_batches * epoch)
                accuracy_test /= num_batches
                loss_test /= num_batches
    
                print("loss: {:.3f}, val_loss: {:.3f}, acc: {:.3f}, val_acc: {:.3f}".format(
                    loss_train, loss_test, accuracy_train, accuracy_test
                ))
            train_writer.close()
            test_writer.close()
            saver.save(sess, MODEL_PATH)
            print("Run 'tensorboard --logdir=./logdir' to checkout tensorboard logs.")
    
    

    utils.py

    from __future__ import print_function
    
    import numpy as np
    
    
    def zero_pad(X, seq_len):
        """Truncate each sequence to seq_len - 1 tokens and zero-pad it to exactly seq_len."""
        return np.array([x[:seq_len - 1] + [0] * max(seq_len - len(x), 1) for x in X])


    def get_vocabulary_size(X):
        """Vocabulary size = largest word index in X plus one (index 0 is the padding token)."""
        return max([max(x) for x in X]) + 1  # plus the 0th word


    def fit_in_vocabulary(X, voc_size):
        """Drop word indices that fall outside the vocabulary."""
        return [[w for w in x if w < voc_size] for x in X]
    
    
    def batch_generator(X, y, batch_size):
        """Primitive batch generator 
        """
        size = X.shape[0]
        X_copy = X.copy()
        y_copy = y.copy()
        indices = np.arange(size)
        np.random.shuffle(indices)
        X_copy = X_copy[indices]
        y_copy = y_copy[indices]
        i = 0
        while True:
            if i + batch_size <= size:
                yield X_copy[i:i + batch_size], y_copy[i:i + batch_size]
                i += batch_size
            else:
                i = 0
                indices = np.arange(size)
                np.random.shuffle(indices)
                X_copy = X_copy[indices]
                y_copy = y_copy[indices]
                continue
    
    
    if __name__ == "__main__":
        # Test batch generator
        gen = batch_generator(np.array(['a', 'b', 'c', 'd']), np.array([1, 2, 3, 4]), 2)
        for _ in range(8):
            xx, yy = next(gen)
            print(xx, yy)
    
    

    Code: https://github.com/ilivans/tf-rnn-attention

    Results:



    The model reaches about 96% accuracy on the training set and about 86% on the test set, which is quite a strong result.
