Spark Streaming整合Kafka

作者: 董二弯 | 来源:发表于2019-05-26 16:21 被阅读2次

前几章介绍了KafkaSpark Streaming入门Spark Streaming进阶。在这一章一起学习Spark Streaming和Kafka的整合。

概述

kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用SparkStreaming实时计算框架实时地读取kafka中的数据然后进行计算。在spark1.3版本后,kafkaUtils里面提供了两个创建dstream的方法,一种为KafkaUtils.createDstream(需要receiver接收),另一种为KafkaUtils.createDirectStream。其中推荐使用KafkaUtils.createDirectStream的方式相比基于Receiver方式有几个优点:

  • 简化并行
    不需要创建多个kafka输入流,然后union它们,sparkStreaming将会创建和kafka分区一种的rdd的分区数,而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的分区数据是一一对应的关系。
  • 高效
    第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,第一次是被kafka复制,另一次是写到WAL中。而没有receiver的这种方式消除了这个问题。
  • 恰好一次语义(Exactly-once-semantics)
    Receiver读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,虽然这种方法可以通过数据保存在WAL中保证数据不丢失,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具。
  • 版本限制
    除了以上的原因,由于在学习Kafka时安装的版本是2.2.0,查询官方文档Spark Streaming整合Kafka在0.10已经不支持Receiver的方式。
    image.png

综上我们只演示KafkaUtils.createDirectStream的方式进行整合。

整合流程

  • 启动zookeeper集群
zkServer.sh start
  • 启动kafka集群
    在启动之前在server.properties中根据虚拟机地址配置listeners的地址


    image.png

    因为不配置在启动整合代码时报Broker may not be available的错误,通过百度后指定listeners的地址即可。
    启动kafka

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  • 创建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka_spark
//查看创建的topic,有记录说明创建成功
kafka-topics.sh --list --zookeeper localhost:2181
  • 启动生成者,向topic中生产数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_spark
  • 编写SparkStreaming应用程序
  • pom依赖
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.2</version>
</dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>2.4.3</version>
</dependency>

这里spark-streaming的版本我选择的是spark-streaming_2.12:2.42版本,这是由于我本地用的Scala的环境是2.12.8,spark-streaming这个版本中用到的Scala版本就是2.12.8。之前我使用的是spark-streaming_2.12:2.11.8版本,项目启动时报环境不匹配的问题。所以在本地演示时需要选择合适的版本。

  • Scala代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * Spark Streaming整合Kafka
  *
  * @author zhiying.dong@hand-china.com 2019/05/24 16:54
  */
object KafkaDirectWordCount{
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = Array("kafka_spark")
    val kafkaParams = mutable.HashMap[String, String]()
    //必须添加以下参数,否则会报错
    kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
      )
    )

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 本地测试
    在生产者中输入统计字符
image.png

观察控制台发现可以统计字符出现的此时,说明Spark Streaming可以消费到Kafka中生产的消息


image.png
  • 服务器测试
    和之前一样,打包上传到服务器,在通过以下命令启动
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.3 --class com.imooc.spark.Test ~/lib/sparktrain-1.0.jar

相关文章

网友评论

    本文标题:Spark Streaming整合Kafka

    本文链接:https://www.haomeiwen.com/subject/rfibzqtx.html