在Spark Streaming之DStream入门入门的案例中,worldcount
程序中使用的是一个socketTextStream
数据源,哪还有没有其他的数据源呢?
比如:SparkStreaming自带的RDD队列
、Kafka数据源
以及自定义
数据源。
RDD队列
用法
(1)使用ssc.queueStream
(queueOfRDDs)来创建DStream
(2)将每一个推送到这个队列中的RDD,都会作为一个DStream处理。
步骤
- 创建StreamingContext
- 从队列中获取数据
- 数据处理
- 结果展示
- 启动
- 阻塞
案例
@Test
def rdd(): Unit ={
//1. 创建StreamingContext
val conf=new SparkConf().setMaster("local[4]").setAppName("test")
val ssc =new StreamingContext(conf,Seconds(5))
// 设置日志级别
ssc.sparkContext.setLogLevel("warn")
//2. 从队列中获取数据
// 创建一个可变队列
val queue=Queue[RDD[String]]()
val ds: InputDStream[String] = ssc.queueStream(queue)
//3. 数据处理
val value: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_,1))
val value1: DStream[(String, Int)] = value.reduceByKey(_ + _)
//4. 结果展示
value1.print()
//5. 启动
ssc.start()
//向 队列中添加数据
for(_ <- 0 until(50)){
queue +=(ssc.sparkContext.parallelize(List("java spark java python","java spark java python"),2))
}
//6. 阻塞
ssc.awaitTermination()
}
运行结果
Time: 1625843430000 ms
-------------------------------------------
(python,2)
(spark,2)
(java,4)
-------------------------------------------
Time: 1625843435000 ms
-------------------------------------------
(python,2)
(spark,2)
(java,4)
queueStream
- def queueStream[T: ClassTag](queue: Queue[RDD[T]],oneAtATime: Boolean = true)
- def queueStream[T: ClassTag](queue: Queue[RDD[T]],oneAtATime: Boolean,defaultRDD: RDD[T])
queue
:存放RDD的队列
oneAtATime
:是否每个时间间隔只从队列中消费一个 RDD;默认为true,
defaultRDD
: 指定一个默认的RDD
。
oneAtATime
默认为true,只会从每个时间间隔只从队列中消费一个 RDD;若指定为false
,将消费这一批次中所有的RDD
设置oneAtATime
为false
ssc.queueStream(queue,false)
为了效果更加明显,每次睡眠2秒钟
//向 队列中添加数据
for(_ <- 0 until(50)){
queue +=(ssc.sparkContext.parallelize(List("java spark java python","java spark java python"),2))
Thread.sleep(2000)
}
重新运行
-------------------------------------------
Time: 1625844025000 ms
-------------------------------------------
(python,2)
(spark,2)
(java,4)
-------------------------------------------
Time: 1625844030000 ms
-------------------------------------------
(python,6)
(spark,6)
(java,12)
21/07/09 23:20:34 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
-------------------------------------------
Time: 1625844035000 ms
-------------------------------------------
(python,4)
(spark,4)
(java,8)
对比上面,有没有看出区别?
自定义数据源
实际开发中,使用自定义数据源,常用读取数据库,redis、kafka等数据。
- 继承
Receiver
[T
](存储位置
)
import org.apache.spark.streaming.receiver.Receiver
T:表示输入的参数类型
- 重写
Receiver
两个方法
// receiver启动时调用
override def onStart(): Unit = ???
// receiver结束前调用
override def onStop(): Unit = ???
存储位置:
Spark的StorageLevel
共有7个缓存级别:
NONE = 不存储
DISK_ONLY = 缓存入硬盘。这个级别主要是讲那些庞大的Rdd,之后仍需使用但暂时不用的,放进磁盘,腾出Executor内存。
DISK_ONLY_2 = 多一个缓存副本。
MEMORY_ONLY = 只使用内存进行缓存。这个级别最为常用,对于马上用到的高频rdd,推荐使用。
MEMORY_ONLY_2 = 多一个缓存副本。
MEMORY_ONLY_SER = 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_ONLY_SER_2 = 多一个缓存副本。
MEMORY_AND_DISK = 先使用内存,多出来的溢出到磁盘,对于高频的大rdd可以使用。
MEMORY_AND_DISK_2 = 多一个缓存副本。
MEMORY_AND_DISK_SER = 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER_2 = 多一个缓存副本。
OFF_HEAP = 除了内存、磁盘,还可以存储在OFF_HEAP。
需求
监听服务器,重网络中读取数据进行worldCount
计算
自定义Receiver
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.net.Socket
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
/**
* 自定义Receiver
* @param host ip
* @param port 端口
*/
class CustomReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
// 网络
var socket: Socket=null
// 字符流
var in:InputStream=null
// 缓冲流
var buffer: BufferedReader=null
/**
* 程序运行时调用
*/
override def onStart(): Unit = {
// 监听网络端口
socket =new Socket(host,port)
//判断是否关闭
if(!socket.isClosed){
// 获取数据流
in = socket.getInputStream()
// 为了读取方便,将字节流转换成字符流
buffer =new BufferedReader(new InputStreamReader(in))
// 读取一行数据
var line: String = buffer.readLine()
while (line!=null){
// 调用父类的 store 存储数据 存储
store(line)
// 读取下一行数据
line=buffer.readLine()
}
}
}
/**
* 程序关闭时调用
*/
override def onStop(): Unit = {
// 关闭资源
if(socket!=null){
socket.close()
}
if(in!=null){
in.close()
}
if(buffer!=null){
buffer.close()
}
}
}
调用测试
def main(args: Array[String]): Unit = {
// 配置
val conf=new SparkConf().setMaster("local[4]").setAppName("test")
// 创建 StreamingContext
val ssc = new StreamingContext(conf,Seconds(5))
// 设置日志级别
ssc.sparkContext.setLogLevel("warn")
// 采用自定义Receiver
val receiver: ReceiverInputDStream[String] = ssc.receiverStream(new CustomReceiver("Hadoop102", 9999))
// 对单词进行切分,并进行映射
val value: DStream[(String, Int)] = receiver.flatMap(_.split(" ")).map((_,1))
// 聚合统计单词个数
val result: DStream[(String, Int)] = value.reduceByKey(_ + _)
// 控制台输出
result.print()
// 开启
ssc.start()
// 等待
ssc.awaitTermination()
}
运行main
使用nc
发送数据
~]$ nc -lk 9999
java python java python scala scala spark
运行结果
-------------------------------------------
Time: 1625917785000 ms
-------------------------------------------
(python,2)
(spark,1)
(scala,2)
(java,2)
Kafka数据源
在开发中往往用得最多的还是使用Kafka数据源
。
在spark官网提过了两个Kafka版本(如上图)
0.8
和10
0.8
版本优缺点:提供了一个高级的API(Receiver DStream
)能够自动拉数据,自动提交offset
。但是着各种方式(Receiver DStream
)有个很大的问题,使用之后不能动态控制拉取数据的数量,也就是说背压机制
会失效。
10
版本:由于0.8
的缺点,目前大部分都用的是10
版本,采用的是直连(Direct DStream
)的方式,拉取速率又自身控制。
添加依赖
使用10
版本需要添加一个依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
准备kafka
- 启动
zookeeper
- 启动
Kafka
- 创建一个topic
bin]$ kafka-topics.sh --create --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log --partitions 3 --replication-factor 2
--create:创建
--bootstrap-server:集群配置
--topic:topic名称
--partitions :指定分区数
--replication-factor :指定副本数
- 检查topic是否创建成功
bin]$ kafka-topics.sh --list --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092
__consumer_offsets
applog
haoduanzi
movie
spark-log
test
zhenai
--list :topic列表
- 测试读写是否正常
生产者
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log
>java
消费者
bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log --group g1
java
编写程序
不会写怎么办?看官网
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
def main(args: Array[String]): Unit = {
//创建StreamingContext
val ssc=new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))
// 设置日志级别
ssc.sparkContext.setLogLevel("warn")
// 从官网上 借鉴下来
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
"key.deserializer" -> classOf[StringDeserializer], // key 反序列化器
"value.deserializer" -> classOf[StringDeserializer], // value 反序列化器
"group.id" -> "g1", // 消费者组
"auto.offset.reset" -> "earliest", // 指定消费者组第一次应该从topic哪里开始消费
"enable.auto.commit" -> (true: java.lang.Boolean) // 是否自动提交
)
// 指定topic
val topics = Array("spark-log")
// KafkaUtils.createDirectStream[Key的类型 , Value的类型]
val stream = KafkaUtils.createDirectStream[String, String](
ssc, // StreamingContext
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// 获取 value 数据
val value: DStream[String] = stream.map(record => record.value)
// 经典的 worldCount
val result: DStream[(String, Int)] = value.flatMap(_.split(" ")).map((_, 1))
// 汇总
val data: DStream[(String, Int)] = result.reduceByKey(_ + _)
//控制台输出
data.print()
// 开始
ssc.start()
// 等待
ssc.awaitTermination()
}
auto.offset.reset
:
- earliest:自动将偏移重置为最小偏移 (最开始)
- latest:自动将偏移量重置为最大偏移量(最新)
- none:向消费者抛出异常
运行程序
- 生产消息
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log
>scala python java
>scala python java
>scala python java
>scala python java
>scala python java
- 运行结果
-------------------------------------------
Time: 1625923240000 ms
-------------------------------------------
(python,4)
(scala,4)
(java,4)
程序运行时,RDD 分区数是多少呢?
// 查看分区数
stream.foreachRDD(rdd=>{
println(s"分区数0=${rdd.getNumPartitions}")
})
运行结果为2 ,跟 --partitions 3
有关?
分区数0=3
进一步测试,更改分区数
bin]$ kafka-topics.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log --alter --partitions 4
运行结果
分区数0=4
总结
消费kafka数据的时候,每个批次RDD的分区数 = topic分区数
sparkstreaming会动态感知kafka topic分区数,然后调整RDD的分区数
网友评论