Spark Streaming & Flume & Kafka

Author: 董二弯 | Published 2019-05-28 22:14

    In the previous chapters we looked at integrating Spark Streaming with Flume and integrating Spark Streaming with Kafka. In this chapter we combine all three to build a stream-processing platform. The overall data flow and processing pipeline are as follows:

    [Figure: overall data flow: log4j → Flume → Kafka → Spark Streaming]

    Integrating log output into Flume

    • Flume agent configuration
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 192.168.30.131
    a1.sources.r1.port = 41414
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    Note: the sink type here is logger. Only after verifying that log4j can deliver logs to Flume will the sink be switched to a KafkaSink. Testing each step as you build is a good habit in development; if you wire up the entire pipeline before testing, problems are hard to pinpoint.

    Start the agent:
    flume-ng agent --name a1 --conf ../conf --conf-file ../conf/log4j_to_flume.conf -Dflume.root.logger=INFO,console
    
    • log4j configuration
      Add the pom dependency for the Flume log4j appender:
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.9.0</version>
    </dependency>
    

    Configuration:

    log4j.rootLogger=INFO,stdout,flume
    
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target = System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    
    
    log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname=192.168.30.131
    log4j.appender.flume.Port=41414
    log4j.appender.flume.UnsafeMode=true
    log4j.appender.flume.layout=org.apache.log4j.PatternLayout
    

    The Hostname and Port here must match the ones in the Flume agent configuration.

    • Simulating log generation
    import org.apache.log4j.Logger;

    public class LoggerGenerator2 {

        private static Logger logger = Logger.getLogger(LoggerGenerator2.class.getName());

        public static void main(String[] args) throws Exception {
            int index = 0;
            // Emit one log message per second
            while (true) {
                Thread.sleep(1000);
                logger.info("value : " + index++);
            }
        }
    }
    

    Note
    After starting the program I ran into the following problem:

    [Screenshot: stack trace caused by the Avro dependency conflict]
    After some digging, the cause turned out to be that my pom file included the Spark Streaming dependency, whose transitive Avro dependency conflicts with the Avro dependency of flume-ng-log4jappender. Excluding it as follows resolved the problem:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>2.4.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    • Testing
      Start the program and watch the agent's console. If the log messages are printed there, the log4j-to-Flume integration is working.

    Integrating Flume with Kafka

    In this step the logs collected by Flume are forwarded to Kafka. The key change is to reconfigure the sink of the agent above as a KafkaSink:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 192.168.30.131
    a1.sources.r1.port = 41414
    
    # Describe the sink
    
    # Use the Kafka sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # Kafka broker address and port
    a1.sinks.k1.brokerList = 192.168.30.131:9092
    # Kafka topic to write to
    a1.sinks.k1.topic = kafka_spark
    # Serializer class
    a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
    # Number of messages per batch; larger batches raise throughput but add latency
    a1.sinks.k1.batchSize = 3
    a1.sinks.k1.requiredAcks = 1
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    Start the Kafka environment together with a console consumer, then start the log-generator program and watch the consumer. If output like the following appears, the logs are being delivered to Kafka.
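    For reference, the Kafka side of this test can be brought up roughly as follows. This is only a sketch, assuming a stock Kafka installation with its bin scripts on the PATH; adjust the script and config paths to your setup. The topic name matches the kafka_spark topic configured in the sink.

    # Start ZooKeeper and the Kafka broker (paths depend on your installation)
    zookeeper-server-start.sh config/zookeeper.properties
    kafka-server-start.sh config/server.properties

    # Consume the kafka_spark topic that the Flume sink writes to
    kafka-console-consumer.sh --bootstrap-server 192.168.30.131:9092 --topic kafka_spark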


    [Screenshot: console consumer printing the generated log messages]

    Integrating Kafka with Spark Streaming

    Spark Streaming integration with Kafka was covered in detail in an earlier chapter, so here is only a quick recap.
    Core pom dependencies:

    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>2.4.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Spark Streaming Kafka integration -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>2.4.3</version>
    </dependency>
    

    Scala program:

    import scala.collection.mutable

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    object Test {
      def main(args: Array[String]): Unit = {

        val conf = new SparkConf()
          .setAppName("DirectKafka")
          .setMaster("local[2]")

        val ssc = new StreamingContext(conf, Seconds(2))

        val topicsSet = Array("kafka_spark")
        val kafkaParams = mutable.HashMap[String, String]()
        // These parameters are required; without them the job fails to start
        kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
        kafkaParams.put("group.id", "group1")
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
        )

        // Business logic; here we simply print the received values
        val lines = messages.map(_.value)
        lines.print()

        // Start the computation
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    Following the flow above, start the Kafka environment, the Flume agent, and the log-generator program, then start the Spark Streaming application and watch its console:


    [Screenshot: error in the Spark Streaming console]

    After searching online, I added the spark.serializer setting to the SparkConf in the program above:

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")
      // Switch to Kryo serialization
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    

    Restart the application:


    [Screenshot: Spark Streaming console printing the consumed log lines]

    Success.

    Processing the received data with Spark Streaming

    With the steps above, the log messages now reach Spark Streaming. From here the received data can be processed with Spark Streaming, the results saved to a relational database such as MySQL, and then visualized through a Java web application. A later article will walk through processing the logs with Spark Streaming; a rough sketch of the persistence step is shown below.
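    As a preview, here is a minimal sketch, not taken from the original article, that counts the words in each batch and writes the counts to MySQL over plain JDBC. It continues from the lines DStream created earlier; the JDBC URL, credentials, and the word_count table are illustrative assumptions, and the MySQL JDBC driver must be on the classpath.

    // Minimal sketch: count words per batch and persist the counts to MySQL.
    // The JDBC URL, credentials, and the word_count(word, cnt) table are assumptions.
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          // Open one connection per partition so the connection is never serialized
          val conn = java.sql.DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/streaming", "user", "password")
          val stmt = conn.prepareStatement("INSERT INTO word_count(word, cnt) VALUES (?, ?)")
          partition.foreach { case (word, cnt) =>
            stmt.setString(1, word)
            stmt.setInt(2, cnt)
            stmt.executeUpdate()
          }
          stmt.close()
          conn.close()
        }
      }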
