Spark Streaming整合Kafka

作者: 董二弯 | 来源:发表于2019-05-26 16:21 被阅读2次

    前几章介绍了KafkaSpark Streaming入门Spark Streaming进阶。在这一章一起学习Spark Streaming和Kafka的整合。

    概述

    kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用SparkStreaming实时计算框架实时地读取kafka中的数据然后进行计算。在spark1.3版本后,kafkaUtils里面提供了两个创建dstream的方法,一种为KafkaUtils.createDstream(需要receiver接收),另一种为KafkaUtils.createDirectStream。其中推荐使用KafkaUtils.createDirectStream的方式相比基于Receiver方式有几个优点:

    • 简化并行
      不需要创建多个kafka输入流,然后union它们,sparkStreaming将会创建和kafka分区一种的rdd的分区数,而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的分区数据是一一对应的关系。
    • 高效
      第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,第一次是被kafka复制,另一次是写到WAL中。而没有receiver的这种方式消除了这个问题。
    • 恰好一次语义(Exactly-once-semantics)
      Receiver读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,虽然这种方法可以通过数据保存在WAL中保证数据不丢失,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具。
    • 版本限制
      除了以上的原因,由于在学习Kafka时安装的版本是2.2.0,查询官方文档Spark Streaming整合Kafka在0.10已经不支持Receiver的方式。
      image.png

    综上我们只演示KafkaUtils.createDirectStream的方式进行整合。

    整合流程

    • 启动zookeeper集群
    zkServer.sh start
    
    • 启动kafka集群
      在启动之前在server.properties中根据虚拟机地址配置listeners的地址


      image.png

      因为不配置在启动整合代码时报Broker may not be available的错误,通过百度后指定listeners的地址即可。
      启动kafka

    kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
    
    • 创建topic
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka_spark
    //查看创建的topic,有记录说明创建成功
    kafka-topics.sh --list --zookeeper localhost:2181
    
    • 启动生成者,向topic中生产数据
    ./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_spark
    
    • 编写SparkStreaming应用程序
    • pom依赖
    <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>2.4.2</version>
    </dependency>
    <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
                <version>2.4.3</version>
    </dependency>
    

    这里spark-streaming的版本我选择的是spark-streaming_2.12:2.42版本,这是由于我本地用的Scala的环境是2.12.8,spark-streaming这个版本中用到的Scala版本就是2.12.8。之前我使用的是spark-streaming_2.12:2.11.8版本,项目启动时报环境不匹配的问题。所以在本地演示时需要选择合适的版本。

    • Scala代码
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    
    /**
      * Spark Streaming整合Kafka
      *
      * @author zhiying.dong@hand-china.com 2019/05/24 16:54
      */
    object KafkaDirectWordCount{
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf()
          .setAppName("DirectKafka")
          .setMaster("local[2]")
    
        val ssc = new StreamingContext(conf, Seconds(2))
    
        val topicsSet = Array("kafka_spark")
        val kafkaParams = mutable.HashMap[String, String]()
        //必须添加以下参数,否则会报错
        kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
        kafkaParams.put("group.id", "group1")
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
          )
        )
    
        // Get the lines, split them into words, count the words and print
        val lines = messages.map(_.value)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        wordCounts.print()
    
        // Start the computation
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 本地测试
      在生产者中输入统计字符
    image.png

    观察控制台发现可以统计字符出现的此时,说明Spark Streaming可以消费到Kafka中生产的消息


    image.png
    • 服务器测试
      和之前一样,打包上传到服务器,在通过以下命令启动
    ./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.3 --class com.imooc.spark.Test ~/lib/sparktrain-1.0.jar
    

    相关文章

      网友评论

        本文标题:Spark Streaming整合Kafka

        本文链接:https://www.haomeiwen.com/subject/rfibzqtx.html