Seq2Seq in PyTorch

Author: Jarkata | Published 2021-05-14 01:48

This article is a repost. Original link: https://wmathor.com/index.php/archives/1448/

This article shows how to reproduce Seq2Seq in PyTorch and build a simple machine-translation-style application. It helps to first skim the paper Learning Phrase Representations using RNN Encoder–Decoder for Statistical Machine Translation (2014) to get a clear picture of what the Seq2Seq architecture looks like; reading this article afterwards will be far more effective.

I have looked at many Seq2Seq architecture diagrams, and the one provided in the official PyTorch tutorial is, in my view, the easiest to understand.


First, as the figure above makes clear, Seq2Seq operates on three variables, which is different from every network architecture I had encountered before. We call the Encoder's input enc_input, the Decoder's input dec_input, and the Decoder's output dec_output. A concrete example below walks through the whole Seq2Seq workflow.

The figure below shows an Encoder built from an LSTM. Its inputs are the individual characters of "go away" (including the space); we only need the information in the hidden state at the last time step, i.e. h_t and c_t.

The h_t and c_t produced by the Encoder are then used as the Decoder's initial hidden state h_0 and c_0, as shown in the figure below. At the same time, the Decoder's input at the first time step is the token marking the start of a sentence (user-defined; "<SOS>", "\t", "S" all work, and "\t" is used here). This produces the output "m" together with a new hidden state h_1 and c_1.

Then h_1, c_1 and "m" are fed in as input, producing the output "a" and a new hidden state h_2 and c_2.

These steps are repeated until the sentence-end token is produced (user-defined; "<EOS>", "\n", "E" all work, and "\n" is used here).
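
The code later in this article uses a plain nn.RNN for simplicity, but a minimal sketch of the LSTM hand-off described above (the Encoder's final h_t and c_t becoming the Decoder's h_0 and c_0) might look like the following; the sizes and random inputs are purely illustrative assumptions, not the values used later.

import torch
import torch.nn as nn

vocab_size, hidden_size = 29, 128            # illustrative sizes (assumption)
encoder = nn.LSTM(input_size=vocab_size, hidden_size=hidden_size)
decoder = nn.LSTM(input_size=vocab_size, hidden_size=hidden_size)

src = torch.randn(8, 1, vocab_size)          # stand-in for the one-hot letters of "go away" + end token
tgt = torch.randn(4, 1, vocab_size)          # stand-in for "\t" + the target letters

_, (h_t, c_t) = encoder(src)                 # keep only the last hidden and cell state
dec_out, _ = decoder(tgt, (h_t, c_t))        # h_t, c_t act as the decoder's h_0, c_0
print(dec_out.shape)                         # torch.Size([4, 1, 128])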

A few questions often come up about the Decoder; here are my answers:

  • During training, what if the Decoder never stops, i.e. never outputs the sentence-end token?
    • During training, how long a sentence the Decoder should output is known in advance. If the last position of the target sentence is reached and the prediction is still not the end token, that is fine: simply stop there and compute the loss.
  • During testing, what if the Decoder never stops? For example it keeps predicting "wasd s w \n sdsw \n .........." (and goes on forever)
    • It cannot run forever, because at test time the Decoder still receives an input; that input is just a sequence of meaningless placeholders, for example many "<pad>" tokens. Since the Decoder's input has a finite length, its output also has a finite length. We then simply keep all characters before the first end token; for the example above, the final prediction is "wasd s w".
  • What is the relationship between the Decoder's input and output, i.e. dec_input and dec_output? (See the sketch after this list.)
    • During training, no matter which character the Decoder outputs at the current time step, the Decoder's input at the next time step still follows the original "plan". This is called Teacher Forcing. For example, suppose dec_input = "\twasted": after feeding in "\t", the Decoder may output the letter "m"; we simply record it, and it does not prevent the Decoder from feeding in the letter "w" at the next time step.
    • During validation or testing, the Decoder's output at each time step does affect the next input, because at validation/test time the network cannot see the ground truth, so it can only keep feeding its own predictions back in. For example, suppose we want to translate the English "wasted" into the German "verschwenden". The Decoder first takes "\t" as input and produces an output, say "m"; at the next time step it feeds in "m" and produces an output, say "a"; then "a" is fed in to produce the next output... and so on until the final time step.
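
Here is the promised sketch contrasting the two modes. It is not the article's actual code: decoder_step is a hypothetical function that runs the decoder for one time step and returns (predicted character, new hidden state), and "\t" / "\n" play the roles of the start and end tokens.

def train_decode(decoder_step, state, dec_input):
    # Teacher Forcing: always feed the ground-truth character, only record the prediction
    preds = []
    for ch in dec_input:
        pred, state = decoder_step(ch, state)
        preds.append(pred)
    return preds

def test_decode(decoder_step, state, start='\t', end='\n', max_len=20):
    # Inference: feed the previous prediction back in until the end token (or max_len)
    preds, ch = [], start
    while len(preds) < max_len:
        ch, state = decoder_step(ch, state)
        if ch == end:
            break
        preds.append(ch)
    return preds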

As an aside, I personally find Seq2Seq very similar to an AutoEncoder.

Now let's walk through the code.

First, the imports. Here 'S' is the start token and 'E' is the end token; if the input or output is too short, it is padded with '?'.

import torch
import numpy as np
import torch.nn as nn
import torch.utils.data as Data


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# S: Symbol that marks the start of the decoder input
# E: Symbol that marks the end of the decoder output
# ?: Symbol used to pad a sequence when a word is shorter than n_step

Define the dataset and the parameters.
n_step stores the length of the longest word; every word shorter than this is padded at the end with '?'.

letter = [c for c in 'SE?abcdefghijklmnopqrstuvwxyz']
letter2idx = {n: i for i,n in enumerate(letter)}

seq_data = [['man','women'],['black','white'],['king','queen'],['girl','boy'],['up','down'],['high','low']]

# Seq2Seq Parameter
n_step = max([max(len(i),len(j)) for i,j in seq_data]) # max_len(=5)
n_hidden = 128
n_class = len(letter2idx) # classification problem
batch_size = 3
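
For this particular seq_data the numbers work out as follows; a quick check you can run right after the block above (expected values in the comments):

print(n_step)    # 5  -> 'black', 'white', 'women', 'queen' all have length 5
print(n_class)   # 29 -> 26 letters + 'S' + 'E' + '?'
print(letter2idx['S'], letter2idx['E'], letter2idx['?'])  # 0 1 2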

Next comes the data preprocessing. Concretely:
first, words shorter than n_step are padded with '?';
then the end token 'E' is appended to the Encoder's input;
the start token 'S' is prepended to the Decoder's input;
and the end token 'E' is appended to the Decoder's output;
exactly as the comments in the code below illustrate.


def make_data(seq_data):
    enc_input_all, dec_input_all, dec_output_all = [],[],[]

    for seq in seq_data:
        for i in range(2):
            seq[i] = seq[i] + '?' * (n_step - len(seq[i])) # 'man??', 'women'

        enc_input = [letter2idx[n] for n in (seq[0]+'E')] # ['m','a','n','?','?','E']
        dec_input = [letter2idx[n] for n in ('S' + seq[1])] # ['S', 'w', 'o', 'm', 'e', 'n']
        dec_output = [letter2idx[n] for n in (seq[1] + 'E')] # ['w', 'o', 'm', 'e', 'n', 'E']

        # one-hot encode
        enc_input_all.append(np.eye(n_class)[enc_input])
        dec_input_all.append(np.eye(n_class)[dec_input])
        dec_output_all.append(dec_output) # not one-hot

    # make tensor
    return torch.Tensor(enc_input_all), torch.Tensor(dec_input_all), torch.LongTensor(dec_output_all)

enc_input_all, dec_input_all, dec_output_all = make_data(seq_data)
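
Printing the shapes confirms that each tensor holds 6 samples of length n_step + 1 = 6 (the extra position comes from the added 'E' or 'S'), and that only the two input tensors are one-hot:

print(enc_input_all.shape)   # torch.Size([6, 6, 29]) -> [num_pairs, n_step+1, n_class]
print(dec_input_all.shape)   # torch.Size([6, 6, 29])
print(dec_output_all.shape)  # torch.Size([6, 6])     -> class indices, not one-hot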

Since three tensors need to be returned here, we define a custom Dataset by subclassing torch.utils.data.Dataset
and implementing its __len__ and __getitem__ methods.
RNN API: https://pytorch.org/docs/stable/generated/torch.nn.RNN.html


class TranslateDataSet(Data.Dataset):
    def __init__(self,enc_input_all, dec_input_all, dec_output_all):
        self.enc_input_all = enc_input_all
        self.dec_input_all = dec_input_all
        self.dec_output_all = dec_output_all

    def __len__(self): # return dataset size
        return len(self.enc_input_all)

    def __getitem__(self,idx):
        return self.enc_input_all[idx], self.dec_input_all[idx], self.dec_output_all[idx]

loader = Data.DataLoader(TranslateDataSet(enc_input_all,dec_input_all,dec_output_all),batch_size,True)
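
As a quick sanity check, one batch drawn from the loader should have shape [batch_size, n_step+1, n_class] for the two inputs and [batch_size, n_step+1] for the target:

enc_b, dec_in_b, dec_out_b = next(iter(loader))
print(enc_b.shape, dec_in_b.shape, dec_out_b.shape)
# torch.Size([3, 6, 29]) torch.Size([3, 6, 29]) torch.Size([3, 6])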

Next we define the Seq2Seq model; a plain RNN is used for both the encoder and the decoder.

class Seq2Seq(nn.Module):
    def __init__(self):
        super(Seq2Seq, self).__init__()
        self.encoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5)
        self.decoder = nn.RNN(input_size=n_class,hidden_size=n_hidden,dropout=0.5)
        self.fc = nn.Linear(n_hidden,n_class)

    def forward(self,enc_input,enc_hidden,dec_input):
        # enc_input(=input_batch): [batch_size, n_step+1, n_class]
        # dec_input(=output_batch): [batch_size, n_step+1, n_class]
        enc_input = enc_input.transpose(0,1) # enc_input: [n_step+1,batch_size,n_class]
        dec_input = dec_input.transpose(0,1) # dec_input: [n_step+1,batch_size,n_class]

        # h_t : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
        _, h_t = self.encoder(enc_input,enc_hidden)
        # outputs: [n_step+1, batch_size, num_directions(=1)*n_hidden(=128)]
        outputs, _ = self.decoder(dec_input, h_t)

        model = self.fc(outputs) # model : [n_step+1, batch_size, n_class]
        return model


model = Seq2Seq().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
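
Before training, a dummy forward pass on the first batch_size samples verifies that the model returns time-major output of shape [n_step+1, batch_size, n_class]:

with torch.no_grad():
    dummy_h = torch.zeros(1, batch_size, n_hidden).to(device)
    dummy_out = model(enc_input_all[:batch_size].to(device), dummy_h,
                      dec_input_all[:batch_size].to(device))
print(dummy_out.shape)  # torch.Size([6, 3, 29]) -> [n_step+1, batch_size, n_class]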

Training
Since pred, the model's output, is a three-dimensional tensor, the loss here is accumulated sample by sample.
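
An equivalent, and arguably more idiomatic, alternative is to flatten the batch and time dimensions and call CrossEntropyLoss once; it differs from the per-sample sum used below only by a constant factor of batch_size. A small self-contained check with made-up tensors of the same shapes shows the relationship:

import torch
import torch.nn as nn

crit = torch.nn.CrossEntropyLoss()
pred = torch.randn(3, 6, 29)             # [batch_size, n_step+1, n_class]
target = torch.randint(0, 29, (3, 6))    # [batch_size, n_step+1]

loop_loss = sum(crit(pred[i], target[i]) for i in range(len(target)))  # as in the loop below
flat_loss = crit(pred.reshape(-1, 29), target.reshape(-1))             # single call
print(torch.isclose(loop_loss / 3, flat_loss))  # tensor(True): same up to a factor of batch_size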


for epoch in range(5000):
    for enc_input_batch, dec_input_batch, dec_output_batch in loader:
        # initial hidden state, shape [num_layers * num_directions, batch_size, n_hidden]
        h_0 = torch.zeros(1, batch_size, n_hidden).to(device)

        (enc_input_batch,dec_input_batch,dec_output_batch) = (enc_input_batch.to(device),dec_input_batch.to(device),dec_output_batch.to(device))

        # enc_input_batch : [batch_size, n_step+1, n_class]
        # dec_input_batch : [batch_size, n_step+1, n_class]
        # dec_output_batch : [batch_size, n_step+1], not one-hot
        pred = model(enc_input_batch,h_0,dec_input_batch)
        # pred : [n_step+1, batch_size, n_class]
        pred = pred.transpose(0,1)
        loss = 0

        # accumulate the loss sample by sample over the batch
        for i in range(len(dec_output_batch)):
            # pred[i] : [n_step+1,n_class]
            # dec_output_batch[i] : [n_step+1]
            loss += criterion(pred[i],dec_output_batch[i])
        if (epoch+1)%1000 == 0:
            print('Epoch:', '%04d' % (epoch+1), 'cost=', '{:.6f}'.format(loss))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Testing

def translate(word):
    enc_input, dec_input, _ = make_data([[word, '?'*n_step]])
    enc_input, dec_input = enc_input.to(device),dec_input.to(device)

    # make hidden shape [num_layers*num_directions,batch_size,n_hidden]
    hidden = torch.zeros(1,1,n_hidden).to(device)
    output = model(enc_input,hidden,dec_input)

    # output : [n_step+1, batch_size, n_class]
    predict = output.data.max(2, keepdim=True)[1] # select n_class dimension
    decoded = [letter[i] for i in predict]
    translated = ''.join(decoded[:decoded.index('E')])

    return translated.replace('?','')
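
One caveat: decoded.index('E') raises a ValueError if the model never predicts 'E'. A slightly more defensive version of that decoding step (a sketch, not part of the original code) could look like this:

def decode_prediction(predict):
    # predict : [n_step+1, 1, 1] tensor of class indices
    decoded = [letter[int(i)] for i in predict.squeeze()]
    end = decoded.index('E') if 'E' in decoded else len(decoded)  # stop at 'E' if it appears
    return ''.join(decoded[:end]).replace('?', '')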

print('test')
print('man ->', translate('man'))
print('mans ->', translate('mans'))
print('king ->', translate('king'))
print('black ->', translate('black'))
print('up ->', translate('up'))

Complete code:

# code by Tae Hwan Jung (Jeff Jung) @graykode, modified by wmathor
import torch
import numpy as np
import torch.nn as nn
import torch.utils.data as Data

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# S: Symbol that marks the start of the decoder input
# E: Symbol that marks the end of the decoder output
# ?: Symbol used to pad a sequence when a word is shorter than n_step

letter = [c for c in 'SE?abcdefghijklmnopqrstuvwxyz']
letter2idx = {n: i for i, n in enumerate(letter)}

seq_data = [['man', 'women'], ['black', 'white'], ['king', 'queen'], ['girl', 'boy'], ['up', 'down'], ['high', 'low']]

# Seq2Seq Parameter
n_step = max([max(len(i), len(j)) for i, j in seq_data]) # max_len(=5)
n_hidden = 128
n_class = len(letter2idx) # classification problem
batch_size = 3

def make_data(seq_data):
    enc_input_all, dec_input_all, dec_output_all = [], [], []

    for seq in seq_data:
        for i in range(2):
            seq[i] = seq[i] + '?' * (n_step - len(seq[i])) # 'man??', 'women'

        enc_input = [letter2idx[n] for n in (seq[0] + 'E')] # ['m', 'a', 'n', '?', '?', 'E']
        dec_input = [letter2idx[n] for n in ('S' + seq[1])] # ['S', 'w', 'o', 'm', 'e', 'n']
        dec_output = [letter2idx[n] for n in (seq[1] + 'E')] # ['w', 'o', 'm', 'e', 'n', 'E']

        enc_input_all.append(np.eye(n_class)[enc_input])
        dec_input_all.append(np.eye(n_class)[dec_input])
        dec_output_all.append(dec_output) # not one-hot

    # make tensor
    return torch.Tensor(enc_input_all), torch.Tensor(dec_input_all), torch.LongTensor(dec_output_all)

'''
enc_input_all: [6, n_step+1 (because of 'E'), n_class]
dec_input_all: [6, n_step+1 (because of 'S'), n_class]
dec_output_all: [6, n_step+1 (because of 'E')]
'''
enc_input_all, dec_input_all, dec_output_all = make_data(seq_data)

class TranslateDataSet(Data.Dataset):
    def __init__(self, enc_input_all, dec_input_all, dec_output_all):
        self.enc_input_all = enc_input_all
        self.dec_input_all = dec_input_all
        self.dec_output_all = dec_output_all
    
    def __len__(self): # return dataset size
        return len(self.enc_input_all)
    
    def __getitem__(self, idx):
        return self.enc_input_all[idx], self.dec_input_all[idx], self.dec_output_all[idx]

loader = Data.DataLoader(TranslateDataSet(enc_input_all, dec_input_all, dec_output_all), batch_size, True)

# Model
class Seq2Seq(nn.Module):
    def __init__(self):
        super(Seq2Seq, self).__init__()
        self.encoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # encoder
        self.decoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # decoder
        self.fc = nn.Linear(n_hidden, n_class)

    def forward(self, enc_input, enc_hidden, dec_input):
        # enc_input(=input_batch): [batch_size, n_step+1, n_class]
        # dec_input(=output_batch): [batch_size, n_step+1, n_class]
        enc_input = enc_input.transpose(0, 1) # enc_input: [n_step+1, batch_size, n_class]
        dec_input = dec_input.transpose(0, 1) # dec_input: [n_step+1, batch_size, n_class]

        # h_t : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
        _, h_t = self.encoder(enc_input, enc_hidden)
        # outputs : [n_step+1, batch_size, num_directions(=1) * n_hidden(=128)]
        outputs, _ = self.decoder(dec_input, h_t)

        model = self.fc(outputs) # model : [n_step+1, batch_size, n_class]
        return model

model = Seq2Seq().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(5000):
    for enc_input_batch, dec_input_batch, dec_output_batch in loader:
        # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
        h_0 = torch.zeros(1, batch_size, n_hidden).to(device)

        (enc_input_batch, dec_input_batch, dec_output_batch) = (enc_input_batch.to(device), dec_input_batch.to(device), dec_output_batch.to(device))
        # enc_input_batch : [batch_size, n_step+1, n_class]
        # dec_input_batch : [batch_size, n_step+1, n_class]
        # dec_output_batch : [batch_size, n_step+1], not one-hot
        pred = model(enc_input_batch, h_0, dec_input_batch)
        # pred : [n_step+1, batch_size, n_class]
        pred = pred.transpose(0, 1) # [batch_size, n_step+1(=6), n_class]
        loss = 0
        for i in range(len(dec_output_batch)):
            # pred[i] : [n_step+1, n_class]
            # dec_output_batch[i] : [n_step+1]
            loss += criterion(pred[i], dec_output_batch[i])
        if (epoch + 1) % 1000 == 0:
            print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
# Test
def translate(word):
    enc_input, dec_input, _ = make_data([[word, '?' * n_step]])
    enc_input, dec_input = enc_input.to(device), dec_input.to(device)
    # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
    hidden = torch.zeros(1, 1, n_hidden).to(device)
    output = model(enc_input, hidden, dec_input)
    # output : [n_step+1, batch_size, n_class]

    predict = output.data.max(2, keepdim=True)[1] # select n_class dimension
    decoded = [letter[i] for i in predict]
    translated = ''.join(decoded[:decoded.index('E')])

    return translated.replace('?', '')

print('test')
print('man ->', translate('man'))
print('mans ->', translate('mans'))
print('king ->', translate('king'))
print('black ->', translate('black'))
print('up ->', translate('up'))
