
008 Spark Streaming Example 1: Filtering Logs

Author: 逸章 | Published on 2019-12-04 23:33

    I. Example 1: Filtering Logs

    1. A small example

    1.1 Project structure


    The config.sbt file is different from the previous examples:

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.12"
    val sparkVersion = "2.4.4"
    
    resolvers ++= Seq(
      "apache-snapshots" at "http://repository.apache.org/snapshots/"
    )
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion,
      "org.apache.spark" %% "spark-mllib" % sparkVersion,
      "org.apache.spark" %% "spark-streaming" % sparkVersion,
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0-preview"
    )
    

    It can also be written as follows:

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.12"
    val sparkVersion = "2.4.4"
    //libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4" % "provided"
    //libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.4.4"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % sparkVersion,
      "org.apache.spark" %% "spark-mllib" % sparkVersion,
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-streaming" % sparkVersion,
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
    )
    

    StreamingApps.scala:

    package com.packtpub.sfb
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.storage.StorageLevel
    import org.apache.log4j.{Level, Logger}
    object StreamingApps{
        def main(args: Array[String])
        {
            // Log level settings
            LogSettings.setLogLevels()
            // Note that there is no loop construct here telling the application to repeat until the
            // running application is terminated; that is handled by the Spark Streaming library itself.
            // Create the Spark Session and the spark context               
            val spark = SparkSession.builder.appName(getClass.getSimpleName).getOrCreate()
            // Get the Spark context from the Spark session for creating the streaming context
            val sc = spark.sparkContext 
            // Create the streaming context; the 10 seconds here is the batch interval
            val ssc = new StreamingContext(sc, Seconds(10))
            // Set the check point directory for saving the data to recover when there is a crash
            ssc.checkpoint("/tmp")
            println("Stream processing logic start")
            // Create a DStream that connects to localhost on port 9999
            // StorageLevel.MEMORY_AND_DISK_SER indicates that the data is stored serialized in memory and, if it does not fit, spilled to disk as well
            // The DStream created below produces one RDD of received lines for every batch interval (10 seconds here)
            val appLogLines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
            // A filter transformation is applied next on the DStream (producing a new DStream): keep each log line containing the word ERROR
            val errorLines = appLogLines.filter(line => line.contains("ERROR"))
            // The next line prints the DStream contents to the console. In other words, for every batch interval,
            // if there are lines containing the word ERROR, they are displayed in the console
            errorLines.print()
            // Count the number of messages by the windows and print them
            // Window length and slide interval; both must be integer multiples of the batch interval
            // (these two parameters are explained later). For each sliding window a count is computed:
            // given the window length and slide interval, the number of elements in that window is returned
            errorLines.countByWindow(Seconds(30), Seconds(10)).print()
            // Note that this println is executed only once, not per batch
            println("Stream processing logic end")
            // Start the streaming
            ssc.start()   
            // Wait till the application is terminated            
            ssc.awaitTermination()  
        }
    }
    object LogSettings{
        /**
        Necessary log4j logging level settings are done
        */
        def setLogLevels() {
            val log4jInitialized =
            Logger.getRootLogger.getAllAppenders.hasMoreElements
            if (!log4jInitialized) {
                // This is to make sure that the console is clean from other INFO messages printed by Spark
                Logger.getRootLogger.setLevel(Level.WARN)
            }
        }
    }
    
    Explanation of the sliding window: [image]
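
    To make the window parameters more concrete, here is a minimal, self-contained sketch. It is not part of the original example: the object name WindowDemo, the use of a queueStream, the local[2] master, and the sample strings are all assumptions made purely for illustration. It feeds three pre-built RDDs through the streaming context, one per 10-second batch, so the effect of a 30-second window sliding every 10 seconds can be observed locally:

    package com.packtpub.sfb
    import scala.collection.mutable
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    object WindowDemo {
        def main(args: Array[String]) {
            val spark = SparkSession.builder.appName("WindowDemo").master("local[2]").getOrCreate()
            val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
            // Each queued RDD is consumed as one 10-second batch; a 30-second window therefore spans 3 batches
            val queue = mutable.Queue(
                spark.sparkContext.parallelize(Seq("ERROR first batch")),
                spark.sparkContext.parallelize(Seq("ERROR second batch", "WARN ignored")),
                spark.sparkContext.parallelize(Seq("ERROR third batch")))
            val lines = ssc.queueStream(queue)
            val errorLines = lines.filter(line => line.contains("ERROR"))
            // Group the last 30 seconds of data, recomputed every 10 seconds, and count the lines in each window.
            // This yields the same numbers as countByWindow(Seconds(30), Seconds(10)) in StreamingApps.scala,
            // which computes the count incrementally and therefore needs a checkpoint directory.
            errorLines.window(Seconds(30), Seconds(10)).count().print()
            ssc.start()
            // Run for about a minute, long enough for the window to slide past all three batches, then stop
            ssc.awaitTerminationOrTimeout(60 * 1000)
            ssc.stop()
        }
    }

    If the batches line up as described, the printed count should climb from 1 to 3 while the three batches fit inside the window, and then fall back toward 0 as they slide out of it.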

    1.2 Compiling

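    Assuming the standard sbt workflow (the submit.sh script below runs this same step), the application jar is produced by executing sbt package in the project root, which creates target/scala-2.11/simple-project_2.11-1.0.jar, the file referenced by the submit script.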

    1.3 Deploying to the cluster

    A. To see the application in action, first start the netcat program (bundled with Ubuntu)
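    Assuming the stock netcat binary, a listener matching the localhost:9999 socket used in StreamingApps.scala can be started with, for example, nc -lk 9999; every line typed into that terminal is then delivered to the streaming application.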

    B. Monitoring the running application
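    With a Spark standalone cluster and default ports, monitoring is typically done through the web UIs: the master UI at http://<master-host>:8080 lists the running application, and the application's own UI on port 4040 of the driver host includes a Streaming tab with per-batch statistics.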
    C. Submitting the job to Spark
    Add a submit.sh file at the same level as the src directory:
    #!/bin/bash
    #-----------
    # submit.sh
    #-----------
    # IMPORTANT - Assumption is that the $SPARK_HOME and $KAFKA_HOME environment variables are already set in the system that is running the application
    
    # [FILLUP] Your Spark master. If monitoring is needed, use the desired Spark master or use local
    # When using local mode, it is important to specify more than one core in the square brackets
    #SPARK_MASTER=spark://Rajanarayanans-MacBook-Pro.local:7077
    #SPARK_MASTER=local[4]
    SPARK_MASTER=spark://yay-ThinkPad-T470-W10DG:7077
    
    # [OPTIONAL] Your Scala version
    SCALA_VERSION="2.11"
    
    # [OPTIONAL] Name of the application jar file. You should be OK to leave it like that
    APP_JAR="simple-project_2.11-1.0.jar"
    
    # [OPTIONAL] Absolute path to the application jar file
    PATH_TO_APP_JAR="target/scala-$SCALA_VERSION/$APP_JAR"
    
    # [OPTIONAL] Spark submit command
    SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit"
    
    # [OPTIONAL] Pass the application name to run as the parameter to this script
    APP_TO_RUN=$1
    
    sbt package
    
    $SPARK_SUBMIT --class $APP_TO_RUN --master $SPARK_MASTER --jars $PATH_TO_APP_JAR $PATH_TO_APP_JAR
    

    Then run the submit script:

    yay@yay-ThinkPad-T470-W10DG:~/scalaproject/test1$ ./submit.sh com.packtpub.sfb.StreamingApps
    

    D. Type batches of messages into the nc window
    Enter a batch every so often:

    [Fri Dec 20 01:46:23 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
    [Fri Dec 20 01:46:23 2015] [WARN] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
    [Fri Dec 20 01:54:34 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /apache/web/test
    [Fri Dec 20 01:54:34 2015] [WARN] [client 1.2.3.4.5.6] Directory index forbidden by rule: /apache/web/test
    [Fri Dec 20 02:25:55 2015] [ERROR] [client 1.2.3.4.5.6] Client sent malformed Host header
    [Fri Dec 20 02:25:55 2015] [WARN] [client 1.2.3.4.5.6] Client sent malformed Host header
    [Mon Dec 20 23:02:01 2015] [ERROR] [client 1.2.3.4.5.6] user test: authentication failure for "/~raj/test": Password Mismatch
    [Mon Dec 20 23:02:01 2015] [WARN] [client 1.2.3.4.5.6] user test: authentication failure for "/~raj/test": Password Mismatch
    

    Then, in the window where the submit script is running, the lines containing ERROR are printed to the console.

    E. Check the monitoring results
