美文网首页
# 数据分析最佳实践 - spark Dataset/DataF

# 数据分析最佳实践 - spark Dataset/DataF

作者: Sevsea | 来源:发表于2018-05-28 19:03 被阅读0次

    0x01前言

    官网上的spark with scala 的文档比较难理解,内容也特别少。初学遇到很多实际情况,会很迷茫怎么处理数据。
    在此把自己踩的坑列一列,供初学者参考。
    大牛请轻拍,有问题欢迎指教。

    0x02 理解

    其实spark不算难理解,但是加上一门不太熟悉的scala语言,整个人刚学的时候都是懵逼的。

    Spark:
    
    实际上它本身还是一个Map Reduce操作,Map负责对数据块操作,Reduce对结果汇总。只是Spark提出了RDD的概念,让我们不再需要关注底层的计算,而是更专注于数据集的操作。
    
    它的操作后端会有一个自动的调度系统,帮我们完成计算的过程。我们不需要关心这个任务被分配给了多少台机器,怎样的计算过程,我们只需要得到计算完毕的结果。
    
    Scala:
    
    scala是一门神奇的语言,老司机的代码你可能根本看不懂,对,就是那种一行搞定一个算法的那种。它的面向对象和函数式并不只是表面的意思。目前理解不够深入,希望有缘能再研究研究..
    
    Spark with scala:
    
    scala作为spark的编程语言,相较python此类扩展性更强,更少出现某个问题卡住没办法解决的问题。
    
    

    0x03 吐槽结束,正文来了!

    两周的研究中,发现从示例程序入手最好理解!如有不用再去查询相关文档!

    用它做了一个小功能,以下为本次实践中需求的场景,
    读写存储部分:

    • 从mongodb中读数据,spark处理后入mongodb

    数据处理部分:

    • 处理获取的mongodb数据及json数据。

    读写存储数据部分:

    首先,初上手的我,参考了一下mongodb针对spark的官网文档:
    https://docs.mongodb.com/spark-connector/master/scala/write-to-mongodb/

    这里有详细的方法去对mongodb数据进行读写。
    对后续数据处理,这类方法在文档中最清晰明了,于是我使用的这类方法(参见上方链接datasets and SQL部分):

    var sparkSession = SparkSession.builder().master("local[2]").appName("conn").config("spark.mongodb.input.uri", "mongodb://[username]:[password]@10.10.10.10:28018/[dbname].[collections]").config("spark.mongodb.output.uri", "mongodb://[username]:[password]@10.10.10.10:28018/[dbname].[collections]").getOrCreate()
    

    因为习惯性将每个功能点放进每个函数里分别调用,这里遇到了第一个问题,

    !SparkSession在一个任务中只能存在一个,多个会报冲突

    如果你多个函数中均要使用这个sparkSession,可以放入公共区域或声明为共享变量以供调用。

    共享变量了解一下:
    
        rdd.foreach会在集群里的不同机器上创建spark工作线程, 而connection对象则不会在集群里的各个机器之间传递, 所以有些spark工作线程就会产生connection对象没有被初始化的执行错误。 
    
        解决的办法可以是在spark worker里为每一个worker创建一个connection对象, 但是如果你这么做, 程序要为每一条record创建一次connection,显然效率和性能都非常差。
    
        另一种改进方法是为每个spark分区创建一个connection对象,同时维护一个全局的静态的连接迟对象, 这样就可以最好的复用connection。 另外需要注意: 虽然有多个connection对象,但在同一时间只有一个connection.send(record)执行,因为在同一个时间里,只有一个微批次的rdd.
    

    实际情况中,在读出数据处理完毕后,并不是所有情况都如官网示例一样,需要存储的是简单的数据,更多时候是需要糅合的数据,此时我们可能要从多种类型转换成dataframe/rdd类型数据进行存储。

    !多种类型数据转换成df/rdd类型
    import sparkSession.implicits._
    
    case class DealData(user:String,date:String,array_acc:ListBuffer[Map[String,String]],array_err:ListBuffer[Map[String,String]],array_fro:ListBuffer[Map[String,String]])
    
    val seq_df = Seq(DealData(user,NowDate(),array_acc,array_err,array_fro)).toDF()
    #这是一个将多类数据糅合进一个rdd的例子,DealData不是一个函数,是指明其类型,使用Seq必须在object中指明它包含的所有元素的类型。
    
    val rdd = seq_df.rdd
    #Dataframe to rdd
    
    val testDF = rdd.map {line=>
          (line._1,line._2)
        }.toDF("col1","col2")
    #rdd to dataframe,rdd是没有sql方法的,df和rdd的区别为:rdd是无字段名的。
    
    MongoSpark.save(seq_df.write.option("collection","[dbname]").mode("Append"))
    #以Dataframe格式保存进mongodb
    
    
    

    看官网demo中读取出来数据,会发现,可以将读出的数据具象为一个数据库形式,去展示,筛选等操作。

    但是实际上,除了show()函数检查数据是否具象成功,并不知道如何处理数据。下例或许会给你一点启示:

    
    val df = MongoSpark.load(sparkSession) 
    
    df.createOrReplaceTempView("[dbname]") 
    #这里是你给这个虚拟的db起一个名字,需与下句中表名保持一致。
    
    val df1 = sparkSession.sql("select user from [dbname] as l where cast(l.actTime as String) >= '2018/05/28 18:00:00' ")
    #这是一个依据时间筛选的例子,df1的数据类型为DataFrame
    
    val df_count0 = df1.groupBy("user").count()
    #这里是按照user与每个user对应的数量做一个新的表。表中只有user和count两个字段,df_count0依旧是DataFrame类型。
    
    val df_count = df_count0.orderBy("count")
    #按照count排序,此时df_count是DataSet类型。
    
    val users = df_count.rdd.map(x=>x(0)).collect()
    #取df_count所有数据集中的第一列元素,将其转换成Array类型-数组类型
    
    

    实际上还有一点google了很久都没有结果的数据处理需求,由spark的例子我们知道如何读取文件,但是如何处理有规律的文件,如,将json内容,转换成rdd或df的格式进行数据分析:

    !json内容转换成RDD/dataframe类型。
    import com.alibaba.fastjson.{JSON, JSONArray}
    
    val json = JSON.parseObject(content)
    val data = json.getJSONObject("DATA").get("data")
    #处理两层包含的json数据。
    
    
    row.replace("{","").replace("}","").split(",").map(row0=>try{
        val row0_array = row0.toString.split(":")
        array_map += (row0_array(0)->row0_array(1))
    })
    #实际上我们要做的事是把json数据转换成可处理的数据形式,
    #我们需要先把数据规律性的处理成key:value形式,
    #然后使用map函数对其进行指向,做成一个字典的形式,
    #之后再使用Seq方法将其转换成dataframe/rdd类型就可以啦~
    
    

    另外在环境搭建初期,遇到一些的环境错误Q&A分享一下~:

    Q:
    IDEA里运行代码时出现Error:scalac: error while loading JUnit4, Scala signature JUnit4 has wrong version expected: 5.0 found: 4.1 in JUnit4.class错误的解决办法
    A:
    删除test/scala/下的文件

    Q:
    Exception in thread "main" java.lang.NoClassDefFoundError: scala/Product$class
    A:
    切换scala2.11.0

    Q:
    Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    A:
    scala版本和pom.xml版本不一致。

    Q:
    A master URL must be set in your configuration?
    A:
    没配置materURL,可在代码中加setMaster
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[]")*

    Q:
    res.withPipeline(Seq(Document.parse("{ $match: { 'errno' : '220' } }"))) Document未定义问题
    A:
    import org.bson.Document

    Q:
    ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
    java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.analysis.TypeCoercion$.findTightestCommonTypeOfTwo()Lscala/Function2;
    A:
    切换spark和spark-mongo版本为2.2版本。

    欢迎大佬指出不足和认知错误,scala这个萝卜坑,有缘再蹲。

    相关文章

      网友评论

          本文标题:# 数据分析最佳实践 - spark Dataset/DataF

          本文链接:https://www.haomeiwen.com/subject/dhwgjftx.html