Batch Normalization 批标准化
搭建网络
输入需要的模块和定义网络的结构
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
ACTIVATION = tf.nn.relu # activation applied by every hidden layer
N_LAYERS = 7 # number of hidden layers
N_HIDDEN_UNITS = 30 # number of neurons in each hidden layer
使用 built_net() 功能搭建神经网络:
def built_net(xs, ys, norm):
    """Build a fully connected regression network and its training op.

    The network stacks N_LAYERS hidden layers of N_HIDDEN_UNITS units, each
    followed by ACTIVATION, and finishes with a single linear output unit.
    It is trained with plain gradient descent on the squared error.

    Args:
        xs: input tensor; its second dimension is read as the feature count.
        ys: target tensor, subtracted element-wise from the prediction.
        norm: accepted for interface compatibility only. This version of the
            function never reads it, so no batch normalization is applied.

    Returns:
        ``[train_op, cost, layers_inputs]``, where ``layers_inputs`` is
        ``xs`` followed by the output of every hidden layer.
    """

    def add_layer(inputs, in_size, out_size, activation_function=None):
        """Create one dense layer and return its (optionally activated) output."""
        # Weights are drawn from N(0, 1); every bias starts at 0.1.
        Weights = tf.Variable(tf.random_normal([in_size, out_size], mean=0., stddev=1.))
        biases = tf.Variable(tf.zeros([1, out_size]) + 0.1)
        Wx_plus_b = tf.matmul(inputs, Weights) + biases
        if activation_function is None:
            return Wx_plus_b
        return activation_function(Wx_plus_b)

    fix_seed(1)

    layers_inputs = [xs]  # input of every layer, starting with the raw input
    # Build all hidden layers; each one consumes the previous entry.
    for l_n in range(N_LAYERS):
        layer_input = layers_inputs[l_n]
        in_size = layer_input.get_shape()[1].value
        output = add_layer(
            layer_input,     # input
            in_size,         # input size
            N_HIDDEN_UNITS,  # output size
            ACTIVATION,      # activation function
        )
        layers_inputs.append(output)

    # Output layer. Its input width is read from the last recorded tensor
    # instead of the literal 30, so it stays correct if N_HIDDEN_UNITS or
    # N_LAYERS is changed.
    last_size = layers_inputs[-1].get_shape()[1].value
    prediction = add_layer(layers_inputs[-1], last_size, 1, activation_function=None)

    cost = tf.reduce_mean(tf.reduce_sum(tf.square(ys - prediction), reduction_indices=[1]))
    train_op = tf.train.GradientDescentOptimizer(0.001).minimize(cost)
    return [train_op, cost, layers_inputs]
创建数据
创造数据并可视化数据:
# Synthetic regression set: 500 points on [-7, 10] as a column vector,
# with y = x^2 - 5 plus Gaussian noise (standard deviation 8).
x_data = np.linspace(-7, 10, 500).reshape(-1, 1)
noise = np.random.normal(loc=0, scale=8, size=x_data.shape)
y_data = x_data ** 2 - 5 + noise
# Show the raw input data
plt.scatter(x_data, y_data)
plt.show()
Batch Normalization 代码
给 built_net 和 add_layer 都加上 norm 参数, 表示是否是 Batch Normalization 层:
def built_net(xs, ys, norm):
def add_layer(inputs, in_size, out_size, activation_function=None, norm=False):
每层的 Wx_plus_b 需要进行一次 batch normalize 的步骤, 这样输出到 activation 的 Wx_plus_b 就已经被 normalize 过了:
if norm:  # only normalize when this layer is flagged as a BN layer
    # Mean and variance are taken over axis 0, the batch dimension.
    # For image data pass [0, 1, 2] (batch, height, width) and leave the
    # channel dimension out.
    fc_mean, fc_var = tf.nn.moments(Wx_plus_b, axes=[0])
    scale = tf.Variable(tf.ones([out_size]))
    shift = tf.Variable(tf.zeros([out_size]))
    epsilon = 0.001
    Wx_plus_b = tf.nn.batch_normalization(Wx_plus_b, fc_mean, fc_var, shift, scale, epsilon)
    # The call above performs:
    # Wx_plus_b = (Wx_plus_b - fc_mean) / tf.sqrt(fc_var + 0.001)
    # Wx_plus_b = Wx_plus_b * scale + shift
如果使用 batch 进行每次的更新, 那每个 batch 的 mean/var 都会不同, 可以使用 moving average 的方法记录并慢慢改进 mean/var
的值. 然后将修改提升后的 mean/var 放入 tf.nn.batch_normalization().
在 test 阶段, 可以直接调用最后一次修改的 mean/var 值进行测试, 而不是采用 test 时的 fc_mean/fc_var.
对这句进行扩充, 修改前:
Wx_plus_b = tf.nn.batch_normalization(Wx_plus_b, fc_mean, fc_var, shift, scale, epsilon)
# After the change:
ema = tf.train.ExponentialMovingAverage(decay=0.5)  # decay rate of the exponential moving average


def mean_var_with_update():
    """Return the batch mean/variance, forcing the moving-average update to run first."""
    update_averages = ema.apply([fc_mean, fc_var])
    # The identity ops depend on the update, so evaluating the returned
    # tensors also refreshes the averages tracked by `ema`.
    with tf.control_dependencies([update_averages]):
        batch_mean = tf.identity(fc_mean)
        batch_var = tf.identity(fc_var)
    return batch_mean, batch_var


# These are the current batch moments (passed through tf.identity);
# the averaged values themselves stay inside `ema`.
mean, var = mean_var_with_update()
# Normalize with the tensors obtained above
Wx_plus_b = tf.nn.batch_normalization(Wx_plus_b, mean, var, shift, scale, epsilon)
输入数据 xs 时, 给它做一个 normalization:
if norm:
    # Batch-normalize the raw input before it reaches the first layer;
    # the moments are taken over axis 0.
    fc_mean, fc_var = tf.nn.moments(xs, axes=[0])
    scale = tf.Variable(tf.ones([1]))
    shift = tf.Variable(tf.zeros([1]))
    epsilon = 0.001
    xs = tf.nn.batch_normalization(xs, fc_mean, fc_var, shift, scale, epsilon)
在建立网络的循环中的这一步加入 norm 这个参数:
output = add_layer(
    inputs=layer_input,
    in_size=in_size,
    out_size=N_HIDDEN_UNITS,
    activation_function=ACTIVATION,
    norm=norm,  # normalize before the activation
)
对比有无 BN
搭建两个神经网络, 一个没有 BN, 一个有 BN:
# Placeholders shared by both networks, shaped [num_samples, num_features]
xs = tf.placeholder(dtype=tf.float32, shape=[None, 1])
ys = tf.placeholder(dtype=tf.float32, shape=[None, 1])
# Build the plain network first, then the batch-normalized one
train_op, cost, layers_inputs = built_net(xs, ys, norm=False)
train_op_norm, cost_norm, layers_inputs_norm = built_net(xs, ys, norm=True)
def fix_seed(seed=1):
    """Seed both TensorFlow's and NumPy's random number generators with ``seed``."""
    tf.set_random_seed(seed)
    np.random.seed(seed)
def plot_his(inputs, inputs_norm):
    """Draw a histogram of every recorded layer input for both networks.

    The figure is a 2-row grid: row 0 shows ``inputs`` and row 1 shows
    ``inputs_norm``; column ``i`` holds entry ``i`` of the respective list.

    Args:
        inputs: sequence of per-layer values from the network without BN;
            each entry must provide ``ravel()``.
        inputs_norm: the same sequence from the network with BN.
    """
    # NOTE(review): the original indentation was lost; the title is set once
    # per row after that row's histograms are drawn - confirm against the
    # author's layout.
    for row, all_inputs in enumerate([inputs, inputs_norm]):
        n_cols = len(all_inputs)
        for col, layer_values in enumerate(all_inputs):
            plt.subplot(2, n_cols, row * n_cols + (col + 1))
            plt.cla()
            # Column 0 is plotted over (-7, 10); every other column over (-1, 1).
            the_range = (-7, 10) if col == 0 else (-1, 1)
            plt.hist(layer_values.ravel(), bins=15, range=the_range, color="#FF5733")
            plt.yticks(())
            # Only the bottom row keeps its x ticks.
            plt.xticks(the_range if row == 1 else ())
            ax = plt.gca()
            ax.spines['right'].set_color('none')
            ax.spines['top'].set_color('none')
        plt.title('%s normalizing' % ('Without' if row == 0 else 'With'))
    plt.draw()
    plt.pause(.01)
训练神经网络:
sess = tf.Session()
sess.run(tf.global_variables_initializer())

# Cost history of both networks, sampled every `record_step` iterations
cost_his = []
cost_his_norm = []
record_step = 5

plt.ion()
plt.figure(figsize=(7, 3))
feed = {xs: x_data, ys: y_data}  # the whole data set is fed on every run
for i in range(251):
    if i % 50 == 0:
        # Histogram the tensors recorded in layers_inputs / layers_inputs_norm
        all_inputs, all_inputs_norm = sess.run([layers_inputs, layers_inputs_norm], feed_dict=feed)
        plot_his(all_inputs, all_inputs_norm)

    # One gradient step for each network
    sess.run(train_op, feed_dict=feed)
    sess.run(train_op_norm, feed_dict=feed)

    if i % record_step == 0:
        # Record the current cost of both networks
        cost_his.append(sess.run(cost, feed_dict=feed))
        cost_his_norm.append(sess.run(cost_norm, feed_dict=feed))

plt.ioff()
plt.figure()
recorded_at = np.arange(len(cost_his)) * record_step  # iteration index of each sample
plt.plot(recorded_at, np.array(cost_his), label='no BN')    # without normalization
plt.plot(recorded_at, np.array(cost_his_norm), label='BN')  # with normalization
plt.legend()
plt.show()
relu cost 的对比:
没有使用 BN 的网络, 大部分神经元都死了, 所以连误差曲线都没了
tanh:
tanh 的误差对比:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
# Seed TensorFlow and NumPy before any random data or graph ops are created.
tf.set_random_seed(1)
np.random.seed(1)
# Hyper parameters
N_SAMPLES = 2000 # number of training points generated below
BATCH_SIZE = 64 # rows of train_data fed per training step
EPOCH = 12 # number of passes over train_data
LR = 0.03 # learning rate passed to the Adam optimizer
N_HIDDEN = 8 # number of hidden layers in each network
ACTIVATION = tf.nn.tanh # activation applied by every hidden layer
B_INIT = tf.constant_initializer(-0.2) # deliberately bad bias initialization
# training data: shuffled x in [-7, 10], y = x^2 - 5 plus Gaussian noise (std 2)
x = np.linspace(-7, 10, N_SAMPLES)[:, np.newaxis]
np.random.shuffle(x)
noise = np.random.normal(0, 2, x.shape)
y = np.square(x) - 5 + noise
train_data = np.hstack((x, y)) # column 0 holds x, column 1 holds y
# test data: 200 ordered points over the same interval, with fresh noise
test_x = np.linspace(-7, 10, 200)[:, np.newaxis]
noise = np.random.normal(0, 2, test_x.shape)
test_y = np.square(test_x) - 5 + noise
# plot input data
plt.scatter(x, y, c='#FF9359', s=50, alpha=0.5, label='train')
plt.legend(loc='upper left')
# tensorflow placeholders for inputs, targets and the BN mode flag
tf_x = tf.placeholder(tf.float32, [None, 1])
tf_y = tf.placeholder(tf.float32, [None, 1])
tf_is_train = tf.placeholder(tf.bool, None) # passed as `training=` to the BN layers; fed True while training, False when evaluating
class NN(object):
    """Regression network built on the module-level placeholders.

    With ``batch_normalization=True`` the raw input and the output of every
    hidden dense layer go through ``tf.layers.batch_normalization`` before
    the activation is applied.

    Attributes set by ``__init__``:
        is_bn: whether batch normalization layers are inserted.
        w_init: initializer shared by every dense kernel.
        pre_activation: ``tf_x`` followed by each hidden dense output,
            recorded before BN and before the activation.
        layer_input: the (optionally normalized) input followed by the
            output of each hidden layer.
        out: output of the final single-unit dense layer.
        loss: mean squared error between ``tf_y`` and ``out``.
        train: Adam minimization op for ``loss``.
    """

    def __init__(self, batch_normalization=False):
        self.is_bn = batch_normalization
        self.w_init = tf.random_normal_initializer(0., .1)  # kernel initializer
        self.pre_activation = [tf_x]

        # When BN is enabled the input data is normalized as well.
        if self.is_bn:
            first_input = tf.layers.batch_normalization(tf_x, training=tf_is_train)
        else:
            first_input = tf_x
        self.layer_input = [first_input]

        # Hidden layers, each 10 units wide and fed by the previous output.
        for _ in range(N_HIDDEN):
            hidden = self.add_layer(self.layer_input[-1], 10, ac=ACTIVATION)
            self.layer_input.append(hidden)

        self.out = tf.layers.dense(self.layer_input[-1], 1, kernel_initializer=self.w_init, bias_initializer=B_INIT)
        self.loss = tf.losses.mean_squared_error(tf_y, self.out)

        # IMPORTANT: moving_mean and moving_variance are refreshed by the ops
        # in the UPDATE_OPS collection, so the train op is made to depend on them.
        with tf.control_dependencies(tf.get_collection(tf.GraphKeys.UPDATE_OPS)):
            self.train = tf.train.AdamOptimizer(LR).minimize(self.loss)

    def add_layer(self, x, out_size, ac=None):
        """Append one dense layer (plus BN when enabled) and return its output.

        The dense output is recorded in ``self.pre_activation`` before BN and
        before the activation ``ac`` are applied.
        """
        x = tf.layers.dense(x, out_size, kernel_initializer=self.w_init, bias_initializer=B_INIT)
        self.pre_activation.append(x)

        # The momentum matters here: the default 0.99 is too high in this case.
        if self.is_bn:
            x = tf.layers.batch_normalization(x, momentum=0.4, training=tf_is_train)

        if ac is None:
            return x
        return ac(x)
# Two networks on the same placeholders: index 0 without BN, index 1 with BN
nets = [NN(batch_normalization=use_bn) for use_bn in (False, True)]

sess = tf.Session()
sess.run(tf.global_variables_initializer())

# Grid of axes for the layer histograms: 4 rows, N_HIDDEN + 1 columns
f, axs = plt.subplots(nrows=4, ncols=N_HIDDEN + 1, figsize=(10, 5))
plt.ion()  # switch pyplot to interactive mode
def plot_histogram(l_in, l_in_bn, pre_ac, pre_ac_bn):
    """Redraw the histogram grid held in the module-level ``axs``.

    Column ``i`` shows entry ``i`` of each argument. From top to bottom the
    rows display ``pre_ac``, ``pre_ac_bn``, ``l_in`` and ``l_in_bn``.

    Args:
        l_in: per-layer values of the network without BN.
        l_in_bn: per-layer values of the network with BN.
        pre_ac: per-layer pre-activation values without BN.
        pre_ac_bn: per-layer pre-activation values with BN.
            Every entry of every argument must provide ``ravel()``.
    """
    columns = zip(axs[0, :], axs[1, :], axs[2, :], axs[3, :])
    for i, (ax_pa, ax_pa_bn, ax, ax_bn) in enumerate(columns):
        column_axes = (ax_pa, ax_pa_bn, ax, ax_bn)
        # Plain loop instead of a throwaway list comprehension.
        for a in column_axes:
            a.clear()

        # Column 0 uses (-7, 10) for both ranges; later columns use narrower ones.
        if i == 0:
            p_range = (-7, 10)
            the_range = (-7, 10)
        else:
            p_range = (-4, 4)
            the_range = (-1, 1)

        ax_pa.set_title('L' + str(i))
        ax_pa.hist(pre_ac[i].ravel(), bins=10, range=p_range, color='#FF9359', alpha=0.5)
        ax_pa_bn.hist(pre_ac_bn[i].ravel(), bins=10, range=p_range, color='#74BCFF', alpha=0.5)
        ax.hist(l_in[i].ravel(), bins=10, range=the_range, color='#FF9359')
        ax_bn.hist(l_in_bn[i].ravel(), bins=10, range=the_range, color='#74BCFF')

        # Remove all ticks, then restore x ticks on the two BN rows only.
        for a in column_axes:
            a.set_yticks(())
            a.set_xticks(())
        ax_pa_bn.set_xticks(p_range)
        ax_bn.set_xticks(the_range)

    # Row labels live on column 0. They are set once, after the loop has
    # cleared and redrawn that column, rather than on every iteration.
    axs[0, 0].set_ylabel('Pre_Ac')
    axs[1, 0].set_ylabel('pre_Ac_BN')
    axs[2, 0].set_ylabel('Act')
    axs[3, 0].set_ylabel('BN Act')
    plt.pause(0.01)
losses = [[], []]  # recorded test loss: [without BN, with BN]

for epoch in range(EPOCH):
    print('Epoch: ', epoch)
    np.random.shuffle(train_data)

    # Walk the shuffled data in consecutive slices of BATCH_SIZE rows; the
    # last slice may be shorter. Stepping with range() replaces the modulo
    # bookkeeping, which never terminated when len(train_data) <= BATCH_SIZE.
    for step, b_s in enumerate(range(0, len(train_data), BATCH_SIZE), start=1):
        b_f = min(b_s + BATCH_SIZE, len(train_data))
        b_x, b_y = train_data[b_s: b_f, 0:1], train_data[b_s: b_f, 1:2]  # batch training data
        sess.run([nets[0].train, nets[1].train], {tf_x: b_x, tf_y: b_y, tf_is_train: True})  # train

        if step == 1:
            # After the first batch of each epoch, evaluate both networks on
            # the test set with the BN flag set to False.
            loss0, loss1, l_in, l_in_bn, pa, pa_bn = sess.run(
                [nets[0].loss, nets[1].loss, nets[0].layer_input, nets[1].layer_input,
                 nets[0].pre_activation, nets[1].pre_activation],
                {tf_x: test_x, tf_y: test_y, tf_is_train: False})
            for history, value in zip(losses, [loss0, loss1]):  # record test loss
                history.append(value)
            plot_histogram(l_in, l_in_bn, pa, pa_bn)  # plot histogram

# NOTE(review): the original indentation was lost, so it is unclear whether
# this print ran once after training or inside one of the loops - confirm.
print(losses)
# Leave interactive mode before drawing the final figures.
plt.ioff()
# plot test loss: one curve per network, y axis limited to (0, 2000)
plt.figure(3)
plt.plot(losses[0], c='#FF9359', lw=3, label='Original')
plt.plot(losses[1], c='#74BCFF', lw=3, label='Batch Normalization')
plt.ylabel('test loss')
plt.ylim((0, 2000))
plt.legend(loc='best')
# plot prediction line: only the outputs are fetched, so tf_y is not fed
pred, pred_bn = sess.run([nets[0].out, nets[1].out], {tf_x: test_x, tf_is_train: False})
# print(list(zip(list(pred[120:130]),list(pred_bn[120:130]),list(y[120:130]))))
plt.figure(4)
plt.plot(test_x, pred, c='#FF9359', lw=4, label='Original')
plt.plot(test_x, pred_bn, c='#74BCFF', lw=4, label='Batch Normalization')
# overlay the first 200 rows of the training data for reference
plt.scatter(x[:200], y[:200], c='r', s=50, alpha=0.2, label='train')
plt.legend(loc='best')
plt.show()
Last updated