美文网首页
MNIST手写识别笔记(三)下

MNIST手写识别笔记(三)下

作者: Kean_L_C | 来源:发表于2017-08-17 10:04 被阅读34次

    计算速度优化

    • 前面的计算都是针对输入一个样本,然后更新一次权重。这里将代码改成矩阵运算,每次批量计算mini_batch对权重的更改。下面把这章节的代码和该系列文章二的代码运算速度对比,结果如下:
    参数:
    net.SGD(training_data, 10, 10, 0.5, test_data, False)  # 全样本
    二            :0:01:19.567001
    三(下)      :0:00:42.725754
    
    • 针对前面提到过的采用softmax作为输出层函数、似然函数作为损失函数的结合(输入样本x输出a,真实值为y,y中真实值所在位置为k,则cost为 -ln(a[k]);a理解为x被分到各类对应的概率,sum(a)=1,这是softmax函数导致的。当预测越接近真实值,a[k]越接近1,即 -ln(a[k])越接近0)。这里给出一些学习softmax函数的链接:ufldl.stanford.edu、csdn
      代码如下
    # encoding: utf-8
    
    """
    @version: python3.5.2
    @author: kaenlee  @contact: lichaolfm@163.com
    @software: PyCharm Community Edition
    @time: 2017/8/16 11:09
    purpose:
    """
    
    # 输出层采用softmax
    # 似然函数作为损失函数
    # minibatch训练采用矩阵乘法增快计算
    # dropout 应对过度拟合
    
    import numpy as np
    from tensorflow.examples.tutorials.mnist import input_data
    import random
    from functools import reduce
    import operator
    import datetime as dt
    import pandas as pd
    import matplotlib.pyplot as plt
    import matplotlib as mp
    
    mp.style.use('ggplot')
    
    
    
    
    # 各个层仍然会用到s函数
    def Sigmod(z):
        """Element-wise logistic sigmoid: 1 / (1 + e^(-z))."""
        exp_neg = np.exp(-z)
        return 1 / (1 + exp_neg)
    
    
    def SigmodPrime(z):
        """Derivative of the sigmoid: s(z) * (1 - s(z))."""
        s = Sigmod(z)
        return s * (1 - s)
    
    
    class CrossEntropyLossFunc:
        """Negative log-likelihood loss for a softmax-style probability output."""

        @staticmethod
        def loss(A, Y):
            """
            Mean negative log-likelihood over a batch.
            :param A: N x 10 array of predicted class probabilities (N samples)
            :param Y: N x 10 array of one-hot true labels
            :return: average of -log(A[i, k_i]) where k_i is the true class of sample i
            """
            n = len(Y)
            if n == 0:
                # Empty batch: define the loss as 0 instead of dividing by zero.
                return 0.0
            # Column index of the true class for every row.
            true_cols = np.argmax(Y, axis=1)
            # Vectorized gather replaces the original per-row Python loop.
            CS = -np.log(A[np.arange(n), true_cols])
            # nan_to_num maps log(0) = -inf (and any nan) to finite values.
            return np.sum(np.nan_to_num(CS)) / n

        @staticmethod
        def delta(A, Y):
            """Output-layer error dC/dz = A - Y for softmax + log-likelihood."""
            return A - Y    # one delta row per sample
    
    
    
    
    class NetWorks:
        """Fully-connected feed-forward network trained with mini-batch SGD.

        The forward and backward passes operate on a whole mini-batch as one
        matrix, which is the speed-up this article section is about.
        """

        def __init__(self, size, lossFunc):
            """
            :param size: list of node counts per layer, input and output layers included
            :param lossFunc: class exposing static methods loss(A, Y) and delta(A, Y)
            """
            self.size = size
            self.Layers = len(size)
            self.initializeWeightBias()
            self.lossFunc = lossFunc

        def initializeWeightBias(self):
            # Plain standard-normal initialisation (a better scheme appears later
            # in the article series). The input layer has no bias.
            self.bias = [np.random.randn(num) for num in self.size[1:]]
            # Weight matrix: rows = nodes of this layer, cols = nodes of the previous layer.
            self.weight = [np.random.randn(row, col) for row, col in zip(self.size[1:], self.size[:-1])]

        def Feedward(self, X):
            """
            Forward pass.
            :param X: array of input rows, N x input_dim
            :return: N x 10 matrix of class probabilities (each row sums to 1)
            """
            for b, w in zip(self.bias, self.weight):
                Z = X.dot(w.T) + b    # weighted input, N x layer_width
                X = Sigmod(Z)         # activations, one row per sample
            # Normalise the final layer by its row sum so each row is a
            # probability vector (softmax-style output).
            total = np.sum(X, axis=1)
            total.shape = -1, 1
            return X / total     # N X 10

        def SGD(self, training_data, epochs, minibatch_size, eta, test_data=None, isplot=False):
            """
            Stochastic gradient descent training loop.
            :param training_data: [inputs, one_hot_labels]
            :param epochs: number of passes over the training data @int
            :param minibatch_size: number of samples per gradient estimate
            :param eta: learning rate
            :param test_data: optional [inputs, one_hot_labels] evaluated each epoch
            :param isplot: when True, record accuracy/cost and save 'dropout.png'
            """
            trainX = training_data[0]
            trainY = training_data[1]
            if test_data:
                testX = test_data[0]
                testY = test_data[1]
                n_test = len(testY)
            n = len(trainY)
            accuracy_train = []
            accuracy_test = []
            cost_train = []
            cost_test = []
            for e in range(epochs):
                # Shuffle the sample order before slicing this epoch's batches.
                indices = np.arange(n)
                random.shuffle(indices)
                trainX = trainX[indices]
                trainY = trainY[indices]
                batchXs = [trainX[k:(k + minibatch_size)] for k in range(0, n, minibatch_size)]
                batchYs = [trainY[k:(k + minibatch_size)] for k in range(0, n, minibatch_size)]

                for batchX, batchY in zip(batchXs, batchYs):
                    self.Update_miniBatchs(batchX, batchY, eta)

                if test_data:
                    totall_predRight = self.Evalueate(test_data)
                    print('Epoch {0}: {1}/{2}'.format(e, totall_predRight, n_test))

                    if isplot:
                        accuracy_test.append(totall_predRight / n_test)
                        cost_test.append(self.lossFunc.loss(self.Feedward(testX), testY))

                if isplot:
                    accuracy_train.append(self.Evalueate(training_data) / n)
                    # BUG FIX: the training cost must be computed on the network's
                    # predictions; the original fed the loss function into itself
                    # (self.lossFunc.loss(self.lossFunc.loss(trainX), trainY)).
                    cost_train.append(self.lossFunc.loss(self.Feedward(trainX), trainY))

            if isplot:
                plt.figure()
                plt.plot(np.arange(1, epochs + 1), accuracy_train, label='train')
                plt.plot(np.arange(1, epochs + 1), accuracy_test, label='test')
                axis = plt.gca()
                axis_01 = plt.twinx(axis)
                axis_01.plot(np.arange(1, epochs + 1), cost_train, label='cost')
                plt.xlabel('epoch')
                plt.legend()
                plt.savefig('dropout.png')
                plt.close()

        def Update_miniBatchs(self, batchX, batchY, eta):
            """
            One gradient-descent step on a mini-batch.
            :param batchX: mini-batch inputs, N x input_dim
            :param batchY: mini-batch one-hot labels, N x 10
            :param eta: learning rate
            """
            # Mean gradients over the whole batch, computed in one backward pass.
            Cprime_bs, Cprime_ws = self.BackProd(batchX, batchY)
            self.bias = [bias - eta * change for bias, change in zip(self.bias, Cprime_bs)]
            self.weight = [weight - eta * change for weight, change in zip(self.weight, Cprime_ws)]

        def BackProd(self, batchX, batchY):
            """
            Backpropagation over a whole mini-batch.
            :param batchX: N x 784 inputs
            :param batchY: N x 10 one-hot labels
            :return: (Cprime_bs, Cprime_ws) mean bias/weight gradients per layer,
                ordered front-to-back to match self.bias / self.weight
            """
            n = len(batchY)                   # number of samples
            zs_n = []                         # weighted inputs per layer (none for the input layer)
            activations_n = [batchX]          # activations per layer; entry 0 is the input itself
            # Forward pass, keeping z and a for layers 2..L.
            for b, w in zip(self.bias, self.weight):
                z_n = activations_n[-1].dot(w.T) + b
                zs_n.append(z_n)                        # Layers-1 weighted-input matrices
                activations_n.append(Sigmod(z_n))       # activation a

            # Error of the output layer L for every sample.
            delta_L = self.lossFunc.delta(activations_n[-1], batchY)  # n X 10
            Cprime_bs = [delta_L]                                     # dC/db of layer L equals delta_L
            # dC/dw = (previous layer's activation)^T outer-product current delta, per sample.
            Cprime_ws = [[np.array(np.mat(delta_L[i]).T * np.mat(activations_n[-2][i])) for i in
                          range(n)]]
            # Propagate the error backwards through the hidden layers.
            temp = delta_L
            for i in range(1, self.Layers - 1):
                # Walk from layer L-1 down to layer 2: the current delta is the next
                # layer's delta pushed through its weights, gated by sigma'(z).
                x1 = temp.dot(self.weight[-i])  # next layer's weights applied to its delta
                x2 = SigmodPrime(zs_n[-i - 1])  # derivative at this layer's weighted input
                delta_now = x1 * x2
                Cprime_bs.append(delta_now)
                Cprime_ws.append([np.array(np.mat(delta_now[j]).T * np.mat(activations_n[-i - 2][j])) for j in range(n)])
                temp = delta_now

            # Average the per-sample gradients over the batch.
            Cprime_bs = [np.sum(bn, axis=0) / n for bn in Cprime_bs]
            Cprime_ws = [reduce(operator.add, wn) / n for wn in Cprime_ws]
            # Gradients were collected back-to-front; reverse to match layer order.
            Cprime_bs.reverse()
            Cprime_ws.reverse()
            return (Cprime_bs, Cprime_ws)

        def Evalueate(self, test_data):
            """
            Count correct predictions on a data set.
            :param test_data: [inputs, one_hot_labels]
            :return: number of samples whose argmax prediction matches the label @int
            """
            testX = test_data[0]
            testY = test_data[1]
            n_test = len(testY)
            # A prediction is correct when the largest output sits at the true class.
            res_pred = np.argmax(self.Feedward(testX), axis=1) == np.argmax(testY, axis=1)
            return sum(res_pred)
    
    
    if __name__ == '__main__':
        # Load MNIST with one-hot labels from a local directory (machine-specific path).
        mnist = input_data.read_data_sets(r'D:\PycharmProjects\HandWritingRecognition\TF\data', one_hot=True)
        training_data = [mnist.train.images, mnist.train.labels]
        test_data = [mnist.test.images, mnist.test.labels]
        # Network with one hidden layer of 20 nodes.
        net = NetWorks([784, 20, 10], CrossEntropyLossFunc)
        X = test_data[0][:3]
        Y = test_data[1][:3]
        # print(net.Feedward(X))
        # print(net.BackProd(X, Y))
        # Time a 10-epoch run (mini-batch size 10, learning rate 0.5) for the
        # speed comparison quoted at the top of the article.
        start = dt.datetime.now()
        net.SGD(training_data, 10, 10, 0.5, test_data, isplot=False)
        print(dt.datetime.now() - start)
    

    DropOut

    文(三)是针对解决过度拟合的问题,回归主题。这里补充上(三)上的dropout代码

    1.等比例随机删除隐藏层的p比例节点,备份一份权重偏置数据
    2.剩下的节点按自己原有权重,进行一次更新
    3.将更新的权重,覆盖备份数据中对应位置的权重
    4.预测取权重(1-p)比例进行预测,预测后将权重还原
    5.回到步骤1

    # encoding: utf-8
    
    """
    @version: python3.5.2
    @author: kaenlee  @contact: lichaolfm@163.com
    @software: PyCharm Community Edition
    @time: 2017/8/16 11:09
    purpose:
    """
    
    # 输出层采用softmax
    # 似然函数作为损失函数
    # minibatch训练采用矩阵乘法增快计算
    # dropout 应对过度拟合
    
    import numpy as np
    from tensorflow.examples.tutorials.mnist import input_data
    import random
    from functools import reduce
    import operator
    import datetime as dt
    import pandas as pd
    import matplotlib.pyplot as plt
    import matplotlib as mp
    
    mp.style.use('ggplot')
    
    
    # 各个层仍然会用到s函数
    def Sigmod(z):
        """Element-wise logistic sigmoid: 1 / (1 + e^(-z))."""
        exp_neg = np.exp(-z)
        return 1 / (1 + exp_neg)
    
    
    def SigmodPrime(z):
        """Derivative of the sigmoid: s(z) * (1 - s(z))."""
        s = Sigmod(z)
        return s * (1 - s)
    
    
    class CrossEntropyLossFunc:
        """Negative log-likelihood loss for a softmax-style probability output."""

        @staticmethod
        def loss(A, Y):
            """
            Mean negative log-likelihood over a batch.
            :param A: N x 10 array of predicted class probabilities (N samples)
            :param Y: N x 10 array of one-hot true labels
            :return: average of -log(A[i, k_i]) where k_i is the true class of sample i
            """
            n = len(Y)
            if n == 0:
                # Empty batch: define the loss as 0 instead of dividing by zero.
                return 0.0
            # Column index of the true class for every row.
            true_cols = np.argmax(Y, axis=1)
            # Vectorized gather replaces the original per-row Python loop.
            CS = -np.log(A[np.arange(n), true_cols])
            # nan_to_num maps log(0) = -inf (and any nan) to finite values.
            return np.sum(np.nan_to_num(CS)) / n

        @staticmethod
        def delta(A, Y):
            """Output-layer error dC/dz = A - Y for softmax + log-likelihood."""
            return A - Y  # one delta row per sample
    
    
    class NetWorks:
        """Feed-forward network trained with mini-batch SGD plus dropout.

        Before each mini-batch a fraction ``p`` of every hidden layer's nodes is
        removed; the thinned network takes one gradient step and the updated
        values are merged back into the full parameter set. At prediction time
        the full network is used with parameters scaled by (1 - p).
        """

        def __init__(self, size, lossFunc):
            """
            :param size: list of node counts per layer, input and output layers included
            :param lossFunc: class exposing static methods loss(A, Y) and delta(A, Y)
            """
            self.size = size
            self.Layers = len(size)
            self.initializeWeightBias()
            self.lossFunc = lossFunc

        def initializeWeightBias(self):
            # Plain standard-normal initialisation; the input layer has no bias.
            self.bias = [np.random.randn(num) for num in self.size[1:]]
            # Weight matrix: rows = nodes of this layer, cols = nodes of the previous layer.
            self.weight = [np.random.randn(row, col) for row, col in zip(self.size[1:], self.size[:-1])]

        def Feedward(self, X, p, ISpredtest=True):
            """
            Forward pass.
            :param X: array of input rows, N x input_dim
            :param p: dropout fraction used during training
            :param ISpredtest: True when predicting with the full network, in which
                case parameters are scaled by (1 - p) to compensate for dropout;
                False when the currently-installed (thinned or full) parameters
                should be used as-is.
            :return: N x 10 matrix of class probabilities (each row sums to 1)
            """
            if ISpredtest:
                # Predict with the full network: scale parameters down, restore after.
                # NOTE(review): standard dropout rescales only the weights; scaling
                # the biases too follows the original article -- confirm intent.
                weight = self.weight.copy()
                bias = self.bias.copy()
                self.bias = [(1 - p) * b for b in bias]
                self.weight = [(1 - p) * w for w in weight]

            for b, w in zip(self.bias, self.weight):
                Z = X.dot(w.T) + b  # weighted input, N x layer_width
                X = Sigmod(Z)  # activations, one row per sample
            if ISpredtest:
                # Evaluation happens every epoch, so put the unscaled parameters back.
                self.weight = weight
                self.bias = bias
            # Normalise the final layer by its row sum (softmax-style output).
            total = np.sum(X, axis=1)
            total.shape = -1, 1
            return X / total  # N X 10

        def DropOut(self, p):
            """
            Thin the network: keep a random (1 - p) share of each hidden layer's
            nodes and install the reduced weights/biases on self.
            :param p: fraction of hidden-layer nodes to drop
            :return: (weight, bias, save) -- the full parameter lists that were
                replaced, and per layer the sorted indices of the kept nodes
                (the output layer keeps every node).
            """
            # Shallow list copy only: the arrays stay shared with self, which the
            # merge step in SGD relies on to write updates back in place.
            weight = self.weight.copy()
            bias = self.bias
            n = len(weight)
            updateW = []
            updateB = []
            size = self.size[1:]  # the input layer has no parameters
            save = []
            for i in range(0, n - 1):  # every output-layer node is kept
                # Sample surviving nodes without replacement (replaces the
                # original rejection-sampling while-loop).
                sample_num = int(size[i] * (1 - p))
                saveIndex = sorted(np.random.choice(size[i], sample_num, replace=False).tolist())
                save.append(saveIndex)
                updateW.append(self.weight[i][saveIndex])
                updateB.append(self.bias[i][saveIndex])
                # Dropping nodes here also removes the matching input columns of
                # the next layer's weight matrix.
                self.weight[i + 1] = self.weight[i + 1][:, saveIndex]
            updateW.append(self.weight[-1])   # output layer keeps all of its rows
            updateB.append(self.bias[-1])
            save.append(np.arange(size[-1]))
            self.weight = updateW
            self.bias = updateB
            return weight, bias, save

        def SGD(self, training_data, epochs, minibatch_size, eta, p, test_data=None, isplot=False):
            """
            Stochastic gradient descent with dropout.
            :param training_data: [inputs, one_hot_labels]
            :param epochs: number of passes over the training data @int
            :param minibatch_size: number of samples per gradient estimate
            :param eta: learning rate
            :param p: fraction of hidden neurons dropped for each mini-batch
            :param test_data: optional [inputs, one_hot_labels] evaluated each epoch
            :param isplot: when True, record accuracy/cost and save 'dropout.png'
            """
            trainX = training_data[0]
            trainY = training_data[1]
            if test_data:
                testX = test_data[0]
                testY = test_data[1]
                n_test = len(testY)
            n = len(trainY)
            accuracy_train = []
            accuracy_test = []
            cost_train = []
            cost_test = []
            for e in range(epochs):
                # Shuffle the sample order before slicing this epoch's batches.
                indices = np.arange(n)
                random.shuffle(indices)
                trainX = trainX[indices]
                trainY = trainY[indices]
                batchXs = [trainX[k:(k + minibatch_size)] for k in range(0, n, minibatch_size)]
                batchYs = [trainY[k:(k + minibatch_size)] for k in range(0, n, minibatch_size)]

                for batchX, batchY in zip(batchXs, batchYs):
                    # Thin the network, take one step, then merge the updated
                    # values back into the full parameter set.
                    weightBackup, biasBackup, save = self.DropOut(p)
                    self.Update_miniBatchs(batchX, batchY, eta)
                    for i in range(self.Layers - 1):
                        biasBackup[i][save[i]] = self.bias[i]
                        if i == 0:
                            # The first hidden layer only lost rows (the input layer
                            # is intact), so a single fancy index addresses them.
                            weightBackup[i][save[i]] = self.weight[i]
                        else:
                            row = save[i]
                            col = save[i - 1]
                            # BUG FIX: the original chained indexing
                            #   weightBackup[i][row, :][:, col] = ...
                            # assigned into a temporary copy and silently discarded
                            # the update. np.ix_ builds the row x col cross index so
                            # the assignment reaches the backing array.
                            weightBackup[i][np.ix_(row, col)] = self.weight[i]

                    self.weight = weightBackup
                    self.bias = biasBackup

                if test_data:
                    totall_predRight = self.Evalueate(test_data, p)
                    print('Epoch {0}: {1}/{2}'.format(e, totall_predRight, n_test))

                    if isplot:
                        # Test-set metrics use the (1 - p)-scaled full network.
                        accuracy_test.append(totall_predRight / n_test)
                        cost_test.append(self.lossFunc.loss(self.Feedward(testX, p), testY))

                if isplot:
                    # Training metrics use the unscaled full network (ISpredtest=False).
                    accuracy_train.append(self.Evalueate(training_data, p, False) / n)
                    cost_train.append(self.lossFunc.loss(self.Feedward(trainX, p, False), trainY))

            if isplot:
                plt.figure()
                plt.plot(np.arange(1, epochs + 1), accuracy_train, label='train')
                plt.plot(np.arange(1, epochs + 1), accuracy_test, label='test')
                axis = plt.gca()
                axis_01 = plt.twinx(axis)
                axis_01.plot(np.arange(1, epochs + 1), cost_train, label='cost')
                plt.xlabel('epoch')
                plt.legend()
                plt.savefig('dropout.png')
                plt.close()

        def Update_miniBatchs(self, batchX, batchY, eta):
            """
            One gradient-descent step on a mini-batch.
            :param batchX: mini-batch inputs, N x input_dim
            :param batchY: mini-batch one-hot labels, N x 10
            :param eta: learning rate
            """
            # Mean gradients over the whole batch, computed in one backward pass.
            Cprime_bs, Cprime_ws = self.BackProd(batchX, batchY)
            self.bias = [bias - eta * change for bias, change in zip(self.bias, Cprime_bs)]
            self.weight = [weight - eta * change for weight, change in zip(self.weight, Cprime_ws)]

        def BackProd(self, batchX, batchY):
            """
            Backpropagation over a whole mini-batch.
            :param batchX: N x 784 inputs
            :param batchY: N x 10 one-hot labels
            :return: (Cprime_bs, Cprime_ws) mean bias/weight gradients per layer,
                ordered front-to-back to match self.bias / self.weight
            """
            n = len(batchY)  # number of samples
            zs_n = []  # weighted inputs per layer (none for the input layer)
            activations_n = [batchX]  # activations per layer; entry 0 is the input itself
            # Forward pass, keeping z and a for layers 2..L.
            for b, w in zip(self.bias, self.weight):
                z_n = activations_n[-1].dot(w.T) + b
                zs_n.append(z_n)  # Layers-1 weighted-input matrices
                activations_n.append(Sigmod(z_n))  # activation a

            # Error of the output layer L for every sample.
            delta_L = self.lossFunc.delta(activations_n[-1], batchY)  # n X 10
            Cprime_bs = [delta_L]  # dC/db of layer L equals delta_L
            # dC/dw = (previous layer's activation)^T outer-product current delta, per sample.
            Cprime_ws = [[np.array(np.mat(delta_L[i]).T * np.mat(activations_n[-2][i])) for i in
                          range(n)]]
            # Propagate the error backwards through the hidden layers.
            temp = delta_L
            for i in range(1, self.Layers - 1):
                # Walk from layer L-1 down to layer 2: the current delta is the next
                # layer's delta pushed through its weights, gated by sigma'(z).
                x1 = temp.dot(self.weight[-i])  # next layer's weights applied to its delta
                x2 = SigmodPrime(zs_n[-i - 1])  # derivative at this layer's weighted input
                delta_now = x1 * x2
                Cprime_bs.append(delta_now)
                Cprime_ws.append([np.array(np.mat(delta_now[j]).T * np.mat(activations_n[-i - 2][j])) for j in range(n)])
                temp = delta_now

            # Average the per-sample gradients over the batch.
            Cprime_bs = [np.sum(bn, axis=0) / n for bn in Cprime_bs]
            Cprime_ws = [reduce(operator.add, wn) / n for wn in Cprime_ws]
            # Gradients were collected back-to-front; reverse to match layer order.
            Cprime_bs.reverse()
            Cprime_ws.reverse()
            return (Cprime_bs, Cprime_ws)

        def Evalueate(self, test_data, p, IStest=True):
            """
            Count correct predictions on a data set.
            :param test_data: [inputs, one_hot_labels]
            :param p: dropout fraction, forwarded to Feedward for (1 - p) scaling
            :param IStest: forwarded as Feedward's ISpredtest flag
            :return: number of samples whose argmax prediction matches the label @int
            """
            testX = test_data[0]
            testY = test_data[1]
            n_test = len(testY)
            # A prediction is correct when the largest output sits at the true class.
            res_pred = np.argmax(self.Feedward(testX, p, IStest), axis=1) == np.argmax(testY, axis=1)
            return sum(res_pred)
    
    
    if __name__ == '__main__':
        # Load MNIST with one-hot labels from a local directory (machine-specific path).
        mnist = input_data.read_data_sets(r'D:\PycharmProjects\HandWritingRecognition\TF\data', one_hot=True)
        # Small subsets to make overfitting (and dropout's effect) visible quickly.
        training_data = [mnist.train.images[:2000], mnist.train.labels[:2000]]
        test_data = [mnist.test.images[:1000], mnist.test.labels[:1000]]
        # Network with one hidden layer of 100 nodes.
        net = NetWorks([784, 100, 10], CrossEntropyLossFunc)
        X = test_data[0][:3]
        Y = test_data[1][:3]
        # print(net.Feedward(X))
        # print(net.BackProd(X, Y))
        # Time a 100-epoch run: mini-batch size 10, learning rate 3, dropout p=0.5.
        start = dt.datetime.now()
        net.SGD(training_data, 100, 10, 3, 0.5, test_data, isplot=True)
        print(dt.datetime.now() - start)
    
    

    结果分析:虽然train和test的accuracy几乎都是同时饱和,但是cost却还在下降,无法解释。(难道pred train data的时候权重也需要乘以1-p!!!!有待考证)

    dropout.png

    相关文章

      网友评论

          本文标题:MNIST手写识别笔记(三)下

          本文链接:https://www.haomeiwen.com/subject/wfairxtx.html