spark 学习笔记

作者: 哎哟喂喽 | 来源:发表于2017-01-15 10:27 被阅读2191次

    Spark学习笔记

    Data

    Source->Kafka->Spark Streaming->Parquet->Spark SQL(SparkSQL可以结合MLGraphX)->Parquet->其它各种Data Mining

    1.1 Spark集群的安装

    Spark的运行是构建在hadoop集群之上(默认hadoop集群已经安装好了),在spark集群集群上必须要安装对应版本的scala

    1.1.1 scala安装

    Ø下载scala版本,解压scala

    Ø配置环境变量/etc/profile,添加SCALA_HOME、修改PATH,添加上scala的path路径

    Ø进入$SCALA_HOME/bin目录,执行./scala验证scala是否安装成功

    Ø集群机器都需要安装scala

    1.1.2 spark安装

    在集群的所有机器上都必须要安装spark,首先安装master的spark程序

    Ø先解压spark程序

    Ø修改环境变量/etc/profile添加SPARK_HOME和修改spark PATH路径

    Ø配置spark,进入conf目录下

    nmv spark-env.sh.template spark-env.sh

    其中:spark_master_ip:用于指定master

    nvi slaves修改文件,把work节点都添加进去;

    Ø至此,spark集群安装完毕

    1.1.3启动集群校验

    Ø先启动hadoop集群,jps查看进程

    Ø再启动spark集群,在sbin目录下执行./start-all.shjps查看进程

    ØUi访问,检查集群情况http://master:8080

    Ø进入spark/bin目录下,启动spark-shell脚本

    1.2 spark-shell的使用

    在master机器上的$SPARK_HOME/bin目录下,运行./spark-shell程序启动shark-shell脚本;通过http://master:4040查看spark-shell运行情况

    1.2.1 spark-shell操作hdfs文件实战

    Ø将spark目录下的README.md文件上传到hdfs上的/test目录下,通过hdfs ui来进行查看slave:50070/explorer.html#/查看文件是否上传成功

    Ø在spark-shell脚本程序下,执行sc(SparkContext实例),启动spark-sehll时,系统自动生成

    scala> sc

    res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@65859b44

    # SparkContext是把代码提交到集群或者本地的通道,编写Spark代码,无论是要运行本地还是集群都必须要有SparkContext实例

    ØSpark-shell读取hdfs文件的README.md文件

    val file = sc.textFile(“hdfs://mapeng:8020/test/README.md”)

    #这里把读取到的文本内容赋值给了变量file,(就是一个MappedRDD,在spark的代码中,一切都是基于RDD进行操作的)

    Ø读取文本中包含有“spark”的行

    scala> val sparks = file.filter(line

    => line.contains("spark"))

    sparks: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at

    filter at :26

    #此时生成了一个FilterRDD

    Ø统计spark一共出现多少次

    Sparks.count

    1.2.2 spark-shell操作及详细说明

    1.2.2.1并行化集合(parallelize)

    Ø加载集合数据

    val data = sc.parallelize(1 to 10)#加载集合数据

    或者; val data = sc.parallelize(List(1,2,3,4…))

    Ø对集合数据进行*2操作

    val data1 = data.map(_*2)

    Ø对数据进行过滤:过滤出是2的倍数的集合

    val data2 = data.filter(_%2==0)

    Ø内存缓存数据

    data.cache

    Ø触发action,以数据的形式返回结果集

    data.collect

    Ø返回结果集的第一个元素

    data.first

    Ø返回结果集的前3个元素

    data.take(3)

    Ø统计元素的个数

    data.count

    Ø查看RDD的转换过程

    data.toDebugString

    1.2.2.2 map数据集合

    Ø加载List(Map)数据

    val

    data=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

    Ø排序sortByKey()

    scala> data.sortByKey().collect

    res55: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5),

    (C,3))

    Ø分组groupByKey()

    scala>

    data.groupByKey().collect

    res57: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2,  5)), (A,CompactBuffer(1, 4)), (C,CompactBuffer(3)))

    Ø求和reduceByKey(_+_)

    scala>

    data.reduceByKey(_+_).collect

    res59: Array[(String, Int)] = Array((B,7), (A,5), (C,3))

    Ø去重distinct

    scala> data.distinct.collect

    res60: Array[(String, Int)] = Array((A,1), (A,4), (B,5), (C,3),

    (B,2))

    Ø联合union

    scala> val

    data1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

    data1:

    org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at

    parallelize at :24

    scala> val data2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5)))

    data2:

    org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at

    parallelize at :24

    scala>

    data1.union(data2).collect

    res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4),

    (B,5), (A,4), (A,4), (C,3), (A,4), (B,5))

    Ø关联join

    相当于笛卡尔积

    1.2.2.3保存转换结果saveAsTextFile

    data.saveAsTextFile(“path”);//将转换结果存储在hdfs指定的路径

    1.2.3 spark cache缓存

    对于spark程序,第二次执行要比前面的执行的效率要高

    1.3 RDD(弹性分布式数据集)

    1.3.1 RDD介绍

    ØRDD是一个容错的、并行的数据结构,可以让用户显示的将数据存储在磁盘和内存中,并能控制数据的分区。

    ØRDD提供了一套丰富的函数来操作数据

    ØRDD作为数据机构,本质上是一个只读的分区记录集合;一个RDD可以包含多个分区,每个分区就是一个dataSet片段;RDD可以相互依赖

    n窄依赖:RDD的每个分区最多只能被一个child

    RDD的分区使用(例如:map操作)

    n宽依赖:RDD的分区可以被多个child

    RDD的分区使用(例如:join操作)

    区别:

    (1)窄依赖可以在集群中的一个节点上如流水般的执行,相反,宽依赖需要所有的父分区的数据都可用

    (2)从出现失败恢复的角度来考虑:窄依赖只需要重新计算失败的父RDD的分区,而宽依赖失败会导致其父RDD的多个分区重新计算

    1.3.2 RDD分区

    1.3.3创建操作

    1.3.3.1集合创建操作

    Spark提供了两类函数实现从集合生成RDD;

    Øparallelize

    val rdd = sc.parallelize(1 to 100)

    ØmakeRDD:还提供了指定分区参数

    val rdd = sc.makeRDD(1 to 100,3)#指定了分区数为3

    1.3.3.2存储创建操作

    操作hdfs

    val rdd = sc.textFile(“hdfs://master:9000/test/xxx.txt”)

    1.3.4 RDD的基本转换操作

    1.3.4.1 RDD的重新分区

    repartition和coalesce是对RDD的分区进行重新划分

    Ørepartition(numPartitions:Int):RDD[T]

    Øcoalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

    repartition只是coalesce接口中shuffle为true的简易实现。

    重新划分分区主要有三种情况:(原RDD有N个分区,需要重新划分为M个分区)

    ØN

    ØN>M(相差不大):面临着要把原分区进行合并的操作,最终合成M个分区,这时将shuffle设置为false

    注:在shuffle为false时,设置M>N,coalesce是不起作用的

    ØN>>M(差距悬殊):如果将shuffle设置为false,由于父子RDD是窄依赖,会使得它们同处于一个stage中,可能会造成spark程序运行的并行度不够,从而影响效率。

    因而,最好设置为true,使得coalesce之前的操作有更好的并行度

    1.3.4.2 RDD转换为数组(randomSplit、glom)

    ØrandomSplit(weight:Array[Double],seed:Long=System.nanoTime):Array[RDD[T]]

    randomSplit函数是将一个RDD切分为多个RDD,返回结果是一个RDD数组;函数的第一个方法传入的参数权重是一个Double类型的数组;权重大的,分到的数据的概率大

    val rdd = sc.makeRDD(1 to 10)

    val splitRDD = rdd.randomSplit(Array(1.0,3.0,6.0))

    #返回的一个RDD数组,查看数组元素

    splitRDD(0).collect

    splitRDD(1).collect

    splitRDD(2).collect

    Øglom():RDD[Array[T]]

    glom函数是将RDD中每一个分区中类型为T的元素转换为Array[T]

    val rdd = sc.makeRDD(1 to 10,3)

    val glomRDD= rdd.glom

    #返回的结果是一个数组,

    glomRDD.collect

    scala> glomRDD.collect

    res44: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7,

    8, 9, 10))

    1.3.4.3 RDD的集合操作

    Øunion(other:RDD[T]) :RDD[T]

    将两个RDD的数据进行合并,返回两个RDD的并集,不去重

    Øintersection(other:RDD[T]) :RDD[T]

    返回两个RDD的交集(会去重)

    Øsubtract(other:RDD[T]) :RDD[T]

    取差集

    Øzip(other:RDD[T]):RDD[T]

    zip函数用于将两个RDD组合成key/value形式的RDD,两个RDD的partition个数以及元素的数量都必须要相同,不然会抛出异常

    val rdd1 = sc.makeRDD(List(1,2,3,3))

    val rdd2 = sc.makeRDD(List(2,3,4))

    union操作

    rdd1.union(rdd2).collect

    #结果

    Array[Int] = Array(1, 2, 3, 3, 2, 3, 4)

    intersection操作

    rdd1.intersection(rdd2).collect

    #结果(去重)

    Array[Int] = Array(3, 2)

    subtract操作

    rdd1.subtract(rdd2).collect

    #结果(不去重)

    Array[Int] = Array(1, 1)

    zip操作

    scala> val rdd1 = sc.makeRDD(1

    to 3)

    scala> val rdd2 =

    sc.makeRDD(List(1.0,2.0,3.0))

    scala> rdd1.zip(rdd2).collect

    res8: Array[(Int, Double)] = Array((1,1.0), (2,2.0), (3,3.0))

    1.3.4.4键值RDD转换操作

    map和flatmap的区别:

    (1)map是对每个元素都进行指定的操作,返回每个元素处理后的对象

    (2)flatmap对所有的元素都做指定的操作,将所有的对象合并为一个对象返回

    val rdd = sc.makeRDD(1 to 3)

    rdd.map(x=>Seq(x,x)).collect

    #结果

    Array[Seq[Int]] = Array(List(1, 1),

    List(2, 2), List(3, 3), List(4, 4))

    rdd.flatMap(x=>Seq(x,x)).collect

    #结果,合并为一个对象返回

    Array[Int] = Array(1, 1, 2, 2, 3, 3, 4,  4)

    未完待续.....

    1.3.5 RDD的行动操作

    每调用一次行动操作,都会触发一次spark的调度并返回响应的结果

    1.3.5.1集合标量行动操作

    Øcount

    返回RDD中的元素的个数

    Øfirst

    返回RDD中的第一个元素

    Øreduce(f:(T,T) = >T)

    对RDD中的元素进行二元计算,返回计算结果

    val rdd = sc.makeRDD(1 to 4)

    rdd.reduce(_+_)#10

    rdd.reduce(_-_)#-8

    Øcollect()

    以集合的形式返回RDD的元素

    Øtake(number:Int)

    返回集合中[0,num-1]下标的元素

    Øtop(num:Int)

    先降序排序,返回前num个元素

    ØtakeOrdered(num:Int)

    以与top相反的排序规则(升序),返回前num个元素

    Ølookup(key:k):Seq[v]

    lookup是针对(k,v)类型RDD的行动操作,针对给定的键值,返回与此键值相对应的所有值

    1.3.5.2存储行动操作

    RDD不仅可以存储在hdfs中还能存储到Hbase、MangoDB等数据库中

    1.4 Spark SQL

    1.4.1 spark sql与shark区别

    sparksql是一个支持结构化数据处理的spark模块,提供DaraFrame作为可编程的数据抽象,可以对DataFrame执行sql的操作。

    spark sql的诞生就是为了解决spark平台上的交互式查询问题,并且提供sql接口兼容原有数据库用户的使用情况

    Øshark简单的说,就是spark上的hive,其底层依赖Hive引擎的,但在spark平台上,解析速度是hive的好多倍;就是一个升级版的大数据仓库

    Ø在spark1.0版本开始,shark被官方抛弃使用

    Øspark sql的优势:

    nspark sql完全脱离了hive的限制

    nspark sql支持查询原生的RDD,能够高效的处理大数据的各种场景的基础

    n能够在scala中写sql语句,支持简单的sql语法检查,将结果取回作为RDD使用

    nCatalyst能够帮助用户优化查询,catalyst能够进行一定程度的性能提升

    # catalyst是spark sql的调度核心,解析sql形成其对应的执行计划(遵循DAG图)

    1.4.2 DataFrame和DataSet

    1.4.2.1 RDD与DataFrame的区别

    如上图:

    Ø左侧的RDD[Person]虽然以Person为类型参数,但是spark框架本省不了解Person类的内部结构;而右侧的DataFrame却提供了详细的结构信息(schema),使得spark sql可以清楚的知道该数据集中包含哪些列,每列的名称和类型

    ØRDD是分布式的java对象的集合;而DataFrame是分布式的row对象的集合

    ØDataFrame除了提供比RDD更丰富的算子外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化

    1.4.2.2 DataFrame与DataSet的区别

    DataSet可以认为是DataFrame的一个特例,主要区别是DataSet的每一个record存储的是一个强类型值而不是一个Row。具有三个特点:

    ØDataSet在编译时检查类型

    Ø面向对象的编程接口

    Ø后面的版本DataFrame是继承DataSet的,DataFrame是面向Spark sql的接口

    相互转换:

    DataFrame和DataSet可以相互转化,df.as[ElementType]这样可以把DataFrame转化为DataSet,ds.toDF()这样可以把DataSet转化为DataFrame

    1.4.2.3 DataFrame

    (1)DataFrame是一个分布式的数据集,类似于关系数据库的一个表。

    ØDataFrame以列的形式存储,但是不知道列的类型,因此,在编译时不进行校验,只有在运行时才会处理;DataSet不仅知道字段,还知道类型,所以编译时会进行类型校验

    Ø可以由结构化的数据转换过来,也可以从hive,外部数据库或者RDD转换

    ØDataFrame在spark sql中,可以使用sql的方式进行操作,与RDD类似,也可以采用lazy的方式,只有动作发生时才会真正的计算

    ØDataFrame的数据源:支持JSON文件、hive表格,支持本地文件系统以及hdfs等;配合JDBC还支持外部关系型数据库

    1.4.2.4与RDD的相互操作

    spark sql支持两种不同的方式用于将存在的RDD转换为DataSets、DataFrame

    Ø反射推断模式:

    该模式使得代码更加的简练,不过在写spark程序的时候已经知道模式信息,(比如RDD中自己定义的case class类型)

    练习:从hdfs文件中读取数据,创建一个Person的RDD

    1.定义Person type

    scala> case class

    Person(id:Int,name:String,addr:String)

    defined class Person

    2.从hdfs读取文件,封装成DataFrame数据集

    scala> val personDf = spark.sparkContext

    .textFile("/test/preson.txt")

    .map(_.split(","))

    .map(p=>Person(p(0).toInt,p(1),p(2)))

    .toDF

    personDf:

    org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

    3.将personDf注册为一个视图view

    scala>

    personDf.createOrReplaceTempView("person")

    4.通过sql查询视图;sql支持复杂的,包括多表关联

    scala> spark.sql("select

    addr,count(1) from person group by addr").show

    Ø编程指定模式:

    构造一个模式,将其应用到一个已经存在的RDD上将其转化为DataFrame,该方法适用于运行之前不知道列以及列的类型的情况

    import org.apache.spark.sql.types._

    1.加载数据

    val presonRDD= spark.sparkContext.textFile(“/test/person.txt”)

    2.定义schema

    val stringSchema = “id,name,addr”

    val schema = StructType(stringSchma.split(",").map(field=>StructField(field,StringType,nullable=true)))

    3.转换rdd的记录到rows集合

    1.4.3 spark sql的操作

    1.4.3.1创建SparkSession实例

    SparkSeesion类时Spark SQL的所有功能的入口;spark-shell启动时,默认生成了一个SparkSession的实例:spark

    importorg.apache.spark.sql.SparkSession

    val spark = SparkSession.builder

    .master("local")

    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")

    .appName("spark text")

    .getOrCreate

    //包含隐式转换(比如讲RDDs转成DataFrames)API

    importspark.implicits._

    1.4.3.2创建DataFrame

    spark sql读取hdfs中json数据

    val df = spark.read.json("/test/course.json")

    #显示df的数据

    Ø显示df的数据

    df.show

    #结果:

    Ø查询df的结构信息

    Ø显示指定的字段值:使用select(col1,,col2)

    Ø过滤,查询长度》12的数据

    Ø分组操作groupBy

    1.4.3.3 spark sql实战

    1.4.3.3.1入口:SQLContext,HiveContext(Starting Point: SQLContext)

    spark sql中所有的操作入口点都是SQLContext类或者它的子类.创建一个基本的SQLContext,只需要SparkContext即可(sc)

    注:spark2.0之后,sparkSession是实现了同样的功能,不需要显示的创建SparkConf、sparkContext、SQLContext,因为这些对象都封装在了SparkSession中。即是

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    valhiveContext =neworg.apache.spark.sql.hive.HiveContext(sc)

    除了SparkContext外,还有HiveContext。两者的区别:

    ØSQLContext只支持标准的sql语法解析器

    ØHiveContext现在支持sql语法解析器和HiveSql语法解析器;默认为hivesql语法解析器,用户可以通过配置来切换sql语法解析器,来运行hivesql不支持的语法

    Ø使用HiveContext可以使用Hive的UDF,读写Hive表数据等hive操作。sqlContext不可以对hive进行操作

    Ø趋势:SqlContext不断丰富中,最终两者会形成一个统一的Context

    1.4.3.3.2创建DataFrame

    使用SqlContext,spark程序可以通过RDD、hive表、JSON格式数据等数据源创建DataFrame

    val df = sqlContext.read.json(“/test/readme.json”)

    1.4.3.3.3 DataFrame操作

    df.show

    df.printSchema

    df.

    1.4.3.3.4 Parquet文件

    Parquet文件是一种列式存储格式的文件,能被很多数据处理系统支持。Spark SQL支持读取和写入Parquet文件,并可自动保留原始数据的格式(schema)

    优势:

    可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量

    压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码进一步节省存储空间

    只读取需要的列,支持向量运算,能够获取更好的扫描性能

    还有,parquet数据源支持自动发现和推断分区信息

    1.4.3.3.5 DataFrame的java操作

    将DataFrame的结果转换为java的list

    List listRow =result.javaRDD().collect();

    for(Row row : listRow){

    System.out.println(row);

    }

    1.4.3.4 SparkSession操作

    在2.0版本之前,与spark交互之前必须创建SparkConf和SparkContext;然而到了2.0版本,不需要显示的创建这些对象SparkConf、SparkContext和SqlContext了,这些对象都已经封装在了SparkSession中了,即,2.0版本之后,入口就是就变成了SparkSession,在spark-shell启动时,会实例化一个SparkSession实例spark

    ØSparkSession封装的对象

    Ø获取conf默认配置,可以调整配置spark的运行参数

    1.4.3.4.0 SparkSession的创建

    sparkSession类是所有Spark SQL功能的入口,只需要调用SparkSession.builder()即可创建

    importorg.apache.spark.sql.SparkSession

    val spark = SparkSession.builder

    .master("local")

    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")

    .appName("spark Streaming +kafka")

    .enableHiveSupport

    .getOrCreate

    1.4.3.4.1获取catalog元数据

    1.4.3.4.2创建Dataset和Dataframe

    最简单的办法就是通过range方法,创建DataSet

    注,range也可以有3个参数,第三个参数是间隔,默认的创建的字段:id

    Øtop(n)操作

    Ø对某一列进行统计操作

    Ø通过createDataFrame创建

    重新命名列名withColumnRenamed

    1.4.3.4.3读取json文件

    1.4.3.4.4在SparkSession中使用Spark SQL

    1.4.3.4.5数据源

    spark支持多种数据源的数据,

    Ø最简单的加载方式是load,默认的格式为parquet文件,(可以通过spark.sql.sources.default来默认指定格式)

    val df = spark.read.load(“...”)

    Ø将DataFrame数据存储为parquet

    df.select("name","type").write.save("course.parquet")

    存储的路径为(hdfs):path hdfs://192.168.21.144:9000/user/root/course.parquet

    也可以手动指定格式,以及指定要保存的文件的格式

    scala> val df =

    spark.read.format("json").load("/test/course.json")

    df: org.apache.spark.sql.DataFrame = [length: bigint, name: string

    ... 1 more field]

    #指定要保存的文件的格式

    scala>

    df.write.format("parquet").save("course1.parquet")

    1.4.3.4.6保存数据到永久表saveAsTable

    DataFrame可以通过调用saveAsTable方法将数据落地到hive表中,不过对已经部署的hive不会受影响,spark会创建本地的metastore(使用derby),saveAsTable会持久化数据并指向hive metastore

    saveAsTable默认会创建一个“受管理表”,意味着数据的位置都是受metastore管理的。当“受管理表”被删除,其对应的数据也都会被删除。

    注:文件内容保存在${SPARK_HOME}/bin/spark-warehouse/tableName

    scala> df.write.saveAsTable("course")

    #调用,spark.sql(sql_str)比较灵活

    1.4.3.4.7 spark整合hive

    如果spark没有整合hive,那么spark的元数据都是在bin目录下,自动创建metastore_db(以derby做支撑)

    整合hive后,支持spark从hive取数,永久保存数据到hive中;并支持hive的mysql作为元数据存储数据库

    Ø将hive配置文件中hive-site.xml文件复制到${spark_home}/conf

    Ø将hadoop配置文件中hdfs-site.xml和core-site.xml文件复制到${spark_home}/conf

    Ø将hive下元数据库mysql的驱动,复制到${spark_home}/jars下

    可以将DataFrame数据永久保存到hive表中

    1.5 Spark Streaming

    spark2.0将流数据计算统一到了DataSet中,提出了Structured

    Streaming的概念,将数据源映射为一张无限长度的表,同时将流计算的结果映射为另一张表,完全以结构化的方式去操作流数据,复用了其对象的Catalyst引擎

    1.5.1 spark Streaming实战

    创建Steaming DataFrame,用来监听host:9999获取socket数据,并对获取的数据进行RDD转换操作,最后统计各个词出现的次数

    Ø创建socket通道

    nc –lk 9999

    Ø获取socket通道数据(需要填写socket的host、port)

    scala>val line

    =

    spark.readStream.format("socket").option("host","mapeng").option("port",9999).load

    Ø转换操作

    scala> val wordCount = line.as[String].flatMap(_.split("

    ")).groupBy("value").count

    wordCount: org.apache.spark.sql.DataFrame

    = [value: string, count: bigint]

    Ø使用start()来启动流式数据计算流程

    scala> val query =

    wordCount.writeStream.outputMode("complete").format("console").start

    程序自动启动了job计算,并在控制台展现计算结果;(接收到socket流数据,spark自动计算,控制台展现结果)

    说明:

    (1)outputMode现在有三种方式:complete ,append,update(目前只实现了前两种)

    lcomplete:每次计算完成后,都能得到全量的计算结果(每次计算都得到转换后的最新结果集)

    lappend:每次计算完成后,能拿到增量的计算结果

    两种方式的使用说明:

    使用了聚合类函数才能使用complete的模式,只有简单的使用了map,filter等转换模式才能使用append模式,不做复杂的聚合统计运算

    1.6 Spark Streaming + kafka整合

    1.6.1 pom.xml文件,添加spark依赖

    org.apache.hadoop

    hadoop-client

    2.6.0

    org.apache.hadoop

    hadoop-common

    2.6.0

    org.apache.hadoop

    hadoop-hdfs

    2.6.0

    org.apache.spark

    spark-core_2.11

    2.0.0

    org.apache.spark

    spark-sql_2.11

    2.0.0

    org.apache.spark

    spark-streaming_2.11

    2.0.0

    org.apache.spark

    spark-hive_2.11

    2.0.0

    org.apache.spark

    spark-streaming-kafka-0-10_2.11

    2.0.0

    org.apache.hive

    hive-jdbc

    1.2.1

    io.netty

    netty-all

    4.0.29.Final

    1.6.2实战代码

    packagecom.mp.fight

    importorg.apache.spark.sql.SparkSession

    importorg.apache.spark.streaming.StreamingContext

    importorg.apache.spark.streaming.Seconds

    importorg.apache.spark.streaming._

    importorg.apache.spark.streaming.kafka010.KafkaUtils

    importorg.apache.spark.streaming.kafka010.LocationStrategies

    importorg.apache.kafka.common.serialization.StringDeserializer

    importorg.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    importorg.apache.spark.sql.SQLContext

    importorg.apache.spark.sql.SaveMode

    objectTest4{

    caseclassPerson(id:Int,name:String,addr:String)

    defmain(args: Array[String]): Unit = {

    //声明sparkSession

    valspark=SparkSession.builder

    .master("local")

    .appName("spark  Streaming kafkasql")

    .config("spark.sql.warehouse.dir","/user/hive/warehouse")

    .getOrCreate

    //kafka设置

    valkafkaParams=Map[String, Object](

    "bootstrap.servers"->"mapeng:9092",

    "key.deserializer"->classOf[StringDeserializer],

    "value.deserializer"->classOf[StringDeserializer],

    "group.id"->"example",

    "auto.offset.reset"->"latest",

    "enable.auto.commit"->(false: java.lang.Boolean)

    )

    //topic

    valtopics=List("testmp")

    //初始化StreamingContext

    valssc=newStreamingContext(spark.sparkContext,Seconds(30));

    //从kafka中读取数据

    valkafkaStream=KafkaUtils.createDirectStream[String,String](

    ssc,

    LocationStrategies.PreferConsistent,

    Subscribe[String,String](topics,kafkaParams)

    ).map(_.value())

    //kafkaStream.print

    importspark.sqlContext.implicits._

    //启用sparkSql来操作DStream转换为DataFrame

    kafkaStream.foreachRDD{rdd=>{

    if(rdd.isEmpty) {

    println("rdd is

    empty")

    }else{

    valperson=rdd.map(_.split(",")).map(p=>Person(p(0).toInt,p(1),p(2))).toDF

    //新接收的数据,追加存储在parquet文件中(重写文件)

    person.write.mode(SaveMode.Append).save("hdfs://mapeng:9000/test/person.parquet")

    //实时统计区域人数

    valdf=spark.read.load("hdfs://mapeng:9000/test/person.parquet")

    df.createOrReplaceTempView("person")

    valaddrCount=spark.sql("select

    addr,count(1) as num from person group by addr")

    //将统计结果实时回写到parquet文件中

    addrCount.write.mode(SaveMode.Overwrite).save("hdfs://mapeng:9000/test/addrCount.parquet")

    //继续做多维度统计,可以使用sparksql操作处理parquet文件

    }

    }

    }

    //启动job

    ssc.start

    ssc.awaitTermination

    }

    }

    1.6.3 DStream中foreachRDD、foreachePartition、foreach的区别

    ØforeachRDD:得到的是处理一个批次的数据

    ØforeachPartition:对一个批次的每个分区数据做处理

    Øforeach:每条数据处理,单个元素处理

    1.6.4 spark Streaming + socket

    val ssc = new StreamingContext(sparkConf,

    Seconds(1))

    //获得一个DStream负责连接监听端口:地址

    val lines = ssc.socketTextStream(“192.168.21.144”, 9999)

    //对每一行数据执行Split操作

    val words = lines.flatMap(_.split(" "))

    //统计word的数量

    val pairs = words.map(word => (word, 1))

    val wordCounts = pairs.reduceByKey(_ + _)

    //输出结果

    wordCounts.print

    ssc.start//开始

    ssc.awaitTermination//计算完毕退出

    1.8 parquet文件

    1.8.1 parquet是面向分析型业务的列式存储格式,有如下优势

    Parquet文件尾部存储了文件的元数据信息和统计信息,自描述的,方便解析

    1.只读取需要的列,支持向量运算,能够获得更好的扫描性能

    2.可以跳过不符合条件的数据,只读取需要的数据,降低io

    3.同一列的数据类型是一样的,可以使用更高效的压缩编码,节约存储磁盘

    1.8.2 parquet适配多种计算框架

    Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合的组件有:

    查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM

    Big SQL

    计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite

    数据模型: Avro, Thrift, Protocol Buffers, POJOs

    1.8.3 parquet数据模型

    eg:

    message AddressBook {

    required string owner;

    repeated stringownerPhoneNumbers;

    repeated group contacts {

    required string name;

    optional string phoneNumber;

    }

    }

    说明:

    schema的格式是这样的:

    (1)根叫做message

    (2)message包括有多个fields,每个fields包括有三个属性:repetition,type,name

    其中,repetition有3中类型:required(出现一次)、optional(出现0次或者1次)、repeated(出现0次或者多次)

    (3)type可以是一个group或者一个简单的类型

    以上schema描述说明:

    (1)每条记录标识一个AddressBook

    (2)有且只有一个owner

    (3)有0个或多个ownerPhoneNumbers

    (4)owner可以有0个或者多个contacts。每个contact有且只有一个name,这个contact的phoneNumber可有可无(0个或者1个)

    注:parquet格式的数据类型没有复杂的Map,List,Set等,使用group和repeated fields来表示;

    null值不会被存储

    实例:

    1.8.4 parquet格式文件的存储

    在parquet格式的存储中,一个schema的树结构有几个叶子节点,实际存储汇总就有几个column,例如上图中的schema实际存储就4个列

    1.8.5 DataFrame与Parquet

    (1)保存DF为Parquet格式

    dfPerson.write.parquet("person.parquet")

    (2)hive中建立parquet格式的表

    createtableperson_parquetlikepersonstoredasparquet;

    insertoverwritetableperson_parquetselect*fromperson;

    (3)加载Parquet文件不再需要case class。

    valpersonDF =spark.read.parquet("person.parquet")

    personDF.registerAsTempTable("pp")

    valmales = spark.sql("select * from pp where gender='M'")

    males.show

    1.8.5 parquet文件的持久化

    1.8.5.1 spark中将DataFrame数据写到hdfs中的parquet文件中,支持追加

    personDf.write.mode(SaveMode.Append).save(“hdfs://mapeng:9000/test/person.parquet”)

    saveMode有如下几种方式:

    1.8.5.2 parquet文件合并

    合并的规则:相同的列,在新的数据集中,是通用的列,

    各自不同的列,也作为新的数据集的列。

    实战:

    1.9 spark项目实战

    相关文章

      网友评论

        本文标题:spark 学习笔记

        本文链接:https://www.haomeiwen.com/subject/ohpybttx.html