Spark Streaming + Kafka Integration

Author: sparkle123 | Published 2018-05-09 14:45

    Reference: the official documentation
    http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html

    • Before starting, make sure the following have been done:
      1. Start ZooKeeper: ./zkServer.sh start
      2. Start Kafka: ./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
      3. Create the topic:
      ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
      ./kafka-topics.sh --list --zookeeper hadoop:2181
      4. Verify from the console that messages can be produced and consumed:
      ./kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_streaming_topic
      ./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic kafka_streaming_topic

    Approach 1: Receiver-based Approach

    • Local testing of the Receiver-based approach
      1. KafkaUtils.createStream creates an input stream that pulls messages from the Kafka brokers:
    import org.apache.spark.streaming.kafka._
    
     val kafkaStream = KafkaUtils.createStream(streamingContext,
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
    

    2. Destructure the four expected arguments: val Array(zkQuorum, group, topics, numThreads) = args

    3. Check that exactly four arguments were passed, then build the topicMap:
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    4. Pass the topicMap into KafkaUtils.createStream.
    5. Business logic (word count); a complete sketch tying steps 2-5 together is shown after step 7:
    messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    6. In IDEA, open Edit Configurations and set the program arguments to:
    hadoop:2181 test kafka_streaming_topic 1

    Note:

    test: the consumer group name
    1: the number of threads
    setMaster("local[2]"): must be at least 2 (one thread is occupied by the receiver, the rest do the processing)
    

    7. Run the code and type a few words into the Kafka producer window; they show up immediately in the Kafka consumer window, and the word counts appear in the local console.
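
    Putting steps 2-5 together, a minimal sketch of the Receiver-based word count might look like the following. The object name KafkaReceiverWordCount matches the spark-submit example in the next section; the 5-second batch interval and the usage check are assumptions rather than part of the original code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaReceiverWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length != 4) {
          System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
        // step 2: destructure the four arguments
        val Array(zkQuorum, group, topics, numThreads) = args

        // local[2] or more: one thread is taken by the receiver, the rest process data
        val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // step 3: one entry per topic -> number of receiver threads
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

        // step 4: createStream returns a DStream of (key, message) pairs
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

        // step 5: word count on the message part only
        messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }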

    • Production testing of the Receiver-based approach
      1. Build the project from its root directory:
      mvn clean package -DskipTests
      2. Upload the jar to the lib directory on the hadoop server and run:
    spark-submit \
    --class com.feiyue.bigdata.sparkstreaming.KafkaReceiverWordCount \
    --master local[2] \
    --name KafkaReceiverWordCount \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
    /home/hadoop/lib/spark-1.0-SNAPSHOT.jar hadoop:2181 test kafka_streaming_topic 1
    

    3. Once the job is running, open the Spark Streaming UI on port 4040.

    In the UI you can see that the Receiver runs continuously as a long-lived job;
    the Direct approach has no such job.


    Approach 2: Direct Approach (No Receivers)

    Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.

    Advantages:
    1. Simplified parallelism: no need for multiple input streams, a single DStream is enough (Kafka partitions map one-to-one to RDD partitions).
    2. Better performance: zero data loss without storing the data twice, whereas the Receiver-based approach must write to a Write-Ahead Log (i.e. keep a second copy) to achieve the same guarantee; the Direct approach has no Receiver.
    3. Exactly-once semantics.

    Drawback:
    1. ZooKeeper-based Kafka monitoring tools can no longer show consumption progress, because the Direct approach does not store offsets in ZooKeeper; you have to write the offsets back to ZooKeeper periodically yourself (a sketch of capturing the per-batch offsets is given after the Direct code below).

    • How to do it

    Starting from the Receiver-based code, change createStream to createDirectStream; the rest of the business logic stays unchanged.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaDirectWordCount {
      def main(args: Array[String]): Unit = {
        // expect two arguments: the broker list and a comma-separated topic list
        // kafkaParams: Map[String, String], topics: Set[String]
        val Array(brokers, topics) = args

        // setAppName/setMaster come from spark-submit; uncomment for local runs:
        // val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
        val sparkConf = new SparkConf()

        val ssc = new StreamingContext(sparkConf, Seconds(5))

        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val topicsSet = topics.split(",").toSet

        // no receiver: each batch reads its Kafka offset ranges directly
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

        messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
    
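    With the Direct approach, the offsets of each batch can be captured from the stream and written back to ZooKeeper to address the monitoring drawback mentioned above. The following is only a sketch, assuming it is added to the program above after createDirectStream; the actual ZooKeeper write depends on the client library you choose and is indicated only by a comment:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    messages.foreachRDD { rdd =>
      // each OffsetRange carries topic, partition, fromOffset and untilOffset
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        // write o.untilOffset to ZooKeeper here (for example under
        // /consumers/<group>/offsets/<topic>/<partition>) so that
        // ZooKeeper-based monitoring tools keep showing consumption progress
      }
    }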
    
    • Production testing of the Direct approach
      Compared with the Receiver-based approach, only brokers and topics are passed as arguments; check the source and the generic type parameters to see what createDirectStream returns and construct the call accordingly (a simplified view of its signature follows the spark-submit command below).
    spark-submit \
    --class com.feiyue.bigdata.sparkstreaming.KafkaDirectWordCount \
    --master local[2] \
    --name KafkaDirectWordCount \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
    /home/hadoop/lib/spark-1.0-SNAPSHOT.jar hadoop:9092  kafka_streaming_topic
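
    For reference when reading the source as suggested above, the createDirectStream overload used here has roughly the following shape in spark-streaming-kafka-0-8 (simplified; check the source of the exact version you use):

    def createDirectStream[
        K: ClassTag,
        V: ClassTag,
        KD <: Decoder[K]: ClassTag,
        VD <: Decoder[V]: ClassTag](
        ssc: StreamingContext,
        kafkaParams: Map[String, String],
        topics: Set[String]): InputDStream[(K, V)]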
    

    3. Once the job is running, open the Spark Streaming UI on port 4040.

    In the UI you can see that the Direct approach has no long-running Receiver job.

