Setting up Kafka on a Mac and consuming it in real time with Spark Streaming


Author: 学习之术 | Published 2018-04-30 14:48 · 409 reads

    Kafka is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. It is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java.

    Once mobile apps took off, companies found themselves with large volumes of user-behavior data, and making effective use of that data became a top priority. The first step is collecting it efficiently, and Kafka is a common system for transporting and collecting user-behavior data.

    The algorithms engineers develop are deployed on servers, so testing locally means setting up a Kafka environment on your own machine. Simulating user-behavior data recreates a realistic environment and makes it much easier to debug the code.

    Installing Kafka on a Mac

    Using Homebrew, the Mac package-management workhorse, just run brew install kafka in a terminal; Homebrew automatically installs Kafka's dependency, ZooKeeper.

    After installing with brew, the Kafka and ZooKeeper configuration files live at the paths below; in most cases no changes are needed.

    /usr/local/etc/kafka/server.properties
    /usr/local/etc/kafka/zookeeper.properties
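    For reference, the settings most often worth knowing about in server.properties look roughly like the following (typical defaults for a Homebrew install; exact values can vary by Kafka version):

```properties
# ID of the broker; must be unique within a cluster
broker.id=0
# Where Kafka stores its log segments
log.dirs=/usr/local/var/lib/kafka-logs
# ZooKeeper connection string the broker registers with
zookeeper.connect=localhost:2181
```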
    

    After installation, Homebrew prints instructions for starting the services; they can be shown again at any time with:

    brew info kafka

    Use the following two commands to quickly start ZooKeeper and Kafka:

    brew services start zookeeper
    brew services start kafka
    

    Producing Kafka messages that simulate a real environment

    Messages can be produced from the terminal, but hand-crafting the JSON-formatted data of a real environment there is tedious, so I wrote a script that simulates realistic data automatically.

    First, add the Maven dependencies in IDEA: the Spark–Kafka integration, plus Jettison for building the JSON payloads.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jettison</groupId>
        <artifactId>jettison</artifactId>
        <version>1.3.8</version>
    </dependency>
    

    The following code writes to two topics, producing a random user-behavior event from an e-commerce app every 200 milliseconds.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.codehaus.jettison.json.JSONObject
    import org.apache.log4j.{Level, Logger}
    import scala.util.Random

    object KafkaEventProducer {
      private val goodsid = Array("1100", "1975", "24724", "4542")
      private val userid = Array("20ad3455", "47c4ea08", "F4727214", "J9FE7E52")
      private val siteid = Array("67800", "60902")
      private val eventkey = Array("open", "goods_view", "addtobag", "impression", "checkout")
      private val pagename = Array("categories", "search", "goodsdetail", "Home")
      private val sid = Array("xie", "chen", "long")

      private val random = new Random()

      // Pick a random element from an array
      private def pick(a: Array[String]): String = a(random.nextInt(a.length))

      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        val topic = Array("impression", "event")
        val brokers = "127.0.0.1:9092"

        val props = new Properties()
        props.put("bootstrap.servers", brokers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)

        while (true) {
          val event = new JSONObject()
          event
            .put("userid", pick(userid))
            .put("eventtime", System.currentTimeMillis.toString)
            .put("siteid", pick(siteid))
            .put("goodsid", pick(goodsid))
            .put("pagename", pick(pagename))
            .put("sid", pick(sid))
            .put("eventkey", pick(eventkey))

          // Send the event to one of the two topics, chosen at random
          producer.send(new ProducerRecord[String, String](pick(topic), event.toString))
          println("Message sent: " + event)

          Thread.sleep(200)
        }
      }
    }
    

    Enter the following command in a terminal to consume the messages on the impression topic:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic impression --from-beginning
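    With the producer above running, the consumer prints one JSON document per event. An illustrative line (all field values are chosen at random by the producer) might look like:

```json
{"userid":"20ad3455","eventtime":"1525070000000","siteid":"67800","goodsid":"1100","pagename":"search","sid":"xie","eventkey":"open"}
```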
    

    Consuming messages with Spark Streaming

    Spark Streaming is an extension of the core Spark API for scalable, high-throughput, fault-tolerant processing of real-time data streams.

    The following program uses Spark Streaming to print the data collected from Kafka every 6 seconds.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Durations, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


    /**
      * @author XieChenlong
      * Stream-processing example
      */
    object StreamingMab {

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("StreamingMab").setMaster("local[2]")
        // One micro-batch every 6 seconds
        val ssc = new StreamingContext(sparkConf, Durations.seconds(6))

        val KAFKA_TOPIC_NAME = Array("impression", "event")
        val KAFKA_BROKERS = "127.0.0.1:9092"
        val KAFKA_PARAMS = Map[String, Object](
            "bootstrap.servers" -> KAFKA_BROKERS,
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "xieTest",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](KAFKA_TOPIC_NAME, KAFKA_PARAMS)
        )

        // Keep only the message payload (the JSON string)
        val eventDStream = stream.map(record => record.value)

        eventDStream.foreachRDD { rdd =>
          if (!rdd.isEmpty()) {
            rdd.foreach(x => println(x))
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }
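    Printing is just a placeholder; a real job would parse each consumed line back into fields. A minimal sketch using Jettison (the same library the producer uses to build the payloads; the field names assume the JSON layout emitted by the producer above):

```scala
import org.codehaus.jettison.json.JSONObject

// Parse one consumed line back into (userid, eventkey).
def parseEvent(line: String): (String, String) = {
  val obj = new JSONObject(line)
  (obj.getString("userid"), obj.getString("eventkey"))
}
```

    Inside foreachRDD, replacing println(x) with println(parseEvent(x)) would then print the extracted pair instead of the raw JSON string.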
    

