美文网首页
Spark Streaming之DStream创建

Spark Streaming之DStream创建

作者: 万事万物 | 来源:发表于2021-08-08 08:49 被阅读0次

Spark Streaming之DStream入门入门的案例中,worldcount程序中使用的是一个socketTextStream数据源,哪还有没有其他的数据源呢?
比如:SparkStreaming自带的RDD队列Kafka数据源以及自定义数据源。

RDD队列

用法
(1)使用ssc.queueStream(queueOfRDDs)来创建DStream
(2)将每一个推送到这个队列中的RDD,都会作为一个DStream处理。

步骤

  1. 创建StreamingContext
  2. 从队列中获取数据
  3. 数据处理
  4. 结果展示
  5. 启动
  6. 阻塞

案例

 @Test
  def rdd(): Unit ={

    //1. 创建StreamingContext
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc =new StreamingContext(conf,Seconds(5))

    // 设置日志级别
    ssc.sparkContext.setLogLevel("warn")

    //2. 从队列中获取数据

    // 创建一个可变队列
    val queue=Queue[RDD[String]]()
    val ds: InputDStream[String] = ssc.queueStream(queue)

    //3. 数据处理
    val value: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_,1))
    val value1: DStream[(String, Int)] = value.reduceByKey(_ + _)

    //4. 结果展示
    value1.print()

    //5. 启动
    ssc.start()

    //向 队列中添加数据
    for(_ <- 0 until(50)){
      queue +=(ssc.sparkContext.parallelize(List("java spark java python","java spark java python"),2))
    }

    //6. 阻塞
    ssc.awaitTermination()

  }

运行结果

Time: 1625843430000 ms
-------------------------------------------
(python,2)
(spark,2)
(java,4)

-------------------------------------------
Time: 1625843435000 ms
-------------------------------------------
(python,2)
(spark,2)
(java,4)

queueStream

  • def queueStream[T: ClassTag](queue: Queue[RDD[T]],oneAtATime: Boolean = true)
  • def queueStream[T: ClassTag](queue: Queue[RDD[T]],oneAtATime: Boolean,defaultRDD: RDD[T])

queue:存放RDD的队列
oneAtATime:是否每个时间间隔只从队列中消费一个 RDD;默认为true,
defaultRDD: 指定一个默认的RDD

oneAtATime
默认为true,只会从每个时间间隔只从队列中消费一个 RDD;若指定为false,将消费这一批次中所有的RDD

设置oneAtATimefalse

ssc.queueStream(queue,false)

为了效果更加明显,每次睡眠2秒钟

    //向 队列中添加数据
    for(_ <- 0 until(50)){
      queue +=(ssc.sparkContext.parallelize(List("java spark java python","java spark java python"),2))

      Thread.sleep(2000)

    }

重新运行

-------------------------------------------
Time: 1625844025000 ms
-------------------------------------------
(python,2)
(spark,2)
(java,4)

-------------------------------------------
Time: 1625844030000 ms
-------------------------------------------
(python,6)
(spark,6)
(java,12)

21/07/09 23:20:34 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
-------------------------------------------
Time: 1625844035000 ms
-------------------------------------------
(python,4)
(spark,4)
(java,8)

对比上面,有没有看出区别?


自定义数据源

实际开发中,使用自定义数据源,常用读取数据库,redis、kafka等数据。

  1. 继承 Receiver[T](存储位置)
import org.apache.spark.streaming.receiver.Receiver

T:表示输入的参数类型

  1. 重写Receiver两个方法
    // receiver启动时调用
  override def onStart(): Unit = ???

// receiver结束前调用

  override def onStop(): Unit = ???

存储位置:
Spark的StorageLevel共有7个缓存级别:

NONE = 不存储

DISK_ONLY = 缓存入硬盘。这个级别主要是讲那些庞大的Rdd,之后仍需使用但暂时不用的,放进磁盘,腾出Executor内存。

DISK_ONLY_2 = 多一个缓存副本。

MEMORY_ONLY = 只使用内存进行缓存。这个级别最为常用,对于马上用到的高频rdd,推荐使用。

MEMORY_ONLY_2 = 多一个缓存副本。

MEMORY_ONLY_SER = 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。

MEMORY_ONLY_SER_2 = 多一个缓存副本。

MEMORY_AND_DISK = 先使用内存,多出来的溢出到磁盘,对于高频的大rdd可以使用。

MEMORY_AND_DISK_2 = 多一个缓存副本。

MEMORY_AND_DISK_SER = 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。

MEMORY_AND_DISK_SER_2 = 多一个缓存副本。

OFF_HEAP = 除了内存、磁盘,还可以存储在OFF_HEAP。

需求
监听服务器,重网络中读取数据进行worldCount计算

自定义Receiver

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * 自定义Receiver
 * @param host ip
 * @param port 端口
 */
class CustomReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  // 网络
  var socket: Socket=null
  // 字符流
  var in:InputStream=null
  // 缓冲流
  var buffer: BufferedReader=null

  /**
   * 程序运行时调用
   */
  override def onStart(): Unit = {

    // 监听网络端口
    socket =new Socket(host,port)

    //判断是否关闭
    if(!socket.isClosed){
      // 获取数据流
      in = socket.getInputStream()

      // 为了读取方便,将字节流转换成字符流
      buffer =new BufferedReader(new InputStreamReader(in))

      // 读取一行数据
      var line: String = buffer.readLine()

      while (line!=null){
        // 调用父类的 store 存储数据 存储
        store(line)

        // 读取下一行数据
        line=buffer.readLine()
      }
    }

  }

  /**
   * 程序关闭时调用
   */
  override def onStop(): Unit = {

    // 关闭资源
    if(socket!=null){
      socket.close()
    }

    if(in!=null){
      in.close()
    }
    if(buffer!=null){
      buffer.close()
    }
  }
}

调用测试

  def main(args: Array[String]): Unit = {
    // 配置
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    // 创建 StreamingContext
    val ssc = new StreamingContext(conf,Seconds(5))

    // 设置日志级别
    ssc.sparkContext.setLogLevel("warn")

    // 采用自定义Receiver
    val receiver: ReceiverInputDStream[String] = ssc.receiverStream(new CustomReceiver("Hadoop102", 9999))

    // 对单词进行切分,并进行映射
    val value: DStream[(String, Int)] = receiver.flatMap(_.split(" ")).map((_,1))


    // 聚合统计单词个数
    val result: DStream[(String, Int)] = value.reduceByKey(_ + _)


    // 控制台输出
    result.print()

    // 开启
    ssc.start()

    // 等待
    ssc.awaitTermination()


  }

运行main

使用nc 发送数据

 ~]$ nc -lk 9999
java python java python scala scala spark

运行结果

-------------------------------------------
Time: 1625917785000 ms
-------------------------------------------
(python,2)
(spark,1)
(scala,2)
(java,2)

Kafka数据源

在开发中往往用得最多的还是使用Kafka数据源

kafka http://spark.apache.org/docs/2.4.0/streaming-kafka-integration.html
在spark官网提过了两个Kafka版本(如上图)0.810

0.8版本优缺点:提供了一个高级的API(Receiver DStream)能够自动拉数据,自动提交offset。但是着各种方式(Receiver DStream)有个很大的问题,使用之后不能动态控制拉取数据的数量,也就是说背压机制会失效。
10版本:由于0.8的缺点,目前大部分都用的是10版本,采用的是直连(Direct DStream)的方式,拉取速率又自身控制。

添加依赖
使用10版本需要添加一个依赖

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
     <version>3.0.0</version>
</dependency>

准备kafka

  • 启动 zookeeper
  • 启动 Kafka
  • 创建一个topic
 bin]$ kafka-topics.sh --create --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log --partitions 3 --replication-factor 2

--create:创建
--bootstrap-server:集群配置
--topic:topic名称
--partitions :指定分区数
--replication-factor :指定副本数

  • 检查topic是否创建成功
bin]$ kafka-topics.sh --list --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092
__consumer_offsets
applog
haoduanzi
movie
spark-log
test
zhenai

--list :topic列表

  • 测试读写是否正常
    生产者
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log
>java

消费者

bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log --group g1
java

编写程序
不会写怎么办?看官网

kafka 依样画葫芦的嫖
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

 def main(args: Array[String]): Unit = {

    //创建StreamingContext
    val ssc=new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))


    // 设置日志级别
    ssc.sparkContext.setLogLevel("warn")

    // 从官网上 借鉴下来
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer], // key 反序列化器
      "value.deserializer" -> classOf[StringDeserializer], // value 反序列化器
      "group.id" -> "g1", // 消费者组
      "auto.offset.reset" -> "earliest", // 指定消费者组第一次应该从topic哪里开始消费
      "enable.auto.commit" -> (true: java.lang.Boolean) // 是否自动提交
    )

    // 指定topic
    val topics = Array("spark-log")

    // KafkaUtils.createDirectStream[Key的类型 , Value的类型]
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, // StreamingContext
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // 获取 value 数据
    val value: DStream[String] = stream.map(record => record.value)

    // 经典的 worldCount
    val result: DStream[(String, Int)] = value.flatMap(_.split(" ")).map((_, 1))

    // 汇总
    val data: DStream[(String, Int)] = result.reduceByKey(_ + _)

    //控制台输出
    data.print()

    // 开始
    ssc.start()

    // 等待
    ssc.awaitTermination()

  }

auto.offset.reset:

  • earliest:自动将偏移重置为最小偏移 (最开始)
  • latest:自动将偏移量重置为最大偏移量(最新)
  • none:向消费者抛出异常

运行程序

  • 生产消息
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log
>scala python java
>scala python java
>scala python java
>scala python java
>scala python java
  • 运行结果
-------------------------------------------
Time: 1625923240000 ms
-------------------------------------------
(python,4)
(scala,4)
(java,4)

程序运行时,RDD 分区数是多少呢?

    // 查看分区数
    stream.foreachRDD(rdd=>{
      println(s"分区数0=${rdd.getNumPartitions}")
    })

运行结果为2 ,跟 --partitions 3有关?

分区数0=3

进一步测试,更改分区数

bin]$ kafka-topics.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic spark-log --alter --partitions 4

运行结果

分区数0=4

总结

消费kafka数据的时候,每个批次RDD的分区数 = topic分区数
sparkstreaming会动态感知kafka topic分区数,然后调整RDD的分区数

相关文章

网友评论

      本文标题:Spark Streaming之DStream创建

      本文链接:https://www.haomeiwen.com/subject/qkyqpltx.html