DeepFM Engineering Implementation in TensorFlow

Author: xiaogp | Published 2020-07-26 23:02

    This post summarizes the code first; the underlying theory will be filled in later.

    Data description

    The example is a binary classification model that predicts whether a user will buy a certain product. All features are categorical; continuous variables have been discretized by binning. A preview of some of the features is shown below.


    (Figure: 特征预览.png, preview of the feature columns)

    Data preprocessing

    Each sample row is converted into feature values and feature indices. Categorical features all take the value 1, while continuous features would take their standardized raw value. The feature indices are needed later by embedding_lookup to fetch each feature's latent vector.
    First build the feature-index file; the training set, the test set, and any future prediction data are all transformed against this same index file.

    import pandas as pd
    
    df = pd.read_csv("churn_test.csv")
    
    index_start = 0
    with open("churn_feature_index.txt", "w", encoding="utf8") as f:
        for col in df.columns[1:-1]:  # skip the first column (id) and the last column (label)
            unique_vals = df[col].unique().tolist()
            # reserve an "unknow" bucket for category values unseen at indexing time
            if "unknow" not in unique_vals:
                unique_vals += ["unknow"]
            for unique_val in unique_vals:
                f.write(col + ":" + str(unique_val) + " " + str(index_start) + "\n")
                index_start += 1
    

    A preview of the feature-index file is shown below. Each line has the form feature:category_value index, with indices starting from 0.

    shop_duration:30以下 0
    shop_duration:30-60 1
    shop_duration:90-120 2
    shop_duration:60-90 3
    shop_duration:120以上 4
    shop_duration:unknow 5
    recent:120以上 6
    recent:30以下 7
    recent:60-90 8
    recent:30-60 9
    recent:90-120 10
    recent:unknow 11
    
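    To make the mapping concrete, here is a minimal sketch (using two made-up field values that appear in the preview above) of how one sample row becomes an index list and a value list, and why the indices matter for the embedding lookup later:

    # Hypothetical two-field sample; the index dict is normally loaded from churn_feature_index.txt.
    feature_index = {"shop_duration:30以下": 0, "recent:60-90": 8}
    row = {"shop_duration": "30以下", "recent": "60-90"}

    indexs = [feature_index[col + ":" + str(val)] for col, val in row.items()]  # [0, 8]
    values = [1] * len(indexs)                                                  # categorical values are all 1

    # Later the model does tf.nn.embedding_lookup(feature_weight, indexs), which picks
    # rows 0 and 8 of the embedding matrix as the latent vectors of these two features.
    print(indexs, values)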

    Next, transform the training and test sets to obtain the feature indices, feature values, and labels. Since this data has no continuous variables, we only need to look up an index for each categorical variable, and every feature value is 1.

    import numpy as np
    import pandas as pd
    
    label_col = ["label"]
    categorical_cols = ['shop_duration', 'recent', 'monetary', 'max_amount',
           'items_count', 'valid_points_sum', 'CHANNEL_NUM_ID', 'member_day',
           'VIP_TYPE_NUM_ID', 'frequence', 'avg_amount', 'item_count_turn',
           'avg_piece_amount', 'monetary3', 'max_amount3', 'items_count3',
           'frequence3', 'shops_count', 'promote_percent', 'wxapp_diff',
           'store_diff', 'shop_channel', 'week_percent', 'infant_group',
           'water_product_group', 'meat_group', 'beauty_group', 'health_group',
           'fruits_group', 'vegetables_group', 'pets_group', 'snacks_group',
           'smoke_group', 'milk_group', 'instant_group', 'grain_group']
    
    
    def get_feature_index():
        feature_index = {}
        with open("./churn_feature_index.txt", "r", encoding="utf8") as f:
            for line in f.readlines():
                items = line.strip()
                if items:
                    key = items.split()[0]
                    val = items.split()[1]
                    feature_index[key] = val
        return feature_index
    
    
    def preprocessing(df, label=True):
        data_index = []
        data_value = []
        for line in df.iloc[:, 1:-1].values:  # skip the id column and the label column
            indexs = []
            values = []
            for col, val in zip(categorical_cols, line):
                key = col + ":" + str(val)
                value = 1
               
                # look up the index; category values unseen in the index file fall back to the "unknow" bucket
                if key in feature_index:
                    index = int(feature_index[key])
                else:
                    index = int(feature_index[col + ":" + "unknow"])
                indexs.append(index)
                values.append(value)
            data_index.append(indexs) 
            data_value.append(values)
        
        if label:
            data_label = np.array(df["label"].tolist()).reshape(-1, 1)
            return data_index, data_value, data_label
        
        return data_index, data_value
                
            
    if __name__ == "__main__":
        import pickle
        feature_index = get_feature_index()
        train = pd.read_csv("./churn_train.csv")
        test = pd.read_csv("./churn_test.csv")
        train_index, train_value, train_label = preprocessing(train)
        test_index, test_value, test_label = preprocessing(test)
        
        with open("churn_train.pkl", "wb") as f:
            pickle.dump(train_index, f)
            pickle.dump(train_value, f)
            pickle.dump(train_label, f)
        with open("churn_test.pkl", "wb") as f:
            pickle.dump(test_index, f)
            pickle.dump(test_value, f)
            pickle.dump(test_label, f)
    

    Building DeepFM and training the model

    First define an Args class that holds the DeepFM hyperparameters. The model builds the FM first-order and second-order terms plus a three-layer DNN; the three outputs are concatenated horizontally and followed by one more fully connected layer, and the embedding layer is shared between the FM second-order computation and the DNN input (a quick shape check is sketched right after this paragraph). Training uses early stopping: when no new minimum test-set loss appears within n=5 consecutive evaluations, training stops. At most 5 checkpoints are kept; the checkpoint with the best loss is reloaded to compute the evaluation metrics and is then exported to a pb file (a SavedModel) via SavedModelBuilder.
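    As a quick shape check (a sketch, assuming the hyperparameters set in the training script below: field_size=36, embedding_size=8, deep_layers=[128, 64, 32]), the width of the final concatenated layer works out as follows:

    field_size, embedding_size = 36, 8
    deep_layers = [128, 64, 32]

    first_order_width = field_size        # FM first-order: one value per field -> 36
    second_order_width = embedding_size   # FM second-order: one value per embedding dimension -> 8
    deep_width = deep_layers[-1]          # last DNN layer -> 32

    last_layer_size = deep_width + first_order_width + second_order_width
    print(last_layer_size)                # 76, fed into a single sigmoid output unit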

    # -*- coding: utf-8 -*-
    
    import os
    import pickle
    import random
    import shutil
    import time
    
    import numpy as np
    import tensorflow as tf
    from tensorflow.python.saved_model import tag_constants
    
    from utils import get_metrics, early_stop
    
    
    class Args():
        feature_sizes = 100
        field_size = 15
        embedding_size = 100
        deep_layers = [512, 256, 128]
        epoch = 1
        batch_size = 1024
        learning_rate = 1e-3
        decaylearning_rate = 0.95  # decay factor for the exponential learning-rate decay
        l2_reg_rate = 0.00
        checkpoint_dir = os.path.join("./churn_ckpt/model.ckpt")
        is_training = True
    
    
    class model():
        def __init__(self, args):
            self.feature_sizes = args.feature_sizes
            self.field_size = args.field_size
            self.embedding_size = args.embedding_size
            self.deep_layers = args.deep_layers
            self.l2_reg_rate = args.l2_reg_rate
    
            self.learning_rate = args.learning_rate
            self.decaylearning_rate = args.decaylearning_rate
            self.deep_activation = tf.nn.relu
            self.weight = dict()
            self.checkpoint_dir = args.checkpoint_dir
            self.build_model()
    
        def build_model(self):
            self.feat_index = tf.placeholder(tf.int32, shape=[None, None], name='feature_index')
            self.feat_value = tf.placeholder(tf.float32, shape=[None, None], name='feature_value')
            self.label = tf.placeholder(tf.float32, shape=[None, None], name='label')
    
            # Weights between the one-hot input layer and the dense embeddings layer,
            # i.e. the embeddings fed to the DNN (width per sample: field_size * embedding_size).
            self.weight['feature_weight'] = tf.Variable(
                # shape: (number of one-hot features) x embedding_size
                # normal distribution, mean 0, std 0.01
                tf.random_normal([self.feature_sizes, self.embedding_size], 0.0, 0.01),
                name='feature_weight')
    
            # FM first-order weights
            self.weight['feature_first'] = tf.Variable(
                tf.random_normal([self.feature_sizes, 1], 0.0, 1.0),
                name='feature_first')
    
            # weights for the deep (DNN) part
            num_layer = len(self.deep_layers)
            input_size = self.field_size * self.embedding_size
            # Glorot-style init std: sqrt(2 / (fan_in + fan_out))
            init_method = np.sqrt(2.0 / (input_size + self.deep_layers[0]))
    
            # shape: (input_size, deep_layers[0])
            self.weight['layer_0'] = tf.Variable(
                # loc: mean of the normal distribution
                # scale: std of the normal distribution
                np.random.normal(loc=0, scale=init_method, size=(input_size, self.deep_layers[0])), dtype=np.float32
            )
            # shape: (1, deep_layers[0])
            self.weight['bias_0'] = tf.Variable(
                np.random.normal(loc=0, scale=init_method, size=(1, self.deep_layers[0])), dtype=np.float32
            )
    
            # weights and biases for the remaining deep layers
            if num_layer != 1:
                for i in range(1, num_layer):
                    init_method = np.sqrt(2.0 / (self.deep_layers[i - 1] + self.deep_layers[i]))
    
                    # shape: (deep_layers[i-1], deep_layers[i])
                    self.weight['layer_' + str(i)] = tf.Variable(
                        np.random.normal(loc=0, scale=init_method, size=(self.deep_layers[i - 1], self.deep_layers[i])),
                        dtype=np.float32)
    
                    # shape: (1, deep_layers[i])
                    self.weight['bias_' + str(i)] = tf.Variable(
                        np.random.normal(loc=0, scale=init_method, size=(1, self.deep_layers[i])),
                        dtype=np.float32)
    
            # concat width = deep output size + first-order size (field_size) + second-order size (embedding_size)
            # note: field_size, not feature_sizes; the zero-valued positions are already dropped
            last_layer_size = self.deep_layers[-1] + self.field_size + self.embedding_size
            init_method = np.sqrt(np.sqrt(2.0 / (last_layer_size + 1)))
            # weights for the final output layer
            self.weight['last_layer'] = tf.Variable(
                np.random.normal(loc=0, scale=init_method, size=(last_layer_size, 1)), dtype=np.float32)
            self.weight['last_bias'] = tf.Variable(tf.constant(0.01), dtype=np.float32)
    
            # embedding part
            # shape: batch_size * field_size * embedding_size
            self.embedding_index = tf.nn.embedding_lookup(self.weight['feature_weight'],
                                                          self.feat_index)  # Batch*F*K
    
            # multiply each feature value into its embedding vector
            # reshape feat_value so the positions line up for broadcasting
            self.embedding_part = tf.multiply(self.embedding_index,
                                              tf.reshape(self.feat_value, [-1, self.field_size, 1]))
    
            """
            Forward pass
            """
            # FM part
            # first-order term
            self.embedding_first = tf.nn.embedding_lookup(self.weight['feature_first'],
                                                          self.feat_index)  # bacth*F*1
            # element-wise multiply; shapes match
            self.embedding_first = tf.multiply(self.embedding_first, tf.reshape(self.feat_value, [-1, self.field_size, 1]))
            
            # first-order output, shape (?, field_size)
            self.first_order = tf.reduce_sum(self.embedding_first, 2)
    
            # second-order term
            # sum first, then square
            # embedding_part: [None, field_size, embedding_size], embeddings already scaled by the feature values
            self.sum_second_order = tf.reduce_sum(self.embedding_part, 1)
            self.sum_second_order_square = tf.square(self.sum_second_order)  # [None, 128]
            
            # square first, then sum
            self.square_second_order = tf.square(self.embedding_part)
            self.square_second_order_sum = tf.reduce_sum(self.square_second_order, 1)  # [None, 128]
    
            # 1/2*((a+b)^2 - a^2 - b^2)=ab
            # [None, embedding_size]
            self.second_order = 0.5 * tf.subtract(self.sum_second_order_square, self.square_second_order_sum)
            
            
            self.fm_part = tf.concat([self.first_order, self.second_order], axis=1)
    
            # DNN part
            # shape: (?, field_size * embedding_size)
            # flatten: the DNN input is field_size * embedding_size wide,
            # i.e. the embeddings of all active features concatenated together
            self.deep_embedding = tf.reshape(self.embedding_part, [-1, self.field_size * self.embedding_size])
            # print('deep_embedding:', self.deep_embedding)
    
            # fully connected layers
            for i in range(0, len(self.deep_layers)):
                self.deep_embedding = tf.add(tf.matmul(self.deep_embedding, self.weight["layer_%d" % i]),
                                             self.weight["bias_%d" % i])
                self.deep_embedding = self.deep_activation(self.deep_embedding)
    
            # concatenate the FM output with the DNN output
            din_all = tf.concat([self.fm_part, self.deep_embedding], axis=1)
            # final fully connected layer
            self.out = tf.add(tf.matmul(din_all, self.weight['last_layer']), self.weight['last_bias'])
    
            # loss
            # note: despite being named 'logit', this tensor is the sigmoid probability
            self.out = tf.nn.sigmoid(self.out, name='logit')
            
            # AUC metric (tf.metrics.auc returns (auc_value, update_op))
            self.auc_score = tf.metrics.auc(self.label, self.out)
    
            self.loss = -tf.reduce_mean(
                self.label * tf.log(self.out + 1e-24) + (1 - self.label) * tf.log(1 - self.out + 1e-24))
    
            # L2 regularization: l2_reg_rate * sum(w^2) / 2
            self.loss += tf.nn.l2_loss(self.weight["last_layer"]) * self.l2_reg_rate
            for i in range(len(self.deep_layers)):
                self.loss += tf.nn.l2_loss(self.weight["layer_%d" % i]) * self.l2_reg_rate
            
            self.global_step = tf.Variable(0, trainable=False)
            self.learning_rate_decay = tf.train.exponential_decay(self.learning_rate, self.global_step, 100, self.decaylearning_rate)
            opt = tf.train.AdamOptimizer(self.learning_rate_decay)
            # opt = tf.train.AdamOptimizer(self.learning_rate)
            trainable_params = tf.trainable_variables()
    
            gradients = tf.gradients(self.loss, trainable_params)
            clip_gradients, _ = tf.clip_by_global_norm(gradients, 5)
            self.train_op = opt.apply_gradients(
                zip(clip_gradients, trainable_params), global_step=self.global_step)
            
            self.saver = tf.train.Saver(tf.global_variables(), max_to_keep=5)
    
        def train(self, sess, feat_index, feat_value, label):
            loss, _, step, auc_score = sess.run([self.loss, self.train_op, self.global_step, self.auc_score], feed_dict={
                self.feat_index: feat_index,
                self.feat_value: feat_value,
                self.label: label
            })
            return loss, step, auc_score
    
        def predict(self, sess, feat_index, feat_value):
            result = sess.run(self.out, feed_dict={
                self.feat_index: feat_index,
                self.feat_value: feat_value
            })
            return result
        
        def evaluate(self, sess, feat_index, feat_value, label):
            loss, auc_score = sess.run([self.loss, self.auc_score], feed_dict={
                self.feat_index: feat_index,
                self.feat_value: feat_value,
                self.label: label
            })
            return loss, auc_score
    
        def save(self, sess, path):
            self.saver.save(sess, save_path=path, global_step=self.global_step)
    
        def restore(self, sess, path):
            kpt = tf.train.latest_checkpoint(path)  # restore the latest checkpoint under the given path
            saver = tf.train.import_meta_graph("{}.meta".format(kpt))
            saver.restore(sess, kpt)
    
    
    def get_batch(data_index, data_value, data_label, epochs=10, batch_size=256):
        data_len = len(data_index)
        for epoch in range(epochs):
            data = list(zip(data_index, data_value, data_label))
            random.shuffle(data)
            data_index, data_value, data_label = zip(*data)
            for batch in range(0, data_len, batch_size):
                if batch + batch_size < data_len:
                    output_data = (data_index[batch: batch + batch_size], 
                                   data_value[batch: batch + batch_size], 
                                   data_label[batch: batch + batch_size])
                else:
                    output_data = (data_index[batch: data_len], 
                                   data_value[batch: data_len], 
                                   data_label[batch: data_len])
                yield output_data
    
    
    if __name__ == '__main__': 
        args = Args()
        args.feature_sizes = 171
        args.field_size = 36
        args.is_training = True
        args.epoch = 20
        args.batch_size = 2048 * 8
        args.learning_rate = 1e-3 * 8
        args.decaylearning_rate = 0.95
        args.embedding_size = 8
        args.deep_layers = [128, 64, 32]
        
        with open("churn_train.pkl", "rb") as f:
            train_index = pickle.load(f)
            train_value = pickle.load(f)
            train_label = pickle.load(f)
            
        with open("churn_test.pkl", "rb") as f:
            test_index = pickle.load(f)
            test_value = pickle.load(f)
            test_label = pickle.load(f)
        
        data_len = len(train_index)
        
        tf.reset_default_graph()
        with tf.Session() as sess:
            Model = model(args)
            sess.run(tf.global_variables_initializer())
            sess.run(tf.local_variables_initializer())
            
            loss_list = []        
            print("{:-^30}".format("training"))
            train_data = get_batch(train_index, train_value, train_label, epochs=args.epoch, batch_size=args.batch_size)
            for batch_index, batch_value, batch_label in train_data:
                loss, step, auc_score = Model.train(sess, batch_index, batch_value, batch_label)
                epoch_num = int(step * args.batch_size / data_len) + 1
                if step % 10 == 0:
                    print("epoch: {} setp: {} => loss: {}".format(epoch_num, step, loss))
                
                if step % 50 == 0:
                    loss, auc_score = Model.evaluate(sess, test_index, test_value, test_label)
                    print("{:-^30}".format("evaluation"))
                    print("evaluation => loss: {}".format(loss))
                    Model.save(sess, args.checkpoint_dir)
                    
                    # compare the current loss with the previous best (minimum) loss
                    if len(loss_list) == 0:
                        diff = 0
                    else:    
                        diff = loss - min(loss_list)
                    loss_list.append(loss)
                    print("本轮loss比之前最小loss{}:{}, 当前最小loss: {}".format("上升" if diff > 0 else "下降", abs(diff), min(loss_list)))
                    print("-" * 40)
                    # early stopping
                    if early_stop(loss_list, windows=5):
                        break        
            
            print("-" * 30)
            
        # load the checkpoint with the best loss
        tf.reset_default_graph()
        with tf.Session() as sess:
            file_list = os.listdir("./churn_ckpt")
            min_index_file = "./churn_ckpt/" + min([".".join(x.split(".")[:2]) for x in file_list if x != "checkpoint"])
            print("读取ckpt: {}".format(min_index_file))
            saver = tf.train.import_meta_graph("{}.meta".format(min_index_file))
            saver.restore(sess, min_index_file)
    
            graph = tf.get_default_graph()
            input_xi = graph.get_operation_by_name("feature_index").outputs[0]
            input_xv = graph.get_operation_by_name("feature_value").outputs[0]
            probs = graph.get_tensor_by_name("logit:0")
    
            pred = sess.run(probs, feed_dict={input_xi: test_index, input_xv: test_value})
        
            # evaluate on the test set
            res = [x[0] for x in pred]
            test_label = [x[0] for x in test_label]
            get_metrics(test_label, res, 0.5)
            
            # export the model as a SavedModel pb
            pb_path = os.path.join("./churn_pb", str(int(time.time())))
            shutil.rmtree(pb_path, ignore_errors=True)
            builder = tf.saved_model.builder.SavedModelBuilder(pb_path)
            # build the signature from the restored graph's input/output tensors
            inputs = {'feature_index': tf.saved_model.utils.build_tensor_info(input_xi),
                      'feature_value': tf.saved_model.utils.build_tensor_info(input_xv)
                      }
            outputs = {'output': tf.saved_model.utils.build_tensor_info(probs)}
            signature = tf.saved_model.signature_def_utils.build_signature_def(
                inputs=inputs,
                outputs=outputs,
                method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)
    
            builder.add_meta_graph_and_variables(sess, [tag_constants.SERVING], {'my_signature': signature})
            builder.save()
    
    
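    The second_order computation above uses the standard FM trick: per embedding dimension, take 0.5 * ((sum over fields)^2 - sum of squares), which equals the sum of pairwise products (the 1/2*((a+b)^2 - a^2 - b^2) = ab identity noted in the comments). A small numpy sketch with made-up vectors to verify the equivalence:

    import numpy as np

    # three fields, embedding_size 4; rows play the role of embedding_part for one sample
    v = np.random.rand(3, 4)

    # the trick used in build_model: 0.5 * ((sum v)^2 - sum(v^2)), per embedding dimension
    trick = 0.5 * (np.square(v.sum(axis=0)) - np.square(v).sum(axis=0))

    # brute force: sum of element-wise products over all field pairs i < j
    brute = sum(v[i] * v[j] for i in range(3) for j in range(i + 1, 3))

    print(np.allclose(trick, brute))  # True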

    The early_stop and get_metrics functions in utils.py:

    from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score
    
    def early_stop(loss_list, windows=5):
        # stop when the best loss in the last 'windows' evaluations fails to improve on the earlier best
        if len(loss_list) <= windows:
            return False
        latest_loss = loss_list[-windows:]
        previous_loss = loss_list[:-windows]
        min_previous_loss = min(previous_loss)
        min_latest_loss = min(latest_loss)
        if min_latest_loss > min_previous_loss:
            return True
        return False
    
    def get_metrics(labels, predictions, thres):
        predictions_label = [1 if x >= thres else 0 for x in predictions]
        print("accuracy:", accuracy_score(labels, predictions_label))
        print("precision:", precision_score(labels, predictions_label))
        print("reall:", recall_score(labels, predictions_label))
        print("f1:", f1_score(labels, predictions_label))
        print("auc:", roc_auc_score(labels, predictions))
    
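    A quick usage sketch of early_stop with made-up loss values, showing when it fires:

    # best of the last 5 losses (0.505) is worse than the earlier best (0.50) -> stop
    print(early_stop([0.52, 0.51, 0.50, 0.505, 0.506, 0.507, 0.508, 0.509], windows=5))  # True
    # the last 5 losses still contain a new best (0.49) -> keep training
    print(early_stop([0.52, 0.51, 0.50, 0.49, 0.506, 0.507, 0.508, 0.509], windows=5))   # False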

    The training output is shown below. The maximum number of epochs was set to 20, and early stopping kicked in at epoch 15.

    epoch: 14 step: 660 => loss: 0.49427324533462524
    epoch: 14 step: 670 => loss: 0.49196428060531616
    epoch: 15 step: 680 => loss: 0.4925547242164612
    epoch: 15 step: 690 => loss: 0.4977375268936157
    epoch: 15 step: 700 => loss: 0.49650928378105164
    ----------evaluation----------
    evaluation => loss: 0.4999653100967407
    loss up by 0.0009462833404541016 vs previous best, current best loss: 0.4990190267562866
    ----------------------------------------
    ------------------------------
    loading ckpt: ./churn_ckpt/model.ckpt-500
    INFO:tensorflow:Restoring parameters from ./churn_ckpt/model.ckpt-500
    accuracy: 0.7584251187687513
    precision: 0.7706821837759759
    recall: 0.8258112742942573
    f1: 0.7972948877964289
    auc: 0.8262372477851344
    INFO:tensorflow:No assets to save.
    INFO:tensorflow:No assets to write.
    INFO:tensorflow:SavedModel written to: ./churn_pb\1595775832\saved_model.pb
    

    Training also leaves 5 checkpoint files on disk, of which the one with the smallest step index holds the parameters at the lowest evaluation loss, plus the exported pb files.


    (Figure: ckpt和pb文件.png, the checkpoint and pb files on disk)
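    For reference, the on-disk layout looks roughly like this (an illustrative sketch; the step number 500 and the timestamp come from the run above, and the exact shard file names may differ):

    churn_ckpt/
        checkpoint
        model.ckpt-500.data-00000-of-00001
        model.ckpt-500.index
        model.ckpt-500.meta
        ...                                  # plus the other kept checkpoints, up to model.ckpt-700
    churn_pb/
        1595775832/
            saved_model.pb
            variables/
                variables.data-00000-of-00001
                variables.index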

    Using the model for prediction

    Now load the model from the checkpoint and from the pb file respectively and predict on new data; the results match those obtained during training.

    import os
    import pickle 
    
    import tensorflow as tf
    from tensorflow.python.saved_model import tag_constants
    
    from utils import get_metrics
    
    
    with open("churn_test.pkl", "rb") as f:
        test_index = pickle.load(f)
        test_value = pickle.load(f)
        test_label = pickle.load(f)
    
    
    def predict_ckpt():
        """从检查点导入模型"""
        with tf.Session() as sess:
    #        checkpoint_file = tf.train.latest_checkpoint("./ckpt")
    #        saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
    #        saver.restore(sess, checkpoint_file)
            
            file_list = os.listdir("./churn_ckpt")
            min_index_file = "./churn_ckpt/" + min([".".join(x.split(".")[:2]) for x in file_list if x != "checkpoint"])
            saver = tf.train.import_meta_graph("{}.meta".format(min_index_file))
            saver.restore(sess, min_index_file)
    
            graph = tf.get_default_graph()
            input_xi = graph.get_operation_by_name("feature_index").outputs[0]
            input_xv = graph.get_operation_by_name("feature_value").outputs[0]
            probs = graph.get_tensor_by_name("logit:0")
    
            pred = sess.run(probs, feed_dict={input_xi: test_index, input_xv: test_value})
        
        return pred
    
    
    def predict_pb():
        """从pb导入模型"""
        max_time = max(os.listdir("./churn_pb"))
        with tf.Session(graph=tf.Graph()) as sess:
            tf.saved_model.loader.load(sess, [tag_constants.SERVING], os.path.join("./churn_pb", max_time))
    
            graph = tf.get_default_graph()
            input_xi = graph.get_operation_by_name("feature_index").outputs[0]
            input_xv = graph.get_operation_by_name("feature_value").outputs[0]
            probs = graph.get_tensor_by_name("logit:0")
            
            pred = sess.run(probs, feed_dict={input_xi: test_index, input_xv: test_value})
            
        return pred
    
    
    if __name__ == "__main__":
        res = predict_ckpt()
        res = [x[0] for x in res]
        test_label = [x[0] for x in test_label]
        get_metrics(test_label, res, 0.5)
        
        res2 = predict_pb()
        res2 = [x[0] for x in res2]
        get_metrics(test_label, res2, 0.5)
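        # sanity check (a sketch): the ckpt restore and the SavedModel hold the same weights,
        # so the two prediction vectors are expected to match
        print(max(abs(a - b) for a, b in zip(res, res2)))  # expected: 0.0 or numerically tiny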
    

    The output is as follows:

    INFO:tensorflow:Restoring parameters from ./churn_ckpt/model.ckpt-500
    accuracy: 0.7584251187687513
    precision: 0.7706821837759759
    recall: 0.8258112742942573
    f1: 0.7972948877964289
    auc: 0.8262372477851344
    WARNING:tensorflow:From C:/Users/14165/Desktop/DeepFM/churn_predict.py:44: load (from tensorflow.python.saved_model.loader_impl) is deprecated and will be removed in a future version.
    Instructions for updating:
    This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
    INFO:tensorflow:Restoring parameters from ./churn_pb\1595775832\variables\variables
    accuracy: 0.7584251187687513
    precision: 0.7706821837759759
    recall: 0.8258112742942573
    f1: 0.7972948877964289
    auc: 0.8262372477851344
    
