美文网首页
SparkStream与kafka对接

SparkStream与kafka对接

作者: CoderInsight | 来源:发表于2023-02-09 14:07 被阅读0次

    1.官网简介

    SparkStream与kafka对接.png

    2.参考资料

    1,必读:再讲Spark与kafka 0.8.2.1+整合
    2,必读:Spark与kafka010整合
    3,spark streaming kafka 整合(010-Consumer)

    3.两版本代码整理

    (1).spark-streaming-kafka-0-8

    1),当前版本依赖的jar包

    第一和第二个jar包可以直接在当前版本的lib目录中拷贝。第三个jar包需要从本地下载好,然后上传执行。所以在本地的jar路径中我保存了一份完整的jar包。

    [root@localhost jars]# pwd
    /usr/local/spark/jars
    
    [root@localhost jars]# ll | grep kafka
    -rw-r--r--. 1 root root   3954430 Mar 20 11:34 kafka_2.11-0.8.2.1.jar
    -rw-r--r--. 1 root root    324010 Mar 20 11:34 kafka-clients-0.8.2.1.jar
    -rw-r--r--. 1 root root    298522 Mar 19 16:07 spark-streaming-kafka-0-8_2.11-2.1.0.jar
    

    2).producer(生产者)

    import java.util.HashMap
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
     // 指定再在哪个kafka服务器运行
    val brokers="localhost:9092"
    // 指定topic
    val topic="demo01"
    // 指定每秒钟向topic放多少消息
    val messagesPerSec=3
    // 每个消息中包含多少个单词
    val wordsPerMessage=5
    
    // 以下四行代码一般是固定的写法,之后直接复制粘贴就可以
    // 使用键值对的方式将
    val props = new HashMap[String, Object]()
    // 指定 运行 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    // 对key和value都去序列化,且因为我定义的Key和Value的类型都是String的,所以直接用的 StringSerializer去序列化
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // 生产者
    val producer = new KafkaProducer[String, String](props)
    
    // 在while循环中不断的生成数据
    while(true) {
      // 控制一秒钟生成多少数据(1 to XX 会生成 Range 类型变量)
      (1 to messagesPerSec.toInt).foreach { messageNum =>
          // 控制一条消息中有几个单词 1 to XXX,接下来对集合中每一个元素执行map操作
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) // Random的nextInt(10)的作用是 生成0-9之间的一个随机数
          .mkString(" ") // 指定生成随机数根据什么符号进行分割
          print(str)
          println()
        // 生产者 生产出来的消息类型都是 ProducerRecord 类型的,
          // 指定 主题、key、value
        val message = new ProducerRecord[String, String](topic, null, str)
        // 将消息对象发送出去
        producer.send(message)
      }
     Thread.sleep(3000) // 线程休眠(3秒)
    }
    
    

    3).consumer(消费者)

    import _root_.kafka.serializer.StringDecoder
    
    import org.apache.spark._
    import org.apache.spark.SparkConf
    
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    // 设置与brokers获取连接
    val brokers="localhost:9092"
    // 可以指定多个分区,然后使用 "," 分割
    val topics="demo01"
    
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount2").setMaster("local[2]")
    // 创建具有2秒批处理间隔的上下文
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    
    // 为了 容错 (目录不存在的时候,其可以自动创建)
    ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") // 设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动hadoop
    
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    
    // 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    
    // Get the lines, split them into words, count the words and print
    // 由于Producer生产的数据,第一个值是null,我们用不到,所以直接去数据的第二个数据
    val lines = messages.map(_._2)
    // 拿到数据进行拆分,拿到每一个单词
    val words = lines.flatMap(_.split(" "))
    // 对每一个单词,再去做map变换,然后进行计数
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    // 将结果打印
    wordCounts.print()
    
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
    

    4).知识补充

    • Direct模式简介

      • 在spark 1.3以后引入了一种新的spark Streaming api,新的api回自己在driver内部维护一个偏移,然后自动计算指定的 topic+partition 该批次需要拉去数据的范围,然后从kafka拉去数据来计算。
      • 不同于基于Receiver的方式,direct模式不会将偏移记录到Zookeeper,以保证故障恢复从上次偏移处消费消息。Direct模式采用的是,你可以通过Checkpoint或者自己编写工具来实现偏移的维护,保证数据消费不丢失。
    • Direct模式优势

      • 简化并行度:不需要创建多个kafka stream,然后union他们。使用directStream,spark streaming 生成的RDD分区和kafka的分区是一一对应的,这种方式理解起来更简单而且便于调优。
      • 高效:
        • 基于Receiver的方式要保证数据不丢失,必须启用预写日志。这个行为实际上是非常低效的,数据会被复制两次,一次是kafka集群,一次是预写日志。
        • Direct方式解决了这个问题,由于没有Receiver,故而也不需要预写日志。只要你kafka里面存有数据,那么消息就可以从kafka里面恢复。
      • 仅一次消费语义:
        • 基于Receiver的会把偏移提交到Zookeeper。这种方式结合预写日志能保证数据不丢失,也即是最少一次消费语义,但是有几率导致消费者在存在失败的情况下消费消息两次。比如,消息处理并经过存储之后,但是偏移并没有提交到Zookeeper,这个时候发生故障了,那么恢复之后,就会按照Zookeeper上的偏移再一次消费数据并处理,导致消息重复处理。
        • 但是direct 方式偏移不会提交到Zookeeper,是spark streaming在driver使用内存变量加Checkpoint进行追踪的,所以尽管会存在任务失败,但是仍然能保证消费的一次处理。
      • 注意:由于direct方式不会提交偏移到Zookeeper,所以,基于Zookeeper的kafka监控工具就不能监控到spark streaming的消费情况。然而,你可以自己讲偏移提交道Zookeeper,来满足你的需求。

    (2).spark-streaming-kafka-0-10

    1).当前依赖的jar包

    第一个jar包可以直接在当前版本的lib目录中拷贝。第二个jar包需要从本地下载好,然后上传执行。所以在本地的jar路径中我保存了一份完整的jar包。

    [root@master jars]# pwd
    /usr/local/spark/jars
    
    [root@master jars]# ll | grep kafka
    -rw-r--r--. 1 root root   946811 Mar 20 16:02 kafka-clients-0.10.2.0.jar
    -rw-r--r--. 1 root root   190413 Mar 20 14:51 spark-streaming-kafka-0-10_2.11-2.3.0.jar
    

    2).Producer(生产者)

    import java.util.HashMap
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    // 注意当前版本的stream对象是与旧版本不一样的
    import org.apache.spark.streaming.kafka010._
    
    
    // 指定再在哪个kafka服务器运行
    val brokers="localhost:9092"
    // 指定topic
    val topic="demo01"
    // 指定每秒钟向topic放多少消息
    val messagesPerSec=3
    // 每个消息中包含多少个单词
    val wordsPerMessage=5
    
    // 以下四行代码一般是固定的写法,之后直接复制粘贴就可以
    // 使用键值对的方式将
    val props = new HashMap[String, Object]()
    // 指定 运行 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    // 对key和value都去序列化
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // 生产者
    val producer = new KafkaProducer[String, String](props)
    
    // 在while循环中不断的生成数据
    while(true) {
      // 控制一秒钟生成多少数据(1 to XX 会生成 Range 类型变量)
      (1 to messagesPerSec.toInt).foreach { messageNum =>
          // 控制一条消息中有几个单词 1 to XXX,接下来对集合中每一个元素执行map操作
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) // Random的nextInt(10)的作用是 生成0-9之间的一个随机数
          .mkString(" ") // 指定生成随机数根据什么符号进行分割
          print(str)
          println()
        // 生产者 生产出来的消息类型都是 ProducerRecord 类型的,
          // 指定 主题、key、value
        val message = new ProducerRecord[String, String](topic, null, str)
        // 将消息对象发送出去
        producer.send(message)
      }
     Thread.sleep(3000) // 线程休眠(3秒)
    }
    

    3).Consumer(消费者)

    // 代码是在jupyter中运行的,所以没有加函数体等
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    // 创建配置文件对象
    var sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkKafKaTest")
    // 创建具有5秒批处理间隔的上下文
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    
    // 如果指定多个topic,那么在这里直接使用","进行分割
    val topics = Array("demo01")
    // 配置kafka的相关参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    
    // 创建 DirectStream 对象(DStream对象:实际上就是一堆RDD的集合)
    // 此时的数据来源是从kafka来的;每行数据是以(K,V)的形式存放??
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    
    // 将DStream中数据进行map变换,将每一行记录中的key和value值进行整理,然后打印输出
    stream.map(record => (record.key, record.value)).print()
    /*
    这里是将计数操作直接写在了一行代码中:
    1.将数据中value取出来;(数据是以key,value的形式存的吗?????)
    2.将每一行数据进行单词的切分,
    3.对切分之后的单词进行map变换,重新映射为另一种形式
    4.然后对转换之后的数据进行单词计数
    5.最后将结果打印在屏幕上
    */
    stream.map(record =>  record.value).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
    
    // 启动当前程序
    ssc.start() 
    // 等待一个终止命令
    ssc.awaitTermination() 
    
    def createDirectStream[
        K: ClassTag,
        V: ClassTag,
        KD <: Decoder[K]: ClassTag,
        VD <: Decoder[V]: ClassTag] (
          ssc: StreamingContext,
          kafkaParams: Map[String, String],
          topics: Set[String]
      ): InputDStream[(K, V)] {...}
    

    相关文章

      网友评论

          本文标题:SparkStream与kafka对接

          本文链接:https://www.haomeiwen.com/subject/gnqqkdtx.html