目录介绍

StreamingContext:核心概念1
官方文档:带领着进入Spark Streaming的开发
http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext


Discretized(离散化) Streams(DStreams):核心概念2

对DStream的计算操作,底层其实就是其中包含的RDD进行操作比如做map/flatMap

Input DStreams and Receivers:核心概念3

Input DStreams是一种来自于网络服务器的源源不断的数据流,每一个Input DStreams都会被Receivers解析成一个对象,并存储在Spark的内存中,但是除了文件流以外。

Transformations on DStreams:核心概念4

Output Operations on DStreams:核心概念5

Spark Streaming处理socket数据-案例
首先把java对应的spark streaming的java版的jar包配置到项目的pom文件中
地址:https://search.maven.org/artifact/org.apache.spark/spark-sql_2.10/2.1.1/jar



linux往某个端口输入数据:
nc -lk 端口号
打包我们的java类
kafka-test-1.0-SNAPSHOT.jar,并放在jar文件夹下/home/hadoop/waimai/selfjars/
提交到yarn上运行
./spark-submit --class com.lppz.sparkstreaming.NetworkWordCount --master yarn /home/hadoop/waimai/selfjars/kafka-test-1.0-SNAPSHOT.jar 10.101.3.3 9999
Spark Streaming处理文件系统数据

文件系统方式读取数的注意事项:
文件夹可以是本地服务器文件夹,也可以是任何文件系统地址如:hdfs://namenode:8040/logs/,S3,NFS,ETC等
1、textFileStream中的文件夹会被Streaming监控起来,并且会被处理。
2、被监控的文件夹下的所有文件必须是同一中格式。
3、一旦文件被处理之后,即使文件有所变化,也不会被再次处理。也就是说仅仅会被处理一次。
4、文件夹里的文件越多,所需要耗费的扫描时间越多,即使文件没有被更改过。

Spark Streaming带状态的算子:UpdateStateByKey
实战:
1、计算到目前为止累计出现的单词个数写入到MySql中




把结果写入mysql,我们 要用到DStream的output的操作






我自己实现的一个简陋的样例,仅供参考




2、基于Window的统计


3、黑名单过滤
实现要求:
访问日志==》DStream
20190610、zhangsan
20190610、lisi
20190610、wangwu
黑名单列表==》RDD
lisi、wangwu
==》20190610、zhangsan
其中lisi和wangwu是黑名单,需要过滤掉
只输出20190610、zhangsan
那么我们如何把DStream与RDD的数据进行关联操作呐?
如下:
20190610、zhangsan
20190610、lisi
20190610、wangwu
===》转化成(zhangsan:20190610,zhangsan)(lisi:20190610,lisi)(wangwu:20190610,wangwu)
黑名单列表lisi、wangwu
===》转化成(lisi:true)(wangwu:true)
===>20190610、zhangsan
使用leftjoin,就能得到如下列表
===》
(zhagnsan:<20190610,zhangsan>,<true>)
(lisi:<20190610,lisi>,<true>)
(wangwu:<20190610,wangwu>,<false>)
要想拿到20190610,wangwu,只需要===》tuple 1就可以了
接下来看下我们写的例子。
我们需要用到Transform这个API






网友评论