关键字:batch normalization
,tensorflow
,批量归一化
bn简介
batch normalization批量归一化,目的是对神经网络的中间层的输出进行一次额外的处理,经过处理之后期望每一层的输出尽量都呈现出均值为0标准差是1的相同的分布上,从而保证每一层的输出稳定不会剧烈波动,从而有效降低模型的训练难度快速收敛,同时对大学习率的容忍度增强,避免了大学习率的梯度爆炸问题,因此配合大学习率能加快收敛,跳出不好的局部极值。
原理概括
bn的实现方法是:针对一个批次
的数据,对网络的隐藏层(中间层)的输出做批量归因化操作,该操作包括两个部分:
-
1.标准化
:对一批次数据在中间层的每个神经元
的输出进行标准化,一个数据一个神经元只有一个输出,一组数据一个神经元就是一个一维向量,对该向量每个值减去均值
除以标准差
,这块的操作完全根据输入数据决定或者说由上层操作决定,该层或者说神经网络不设置参数学习 -
2.分布还原
:在完成标准化之后,再学习一个线性映射
(wx+b)中的w和b,再把标准化的输出进行映射,具体怎么映射由神经网络自己学习得到。
操作如下图所示
bn操作两步走标准化很好理解,为啥要在标准化之后再把分布拉回来,那不是做了无用功吗,不是的。标准化破坏了数据的原始分布,可能导致输入给下游非线性函数比如激活函数的时候产生负面效果,因此加入还原线性变换进行适度还原
,所谓适度还原就是不用担心数据的原始分布被破坏导致影响网络训练的问题,因为还有个还原层,它的上限就是把标准化彻底还原成原始分布(w是标准差,b是均值),下限就是保留标准化,中间水平就是把标准化的结果稍微拉动一下,具体还原函数还原到什么程度完全由网络自行学习决定,相当于人为让分布标准化统一,也给模型留了个口子如果这个人为动作不合理就打退回去,等同于有自动审查机制的每层分布统一,在不影响模型学习的情况下尽量让分布统一。
两个阶段的各自目的说明白之后还有几个重点问题没有解决:
-
在哪个中间层进行bn
:通常在全连接层或者卷积层后面,在激活函数之前,也有说在激活函数之后的,反正在上一层的输出到下一层的输入之间进行 -
bn要学习的参数
:一个bn是一层网络,接受比如一层全连接的输出,其中全连接的每个神经元的输出的均值和标准化是由输入决定的不需要学习,而bn要学习是第二阶段还原函数的w和b,w向量和b向量是在一层bn中共享的 -
训练和测试bn怎么保持一致
:训练过程以每个批次的实际均值和标准差进行标准化,而预测/测试过程不论是单样本预测还是批量预测,都是以整个训练过程的均值和标准差进行标准化,bn采用了滑动平均
的方式随着训练批次的进去,慢慢逼近训练集全集在每一个bn层上的均值和标准差,最终在每一层bn上都有一组均值向量和标准差向量,相当于每一层每一个神经元通过滑动平均的方法都记录下了最终的各自的均值和标准差。
图示计算过程
先看下公式
计算过程
公式和之前讲的多一个小e,通常是一个极小的数比如1e-3,目的防止分母为0。
深入理解bn图示
公式太抽象,画一下bn计算和应用的图示,如上图是一个(None, 3)的输入到一层全连接(3, 4)之后加入bn再到激活函数的数据流转情况
滑动平均估算整体均值标准差
模型训练好之后,需要计算训练集全集在各个bn层的均值标准差权重向量,实现方法是在训练过程中记录中间状态不断修正调整逼近结果,这样等模型训练完,这个结果也记录在网络变量中,在预测的时候直接调用即可。具体公式如下
mean_value = mean_value * decay + batch_mean * (1 - decay)
var_value = var_value * decay + batch_var * (1 - decay)
其中mean_value吸收了训练数据之后每一轮都会更新,初始值是0,同理var_value,他的初始值是1,decay推荐是0.9或者0.99,举例
a = [1, 2, 1.5, 2, 3, 3, 3.2, 1.2, 0.5, 0.8, 0.3, 2, 2.1, 2.2, 1.6] * 100
sum(a) / len(a) # 1.759999999999992
mean_value = 0
for i in a:
mean_value = mean_value * 0.99 + i * 0.01 # 1.755411190749514
理论上batch越多结果越接近真实,另外decay越大越稳定,decay越小新加入的batch mean占比重大波动越大,推荐0.9以上是求稳定,因此需要更多的batch,这样才能避免还没有毕竟真实就停止计算了,导致测试集的参考均值和方差不准。
tensorflow实现方法
推荐使用from tensorflow.contrib.layers.python.layers import batch_norm
,传入需要bn的tensor,将是否是训练还是测试/预测也作为一个tensor传入进去,通过tf.cond+布尔标量实现逻辑判断,其中训练中batch_norm的参数is_training=True,预测is_training=False,另外测试时reuse=True,表示训练和预测网络中共享这个BN层,否则会出现两个BN层,预测时拿得是初始化的w=0和b=1,导致预测集效果出问题。
def batch_norm_layer(value, is_training, scope):
def batch_statistics():
return batch_norm(value, decay=0.9, updates_collections=tf.GraphKeys.UPDATE_OPS, is_training=True, scope=scope)
def population_statistics():
return batch_norm(value, decay=0.9, updates_collections=tf.GraphKeys.UPDATE_OPS, is_training=False, reuse=True, scope=scope)
return tf.cond(is_training, batch_statistics, population_statistics)
tf.cond输入一个布尔tensor,一个true返回函数,一个false返回函数,举例
a = tf.convert_to_tensor([1, 2, 3])
b = tf.constant(3)
c = tf.constant(False)
result = tf.cond(c, lambda: tf.add(a, a), lambda: tf.square(b))
with tf.Session() as sess:
res = sess.run(result)
print(res) # 9,如果c是True返回[2 4 6]
除此之外还需要以下八股文代码
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
# loss的例子
self.train_step = optimizer.minimize(self.loss, global_step=self.global_step)
这段代表表示训练过程中记录的滑动平均均值和标准差这个操作存储在tf.GraphKeys.UPDATE_OPS
中,每次进行一次loss计算需要也计算一遍,希望在计算loss之前也把滑动平均也计算一边因此采用tf.control_dependencies
,表示with下文的内容必须在control_dependencies后面的条件完成之后才能运行。如果不加这个网络仅仅计算loss操作,中间滑动平均根本不计算,这不影响训练因为训练用不到,但是几乎彻底摧毁预测,导致预测的参考均值标准差都是初始值。另一种就是把update_ops拿出来和train_step合并成一个最终的训练操作
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
if update_ops:
train_ops = [train_step] + update_ops
train_op_final = tf.group(*train_ops)
这种不区分执行的前后顺序了,好像也可以。
代码实战
以下代码测试一下batch_norm在一个网络中的应用,分别对比有bn和没有bn的网络训练情况
import os
import time
import shutil
import pickle
import tensorflow as tf
from tensorflow.contrib.layers.python.layers import batch_norm
from tensorflow.python.saved_model import tag_constants
from preprocessing import get_batch
def batch_norm_layer(value, is_training, scope):
def batch_statistics():
return batch_norm(value, decay=0.9, updates_collections=tf.GraphKeys.UPDATE_OPS, is_training=True, scope=scope)
def population_statistics():
return batch_norm(value, decay=0.9, updates_collections=tf.GraphKeys.UPDATE_OPS, is_training=False, reuse=True, scope=scope)
return tf.cond(is_training, batch_statistics, population_statistics)
class Model(object):
def __init__(self, num_class, feature_size, learning_rate=0.5, weight_decay=0.01, decay_learning_rate=0.99):
self.input_x = tf.placeholder(tf.float32, [None, feature_size], name="input_x")
self.input_y = tf.placeholder(tf.float32, [None, num_class], name="input_y")
self.dropout_keep_prob = tf.placeholder(tf.float32, name="dropout_keep_prob")
self.batch_normalization = tf.placeholder(tf.bool, name="batch_normalization")
self.global_step = tf.Variable(0, name="global_step", trainable=False)
with tf.name_scope('layer_1'):
dense_out_1 = tf.layers.dense(self.input_x, 64)
# add
dense_out_1 = batch_norm_layer(dense_out_1, is_training=self.batch_normalization, scope="bn1")
dense_out_act_1 = tf.nn.relu(dense_out_1)
with tf.name_scope('layer_2'):
dense_out_2 = tf.layers.dense(dense_out_act_1, 32)
# add
dense_out_2 = batch_norm_layer(dense_out_2, is_training=self.batch_normalization, scope="bn2")
dense_out_act_2 = tf.nn.relu(dense_out_2)
with tf.name_scope('layer_out'):
self.output = tf.layers.dense(dense_out_act_2, 2)
self.probs = tf.nn.softmax(self.output, dim=1, name="probs")
with tf.name_scope('loss'):
self.loss = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits_v2(logits=self.output, labels=self.input_y))
vars = tf.trainable_variables()
loss_l2 = tf.add_n([tf.nn.l2_loss(v) for v in vars if
v.name not in ['bias', 'gamma', 'b', 'g', 'beta']]) * weight_decay
self.loss += loss_l2
with tf.name_scope("optimizer"):
if decay_learning_rate:
learning_rate = tf.train.exponential_decay(learning_rate, self.global_step, 100, decay_learning_rate)
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
self.train_step = optimizer.minimize(self.loss, global_step=self.global_step)
# update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
# if update_ops:
# train_ops = [self.train_step] + update_ops
# self.train_op_final = tf.group(*train_ops)
with tf.name_scope("metrics"):
self.accuracy = tf.reduce_mean(
tf.cast(tf.equal(tf.arg_max(self.probs, 1), tf.arg_max(self.input_y, 1)), dtype=tf.float32))
if __name__ == '__main__':
train_x, train_y = pickle.load(
open("/home/myproject/BatchNormalizationTest/batch_normalization_test/data/train.pkl", "rb"))
test_x, test_y = pickle.load(
open("/home/myproject/BatchNormalizationTest/batch_normalization_test/data/test.pkl", "rb"))
tf.reset_default_graph()
model = Model(num_class=2, feature_size=15, weight_decay=0)
saver = tf.train.Saver(tf.global_variables(), max_to_keep=1)
# all_variable = tf.global_variables()
# for variable in all_variable:
# if "moving" in variable.name:
# print(variable.name, variable.eval())
BASIC_PATH = "/home/myproject/BatchNormalizationTest/batch_normalization_test"
with tf.Session() as sess:
init_op = tf.group(tf.global_variables_initializer())
sess.run(init_op)
train_batch = get_batch(10, 64, train_x, train_y)
train_loss_list = []
train_step_cnt = []
acc_list = []
val_feed_dict = {model.input_x: test_x, model.input_y: test_y, model.dropout_keep_prob: 1,
model.batch_normalization: False}
for batch in train_batch:
epoch, batch_x, batch_y = batch
feed_dict = {model.input_x: batch_x, model.input_y: batch_y, model.dropout_keep_prob: 1,
model.batch_normalization: True}
_, step, loss_train = sess.run([model.train_step, model.global_step, model.loss], feed_dict=feed_dict)
train_loss_list.append(loss_train)
train_step_cnt.append(step)
if step % 10 == 0:
print("epoch:", epoch + 1, "step:", step, "loss:", loss_train)
# ckpt
saver.save(sess, os.path.join(BASIC_PATH, "./ckpt2/ckpt"))
if step % 50 == 0:
loss_val, acc_val, probs = sess.run([model.loss, model.accuracy, model.probs], feed_dict=val_feed_dict)
print("{:-^30}".format("evaluation"))
print("[evaluation]", "loss:", loss_val, "acc", acc_val)
loss_val, acc_val, probs = sess.run([model.loss, model.accuracy, model.probs], feed_dict=val_feed_dict)
print("{:-^30}".format("evaluation"))
print("[evaluation]", "loss:", loss_val, "acc", acc_val)
import matplotlib.pyplot as plt
plt.plot(train_step_cnt, train_loss_list)
plt.ylim([0.25, 2])
plt.show()
# save
pb_num = str(int(time.time()))
pb_path = os.path.join(BASIC_PATH, "./tfserving2", pb_num)
shutil.rmtree(pb_path, ignore_errors=True)
tf.reset_default_graph()
with tf.Session() as sess:
last_ckpt = tf.train.latest_checkpoint(os.path.join(BASIC_PATH, "./ckpt2"))
print("读取ckpt: {}".format(last_ckpt))
saver = tf.train.import_meta_graph("{}.meta".format(last_ckpt))
saver.restore(sess, last_ckpt)
graph = tf.get_default_graph()
# get tensor
input_x = graph.get_tensor_by_name("input_x:0")
dropout_keep_prob = graph.get_tensor_by_name("dropout_keep_prob:0")
batch_norm_is_train = graph.get_tensor_by_name("batch_normalization:0")
pred = graph.get_tensor_by_name("layer_out/probs:0")
builder = tf.saved_model.builder.SavedModelBuilder(pb_path)
inputs = {'input_x': tf.saved_model.utils.build_tensor_info(input_x),
'dropout_keep_prob': tf.saved_model.utils.build_tensor_info(dropout_keep_prob),
'batch_norm': tf.saved_model.utils.build_tensor_info(batch_norm_is_train)
}
outputs = {'output': tf.saved_model.utils.build_tensor_info(pred)}
signature = tf.saved_model.signature_def_utils.build_signature_def(
inputs=inputs,
outputs=outputs,
method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)
builder.add_meta_graph_and_variables(sess, [tag_constants.SERVING], {'my_signature': signature})
builder.save()
print("pb文件保存完成:", pb_num)
代码是两个全连接层,每层在线性之后加入bn,bn之后是relu,其中bn只使用均值偏移,不是用标准差,采用0.5的学习率运行结果如下
----------evaluation----------
[evaluation] loss: 0.48918334 acc 0.75525653
epoch: 10 step: 2210 loss: 0.38185906
epoch: 10 step: 2220 loss: 0.47259575
epoch: 10 step: 2230 loss: 0.62081766
epoch: 10 step: 2240 loss: 0.5432115
----------evaluation----------
[evaluation] loss: 0.49052832 acc 0.76843286
同样采用0.5的学习率,去除两个bn层之后训练结果如下
----------evaluation----------
[evaluation] loss: 0.6970511 acc 0.50602746
epoch: 10 step: 2210 loss: 0.71779585
epoch: 10 step: 2220 loss: 0.7110723
epoch: 10 step: 2230 loss: 0.72440666
epoch: 10 step: 2240 loss: 0.7036286
----------evaluation----------
[evaluation] loss: 0.6983369 acc 0.50602746
对比下有bn和没有bn在采用一个比较大的学习率的时候训练阶段网络的收敛情况,先看全部训练step,明显发现最左侧快速收敛阶段使用bn比不是用bn更薄,后期不是用bn loss直接起飞,估计梯度爆炸了
收敛对比1
再看前几轮快速收敛阶段,不是用bn前200轮还没有收敛到0.6以下,使用bn已经收敛到最好0.4维持在0.5左右
收敛对比2
如果学习率回到正常比如0.01,两个网络的效果没有明显区别
预测阶段和验证一样,设置一个布尔占位符给道False即可
import os
import pickle
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.python.saved_model import tag_constants
BASIC_PATH = "/home/myproject/BatchNormalizationTest/batch_normalization_test"
def predict_pb(input_x_value, pb_file_no=None):
"""从pb导入模型"""
max_time = pb_file_no
if max_time is None:
max_time = max(os.listdir(os.path.join(BASIC_PATH, "./tfserving2")))
# max_time = "1672132226"
print("读取pb版本:", max_time)
with tf.Session(graph=tf.Graph()) as sess:
tf.saved_model.loader.load(sess, [tag_constants.SERVING], os.path.join(BASIC_PATH, "./tfserving2", max_time))
graph = tf.get_default_graph()
input_x = graph.get_operation_by_name("input_x").outputs[0]
dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
batch_norm_is_train = graph.get_operation_by_name("batch_normalization").outputs[0]
probs = graph.get_tensor_by_name("layer_out/probs:0")
pred = sess.run(probs, feed_dict={input_x: input_x_value, dropout_keep_prob: 1.0, batch_norm_is_train: False})
return pred
if __name__ == '__main__':
test_x, test_y = pickle.load(
open("/home/myproject/BatchNormalizationTest/batch_normalization_test/data/test.pkl", "rb"))
pred = predict_pb(test_x).tolist()
pred = [0 if x[1] < 0.5 else 1 for x in pred]
test_y = [0 if x == [1, 0] else 1 for x in test_y]
print(accuracy_score(test_y, pred))
参考文章:
https://blog.csdn.net/qq_35556369/article/details/102779775
https://blog.csdn.net/chanbo8205/article/details/86591429
https://blog.csdn.net/weixin_39627201/article/details/110583187
网友评论