美文网首页Tensorflow深度学习Spark和机器学习
如何基于SDL+TensorFlow/SK-Learn开发NLP

如何基于SDL+TensorFlow/SK-Learn开发NLP

作者: 祝威廉 | 来源:发表于2017-10-24 17:03 被阅读981次

    准备

    Step1: 首先下载项目:

    //下载项目
    git clone https://github.com/allwefantasy/spark-deep-learning.git .
    //切换到release 分支
    git checkout release
    

    Step2: 构建pyspark环境:

    确保安装了python 2.7 ,强烈建议你使用Virtualenv方便python环境的管理。之后通过pip 安装pyspark

    pip install pyspark
    

    文件比较大,大约180多M,有点耐心。你也可以使用阿里源:

    pip install pyspark -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
    

    下载 spark 2.2.0,然后解压到特定目录,设置SPARK_HOME即可。

    其实如果通过spark-submit 提交程序,并不会需要额外安装pyspark, 这里通过pip安装的主要目的是为了让你的IDE能有代码提示。

    接着安装项目需要的依赖:

    pip install -r requirements.txt
    

    最后进行项目build:

    build/sbt assembly
    

    这个时候你就得到了一个jar包:

    target/scala-2.11/spark-deep-learning-assembly-0.1.0-spark2.1.jar
    

    另外,另外你还需要一个Kafka。 似乎感觉有点麻烦,然而只要配置一次。

    方便代码提示,package python 源码

    为了方便在IDE得到代码提示,我们还需要把python相关的代码打包。
    在主目录运行:

    cd ./python && python setup.py bdist_wheel && cd dist && pip uninstall sparkdl  && pip install ./sparkdl-0.2.2-py2-none-any.whl && cd ..
    

    我这里打包和安装放一块了。

    现在,在IDE里,你可以得到代码提示补全了。

    开发基于SK-Learn的应用

    首先我们假设我们有这样的数据:

    # -*- coding: UTF-8 -*-
    from pyspark.ml import Pipeline
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import StringIndexer
    from spark_sklearn import GridSearchCV
    from sklearn import svm
    from sklearn.model_selection import GridSearchCV
    
    from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
    from sparkdl.transformers.tf_text import TFTextTransformer
    
    session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()
    
    documentDF = session.createDataFrame([
        ("Hi I heard about Spark", "spark"),
        ("I wish Java could use case classes", "java"),
        ("I wish Java could use case classes", "java"),
        ("I wish Java could use case classes", "java"),
        ("Logistic regression models are neat", "mlib"),
        ("Logistic regression models are neat", "spark"),
        ("Logistic regression models are neat", "mlib"),
        ("Logistic regression models are neat", "java"),
        ("Logistic regression models are neat", "spark"),
        ("Logistic regression models are neat", "java"),
        ("Logistic regression models are neat", "mlib")
    ], ["text", "preds"])
    

    接着我们希望把preds转化为数字(分类),text转化为向量,这样才能喂给算法。我们可以这么做:

    features = TFTextTransformer(
        inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)
    
    indexer = StringIndexer(inputCol="preds", outputCol="labels")
    
    pipline = Pipeline(stages=[features, indexer])
    ds = pipline.fit(documentDF).transform(documentDF)
    

    TFTextTransformer 默认提供的是一个二维数组,shape为(64,100),这种shape其实是为了给深度学习使用的,这里我指定shape为(-1,) 则会将二维数组转化为一个64*100的向量

    现在我们写一个函数,里面实现具体的sk-learn逻辑:

    def sk_map_fun(args={}, ctx=None, _read_data=None):
        params = args['params']['fitParam']
        data = [item for item in _read_data()]
        parameters = {'kernel': ('linear', 'rbf')}
        svr = svm.SVC()
        clf = GridSearchCV(svr, parameters)
        X = [x["features"] for x in data[0]]
        y = [int(x["labels"]) for x in data[0]]
        model = clf.fit(X, y)
        print(model.best_estimator_)
        return ""
    

    前面必须是def sk_map_fun(args={}, ctx=None, _read_data=None): 这样,函数名字可以随意定。 _read_data 是你获取数据的一个对象,典型用法如下:

            for data in _read_data(max_records=params["batch_size"]):
                batch_data = feed_dict(data)
                sess.run(train_step, feed_dict={input_x: batch_data})
    

    因为SVM是需要全量数据的,所以我简单的一次性拉取所有数据,因为条数小于默认的64条,所以我没有指定max_records.

     data = [item for item in _read_data()]
     X = [x["features"] for x in data[0]]
     y = [int(x["labels"]) for x in data[0]]
    

    现在我们要把sk_map_fun 集成到Estimator里:

    estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                              kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                          "group_id": "sdl_1", "test_mode": False},
                              runningMode="Normal",
                              fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                              mapFnParam=sk_map_fun)
    estimator.fit(ds).collect()
    

    这里,通过mapFnParam 参数,我们将sklearn函数传递给了TextEstimator,并且我们配置了Kakfa相关参数。这里唯一需要注意的是fitParam, 这里的fitParam 长度为2,意味着会启动两个进程运行sk_map_fun,并且一次传递对应的参数给sk_map_fun,sk_map_fun的第一段代码:

    params = args['params']['fitParam']
    

    这个时候params是{"epochs": 5, "batch_size": 64} 或者 {"epochs": 5, "batch_size": 1}。
    这样你可以通过params拿到epoche,batch_size等,然后传给对应的Sk-Learn模型。

    如果你只是运行Local模式,那么可以修改下kafkaParam参数:

    import tempfile
    mock_kafka_file = tempfile.mkdtemp()
    kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                                  "mock_kafka_file": mock_kafka_file,
                                                  "group_id": "sdl_1", "test_mode": True},
    

    指定一个临时目录mock_kafka_file,并且设置为test_mode为True,这样就可以不依赖于Kafka.

    现在我么给出完整程序:

    # -*- coding: UTF-8 -*-
    from pyspark.ml import Pipeline
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import StringIndexer
    from spark_sklearn import GridSearchCV
    from sklearn import svm
    from sklearn.model_selection import GridSearchCV
    
    from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
    from sparkdl.transformers.tf_text import TFTextTransformer
    
    session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()
    
    documentDF = session.createDataFrame([
        ("Hi I heard about Spark", "spark"),
        ("I wish Java could use case classes", "java"),
        ("I wish Java could use case classes", "java"),
        ("I wish Java could use case classes", "java"),
        ("Logistic regression models are neat", "mlib"),
        ("Logistic regression models are neat", "spark"),
        ("Logistic regression models are neat", "mlib"),
        ("Logistic regression models are neat", "java"),
        ("Logistic regression models are neat", "spark"),
        ("Logistic regression models are neat", "java"),
        ("Logistic regression models are neat", "mlib")
    ], ["text", "preds"])
    
    # transform text column to sentence_matrix column which contains 2-D array.
    features = TFTextTransformer(
        inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)
    
    indexer = StringIndexer(inputCol="preds", outputCol="labels")
    
    pipline = Pipeline(stages=[features, indexer])
    ds = pipline.fit(documentDF).transform(documentDF)
    
    
    def sk_map_fun(args={}, ctx=None, _read_data=None):
        data = [item for item in _read_data()]
        parameters = {'kernel': ('linear', 'rbf')}
        svr = svm.SVC()
        clf = GridSearchCV(svr, parameters)
        X = [x["features"] for x in data[0]]
        y = [int(x["labels"]) for x in data[0]]
        model = clf.fit(X, y)
        print(model.best_estimator_)
        return ""
    
    
    # create a estimator to training where map_fun contains tensorflow's code
    estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                              kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                          "group_id": "sdl_1", "test_mode": False},
                              runningMode="Normal",
                              fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                              mapFnParam=sk_map_fun)
    estimator.fit(ds).collect()
    
    

    然后使用如下指令运行:

    ./bin/spark-submit \
    --py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar \
    --jars spark-deep-learning-assembly-0.1.0-spark2.1.jar \
    --master "local[*]"  Sk2.py
    

    记得改下代码。

    开发基于TensorFlow的应用

    只要修改map_fun函数即可,比如:

    def map_fun(args={}, ctx=None, _read_data=None):
        import tensorflow as tf
        EMBEDDING_SIZE = args["embedding_size"]
        params = args['params']['fitParam']
        SEQUENCE_LENGTH = 64
    
        def feed_dict(batch):
            # Convert from dict of named arrays to two numpy arrays of the proper type
            features = []
            for i in batch:
                features.append(i['sentence_matrix'])
    
            # print("{} {}".format(feature, features))
            return features
    
        encoder_variables_dict = {
            "encoder_w1": tf.Variable(
                tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"),
            "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"),
            "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"),
            "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2")
        }
    
        def encoder(x, name="encoder"):
            with tf.name_scope(name):
                encoder_w1 = encoder_variables_dict["encoder_w1"]
                encoder_b1 = encoder_variables_dict["encoder_b1"]
    
                layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1)
    
                encoder_w2 = encoder_variables_dict["encoder_w2"]
                encoder_b2 = encoder_variables_dict["encoder_b2"]
    
                layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2)
                return layer_2
    
        def decoder(x, name="decoder"):
            with tf.name_scope(name):
                decoder_w1 = tf.Variable(tf.random_normal([128, 256]))
                decoder_b1 = tf.Variable(tf.random_normal([256]))
    
                layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1)
    
                decoder_w2 = tf.Variable(
                    tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE]))
                decoder_b2 = tf.Variable(
                    tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE]))
    
                layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2)
                return layer_2
    
        tf.reset_default_graph
        sess = tf.Session()
    
        input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x")
        flattened = tf.reshape(input_x,
                               [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE])
    
        encoder_op = encoder(flattened)
    
        tf.add_to_collection('encoder_op', encoder_op)
    
        y_pred = decoder(encoder_op)
    
        y_true = flattened
    
        with tf.name_scope("xent"):
            consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1),
                             tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)),
                                         tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1))))
            xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine))
            tf.summary.scalar("xent", xent)
    
        with tf.name_scope("train"):
            # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent)
            train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent)
    
        summ = tf.summary.merge_all()
    
        sess.run(tf.global_variables_initializer())
    
        for i in range(params["epochs"]):
            print("epoll {}".format(i))
            for data in _read_data(max_records=params["batch_size"]):
                batch_data = feed_dict(data)
                sess.run(train_step, feed_dict={input_x: batch_data})
    
        sess.close()
    

    我这里还是之前的一个例子,一个auto-encoder程序。
    接着通过TextEstimator接入:

            estimator = TextEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds",
                                      kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                                  "mock_kafka_file": mock_kafka_file,
                                                  "group_id": "sdl_1", "test_mode": True},
                                      runningMode="Normal",
                                      fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                                      mapFnParam=map_fun)
    estimator.fit(df).collect()
    

    大同小异了。

    关于tensorflow,还可以有集群模式,可参考: 为Spark Deep Learning 集成TFoS

    相关文章

      网友评论

        本文标题:如何基于SDL+TensorFlow/SK-Learn开发NLP

        本文链接:https://www.haomeiwen.com/subject/gghxpxtx.html