Spark Streaming + Kafka

Author: 歌哥居士 | Published 2019-03-29 16:09

    Kafka Receiver

    <!-- Spark Streaming + Kafka integration -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
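
    This assumes spark.version is defined as a property in the pom; the spark-submit commands later in this post use 2.1.0, so for example:

    <properties>
        <spark.version>2.1.0</spark.version>
    </properties>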
    

    Local test

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Spark Streaming + Kafka, approach 1: receiver-based.
      */
    object KafkaReceiverWordCount {
      def main(args: Array[String]): Unit = {
    
        // Validate arguments and initialize the streaming context
        if (args.length != 4) {
          System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
        val Array(zkQuorum, group, topics, numThreads) = args
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("KafkaReceiverWordCount")
          .set("spark.driver.host", "localhost")
        val ssc = new StreamingContext(conf, Seconds(5))
    
    
        // Core logic: assign numThreads receiver threads to each topic, then
        // create a receiver-based stream of (key, message) pairs via ZooKeeper
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
        // Word count over the message values
        messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print()
    
        // Start the computation and block until it terminates
        ssc.start()
        ssc.awaitTermination()
      }
    }
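
    Note: with the receiver-based approach, records that have been received but not yet processed can be lost if the driver fails. A minimal sketch of guarding against that in the code above, by enabling Spark's write-ahead log (the checkpoint path is a placeholder):

    // Set on the SparkConf before creating the StreamingContext;
    // the write-ahead log also requires a checkpoint directory.
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    ssc.checkpoint("hdfs:///tmp/spark-checkpoint") // hypothetical path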
    
    Run
    Start ZooKeeper and Kafka first.
    Run with the arguments: host000:2181 group_test hello_test 1
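
    To feed the job some test input, a Kafka console producer can be pointed at the topic. A sketch assuming Kafka's bin directory is on the PATH; the ZooKeeper and broker addresses match the ones used in this post:

    $ kafka-topics.sh --create --zookeeper host000:2181 \
      --replication-factor 1 --partitions 1 --topic hello_test
    $ kafka-console-producer.sh --broker-list host000:9092 --topic hello_test
    hello world hello spark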
    

    Server-side test

    Modify the code: comment out the local-only settings
    //      .setMaster("local[2]")
    //      .setAppName("KafkaReceiverWordCount")
    //      .set("spark.driver.host", "localhost")
    $ mvn clean package -DskipTests
    $ scp spark-learning-1.0-SNAPSHOT.jar user000@host000:~/jars
    $ spark-submit --master local[2] \
      --class KafkaReceiverWordCount \
      --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
      ~/jars/spark-learning-1.0-SNAPSHOT.jar host000:2181 group_test hello_test 1
    

    Kafka Direct

    My build originally pulled in Kafka 0.9.0.0, which fails with: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker. Switching the Kafka dependency to 0.8.2.1 fixes it.
    <!-- Spark Streaming + Kafka integration -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
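
    If your pom declares an explicit Kafka dependency, the version pin might look like this (a sketch assuming the Scala 2.11 artifact):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.1</version>
    </dependency>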
    

    Local test

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Spark Streaming + Kafka, approach 2: direct (receiver-less).
      */
    object KafkaDirectWordCount {
      def main(args: Array[String]): Unit = {
    
        // Validate arguments and initialize the streaming context
        if (args.length != 2) {
          System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
          System.exit(1)
        }
        val Array(brokers, topics) = args
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("KafkaDirectWordCount")
          .set("spark.driver.host", "localhost")
        val ssc = new StreamingContext(conf, Seconds(5))
    
        // Core logic: read directly from the brokers (no receiver; offsets are
        // tracked by Spark, not ZooKeeper), decoding keys and values as strings
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val topicsSet = topics.split(",").toSet
        val messages = KafkaUtils.createDirectStream
          [String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    
        // Word count over the message values
        messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print()
    
        // Start the computation and block until it terminates
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    Start ZooKeeper and Kafka first.
    Run with the arguments: host000:9092 hello_test
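
    A key benefit of the direct approach is that each micro-batch RDD carries the exact Kafka offset ranges it consumed, which you can log or persist yourself for recovery. A minimal sketch, placed right after createDirectStream in the code above:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    // Print which offsets each batch covered, per topic-partition
    messages.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic} [${r.partition}] ${r.fromOffset} -> ${r.untilOffset}")
      }
    }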
    

    Server-side test

    Modify the code: comment out the local-only settings
    //      .setMaster("local[2]")
    //      .setAppName("KafkaDirectWordCount")
    //      .set("spark.driver.host", "localhost")
    $ mvn clean package -DskipTests
    $ scp spark-learning-1.0-SNAPSHOT.jar user000@host000:~/jars
    $ spark-submit --master local[2] \
      --class KafkaDirectWordCount \
      --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
      ~/jars/spark-learning-1.0-SNAPSHOT.jar host000:9092 hello_test
    
