美文网首页
Seq2Seq(Attention)的PyTorch实现

Seq2Seq(Attention)的PyTorch实现

作者: Jarkata | 来源:发表于2021-05-14 01:45 被阅读0次

    本文为转载,原文链接: https://wmathor.com/index.php/archives/1451/

    本文主要介绍一下如何使用 PyTorch 复现 Seq2Seq (with Attention),实现简单的机器翻译任务,请先阅读论文 Neural Machine Translation by Jointly Learning to Align and Translate,之后花上 15 分钟阅读我的这两篇文章 Seq2Seq 与注意力机制图解 Attention,最后再来看文本,方能达到醍醐灌顶,事半功倍的效果。

    数据预处理

    数据预处理的代码其实就是调用各种 API,我不希望读者被这些不太重要的部分分散了注意力,因此这里我不贴代码,仅口述一下带过即可。
    这里采用的数据集是torchtext的multi30k

    如下图所示,本文使用的是德语→英语数据集,输入是德语,并且输入的每个句子开头和结尾都带有特殊的标识符。输出是英语,并且输出的每个句子开头和结尾也都带有特殊标识符

    不管是英语还是德语,每句话长度都是不固定的,所以我对于每个 batch 内的句子,将它们的长度通过加 <PAD> 变得一样,也就说,一个 batch 内的句子,长度都是相同的,不同 batch 内的句子长度不一定相同。 下图维度表示分别是 [seq_len, batch_size]

    随便打印一条数据,看一下数据封装的形式


    在数据预处理的时候,需要将源句子和目标句子分开构建字典,也就是单独对德语构建一个词库,对英语构建一个词库

    Encoder

    Encoder这里使用的是单层双向GRU


    双向GRU的隐藏状态输出由两个向量拼接而成,例如h_1=[\overrightarrow {h_1};\overleftarrow {h_T}],h_2=[\overrightarrow{h_2};\overleftarrow{h_{T-1}}]……

    所有时刻的最后一层隐藏状态就构成了 GRU 的 output.
    output=\{ h_1,h_2,...h_T \}
    假设这是个m层GRU,那么最后一个时刻所有层中的隐藏状态就构成了GRU的final hidden states.
    hidden=\{ h^1_T,h^2_T,...,h^m_T \}
    其中
    h^i_T=[\overrightarrow{h^i_T};\overleftarrow{h^i_1}]
    所以,
    hidden=\{[\overrightarrow{h^1_T};\overleftarrow{h^1_1}],[\overrightarrow{h^2_T};\overleftarrow{h^2_1}],...,[\overrightarrow{h^m_T};\overleftarrow{h^m_1}]\}
    根据论文,或者你看了我的图解 Attention 这篇文章就会知道,我们需要的是 hidden 的最后一层输出(包括正向和反向),因此我们可以通过 hidden[-2,:,:]hidden[-1,:,:] 取出最后一层的 hidden states,将它们拼接起来记作 s_0

    最后一个细节之处在于, s_0的维度是 [batch_size, en_hid_dim*2],即便是没有 Attention 机制,将 s_0作为 Decoder 的初始隐藏状态也不对,因为维度不匹配,Decoder 的初始隐藏状态是三维的,而现在我们的 s_0是二维的,因此需要将 s_0的维度转为三维,并且还要调整各个维度上的值。首先我通过一个全连接神经网络,将 s_0的维度变为 [batch_size, dec_hid_dim]

    Encoder 的细节就这么多,下面直接上代码,我的代码风格是,注释在上,代码在下

    
    class Encoder(nn.Module):
        def __init__(self, input_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout):
            super().__init__()
            self.embedding = nn.Embedding(input_dim, emb_dim)
            # single layer, bi-direction GRU
            self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional=True)
            self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)
            self.dropout = nn.Dropout(dropout)
    
        def forward(self, src):
            '''
            :param src: [src_len, batch_size]
            :return:
            '''
    
            src = src.transpose(0, 1)  # src = [batch_size, src_len]
            # embedded = [src_len, batch_size, emb_dim]
            embedded = self.dropout(self.embedding(src)).transpose(0, 1)
    
            # enc_output = [src_len, batch_size, hid_dim*num_directions]
            # enc_hidden = [n_layers * num_directions, batch_size, hid_dim]
            enc_output, enc_hidden = self.rnn(embedded)  # if h_0 is not given, it will be set 0 acquiescently
    
            # enc_hidden is stacked [forward_1, backward_1, forward_2, backward_2, ...]
            # enc_output are always from the last layer
    
            # enc_hidden [-2, :, : ] is the last of the forwards RNN
            # enc_hidden [-1, :, : ] is the last of the backwards RNN
    
            # initial decoder hidden is final hidden state of the forwards and backwards
            # encoder RNNs fed through a linear layer
            # concatenate the hidden_state of last two layers
            # s = [batch_size, dec_hid_dim]
            s = torch.tanh(self.fc(torch.cat((enc_hidden[-2, :, :], enc_hidden[-1, :, :]), dim=1)))
    
            return enc_output, s
    

    Attention

    attention 无非就是三个公式
    E_t=tanh(attn(s_{t-1},H))\\ \tilde{a_t}=vE_t\\ {a_t}=softmax(\tilde{a_t})
    其中s_{t-1}指的就是Encoder中的变量sH就是指的Encoder中的变量enc_outputattn()其实就是一个简单的全连接神经网络。

    我们可以从最后一个公式反推各个变量的维度是什么,或者维度有什么要求

    首先 a_t的维度应该是 [batch_size, src_len],这是毋庸置疑的,那么 \tilde{a_t}的维度也应该是 [batch_size, src_len],或者 \tilde{a_t} 是个三维的,但是某个维度值为 1,可以通过 squeeze() 变成两维的。这里我们先假设 \tilde{a_t}的维度是 [batch_size, src_len, 1],等会儿我再解释为什么要这样假设

    继续往上推,变量 v的维度就应该是 [?, 1]? 表示我暂时不知道它的值应该是多少。 E_t的维度应该是 [batch_size, src_len, ?]

    现在已知H 的维度是 [batch_size, src_len, enc_hid_dim*2]s_{t-1}目前的维度是 [batch_size, dec_hid_dim],这两个变量需要做拼接,送入全连接神经网络,因此我们首先需要将 s_{t-1}的维度变成 [batch_size, src_len, dec_hid_dim],拼接之后的维度就变成 [batch_size, src_len, enc_hid_dim*2+dec_hid_dim],于是 attn()这个函数的输入输出值也就有了

    attn = nn.Linear(enc_hid_dim*2+dec_hid_dim, ?)
    

    到此为止,除了? 部分的值不清楚,其它所有维度都推导出来了。现在我们回过头思考一下 ? 设置成多少,好像其实并没有任何限制,所以我们可以设置 ? 为任何值(在代码中我设置 ? 为 dec_hid_dim)

    Attention细节就这么多,下面给出代码

    
    class Attention(nn.Module):
        def __init__(self, enc_hid_dim, dec_hid_dim):
            super().__init__()
            # [size(h_t)+size(s_{t-1}), dec_hid_dim]
            self.attn = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim, bias=False)
            self.v = nn.Linear(dec_hid_dim, 1, bias=False)
    
        def forward(self, s, enc_output):
            # s = [batch_size, dec_hid_dim]
            # enc_output = [src_len, batch_size, enc_hid_dim * 2]
    
            batch_size = enc_output.shape[1]
            src_len = enc_output.shape[0]
    
            # repeat decoder hidden state src_len times
            # s = [batch_size, src_len, enc_hid_dim * 2]
            # enc_output = [batch_size, src_len, enc_hid_dim * 2]
            s = s.unsqueeze(1).repeat(1, src_len, 1)
            enc_output = enc_output.transpose(0, 1)
    
            # energy = [batch_size, src_len, dec_hid_dim]
            energy = torch.tanh(self.attn(torch.cat((s, enc_output), dim=2)))
    
            # attention = [batch_size, src_len]
            attention = self.v(energy).squeeze(2)
    
            return F.softmax(attention, dim=1)
    
    

    Seq2Seq(with Attention)

    我调换一下顺序,先讲 Seq2Seq,再讲 Decoder 的部分

    传统 Seq2Seq 是直接将句子中每个词连续不断输入 Decoder 进行训练,而引入 Attention 机制之后,我需要能够人为控制一个词一个词进行输入(因为输入每个词到 Decoder,需要再做一些运算),所以在代码中会看到我使用了 for 循环,循环 trg_len-1 次(开头的 <SOS> 我手动输入,所以循环少一次)

    并且训练过程中我使用了一种叫做 Teacher Forcing 的机制,保证训练速度的同时增加鲁棒性,如果不了解 Teacher Forcing 可以看我的这篇文章

    思考一下 for 循环中应该要做哪些事?首先要将变量传入 Decoder,由于 Attention 的计算是在 Decoder 的内部进行的,所以我需要将 dec_inputsenc_output 这三个变量传入 Decoder,Decoder 会返回 dec_output 以及新的 s。之后根据概率对 dec_output 做 Teacher Forcing 即可

    Seq2Seq 细节就这么多,下面给出代码

    
    class Seq2Seq(nn.Module):
        def __init__(self, encoder, decoder, device):
            super().__init__()
            self.encoder = encoder
            self.decoder = decoder
            self.device = device
    
        def forward(self, src, trg, teacher_forcing_ratio=0.5):
            # src = [src_len, batch_size]
            # trg = [trg_len, batch_size]
            # teacher_forcing_ratio is probability to use teacher forcing (scheduled sampling)
            batch_size = src.shape[1]
            trg_len = trg.shape[0]
            trg_vocab_size = self.decoder.output_dim
    
            # tensor to store decoder outputs
            outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
    
            # enc_output is all hidden states of the input sequence, back and forwards
            # s is the final forward and backward hidden states, passed through a linear layer
            # enc_output : [src_len, batch_size, enc_hid_dim * 2]
            # s : [batch_size, dec_hid_dim]
            enc_output, s = self.encoder(src)
    
            # first input to the decoder is the <sos> tokens
            dec_input = trg[0, :]
    
            for t in range(1, trg_len):
                # insert dec_input token embedding, previous hidden state and all encoder hidden states
                # receive output tensor (predictions) and new hidden state
                dec_output, s = self.decoder(dec_input, s, enc_output)
    
                # place predictions in a tensor holding predictions for each token
                outputs[t] = dec_output
    
                # decide if we are going to use teacher forcing or not
                teacher_force = random.random() < teacher_forcing_ratio
    
                # get the highest predicted token from our predictions
                top1 = dec_output.argmax(1)
    
                # if teacher forcing, use actural next token as input
                # if not, use predicted token
                dec_input = trg[t] if teacher_force else top1
    
            return outputs
    
    

    Decoder

    Decoder这里使用单向单层GRU


    Decoder 部分实际上也就是三个公式
    c=a_tH\\ s_t=GRU(emb(y_t), c, s_{t-1})\\ \hat{y_t}=f(emb(y_t), c, s_t)

    H指的是 Encoder 中的变量 enc_output, 指的是将 dec_input 经过 WordEmbedding 后得到的结果, 函数实际上就是为了转换维度,因为需要的输出是 TRG_VOCAB_SIZE 大小。其中有个细节,GRU 的参数只有两个,一个输入,一个隐藏层输入,但是上面的公式有三个变量,所以我们应该选一个作为隐藏层输入,另外两个 "整合" 一下,作为输入

    我们从第一个公式正推各个变量的维度是什么

    首先在 Encoder 中最开始先调用一次 Attention,得到权重a_t ,它的维度是 [batch_size, src_len],而 H的维度是 [src_len, batch_size, enc_hid_dim*2],它俩要相乘,同时应该保留 batch_size 这个维度,所以应该先将 a_t扩展一维,然后调换一下H维度的顺序,之后再按照 batch 相乘(即同一个 batch 内的矩阵相乘)

    a = a.unsqueeze(1) # [batch_size, 1, src_len]
    H = H.transpose(0, 1) # [batch_size, src_len, enc_hid_dim*2]
    c = torch.bmm(a, h) # [batch_size, 1, enc_hid_dim*2]
    

    前面也说了,由于 GRU 不需要三个变量,所以需要将emb(y_t)c 整合一下, y_t实际上就是 Seq2Seq 类中的 dec_input 变量,它的维度是 [batch_size],因此先将 y_t 扩展一个维度,再通过 WordEmbedding,这样他就变成 [batch_size, 1, emb_dim]。最后对 cemb(y_t) 进行 concat

    y = y.unsqueeze(1) # [batch_size, 1]
    emb_y = self.emb(y) # [batch_size, 1, emb_dim]
    rnn_input = torch.cat((emb_y, c), dim=2) # [batch_size, 1, emb_dim+enc_hid_dim*2]
    

    s_{t-1}的维度是 [batch_size, dec_hid_dim],所以应该先将其拓展一个维度 (layer*num_direction维)

    rnn_input = rnn_input.transpose(0, 1) # [1, batch_size, emb_dim+enc_hid_dim*2]
    s = s.unsqueeze(0) # [1, batch_size, dec_hid_dim]
    
    # dec_output = [1, batch_size, dec_hid_dim]
    # dec_hidden = [1, batch_size, dec_hid_dim] = s (new, is not s previously)
    dec_output, dec_hidden = self.rnn(rnn_input, s)
    

    最后一个公式,需要将三个变量全部拼接在一起,然后通过一个全连接神经网络,得到最终的预测。我们先分析下这个三个变量的维度, emb(y_t)的维度是 [batch_size, 1, emb_dim]c的维度是 [batch_size, 1, enc_hid_dim]s_t 的维度是 [1, batch_size, dec_hid_dim],因此我们可以像下面这样把他们全部拼接起来

    emd_y = emb_y.squeeze(1) # [batch_size, emb_dim]
    c = w.squeeze(1) # [batch_size, enc_hid_dim*2]
    s = s.squeeze(0) # [batch_size, dec_hid_dim]
    
    fc_input = torch.cat((emb_y, c, s), dim=1) # [batch_size, enc_hid_dim*2+dec_hid_dim+emb_hid] 
    

    以上就是 Decoder 部分的细节,下面给出代码(上面的那些只是示例代码,和下面代码变量名可能不一样)

    class Decoder(nn.Module):
        def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention):
            super().__init__()
            self.output_dim = output_dim
            self.attention = attention
            self.embedding = nn.Embedding(output_dim, emb_dim)
            self.rnn = nn.GRU(enc_hid_dim * 2 + emb_dim, dec_hid_dim)
            self.fc_out = nn.Linear(enc_hid_dim * 2 + dec_hid_dim + emb_dim, output_dim)
            self.dropout = nn.Dropout(dropout)
    
        def forward(self, dec_input, s, enc_output):
            # dec_input = [batch_size]
            # s = [batch_size, dec_hid_dim]
            # enc_output = [src_len, batch_size, enc_hid_dim *2]
    
            # dec_input = [batch_size,1]
            dec_input = dec_input.unsqueeze(1)
    
            # embedded = [1, batch_size, emb_dim]
            embedded = self.dropout(self.embedding(dec_input)).transpose(0, 1)
    
            # s = [batch_size, dec_hid_dim]
            # enc_output = [src_len, batch_size, enc_hid_dim *2]
    
            # a = [batch_size, 1, src_len]
            a = self.attention(s, enc_output).unsqueeze(1)
    
            # enc_output = [batch_size, src_len, enc_hid_dim * 2]
            enc_output = enc_output.transpose(0, 1)
    
            # c = [1, batch_size, enc_hid_dim * 2]
            c = torch.bmm(a, enc_output).transpose(0, 1)
            # torch.bmm: Performs a batch matrix-matrix product of matrices stored in input and mat2
    
            # rnn_input = [1, batch_size, (enc_hid_dim*2) + emb_dim]
            rnn_input = torch.cat((embedded, c), dim=2)
    
            # dec_output = [src_len(=1), batch_size, dec_hid_dim]
            # dec_hidden = [n_layers*num_directions, batch_size, dec_hid_dim]
            dec_output, dec_hidden = self.rnn(rnn_input, s.unsqueeze(0))
    
            # embedded = [batch_size, emb_dim]
            # dec_output = [batch_size, dec_hid_dim]
            # c = [batch_size, enc_hid_dim * 2]
            embedded = embedded.squeeze(0)
            dec_output = dec_output.squeeze(0)
            c = c.squeeze(0)
    
            # pred = [batch_size, output_dim]
            pred = self.fc_out(torch.cat((dec_output, c, embedded), dim=1))
    
            return pred, dec_hidden.squeeze(0)
    
    

    定义模型

    INPUT_DIM = len(SRC.vocab)
    OUTPUT_DIM = len(TRG.vocab)
    ENC_EMB_DIM = 256
    DEC_EMB_DIM = 256
    ENC_HID_DIM = 512
    DEC_HID_DIM = 512
    ENC_DROPOUT = 0.5
    DEC_DROPOUT = 0.5
    
    attn = Attention(ENC_HID_DIM, DEC_HID_DIM)
    enc = Encoder(INPUT_DIM, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, ENC_DROPOUT)
    dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_DROPOUT, attn)
    
    model = Seq2Seq(enc, dec, device).to(device)
    TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
    criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX).to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    

    倒数第二行CrossEntropyLoss() 中的参数很少见,ignore_index=TRG_PAD_IDX,这个参数的作用是忽略某一类别,不计算其 loss,但是要注意,忽略的是真实值中的类别,例如下面的代码,真实值的类别都是 1,而预测值全部预测认为是 2(下标从 0 开始),同时 loss function 设置忽略第一类的 loss,此时会打印出 0

    label = torch.tensor([1, 1, 1])
    pred = torch.tensor([[0.1, 0.2, 0.6], [0.2, 0.1, 0.8], [0.1, 0.1, 0.9]])
    loss_fn = nn.CrossEntropyLoss(ignore_index=1)
    print(loss_fn(pred, label).item()) # 0
    

    如果设置 loss function 忽略第二类,此时 loss 并不会为 0

    label = torch.tensor([1, 1, 1])
    pred = torch.tensor([[0.1, 0.2, 0.6], [0.2, 0.1, 0.8], [0.1, 0.1, 0.9]])
    loss_fn = nn.CrossEntropyLoss(ignore_index=2)
    print(loss_fn(pred, label).item()) # 1.359844
    

    完整代码

    #!/usr/bin/env python
    # coding: utf-8
    
    # ### Preparing Data
    # 
    
    # In[1]:
    
    
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torch.nn.functional as F
    
    from torchtext.legacy.datasets import Multi30k
    from torchtext.legacy.data import Field, BucketIterator
    
    import spacy
    import numpy as np
    
    import random
    import math
    import time
    
    
    # Set the random seeds for reproducability
    # 
    # 
    
    # In[2]:
    
    
    SEED = 1234
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)
    torch.cuda.manual_seed(SEED)
    
    
    # 加载German 和 English spaCy模型
    
    # In[3]:
    
    
    spacy_de = spacy.load('de_core_news_sm')
    spacy_en = spacy.load('en_core_web_sm')
    
    
    # 创建tokenizer
    
    # In[4]:
    
    
    def tokenize_de(text):
        # Tokenizes German text from a string into a list of strings
        return [tok.text for tok in spacy_de.tokenizer(text)]
    
    
    def tokenize_en(text):
        # Tokenizes English text from a string into a list of strings
        return [tok.text for tok in spacy_en.tokenizer(text)]
    
    
    # 创建Field对象(torchtext的一种数据类型)
    # Field类将普通文本转为tensor
    # see: https://torchtext.readthedocs.io/en/latest/data.html#field
    
    # In[5]:
    
    
    SRC = Field(tokenize=tokenize_de,
                init_token='<sos>',
                eos_token='<eos>',
                lower=True)
    
    TRG = Field(tokenize=tokenize_en,
                init_token='<sos>',
                eos_token='<eos>',
                lower=True)
    
    
    # 加载数据
    
    # In[7]:
    
    
    # Create dataset objects for splits of the Multi30k dataset.
    train_data, valid_data, test_data = Multi30k.splits(exts=('.de', '.en'), fields=(SRC, TRG))
    
    
    # 建立词表
    
    # In[8]:
    
    
    SRC.build_vocab(train_data, min_freq=2)  #min_freq:最小频率
    TRG.build_vocab(train_data, min_freq=2)
    
    
    # 定义设备
    
    # In[6]:
    
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    
    # 创建迭代器
    
    # In[9]:
    
    
    BATCH_SIZE = 128
    train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
        (train_data, valid_data, test_data),
        batch_size=BATCH_SIZE,
        device=device)
    
    
    # 查看数据size
    # 
    # 可以看到经过padding之后,一个batch内的句子,长度都是相同的,不同的batch内的句子长度不一定相同。
    
    # In[11]:
    
    
    for i, it in enumerate(iter(train_iterator)):
        if i > 10:
            break
        src = it.src  # German
        trg = it.trg  # English
        print(src.shape, trg.shape)
        # torch.Size([seq_len,batch_size])
    
    
    # 查看数据形式
    
    # In[10]:
    
    
    batch_idx = 0
    data = next(iter(train_iterator))
    for idx in data.src[:, batch_idx].cpu().numpy():
        print(SRC.vocab.itos[idx], end=' ')
    
    print()
    for idx in data.trg[:, batch_idx].cpu().numpy():
        print(TRG.vocab.itos[idx], end=' ')
    
    
    # 创建 Seq2Seq Model
    # 
    # 这里Encoder采用单层双向GRU
    
    # In[12]:
    
    
    class Encoder(nn.Module):
        def __init__(self, input_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout):
            super().__init__()
            self.embedding = nn.Embedding(input_dim, emb_dim)
            # single layer, bi-direction GRU
            self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional=True)
            self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)
            self.dropout = nn.Dropout(dropout)
    
        def forward(self, src):
            '''
            :param src: [src_len, batch_size]
            :return:
            '''
    
            src = src.transpose(0, 1)  # src = [batch_size, src_len]
            # embedded = [src_len, batch_size, emb_dim]
            embedded = self.dropout(self.embedding(src)).transpose(0, 1)
    
            # enc_output = [src_len, batch_size, hid_dim*num_directions]
            # enc_hidden = [n_layers * num_directions, batch_size, hid_dim]
            enc_output, enc_hidden = self.rnn(embedded)  # if h_0 is not given, it will be set 0 acquiescently
    
            # enc_hidden is stacked [forward_1, backward_1, forward_2, backward_2, ...]
            # enc_output are always from the last layer
    
            # enc_hidden [-2, :, : ] is the last of the forwards RNN
            # enc_hidden [-1, :, : ] is the last of the backwards RNN
    
            # initial decoder hidden is final hidden state of the forwards and backwards
            # encoder RNNs fed through a linear layer
            # concatenate the hidden_state of last two layers
            # s = [batch_size, dec_hid_dim]
            s = torch.tanh(self.fc(torch.cat((enc_hidden[-2, :, :], enc_hidden[-1, :, :]), dim=1)))
    
            return enc_output, s
    
    
    # In[13]:
    
    
    class Attention(nn.Module):
        def __init__(self, enc_hid_dim, dec_hid_dim):
            super().__init__()
            # [size(h_t)+size(s_{t-1}), dec_hid_dim]
            self.attn = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim, bias=False)
            self.v = nn.Linear(dec_hid_dim, 1, bias=False)
    
        def forward(self, s, enc_output):
            # s = [batch_size, dec_hid_dim]
            # enc_output = [src_len, batch_size, enc_hid_dim * 2]
    
            batch_size = enc_output.shape[1]
            src_len = enc_output.shape[0]
    
            # repeat decoder hidden state src_len times
            # s = [batch_size, src_len, enc_hid_dim * 2]
            # enc_output = [batch_size, src_len, enc_hid_dim * 2]
            s = s.unsqueeze(1).repeat(1, src_len, 1)
            enc_output = enc_output.transpose(0, 1)
    
            # energy = [batch_size, src_len, dec_hid_dim]
            energy = torch.tanh(self.attn(torch.cat((s, enc_output), dim=2)))
    
            # attention = [batch_size, src_len]
            attention = self.v(energy).squeeze(2)
    
            return F.softmax(attention, dim=1)
    
    
    # Seq2Seq Model
    
    # In[14]:
    
    
    class Seq2Seq(nn.Module):
        def __init__(self, encoder, decoder, device):
            super().__init__()
            self.encoder = encoder
            self.decoder = decoder
            self.device = device
    
        def forward(self, src, trg, teacher_forcing_ratio=0.5):
            # src = [src_len, batch_size]
            # trg = [trg_len, batch_size]
            # teacher_forcing_ratio is probability to use teacher forcing (scheduled sampling)
            batch_size = src.shape[1]
            trg_len = trg.shape[0]
            trg_vocab_size = self.decoder.output_dim
    
            # tensor to store decoder outputs
            outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
    
            # enc_output is all hidden states of the input sequence, back and forwards
            # s is the final forward and backward hidden states, passed through a linear layer
            # enc_output : [src_len, batch_size, enc_hid_dim * 2]
            # s : [batch_size, dec_hid_dim]
            enc_output, s = self.encoder(src)
    
            # first input to the decoder is the <sos> tokens
            dec_input = trg[0, :]
    
            for t in range(1, trg_len):
                # insert dec_input token embedding, previous hidden state and all encoder hidden states
                # receive output tensor (predictions) and new hidden state
                dec_output, s = self.decoder(dec_input, s, enc_output)
    
                # place predictions in a tensor holding predictions for each token
                outputs[t] = dec_output
    
                # decide if we are going to use teacher forcing or not
                teacher_force = random.random() < teacher_forcing_ratio
    
                # get the highest predicted token from our predictions
                top1 = dec_output.argmax(1)
    
                # if teacher forcing, use actural next token as input
                # if not, use predicted token
                dec_input = trg[t] if teacher_force else top1
    
            return outputs
    
    
    # Decoder
    
    # In[15]:
    
    
    class Decoder(nn.Module):
        def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention):
            super().__init__()
            self.output_dim = output_dim
            self.attention = attention
            self.embedding = nn.Embedding(output_dim, emb_dim)
            self.rnn = nn.GRU(enc_hid_dim * 2 + emb_dim, dec_hid_dim)
            self.fc_out = nn.Linear(enc_hid_dim * 2 + dec_hid_dim + emb_dim, output_dim)
            self.dropout = nn.Dropout(dropout)
    
        def forward(self, dec_input, s, enc_output):
            # dec_input = [batch_size]
            # s = [batch_size, dec_hid_dim]
            # enc_output = [src_len, batch_size, enc_hid_dim *2]
    
            # dec_input = [batch_size,1]
            dec_input = dec_input.unsqueeze(1)
    
            # embedded = [1, batch_size, emb_dim]
            embedded = self.dropout(self.embedding(dec_input)).transpose(0, 1)
    
            # s = [batch_size, dec_hid_dim]
            # enc_output = [src_len, batch_size, enc_hid_dim *2]
    
            # a = [batch_size, 1, src_len]
            a = self.attention(s, enc_output).unsqueeze(1)
    
            # enc_output = [batch_size, src_len, enc_hid_dim * 2]
            enc_output = enc_output.transpose(0, 1)
    
            # c = [1, batch_size, enc_hid_dim * 2]
            c = torch.bmm(a, enc_output).transpose(0, 1)
            # torch.bmm: Performs a batch matrix-matrix product of matrices stored in input and mat2
    
            # rnn_input = [1, batch_size, (enc_hid_dim*2) + emb_dim]
            rnn_input = torch.cat((embedded, c), dim=2)
    
            # dec_output = [src_len(=1), batch_size, dec_hid_dim]
            # dec_hidden = [n_layers*num_directions, batch_size, dec_hid_dim]
            dec_output, dec_hidden = self.rnn(rnn_input, s.unsqueeze(0))
    
            # embedded = [batch_size, emb_dim]
            # dec_output = [batch_size, dec_hid_dim]
            # c = [batch_size, enc_hid_dim * 2]
            embedded = embedded.squeeze(0)
            dec_output = dec_output.squeeze(0)
            c = c.squeeze(0)
    
            # pred = [batch_size, output_dim]
            pred = self.fc_out(torch.cat((dec_output, c, embedded), dim=1))
    
            return pred, dec_hidden.squeeze(0)
    
    
    # 定义模型
    
    # In[16]:
    
    
    INPUT_DIM = len(SRC.vocab)
    OUTPUT_DIM = len(TRG.vocab)
    ENC_EMB_DIM = 256
    DEC_EMB_DIM = 256
    ENC_HID_DIM = 512
    DEC_HID_DIM = 512
    ENC_DROPOUT = 0.5
    DEC_DROPOUT = 0.5
    
    attn = Attention(ENC_HID_DIM, DEC_HID_DIM)
    enc = Encoder(INPUT_DIM, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, ENC_DROPOUT)
    dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_DROPOUT, attn)
    
    model = Seq2Seq(enc, dec, device).to(device)
    TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
    criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX).to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    
    
    # 定义训练函数
    
    # In[18]:
    
    
    def train(model, iterator, optimizer, criterion):
        model.train()
        epoch_loss = 0
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg  # [trg_len, batch_size]
    
            # pred = [trg_len, batch_size, pred_dim]
            pred = model(src, trg)
            pred_dim = pred.shape[-1]
    
            # trg = [(trg_len - 1)*batch_size]
            # pred = [(trg_len - 1)*batch_size]
            trg = trg[1:].view(-1)
            pred = pred[1:].view(-1, pred_dim)
    
            loss = criterion(pred, trg)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
    
        return epoch_loss / len(iterator)
    
    
    # 定义评估函数
    
    # In[19]:
    
    
    def evaluate(model, iterator, criterion):
        model.eval()
        epoch_loss = 0
        with torch.no_grad():
            for i, batch in enumerate(iterator):
                src = batch.src
                trg = batch.trg # trg = [trg_len, batch_size]
    
                # output = [trg_len, batch_size, output_dim]
                output = model(src, trg, 0) # turn off teacher forcing
    
                output_dim = output.shape[-1]
    
                # trg = [(trg_len - 1) * batch_size]
                # output = [(trg_len - 1) * batch_size, output_dim]
                output = output[1:].view(-1, output_dim)
                trg = trg[1:].view(-1)
    
                loss = criterion(output, trg)
                epoch_loss += loss.item()
    
        return epoch_loss / len(iterator)
    
    
    # 定义一个时间函数
    # 
    
    # In[20]:
    
    
    def epoch_time(start_time, end_time):
        elapsed_time = end_time - start_time
        elapsed_mins = int(elapsed_time / 60)
        elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
        return elapsed_mins, elapsed_secs
    
    
    # 训练
    
    # In[21]:
    
    
    best_valid_loss = float('inf')
    
    for epoch in range(10):
        start_time = time.time()
    
        train_loss = train(model, train_iterator, optimizer, criterion)
        valid_loss = evaluate(model, valid_iterator, criterion)
    
        end_time = time.time()
    
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), 'tut3-model.pt')
    
        print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
        print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')
    
    
    # 保存模型与测试
    
    # In[ ]:
    
    
    model.load_state_dict(torch.load('tut3-model.pt'))
    
    test_loss = evaluate(model, test_iterator, criterion)
    
    print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')
    
    

    相关文章

      网友评论

          本文标题:Seq2Seq(Attention)的PyTorch实现

          本文链接:https://www.haomeiwen.com/subject/qvztjltx.html