Spark: Reading Data from Kafka

Author: 布莱安托 | Published 2020-07-07 14:47

    To use it, add spark-streaming-kafka-0-10_2.11 to your project. The KafkaUtils object provided by this package can create a DStream over Kafka messages on a StreamingContext or JavaStreamingContext.

    Since KafkaUtils can subscribe to multiple topics, each element of the created DStream is a record that pairs a topic with a message, as sketched below.
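
    Concretely, each element is an org.apache.kafka.clients.consumer.ConsumerRecord from the Kafka client API; a minimal sketch of the fields it exposes (the describe helper is purely illustrative):

      import org.apache.kafka.clients.consumer.ConsumerRecord

      // Each DStream element is a ConsumerRecord; these accessors come from
      // the Kafka client API (the helper itself is hypothetical).
      def describe(record: ConsumerRecord[String, String]): String =
        s"topic=${record.topic()} partition=${record.partition()} " +
          s"offset=${record.offset()} key=${record.key()} value=${record.value()}"

    The concrete steps are as follows: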

    1. Add the dependency (sbt syntax)

      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0"
      
    2. Implement WordCount

      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      object KafkaDemo {
        def main(args: Array[String]): Unit = {
      
          // Run locally on 4 cores with a 5-second batch interval
          val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaDemo")
          val streamingContext = new StreamingContext(conf, Seconds(5))
      
          // Kafka consumer parameters
          val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "localhost:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "use_a_separate_group_id_for_each_stream",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
          )
      
          // List of topics to subscribe to
          val topics = Array("test")
      
          /*
            Create the DStream.
            The required parameters are:
            1. A StreamingContext or JavaStreamingContext.
            2. A LocationStrategy. There are three implementations:
               LocationStrategies.PreferBrokers (prefer running tasks on the broker
               nodes, for when the executors run on the same hosts as the Kafka
               brokers), LocationStrategies.PreferConsistent (spread tasks across
               all available executors), and LocationStrategies.PreferFixed
               (a user-specified mapping of partitions to hosts, for when the
               load is skewed).
            3. A ConsumerStrategy. There are two implementations:
               ConsumerStrategies.Subscribe (subscribe to topics) and
               ConsumerStrategies.Assign (consume fixed partitions).
            4. An optional PerPartitionConfig for per-partition settings. Its default
               implementation, DefaultPerPartitionConfig, takes the maximum read rate
               per partition from spark.streaming.kafka.maxRatePerPartition, which
               defaults to 0 (unlimited).
           */
          val kafkaDStream = KafkaUtils.createDirectStream[String, String](
            streamingContext,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
          )
      
          // Split each record's value into words; the message payload is in
          // value(), and the key is often null
          val flatMapDStream = kafkaDStream.flatMap(_.value().split(" "))
      
          // Pair each word with an initial count of 1
          val mapDStream = flatMapDStream.map((_, 1))

          // Sum the counts per word within each batch
          val reduceByKeyDStream = mapDStream.reduceByKey(_ + _)

          // Print the first few results of each batch
          reduceByKeyDStream.print()
      
          streamingContext.start()
          streamingContext.awaitTermination()
      
        }
      }
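
    The Subscribe strategy above can be swapped for assign mode when only fixed partitions should be consumed. A minimal sketch, reusing the streamingContext and kafkaParams from the example and assuming we want only partition 0 of the test topic:

      import org.apache.kafka.common.TopicPartition

      // Assign mode: consume exactly the given (topic, partition) pairs
      // instead of subscribing to whole topics.
      val partitions = Array(new TopicPartition("test", 0))
      val assignedDStream = KafkaUtils.createDirectStream[String, String](
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](partitions, kafkaParams)
      )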
      
      
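    For the optional PerPartitionConfig, the default implementation reads the per-partition rate limit from a Spark property, so it can be set on the SparkConf before the StreamingContext is created. A minimal sketch (100 records per second per partition is an illustrative value, not a recommendation):

      // Cap reads at 100 records per second per Kafka partition;
      // the default of 0 means unlimited.
      val conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("KafkaDemo")
        .set("spark.streaming.kafka.maxRatePerPartition", "100")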

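    Because enable.auto.commit is false, offsets are not committed to Kafka automatically. A minimal sketch of committing them by hand after each batch, using the HasOffsetRanges and CanCommitOffsets traits from the same package:

      import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

      // Grab each batch's offset ranges and commit them back to Kafka
      // asynchronously once the batch has been processed.
      kafkaDStream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }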