通过词频统计功能学习Spark-submit的使用:
先打开一个命令窗口输入nc -lk 9999
然后在另一个窗口,spark的bin文件夹下输入
./spark-submit --master local[2] \
--class org.apache.spark.examples.streaming.NetworkWordCount \
--name NetworkWordCount \
/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999
在netcat窗口输入a a a a b b之后再spark窗口的流式输出会见到词频统计的结果。
sparkStreaming工作原理(粗粒度)
Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理。

sparkStreaming工作原理(细粒度)

首先,spark应用程序运行在driver端,driver需要在Executor(电脑)中启动Receiver接收器,接收数据流,并且分模块接收,可能还会以副本的方式存储,接收了一个周期之后,Executor会向spark应用程序返回接收情况(分块数量,副本数量等等)应用程序会将任务分发到Executor中。
DStream概念:对DStream进行操作,比如map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作,因为一个DStream是由不同批次的RDD所构成的。
每一个输入流Input DStreamings 都要对应一个receivers来接收它,Input DStreamings的种类:文件系统,socket传输,Kafka,Flume。
Output Operation 的种类:print(),saveAsTextFiles保存到文件系统,saveAsHadoopFiles等。
实战:spark streaming 处理socket数据
object NetworkWorldCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWorldCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost",6789)
val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
在另外一个控制台里输入
nc -lk 6789
a a a a c c c d d d
结果:

实战:spark streaming 处理socket数据并写入mysql数据库
object ForeachRDDApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//前几行不变。。
result.foreachRDD(rdd => { //循环每一个Rdd
rdd.foreachPartition(partitionOfRecords => { //在一个rdd里循环每一个partition
val connection = createCOnnection() //获取mysql连接
partitionOfRecords.foreach(record => { //在每一个partition里获取一条记录
val sql = "insert into wordcount(word, wordcount) values('" + record._1+ "'," + record._2+")"
connection.createStatement().execute(sql)
})
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
def createCOnnection() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark","root","root")
}
结果:

spark streaming从socket接收数据后根据标准过滤数据实战(黑名单例子)
//构建黑名单
object TransformApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("TransformApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//跟前面一样
val blacks = List("zs","ls") //构建黑名单List
val blackRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true)) //将List转成(zs,true)的这种RDD类型
val lines = ssc.socketTextStream("localhost", 6789) //lines是DSTream类型
val clicklog = lines.map(x => (x.split(",")(1),x)).transform(rdd => {
//lines是这种类型的数据(20160410,zs) 根据逗号分隔后重整为(zs:20160410,zs),即为(x.split(",")(1),x))的结果,得到的结果仍然是RDD类型,transform函数是将每个Rdd拿出来操作。
rdd.leftOuterJoin(blackRDD) //每个rdd都跟blackRDD进行leftOuterJoin,得到(zs:[<20160410,zs>,<true>])这种类型的数据
.filter(x=> x._2._2.getOrElse(false) != true) //过滤,将参数的第二个中的第二个为true的过滤掉。
.map(x =>x._2._1) //重整,将结构变为rdd中第二个的第一个,即为<20160410,zs>
})
clicklog.print()
ssc.start()
ssc.awaitTermination()
}
}
在nc -lk 6789中输入
20160410,zs
20160410,ls
20160410,ww
20160410,zs
20160410,ls
20160410,ww
20160410,zs
20160410,ls
20160410,ww
控制台输出

网友评论