Cartpole

作者: 从此不迷茫 | 来源:发表于2021-09-24 08:47 被阅读0次
    #!/usr/bin/env python3
    import gym
    from collections import namedtuple
    import numpy as np
    from tensorboardX import SummaryWriter
    
    import torch
    import torch.nn as nn
    import torch.optim as optim
    # tensorboard --logdir=
    
    HIDDEN_SIZE = 128
    BATCH_SIZE = 16
    PERCENTILE = 70
    
    
    class Net(nn.Module):
        def __init__(self, obs_size, hidden_size, n_actions):
            # 总结:所有放在构造函数__init__里面的层的都是这个模型的“固有属性”.
            super(Net, self).__init__()  # 第一句话,super调用父类的构造函数  =super().__init__()  # 第一句话,super调用父类的构造函数
            '''
            通过Sequential来包装层
    即将几个层包装在一起作为一个大的层(块),前面的一篇文章详细介绍了Sequential类的使用,包括常见的三种方式,以及每一种方式的优缺点,
    参见:https://blog.csdn.net/qq_27825451/article/details/90551513
            '''
            self.net = nn.Sequential(
                nn.Linear(obs_size, hidden_size),  # PyTorch的nn.Linear()是用于设置网络中的全连接层的,需要注意在二维图像处理的任务中,全连接层的输入与输出一般都设置为二维张量,形状通常为[batch_size, size],不同于卷积层要求输入输出是四维张量。
                nn.ReLU(),
                nn.Linear(hidden_size, n_actions)
            )
            # 此处定义为函数中的新定义,不是继承的
            # view()的作用相当于numpy中的reshape,重新定义矩阵的形状
            '''
            class Fruit():
                def __init__(self, color, shape):
                    self.color = color
                    self.shape = shape
    
            class Apple(Fruit):
                def __init__(self, color, shape, taste):
                    Fruit.__init__(self, color, shape) # 等价于super().__init__(color, shape)
                    self.taste = taste
        
                def feature(self):
                    print("Apple's color is {}, shape is {} and taste {}".format(
                        self.color, self.shape, self.taste))
    原文链接:https://blog.csdn.net/w1301100424/article/details/93858890
            '''
    
        def forward(self, x):
            return self.net(x)
    # 3)forward方法是必须要重写的,它是实现模型的功能,实现各个层之间的连接关系的核心。
    
    '''
    因为元组的局限性:不能为元组内部的数据进行命名,所以往往我们并不知道一个元组所要表达的意义,
    所以在这里引入了 collections.namedtuple 这个工厂函数,来构造一个带字段名的元组。
    '''
    
    Episode = namedtuple('Episode', field_names=['reward', 'steps'])
    EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])
    '''
    # 两种方法来给 namedtuple 定义方法名
    #User = collections.namedtuple('User', ['name', 'age', 'id'])
    User = collections.namedtuple('User', 'name age id')
    user = User('tester', '22', '464643123')
    '''
    
    def iterate_batches(env, net, batch_size):  # 接受环境(来自Gym库的Env实例)、神经网络、以及每次迭代时应该生成的episode数量
        batch = []  # batch变量用于累积batch(一个Episode实例列表)
        episode_reward = 0.0  # 奖励计数器
        episode_steps = []  #
        obs = env.reset()  # 重新设定环境,获得第一个观察并创建softmax层,用于将网络输出装换成动作的概率分布
        sm = nn.Softmax(dim=1)
        '''
        def softmax(x):
            exp_x = np.exp(x)
            sum_exp_x = np.sum(exp_x) 
            y = exp_x/sum_exp_x
            return y
        改进:解决溢出问题
        def softmax(a):
            c = np.max(a)
            exp_a = np.exp(a - c) # 溢出对策
            sum_exp_a = np.sum(exp_a)
            y = exp_a / sum_exp_a
            return y
        softmax函数的输出是0.0到1.0之间的实数。并且,softmax函数的输出值的总和是1。
        输出总和为1是softmax函数的一个重要性质。正因为有了这个性质,我们才可以把softmax函数的输出解释为“概率”。
        
        所以,当nn.Softmax的输入是一个二维张量时,其参数dim = 0,是让列之和为1;dim = 1,是让行之和为1。
        '''
        while True:  # 进行环境循环
            obs_v = torch.FloatTensor([obs])  # 将观察值(在CartPole中是一个四个数字的向量,即cart_pos,cart_v,pole_angle,pole_v))转换成1*4的张量,这里用单一元素列表传递观察实现
            act_probs_v = sm(net(obs_v))  # 这里没有在网络中使用非线性特性,他将输出原始动作分值,此分值需要softmax函数提供 ,net = Net(obs_size, HIDDEN_SIZE, n_actions),此处obs_v相当于网络输入的x
            act_probs = act_probs_v.data.numpy()[0]   # 这里的网络和softmax层都返回能够跟踪梯度变化的张量,因此需要通过访问tensor.data字段,然后将张量转换为Numpy数组将其解包。
            # 这个数组具有和输入相同的二维结构,batch维度在0轴上,因此需要得到第一个batch元素,获得动作概率的一个一维向量
    
            action = np.random.choice(len(act_probs), p=act_probs)  # 根据已有的动作的概率分布,获得当前步骤采取的动作,通过使用Numpy.choice()函数对该分布进行采样实现,得到0~len(act_probs)-1整数列表
            next_obs, reward, is_done, _ = env.step(action)   # 之后,把这个动作传递到环境中,获得下一个观察、奖励以及episode是否结束的提示,step()是执行动作的方法
            episode_reward += reward  # 更新
            episode_steps.append(EpisodeStep(observation=obs, action=action))  # episode列表扩展了一个(用于选择动作的观察,动作)对
            if is_done:
                batch.append(Episode(reward=episode_reward, steps=episode_steps))  # 将最终的episode附加到batch中,保存总奖励和采取的步骤,Episode是具名元组
                episode_reward = 0.0  # 重置总奖励累加器并清理步骤列表
                episode_steps = []
                next_obs = env.reset()  # 充值环境重新开始
                if len(batch) == batch_size:
                    yield batch  # 如果batch已经达到所需的episode数量,使用yield函数将其返回给调用者进行处理,返回具有不同的稍好一些(所期望)的行为
                    batch = []  # 清理batch
            obs = next_obs   # 非常重要的一步是将从环境中获得的观察分配给当前的观察变量
    # 这个函数逻辑中需要理解的一个非常重要的事实是,这里的网络训练和episode的生成是同时进行的。
    '''
    到这里你可能就明白yield和return的关系和区别了,带yield的函数是一个生成器,而不是一个函数了,这个生成器有一个函数就是next函数,next就相当于“下一步”生成哪个数,
    这一次的next开始的地方是接着上一次的next停止的地方执行的,所以调用next的时候,生成器并不会从foo函数的开始执行,只是接着上一步停止的地方开始,然后遇到yield后,return出要生成的数,此步就结束。
    原文链接:https://blog.csdn.net/mieleizhi0522/article/details/82142856
    '''
    def filter_batch(batch, percentile):
        rewards = list(map(lambda s: s.reward, batch))
        '''map() 会根据提供的函数对指定序列做映射。map(function, iterable, ...) ,第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。
        lambda (匿名函数):示例:add = lambda x,y:x+y  print(add(3,4))-》7
        '''
        reward_bound = np.percentile(rewards, percentile)
        # np.percentile(a, q, axis=None, out=None, overwrite_input=False, interpolation='linear', keepdims=False)
        # 作用:找到一组数的分位数值,如四分位数等(具体什么位置根据自己定义),注意实际百分位数计算方式
        reward_mean = float(np.mean(rewards))
    
        # 这个函数是交叉熵方法的核心:他从给定batch中的episode和百分位数中计算出一个边界奖励,用于筛选“精华”episode进行训练。为获得边界奖励,
        # 使用Numpy的百分位数函数,他从一组值列表和期望的百分位数中计算该百分位数对应的值。然后计算平均奖励,用于监控。
    
        train_obs = []
        train_act = []
        for example in batch:
            if example.reward < reward_bound:
                continue
            train_obs.extend(map(lambda step: step.observation, example.steps))  # 将example中的steps观察值列表扩展到train_obs中
            # extend() 函数用于在列表末尾一次性追加另一个序列中的多个值(用新列表扩展原来的列表)。
            train_act.extend(map(lambda step: step.action, example.steps))
        # 然后筛选episode。对于batch中每个episode,这里将检查该episode的总奖励是否高于边界,若是则填写要观察和行动的列表用于训练。
    
        train_obs_v = torch.FloatTensor(train_obs)
        train_act_v = torch.LongTensor(train_act)
        return train_obs_v, train_act_v, reward_bound, reward_mean
        # 该函数最后一步,需要把“精华”episode中的观察和动作转换为张量,并返回一个四元组:观察、动作、奖励边界和平均奖励。
        # 最后两个值仅用于将他们写入TensorBoard以检查智能体性能。
    
    # 最后一部分代码将所有函数结合一起,训练循环组成如下:
    if __name__ == "__main__":
        env = gym.make("CartPole-v0")
        # env = gym.wrappers.Monitor(env, directory="mon", force=True)
        obs_size = env.observation_space.shape[0]
        # env.observation_space是Box属性,box(可能是无界的)在n维空间中。一个box代表n维封闭区间的笛卡尔积。
        # 假设集合A={a,b},集合B={0,1,2},则两个集合的笛卡尔积为{(a,0),(a,1),(a,2),(b,0),(b,1),(b,2)}。
        n_actions = env.action_space.n
    
        net = Net(obs_size, HIDDEN_SIZE, n_actions)  # HIDDEN_SIZE = 128,返回一个net,可输入参数为x
        objective = nn.CrossEntropyLoss()
        optimizer = optim.Adam(params=net.parameters(), lr=0.01)
        # 深度学习的优化算法Adam
        # torch.optim is a package implementing various optimization algorithms. Most commonly used methods are already supported,
        # and the interface is general enough, so that more sophisticated ones can be also easily integrated in the future.
        writer = SummaryWriter(comment="-cartpole")
        # 首先,创建所有必须的对象:环境、神经网络、目标函数、优化器、TensorBoard的摘要编写器。注释行创建一个监视器以写入智能体程序性能的视频。
    
        for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):  # BATCH_SIZE = 16,enumerate返回索引和值
            '''
            for循环遍历的原理就是迭代,in后面必须是可迭代对象. iterate_batches()函数里面有yield()函数,自动变成可迭代对象
            '''
            obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)  # PERCENTILE = 70
            optimizer.zero_grad()
            action_scores_v = net(obs_v)
            loss_v = objective(action_scores_v, acts_v)
            loss_v.backward()
            optimizer.step()
        # 在训练循环中,迭代batch(一个episode对象的列表),然后使用filter_batch函数筛选“精华”episode。其结果就是观察和采取行动的变量,用于筛选的奖励边界和平均奖励。
        # 之后,将网络的梯度归零,并将观察传递给网络,获得其动作分值。这些分值被传递给目标函数,目标函数计算网络输出和智能体所采取的动作之间的交叉熵。这样做可以增强网络,
        # 以执行哪些可以带来良好奖励的“精华:动作。然后,计算损失梯度,并要求优化器调整网络。
    
    
            print("%d: loss=%.3f, reward_mean=%.1f, reward_bound=%.1f" % (
                iter_no, loss_v.item(), reward_m, reward_b))
            writer.add_scalar("loss", loss_v.item(), iter_no)
            writer.add_scalar("reward_bound", reward_b, iter_no)
            writer.add_scalar("reward_mean", reward_m, iter_no)
            # 循环其余部分是监控进度,在控制台上,显示迭代次数、损失、batch的平均奖励和奖励边界。这里还将相同的值写入TensorBoard,以获得一个漂亮的智能体学习性能图。
            if reward_m > 199:
                print("Solved!")
                break
        writer.close()
        # 训练最后一次检查是比较该batch中episode的平均奖励。若该平均奖励数值超过199时,就停止训练。为什么是199?在Gym中,当最后100个episode的平均奖励大于195时,
        # 此Cart Pole环境可以考虑为被解决完了,交叉熵方法收敛的非常快,以至于通常只需要100个episode。经过适当训练的智能体可以无限长时间的保持棍子平衡(获得任何数量的
        # 分数),但Cart Pole中的episode长度限制为200步(Cart Pole环境中,Time limit包装器,它会停止200步后的episode)考虑到此问题,这将在batch中的平均奖励大于199
        # 之后停止训练,者可以很好的表明智能体已经知道如何像一个专业者一样平衡棍子。
    
    

    相关文章

      网友评论

          本文标题:Cartpole

          本文链接:https://www.haomeiwen.com/subject/hpgagltx.html