Flume+Kafka+Spark2.3.3 实战 之 序列化类

作者: 小飞牛_666 | 来源:发表于2019-07-01 15:04 被阅读83次

      上周搭起了公司的测试集群环境,本人使用的是 apache 版本的,在测试flume+kafka+spark的时候,在 Idea 上运行 spark 程序是没问题的,但是在把程序打成 jar 包之后却出现了问题,百度了两天,请教了几个大神,也没解决问题,这就有点不淡定了,周末都没有心情喝酒了,哈哈。

      这次测试中总共出现了三大问题  》》》
    一个是 :
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
    ...
    Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.StringDeserializer
    ...
    
    
    第二个是:
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka010/KafkaUtils$
    ...
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaUtils$
    ...
    
    
    第三个是:
    java.lang.ClassNotFoundException: com.hw.stream.FlumeKafkaStream
    
    问题的解决方案:

    先看下我的集群组件:

    image.png
    官网链接文档的api为:http://spark.apache.org/docs/2.3.3/streaming-kafka-0-10-integration.html
    文档中提到的mvn如下:
    image.png
      刚开始,看到这里我也是感觉集群中的 kafka 版本不对,后来发现与kafka的版本无关,只要大于等于 10 版本就可以了,之所以打包后在集群上报 第一个问题 和 第二个问题 的错误主要是因为 pom.xml 中 包的引入不全 以及 build 设置问题,这里经过修改之后的 pom.xml 文件中所涉及到的依赖应如下:
     <dependencies>
            <!-- Spark的依赖引入 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.3.3</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.3.3</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.3.3</version>
            </dependency>
    
            <!-- 注:以下两个包很关键,这是spark操作 kafka 的关键包,必须要引入,否则会包 问题一 和 问题二 的错误 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.3.3</version>
            </dependency>
    
            <!-- 这里要和上面的依赖包的版本一致 -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.2.2</version>
            </dependency>
        </dependencies>
    
    
      接下来这个 build 属性的设置很关键:
        <build>
            <sourceDirectory>src</sourceDirectory>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <!-- maven 打包集成插件 -->
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <descriptorRefs>
                            <!-- 将依赖一起打包到 JAR -->
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <!-- 配置主程序 java -jar 默认Class,这里设置为自己的主类的绝对路径 -->
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib/</classpathPrefix>
                                <mainClass>com.hw.stream.FlumeKafkaStream</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    
    
      主程序的代码我们完全可以根据官网案例进行编写,创建一个 FlumeKafkaStream 的scala 的 object 文件:
    package com.hw.stream
    
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    /**
      * @author feiniu
      * @create 2019-06-30 23:15
      */
    object FlumeKafkaStream {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaStream6")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val sc = spark.sparkContext
        val ssc = new StreamingContext(sc, Seconds(5))
    
        //设置kafka的参数
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "hadoop101:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "666",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        //注意:这里的主题要和你监控的主题一致
        val topics = Array("topic_ccc")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
    
        val lines = stream.map(x => x.value())
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_+_)
    
        //直接输出,当然这一步完全可以放入到数据库中去
        wordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    
    }
    
    
       别忘了在 resources 文件夹中放入log4j.properties文件,并进入 Project structure 做如下设置(否则即使妳在 pom.xml 中设置了主类路径以及 在 submit 指定了 --class 主类之后 同样还是报 第三个问题的错误,这个小事情有时候会搞死人的):
    image.png
      接下来可以在 flume 的 conf 目录下设置监控的文件了 file-flume-kafka-producer.conf :
    a1.sources= r1
    a1.channels= c1
    
    #配置数据源
    a1.sources.r1.type=exec
    #配置需要监控的日志输出文件或目录
    a1.sources.r1.command=tail -F /opt/module/flume/mytestdata/yin.txt
    a1.sources.r1.channels = c1
    
    #配置数据通道
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
    a1.channels.c1.kafka.topic = topic_ccc
    a1.channels.c1.parseAsFlumeEvent = false
    a1.channels.c1.kafka.consumer.group.id = flume-consumer
    
    
      接下来,在 hadoop 集群以及 zookeeper 已经启动的情况下,启动 flume 和 kafka 服务:
    #在采集服务器上启动 flume 
    bin/flume-ng agent \
    --name a1 \
    --conf conf  \
    --conf-file conf/file-flume-kafka-producer.conf  \
    -Dflume.root.logger=INFO,console
    
    #三台服务器都启动 kafka
    bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
    
      此时,在 Idea 中启动程序,再在被监控的日记中添加数据即可看到控制台输出统计的数据了,添加数据命令如下:
    echo "spark spark flume kafka kafka kafka spark flink" >> yin.txt
    
      一般来说,运行本地的 Idea 是不会出错的,除非没导入完整的依赖,那么,接下来在我们打包的时候如果不注意可能会出现 问题一 和 问题二 那种情况,因此我们打包的时候必须要带上有关 kafka 的依赖包:
    image.png

      最后我们再上传生成的 jar 包到服务去,并在 spark 中提交指令即可看到效果:

    bin/spark-submit \
    --class com.hw.stream.FlumeKafkaStream \
    --master spark://hadoop101:7077 \
    --executor-memory 4G \
    --total-executor-cores 6  \
    /opt/myjar/XXX.jar 
    
    

      到此为止,我们已经成功的完成了对集群组件以及相关的依赖包的简单测试;原本是不想写这篇文章的,只是这次耗的时候有点久,因此想记录一下,希望能帮到后来者。。。

    相关文章

      网友评论

        本文标题:Flume+Kafka+Spark2.3.3 实战 之 序列化类

        本文链接:https://www.haomeiwen.com/subject/eaiwcctx.html