美文网首页
Spark Streaming学习六七八章笔记

Spark Streaming学习六七八章笔记

作者: muffinfeng | 来源:发表于2018-10-15 17:17 被阅读0次

通过词频统计功能学习Spark-submit的使用:

先打开一个命令窗口输入nc -lk 9999


然后在另一个窗口,spark的bin文件夹下输入

./spark-submit --master local[2] \

--class org.apache.spark.examples.streaming.NetworkWordCount \

--name NetworkWordCount \

/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999

在netcat窗口输入a a a a b b之后再spark窗口的流式输出会见到词频统计的结果。


sparkStreaming工作原理(粗粒度)

Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理。


sparkStreaming工作原理(细粒度)

细粒度工作原理

首先,spark应用程序运行在driver端,driver需要在Executor(电脑)中启动Receiver接收器,接收数据流,并且分模块接收,可能还会以副本的方式存储,接收了一个周期之后,Executor会向spark应用程序返回接收情况(分块数量,副本数量等等)应用程序会将任务分发到Executor中。

DStream概念:对DStream进行操作,比如map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作,因为一个DStream是由不同批次的RDD所构成的。

每一个输入流Input DStreamings 都要对应一个receivers来接收它,Input DStreamings的种类:文件系统,socket传输,Kafka,Flume。

Output Operation 的种类:print(),saveAsTextFiles保存到文件系统,saveAsHadoopFiles等。

实战:spark streaming 处理socket数据

object NetworkWorldCount {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWorldCount")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost",6789)

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()

    ssc.start()

    ssc.awaitTermination()

  }

}

在另外一个控制台里输入

nc -lk 6789

a a a a c c c d d d 

结果:

spark streaming 处理socket数据

实战:spark streaming 处理socket数据并写入mysql数据库

object ForeachRDDApp {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//前几行不变。。

    result.foreachRDD(rdd => {       //循环每一个Rdd

      rdd.foreachPartition(partitionOfRecords => {  //在一个rdd里循环每一个partition

        val connection = createCOnnection()     //获取mysql连接

        partitionOfRecords.foreach(record => {        //在每一个partition里获取一条记录

          val sql = "insert into wordcount(word, wordcount) values('" + record._1+ "'," + record._2+")"

          connection.createStatement().execute(sql)

        })

        connection.close()

      })

    })

    ssc.start()

    ssc.awaitTermination()

  }

  def createCOnnection() = {

    Class.forName("com.mysql.jdbc.Driver")

    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark","root","root")

  }

结果:

结果


spark streaming从socket接收数据后根据标准过滤数据实战(黑名单例子)

//构建黑名单

object TransformApp {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("TransformApp").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

//跟前面一样

    val blacks = List("zs","ls")    //构建黑名单List

    val blackRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))        //将List转成(zs,true)的这种RDD类型

    val lines = ssc.socketTextStream("localhost", 6789)        //lines是DSTream类型

    val clicklog = lines.map(x => (x.split(",")(1),x)).transform(rdd => {       

//lines是这种类型的数据(20160410,zs) 根据逗号分隔后重整为(zs:20160410,zs),即为(x.split(",")(1),x))的结果,得到的结果仍然是RDD类型,transform函数是将每个Rdd拿出来操作。

      rdd.leftOuterJoin(blackRDD)   //每个rdd都跟blackRDD进行leftOuterJoin,得到(zs:[<20160410,zs>,<true>])这种类型的数据

        .filter(x=> x._2._2.getOrElse(false) != true)         //过滤,将参数的第二个中的第二个为true的过滤掉。

        .map(x =>x._2._1)      //重整,将结构变为rdd中第二个的第一个,即为<20160410,zs>

    })

    clicklog.print()

    ssc.start()

    ssc.awaitTermination()

  }

}

在nc -lk 6789中输入

20160410,zs

20160410,ls

20160410,ww

20160410,zs

20160410,ls

20160410,ww

20160410,zs

20160410,ls

20160410,ww

控制台输出

结果

相关文章

网友评论

      本文标题:Spark Streaming学习六七八章笔记

      本文链接:https://www.haomeiwen.com/subject/pynvaftx.html