美文网首页
GPT图解:代码记录-GPT

GPT图解:代码记录-GPT

作者: 万州客 | 来源:发表于2024-02-24 09:18 被阅读0次

    一,代码

    import torch # PyTorch
    from transformers import GPT2Tokenizer # GPT-2 tokenizer
    from transformers import GPT2LMHeadModel # GPT-2 language model with LM head
    model_name = "gpt2"  # other checkpoints also work, e.g. "gpt2-medium", "gpt2-large"
    tokenizer = GPT2Tokenizer.from_pretrained(model_name) # load the pretrained tokenizer
    device = "cuda" if torch.cuda.is_available() else "cpu" # pick GPU when available
    model = GPT2LMHeadModel.from_pretrained(model_name).to(device) # move the model to the device (CPU or GPU)
    vocab = tokenizer.get_vocab() # token -> id mapping of the pretrained vocabulary
    
    print("模型信息:", model)
    print("分词器信息:",tokenizer)
    print("词汇表大小:", len(vocab))
    print("部分词汇示例:", (list(vocab.keys())[8000:8005]))
    
    
    from torch.utils.data import Dataset  # 导入Pytorch的Dataset
    # Custom ChatDataset: wraps a "User:"/"AI:" transcript file as paired
    # input/target token-id tensors for fine-tuning.
    class ChatDataset(Dataset):
        def __init__(self, file_path, tokenizer, vocab):
            """Load and tokenize a chat transcript.

            file_path: text file with alternating "User: ..." / "AI: ..." lines.
            tokenizer: HuggingFace-style tokenizer (callable, has eos_token_id).
            vocab: token->id mapping, kept for reference.
            """
            self.tokenizer = tokenizer  # tokenizer
            self.vocab = vocab  # vocabulary
            # Parse the file into parallel input/target tensor lists.
            self.input_data, self.target_data = self.load_and_process_data(file_path)

        def load_and_process_data(self, file_path):
            """Return (input_data, target_data): lists of LongTensors with EOS appended."""
            # Explicit encoding so the file parses identically on every platform.
            with open(file_path, "r", encoding="utf-8") as f:
                lines = f.readlines()
            input_data, target_data = [], []
            for line in lines:
                if line.startswith("User:"):
                    # Strip the "User: " prefix (6 chars) before tokenizing.
                    tokens = self.tokenizer(line.strip()[6:], return_tensors="pt")["input_ids"].tolist()[0]
                    # BUGFIX: use self.tokenizer (was the module-level global `tokenizer`).
                    tokens = tokens + [self.tokenizer.eos_token_id]  # append end-of-sequence token
                    input_data.append(torch.tensor(tokens, dtype=torch.long))
                elif line.startswith("AI:"):
                    # Strip the "AI: " prefix (4 chars) before tokenizing.
                    tokens = self.tokenizer(line.strip()[4:], return_tensors="pt")["input_ids"].tolist()[0]
                    tokens = tokens + [self.tokenizer.eos_token_id]  # append end-of-sequence token
                    target_data.append(torch.tensor(tokens, dtype=torch.long))
            return input_data, target_data

        def __len__(self):
            # Dataset length == number of user turns.
            return len(self.input_data)

        def __getitem__(self, idx):
            # One (input tensor, target tensor) exchange.
            return self.input_data[idx], self.target_data[idx]
    
    file_path = "chat.txt" # chat transcript used as the fine-tuning dataset
    chat_dataset = ChatDataset(file_path, tokenizer, vocab) # build the dataset from file, tokenizer and vocab
    for i in range(2): # show the first two (input, target) examples
        input_example, target_example = chat_dataset[i]
        print(f"Example {i + 1}:")
        print("Input:", tokenizer.decode(input_example))
        print("Target:", tokenizer.decode(target_example))
    
    from torch.utils.data import DataLoader  # DataLoader for batching
    
    # Register '<pad>' as the tokenizer's padding token.
    # NOTE(review): '<pad>' is not part of the pretrained GPT-2 vocabulary, so
    # convert_tokens_to_ids may return the unknown-token fallback id — verify.
    tokenizer.pad_token = '<pad>'  # set the pad token
    tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids('<pad>')
    
    
    # pad_sequence: pad (and if needed truncate) a batch of sequences to one length
    def pad_sequence(sequences, padding_value=0, length=None):
        """Pad a batch of 1-D sequences to equal length.

        sequences: iterable of 1-D tensors of token ids.
        padding_value: fill value past each sequence's end.
        length: target length; defaults to the longest sequence in the batch.
        Returns a LongTensor of shape [batch, length].
        """
        # Use the longest sequence when no explicit length is given.
        max_length = max(len(seq) for seq in sequences) if length is None else length
        # Pre-filled result tensor of the final shape.
        result = torch.full((len(sequences), max_length), padding_value, dtype=torch.long)
        for i, seq in enumerate(sequences):
            # BUGFIX: clamp to max_length so an over-long sequence is truncated
            # instead of raising a size-mismatch error when `length` is given.
            end = min(len(seq), max_length)
            result[i, :end] = seq[:end]
        return result
    
    
    # collate_fn: shape one batch of (source, target) pairs for the DataLoader
    def collate_fn(batch):
        """Pad a batch of (source, target) pairs to one shared length,
        using the tokenizer's pad token id as filler."""
        sources, targets = zip(*batch)
        # One shared width so both sides can be stacked together.
        longest = max(
            max(len(s) for s in sources),
            max(len(t) for t in targets),
        )
        pad_id = tokenizer.pad_token_id
        padded_sources = pad_sequence(sources, padding_value=pad_id, length=longest)
        padded_targets = pad_sequence(targets, padding_value=pad_id, length=longest)
        return padded_sources, padded_targets
    
    
    # Create the DataLoader
    chat_dataloader = DataLoader(chat_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
    
    # Sanity-check the DataLoader output: first the batch shapes...
    for input_batch, target_batch in chat_dataloader:
        print("Input batch tensor size:", input_batch.size())
        print("Target batch tensor size:", target_batch.size())
        break
    # ...then the actual padded tensors of one batch.
    for input_batch, target_batch in chat_dataloader:
        print("Input batch tensor:")
        print(input_batch)
        print("Target batch tensor:")
        print(target_batch)
        break
    
    
    import torch.nn as nn
    import torch.optim as optim
    # Loss that ignores positions holding the pad token id
    criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
    # Optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    # Train for 500 epochs (the original comment incorrectly said 100)
    for epoch in range(500):
        # Iterate over batches from the DataLoader
        for batch_idx, (input_batch, target_batch) in enumerate(chat_dataloader):
            optimizer.zero_grad() # reset gradients
            input_batch, target_batch = input_batch.to(device), target_batch.to(device) # move batch to the device (CPU or GPU)
            outputs = model(input_batch) # forward pass
            logits = outputs.logits  # prediction logits
            # NOTE(review): this pairs input positions with target positions
            # one-to-one (seq2seq style) rather than shifting by one as in
            # standard causal-LM training — confirm this is intended.
            loss = criterion(logits.view(-1, len(vocab)), target_batch.view(-1)) # compute loss
            loss.backward() # backward pass
            optimizer.step()# update parameters
        if (epoch + 1) % 20 == 0: # report the loss every 20 epochs (comment said 200)
            print(f'Epoch: {epoch + 1:04d}, cost = {loss:.6f}')
    
    
    def generate_text_beam_search(model, input_str, max_len=50, beam_width=5):
        """Generate a continuation of input_str with beam search.

        model: causal LM whose output has .logits of shape [1, seq, vocab].
        input_str: prompt text, encoded with the module-level tokenizer.
        max_len: maximum number of generated tokens.
        beam_width: number of candidate sequences kept per step.
        Returns the generated text with the prompt tokens removed.
        """
        model.eval()  # evaluation mode (no dropout etc.)
        # Encode the prompt and move it to the compute device.
        input_tokens = tokenizer.encode(input_str, return_tensors="pt").to(device)
        # Each candidate is (token tensor, cumulative score); scores accumulate
        # negated logits, so LOWER is better and ascending sort ranks best-first.
        candidates = [(input_tokens, 0.0)]
        # BUGFIX: completed sequences were appended to a list that was recreated
        # every inner iteration and never read; keep them across all steps.
        finished = []
        with torch.no_grad():  # no gradients needed for generation
            for _ in range(max_len):
                new_candidates = []
                for candidate, candidate_score in candidates:
                    outputs = model(candidate)
                    # Logits of the next-token distribution only.
                    logits = outputs.logits[:, -1, :]
                    # Top beam_width continuations and their scores.
                    scores, next_tokens = torch.topk(logits, beam_width, dim=-1)
                    for score, next_token in zip(scores.squeeze(), next_tokens.squeeze()):
                        # Extend the candidate with the new token.
                        new_candidate = torch.cat((candidate, next_token.unsqueeze(0).unsqueeze(0)), dim=-1)
                        new_score = candidate_score - score.item()
                        if next_token.item() == tokenizer.eos_token_id:
                            # Sequence ended: retire it from the active beam.
                            finished.append((new_candidate, new_score))
                        else:
                            new_candidates.append((new_candidate, new_score))
                # Keep only the best beam_width active sequences.
                candidates = sorted(new_candidates, key=lambda x: x[1])[:beam_width]
                if not candidates:
                    break  # every beam emitted EOS; avoid indexing an empty list
        # Choose the best sequence among finished and still-active candidates.
        best_candidate, _ = sorted(finished + candidates, key=lambda x: x[1])[0]
        # Strip the prompt tokens and decode only the generated continuation.
        input_len = len(tokenizer.encode(input_str))
        output_str = tokenizer.decode(best_candidate.squeeze()[input_len:])
        return output_str
    
    test_inputs = [
        "what is the weather like today?",
        "hi, how are you?",
        "can you recommend a good book?"
    ]
    
    # Generate a reply for each test prompt with beam search.
    for i, input_str in enumerate(test_inputs, start=1):
        generated_text = generate_text_beam_search(model, input_str)
        print(f"测试 {i}:")
        print(f"User: {input_str}")
        print(f"AI: {generated_text}")
    
    # NOTE(review): this repeats the block above almost verbatim (one prompt
    # differs by a space; a trailing blank line is printed) — likely a
    # copy-paste; consider removing one of the two.
    test_inputs = [
        "what is the weather like today?",
        "hi , how are you?",
        "can you recommend a good book?"
    ]
    
    for i, input_str in enumerate(test_inputs, start=1):
        generated_text = generate_text_beam_search(model, input_str)
        print(f"测试 {i}:")
        print(f"User: {input_str}")
        print(f"AI: {generated_text}")
        print()
    
    
    
    
    import numpy as np
    import torch
    import torch.nn as nn # 导入torch.nn库
    d_k = 64 # dimension of K (= dimension of Q)
    d_v = 64 # dimension of V
    # Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V
    class ScaledDotProductAttention(nn.Module):
        def __init__(self):
            super(ScaledDotProductAttention, self).__init__()
        def forward(self, Q, K, V, attn_mask):
            """Q/K/V: [batch, heads, len, dim]; attn_mask matches the score shape.

            Returns (context, weights): attended values and the attention map.
            """
            # Raw attention scores, scaled by sqrt(d_k): [batch, heads, len_q, len_k]
            scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)
            # Masked positions get a large negative value so softmax sends them
            # to ~0 (in-place, as in the original).
            scores.masked_fill_(attn_mask.bool(), -1e9)
            # Normalize along the key axis.
            weights = torch.softmax(scores, dim=-1)
            # Weighted sum of values = compact context representation.
            context = torch.matmul(weights, V)
            return context, weights
    
    # Multi-head attention
    d_embedding = 512  # Embedding Size
    n_heads = 8  # number of heads in Multi-Head Attention
    batch_size = 3 # samples per batch
    class MultiHeadAttention(nn.Module):
        """Multi-head attention with residual connection and LayerNorm."""
        def __init__(self):
            super(MultiHeadAttention, self).__init__()
            self.W_Q = nn.Linear(d_embedding, d_k * n_heads) # query projection
            self.W_K = nn.Linear(d_embedding, d_k * n_heads) # key projection
            self.W_V = nn.Linear(d_embedding, d_v * n_heads) # value projection
            self.linear = nn.Linear(n_heads * d_v, d_embedding)
            self.layer_norm = nn.LayerNorm(d_embedding)

        def forward(self, Q, K, V, attn_mask):
            """Q/K/V: [batch, len, d_embedding] -> (output, attention weights)."""
            residual = Q  # kept for the residual connection
            n_batch = Q.size(0)
            # Project, then split into heads: [batch, heads, len, d_k or d_v]
            q_heads = self.W_Q(Q).view(n_batch, -1, n_heads, d_k).transpose(1, 2)
            k_heads = self.W_K(K).view(n_batch, -1, n_heads, d_k).transpose(1, 2)
            v_heads = self.W_V(V).view(n_batch, -1, n_heads, d_v).transpose(1, 2)
            # Broadcast the mask over every head: [batch, heads, len_q, len_k]
            mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
            context, weights = ScaledDotProductAttention()(q_heads, k_heads, v_heads, mask)
            # Merge heads back: [batch, len_q, heads * d_v]
            merged = context.transpose(1, 2).contiguous().view(n_batch, -1, n_heads * d_v)
            projected = self.linear(merged)
            # Residual connection with Q, then layer normalization.
            output = self.layer_norm(projected + residual)
            return output, weights
    
    
    # Position-wise feed-forward network
    class PoswiseFeedForwardNet(nn.Module):
        """Two kernel-size-1 convolutions (expand to 2048, project back) with
        residual connection and LayerNorm."""
        def __init__(self):
            super(PoswiseFeedForwardNet, self).__init__()
            # Expand to the 2048-dim hidden size, then project back.
            self.conv1 = nn.Conv1d(in_channels=d_embedding, out_channels=2048, kernel_size=1)
            self.conv2 = nn.Conv1d(in_channels=2048, out_channels=d_embedding, kernel_size=1)
            self.layer_norm = nn.LayerNorm(d_embedding)

        def forward(self, inputs):
            """inputs: [batch, len, d_embedding] -> same shape."""
            residual = inputs
            # Conv1d works on [batch, channels, len], hence the transposes.
            hidden = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))
            projected = self.conv2(hidden).transpose(1, 2)
            return self.layer_norm(projected + residual)
    
    
    import numpy as np
    def get_sin_enc_table(n_position, embedding_dim):
        """Build the sinusoidal positional-encoding table.

        Returns a FloatTensor [n_position, embedding_dim]: even columns carry
        sin(pos / 10000^(2i/d)), odd columns the matching cos.
        """
        angles = np.zeros((n_position, embedding_dim))
        for position in range(n_position):
            for dim in range(embedding_dim):
                # Column pairs (2i, 2i+1) share one frequency.
                angles[position, dim] = position / np.power(10000, 2 * (dim // 2) / embedding_dim)
        angles[:, 0::2] = np.sin(angles[:, 0::2])  # even dims -> sin
        angles[:, 1::2] = np.cos(angles[:, 1::2])  # odd dims -> cos
        return torch.FloatTensor(angles)
    
    # Padding-attention mask: hide <pad> key positions from self-attention
    def get_attn_pad_mask(seq_q, seq_k):
        """seq_q: [batch, len_q], seq_k: [batch, len_k].

        Returns a boolean mask [batch, len_q, len_k], True where the key token
        is padding (<pad> is encoded as id 0).
        """
        len_q = seq_q.size(1)
        batch_size, len_k = seq_k.size()
        # [batch, 1, len_k]: True at padding key positions.
        key_is_pad = seq_k.data.eq(0).unsqueeze(1)
        # Broadcast over the query axis to match the attention-score shape.
        return key_is_pad.expand(batch_size, len_q, len_k)
    
    
    # Causal (subsequent-position) mask: hide future tokens from self-attention
    def get_attn_subsequent_mask(seq):
        """seq: [batch, seq_len, ...] -> byte mask [batch, seq_len, seq_len]
        with 1 strictly above the diagonal (future positions)."""
        size = (seq.size(0), seq.size(1), seq.size(1))
        # Strictly-upper-triangular ones mark "future" key positions.
        future = np.triu(np.ones(size), k=1)
        return torch.from_numpy(future).byte()
    
    
    # Decoder layer
    class DecoderLayer(nn.Module):
        """One GPT decoder layer: masked self-attention + feed-forward,
        each followed by a residual connection and LayerNorm."""
        def __init__(self):
            super(DecoderLayer, self).__init__()
            self.self_attn = MultiHeadAttention()  # multi-head self-attention
            self.feed_forward = PoswiseFeedForwardNet()  # position-wise FFN
            self.norm1 = nn.LayerNorm(d_embedding)  # post-attention norm
            self.norm2 = nn.LayerNorm(d_embedding)  # post-FFN norm

        def forward(self, dec_inputs, attn_mask=None):
            """dec_inputs: [batch, len, d_embedding] -> same shape."""
            # Masked multi-head self-attention over the decoder inputs.
            attn_output, _ = self.self_attn(dec_inputs, dec_inputs, dec_inputs, attn_mask)
            # Residual + first LayerNorm.
            attn_normed = self.norm1(dec_inputs + attn_output)
            # Feed-forward, then residual + second LayerNorm.
            return self.norm2(attn_normed + self.feed_forward(attn_normed))
    
    
    # Decoder: stack of DecoderLayers over token + positional embeddings
    n_layers = 6  # number of stacked decoder layers
    device = "cuda" if torch.cuda.is_available() else "cpu"  # compute device
    class Decoder(nn.Module):
        def __init__(self, vocab_size, max_seq_len):
            super(Decoder, self).__init__()
            self.src_emb = nn.Embedding(vocab_size, d_embedding)  # token embedding
            self.pos_emb = nn.Embedding(max_seq_len, d_embedding)  # learned positional embedding
            self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])  # N decoder layers

        def forward(self, dec_inputs):
            """dec_inputs: [batch, seq_len] token ids -> [batch, seq_len, d_embedding]."""
            # BUGFIX: positions must index the sequence dimension. The original
            # used len(dec_inputs) (= batch size), giving every token of a
            # sample the same batch-slot-dependent position embedding.
            positions = torch.arange(dec_inputs.size(1), device=dec_inputs.device)
            # Token embedding plus (broadcast) positional embedding.
            inputs_embedding = self.src_emb(dec_inputs) + self.pos_emb(positions)
            # Causal mask so each position attends only to itself and the past.
            attn_mask = get_attn_subsequent_mask(inputs_embedding).to(dec_inputs.device)
            # BUGFIX: chain the layers. The original fed inputs_embedding to
            # every layer, so only the last layer's output was effectively used
            # (the second copy of this class later in the file chains correctly).
            dec_outputs = inputs_embedding
            for layer in self.layers:
                dec_outputs = layer(dec_outputs, attn_mask)
            return dec_outputs
    
    
    class GPT(nn.Module):
        """Minimal GPT: a decoder stack followed by a vocabulary projection."""
        def __init__(self, vocab_size, max_seq_len):
            super(GPT, self).__init__()
            self.decoder = Decoder(vocab_size, max_seq_len)  # learns the representation
            self.projection = nn.Linear(d_embedding, vocab_size)  # maps to vocab logits

        def forward(self, dec_inputs):
            """dec_inputs: [batch, seq_len] ids -> logits [batch, seq_len, vocab_size]."""
            return self.projection(self.decoder(dec_inputs))
    
    
    from torchtext.datasets import WikiText2 # WikiText2 dataset
    from torchtext.data.utils import get_tokenizer # tokenizer utility
    from torchtext.vocab import build_vocab_from_iterator # vocabulary builder
    from torch.utils.data import DataLoader, Dataset # PyTorch DataLoader and Dataset
    
    # NOTE(review): this rebinds `tokenizer` (previously the GPT-2 tokenizer)
    # to torchtext's basic_english tokenizer for the WikiText2 experiment.
    tokenizer = get_tokenizer("basic_english") # tokenizer used for preprocessing
    
    train_iter = WikiText2(split='train') # WikiText2 training split
    valid_iter = WikiText2(split='valid') # WikiText2 validation split
    
    # Generator yielding the token list of every line in the dataset
    def yield_tokens(data_iter):
        for item in data_iter:
            yield tokenizer(item)
    
    # Build a vocabulary capped at 3000 tokens with specials "<pad>", "<sos>", "<eos>"
    vocab = build_vocab_from_iterator(yield_tokens(train_iter), max_tokens=3000,
                                      specials=["<pad>", "<sos>", "<eos>"])
    vocab.set_default_index(vocab["<pad>"])  # out-of-vocabulary words map to <pad>
    
    # Print vocabulary information
    print("词汇表大小:", len(vocab))
    print("词汇示例(word to index):",
          {word: vocab[word] for word in ["<pad>", "<sos>", "<eos>", "the", "apple"]})
    
    # PyTorch dataset over WikiText2 sentences
    max_seq_len = 256  # maximum sequence length (including <sos>/<eos>)
    class WikiDataset(Dataset):
        """WikiText2 sentences stored as token-id lists; items are (source,
        target) pairs for next-token prediction."""
        def __init__(self, data_iter, vocab, max_len = max_seq_len):
            # Tokenize each sentence with the module-level `tokenizer`,
            # truncate to max_len-2, and wrap with <sos>/<eos>.  Only the
            # first ~5000 sentences are kept, as in the original.
            self.data = []
            for count, sentence in enumerate(data_iter, start=1):
                words = tokenizer(sentence)[:max_len - 2]
                self.data.append([vocab['<sos>']] + vocab(words) + [vocab['<eos>']])
                if count > 5000:
                    break

        def __len__(self):
            """Number of stored sentences."""
            return len(self.data)

        def __getitem__(self, idx):
            """Source = tokens[:-1], target = tokens[1:] (shifted by one)."""
            tokens = self.data[idx]
            return torch.tensor(tokens[:-1]), torch.tensor(tokens[1:])
    
    
    train_dataset = WikiDataset(train_iter, vocab)  # training split
    valid_dataset = WikiDataset(valid_iter, vocab)  # validation split
    print('dataset数据条目: ', len(train_dataset))
    # Show one (source, target) pair and its decoded text.
    sample_source, sample_target = train_dataset[100]
    print('输入序列张量样例: ', sample_source)
    decoded_source = ' '.join(vocab.lookup_tokens(sample_source.tolist()))
    print('输入序列样例文本: ', decoded_source)
    
    from torch.utils.data import DataLoader # 导入Dataloader
    # pad_sequence: pad (and if needed truncate) a batch of sequences to one length
    def pad_sequence(sequences, padding_value=0, length=None):
        """Pad a batch of 1-D sequences to equal length.

        sequences: iterable of 1-D tensors of token ids.
        padding_value: fill value past each sequence's end.
        length: target length; defaults to the longest sequence in the batch.
        Returns a LongTensor of shape [batch, length].
        """
        # Use the longest sequence when no explicit length is given.
        max_length = max(len(seq) for seq in sequences) if length is None else length
        # Pre-filled result tensor of the final shape.
        result = torch.full((len(sequences), max_length), padding_value, dtype=torch.long)
        for i, seq in enumerate(sequences):
            # BUGFIX: clamp to max_length so an over-long sequence is truncated
            # instead of raising a size-mismatch error when `length` is given.
            end = min(len(seq), max_length)
            result[i, :end] = seq[:end]
        return result
    
    # collate_fn: shape one batch of (source, target) pairs for the DataLoader
    def collate_fn(batch):
        """Pad a batch of (source, target) pairs to one shared length,
        filling with the vocabulary's <pad> id."""
        sources, targets = zip(*batch)
        # One shared width for both sides of the pair.
        longest = max(
            max(len(s) for s in sources),
            max(len(t) for t in targets),
        )
        pad_id = vocab["<pad>"]
        padded_sources = pad_sequence(sources, padding_value=pad_id, length=longest)
        padded_targets = pad_sequence(targets, padding_value=pad_id, length=longest)
        return padded_sources, padded_targets
    
    # Training DataLoader using the custom collate_fn
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size,
                                  shuffle=True, collate_fn=collate_fn)
    # Validation DataLoader using the custom collate_fn
    valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size,
                                  shuffle=False, collate_fn=collate_fn)
    
    
    import torch.optim as optim  # optimizers
    device = "cuda" if torch.cuda.is_available() else "cpu"  # pick the device
    model = GPT(len(vocab), max_seq_len).to(device)  # instantiate the GPT model
    criterion = nn.CrossEntropyLoss(ignore_index=vocab["<pad>"])  # ignore padding in the loss
    optimizer = optim.Adam(model.parameters(), lr=0.001)  # optimizer
    epochs = 2  # number of training epochs
    
    for epoch in range(epochs):
        epoch_loss = 0
        for batch_idx, (source, target) in enumerate(train_dataloader): # load batches via the DataLoader
            inputs, targets = source.to(device), target.to(device)
            optimizer.zero_grad()  # reset gradients
            outputs = model(inputs)  # forward pass
            loss = criterion(outputs.view(-1, len(vocab)), targets.view(-1))  # next-token loss
            loss.backward()  # backward pass
            optimizer.step()  # update parameters
            epoch_loss += loss.item()
            if (batch_idx + 1) % 500 == 0: # report every 500 batches
                print(f"Batch {batch_idx + 1}/{len(train_dataloader)}, Loss: {loss.item()}")
        epoch_loss /= len(train_dataloader) # average loss for this epoch
        print(f"Epoch {epoch + 1}/{epochs}, Average Loss: {epoch_loss}")
    
    
    # Save the trained model weights
    model_file_name = "gpt_wiki_model.pt"
    torch.save(model.state_dict(), model_file_name)
    print(f"Model saved as {model_file_name}")
    
    
    # Reload the weights just saved (same process, same device).
    # NOTE(review): consider torch.load(..., map_location=device) if the
    # checkpoint may later be loaded on a machine with a different device.
    model.load_state_dict(torch.load(model_file_name))
    # Text-generation test: greedy decoding
    def generate_text_greedy_search(model, input_str, max_len=50):
        """Greedy decoding: repeatedly append the argmax token until <eos>
        appears or max_len tokens have been generated; returns the full text."""
        model.eval()  # evaluation mode: disable dropout/batch-norm training behavior
        # Whitespace-tokenize the prompt and map words to vocabulary indices;
        # generation appends to this list.
        output_tokens = [vocab[token] for token in input_str.split()]
        with torch.no_grad():  # no gradients needed for inference
            for _ in range(max_len):
                # Feed everything generated so far as a single-sample batch.
                model_input = torch.LongTensor(output_tokens).unsqueeze(0).to(device)
                # Only the last time step's logits matter for the next token.
                last_logits = model(model_input)[:, -1, :]
                next_token = torch.max(last_logits, dim=-1)[1].item()
                if next_token == vocab["<eos>"]:
                    break  # stop at the end-of-sequence token
                output_tokens.append(next_token)
        # Decode indices back to words, dropping <pad>/<unk>.
        itos = vocab.get_itos()
        words = [itos[token] for token in output_tokens
                 if itos[token] != "<pad>" and itos[token] != "<unk>"]
        return " ".join(words)
    
    input_str = "how are you" # prompt text
    generated_text = generate_text_greedy_search(model, input_str) # let the model continue the prompt
    print("生成的文本:", generated_text) # print the generated text
    
    import numpy as np
    import torch
    import torch.nn as nn
    
    d_k = 64 # dimension of K (= dimension of Q)
    d_v = 64 # dimension of V

    # Scaled dot-product attention
    class ScaledDotProductAttention(nn.Module):
        def __init__(self):
            super(ScaledDotProductAttention, self).__init__()

        def forward(self, Q, K, V, attn_mask):
            """Q/K/V: [batch, heads, len, dim] -> (context, attention weights)."""
            scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)
            # BUGFIX: `masked_fill` is out-of-place and its result was being
            # discarded, so the mask was never applied; use the in-place form.
            scores.masked_fill_(attn_mask.bool(), -1e9)
            weights = nn.Softmax(dim=-1)(scores)
            context = torch.matmul(weights, V)
            return context, weights
    
    # Multi-head self-attention
    d_embedding = 512 # embedding dimension
    n_heads = 8 # number of attention heads
    batch_size = 3 # samples per batch

    class MultiHeadAttention(nn.Module):
        """Multi-head attention with residual connection and LayerNorm."""
        def __init__(self):
            super(MultiHeadAttention, self).__init__()
            self.W_Q = nn.Linear(d_embedding, d_k * n_heads)  # query projection
            self.W_K = nn.Linear(d_embedding, d_k * n_heads)  # key projection
            self.W_V = nn.Linear(d_embedding, d_v * n_heads)  # value projection
            self.linear = nn.Linear(n_heads*d_v, d_embedding)
            self.layer_norm = nn.LayerNorm(d_embedding)

        def forward(self, Q, K, V, attn_mask):
            """Q/K/V: [batch, len, d_embedding] -> (output, attention weights)."""
            residual, batch_size = Q, Q.size(0)
            # Project and split into heads: [batch, heads, len, d_k or d_v]
            q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
            k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
            v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)
            # Broadcast the mask over every head.
            attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
            context, weights = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
            # Merge heads back and project to d_embedding.
            context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)
            output = self.linear(context)
            output = self.layer_norm(output + residual)
            # BUGFIX: return the attention weights (the original returned
            # `residual`, i.e. the raw input, in place of the weights).
            return output, weights
    
    # Position-wise feed-forward network
    class PoswiseFeedForwardNet(nn.Module):
        """Expand -> ReLU -> project back via kernel-size-1 convolutions,
        with residual connection and LayerNorm."""
        def __init__(self, d_ff=2048):
            super(PoswiseFeedForwardNet, self).__init__()
            # Two 1x1 convolutions act as per-position linear layers.
            self.conv1 = nn.Conv1d(in_channels=d_embedding, out_channels=d_ff, kernel_size=1)
            self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_embedding, kernel_size=1)
            self.layer_norm = nn.LayerNorm(d_embedding)

        def forward(self, inputs):
            """inputs: [batch, len, d_embedding] -> same shape."""
            residual = inputs
            # Conv1d expects [batch, channels, len], hence the transposes.
            expanded = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))
            squeezed = self.conv2(expanded).transpose(1, 2)
            return self.layer_norm(squeezed + residual)
    
    # Sinusoidal positional-encoding table (injects position info into the model)
    def get_sin_enc_table(n_position, embedding_dim):
        """Return a [n_position, embedding_dim] FloatTensor of sin/cos encodings."""
        table = np.zeros((n_position, embedding_dim))
        for pos in range(n_position):
            # Column pairs (2i, 2i+1) share the same frequency.
            table[pos] = [pos / np.power(10000, 2 * (dim // 2) / embedding_dim)
                          for dim in range(embedding_dim)]
        table[:, 0::2] = np.sin(table[:, 0::2])  # even columns -> sin
        table[:, 1::2] = np.cos(table[:, 1::2])  # odd columns -> cos
        return torch.FloatTensor(table)
    
    # Padding-attention mask
    def get_attn_pad_mask(seq_q, seq_k):
        """Return [batch, len_q, len_k], True where the key token is <pad> (id 0)."""
        num_queries = seq_q.size(1)
        num_batches, num_keys = seq_k.size()
        # [batch, 1, len_k]: mark padding key positions, then broadcast over queries.
        pad_positions = seq_k.data.eq(0).unsqueeze(1)
        return pad_positions.expand(num_batches, num_queries, num_keys)
    
    
    # Causal mask: hide future positions from multi-head self-attention
    def get_attn_subsequent_mask(seq):
        """seq: [batch, seq_len, ...] -> byte mask [batch, seq_len, seq_len],
        1 strictly above the diagonal (future positions)."""
        size = (seq.size(0), seq.size(1), seq.size(1))
        upper = np.triu(np.ones(size), k=1)
        return torch.from_numpy(upper).byte()
    
    # Decoder layer
    class DecoderLayer(nn.Module):
        """Masked self-attention + position-wise FFN, each with residual + LayerNorm."""
        def __init__(self):
            super(DecoderLayer, self).__init__()
            self.self_attn = MultiHeadAttention()
            self.feed_forward = PoswiseFeedForwardNet()
            self.norm1 = nn.LayerNorm(d_embedding)
            self.norm2 = nn.LayerNorm(d_embedding)

        def forward(self, dec_inputs, attn_mask=None):
            """dec_inputs: [batch, len, d_embedding] -> same shape."""
            attended, _ = self.self_attn(dec_inputs, dec_inputs, dec_inputs, attn_mask)
            normed = self.norm1(dec_inputs + attended)
            return self.norm2(normed + self.feed_forward(normed))
    
    # Decoder: stack of DecoderLayers over token + positional embeddings
    n_layers = 6
    class Decoder(nn.Module):
        def __init__(self, vocab_size, max_seq_len):
            super(Decoder, self).__init__()
            self.src_emb = nn.Embedding(vocab_size, d_embedding)  # token embedding
            self.pos_emb = nn.Embedding(max_seq_len, d_embedding)  # positional embedding
            self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

        def forward(self, dec_inputs):
            """dec_inputs: [batch, seq_len] token ids -> [batch, seq_len, d_embedding]."""
            # BUGFIX: index positions along the sequence dimension.  The
            # original used len(dec_inputs), which is the BATCH size, giving
            # every token of a sample the same batch-slot position embedding.
            positions = torch.arange(dec_inputs.size(1), device=dec_inputs.device)
            inputs_embedding = self.src_emb(dec_inputs) + self.pos_emb(positions)
            # Causal mask, placed on the inputs' own device instead of relying
            # on the module-level `device` global.
            attn_mask = get_attn_subsequent_mask(inputs_embedding).to(dec_inputs.device)
            dec_outputs = inputs_embedding
            for layer in self.layers:
                dec_outputs = layer(dec_outputs, attn_mask)  # chain through the stack
            return dec_outputs
    
    # GPT model: decoder stack + vocabulary projection
    class GPT(nn.Module):
        def __init__(self, vocab_size, max_seq_len):
            super(GPT, self).__init__()
            self.decoder = Decoder(vocab_size, max_seq_len)  # representation learner
            self.projection = nn.Linear(d_embedding, vocab_size)  # logits head

        def forward(self, dec_inputs):
            """[batch, seq_len] ids -> [batch, seq_len, vocab_size] logits."""
            return self.projection(self.decoder(dec_inputs))
    # Corpus builder
    from collections import Counter
    class LanguageCorpus:
        """Tiny in-memory corpus: builds a vocabulary from whitespace-split
        sentences and produces padded (input, target) training batches."""
        def __init__(self, sentences):
            # BUGFIX: attribute was misspelled `sentencs`; renamed throughout.
            self.sentences = sentences
            # Longest sentence plus <sos> and <eos>.
            self.seq_len = max([len(sentence.split()) for sentence in sentences]) + 2
            self.vocab = self.create_vocabulary()  # word -> id
            self.idx2word = {v: k for k, v in self.vocab.items()}  # id -> word

        def create_vocabulary(self):
            """Build the word->id map; ids 0/1/2 are reserved for <pad>/<sos>/<eos>."""
            vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
            counter = Counter()
            for sentence in self.sentences:
                counter.update(sentence.split())
            for word in counter:
                if word not in vocab:
                    vocab[word] = len(vocab)
            return vocab

        def make_batch(self, batch_size, test_batch=False):
            """Sample batch_size random sentences and return (inputs, targets).

            Each sequence is <sos> + words + <eos>, padded to self.seq_len;
            inputs drop the last token and targets drop the first (shift-by-one).
            test_batch is accepted for interface compatibility but unused.
            """
            input_batch, output_batch = [], []
            sentence_indices = torch.randperm(len(self.sentences))[:batch_size]
            for index in sentence_indices:
                sentence = self.sentences[index]
                seq = [self.vocab['<sos>']] + [self.vocab[word] for word in sentence.split()] + [self.vocab['<eos>']]
                seq += [self.vocab['<pad>']] * (self.seq_len - len(seq))
                input_batch.append(seq[:-1])
                output_batch.append(seq[1:])
            return torch.LongTensor(input_batch), torch.LongTensor(output_batch)
    
    # Build the corpus from lang.txt (one sentence per line).
    with open("lang.txt", "r") as file:
        sentences = [line.strip() for line in file.readlines()]
    corpus = LanguageCorpus(sentences)
    vocab_size = len(corpus.vocab)   # corpus vocabulary size
    max_seq_len = corpus.seq_len     # longest sequence incl. <sos>/<eos>
    print(f"语料库词汇表大小: {vocab_size}")
    print(f"最长句子长度:{max_seq_len}")
    
    # GPT model training loop (original comment misspelled it "GTP")
    import torch.optim as optim
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = GPT(vocab_size, max_seq_len).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    epochs = 500
    for epoch in range(epochs):
        optimizer.zero_grad()  # reset gradients
        inputs, targets = corpus.make_batch(batch_size)  # sample one random batch
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)  # forward pass
        loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))  # next-token loss
        if (epoch+1) % 100 == 0:  # report every 100 epochs
            print(f'Epoch: {epoch+1:04d} cost={loss:.6f}')
        loss.backward()  # backward pass
        optimizer.step()  # update parameters
    
    def generate_text(model, input_str, max_len=50):
        """Greedy decoding with the module-level `corpus`: extends input_str
        (an iterable of known words) one argmax token at a time until <eos>
        or max_len tokens; returns the whole sequence as text."""
        model.eval()
        # Map the prompt's words to vocabulary ids; generation appends here.
        generated = [corpus.vocab[token] for token in input_str]
        with torch.no_grad():
            for _ in range(max_len):
                batch = torch.LongTensor(generated).unsqueeze(0).to(device)
                logits = model(batch)
                # Argmax over the vocabulary at the last time step.
                next_token = torch.max(logits[:, -1, :], dim=1)[1].item()
                if next_token == corpus.vocab['<eos>']:
                    break
                generated.append(next_token)
        # Convert ids back to words.
        return ' '.join(corpus.idx2word[token] for token in generated)
    
    input_str = ['Python'] # prompt: a list of words known to the corpus
    # NOTE(review): this rebinds the name `generate_text` from the function to
    # its string result, so the function cannot be called again afterwards.
    generate_text = generate_text(model, input_str)
    print('生成的文本: ', generate_text)
    
    
    
    import torch
    from torchtext.datasets import WikiText2
    from torchtext.data.utils import get_tokenizer
    from torchtext.vocab import build_vocab_from_iterator
    from torch.utils.data import DataLoader, Dataset
    
    import numpy as np
    import torch
    import torch.nn as nn
    
    d_k = 64 # per-head dimension of K (and Q)
    d_v = 64 # per-head dimension of V
    
    # Scaled dot-product attention
    class ScaledDotProductAttention(nn.Module):
        """Compute softmax(Q·K^T / sqrt(d_k)) · V.

        Forward args:
            Q, K, V: (batch, heads, seq, head_dim) tensors.
            attn_mask: broadcastable mask; nonzero positions are blocked.
        Returns:
            context: attention-weighted values (same shape as V).
            weights: attention distribution over key positions.
        """
        def __init__(self):
            super(ScaledDotProductAttention, self).__init__()

        def forward(self, Q, K, V, attn_mask):
            # Scale by sqrt of the key dimension, taken from K itself rather
            # than the module-level constant so any head size works.
            scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(K.size(-1))
            # BUG FIX: masked_fill is out-of-place and its result was
            # discarded, so the mask was silently ignored. Use the in-place
            # variant so masked positions really become -1e9 before softmax.
            scores.masked_fill_(attn_mask.bool(), -1e9)
            weights = nn.Softmax(dim=-1)(scores)
            context = torch.matmul(weights, V)
            return context, weights
    
    # Model hyperparameters shared by the attention modules below
    d_embedding = 512 # embedding dimension
    n_heads = 8 # number of heads in multi-head attention
    batch_size = 3 # samples per batch
    
    class MultiHeadAttention(nn.Module):
        """Multi-head self-attention: project Q/K/V into n_heads subspaces,
        attend per head, then merge heads and project back, with a residual
        connection and layer normalization."""
        def __init__(self):
            super(MultiHeadAttention, self).__init__()
            # Fused per-head projections for queries, keys and values
            self.W_Q = nn.Linear(d_embedding, d_k * n_heads)
            self.W_K = nn.Linear(d_embedding, d_k * n_heads)
            self.W_V = nn.Linear(d_embedding, d_v * n_heads)
            # Output projection back to the embedding dimension
            self.linear = nn.Linear(n_heads*d_v, d_embedding)
            self.layer_norm = nn.LayerNorm(d_embedding)

        def forward(self, Q, K, V, attn_mask):
            skip, n_batch = Q, Q.size(0)
            # Reshape to (batch, heads, seq, head_dim) per stream
            heads_q = self.W_Q(Q).view(n_batch, -1, n_heads, d_k).transpose(1, 2)
            heads_k = self.W_K(K).view(n_batch, -1, n_heads, d_k).transpose(1, 2)
            heads_v = self.W_V(V).view(n_batch, -1, n_heads, d_v).transpose(1, 2)
            # Broadcast the mask across every head
            mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
            context, _ = ScaledDotProductAttention()(heads_q, heads_k, heads_v, mask)
            # Merge heads back into (batch, seq, heads * d_v)
            merged = context.transpose(1, 2).contiguous().view(n_batch, -1, n_heads * d_v)
            out = self.layer_norm(self.linear(merged) + skip)
            # Second return value is the pre-attention input; callers in this
            # file discard it.
            return out, skip
    
    # Position-wise feed-forward network
    class PoswiseFeedForwardNet(nn.Module):
        """Per-position feed-forward block built from 1x1 convolutions
        (d_embedding -> d_ff -> d_embedding) with residual + layer norm."""
        def __init__(self, d_ff=2048):
            super(PoswiseFeedForwardNet, self).__init__()
            # kernel_size=1 convs act as per-position linear layers
            self.conv1 = nn.Conv1d(in_channels=d_embedding, out_channels=d_ff, kernel_size=1)
            self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_embedding, kernel_size=1)
            self.layer_norm = nn.LayerNorm(d_embedding)

        def forward(self, inputs):
            skip = inputs
            # Conv1d expects (batch, channels, seq), hence the transposes
            hidden = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))
            projected = self.conv2(hidden).transpose(1, 2)
            return self.layer_norm(projected + skip)
    
    # Sinusoidal position-encoding table, used to inject position information
    def get_sin_enc_table(n_position, embedding_dim):
        """Return an (n_position, embedding_dim) FloatTensor where even
        columns hold sin(angle) and odd columns hold cos(angle), with
        angle(pos, j) = pos / 10000^(2*(j//2)/embedding_dim)."""
        pos = np.arange(n_position)[:, None]     # column vector of positions
        dim = np.arange(embedding_dim)[None, :]  # row vector of dimensions
        table = pos / np.power(10000, 2 * (dim // 2) / embedding_dim)
        table[:, 0::2] = np.sin(table[:, 0::2])  # even dims -> sine
        table[:, 1::2] = np.cos(table[:, 1::2])  # odd dims  -> cosine
        return torch.FloatTensor(table)
    
    # Padding-attention mask: True wherever the key token is the padding id 0
    def get_attn_pad_mask(seq_q, seq_k):
        """Return a (batch, len_q, len_k) boolean mask marking padding keys."""
        len_q = seq_q.size(1)
        batch, len_k = seq_k.size()
        # Mark padding keys, then broadcast that row over every query position
        pad_mask = (seq_k == 0).unsqueeze(1)
        return pad_mask.expand(batch, len_q, len_k)
    
    
    # Causal mask generator: blocks attention to future (subsequent) positions
    def get_attn_subsequent_mask(seq):
        """Return a (batch, seq_len, seq_len) uint8 mask with ones strictly
        above the diagonal (i.e. at future positions).

        Only seq.size(0) and seq.size(1) are read, so `seq` may be either a
        (batch, seq_len) id tensor or a (batch, seq_len, d) embedding.
        """
        batch, seq_len = seq.size(0), seq.size(1)
        # Build directly in torch instead of the numpy round-trip the original
        # used; keep the byte (uint8) dtype the original produced.
        ones = torch.ones(batch, seq_len, seq_len, dtype=torch.uint8)
        return torch.triu(ones, diagonal=1)
    
    # Decoder layer: masked self-attention followed by a feed-forward block
    class DecoderLayer(nn.Module):
        def __init__(self):
            super(DecoderLayer, self).__init__()
            self.self_attn = MultiHeadAttention()        # masked multi-head self-attention
            self.feed_forward = PoswiseFeedForwardNet()  # position-wise FFN
            # Residual-connection normalizations for the two sub-layers
            self.norm1 = nn.LayerNorm(d_embedding)
            self.norm2 = nn.LayerNorm(d_embedding)

        def forward(self, dec_inputs, attn_mask=None):
            # Self-attention sub-layer with residual + layer norm
            attended, _ = self.self_attn(dec_inputs, dec_inputs, dec_inputs, attn_mask)
            normed = self.norm1(dec_inputs + attended)
            # Feed-forward sub-layer with residual + layer norm
            return self.norm2(normed + self.feed_forward(normed))
    
    # Decoder: token + positional embeddings, then a stack of decoder layers
    n_layers = 6  # number of stacked decoder layers
    class Decoder(nn.Module):
        """GPT-style decoder: embeds token ids, adds learned positional
        embeddings, and runs n_layers masked DecoderLayer blocks."""
        def __init__(self, vocab_size, max_seq_len):
            super(Decoder, self).__init__()
            self.src_emb = nn.Embedding(vocab_size, d_embedding)   # token embedding
            self.pos_emb = nn.Embedding(max_seq_len, d_embedding)  # position embedding
            self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

        def forward(self, dec_inputs):
            # BUG FIX: the original used len(dec_inputs), which is the BATCH
            # size of a (batch, seq_len) tensor, so every token of a sample
            # got the positional embedding of its batch index. Index by the
            # sequence length and broadcast over the batch instead.
            positions = torch.arange(dec_inputs.size(1), device=dec_inputs.device).unsqueeze(0)
            inputs_embedding = self.src_emb(dec_inputs) + self.pos_emb(positions)
            # Causal mask; take the device from the input rather than relying
            # on the module-level `device` global.
            attn_mask = get_attn_subsequent_mask(inputs_embedding).to(dec_inputs.device)
            dec_outputs = inputs_embedding
            for layer in self.layers:
                dec_outputs = layer(dec_outputs, attn_mask)
            return dec_outputs
    
    # GPT model: decoder stack plus a projection to vocabulary logits
    class GPT(nn.Module):
        def __init__(self, vocab_size, max_seq_len):
            super(GPT, self).__init__()
            self.decoder = Decoder(vocab_size, max_seq_len)       # decoder stack
            self.projection = nn.Linear(d_embedding, vocab_size)  # LM head

        def forward(self, dec_inputs):
            # Decode, then map hidden states to per-token vocabulary logits
            hidden = self.decoder(dec_inputs)
            return self.projection(hidden)
    
    # Download the corpus and build the vocabulary
    tokenizer = get_tokenizer("basic_english")
    train_iter = WikiText2(split='train')
    valid_iter = WikiText2(split='valid')

    def yield_tokens(data_iter):
        # Lazily tokenize each line of the corpus for vocabulary building
        for item in data_iter:
            yield tokenizer(item)

    vocab = build_vocab_from_iterator(yield_tokens(train_iter),
                                      specials=['<pad>', '<sos>', '<eos>'])
    # BUG FIX: the original called vocab.set_default_index(vocab['pad']),
    # looking up the ordinary word "pad" instead of the '<pad>' special token,
    # so out-of-vocabulary words did not map to the padding index.
    vocab.set_default_index(vocab['<pad>'])
    print(len(vocab))
    print({word: vocab[word] for word in ['<pad>', '<sos>', '<eos>', 'the', 'apple']})
    
    # Build the PyTorch dataset
    max_seq_len = 256  # cap on tokens per sample (including <sos>/<eos>)
    class WikiDataset(Dataset):
        """Tokenized corpus wrapper: each item is (source, target) where the
        target is the source shifted left by one token (teacher forcing)."""
        def __init__(self, data_iter, vocab, max_len = max_seq_len):
            self.data = []
            for sentence in data_iter:
                # Reserve two slots for the <sos>/<eos> markers
                pieces = tokenizer(sentence)[:max_len-2]
                encoded = [vocab['<sos>']] + vocab(pieces) + [vocab['<eos>']]
                self.data.append(encoded)

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            # Input drops the last token, target drops the first
            sample = self.data[idx]
            return torch.tensor(sample[:-1]), torch.tensor(sample[1:])
    
    train_dataset = WikiDataset(train_iter, vocab)  # training split
    valid_dataset = WikiDataset(valid_iter, vocab)  # validation split
    # NOTE(review): train_iter was already consumed by build_vocab_from_iterator
    # above — presumably WikiText2 iterables are re-iterable; verify, otherwise
    # train_dataset may end up empty.
    print('dataset数据条目: ', len(train_dataset))
    sample_source, sample_target = train_dataset[100]
    print('输入序列张量样例: ', sample_source)
    # Decode the id sequence back to words for a human-readable sanity check
    decoded_source = ' '.join(vocab.lookup_tokens(sample_source.tolist()))
    print('输入序列样例文本: ', decoded_source)
    
    # Build the DataLoader helpers
    def pad_sequence(sequences, padding_value=0, length=None):
        """Right-pad 1-D tensors into one (len(sequences), width) LongTensor.

        Args:
            sequences: iterable of 1-D integer tensors.
            padding_value: fill value for the padded tail.
            length: target width; when None, the longest sequence's length is used.
        """
        # BUG FIX: the original condition was inverted
        # (`max(...) if length is not None else length`), which ignored an
        # explicitly supplied `length` (so sources and targets could be padded
        # to different widths) and crashed with None when `length` was omitted.
        max_length = length if length is not None else max(len(seq) for seq in sequences)
        result = torch.full((len(sequences), max_length), padding_value, dtype=torch.long)
        for i, seq in enumerate(sequences):
            end = len(seq)
            result[i, :end] = seq[:end]
        return result
    
    def collate_fn(batch):
        """Pad every source/target in the batch to one shared width."""
        srcs, tgts = zip(*batch)
        # Widest sequence across BOTH streams, so inputs and labels align
        widest = max(len(seq) for seq in srcs + tgts)
        padded_srcs = pad_sequence(srcs, padding_value=vocab['<pad>'], length=widest)
        padded_tgts = pad_sequence(tgts, padding_value=vocab['<pad>'], length=widest)
        return padded_srcs, padded_tgts
    
    # DataLoaders: shuffle training batches, keep validation order fixed
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

    import os
    import torch.optim as optim

    # Training setup: model, padding-aware loss, optimizer
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = GPT(len(vocab), max_seq_len).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=vocab['<pad>'])  # skip <pad> positions in the loss
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    epochs = 2

    min_valid_loss = float('inf')  # best validation loss seen so far
    save_path = 'best_model.pth'   # checkpoint path for the best model
    
    # Train for `epochs` passes, validating and checkpointing after each one
    for epoch in range(epochs):
        epoch_loss = 0
        for batch_idx, (source, target) in enumerate(train_dataloader):
            inputs, targets = source.to(device), target.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)  # (batch, seq, vocab) logits
            # Flatten to (batch*seq, vocab) vs (batch*seq,) for cross entropy
            loss = criterion(outputs.view(-1, len(vocab)), targets.view(-1))
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            if (batch_idx+1) % 1000 == 0:
                print(f'Batch {batch_idx + 1}/{len(train_dataloader)}, loss: {loss.item()}')
        epoch_loss /= len(train_dataloader)
        print(f'Epoch {epoch + 1}/{epochs}, Average Loss: {epoch_loss}')

        # Validation pass: no gradients, average loss over the split
        model.eval()
        valid_loss = 0
        with torch.no_grad():
            for source, target in valid_dataloader:
                inputs, targets = source.to(device), target.to(device)
                outputs = model(inputs)
                loss = criterion(outputs.view(-1, len(vocab)), targets.view(-1))
                valid_loss += loss.item()
            valid_loss /= len(valid_dataloader)
            print(f'Epoch {epoch+1} / {epochs}, Validation Loss: {valid_loss}')
            # Checkpoint whenever validation improves
            if valid_loss < min_valid_loss:
                min_valid_loss = valid_loss
                torch.save(model.state_dict(), save_path)
                print(f'New best model saved at epoch {epoch +1} with Validation Loss: {valid_loss}')
            model.train()  # back to training mode for the next epoch
    
    
    

    二,训练文本

    Python is a popular programming language that is often considered easy to learn, 
    flexible, and (importantly) free to use on most computers. 
    Python is used extensively in science and in industry.
     One reason is that Python has a very strong set of add-on libraries that let you use it for all kinds of tasks including data analysis, 
     statistical modeling, developing applications for the web, running experiments, programming video games, making desktop apps, and programming robots.
     In this class, we will use Python as our analysis and programming language. 
     Teaching the full scope of programming in Python is beyond the scope of the class, 
     especially if you have no prior programming experience. However, 
     this chapter aims to give you enough of what you need to do for most of the types of data analysis we will be doing in this lab course. 
     There are many additional tutorials and learning resources on the class homepage.
    
    In addition, both the university in general as well as the department are offering courses on introductory programming using Python. 
    Thus, don't let this class be the only or last exposure to these tools.
     Consider it the beginning of learning!
     
     This chapter gives you an overview of most of the language features in Python you can expect to encounter in this course. 
     Bookmark this chapter for an easy reminder of the most important use cases, 
     but also when you are starting out it can make sense to step through these elements one by one.
     Python is a programming language that is frequently used for software development, web development, management automation, data analytics, and graphical representation of data.
     Because it's quite simple to learn, 
     it's becoming popular with non-technical individuals as well (people working in fields such as academia, finance, and bookkeeping). 
     It has become vital in the field of data science, especially when it comes to ML, statistics, 3D tools, and difficult calculations, 
     to have excellent analytical skills.
    

    相关文章

      网友评论

          本文标题:GPT图解:代码记录-GPT

          本文链接:https://www.haomeiwen.com/subject/xcsundtx.html