TensorFlow 2: Modeling at Different API Levels

Author: 水之心 | Published 2020-09-10 08:07

    Prepare the data:

    import numpy as np
    import pandas as pd
    from matplotlib import pyplot as plt
    import tensorflow as tf
    %matplotlib inline
    %config InlineBackend.figure_format = 'svg'
    
    # Number of positive / negative samples
    n_positive, n_negative = 2000, 2000
    
    # Positive samples: distributed on a small ring
    r_p = 5.0 + tf.random.truncated_normal([n_positive, 1], 0.0, 1.0)
    theta_p = tf.random.uniform([n_positive, 1], 0.0, 2*np.pi)
    Xp = tf.concat([r_p*tf.cos(theta_p), r_p*tf.sin(theta_p)], axis=1)
    Yp = tf.ones_like(r_p)
    
    # Negative samples: distributed on a large ring
    r_n = 8.0 + tf.random.truncated_normal([n_negative, 1], 0.0, 1.0)
    theta_n = tf.random.uniform([n_negative, 1], 0.0, 2*np.pi)
    Xn = tf.concat([r_n*tf.cos(theta_n), r_n*tf.sin(theta_n)], axis=1)
    Yn = tf.zeros_like(r_n)
    
    # Stack the samples
    X = tf.concat([Xp, Xn], axis=0)
    Y = tf.concat([Yp, Yn], axis=0)
    
    
    # Visualize
    plt.figure(figsize=(6, 6))
    plt.scatter(Xp[:, 0].numpy(), Xp[:, 1].numpy(), c="r")
    plt.scatter(Xn[:, 0].numpy(), Xn[:, 1].numpy(), c="g")
    plt.legend(["positive", "negative"])
    plt.show()
    

    Result: (figure: scatter plot of the samples; the red positive points form the inner ring and the green negative points the outer ring.)

    1 Low-level API

    Build a data-pipeline iterator:

    def data_iter(features, labels, batch_size=8):
        num_examples = len(features)
        indices = list(range(num_examples))
        np.random.shuffle(indices)  # samples are read in random order
        for i in range(0, num_examples, batch_size):
            batch_indices = indices[i: min(i + batch_size, num_examples)]
            # Gather from the function arguments, not the globals X and Y
            yield tf.gather(features, batch_indices), tf.gather(labels, batch_indices)
    
    # Test the data pipeline
    batch_size = 10
    features, labels = next(data_iter(X, Y, batch_size))
    print(features)
    print(labels)
    

    Output:

    tf.Tensor(
    [[ 7.63374    -2.4995987 ]
     [ 3.5261695   2.8121672 ]
     [ 6.926017    0.43988833]
     [ 9.417864   -0.8176333 ]
     [ 8.488339   -3.2209659 ]
     [ 3.745992   -0.43044332]
     [ 4.44868     6.7732778 ]
     [ 2.578557    2.2084076 ]
     [-1.4551258  -4.423701  ]
     [ 8.7067995  -1.698387  ]], shape=(10, 2), dtype=float32)
    tf.Tensor(
    [[0.]
     [1.]
     [0.]
     [0.]
     [0.]
     [1.]
     [0.]
     [1.]
     [1.]
     [0.]], shape=(10, 1), dtype=float32)
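
    This hand-rolled iterator is the low-level counterpart of tf.data. The same shuffling and batching can be had in one line with the Dataset API, which the mid-level section below uses:

    ds = tf.data.Dataset.from_tensor_slices((X, Y)).shuffle(4000).batch(10)
    features, labels = next(iter(ds))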
    

    In this example we use tf.Module to organize the model's variables:

    class DNNModel(tf.Module):
        def __init__(self, name=None):
            super().__init__(name=name)
            self.w1 = tf.Variable(
                tf.random.truncated_normal([2, 4]), dtype=tf.float32)
            self.b1 = tf.Variable(tf.zeros([1, 4]), dtype=tf.float32)
            self.w2 = tf.Variable(
                tf.random.truncated_normal([4, 8]), dtype=tf.float32)
            self.b2 = tf.Variable(tf.zeros([1, 8]), dtype=tf.float32)
            self.w3 = tf.Variable(
                tf.random.truncated_normal([8, 1]), dtype=tf.float32)
            self.b3 = tf.Variable(tf.zeros([1, 1]), dtype=tf.float32)
    
        # Forward pass
        @tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
        def __call__(self, x):
            x = tf.nn.relu(x@self.w1 + self.b1)
            x = tf.nn.relu(x@self.w2 + self.b2)
            y = tf.nn.sigmoid(x@self.w3 + self.b3)
            return y
    
        # Loss function (binary cross-entropy)
        @tf.function(input_signature=[tf.TensorSpec(shape=[None, 1], dtype=tf.float32),
                                      tf.TensorSpec(shape=[None, 1], dtype=tf.float32)])
        def loss_func(self, y_true, y_pred):
            # Clip predictions into [1e-7, 1 - 1e-7] to avoid log(0)
            eps = 1e-7
            y_pred = tf.clip_by_value(y_pred, eps, 1.0-eps)
            bce = - y_true*tf.math.log(y_pred) - (1-y_true)*tf.math.log(1-y_pred)
            return tf.reduce_mean(bce)
    
        # Evaluation metric (accuracy)
        @tf.function(input_signature=[tf.TensorSpec(shape=[None, 1], dtype=tf.float32),
                                      tf.TensorSpec(shape=[None, 1], dtype=tf.float32)])
        def metric_func(self, y_true, y_pred):
            y_pred = tf.where(y_pred > 0.5, tf.ones_like(y_pred, dtype=tf.float32),
                              tf.zeros_like(y_pred, dtype=tf.float32))
            acc = tf.reduce_mean(1-tf.abs(y_true-y_pred))
            return acc
    
    
    model = DNNModel()
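
    For reference, loss_func implements the mean binary cross-entropy

    $$\mathrm{BCE} = -\frac{1}{N}\sum_{i=1}^{N}\left[y_i\log\hat{y}_i + (1-y_i)\log(1-\hat{y}_i)\right],$$

    with predictions clipped into $[10^{-7},\ 1-10^{-7}]$ so the logarithm stays finite.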
    

    Test the model structure:

    batch_size = 10
    features, labels = next(data_iter(X, Y, batch_size))
    
    predictions = model(features)
    
    loss = model.loss_func(labels, predictions)
    metric = model.metric_func(labels, predictions)
    
    tf.print("init loss:", loss)
    tf.print("init metric", metric)
    print(len(model.trainable_variables))
    

    Output:

    init loss: 0.889420033
    init metric 0.6
    6
    

    Use the autograph mechanism to convert the training step into a static graph for speed:

    # Print a timestamped divider line
    @tf.function
    def printbar():
        today_ts = tf.timestamp() % (24*60*60)
    
        hour = tf.cast(today_ts//3600+8, tf.int32) % tf.constant(24)  # UTC+8
        minute = tf.cast((today_ts % 3600)//60, tf.int32)
        second = tf.cast(tf.floor(today_ts % 60), tf.int32)
    
        def timeformat(m):
            if tf.strings.length(tf.strings.format("{}", m)) == 1:
                return tf.strings.format("0{}", m)
            else:
                return tf.strings.format("{}", m)
    
        timestring = tf.strings.join([timeformat(hour), timeformat(minute),
                                      timeformat(second)], separator=":")
        tf.print("=========="*8+timestring)
    
    @tf.function
    def train_step(model, features, labels):
    
        # Forward pass to compute the loss
        with tf.GradientTape() as tape:
            predictions = model(features)
            loss = model.loss_func(labels, predictions)
    
        # Back-propagate to get the gradients
        grads = tape.gradient(loss, model.trainable_variables)
    
        # Gradient-descent update
        for p, dloss_dp in zip(model.trainable_variables, grads):
            p.assign(p - 0.001*dloss_dp)
    
        # Compute the evaluation metric
        metric = model.metric_func(labels, predictions)
    
        return loss, metric
    
    
    def train_model(model, epochs):
        for epoch in tf.range(1, epochs+1):
            for features, labels in data_iter(X, Y, 100):
                loss, metric = train_step(model, features, labels)
            if epoch % 100 == 0:
                printbar()
                tf.print("epoch =", epoch, "loss = ", loss, "accuracy = ", metric)
    
    
    train_model(model, epochs=600)
    

    Output:

    ================================================================================16:50:26
    epoch = 100 loss =  0.556310713 accuracy =  0.7
    ================================================================================16:50:28
    epoch = 200 loss =  0.405847311 accuracy =  0.86
    ================================================================================16:50:30
    epoch = 300 loss =  0.467671931 accuracy =  0.75
    ================================================================================16:50:32
    epoch = 400 loss =  0.426428646 accuracy =  0.85
    ================================================================================16:50:34
    epoch = 500 loss =  0.360130191 accuracy =  0.84
    ================================================================================16:50:36
    epoch = 600 loss =  0.361137211 accuracy =  0.84
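
    A rough way to see what @tf.function buys here is to time the compiled step against an eager re-implementation of the same update; a sketch (timings are machine-dependent, and each call performs real parameter updates):

    import time
    
    features, labels = next(data_iter(X, Y, 100))
    train_step(model, features, labels)  # warm-up: triggers tracing
    
    start = time.time()
    for _ in range(100):
        train_step(model, features, labels)
    print("100 graph-mode steps:", time.time() - start, "s")
    
    # Eager baseline: the same update logic without @tf.function.
    # (model.__call__ itself is still a tf.function, so this is only
    # partly eager; the Python loop overhead difference still shows.)
    def train_step_eager(model, features, labels):
        with tf.GradientTape() as tape:
            predictions = model(features)
            loss = model.loss_func(labels, predictions)
        grads = tape.gradient(loss, model.trainable_variables)
        for p, dloss_dp in zip(model.trainable_variables, grads):
            p.assign(p - 0.001*dloss_dp)
    
    start = time.time()
    for _ in range(100):
        train_step_eager(model, features, labels)
    print("100 eager steps:", time.time() - start, "s")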
    

    Visualize the results:

    fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
    ax1.scatter(Xp[:, 0], Xp[:, 1], c="r")
    ax1.scatter(Xn[:, 0], Xn[:, 1], c="g")
    ax1.legend(["positive", "negative"])
    ax1.set_title("y_true")
    
    Xp_pred = tf.boolean_mask(X, tf.squeeze(model(X) >= 0.5), axis=0)
    Xn_pred = tf.boolean_mask(X, tf.squeeze(model(X) < 0.5), axis=0)
    
    ax2.scatter(Xp_pred[:, 0], Xp_pred[:, 1], c="r")
    ax2.scatter(Xn_pred[:, 0], Xn_pred[:, 1], c="g")
    ax2.legend(["positive", "negative"])
    ax2.set_title("y_pred")
    

    Result: (figure: side-by-side y_true and y_pred scatter plots.)
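
    Because __call__ carries an explicit input_signature, this tf.Module can also be exported as a SavedModel without any Keras machinery; a minimal sketch (the "./data/dnn_low_level" path is arbitrary, and this assumes the loaded object stays callable, which tf.saved_model.load provides for tf.function-decorated methods):

    tf.saved_model.save(model, "./data/dnn_low_level")
    model_loaded = tf.saved_model.load("./data/dnn_low_level")
    tf.print(model_loaded(tf.constant([[5.0, 0.0]])))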

    2 Mid-level API

    Build the model:

    from tensorflow.keras import layers, losses, metrics, optimizers
    
    class DNNModel(tf.Module):
        def __init__(self, name=None):
            super().__init__(name=name)
            self.dense1 = layers.Dense(4, activation="relu")
            self.dense2 = layers.Dense(8, activation="relu")
            self.dense3 = layers.Dense(1, activation="sigmoid")
    
        # Forward pass
        @tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
        def __call__(self, x):
            x = self.dense1(x)
            x = self.dense2(x)
            y = self.dense3(x)
            return y
    
    
    model = DNNModel()
    model.loss_func = losses.binary_crossentropy
    model.metric_func = metrics.binary_accuracy
    model.optimizer = optimizers.Adam(learning_rate=0.001)
    

    Test the model structure:

    # Build the input data pipeline
    ds = tf.data.Dataset.from_tensor_slices((X, Y)) \
        .shuffle(buffer_size=4000).batch(100) \
        .prefetch(tf.data.experimental.AUTOTUNE)
    features, labels = next(ds.as_numpy_iterator())
    
    predictions = model(features)
    
    loss = model.loss_func(tf.reshape(labels, [-1]), tf.reshape(predictions, [-1]))
    metric = model.metric_func(tf.reshape(
        labels, [-1]), tf.reshape(predictions, [-1]))
    
    tf.print("init loss:", loss)
    tf.print("init metric", metric)
    

    Output:

    init loss: 0.930841148
    init metric 0.51
    

    Train the model:

    @tf.function
    def train_step(model, features, labels):
        with tf.GradientTape() as tape:
            predictions = model(features)
            loss = model.loss_func(tf.reshape(
                labels, [-1]), tf.reshape(predictions, [-1]))
        grads = tape.gradient(loss, model.trainable_variables)
        model.optimizer.apply_gradients(zip(grads, model.trainable_variables))
        metric = model.metric_func(tf.reshape(
            labels, [-1]), tf.reshape(predictions, [-1]))
        return loss, metric
    
    
    def train_model(model, epochs):
        for epoch in tf.range(1, epochs+1):
            loss, metric = tf.constant(0.0), tf.constant(0.0)
            for features, labels in ds:
                loss, metric = train_step(model, features, labels)
            if epoch % 10 == 0:
                printbar()
                tf.print("epoch =", epoch, "loss = ", loss, "accuracy = ", metric)
    
    
    train_model(model, epochs=60)
    

    Output:

    ================================================================================17:01:42
    epoch = 10 loss =  0.0938826576 accuracy =  0.96
    ================================================================================17:01:42
    epoch = 20 loss =  0.0858769417 accuracy =  0.96
    ================================================================================17:01:42
    epoch = 30 loss =  0.126385167 accuracy =  0.94
    ================================================================================17:01:43
    epoch = 40 loss =  0.0790566728 accuracy =  0.96
    ================================================================================17:01:43
    epoch = 50 loss =  0.0721534416 accuracy =  0.97
    ================================================================================17:01:43
    epoch = 60 loss =  0.105751008 accuracy =  0.96
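
    Note that metrics.binary_accuracy used above is the functional form: it only scores the current batch. Its class-based counterpart is stateful and accumulates across batches (section 3 below relies on this); a small sketch of whole-dataset evaluation:

    acc = metrics.BinaryAccuracy()
    for features, labels in ds:
        predictions = model(features)
        acc.update_state(labels, predictions)
    tf.print("accuracy over the whole dataset:", acc.result())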
    

    Visualize the results:

    fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
    ax1.scatter(Xp[:, 0].numpy(), Xp[:, 1].numpy(), c="r")
    ax1.scatter(Xn[:, 0].numpy(), Xn[:, 1].numpy(), c="g")
    ax1.legend(["positive", "negative"])
    ax1.set_title("y_true")
    
    Xp_pred = tf.boolean_mask(X, tf.squeeze(model(X) >= 0.5), axis=0)
    Xn_pred = tf.boolean_mask(X, tf.squeeze(model(X) < 0.5), axis=0)
    
    ax2.scatter(Xp_pred[:, 0].numpy(), Xp_pred[:, 1].numpy(), c="r")
    ax2.scatter(Xn_pred[:, 0].numpy(), Xn_pred[:, 1].numpy(), c="g")
    ax2.legend(["positive", "negative"])
    ax2.set_title("y_pred")
    

    Result: (figure: side-by-side y_true and y_pred scatter plots.)

    3 High-level API

    Build the datasets:

    # Shuffle before splitting: X stacks all the positive samples first and
    # then all the negatives, so taking the first 3/4 as-is would leave a
    # validation set made up entirely of negative samples.
    n = len(X)
    shuffled_idx = tf.random.shuffle(tf.range(n))
    X_shuf, Y_shuf = tf.gather(X, shuffled_idx), tf.gather(Y, shuffled_idx)
    n_train = n*3//4
    
    ds_train = tf.data.Dataset.from_tensor_slices((X_shuf[:n_train], Y_shuf[:n_train])) \
        .shuffle(buffer_size=1000).batch(20) \
        .prefetch(tf.data.experimental.AUTOTUNE) \
        .cache()
    
    ds_valid = tf.data.Dataset.from_tensor_slices((X_shuf[n_train:], Y_shuf[n_train:])) \
        .batch(20) \
        .prefetch(tf.data.experimental.AUTOTUNE) \
        .cache()
    

    Define the model:

    from tensorflow.keras import layers, losses, metrics, optimizers, models
    
    tf.keras.backend.clear_session()
    
    class DNNModel(models.Model):
        def __init__(self):
            super().__init__()
    
        def build(self, input_shape):
            self.dense1 = layers.Dense(4, activation="relu", name="dense1")
            self.dense2 = layers.Dense(8, activation="relu", name="dense2")
            self.dense3 = layers.Dense(1, activation="sigmoid", name="dense3")
            super().build(input_shape)
    
        # Forward pass
        @tf.function(input_signature=[tf.TensorSpec(shape=[None, 2], dtype=tf.float32)])
        def call(self, x):
            x = self.dense1(x)
            x = self.dense2(x)
            y = self.dense3(x)
            return y
    
    
    model = DNNModel()
    model.build(input_shape=(None, 2))
    
    model.summary()
    

    Output:

    Model: "dnn_model"
    _________________________________________________________________
    Layer (type)                 Output Shape              Param #   
    =================================================================
    dense1 (Dense)               multiple                  12        
    _________________________________________________________________
    dense2 (Dense)               multiple                  40        
    _________________________________________________________________
    dense3 (Dense)               multiple                  9         
    =================================================================
    Total params: 61
    Trainable params: 61
    Non-trainable params: 0
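
    The parameter counts follow from (inputs × units + units) per Dense layer: dense1 has 2 × 4 + 4 = 12, dense2 has 4 × 8 + 8 = 40, and dense3 has 8 × 1 + 1 = 9, for 61 in total. The output shapes read "multiple" because for a subclassed model Keras does not track a single concrete output shape per layer.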
    

    Train the model:

    ### Custom training loop
    
    optimizer = optimizers.Adam(learning_rate=0.01)
    loss_func = tf.keras.losses.BinaryCrossentropy()
    
    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_metric = tf.keras.metrics.BinaryAccuracy(name='train_accuracy')
    
    valid_loss = tf.keras.metrics.Mean(name='valid_loss')
    valid_metric = tf.keras.metrics.BinaryAccuracy(name='valid_accuracy')
    
    
    @tf.function
    def train_step(model, features, labels):
        with tf.GradientTape() as tape:
            predictions = model(features)
            loss = loss_func(labels, predictions)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
    
        train_loss.update_state(loss)
        train_metric.update_state(labels, predictions)
    
    @tf.function
    def valid_step(model, features, labels):
        predictions = model(features)
        batch_loss = loss_func(labels, predictions)
        valid_loss.update_state(batch_loss)
        valid_metric.update_state(labels, predictions)
    
    
    def train_model(model, ds_train, ds_valid, epochs):
        for epoch in tf.range(1, epochs+1):
            for features, labels in ds_train:
                train_step(model, features, labels)
    
            for features, labels in ds_valid:
                valid_step(model, features, labels)
    
            logs = 'Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'
    
            if epoch % 100 == 0:
                printbar()
                tf.print(tf.strings.format(
                    logs, (epoch, train_loss.result(), train_metric.result(),
                           valid_loss.result(), valid_metric.result())))
    
            train_loss.reset_states()
            valid_loss.reset_states()
            train_metric.reset_states()
            valid_metric.reset_states()
    
    
    train_model(model, ds_train, ds_valid, 1000)
    

    Output:

    ================================================================================08:04:33
    Epoch=100,Loss:0.100245498,Accuracy:0.956,Valid Loss:0.0823013112,Valid Accuracy:0.96
    ================================================================================08:04:42
    Epoch=200,Loss:0.0889734551,Accuracy:0.959666669,Valid Loss:0.0959472954,Valid Accuracy:0.956
    ================================================================================08:04:50
    Epoch=300,Loss:0.0859184787,Accuracy:0.959666669,Valid Loss:0.100719072,Valid Accuracy:0.952
    ================================================================================08:04:59
    Epoch=400,Loss:0.0843160897,Accuracy:0.960666656,Valid Loss:0.104706556,Valid Accuracy:0.952
    ================================================================================08:05:08
    Epoch=500,Loss:0.0835563391,Accuracy:0.961,Valid Loss:0.107334398,Valid Accuracy:0.952
    ================================================================================08:05:17
    Epoch=600,Loss:0.0829177722,Accuracy:0.962,Valid Loss:0.109648846,Valid Accuracy:0.952
    ================================================================================08:05:27
    Epoch=700,Loss:0.0824796259,Accuracy:0.961666644,Valid Loss:0.111882597,Valid Accuracy:0.951
    ================================================================================08:05:36
    Epoch=800,Loss:0.0822021216,Accuracy:0.961666644,Valid Loss:0.113583572,Valid Accuracy:0.951
    ================================================================================08:05:45
    Epoch=900,Loss:0.082076259,Accuracy:0.961666644,Valid Loss:0.115141563,Valid Accuracy:0.95
    ================================================================================08:05:54
    Epoch=1000,Loss:0.0819547623,Accuracy:0.962,Valid Loss:0.116600387,Valid Accuracy:0.949
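
    The custom loop above is one way to train at this level; the other is Keras's built-in compile/fit. A minimal sketch with an equivalent Sequential model (the hyperparameters mirror the loop above; epochs=100 is illustrative):

    model2 = models.Sequential([
        layers.Dense(4, activation="relu", input_shape=(2,)),
        layers.Dense(8, activation="relu"),
        layers.Dense(1, activation="sigmoid"),
    ])
    model2.compile(optimizer=optimizers.Adam(learning_rate=0.01),
                   loss=losses.BinaryCrossentropy(),
                   metrics=[metrics.BinaryAccuracy()])
    history = model2.fit(ds_train, validation_data=ds_valid,
                         epochs=100, verbose=0)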
    

    Visualize:

    fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
    ax1.scatter(Xp[:, 0].numpy(), Xp[:, 1].numpy(), c="r")
    ax1.scatter(Xn[:, 0].numpy(), Xn[:, 1].numpy(), c="g")
    ax1.legend(["positive", "negative"])
    ax1.set_title("y_true")
    
    Xp_pred = tf.boolean_mask(X, tf.squeeze(model(X) >= 0.5), axis=0)
    Xn_pred = tf.boolean_mask(X, tf.squeeze(model(X) < 0.5), axis=0)
    
    ax2.scatter(Xp_pred[:, 0].numpy(), Xp_pred[:, 1].numpy(), c="r")
    ax2.scatter(Xn_pred[:, 0].numpy(), Xn_pred[:, 1].numpy(), c="g")
    ax2.legend(["positive", "negative"])
    ax2.set_title("y_pred")
    

    Result: (figure: side-by-side y_true and y_pred scatter plots.)
