
009 Processing Kafka with Spark Streaming

Author: 逸章 | Published 2019-12-12

    Message queueing systems with publish-subscribe capability are commonly used to process messages. Traditional message queues, however, cannot keep up with the huge number of messages per second that large-scale data processing applications need to handle.

    Kafka is a publish-subscribe messaging system used by many IoT applications to process a huge number of messages.
    Its key characteristics are:

    • Extremely fast: Kafka can process huge amounts of data by handling reads and writes from many application clients in short intervals of time
    • Highly scalable: Kafka is designed to scale up and scale out to form a cluster using commodity hardware
    • Persists a huge number of messages: messages reaching Kafka topics are persisted to secondary storage, even while a huge number of messages keeps flowing through

    Three important elements of Kafka:

    • Producer: the real source of the messages, such as a weather sensor or a mobile phone network
    • Broker: the Kafka cluster, which receives and persists the messages published to its topics by the various producers
    • Consumer: the data processing applications subscribed to the Kafka topics, which consume the messages published to those topics

    1. Project structure

    (Screenshot: project directory layout)

    1.1 The sbt build file

    name := "Simple Project"
    version := "1.0"
    
    scalaVersion := "2.11.12"
    val sparkVersion = "2.4.4"
    
    //libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.4.4" % "provided"
    libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.4.4" % "provided"
    //libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.4.4"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % sparkVersion,
      "org.apache.spark" %% "spark-mllib" % sparkVersion,
      //"org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-streaming" % sparkVersion,
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
     )
       
    

    1.2 The submit.sh script used to submit the application

    #!/bin/bash
    #-----------
    # submit.sh
    #-----------
    # IMPORTANT - Assumption is that the $SPARK_HOME and $KAFKA_HOME environment variables are already set in the system that is running the application
    
    # [FILLUP] Set your Spark master here. If monitoring is needed, use the desired Spark master, or use local mode
    # When using local mode, it is important to specify more than one core in square brackets
    #SPARK_MASTER=spark://Rajanarayanans-MacBook-Pro.local:7077
    #SPARK_MASTER=local[4]
    SPARK_MASTER=spark://yay-ThinkPad-T470-W10DG:7077
    
    # [OPTIONAL] Your Scala version
    SCALA_VERSION="2.11"
    
    # [OPTIONAL] Name of the application jar file. You should be OK to leave it like that
    APP_JAR="simple-project_2.11-1.0.jar"
    
    # [OPTIONAL] Absolute path to the application jar file
    PATH_TO_APP_JAR="target/scala-$SCALA_VERSION/$APP_JAR"
    
    # [OPTIONAL] Spark submit command
    SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit"
    
    # [OPTIONAL] Pass the application name to run as the parameter to this script
    APP_TO_RUN=$1
    
    sbt package
    
    if [ $2 -eq 1 ]
    then  
      $SPARK_SUBMIT --class $APP_TO_RUN --master $SPARK_MASTER --jars $KAFKA_HOME/libs/kafka-clients-2.1.1.jar,$KAFKA_HOME/libs/metrics-core-2.2.0.jar,$KAFKA_HOME/libs/zkclient-0.11.jar,./lib/kafka_2.11-0.8.2.1.jar,./lib/spark-streaming-kafka-0-8_2.11-2.4.4.jar $PATH_TO_APP_JAR
    else
      $SPARK_SUBMIT --class $APP_TO_RUN --master $SPARK_MASTER --jars $PATH_TO_APP_JAR $PATH_TO_APP_JAR
    fi
    
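    A note on usage: the first argument is the fully qualified name of the class to run, and the second argument controls whether the Kafka jar files are shipped to the Spark cluster (the branch that checks whether $2 equals 1 above). For this example the application is therefore submitted as:

    ./submit.sh com.packtpub.sfb.KafkaStreamingApps 1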

    1.3 Source code

    KafkaStreamingApps.scala

    /**
    The following program can be compiled and run using SBT
    Wrapper scripts have been provided with this
    The following script can be run to compile the code
    ./compile.sh
    
    The following script can be used to run this application in Spark. The second command line argument of value 1 is very important. This is to flag the shipping of the kafka jar files to the Spark cluster
    ./submit.sh com.packtpub.sfb.KafkaStreamingApps 1
    **/
    package com.packtpub.sfb
    
    import java.util.HashMap
    import org.apache.spark.streaming._
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.streaming.kafka._
    import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer,ProducerRecord}
    import org.apache.log4j.{Level, Logger}
    object KafkaStreamingApps {
        def main(args: Array[String]) {
            // Log level settings
            LogSettings.setLogLevels()
            // Variables used for creating the Kafka stream
            //The quorum of Zookeeper hosts
            val zooKeeperQuorum = "localhost"
            // Message group name
            val messageGroup = "sfb-consumer-group"
            //Kafka topics list, separated by commas if there are multiple topics to be listened on
            val topics = "sfb"
            //Number of threads per topic
            val numThreads = 1
            // Create the Spark Session and the spark context
            val spark = SparkSession.builder.appName(getClass.getSimpleName).getOrCreate()
            // Get the Spark context from the Spark session for creating the streaming context
            val sc = spark.sparkContext
            // Create the streaming context
            val ssc = new StreamingContext(sc, Seconds(10))
            // Set the checkpoint directory for saving the data needed to recover from a crash
            ssc.checkpoint("/tmp")
            // Create the map of topic names
            val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
            // Create the Kafka stream
            val appLogLines = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, topicMap).map(_._2)
            // Filter the log message lines containing the word ERROR
            val errorLines = appLogLines.filter(line => line.contains("ERROR"))
            // Print the line containing the error
            errorLines.print()
            // Count the number of messages by the windows and print them
            errorLines.countByWindow(Seconds(30), Seconds(10)).print()
            // Start the streaming
            ssc.start()
            // Wait till the application is terminated
            ssc.awaitTermination()
        }
    
    }
    
    object LogSettings{
        /**
        Necessary log4j logging level settings are done
        */
        def setLogLevels() {
            val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
            if (!log4jInitialized) {
                // This is to make sure that the console is clean from other INFO messages printed by Spark
                Logger.getRootLogger.setLevel(Level.ERROR)
            }
        }
    }
    

    2. Compile

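    The header comment in KafkaStreamingApps.scala refers to a ./compile.sh wrapper script that is not shown in the post. A minimal sketch, assuming it simply wraps the same sbt packaging step that submit.sh already performs, would be:

    #!/bin/bash
    #------------
    # compile.sh
    #------------
    # Sketch only: compile the sources and package them into
    # target/scala-2.11/simple-project_2.11-1.0.jar
    sbt package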

    3. Submit and run

    3.1 Prepare the Kafka and ZooKeeper environment

    3.1.1 Install Kafka and ZooKeeper

    Installation instructions are available elsewhere in this space, so they are not repeated here.

    3.1.2 Start ZooKeeper and the Kafka server
    $ZOOKEEPER_HOME/bin/zkServer.sh start
    
    $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
    

    3.2 Start the Spark cluster

    $SPARK_HOME/sbin/start-all.sh
    

    Or start the master and the worker separately:

    $SPARK_HOME/sbin/start-master.sh
    
    $SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077
    

    3.3 Create the message stream

    3.3.1 Create and list topics

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sfb
    
    $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
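    Optionally, the topic can be sanity-checked before the Spark application is started by attaching the console consumer that ships with Kafka. This is not part of the original walkthrough, just a quick way to confirm that messages published to sfb are visible:

    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sfb --from-beginning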
    

    3.4 Submit the application

    3.4.1 Create the lib directory and download the required packages
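    The post does not list the exact commands for this step. Below is a minimal sketch, assuming the two jars referenced in submit.sh are fetched from Maven Central into a local lib directory (the download URLs follow the standard Maven repository layout and are an assumption, not taken from the original). Once the jars are in place, the application is submitted with ./submit.sh com.packtpub.sfb.KafkaStreamingApps 1, as described in the source header.

    #!/bin/bash
    # Sketch only: create the ./lib directory expected by submit.sh and
    # download the two jars it references (URLs are assumptions)
    mkdir -p lib
    cd lib
    wget https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.11/0.8.2.1/kafka_2.11-0.8.2.1.jar
    wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8_2.11/2.4.4/spark-streaming-kafka-0-8_2.11-2.4.4.jar
    cd ..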

    3.4.2 Create a producer

    $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sfb
    
    

    3.4.3 Type messages into the producer

    [Fri Dec 20 01:46:23 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
    [Fri Dec 20 01:46:23 2015] [WARN] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
    [Fri Dec 20 01:54:34 2015] [ERROR] [client 1.2.3.4.5.6] Directory index
    

    Then observe our application's output:


    (Screenshot: the application prints the lines containing ERROR and, every 10 seconds, the windowed count of error lines)

    The complete project (including the required lib directory) can be downloaded from: https://github.com/yinbodotcc/spark-stream-Kafka-0.8.git
