#!/usr/bin/env python3
import gym
from collections import namedtuple
import numpy as np
from tensorboardX import SummaryWriter
import torch
import torch.nn as nn
import torch.optim as optim
# tensorboard --logdir=
HIDDEN_SIZE = 128
BATCH_SIZE = 16
PERCENTILE = 70
class Net(nn.Module):
def __init__(self, obs_size, hidden_size, n_actions):
# 总结:所有放在构造函数__init__里面的层的都是这个模型的“固有属性”.
super(Net, self).__init__() # 第一句话,super调用父类的构造函数 =super().__init__() # 第一句话,super调用父类的构造函数
'''
通过Sequential来包装层
即将几个层包装在一起作为一个大的层(块),前面的一篇文章详细介绍了Sequential类的使用,包括常见的三种方式,以及每一种方式的优缺点,
参见:https://blog.csdn.net/qq_27825451/article/details/90551513
'''
self.net = nn.Sequential(
nn.Linear(obs_size, hidden_size), # PyTorch的nn.Linear()是用于设置网络中的全连接层的,需要注意在二维图像处理的任务中,全连接层的输入与输出一般都设置为二维张量,形状通常为[batch_size, size],不同于卷积层要求输入输出是四维张量。
nn.ReLU(),
nn.Linear(hidden_size, n_actions)
)
# 此处定义为函数中的新定义,不是继承的
# view()的作用相当于numpy中的reshape,重新定义矩阵的形状
'''
class Fruit():
def __init__(self, color, shape):
self.color = color
self.shape = shape
class Apple(Fruit):
def __init__(self, color, shape, taste):
Fruit.__init__(self, color, shape) # 等价于super().__init__(color, shape)
self.taste = taste
def feature(self):
print("Apple's color is {}, shape is {} and taste {}".format(
self.color, self.shape, self.taste))
原文链接:https://blog.csdn.net/w1301100424/article/details/93858890
'''
def forward(self, x):
return self.net(x)
# 3)forward方法是必须要重写的,它是实现模型的功能,实现各个层之间的连接关系的核心。
'''
因为元组的局限性:不能为元组内部的数据进行命名,所以往往我们并不知道一个元组所要表达的意义,
所以在这里引入了 collections.namedtuple 这个工厂函数,来构造一个带字段名的元组。
'''
Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])
'''
# 两种方法来给 namedtuple 定义方法名
#User = collections.namedtuple('User', ['name', 'age', 'id'])
User = collections.namedtuple('User', 'name age id')
user = User('tester', '22', '464643123')
'''
def iterate_batches(env, net, batch_size): # 接受环境(来自Gym库的Env实例)、神经网络、以及每次迭代时应该生成的episode数量
batch = [] # batch变量用于累积batch(一个Episode实例列表)
episode_reward = 0.0 # 奖励计数器
episode_steps = [] #
obs = env.reset() # 重新设定环境,获得第一个观察并创建softmax层,用于将网络输出装换成动作的概率分布
sm = nn.Softmax(dim=1)
'''
def softmax(x):
exp_x = np.exp(x)
sum_exp_x = np.sum(exp_x)
y = exp_x/sum_exp_x
return y
改进:解决溢出问题
def softmax(a):
c = np.max(a)
exp_a = np.exp(a - c) # 溢出对策
sum_exp_a = np.sum(exp_a)
y = exp_a / sum_exp_a
return y
softmax函数的输出是0.0到1.0之间的实数。并且,softmax函数的输出值的总和是1。
输出总和为1是softmax函数的一个重要性质。正因为有了这个性质,我们才可以把softmax函数的输出解释为“概率”。
所以,当nn.Softmax的输入是一个二维张量时,其参数dim = 0,是让列之和为1;dim = 1,是让行之和为1。
'''
while True: # 进行环境循环
obs_v = torch.FloatTensor([obs]) # 将观察值(在CartPole中是一个四个数字的向量,即cart_pos,cart_v,pole_angle,pole_v))转换成1*4的张量,这里用单一元素列表传递观察实现
act_probs_v = sm(net(obs_v)) # 这里没有在网络中使用非线性特性,他将输出原始动作分值,此分值需要softmax函数提供 ,net = Net(obs_size, HIDDEN_SIZE, n_actions),此处obs_v相当于网络输入的x
act_probs = act_probs_v.data.numpy()[0] # 这里的网络和softmax层都返回能够跟踪梯度变化的张量,因此需要通过访问tensor.data字段,然后将张量转换为Numpy数组将其解包。
# 这个数组具有和输入相同的二维结构,batch维度在0轴上,因此需要得到第一个batch元素,获得动作概率的一个一维向量
action = np.random.choice(len(act_probs), p=act_probs) # 根据已有的动作的概率分布,获得当前步骤采取的动作,通过使用Numpy.choice()函数对该分布进行采样实现,得到0~len(act_probs)-1整数列表
next_obs, reward, is_done, _ = env.step(action) # 之后,把这个动作传递到环境中,获得下一个观察、奖励以及episode是否结束的提示,step()是执行动作的方法
episode_reward += reward # 更新
episode_steps.append(EpisodeStep(observation=obs, action=action)) # episode列表扩展了一个(用于选择动作的观察,动作)对
if is_done:
batch.append(Episode(reward=episode_reward, steps=episode_steps)) # 将最终的episode附加到batch中,保存总奖励和采取的步骤,Episode是具名元组
episode_reward = 0.0 # 重置总奖励累加器并清理步骤列表
episode_steps = []
next_obs = env.reset() # 充值环境重新开始
if len(batch) == batch_size:
yield batch # 如果batch已经达到所需的episode数量,使用yield函数将其返回给调用者进行处理,返回具有不同的稍好一些(所期望)的行为
batch = [] # 清理batch
obs = next_obs # 非常重要的一步是将从环境中获得的观察分配给当前的观察变量
# 这个函数逻辑中需要理解的一个非常重要的事实是,这里的网络训练和episode的生成是同时进行的。
'''
到这里你可能就明白yield和return的关系和区别了,带yield的函数是一个生成器,而不是一个函数了,这个生成器有一个函数就是next函数,next就相当于“下一步”生成哪个数,
这一次的next开始的地方是接着上一次的next停止的地方执行的,所以调用next的时候,生成器并不会从foo函数的开始执行,只是接着上一步停止的地方开始,然后遇到yield后,return出要生成的数,此步就结束。
原文链接:https://blog.csdn.net/mieleizhi0522/article/details/82142856
'''
def filter_batch(batch, percentile):
rewards = list(map(lambda s: s.reward, batch))
'''map() 会根据提供的函数对指定序列做映射。map(function, iterable, ...) ,第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。
lambda (匿名函数):示例:add = lambda x,y:x+y print(add(3,4))-》7
'''
reward_bound = np.percentile(rewards, percentile)
# np.percentile(a, q, axis=None, out=None, overwrite_input=False, interpolation='linear', keepdims=False)
# 作用:找到一组数的分位数值,如四分位数等(具体什么位置根据自己定义),注意实际百分位数计算方式
reward_mean = float(np.mean(rewards))
# 这个函数是交叉熵方法的核心:他从给定batch中的episode和百分位数中计算出一个边界奖励,用于筛选“精华”episode进行训练。为获得边界奖励,
# 使用Numpy的百分位数函数,他从一组值列表和期望的百分位数中计算该百分位数对应的值。然后计算平均奖励,用于监控。
train_obs = []
train_act = []
for example in batch:
if example.reward < reward_bound:
continue
train_obs.extend(map(lambda step: step.observation, example.steps)) # 将example中的steps观察值列表扩展到train_obs中
# extend() 函数用于在列表末尾一次性追加另一个序列中的多个值(用新列表扩展原来的列表)。
train_act.extend(map(lambda step: step.action, example.steps))
# 然后筛选episode。对于batch中每个episode,这里将检查该episode的总奖励是否高于边界,若是则填写要观察和行动的列表用于训练。
train_obs_v = torch.FloatTensor(train_obs)
train_act_v = torch.LongTensor(train_act)
return train_obs_v, train_act_v, reward_bound, reward_mean
# 该函数最后一步,需要把“精华”episode中的观察和动作转换为张量,并返回一个四元组:观察、动作、奖励边界和平均奖励。
# 最后两个值仅用于将他们写入TensorBoard以检查智能体性能。
# 最后一部分代码将所有函数结合一起,训练循环组成如下:
if __name__ == "__main__":
env = gym.make("CartPole-v0")
# env = gym.wrappers.Monitor(env, directory="mon", force=True)
obs_size = env.observation_space.shape[0]
# env.observation_space是Box属性,box(可能是无界的)在n维空间中。一个box代表n维封闭区间的笛卡尔积。
# 假设集合A={a,b},集合B={0,1,2},则两个集合的笛卡尔积为{(a,0),(a,1),(a,2),(b,0),(b,1),(b,2)}。
n_actions = env.action_space.n
net = Net(obs_size, HIDDEN_SIZE, n_actions) # HIDDEN_SIZE = 128,返回一个net,可输入参数为x
objective = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=0.01)
# 深度学习的优化算法Adam
# torch.optim is a package implementing various optimization algorithms. Most commonly used methods are already supported,
# and the interface is general enough, so that more sophisticated ones can be also easily integrated in the future.
writer = SummaryWriter(comment="-cartpole")
# 首先,创建所有必须的对象:环境、神经网络、目标函数、优化器、TensorBoard的摘要编写器。注释行创建一个监视器以写入智能体程序性能的视频。
for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)): # BATCH_SIZE = 16,enumerate返回索引和值
'''
for循环遍历的原理就是迭代,in后面必须是可迭代对象. iterate_batches()函数里面有yield()函数,自动变成可迭代对象
'''
obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE) # PERCENTILE = 70
optimizer.zero_grad()
action_scores_v = net(obs_v)
loss_v = objective(action_scores_v, acts_v)
loss_v.backward()
optimizer.step()
# 在训练循环中,迭代batch(一个episode对象的列表),然后使用filter_batch函数筛选“精华”episode。其结果就是观察和采取行动的变量,用于筛选的奖励边界和平均奖励。
# 之后,将网络的梯度归零,并将观察传递给网络,获得其动作分值。这些分值被传递给目标函数,目标函数计算网络输出和智能体所采取的动作之间的交叉熵。这样做可以增强网络,
# 以执行哪些可以带来良好奖励的“精华:动作。然后,计算损失梯度,并要求优化器调整网络。
print("%d: loss=%.3f, reward_mean=%.1f, reward_bound=%.1f" % (
iter_no, loss_v.item(), reward_m, reward_b))
writer.add_scalar("loss", loss_v.item(), iter_no)
writer.add_scalar("reward_bound", reward_b, iter_no)
writer.add_scalar("reward_mean", reward_m, iter_no)
# 循环其余部分是监控进度,在控制台上,显示迭代次数、损失、batch的平均奖励和奖励边界。这里还将相同的值写入TensorBoard,以获得一个漂亮的智能体学习性能图。
if reward_m > 199:
print("Solved!")
break
writer.close()
# 训练最后一次检查是比较该batch中episode的平均奖励。若该平均奖励数值超过199时,就停止训练。为什么是199?在Gym中,当最后100个episode的平均奖励大于195时,
# 此Cart Pole环境可以考虑为被解决完了,交叉熵方法收敛的非常快,以至于通常只需要100个episode。经过适当训练的智能体可以无限长时间的保持棍子平衡(获得任何数量的
# 分数),但Cart Pole中的episode长度限制为200步(Cart Pole环境中,Time limit包装器,它会停止200步后的episode)考虑到此问题,这将在batch中的平均奖励大于199
# 之后停止训练,者可以很好的表明智能体已经知道如何像一个专业者一样平衡棍子。
网友评论