美文网首页Spark技术干货
SparkSQL+RDD计算句子相似性!

SparkSQL+RDD计算句子相似性!

作者: 文哥的学习日记 | 来源:发表于2017-07-06 12:12 被阅读165次

    Spark版本:2.1
    Python版本:2.7.12

    之前想做一个检索式的智能对话,其实只用SparkRDD是完全可以的,但是想实用一下SparkSQL,所以有点强搬硬套的感觉,不过没关系,干货就是干货,对于了解SparkSQL的基本使用肯定是有帮助的。

    1、SparkSQL从mysql中获取数据

    回忆一下之前文章中的提到的使用pyspark并从mysql中获取数据,完整代码如下:

    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext
    
    sc = SparkSession.builder.appName("Python Spark SQL basic example")\
        .config('spark.some.config,option0','some-value')\
        .getOrCreate()
    ctx = SQLContext(sc)
    jdbcDf=ctx.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/test",
                                           driver="com.mysql.jdbc.Driver",
                                           dbtable="(SELECT * FROM chat) tmp",user="root",
                                           password="0845").load()
    print(jdbcDf.printSchema())
    

    数据如下图所示:


    chat数据表.png

    2、DataFrame转RDD

    一开始,我尝试了如下的DataFrame转RDD的方法,首先使用select方法选择其中id和vector两列,随后使用sparkSession的rdd方法转换成RDD:

    vectors= df.select(df['id'],df['vector'])
    vector_rdd = vectors.rdd.map(lambda p:(p.id,[float(x) for x in p.vector.split(' ')]))
    

    运行报错了,对于我来说,至今仍是个未解之谜!

    直接转rdd报错.png

    后来看了官方文档,文档上面按照如下的代码,首先将我们从mysql读入的数据转换成一个虚拟的表,然后使用类似mysql的选择语句,选择出其中两列,再进行转换,居然真的成功了!:

    df.createOrReplaceTempView('chats')
    vectors = sc.sql('Select id,vector from chats')
    vector_rdd = vectors.rdd.map(lambda p:(p.id,[float(x) for x in p.vector.split(' ')]))
    

    3、句子相似度计算

    假设我们输入的句子是“我想问一个问题”,运行我在之前文章中提到过的word2vec模型,将其转换为词向量,随后转换为一个RDD对象。

    model = gensim.models.Word2Vec.load('wiki.zh.text.model')
    
    def list_to_vector(q_list):
        q_vec = np.zeros((400),dtype= np.float32)
        for word in q_list:
            if word in model:
                q_vec += model[word]
        return (list(q_vec))
    spark = sc.sparkContext
    data = spark.parallelize([list_to_vector(list(jieba.cut('我想问一个问题')))])
    

    这里的输入是对句子分词的结果,由于计算相似度的方法我选的是余弦相似度,该相似度的计算只与向量的方向有关,与向量的大小无关,所以我直接在每一维上进行相加,而没有取平均。
    这里另一个需要注意的问题是,SparkSession不能直接计算创建RDD,而需要SparkContext对象,所以首先利用SparkSession的sparkContext属性得到SparkContext对象。

    得到两个RDD之后,我们就可以利用如下的代码来计算相似度,并按照降序进行排序:

    def cosine(t):
        x = t[0][1]
        y = t[1]
        convxy = 0
        sumx=0
        sumy=0
        for i in range(len(x)):
            convxy += x[i] * y[i]
            sumx += x[i] ** 2
            sumy += y[i] ** 2
        if (math.sqrt(sumx) * math.sqrt(sumy)) != 0:
            value = convxy / (math.sqrt(sumx) * math.sqrt(sumy))
        else:
            value = 0
        return (t[0][0],value)
    
    data_rdd = vector_rdd.cartesian(data)
    output = sorted(data_rdd.map(cosine).collect(),key=lambda x:x[1])
    

    我们打印输出最为匹配的句子:

    print (df.filter(df['id']==output[0]).answer)
    
    结果.png

    哇,可以看到效果还是不错的!如果我们输入一句“我想问一个问题”,那么系统给出的答案是“行啊,你问吧!”,匹配的非常不错。

    4、完整代码

    #-*-coding:utf-8-*-#
    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    import pymongo
    import json
    import pymysql
    import jieba
    import gensim
    import numpy as np
    import pandas as pd
    import math
    
    model = gensim.models.Word2Vec.load('wiki.zh.text.model')
    
    def list_to_vector(q_list):
        q_vec = np.zeros((400),dtype= np.float32)
        for word in q_list:
            if word in model:
                q_vec += model[word]
        return (list(q_vec))
    
    
    def cosine(t):
        x = t[0][1]
        y = t[1]
        convxy = 0
        sumx=0
        sumy=0
        for i in range(len(x)):
            convxy += x[i] * y[i]
            sumx += x[i] ** 2
            sumy += y[i] ** 2
        if (math.sqrt(sumx) * math.sqrt(sumy)) != 0:
            value = convxy / (math.sqrt(sumx) * math.sqrt(sumy))
        else:
            value = 0
        return (t[0][0],value)
    #
    #
    sc = SparkSession.builder.appName("Python Spark SQL basic example").config('spark.some.config.option','some-value').getOrCreate()
    
    ctx = SQLContext(sc)
    spark = sc.sparkContext
    
    df=ctx.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/test",driver="com.mysql.jdbc.Driver",dbtable="(SELECT * FROM chat) tmp",user="root",password="0845").load()
    print(df.printSchema())
    #print (df.select('question').show())
    df.createOrReplaceTempView('chats')
    # vectors= df.select(df['id'],df['vector'])
    # print (type(vectors))
    # print (vectors.show())
    vectors = sc.sql('Select id,vector from chats')
    #print (type(vectors))
    vector_rdd = vectors.rdd.map(lambda p:(p.id,[float(x) for x in p.vector.split(' ')]))
    
    data = spark.parallelize([list_to_vector(list(jieba.cut('我想问一个问题')))])
    data_rdd = vector_rdd.cartesian(data)
    output = sorted(data_rdd.map(cosine).collect(),key=lambda x:x[1])
    print (output[0])
    print (df.filter(df['id']==output[0]).answer)
    

    如果你喜欢我写的文章,可以帮忙给小编点个赞或者加个关注,我一定会互粉的!
    如果大家对spark感兴趣,欢迎跟小编进行交流,小编微信为sxw2251,加我要写好备注哟!


    我的微信

    相关文章

      网友评论

      本文标题:SparkSQL+RDD计算句子相似性!

      本文链接:https://www.haomeiwen.com/subject/rglhhxtx.html