准备数据:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import tensorflow as tf
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
# Number of positive and negative samples.
n_positive, n_negative = 2000, 2000

# Positive samples: points on a small ring (radius 5 plus truncated-normal noise).
r_p = 5.0 + tf.random.truncated_normal(shape=[n_positive, 1], mean=0.0, stddev=1.0)
theta_p = tf.random.uniform(shape=[n_positive, 1], minval=0.0, maxval=2 * np.pi)
Xp = tf.concat([r_p * tf.cos(theta_p), r_p * tf.sin(theta_p)], axis=1)
Yp = tf.ones_like(r_p)

# Negative samples: points on a larger ring (radius 8 plus truncated-normal noise).
r_n = 8.0 + tf.random.truncated_normal(shape=[n_negative, 1], mean=0.0, stddev=1.0)
theta_n = tf.random.uniform(shape=[n_negative, 1], minval=0.0, maxval=2 * np.pi)
Xn = tf.concat([r_n * tf.cos(theta_n), r_n * tf.sin(theta_n)], axis=1)
Yn = tf.zeros_like(r_n)

# Stack both classes: positives first, then negatives.
X = tf.concat([Xp, Xn], axis=0)
Y = tf.concat([Yp, Yn], axis=0)

# Visualise the two rings.
plt.figure(figsize=(6, 6))
plt.scatter(Xp.numpy()[:, 0], Xp.numpy()[:, 1], c="r")
plt.scatter(Xn.numpy()[:, 0], Xn.numpy()[:, 1], c="g")
plt.legend(["positive", "negative"])
plt.show()
效果:
1 低阶 API
构建数据管道迭代器:
def data_iter(features, labels, batch_size=8):
    """Yield mini-batches of ``(features, labels)`` in a random order.

    Args:
        features: Tensor (or array-like) whose first axis indexes examples.
        labels: Tensor (or array-like) aligned with ``features`` on axis 0.
        batch_size: Maximum number of examples per batch; the last batch may
            be smaller.

    Yields:
        Tuples ``(feature_batch, label_batch)`` gathered from the arguments.
    """
    num_examples = len(features)
    indices = list(range(num_examples))
    np.random.shuffle(indices)  # examples are read in a random order
    for start in range(0, num_examples, batch_size):
        # Slicing clamps at the end of the list, so no explicit min() is needed.
        batch_indices = indices[start:start + batch_size]
        # Gather from the arguments, not from the module-level X / Y.
        yield (tf.gather(features, batch_indices),
               tf.gather(labels, batch_indices))
# Smoke-test the data pipeline: pull a single batch and print it.
batch_size = 10
features, labels = next(data_iter(X, Y, batch_size))
print(features, labels, sep="\n")
输出:
tf.Tensor(
[[ 7.63374 -2.4995987 ]
[ 3.5261695 2.8121672 ]
[ 6.926017 0.43988833]
[ 9.417864 -0.8176333 ]
[ 8.488339 -3.2209659 ]
[ 3.745992 -0.43044332]
[ 4.44868 6.7732778 ]
[ 2.578557 2.2084076 ]
[-1.4551258 -4.423701 ]
[ 8.7067995 -1.698387 ]], shape=(10, 2), dtype=float32)
tf.Tensor(
[[0.]
[1.]
[0.]
[0.]
[0.]
[1.]
[0.]
[1.]
[1.]
[0.]], shape=(10, 1), dtype=float32)
此处范例我们利用 tf.Module 来组织模型变量:
class DNNModel(tf.Module):
    """Fully connected binary classifier built from raw ``tf.Variable``s.

    Layer widths are 2 -> 4 -> 8 -> 1, with ReLU on the hidden layers and a
    sigmoid on the output.
    """

    def __init__(self, name=None):
        super().__init__(name=name)
        # Weights start from a truncated normal, biases from zero.
        self.w1 = tf.Variable(
            tf.random.truncated_normal([2, 4]), dtype=tf.float32)
        self.b1 = tf.Variable(tf.zeros([1, 4]), dtype=tf.float32)
        self.w2 = tf.Variable(
            tf.random.truncated_normal([4, 8]), dtype=tf.float32)
        self.b2 = tf.Variable(tf.zeros([1, 8]), dtype=tf.float32)
        self.w3 = tf.Variable(
            tf.random.truncated_normal([8, 1]), dtype=tf.float32)
        self.b3 = tf.Variable(tf.zeros([1, 1]), dtype=tf.float32)

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
    def __call__(self, x):
        """Forward pass: return sigmoid outputs of shape ``[None, 1]``."""
        hidden1 = tf.nn.relu(tf.matmul(x, self.w1) + self.b1)
        hidden2 = tf.nn.relu(tf.matmul(hidden1, self.w2) + self.b2)
        return tf.nn.sigmoid(tf.matmul(hidden2, self.w3) + self.b3)

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 1], dtype=tf.float32),
                                  tf.TensorSpec(shape=[None, 1], dtype=tf.float32)])
    def loss_func(self, y_true, y_pred):
        """Binary cross-entropy averaged over all elements."""
        # Keep predictions inside [1e-7, 1 - 1e-7] so log() never sees 0.
        eps = 1e-7
        clipped = tf.clip_by_value(y_pred, eps, 1.0 - eps)
        positive_term = y_true * tf.math.log(clipped)
        negative_term = (1 - y_true) * tf.math.log(1 - clipped)
        return tf.reduce_mean(-positive_term - negative_term)

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 1], dtype=tf.float32),
                                  tf.TensorSpec(shape=[None, 1], dtype=tf.float32)])
    def metric_func(self, y_true, y_pred):
        """Accuracy after thresholding the predictions at 0.5."""
        hard_pred = tf.cast(y_pred > 0.5, tf.float32)
        return tf.reduce_mean(1 - tf.abs(y_true - hard_pred))


model = DNNModel()
测试模型结构:
# Run one batch through the untrained model to check that everything wires up.
batch_size = 10
features, labels = next(data_iter(X, Y, batch_size))

predictions = model(features)
loss = model.loss_func(labels, predictions)
metric = model.metric_func(labels, predictions)

tf.print("init loss:", loss)
tf.print("init metric", metric)
# Number of trainable variables held by the module.
num_trainable = len(model.trainable_variables)
print(num_trainable)
输出:
init loss: 0.889420033
init metric 0.6
6
使用 autograph 机制转换成静态图加速:
# Print a separator line that ends with the current time.
@tf.function
def printbar():
    """Print 80 '=' characters followed by the time of day as HH:MM:SS.

    The hour is taken from ``tf.timestamp()`` and shifted by +8 hours.
    """
    seconds_today = tf.timestamp() % (24 * 60 * 60)

    hour = tf.cast(seconds_today // 3600 + 8, tf.int32) % tf.constant(24)
    minute = tf.cast((seconds_today % 3600) // 60, tf.int32)
    second = tf.cast(tf.floor(seconds_today % 60), tf.int32)

    def timeformat(m):
        # Left-pad single-digit values with a zero.
        text = tf.strings.format("{}", m)
        if tf.strings.length(text) == 1:
            text = tf.strings.format("0{}", m)
        return text

    parts = [timeformat(hour), timeformat(minute), timeformat(second)]
    timestring = tf.strings.join(parts, separator=":")
    tf.print("==========" * 8 + timestring)
@tf.function
def train_step(model, features, labels):
    """Run one plain gradient-descent update and return ``(loss, metric)``.

    ``model`` must provide ``loss_func``, ``metric_func`` and
    ``trainable_variables``; the step size is fixed at 0.001.
    """
    # Forward pass, recorded on the tape so the loss can be differentiated.
    with tf.GradientTape() as tape:
        predictions = model(features)
        loss = model.loss_func(labels, predictions)

    # Backward pass.
    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)

    # Gradient-descent update: var <- var - 0.001 * grad.
    for variable, gradient in zip(variables, gradients):
        variable.assign_sub(0.001 * gradient)

    # Metric is computed on the predictions made before the update.
    metric = model.metric_func(labels, predictions)
    return loss, metric
def train_model(model, epochs):
    """Train ``model`` for ``epochs`` passes over the module-level ``X``/``Y``.

    Every 100th epoch the loss and accuracy of that epoch's last batch are
    printed after a time-stamped separator line.
    """
    for epoch in tf.range(1, epochs + 1):
        # Defined up front so the log line below never hits unbound names
        # when an epoch yields no batch.
        loss, metric = tf.constant(0.0), tf.constant(0.0)
        for features, labels in data_iter(X, Y, 100):
            loss, metric = train_step(model, features, labels)
        if epoch % 100 == 0:
            printbar()
            tf.print("epoch =", epoch, "loss = ", loss, "accuracy = ", metric)


train_model(model, epochs=600)
输出:
================================================================================16:50:26
epoch = 100 loss = 0.556310713 accuracy = 0.7
================================================================================16:50:28
epoch = 200 loss = 0.405847311 accuracy = 0.86
================================================================================16:50:30
epoch = 300 loss = 0.467671931 accuracy = 0.75
================================================================================16:50:32
epoch = 400 loss = 0.426428646 accuracy = 0.85
================================================================================16:50:34
epoch = 500 loss = 0.360130191 accuracy = 0.84
================================================================================16:50:36
epoch = 600 loss = 0.361137211 accuracy = 0.84
结果可视化:
# Left panel: ground-truth labels. Right panel: model predictions.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
ax1.scatter(Xp[:, 0].numpy(), Xp[:, 1].numpy(), c="r")
ax1.scatter(Xn[:, 0].numpy(), Xn[:, 1].numpy(), c="g")
ax1.legend(["positive", "negative"])
ax1.set_title("y_true")

# Run the model once and derive both masks from the same probabilities.
pred_probs = tf.squeeze(model(X))
Xp_pred = tf.boolean_mask(X, pred_probs >= 0.5, axis=0)
Xn_pred = tf.boolean_mask(X, pred_probs < 0.5, axis=0)
ax2.scatter(Xp_pred[:, 0].numpy(), Xp_pred[:, 1].numpy(), c="r")
ax2.scatter(Xn_pred[:, 0].numpy(), Xn_pred[:, 1].numpy(), c="g")
ax2.legend(["positive", "negative"])
ax2.set_title("y_pred")
效果:
2 中级 API
构建模型:
from tensorflow.keras import layers, losses, metrics, optimizers
class DNNModel(tf.Module):
    """Binary classifier assembled from Keras ``Dense`` layers (4 -> 8 -> 1)."""

    def __init__(self, name=None):
        super().__init__(name=name)
        self.dense1 = layers.Dense(4, activation="relu")
        self.dense2 = layers.Dense(8, activation="relu")
        self.dense3 = layers.Dense(1, activation="sigmoid")

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
    def __call__(self, x):
        """Forward pass through the three dense layers."""
        return self.dense3(self.dense2(self.dense1(x)))


model = DNNModel()
# Attach the loss, metric and optimizer to the model so the training step
# can reach them through a single object.
model.loss_func = losses.binary_crossentropy
model.metric_func = metrics.binary_accuracy
model.optimizer = optimizers.Adam(learning_rate=0.001)
测试模型结构:
# Build the input pipeline: shuffle, batch by 100, prefetch.
ds = (
    tf.data.Dataset.from_tensor_slices((X, Y))
    .shuffle(buffer_size=4000)
    .batch(100)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

# Check the untrained model on a single batch.
features, labels = next(ds.as_numpy_iterator())
predictions = model(features)

# The loss and metric are applied to flattened 1-D label/prediction vectors.
loss = model.loss_func(tf.reshape(labels, [-1]),
                       tf.reshape(predictions, [-1]))
metric = model.metric_func(tf.reshape(labels, [-1]),
                           tf.reshape(predictions, [-1]))

tf.print("init loss:", loss)
tf.print("init metric", metric)
输出:
init loss: 0.930841148
init metric 0.51
训练模型:
@tf.function
def train_step(model, features, labels):
    """Apply one optimizer update and return ``(loss, metric)`` for the batch.

    Uses the ``loss_func``, ``metric_func`` and ``optimizer`` attached to
    ``model``; labels and predictions are flattened to 1-D before use.
    """
    flat_labels = tf.reshape(labels, [-1])

    with tf.GradientTape() as tape:
        predictions = model(features)
        loss = model.loss_func(flat_labels, tf.reshape(predictions, [-1]))

    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)
    model.optimizer.apply_gradients(zip(gradients, variables))

    metric = model.metric_func(flat_labels, tf.reshape(predictions, [-1]))
    return loss, metric
def train_model(model, epochs):
    """Train ``model`` on the module-level dataset ``ds`` for ``epochs`` epochs.

    Every 10th epoch the loss and accuracy of that epoch's last batch are
    printed after a time-stamped separator line.
    """
    for epoch in tf.range(1, epochs + 1):
        # Fallback values in case the dataset yields no batch.
        loss, metric = tf.constant(0.0), tf.constant(0.0)
        for features, labels in ds:
            loss, metric = train_step(model, features, labels)

        if epoch % 10 != 0:
            continue
        printbar()
        tf.print("epoch =", epoch, "loss = ", loss, "accuracy = ", metric)


train_model(model, epochs=60)
输出:
================================================================================17:01:42
epoch = 10 loss = 0.0938826576 accuracy = 0.96
================================================================================17:01:42
epoch = 20 loss = 0.0858769417 accuracy = 0.96
================================================================================17:01:42
epoch = 30 loss = 0.126385167 accuracy = 0.94
================================================================================17:01:43
epoch = 40 loss = 0.0790566728 accuracy = 0.96
================================================================================17:01:43
epoch = 50 loss = 0.0721534416 accuracy = 0.97
================================================================================17:01:43
epoch = 60 loss = 0.105751008 accuracy = 0.96
结果可视化:
# Left panel: ground-truth labels. Right panel: model predictions.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
ax1.scatter(Xp[:, 0].numpy(), Xp[:, 1].numpy(), c="r")
ax1.scatter(Xn[:, 0].numpy(), Xn[:, 1].numpy(), c="g")
ax1.legend(["positive", "negative"])
ax1.set_title("y_true")

# Run the model once and derive both masks from the same probabilities.
pred_probs = tf.squeeze(model(X))
Xp_pred = tf.boolean_mask(X, pred_probs >= 0.5, axis=0)
Xn_pred = tf.boolean_mask(X, pred_probs < 0.5, axis=0)
ax2.scatter(Xp_pred[:, 0].numpy(), Xp_pred[:, 1].numpy(), c="r")
ax2.scatter(Xn_pred[:, 0].numpy(), Xn_pred[:, 1].numpy(), c="g")
ax2.legend(["positive", "negative"])
# Trailing semicolon keeps the notebook from echoing the return value.
ax2.set_title("y_pred");
效果:
3 高级 API
数据构建:
n = len(X)
n_train = n * 3 // 4

# X / Y hold all positive samples followed by all negative samples, so the
# rows are shuffled once before splitting; slicing them as-is would leave
# only negative samples in the validation set.
split_order = tf.random.shuffle(tf.range(n))
X_shuffled = tf.gather(X, split_order)
Y_shuffled = tf.gather(Y, split_order)

# cache() comes first so that shuffle() still reorders the data every epoch
# and prefetch() remains the last stage of the pipeline.
ds_train = (
    tf.data.Dataset.from_tensor_slices(
        (X_shuffled[0:n_train, :], Y_shuffled[0:n_train, :]))
    .cache()
    .shuffle(buffer_size=1000)
    .batch(20)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

ds_valid = (
    tf.data.Dataset.from_tensor_slices(
        (X_shuffled[n_train:, :], Y_shuffled[n_train:, :]))
    .cache()
    .batch(20)
    .prefetch(tf.data.experimental.AUTOTUNE)
)
定义模型:
from tensorflow.keras import layers, losses, metrics, optimizers, models
# Drop any Keras state left over from earlier cells.
tf.keras.backend.clear_session()


class DNNModel(models.Model):
    """Keras subclassed model with three dense layers (4 -> 8 -> 1)."""

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        """Create the three dense layers, then mark the model as built."""
        self.dense1 = layers.Dense(4, activation="relu", name="dense1")
        self.dense2 = layers.Dense(8, activation="relu", name="dense2")
        self.dense3 = layers.Dense(1, activation="sigmoid", name="dense3")
        super().build(input_shape)

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
    def call(self, x):
        """Forward pass through the three dense layers."""
        hidden = self.dense2(self.dense1(x))
        return self.dense3(hidden)


model = DNNModel()
model.build(input_shape=(None, 2))
model.summary()
输出:
Model: "dnn_model"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
dense1 (Dense) multiple 12
_________________________________________________________________
dense2 (Dense) multiple 40
_________________________________________________________________
dense3 (Dense) multiple 9
=================================================================
Total params: 61
Trainable params: 61
Non-trainable params: 0
训练模型:
### Custom training loop: optimizer, loss and running metrics.
optimizer = optimizers.Adam(learning_rate=0.01)
loss_func = losses.BinaryCrossentropy()

# Running averages / accuracies for the training pass ...
train_loss = metrics.Mean(name='train_loss')
train_metric = metrics.BinaryAccuracy(name='train_accuracy')

# ... and for the validation pass.
valid_loss = metrics.Mean(name='valid_loss')
valid_metric = metrics.BinaryAccuracy(name='valid_accuracy')
@tf.function
def train_step(model, features, labels):
    """Apply one optimizer update and fold the batch into the train metrics.

    Relies on the module-level ``optimizer``, ``loss_func``, ``train_loss``
    and ``train_metric`` objects; nothing is returned.
    """
    with tf.GradientTape() as tape:
        predictions = model(features)
        batch_loss = loss_func(labels, predictions)

    variables = model.trainable_variables
    gradients = tape.gradient(batch_loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

    train_loss.update_state(batch_loss)
    train_metric.update_state(labels, predictions)
@tf.function
def valid_step(model, features, labels):
    """Evaluate one batch and fold it into the validation metrics.

    No gradients are taken; only ``valid_loss`` and ``valid_metric`` are
    updated.
    """
    predictions = model(features)
    valid_loss.update_state(loss_func(labels, predictions))
    valid_metric.update_state(labels, predictions)
def train_model(model, ds_train, ds_valid, epochs):
    """Run the custom training loop for ``epochs`` epochs.

    Each epoch makes one pass over ``ds_train`` (with updates) and one over
    ``ds_valid`` (evaluation only). Every 100th epoch the accumulated metrics
    are printed; all metric objects are reset at the end of every epoch.
    """
    logs = 'Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'
    trackers = (train_loss, valid_loss, train_metric, valid_metric)

    for epoch in tf.range(1, epochs + 1):
        for features, labels in ds_train:
            train_step(model, features, labels)

        for features, labels in ds_valid:
            valid_step(model, features, labels)

        if epoch % 100 == 0:
            printbar()
            values = (epoch, train_loss.result(), train_metric.result(),
                      valid_loss.result(), valid_metric.result())
            tf.print(tf.strings.format(logs, values))

        # Start the next epoch with fresh accumulators.
        for tracker in trackers:
            tracker.reset_states()


train_model(model, ds_train, ds_valid, 1000)
输出:
================================================================================08:04:33
Epoch=100,Loss:0.100245498,Accuracy:0.956,Valid Loss:0.0823013112,Valid Accuracy:0.96
================================================================================08:04:42
Epoch=200,Loss:0.0889734551,Accuracy:0.959666669,Valid Loss:0.0959472954,Valid Accuracy:0.956
================================================================================08:04:50
Epoch=300,Loss:0.0859184787,Accuracy:0.959666669,Valid Loss:0.100719072,Valid Accuracy:0.952
================================================================================08:04:59
Epoch=400,Loss:0.0843160897,Accuracy:0.960666656,Valid Loss:0.104706556,Valid Accuracy:0.952
================================================================================08:05:08
Epoch=500,Loss:0.0835563391,Accuracy:0.961,Valid Loss:0.107334398,Valid Accuracy:0.952
================================================================================08:05:17
Epoch=600,Loss:0.0829177722,Accuracy:0.962,Valid Loss:0.109648846,Valid Accuracy:0.952
================================================================================08:05:27
Epoch=700,Loss:0.0824796259,Accuracy:0.961666644,Valid Loss:0.111882597,Valid Accuracy:0.951
================================================================================08:05:36
Epoch=800,Loss:0.0822021216,Accuracy:0.961666644,Valid Loss:0.113583572,Valid Accuracy:0.951
================================================================================08:05:45
Epoch=900,Loss:0.082076259,Accuracy:0.961666644,Valid Loss:0.115141563,Valid Accuracy:0.95
================================================================================08:05:54
Epoch=1000,Loss:0.0819547623,Accuracy:0.962,Valid Loss:0.116600387,Valid Accuracy:0.949
可视化:
# Left panel: ground-truth labels. Right panel: model predictions.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
ax1.scatter(Xp[:, 0].numpy(), Xp[:, 1].numpy(), c="r")
ax1.scatter(Xn[:, 0].numpy(), Xn[:, 1].numpy(), c="g")
ax1.legend(["positive", "negative"])
ax1.set_title("y_true")

# Run the model once and derive both masks from the same probabilities.
pred_probs = tf.squeeze(model(X))
Xp_pred = tf.boolean_mask(X, pred_probs >= 0.5, axis=0)
Xn_pred = tf.boolean_mask(X, pred_probs < 0.5, axis=0)
ax2.scatter(Xp_pred[:, 0].numpy(), Xp_pred[:, 1].numpy(), c="r")
ax2.scatter(Xn_pred[:, 0].numpy(), Xn_pred[:, 1].numpy(), c="g")
ax2.legend(["positive", "negative"])
# Trailing semicolon keeps the notebook from echoing the return value.
ax2.set_title("y_pred");
显示:
网友评论