coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import os.path
import time
import tensorflow as tf
from six.moves import xrange # pylint: disable=redefined-builtin
from tensorflow.examples.tutorials.mnist import input_data
from tensorflow.examples.tutorials.mnist import mnist
# The MNIST dataset has 10 classes, representing the digits 0 through 9.
NUM_CLASSES = 10
# The MNIST images are always 28x28 pixels.
IMAGE_SIZE = 28
IMAGE_PIXELS = IMAGE_SIZE * IMAGE_SIZE
def inference(images, hidden1_units, hidden2_units):
# 建网络模型,两层
# 第一层网络,hidden1_units个
with tf.name_scope('hidden1'):
weights = tf.Variable(
tf.truncated_normal([IMAGE_PIXELS, hidden1_units],
stddev=1.0 / math.sqrt(float(IMAGE_PIXELS))),
name='weights')
biases = tf.Variable(tf.zeros([hidden1_units]),
name='biases')
# 用relu作转换
hidden1 = tf.nn.relu(tf.matmul(images, weights) + biases)
# 第二层hidden2_units个
with tf.name_scope('hidden2'):
weights = tf.Variable(
tf.truncated_normal([hidden1_units, hidden2_units],
stddev=1.0 / math.sqrt(float(hidden1_units))),
name='weights')
biases = tf.Variable(tf.zeros([hidden2_units]),
name='biases')
hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
# 再加一层
with tf.name_scope('softmax_linear'):
weights = tf.Variable(
tf.truncated_normal([hidden2_units, NUM_CLASSES],
stddev=1.0 / math.sqrt(float(hidden2_units))),
name='weights')
biases = tf.Variable(tf.zeros([NUM_CLASSES]),
name='biases')
logits = tf.matmul(hidden2, weights) + biases
return logits
def lossfunc(logits, labels):
# labels是一个id列表
labels = tf.to_int64(labels)
# softmax转换后求cross_entropy,在求平均
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
logits, labels, name='xentropy')
loss = tf.reduce_mean(cross_entropy, name='xentropy_mean')
return loss
def training(loss, learning_rate):
# 利用TensorBoard输出
tf.scalar_summary(loss.op.name, loss)
# 用梯度下降学习.
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
# 用global_step变量控制迭代次数.
global_step = tf.Variable(0, name='global_step', trainable=False)
train_op = optimizer.minimize(loss, global_step=global_step)
return train_op
def evaluation(logits, labels):
# logits,预测结果,[batch_size, NUM_CLASSES]
# labels,样本结果,[batch_size]
# 求top1的logits是否包含label,返回多少个正确的
correct = tf.nn.in_top_k(logits, labels, 1)
return tf.reduce_sum(tf.cast(correct, tf.int32))
# Basic model parameters as external flags.
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_float('learning_rate', 0.01, 'Initial learning rate.')
flags.DEFINE_integer('max_steps', 2000, 'Number of steps to run trainer.')
flags.DEFINE_integer('hidden1', 128, 'Number of units in hidden layer 1.')
flags.DEFINE_integer('hidden2', 32, 'Number of units in hidden layer 2.')
flags.DEFINE_integer('batch_size', 100, 'Batch size. '
'Must divide evenly into the dataset sizes.')
flags.DEFINE_string('train_dir', 'data', 'Directory to put the training data.')
flags.DEFINE_boolean('fake_data', False, 'If true, uses fake data '
'for unit testing.')
def placeholder_inputs(batch_size):
# 初始化placeholder
images_placeholder = tf.placeholder(tf.float32, shape=(batch_size,
mnist.IMAGE_PIXELS))
labels_placeholder = tf.placeholder(tf.int32, shape=(batch_size))
return images_placeholder, labels_placeholder
def fill_feed_dict(data_set, images_pl, labels_pl):
images_feed, labels_feed = data_set.next_batch(FLAGS.batch_size,
FLAGS.fake_data)
feed_dict = {
images_pl: images_feed,
labels_pl: labels_feed,
}
return feed_dict
def do_eval(sess,
eval_correct,
images_placeholder,
labels_placeholder,
data_set):
#批量计算eval_correct
true_count = 0 # Counts the number of correct predictions.
steps_per_epoch = data_set.num_examples // FLAGS.batch_size
num_examples = steps_per_epoch * FLAGS.batch_size
for step in xrange(steps_per_epoch):
feed_dict = fill_feed_dict(data_set,
images_placeholder,
labels_placeholder)
true_count += sess.run(eval_correct, feed_dict=feed_dict)
precision = true_count / num_examples
print(' Num examples: %d Num correct: %d Precision @ 1: %0.04f' %
(num_examples, true_count, precision))
def run_training():
#载入数据
data_sets = input_data.read_data_sets(FLAGS.train_dir, FLAGS.fake_data)
# default Graph.
with tf.Graph().as_default():
images_placeholder, labels_placeholder = placeholder_inputs(
FLAGS.batch_size)
# 网络
logits = inference(images_placeholder,
FLAGS.hidden1,
FLAGS.hidden2)
#loss函数
loss = lossfunc(logits, labels_placeholder)
# 训练函数
train_op = training(loss, FLAGS.learning_rate)
# 评估函数
eval_correct = evaluation(logits, labels_placeholder)
# Build the summary Tensor based on the TF collection of Summaries.
summary = tf.merge_all_summaries()
init = tf.initialize_all_variables()
# 用来保存训练checkpoint.
saver = tf.train.Saver()
sess = tf.Session()
# 初始化一个SummaryWriter
summary_writer = tf.train.SummaryWriter(FLAGS.train_dir, sess.graph)
sess.run(init)
for step in xrange(FLAGS.max_steps):
start_time = time.time()
feed_dict = fill_feed_dict(data_sets.train,
images_placeholder,
labels_placeholder)
_, loss_value = sess.run([train_op, loss],
feed_dict=feed_dict)
duration = time.time() - start_time
if step % 100 == 0:
print('Step %d: loss = %.2f (%.3f sec)' % (step, loss_value, duration))
# Update the events file.
summary_str = sess.run(summary, feed_dict=feed_dict)
summary_writer.add_summary(summary_str, step)
summary_writer.flush()
# 保存训练结果.
if (step + 1) % 1000 == 0 or (step + 1) == FLAGS.max_steps:
checkpoint_file = os.path.join(FLAGS.train_dir, 'checkpoint')
saver.save(sess, checkpoint_file, global_step=step)
# Evaluate against the training set.
print('Training Data Eval:')
do_eval(sess,
eval_correct,
images_placeholder,
labels_placeholder,
data_sets.train)
# Evaluate against the validation set.
print('Validation Data Eval:')
do_eval(sess,
eval_correct,
images_placeholder,
labels_placeholder,
data_sets.validation)
# Evaluate against the test set.
print('Test Data Eval:')
do_eval(sess,
eval_correct,
images_placeholder,
labels_placeholder,
data_sets.test)
def main(_):
run_training()
if __name__ == '__main__':
tf.app.run()
网友评论