Kafka + Spark Streaming: Real-Time Word Count


Author: 杨赟快跑 | Published 2018-10-09 19:33

1. Preparation

Setting up a Kafka cluster is covered in Kafka集群搭建与配置 (Kafka Cluster Setup and Configuration).
Setting up a Spark cluster is covered in Hadoop+HBase+Spark+Hive环境搭建 (Hadoop + HBase + Spark + Hive Environment Setup).
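
If automatic topic creation is disabled on the brokers, the input topic also has to exist before the programs run. A minimal sketch of creating it with the CLI that ships with Kafka 2.0 (the ZooKeeper address, partition count, and replication factor below are assumptions for this cluster; adjust them to your environment):

kafka-topics.sh --create --zookeeper 192.168.1.41:2181 --replication-factor 2 --partitions 3 --topic input-streaming-topic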

2. Writing the Code (Scala)

Add the Maven (pom.xml) dependencies. Only spark-core, spark-streaming, and spark-streaming-kafka-0-10 are strictly required for this example; the other modules are optional extras.

<properties>
    <kafka.version>2.0.0</kafka.version>
    <spark.version>2.3.1</spark.version>
    <scala.version>2.11</scala.version>
</properties>
<dependencies>
     <!--spark-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.scalanlp</groupId>
        <artifactId>breeze-viz_2.11</artifactId>
        <version>0.13.2</version>
    </dependency>
</dependencies>

The producer sends the message "yang yun ni hao sha a" once per second:

package com.whut.demo

import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaProducer {
  def main(args: Array[String]) {

    //Kafka broker list and the topic to write to
    val brokers = "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092" //Kafka broker addresses (not ZooKeeper)
    val inputTopic = "input-streaming-topic" //topic name

    //Producer configuration: broker list and serializers
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    //Create the producer
    val producer = new KafkaProducer[String, String](props)

    //Send the same message once per second
    while(true) {
      val key = null
      val value = "yang yun ni hao sha a"
      val message = new ProducerRecord[String,String](inputTopic, key, value)
      producer.send(message)
      println(message)
      Thread.sleep(1000)
    }
  }
}
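
To check that messages are actually reaching the topic, the console consumer that ships with Kafka can be used (a quick sanity check; the bootstrap address is one of the brokers configured above):

kafka-console-consumer.sh --bootstrap-server 192.168.1.41:9092 --topic input-streaming-topic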

Spark Streaming acts as the consumer and, every 2 seconds, counts how many times each word appeared in the last 10 seconds:

package com.whut.demo

import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object SparkConsumer{

  def main(args:Array[String]){

    /**
      * Spark master
      * local mode: "local[*]"
      * cluster mode: "spark://192.168.1.32:7077"
      */
    val master = "local[*]"

    /**
      * Checkpoint path (required by reduceByKeyAndWindow with an inverse function)
      * local mode: "checkpoint"
      * cluster mode: "hdfs://master:9000/user/checkpoint"
      */
    val checkpoint = "checkpoint"

    //Set the log level
    LogConfig.setStreamingLogLevels()

    //Batch interval in seconds
    val batchDuration = 1

    //Input topic
    val inputTopic = "input-streaming-topic"

    //Output topic (declared here but not used in this example)
    val outputTopic = "output-streaming-topic"

    //Initialize the StreamingContext
    val streamingContext = new StreamingContext(
      new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster(master),
      Seconds(batchDuration)
    )
    streamingContext.checkpoint(checkpoint)

    //Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    //Create the direct Kafka DStream
    val dStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](Array(inputTopic), kafkaParams)
    )

    //Parse the received DStream: take the message values, split them into words, and pair each word with a count of 1
    val lines = dStream.map(record => (record.key, record.value))
    val words = lines.map(_._2)
    val word = words.flatMap(_.split(" "))
    val pair = word.map(x => (x, 1))
    //Window length 10 seconds, slide interval 2 seconds; the inverse function (_ - _) requires checkpointing
    val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Seconds(10), Seconds(2))
    wordCounts.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
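
Since the producer sends one line per second, each word appears roughly ten times inside the 10-second window once the stream has warmed up, so the printed batches look roughly like this (an illustrative sketch; the timestamp and the counts during the first few seconds will differ):

-------------------------------------------
Time: 1539084000000 ms
-------------------------------------------
(hao,10)
(sha,10)
(a,10)
(ni,10)
(yang,10)
(yun,10)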

To keep the output concise, we also need to set the log level:

package com.whut.demo
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}

object LogConfig extends Logging {

  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN) //only WARN and above will be printed
    }
  }
}

3. Submitting the Streaming Job in Cluster Mode

Standalone (local) mode
Just run both programs directly in IDEA.
Cluster mode
KafkaProducer can still be run directly in IDEA, but SparkConsumer has to be packaged into a jar and submitted with spark-submit.

In IDEA, package the program into a jar (see IDEA 打jar,提交spark集群运行, i.e. building a jar in IDEA and submitting it to a Spark cluster).

Upload the jar to HDFS (below are the HDFS commands you may need):

hdfs dfs -ls /
hdfs dfs -rm /SparkConsumer.jar
hdfs dfs -put SparkConsumer.jar /

Submit the job with spark-submit:

spark-submit --class com.whut.demo.SparkConsumer --master spark://master:7077 hdfs://master:9000/SparkConsumer.jar
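
Note that spark-streaming-kafka-0-10 is not bundled with the Spark distribution, so the submitted jar must either be an assembly ("fat") jar that contains it, or the dependency can be pulled in at submit time with --packages (the coordinates below match the versions in the pom above; adjust if yours differ):

spark-submit --class com.whut.demo.SparkConsumer --master spark://master:7077 --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.1 hdfs://master:9000/SparkConsumer.jar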
