美文网首页
用TensorFlow训练卷积网络的详细实现

用TensorFlow训练卷积网络的详细实现

作者: 卜是 | 来源:发表于2020-08-07 15:25 被阅读0次

    本文采用LeNet-5卷积网络来完成对MNIST数据集的分类任务,以下将详细介绍具体思考模式和流程。
    1. 创建神经网络模型

    def model(data, train=False):
      """模型定义"""
      # 2D卷积, with 'SAME' padding (输出的feature map与输入大小一致)
    # strides向量分别对应[image索引, y, x, depth]
      conv = tf.nn.conv2d(data,
                          conv1_weights,
                          strides=[1, 1, 1, 1],
                          padding='SAME')
      # conv1_biases为卷积层常数项对应的变量,也作为被训练参数之一
      relu = tf.nn.relu(tf.nn.bias_add(conv, conv1_biases))
      # Max pooling. 池化大小为2*2,且stripe为2*2
      pool = tf.nn.max_pool(relu,
                            ksize=[1, 2, 2, 1],
                            strides=[1, 2, 2, 1],
                            padding='SAME')
    #第二层卷积
      conv = tf.nn.conv2d(pool,
                          conv2_weights,
                          strides=[1, 1, 1, 1],
                          padding='SAME')
      relu = tf.nn.relu(tf.nn.bias_add(conv, conv2_biases))
      pool = tf.nn.max_pool(relu,
                            ksize=[1, 2, 2, 1],
                            strides=[1, 2, 2, 1],
                            padding='SAME')
      # 将feature map转化为2d矩阵,目的是适配全连接层
      pool_shape = pool.get_shape().as_list()
      reshape = tf.reshape(
          pool,
          [pool_shape[0], pool_shape[1] * pool_shape[2] * pool_shape[3]])
      # 全连接层
      hidden = tf.nn.relu(tf.matmul(reshape, fc1_weights) + fc1_biases)
      # 仅仅在训练阶段添加50%的dropout,作为对参数的正则化
      if train:
        hidden = tf.nn.dropout(hidden, 0.5, seed=SEED)
      return tf.matmul(hidden, fc2_weights) + fc2_biases
    
    # 构建计算图
    train_data_node = tf.placeholder(
        tf.float32,
        shape=(BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS))
    train_labels_node = tf.placeholder(tf.int64, shape=(BATCH_SIZE,))
    eval_data = tf.placeholder(
        tf.float32,
        shape=(EVAL_BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS))
    
    # 以下为可训练的所有权重参数.
    # 初始化命令为:{tf.global_variables_initializer().run()}
    conv1_weights = tf.Variable(
        # 5x5 filter, depth 32.
        tf.truncated_normal([5, 5, NUM_CHANNELS, 32],  
                            stddev=0.1,
                            seed=SEED, dtype=tf.float32))
    conv1_biases = tf.Variable(tf.zeros([32], dtype=tf.float32))
    conv2_weights = tf.Variable(tf.truncated_normal(
        [5, 5, 32, 64], stddev=0.1,
        seed=SEED, dtype=tf.float32))
    conv2_biases = tf.Variable(tf.constant(0.1, shape=[64],
                               dtype=tf.float32))
    fc1_weights = tf.Variable(  # fully connected, depth 512.
        tf.truncated_normal([IMAGE_SIZE // 4 * IMAGE_SIZE // 4 * 64, 512],
                            stddev=0.1,
                            seed=SEED,
                            dtype=tf.float32))
    fc1_biases = tf.Variable(tf.constant(0.1, shape=[512],
                             dtype=tf.float32))
    fc2_weights = tf.Variable(tf.truncated_normal([512, NUM_LABELS],
                                                  stddev=0.1,
                                                  seed=SEED,
                                                  dtype=tf.float32))
    fc2_biases = tf.Variable(tf.constant(
        0.1, shape=[NUM_LABELS], dtype=tf.float32))
    
    #调用以上的model函数,得到模型输出,并计算softmax交叉熵损失函数
    logits = model(train_data_node, True)
    loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=train_labels_node, logits=logits))
    
    # 全连接层参数的L2正则化
    regularizers = (tf.nn.l2_loss(fc1_weights)
                    + tf.nn.l2_loss(fc1_biases)
                    + tf.nn.l2_loss(fc2_weights)
                    + tf.nn.l2_loss(fc2_biases))
    loss += 5e-4 * regularizers
    
    # 每批次增加1,用于控制decay rate
    batch = tf.Variable(0, dtype=tf.float32)
    # Decay once per epoch, using an exponential schedule starting at 0.01.
    learning_rate = tf.train.exponential_decay(
        0.01,                # Base learning rate.
        batch * BATCH_SIZE,  # Current index into the dataset.
        train_size,          # Decay step.
        0.95,                # Decay rate.
        staircase=True)
    # 优化器
    optimizer = tf.train.MomentumOptimizer(learning_rate,
                                           0.9).minimize(loss,
                                                         global_step=batch)
    
    # 得到当前批次得到的预测分类值
    train_prediction = tf.nn.softmax(logits)
    
    # 对测试集和验证数据集进行预测
    eval_prediction = tf.nn.softmax(model(eval_data))
    

    2. 数据的准备

    #下载文件
    def download(filename):
      """从网站下载"""
      if not os.path.exists(WORK_DIRECTORY):
        os.makedirs(WORK_DIRECTORY)
      filepath = os.path.join(WORK_DIRECTORY, filename)
      if not os.path.exists(filepath):
        filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename,
                                                 filepath)
        size = os.stat(filepath).st_size
      return filepath
    
    # 从文件中提取出图片数据
    def extract_data(filename, num_images):
      """将图片生成4D张量数据[image index, y, x, channels].
      并且将值从 [0, 255] 转化为 [-0.5, 0.5]
      """
      print('Extracting', filename)
      with gzip.open(filename) as bytestream:
        bytestream.read(16)
        buf = bytestream.read(
            IMAGE_SIZE * IMAGE_SIZE * num_images * NUM_CHANNELS)
        data = numpy.frombuffer(buf, dtype=numpy.uint8).astype(
            numpy.float32)
        data = (data - (255/2.0))/255 
        data = data.reshape(num_images, IMAGE_SIZE, IMAGE_SIZE,
                            NUM_CHANNELS)
        return data
    
    #提取标签
    def extract_labels(filename, num_images):
      """转化为一个类型为int64的向量"""
      print('Extracting', filename)
      with gzip.open(filename) as bytestream:
        # Discard header.
        bytestream.read(8)
        # Read bytes for labels.
        buf = bytestream.read(num_images)
        labels = numpy.frombuffer(buf, dtype=numpy.uint8).astype(
            numpy.int64)
      return labels
    
    SOURCE_URL = 'http://yann.lecun.com/exdb/mnist/'
    WORK_DIRECTORY = 'data'
    IMAGE_SIZE = 28
    NUM_CHANNELS = 1
    PIXEL_DEPTH = 255
    NUM_LABELS = 10
    VALIDATION_SIZE = 5000  # Size of the validation set.
    SEED = 66478  # Set to None for random seed.
    BATCH_SIZE = 64
    NUM_EPOCHS = 10
    EVAL_BATCH_SIZE = 64
    EVAL_FREQUENCY = 100  # Number of steps between evaluations.
    
    train_data_filename = download('train-images-idx3-ubyte.gz')
    train_labels_filename = download('train-labels-idx1-ubyte.gz')
    test_data_filename = download('t10k-images-idx3-ubyte.gz')
    test_labels_filename = download('t10k-labels-idx1-ubyte.gz')
    
    train_data = extract_data(train_data_filename, 60000)
    train_labels = extract_labels(train_labels_filename, 60000)
    test_data = extract_data(test_data_filename, 10000)
    test_labels = extract_labels(test_labels_filename, 10000)
    
    validation_data = train_data[:VALIDATION_SIZE, ...]
    validation_labels = train_labels[:VALIDATION_SIZE]
    train_data = train_data[VALIDATION_SIZE:, ...]
    train_labels = train_labels[VALIDATION_SIZE:]
    
    num_epochs = NUM_EPOCHS
    train_size = train_labels.shape[0]
    

    3. 模型训练

    def eval_in_batches(data, sess):
      """批量模型推理预测"""
      size = data.shape[0]
      if size < EVAL_BATCH_SIZE:
        raise ValueError("batch size for evals larger than dataset: %d"
                         % size)
      predictions = numpy.ndarray(shape=(size, NUM_LABELS),
                                  dtype=numpy.float32)
      for begin in xrange(0, size, EVAL_BATCH_SIZE):
        end = begin + EVAL_BATCH_SIZE
        if end <= size:
          predictions[begin:end, :] = sess.run(
              eval_prediction,
              feed_dict={eval_data: data[begin:end, ...]})
        else:
          batch_predictions = sess.run(
              eval_prediction,
              feed_dict={eval_data: data[-EVAL_BATCH_SIZE:, ...]})
          predictions[begin:, :] = batch_predictions[begin - size:, :]
      return predictions
    
    def error_rate(predictions, labels):
      """计算分类的错误率"""
      return 100.0 - (
          100.0 *
          numpy.sum(numpy.argmax(predictions, 1) == labels) /
          predictions.shape[0])
    
    start_time = time.time()
    with tf.Session() as sess:
      tf.global_variables_initializer().run()
      # 训练步长
      for step in xrange(int(num_epochs * train_size) // BATCH_SIZE):
        # 得到每一步长对应批量数据
        offset = (step * BATCH_SIZE) % (train_size - BATCH_SIZE)
        batch_data = train_data[offset:(offset + BATCH_SIZE), ...]
        batch_labels = train_labels[offset:(offset + BATCH_SIZE)]
        feed_dict = {train_data_node: batch_data,
                     train_labels_node: batch_labels}
        # 更新权重等参数
        sess.run(optimizer, feed_dict=feed_dict)
        # 一定阶段后输出对应的变量值,作为评估
        if step % EVAL_FREQUENCY == 0:
          # fetch some extra nodes' data
          l, lr, predictions = sess.run([loss, learning_rate,
                                         train_prediction],
                                        feed_dict=feed_dict)
          elapsed_time = time.time() - start_time
          start_time = time.time()
          print('Step %d (epoch %.2f), %.1f ms' %
                (step, float(step) * BATCH_SIZE / train_size,
                 1000 * elapsed_time / EVAL_FREQUENCY))
          print('Minibatch loss: %.3f, learning rate: %.6f' % (l, lr))
          print('Minibatch error: %.1f%%'
                % error_rate(predictions, batch_labels))
          print('Validation error: %.1f%%' % error_rate(
              eval_in_batches(validation_data, sess), validation_labels))
          sys.stdout.flush()
      # 最终结果
      test_error = error_rate(eval_in_batches(test_data, sess),
                              test_labels)
      print('Test error: %.1f%%' % test_error)
    

    相关文章

      网友评论

          本文标题:用TensorFlow训练卷积网络的详细实现

          本文链接:https://www.haomeiwen.com/subject/qujfdktx.html