美文网首页机器学习与数据挖掘我爱编程工作补给站
在Tensorflow Serving上部署基于LSTM的文本分

在Tensorflow Serving上部署基于LSTM的文本分

作者: liyonghong | 来源:发表于2018-02-02 20:47 被阅读472次

    一些重要的概念


    Servables

    • Servables 是客户端请求执行计算的基础对象,大小和粒度是灵活的。
    • Servables 不会管理自己的运行周期。
    • 典型的Servables包括:
      • a TensorFlow SavedModelBundle (tensorflow::Session)
      • a lookup table for embedding or vocabulary lookups

    Servable Versions

    • Tensorflow servables 可以管理多个版本的servable,可以通过版本管理来更新算法配置和模型参数;
    • 可以支持同时加载多个servable

    Servable Streams

    • servable的版本序列。

    Models

    • TensorFlow Serving 将 model 表示为一个或者多个Servables,一个Servable可能对应着模型的一部分,例如,a large lookup table 可以被许多 TensorFlow Serving 共享。

    Loaders

    • Loaders 管理着对应的一个servable的生命周期

    Source

    • Sources 是可以寻找和提供 servables 的模块,每个 Source 提供了0个或者多个servable streams,对于每个servable stream,Source 都会提供一个Loader实例;
    • Sources 可以保存在多个服务或者版本之间共享的state;

    Aspired Versions

    • Aspired Versions 是可以被加载服务版本集合;

    Managers

    • 管理 Servable 的整个的生命周期,包括:
      • loading Servables
      • serving Servables
      • unloading Servables
    • 如果服务要求的资源不足,Manager 可能会拒绝加载新的模型;

    流程


    流程图

    简单来说:

    • Source 为 Servable 创建 Loader 对象;
    • Loader 被作为 Aspired Versions 发送给 Manager,之后 Manager 加载指定的服务去处理客户端的请求;

    具体来说:

    • Source 为指定的服务创建Loader,Loader里包含了服务所需要的元数据(模型);
    • 之后 Source 使用 回掉函数通知 Manager 的 Aspired Version(servable version的集合);
    • Manager 根据配置的Version Policy决定下一步的操作(是否 unload 之前的servable,或者 load 新的servable);
    • 如果 Manager 判定是操作安全的,就会给 Loader 要求的resource并让 Loader 加载新的版本;
    • 客户端向 Manager 请求服务,可以指定服务版本或者只是请求最新的版本。Manager 返回服务端的处理结果;

    Source 可以看作不断更新weights的graph,weights 存储在磁盘中

    例如:

    • Source 选择了一个新版本的模型,创建了一个Loader,Loader包含指向模型文件路径的指针;
    • Source 使用回掉函数通知 Manager 的 Aspired Version;
    • 根据 Version Policy ,Manager 决定加载新的模型;
    • Manager 通知 Loader 有足够的内存用于模型加载,Loader 使用新的weights将graph实例化;
    • 客户端发送请求,Manager 返回处理结果;

    生成可用于tensorflow serving的模型


    环境:Tensorflow 1.0.1
    模型:基于LSTM的文本情感分类的模型:

    import os,time
    import tensorflow as tf
    
    import online_lstm_model
    from utils import data_utils,data_process
    from lib import config
    #载入词向量,生成字典
    embedding_matrix, word_list = data_utils.load_pretained_vector(config.WORD_VECTOR_PATH)
    data_utils.create_vocabulary(config.VOCABULARY_PATH, word_list)
    #数据预处理
    for file_name in os.listdir(config.ORGINAL_PATH):
        data_utils.data_to_token_ids(config.ORGINAL_PATH + file_name, config.TOCKEN_PATN + file_name,
                                       config.VOCABULARY_PATH)
    vocabulary_size = len(word_list) + 2
    #获取训练数据
    x_train, y_train, x_test, y_test = data_process.data_split(choose_path=config.choose_car_path,
                                                               buy_path=config.buy_car_path,
                                                               no_path=config.no_car_path)
    
    with tf.Graph().as_default():
        #build graph
        model = online_lstm_model.RNN_Model(vocabulary_size, config.BATCH_SIZE, embedding_matrix)
        logits = model.logits
        loss  = model.loss
        cost = model.cost
        acu = model.accuracy
        prediction = model.prediction
        train_op = model.train_op
    
        saver = tf.train.Saver()
        #GPU设置
        gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=1.0)
        with tf.Session(config=tf.ConfigProto(gpu_options=gpu_options)) as sess:
            count = 0
            init = tf.global_variables_initializer()
            sess.run(init)
            while count<config.max_steps:
                start_time = time.time()
                #生成训练batch
                batch_x_train, batch_y_train = data_process.generate_batch(x_train, y_train)
                input_feed = {
                    model.input_data: batch_x_train,
                    model.target: batch_y_train
                }
                #进行训练
                train_loss, train_cost, train_acu, _ = sess.run(fetches=[loss, cost, acu, train_op],
                                                                feed_dict=input_feed)
    
                count += 1
                if count % 100 == 0:
                    #每100轮,验证一次
                    sum_valid_cost = 0
                    sum_valid_acu = 0
                    for i in range(len(x_test)//64):
                        batch_x_valid, batch_y_valid = data_process.generate_batch(x_test, y_test)
                        valid_feed = {
                            model.input_data: batch_x_valid,
                            model.target: batch_y_valid
                        }
                        valid_loss, valid_cost, valid_acu = sess.run(fetches=[loss, cost, acu],
                                                                    feed_dict=valid_feed)
                        sum_valid_cost += valid_cost
                        sum_valid_acu += valid_acu
                    print("current step: %f, train cost: %f, train accuracy: %f, cost_time: %f"%(count, train_cost, train_acu, time.time()-start_time))
                    print("valid cost: %f, valid accuracy: %f"%(sum_valid_cost/(len(x_test)//64), sum_valid_acu/(len(x_test)//64)))
            #将模型保存为可用于线上服务的文件(一个.pb文件,一个variables文件夹)
            export_path_base = config.export_path_base
            export_path = os.path.join(
                tf.compat.as_bytes(export_path_base),
                tf.compat.as_bytes(str(count)))
            print('Exporting trained model to', export_path)
    
            builder = tf.saved_model.builder.SavedModelBuilder(export_path)
    
            # 建立签名映射
            """
            build_tensor_info:建立一个基于提供的参数构造的TensorInfo protocol buffer,
            输入:tensorflow graph中的tensor;
            输出:基于提供的参数(tensor)构建的包含TensorInfo的protocol buffer
            """
            input_sentence = tf.saved_model.utils.build_tensor_info(model.input_data)
            classification_outputs_classes = tf.saved_model.utils.build_tensor_info(model.prediction)
            classification_outputs_scores = tf.saved_model.utils.build_tensor_info(model.logits)
    
            """
            signature_constants:SavedModel保存和恢复操作的签名常量。
            
            如果使用默认的tensorflow_model_server部署模型,
            这里的method_name必须为signature_constants中CLASSIFY,PREDICT,REGRESS的一种
            """
            #定义模型的输入输出,建立调用接口与tensor签名之间的映射
            classification_signature = (
                tf.saved_model.signature_def_utils.build_signature_def(
                    inputs={
                        "input_sentence":
                            input_sentence
                    },
                    outputs={
                        "predict_classification":
                            classification_outputs_classes,
                        "predict_scores":
                            classification_outputs_scores
                    },
                    method_name=tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME))
    
            """
            tf.group : 创建一个将多个操作分组的操作,返回一个可以执行所有输入的操作
            """
            legacy_init_op = tf.group(tf.tables_initializer(), name='legacy_init_op')
    
            """
            add_meta_graph_and_variables:建立一个Saver来保存session中的变量,
                                          输出对应的原图的定义,这个函数假设保存的变量已经被初始化;
                                          对于一个SavedModelBuilder,这个API必须被调用一次来保存meta graph;
                                          对于后面添加的图结构,可以使用函数 add_meta_graph()来进行添加
            """
            #建立模型名称与模型签名之间的映射
            builder.add_meta_graph_and_variables(
                sess, [tf.saved_model.tag_constants.SERVING],
                #保存模型的方法名,与客户端的request.model_spec.signature_name对应
                signature_def_map={
                    tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
                        classification_signature},
                legacy_init_op=legacy_init_op)
    
            builder.save()
            print("Build Done")
    

    启动服务

    在这里采用默认的 tensorflow_model_server 启动服务

    bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server  --port=8000 --model_name=your_model_name --model_base_path=your_model_path
    

    例如:

    bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server  --port=8000 --model_name=sentiment_classification --model_base_path=/home/liyonghong/sentiment_analysis/builder_last
    

    注意事项:

    • 在实验过程中,采用保存checkpoint的方式(每n轮保存一次)保存serving模型时,进行本地调用时出错,所以最好在训练过程中只保存一次模型。

    Tensorflow Serving Client端(在这里使用第三方包tensorflow_serving_client)


    固定request_input的shape,向request中放一条数据

    如果构建模型时输入的维度是固定的,应该将shape字段设置为模型输入的shape

    from tensorflow_serving_client.protos import predict_pb2, prediction_service_pb2
    from grpc.beta import implementations
    import tensorflow as tf
    import time
    
    from keras.preprocessing.sequence import pad_sequences
    from utils import data_utils,data_process
    from lib import config
    
    #文件读取和处理
    vocb, rev_vocb = data_utils.initialize_vocabulary("./vocab")
    test_sentence_ = ["我", "讨厌", "这", "车"]
    test_token_sentence = [[vocb.get(i.encode('utf-8'), 1) for i in test_sentence_]]
    #将多条数据放到一个request中:
    # for i in range(128):
    #     test_token_sentence.append([vocb.get(i.encode('utf-8'), 1) for i in test_sentence_])
    padding_sentence = pad_sequences(test_token_sentence, maxlen=config.MAX_SEQUENCE_LENGTH)
    #计时
    start_time = time.time()
    #建立连接
    channel = implementations.insecure_channel("服务的IP地址", 端口号)
    stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
    request = predict_pb2.PredictRequest()
    #这里由保存和运行时定义,第一个启动tensorflow serving时配置的model_name,第二个是保存模型时的方法名
    request.model_spec.name = "sentiment_classification"
    request.model_spec.signature_name = "serving_default"
    #入参参照入参定义
    request.inputs["input_sentence"].ParseFromString(tf.contrib.util.make_tensor_proto(padding_sentence,
                                                                                       dtype=tf.int64,
                                                                                       shape=[1, 50]).SerializeToString())
    #第二个参数是最大等待时间,因为这里是block模式访问的
    response = stub.Predict(request, 10.0)
    results = {}
    for key in response.outputs:
        tensor_proto = response.outputs[key]
        nd_array = tf.contrib.util.make_ndarray(tensor_proto)
        results[key] = nd_array
    print("cost %ss to predict: " % (time.time() - start_time))
    print(results["predict_classification"])
    # print(results["predict_scores"])
    

    结果:

    cost 0.7014968395233154s to predict:
    predict label is: [0]
    
    不固定request_input的shape,将 n 条数据放在同一个请求中(实验中n=128)
    from tensorflow_serving_client.protos import predict_pb2, prediction_service_pb2
    from grpc.beta import implementations
    import tensorflow as tf
    import time
    
    from keras.preprocessing.sequence import pad_sequences
    from utils import data_utils,data_process
    from lib import config
    
    #文件读取和处理
    vocb, rev_vocb = data_utils.initialize_vocabulary(config.VOCABULARY_PATH)
    test_sentence_ = ["我", "讨厌", "这", "车"]
    test_token_sentence = [[vocb.get(i.encode('utf-8'), 1) for i in test_sentence_]]
    #将多条数据放到一个request中:
    for i in range(128):
        test_token_sentence.append([vocb.get(i.encode('utf-8'), 1) for i in test_sentence_])
    padding_sentence = pad_sequences(test_token_sentence, maxlen=config.MAX_SEQUENCE_LENGTH)
    #计时
    start_time = time.time()
    #建立连接
    channel = implementations.insecure_channel("部署服务的IP地址", 端口号)
    stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
    request = predict_pb2.PredictRequest()
    #这里由保存和运行时定义,第一个启动tensorflow serving时配置的model_name,第二个是保存模型时的方法名
    request.model_spec.name = "sentiment_classification"
    request.model_spec.signature_name = "serving_default"
    #入参参照入参定义
    request.inputs["input_sentence"].ParseFromString(tf.contrib.util.make_tensor_proto(padding_sentence,
                                                                                       dtype=tf.int64).SerializeToString())
    #第二个参数是最大等待时间,因为这里是block模式访问的
    response = stub.Predict(request, 10.0)
    results = {}
    for key in response.outputs:
        tensor_proto = response.outputs[key]
        nd_array = tf.contrib.util.make_ndarray(tensor_proto)
        results[key] = nd_array
    print("cost %ss to predict: " % (time.time() - start_time))
    print("predict label is:",results["predict_classification"])
    # print(results["predict_scores"])
    

    结果:

    cost 0.7941889762878418s to predict:
    predict label is: [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
    0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
    

    第二种调用方式处理的数据远多于第一种调用方式处理的数据,但是所用的时间跟第一种方式差不多,这可能是由于Tensorflow Serving 在处理请求时,采用了并行计算的方式,降低了计算耗时。

    注意事项

    • 对request.inputs进行定义时,数据类型dtype要与保存模型时对应的tensor的数据类型保持一致。例如:定义graph中tensor的数据类型为tf.int64,那么request.inputs也应该定义为tf.int64,如果定义为别的数据类型,可能会损失精度;
    • 客户端调用成功后,建议将客户端请求结果与本地调用模型的结果进行对比,验证结果是否正确;

    相关文章

      网友评论

      • d93fbe2792ee:楼主,我在input这一步报错 数据输入的问题,请问在使用图片作为预测的时候,input这里要怎么改,谢谢!
        liyonghong:@Jhin_d26e iuput的定义跟模型的输入有关,可以看看有没有跟模型的输入shape和type保持一致
      • Fishoning:谢谢,解决了我很多疑惑,请问一下有没有完整的github源码,参考以下?
        liyonghong:@Fishoning 不客气~
        Fishoning:@liyonghong 多谢啦!我看看改一改能不能通过。
        liyonghong:@Fishoning https://github.com/joehammer934/sentiment-classification
      • 309324a800e8:你好,我想把一个NER的模型部署到tensorflow-serving上,但是出现了客户端请求结果与本地调用模型的结果不一样的情况,请问有什么可能会导致这个问题呢?谢谢
        liyonghong:@夕日坂 emmmmm,其他的原因我也没遇见过,可以试着单步调试一下,看看哪一步出现了问题😳
        309324a800e8:@liyonghong 我确定input的dtype与模型输入的dtype是一致的,但是结果还是不一样……:sob:
        liyonghong:@夕日坂 我之前也出现过这样的问题,原因是request的时候,input的dtype没有与模型输入的dtype保持一致,你可以看看是不是这个原因😂
      • joepayne:我这边用keras 来实现lstm 的上线似乎有些问题,不知道有没有完整的源代码可供参考的,谢谢~
        joepayne:@liyonghong 感谢大佬的回复!对于这个问题,原因是keras版本问题,为了减少维护成本,我这边直接抛弃掉keras,切换tenorflow 框架。还有一些其它的细节问题给你发私信了=)
        liyonghong:@joepayne 你是说LSTM的模型代码吗

      本文标题:在Tensorflow Serving上部署基于LSTM的文本分

      本文链接:https://www.haomeiwen.com/subject/yowezxtx.html