LunarLander-v2 in reinforcement


Author: Rain师兄 | Published 2021-10-06 17:40

    This article is about training LunarLander-v2 with the PPO algorithm.

    (About this environment: rocket trajectory optimization is a classic topic in optimal control.

    According to Pontryagin's maximum principle, it is optimal to fire the engine at full throttle or to turn it off, which is why this environment works with discrete actions.

    The landing pad is always at coordinates (0, 0), and these coordinates are the first two numbers in the state vector. Moving from the top of the screen to the landing pad with zero speed is worth about 100..140 points.

    If the lander moves away from the landing pad, it loses that reward. If the lander crashes or comes to rest, it receives an additional -100 or +100 points respectively.

    Each leg with ground contact is +10 points. Firing the main engine costs -0.3 points per frame. Firing a side engine costs -0.03 points per frame. Solved is 200 points.

    Landing outside the landing pad is possible. Fuel is infinite, so an agent can learn to fly and then land on its first attempt. See the source code for details.)

    By the end of this tutorial, you will know how to apply a policy-based learning method within an Actor-Critic framework to learn to control any discrete game environment.

    This article uses an OpenAI Gym environment, but you can use any other game environment as long as it supports the OpenAI Gym API. If you want to adapt the code to another environment, just make sure your inputs and outputs are correct.

    Running the LunarLander-v2 Environment

    In this environment:

    Action space (Discrete): 0-Do nothing, 1-Fire left engine, 2-Fire down engine, 3-Fire right engine
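    As a quick sanity check, you can print the two spaces directly. This is only a minimal sketch; the exact Box bounds printed may differ between Gym versions, and the 8-value state layout in the comment is the commonly documented one for LunarLander-v2.

    import gym

    env = gym.make("LunarLander-v2")
    # Discrete(4): do nothing, fire left engine, fire main (down) engine, fire right engine
    print(env.action_space)
    # Box with 8 values, roughly: x, y, x-velocity, y-velocity, angle,
    # angular velocity, left leg contact, right leg contact
    print(env.observation_space)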

    Proximal Policy Optimization (PPO)

    The PPO algorithm was introduced by the OpenAI team in 2017 and quickly became one of the most popular reinforcement learning methods, pushing most other RL approaches aside at the time.

    PPO involves collecting a small batch of experience from interacting with the environment and using that batch to update the decision-making policy. Once the policy has been updated with this batch, the experience is thrown away and a new batch is collected with the newly updated policy.

    This is why it is an on-policy method: the collected experience samples are only used to update the current policy.

    The main idea is that after an update, the new policy should not differ too much from the old policy.

    This reduces variance during training at the cost of some bias, but it ensures smoother training and keeps the agent from going down an unrecoverable path of taking senseless actions. So let's break our agent down in more detail and see how it defines and updates its policy.
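    As a rough numerical illustration of that idea (this snippet is not part of the original code), consider a single action with a positive advantage: once the new policy's probability for that action drifts more than 20% away from the old one, the clipped objective stops rewarding any further movement.

    import numpy as np

    def clipped_objective(new_prob, old_prob, advantage, epsilon=0.2):
        ratio = new_prob / old_prob
        return np.minimum(ratio * advantage, np.clip(ratio, 1 - epsilon, 1 + epsilon) * advantage)

    # With advantage = 1.0, pushing the probability from 0.30 to 0.45 (ratio 1.5)
    # earns no more than a ratio of 1.2 would, so the incentive to move further is gone.
    print(clipped_objective(0.45, 0.30, 1.0))  # 1.2, clipped
    print(clipped_objective(0.33, 0.30, 1.0))  # ~1.1, still inside the clip range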

    The Actor-Critic model's structure

    PPO uses the Actor-Critic approach for the agent.

    What does that mean? It means that two models are used, one called the Actor and one called the Critic.

    The Actor model

    The Actor model learns which action to take for a given observed state of the environment. In LunarLander-v2, it takes the game's list of eight values as input.

    This list represents the current state of the rocket, and the output is which engine to fire.
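    The run_batch method later in this post calls an act(state) method that is never shown, so here is a minimal sketch of how the softmax output is typically turned into a concrete action. Both the method itself and self.action_size (the number of discrete actions, 4 here) are assumptions modeled on that call signature.

    import numpy as np

    def act(self, state):
        # the Actor outputs a softmax distribution over the 4 discrete actions
        prediction = self.Actor.predict(state)[0]
        # sample an action according to those probabilities
        action = np.random.choice(self.action_size, p=prediction)
        # one-hot encode it, since the PPO loss expects actions in that form
        action_onehot = np.zeros([self.action_size])
        action_onehot[action] = 1
        return action, action_onehot, prediction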

    Custom PPO loss

    This is the PPO algorithm, so let's understand the loss function.

    The action probabilities and the old probabilities represent the policy defined by our Actor neural network model. By training this model, we want to improve these probabilities so that over time they give us better and better actions.

    A major problem in some reinforcement learning methods is that once our model adopts a bad policy, it only takes bad actions in the game, so we can no longer generate any good actions and just slide down an unrecoverable slope. PPO tries to solve this by making only small update steps to the model, which keeps the training process stable.
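    For reference, the clipped surrogate objective from the PPO paper, which the ppo_loss function below implements (together with a small entropy bonus that encourages exploration), is

    $$L^{CLIP}(\theta) = \mathbb{E}_t\Big[\min\big(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,\hat{A}_t\big)\Big], \qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{old}}(a_t \mid s_t)},$$

    with $\epsilon = 0.2$ in the code; the training step minimizes $-L^{CLIP} - c\,H(\pi_\theta)$ with entropy coefficient $c = 0.001$.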

    The Critic model

    We send the action predicted by the Actor to our environment and observe what happens in the game. If our action leads to something positive, such as the spaceship landing, the environment returns positive feedback in the form of a reward; but if our spaceship crashes, we receive a negative reward. These rewards are taken into account when training the Critic model.
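    The Critic itself is then trained by regressing its value estimates toward return targets built from these rewards. The critic_PPO2_loss defined later uses a clipped variant of the usual mean-squared error, which can be written as

    $$L^{V} = \tfrac{1}{2}\,\mathbb{E}_t\Big[\max\big((V_\theta(s_t) - V^{targ}_t)^2,\ (\operatorname{clip}(V_\theta(s_t),\,V_{old}(s_t)-\epsilon,\,V_{old}(s_t)+\epsilon) - V^{targ}_t)^2\big)\Big],$$

    where $V_{old}$ are the value predictions made when the batch was collected and $\epsilon = 0.2$.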

    import gym
    import random

    env = gym.make("LunarLander-v2")

    def Random_games():
        # Each of these episodes is its own game.
        for episode in range(10):
            env.reset()
            # This loops over each frame, up to 500...but we won't make it that far with random actions.
            while True:
                # This will display the environment.
                # Only display if you really want to see it.
                # Takes much longer to display it.
                env.render()

                # This will just create a sample action in any environment.
                # In this environment, the action is an integer from 0 to 3,
                # one of the four discrete actions listed above.
                action = env.action_space.sample()

                # This executes the environment with an action,
                # and returns the observation of the environment,
                # the reward, whether the env is over, and other info.
                next_state, reward, done, info = env.step(action)

                # Let's print everything in one line:
                print(next_state, reward, done, info, action)
                if done:
                    break

    Random_games()

    section 1

    # imports used by the code in this and the following sections
    import copy
    import numpy as np
    import tensorflow as tf
    from tensorflow.keras import backend as K
    from tensorflow.keras.models import Model
    from tensorflow.keras.layers import Input, Dense

    class Actor_Model:
        def __init__(self, input_shape, action_space, lr, optimizer):
            X_input = Input(input_shape)
            self.action_space = action_space

            X = Dense(512, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X_input)
            X = Dense(256, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X)
            X = Dense(64, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X)
            output = Dense(self.action_space, activation="softmax")(X)

            self.Actor = Model(inputs = X_input, outputs = output)
            self.Actor.compile(loss=self.ppo_loss, optimizer=optimizer(lr=lr))

        def ppo_loss(self, y_true, y_pred):
            # Defined in https://arxiv.org/abs/1707.06347
            # y_true packs [advantages | old predictions | one-hot actions]
            advantages, prediction_picks, actions = y_true[:, :1], y_true[:, 1:1+self.action_space], y_true[:, 1+self.action_space:]
            LOSS_CLIPPING = 0.2
            ENTROPY_LOSS = 0.001

            prob = actions * y_pred
            old_prob = actions * prediction_picks

            prob = K.clip(prob, 1e-10, 1.0)
            old_prob = K.clip(old_prob, 1e-10, 1.0)

            ratio = K.exp(K.log(prob) - K.log(old_prob))

            p1 = ratio * advantages
            p2 = K.clip(ratio, min_value=1 - LOSS_CLIPPING, max_value=1 + LOSS_CLIPPING) * advantages

            actor_loss = -K.mean(K.minimum(p1, p2))

            entropy = -(y_pred * K.log(y_pred + 1e-10))
            entropy = ENTROPY_LOSS * K.mean(entropy)

            total_loss = actor_loss - entropy

            return total_loss

        def predict(self, state):
            return self.Actor.predict(state)
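    For reference, a hypothetical instantiation of this class (the learning rate and optimizer below are illustrative choices, not values stated in this post; depending on your TensorFlow version you may need learning_rate= instead of lr=):

    from tensorflow.keras.optimizers import Adam

    # 8-dimensional observation, 4 discrete actions; lr and optimizer are assumptions
    actor = Actor_Model(input_shape=(8,), action_space=4, lr=0.00025, optimizer=Adam)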
    

    section 2

    class Critic_Model:
        def __init__(self, input_shape, action_space, lr, optimizer):
            X_input = Input(input_shape)
            old_values = Input(shape=(1,))

            V = Dense(512, activation="relu", kernel_initializer='he_uniform')(X_input)
            V = Dense(256, activation="relu", kernel_initializer='he_uniform')(V)
            V = Dense(64, activation="relu", kernel_initializer='he_uniform')(V)
            value = Dense(1, activation=None)(V)

            self.Critic = Model(inputs=[X_input, old_values], outputs = value)
            self.Critic.compile(loss=[self.critic_PPO2_loss(old_values)], optimizer=optimizer(lr=lr))

        def critic_PPO2_loss(self, values):
            def loss(y_true, y_pred):
                LOSS_CLIPPING = 0.2
                clipped_value_loss = values + K.clip(y_pred - values, -LOSS_CLIPPING, LOSS_CLIPPING)
                v_loss1 = (y_true - clipped_value_loss) ** 2
                v_loss2 = (y_true - y_pred) ** 2

                value_loss = 0.5 * K.mean(K.maximum(v_loss1, v_loss2))
                #value_loss = K.mean((y_true - y_pred) ** 2) # standard PPO loss
                return value_loss
            return loss

        def predict(self, state):
            return self.Critic.predict([state, np.zeros((state.shape[0], 1))])
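    And a matching hypothetical instantiation of the Critic (same caveats as for the Actor above):

    critic = Critic_Model(input_shape=(8,), action_space=4, lr=0.00025, optimizer=Adam)
    # value estimate for a batch with a single state; predict() feeds zeros for the old-values input
    print(critic.predict(np.zeros((1, 8))))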
    

    section 3

    def get_gaes(self, rewards, dones, values, next_values, gamma = 0.99, lamda = 0.9, normalize=True):
        deltas = [r + gamma * (1 - d) * nv - v for r, d, nv, v in zip(rewards, dones, next_values, values)]
        deltas = np.stack(deltas)
        gaes = copy.deepcopy(deltas)
        for t in reversed(range(len(deltas) - 1)):
            gaes[t] = gaes[t] + (1 - dones[t]) * gamma * lamda * gaes[t + 1]

        target = gaes + values
        if normalize:
            gaes = (gaes - gaes.mean()) / (gaes.std() + 1e-8)
        return np.vstack(gaes), np.vstack(target)
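    This is Generalized Advantage Estimation (GAE): the code above implements the recursion

    $$\delta_t = r_t + \gamma\,(1-d_t)\,V(s_{t+1}) - V(s_t), \qquad \hat{A}_t = \delta_t + \gamma\lambda\,(1-d_t)\,\hat{A}_{t+1},$$

    with $\gamma = 0.99$ and $\lambda = 0.9$, and the Critic's regression target is $\hat{A}_t + V(s_t)$ (computed before the advantages are normalized).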
    

    section 4

    def replay(self, states, actions, rewards, predictions, dones, next_states):
        # reshape memory to appropriate shape for training
        states = np.vstack(states)
        next_states = np.vstack(next_states)
        actions = np.vstack(actions)
        predictions = np.vstack(predictions)

        # Get Critic network predictions
        values = self.Critic.predict(states)
        next_values = self.Critic.predict(next_states)

        # Compute discounted rewards and advantages
        advantages, target = self.get_gaes(rewards, dones, np.squeeze(values), np.squeeze(next_values))

        # pack all advantages, predictions and actions into y_true;
        # they are unpacked again inside the custom PPO loss function
        y_true = np.hstack([advantages, predictions, actions])

        # training Actor and Critic networks
        a_loss = self.Actor.Actor.fit(states, y_true, epochs=self.epochs, verbose=0, shuffle=self.shuffle)
        c_loss = self.Critic.Critic.fit([states, values], target, epochs=self.epochs, verbose=0, shuffle=self.shuffle)

    def run_batch(self): # train every self.Training_batch steps
        state = self.env.reset()
        state = np.reshape(state, [1, self.state_size[0]])
        done, score, SAVING = False, 0, ''
        while True:
            # Instantiate or reset games memory
            states, next_states, actions, rewards, predictions, dones = [], [], [], [], [], []
            for t in range(self.Training_batch):
                self.env.render()
                # Actor picks an action
                action, action_onehot, prediction = self.act(state)
                # Retrieve new state, reward, and whether the state is terminal
                next_state, reward, done, _ = self.env.step(action)
                # Memorize (state, action, reward) for training
                states.append(state)
                next_states.append(np.reshape(next_state, [1, self.state_size[0]]))
                actions.append(action_onehot)
                rewards.append(reward)
                dones.append(done)
                predictions.append(prediction)
                # Update current state
                state = np.reshape(next_state, [1, self.state_size[0]])
                score += reward
                if done:
                    self.episode += 1
                    average, SAVING = self.PlotModel(score, self.episode)
                    print("episode: {}/{}, score: {}, average: {:.2f} {}".format(self.episode, self.EPISODES, score, average, SAVING))

                    state, done, score, SAVING = self.env.reset(), False, 0, ''
                    state = np.reshape(state, [1, self.state_size[0]])

            self.replay(states, actions, rewards, predictions, dones, next_states)
    

    section 5
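    The replay and run_batch methods above assume they live inside an agent class that holds the environment, the two models, and a few hyperparameters. A minimal sketch of such a class is shown below; every hyperparameter value in it is an assumption for illustration, not a value stated in this post, and the PlotModel logging/saving helper referenced in run_batch is likewise not shown here.

    from tensorflow.keras.optimizers import Adam

    class PPOAgent:
        def __init__(self, env_name="LunarLander-v2"):
            self.env = gym.make(env_name)
            self.state_size = self.env.observation_space.shape   # (8,)
            self.action_size = self.env.action_space.n           # 4
            self.EPISODES = 10000         # assumed total training episodes
            self.episode = 0
            self.epochs = 10              # assumed epochs per replay() call
            self.shuffle = False
            self.Training_batch = 1000    # assumed steps collected before each update
            self.lr = 0.00025             # assumed learning rate

            self.Actor = Actor_Model(input_shape=self.state_size, action_space=self.action_size,
                                     lr=self.lr, optimizer=Adam)
            self.Critic = Critic_Model(input_shape=self.state_size, action_space=self.action_size,
                                       lr=self.lr, optimizer=Adam)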
