美文网首页PySpark研究机器学习与数据挖掘程序猿日记
利用PySpark 数据预处理(特征化)实战

利用PySpark 数据预处理(特征化)实战

作者: 祝威廉 | 来源:发表于2017-10-29 12:53 被阅读766次

    前言

    之前说要自己维护一个spark deep learning的分支,加快SDL的进度,这次终于提供了一些组件和实践,可以很大简化数据的预处理。

    模型

    这次实际情况是,我手头已经有个现成的模型,基于TF开发,并且算法工程师也提供了一些表给我,有用户信息表,用户行为表。行为表已经关联了内容的文本。现在我需要通过SDL来完成两个工作:

    1. 根据已有的表获取数据,处理成四个向量。
    2. 把数据喂给模型,进行训练

    思路整理

    四个向量又分成两个部分:

    1. 用户向量部分
    2. 内容向量部分

    用户向量部分由2部分组成:

    1. 根据几个用户的基础属性,他们有数值也有字符串,我们需要将他们分别表示成二进制后拼接成一个数组。
    2. 根据用户访问的内容,通过词向量把每篇内容转化为一个向量,再把某个用户看过的所有内容转化为一个向量(都是简单采用加权平均)

    内容向量部分组成:

    对于文章,我们需要把他表示为一个数字序列(每个词汇由一个数字表示),同时需要放回词向量表,给RNN/CNN使用。

    所以处理流程也是比较直观的:

    1. 通过用户信息表,可以得到用户基础属性向量
    2. 通过行为表,可以得到每篇涉及到的内容的数字序列表表示,同时也可以为每个用户算出行为向量。

    最后的算法的输入其实是行为表,但是这个时候的行为表已经包含基础信息,内容序列,以及用户的内容行为向量。

    实现

    现在我们看看利用SDL里提供的组件,如何完成这些数据处理的工作以及衔接模型。

    第一个是pyspark的套路,import SDL的一些组件,构建一个spark session:

    # -*- coding: UTF-8 -*-
    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType, ArrayType, StringType, FloatType
    from pyspark.sql.functions import *
    import numpy as np
    
    from sparkdl.transformers.tf_text import CategoricalBinaryTransformer, CombineBinaryColumnTransformer, \
        TextAnalysisTransformer, TextEmbeddingSequenceTransformer
    from sparkdl.estimators.text_estimator import TextEstimator
    
    session = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
    

    读取用户基础信息表,这里我是直接读了一个CSV文件,现实中应该是Hive表。同时罗列有哪些字段是这次要用的,罗列一下:

    person_basic_info_df = session.read.csv("/Users/allwefantasy/Downloads/query-impala-72329.csv", encoding="utf-8",
                                            header=True)
    person_basic_info_df.registerTempTable("person_basic_info_df")
    
    # 把所有基础属性罗列出来
    person_basic_properties_str = "education,jobtitle..."
    
    person_basic_properties_group = [item for item in
                                     person_basic_properties_str.split(",")]
    # 每个属性我们会表示为一个12位的二进制字符串。
    person_basic_info_vector_size = len(person_basic_properties_group) * 12
    

    接着我们就可以利用SDL提供的CategoricalBinaryTransformer把这些字段批量转化为二进制。

    # 基础信息中字符串字段需要转化为数字
    binary_columns = [item + "_binary" for item in person_basic_properties_group]
    
    binary_trans = CategoricalBinaryTransformer(inputCols=person_basic_properties_group,
                                                outputCols=binary_columns,
                                                embeddingSize=12)
    combin_trans = CombineBinaryColumnTransformer(inputCols=binary_columns, outputCol="person_info_vector")
    
    person_basic_info_with_all_binary_df = combin_trans.transform(binary_trans.transform(person_basic_info_df)). \
        groupBy("id").agg(first("person_info_vector").alias("person_info_vector"))
    

    CategoricalBinaryTransformer接受inputCols参数, 传递一个数组字段,告诉他哪些字段是需要转化为二进制数值表示的。outputCols指定输出的名字,embeddingSize指定用多少个二进制数字。 所有的CategoricalBinaryTransformer会添加outputCols指定的字段。

    因为我们需要把这些字段都拼接成一个字段,这个时候可以利用CombineBinaryColumnTransformer 。方式和CategoricalBinaryTransformer一样,但是输出只有一个字段。这样我们就得到了一个长度为person_basic_info_vector_size 的字段,格式大致这个样子:

    [1,0,1,0,0,....]
    

    CategoricalBinaryTransformer 内部的机制是,会将字段所有的值枚举出来,并且给每一个值递增的编号,然后给这个编号设置一个二进制字符串。

    现在第一个特征就构造好了。接着,有一些NLP特有的操作了,我们需要对某些内容进行分词
    ,同时将他们转化为数字序列(比如RNN就需要这种),并且把数字和词还有向量的对应关系给出。分词现在默认采用的是jieba。

    person_behavior_df = session.read.csv("/Users/allwefantasy/Downloads/query-impala-72321.csv", encoding="utf-8",
                                          header=True).sample(True, 0.01).where(col("title").isNotNull()).where(
        col("text_body").isNotNull())
    
    # 通过TextAnalysisTransformer我们对所有需要分词/抽词的字段进行分词
    text_columns = ["title", "text_body"]
    text_cut_columns = [item + "_cut" for item in text_columns]
    tat_trans = TextAnalysisTransformer(inputCols=text_columns, outputCols=text_cut_columns)
    tat_df = tat_trans.transform(person_behavior_df)
    tat_df.show()
    
    # 通过TextEmbeddingSequenceTransformer把分完词的字段里面的词汇全部替换成数字,这一步分会作为文章的输出
    text_sequence_columns = [item + "_seq" for item in text_columns]
    test_trans = TextEmbeddingSequenceTransformer(inputCols=text_cut_columns, outputCols=text_sequence_columns)
    test_df = test_trans.transform(tat_df)
    test_df.show()
    
    # TextEmbeddingSequenceTransformer 有几个属性可以获取词向量相关信息
    word_embedding = test_trans.getWordEmbedding()
    word2vec_model = test_trans.getW2vModel()
    embedding_size = test_trans.getEmbeddingSize()
    
    # 广播出去,方便在自定义函数里使用
    word_index2v_mapping_br = session.sparkContext.broadcast(
        dict([(item["word_index"], item["vector"]) for item in word_embedding]))
    
    # 把标题和正文拼接
    person_behavior_vector_seq_cctf = CombineBinaryColumnTransformer(inputCols=text_sequence_columns,
                                                                     outputCol="person_behavior_vector_seq")
    person_behavior_vector_seq_df = person_behavior_vector_seq_cctf.transform(test_df)
    

    这样就完成了文本到数字序列的转化了,并且通过TextEmbeddingSequenceTransformer获取词向量表数据。接下来,我们看看如何做一个复杂的自定义操作,这个操作主要是在行为表,把数字序列转化词向量,然后做加权平均。这个时候,每篇文章已经可以用一个向量表示了。

    # 定义一个函数,接受的是一个数字序列,然后把数字转化为vector,然后做
    # 加权平均
    def avg_word_embbeding(word_seq):
        result = np.zeros(embedding_size)
        for item in word_seq:
            if item in word_index2v_mapping_br.value:
                result = result + np.array(word_index2v_mapping_br.value[item])
        return (result / len(word_seq)).tolist()
    
    # 注册成udf函数
    avg_word_embbeding_udf = udf(avg_word_embbeding, ArrayType(FloatType()))
    # 添加一个person_behavior_article_vector新列
    person_behavior_vector_df = person_behavior_vector_seq_df.withColumn(
        "person_behavior_article_vector",
        avg_word_embbeding_udf(
            "person_behavior_vector_seq"))
    

    现在根据用户id做groupby 然后把多篇文章的文章向量合并成一个,然后把数字转换为向量,做加权平均。这个时候,每个用户终于有一个行为向量了。

    # 我们根据用户名groupby ,把用户看过的所有文章聚合然后计算一个向量
    
    def avg_word_embbeding_2(word_seq):
        result = np.zeros(embedding_size)
        for item in word_seq:
            result = result + np.array(item)
        return (result / len(word_seq)).tolist()
    
    
    avg_word_embbeding_2_udf = udf(avg_word_embbeding_2, ArrayType(FloatType()))
    
    person_behavior_vector_all_df = person_behavior_vector_df.groupBy("id").agg(
        avg_word_embbeding_2_udf(collect_list("person_behavior_article_vector")).alias("person_behavior_vector"))
    

    现在,我们拿到了用户基础信息向量,访问内容向量。 当然还有之前计算出来的访问内容的数字序列,但是分在不同的表里(dataframe),我们把他们拼接成一个:

    pv_df = person_basic_info_with_all_binary_df.select("id", "person_info_vector").alias("pv")
    cv_df = person_behavior_vector_all_df.select("id", "person_behavior_vector").alias(
        "cv")
    person_vector_df = cv_df.join(
        pv_df,
        col("pv.id") == col("cv.id"), "left"
    )
    
    person_df = person_vector_df.select("pv.id", "pv.person_info_vector", "cv.person_behavior_vector").where(
        col("id").isNotNull())
    

    这里是标准的spark dataframe的join操作。

    我们假设做的是一个二分类问题,到目前为止,我们还没有分类字段,为了简单起见我随机填充了分类,利用前面的办法,自定义一个UDF函数,添加了一个like_or_not_like 列。最后返回df的时候,过滤掉去胳膊少腿的行。

    def like_or_not_like():
        return [0, 1] if np.random.uniform() < 0.5 else [1, 0]
    
    like_or_not_like_udf = udf(like_or_not_like, ArrayType(IntegerType()))
    result_df = person_behavior_vector_df.join(person_df, person_behavior_vector_df["id"] == person_df["id"],
                                               "left").withColumn("like_or_not_like", like_or_not_like_udf()).drop(
        person_df["id"]).where(
        col("person_info_vector").isNotNull()).where(
        col("person_behavior_vector").isNotNull()).where(
        col("person_behavior_vector_seq").isNotNull())
    
    word2v_mapping_br = session.sparkContext.broadcast(
        dict([(item["word"], item["vector"]) for item in word_embedding]))
    
    

    现在我们获得了所有的向量,我们可以把数据喂给算法了,这个主要通过TextEstimator来完成。

    estimator = TextEstimator(kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                          "group_id": "sdl_1", "test_mode": False},
                              runningMode="Normal",
                              fitParam=[{"epochs": 5, "batch_size": 64, "word_embedding_bs": word2v_mapping_br.value}],
                              mapFnParam=map_fun)
    estimator.fit(result_df).collect()
    
    

    word embbeding表,我们通过fitParam参数传递给tf程序,然后tf所有的代码都在map_fun里,我们简单看看tf怎么拿到数据:

    def map_fun(args={}, ctx=None, _read_data=None):
        import tensorflow as tf
        import numpy as np
        import datetime
        import os
        import time
        from sklearn.utils import Bunch
        FLAGS = Bunch(**args["params"]["fitParam"])
        embedded_vec = FLAGS.word_embedding_bs
    
        def config_default_value(name, value, desc):
            FLAGS.setdefault(name, value)
    
        # 产生数据
        def training_batch_generator(batch_size):
            for items in _read_data(max_records=batch_size):
                x_basic_info = [item["person_info_vector"] for item in items]
                x_subs = [item["person_subs"] for item in items]
                x_personas = [item["person_behavior_vector"] for item in items]
                x_contents = [item["person_behavior_vector_seq"] for item in items]
                y = [item["like_or_not_like"] for item in items]
                yield np.array(x_basic_info), np.array(x_subs), np.array(x_personas), np.array(x_contents), np.array(y)
    

    现在通过training_batch_generator你已经可以拿到训练数据了。

    如何执行

    虽然已经简化了处理,但是代码还是不少,为了方便调试,建议使用pyspark shell。运行指令如下:

    export PYTHONIOENCODING=utf8;./bin/pyspark --py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar --jars spark-deep-learning-assembly-0.1.0-spark2.1.jar --master "local[*]"
    

    然后把代码黏贴进去就可以了。

    相关文章

      网友评论

        本文标题:利用PySpark 数据预处理(特征化)实战

        本文链接:https://www.haomeiwen.com/subject/anxypxtx.html