Flume+Kafka+Spark2.3.3 实战 之 序列化类

作者: 小飞牛_666 | 来源:发表于2019-07-01 15:04 被阅读83次

  上周搭起了公司的测试集群环境,本人使用的是 apache 版本的,在测试flume+kafka+spark的时候,在 Idea 上运行 spark 程序是没问题的,但是在把程序打成 jar 包之后却出现了问题,百度了两天,请教了几个大神,也没解决问题,这就有点不淡定了,周末都没有心情喝酒了,哈哈。

  这次测试中总共出现了三大问题  》》》
一个是 :
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
...
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.StringDeserializer
...

第二个是:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka010/KafkaUtils$
...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaUtils$
...

第三个是:
java.lang.ClassNotFoundException: com.hw.stream.FlumeKafkaStream
问题的解决方案:

先看下我的集群组件:

image.png
官网链接文档的api为:http://spark.apache.org/docs/2.3.3/streaming-kafka-0-10-integration.html
文档中提到的mvn如下:
image.png
  刚开始,看到这里我也是感觉集群中的 kafka 版本不对,后来发现与kafka的版本无关,只要大于等于 10 版本就可以了,之所以打包后在集群上报 第一个问题 和 第二个问题 的错误主要是因为 pom.xml 中 包的引入不全 以及 build 设置问题,这里经过修改之后的 pom.xml 文件中所涉及到的依赖应如下:
 <dependencies>
        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <!-- 注:以下两个包很关键,这是spark操作 kafka 的关键包,必须要引入,否则会包 问题一 和 问题二 的错误 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <!-- 这里要和上面的依赖包的版本一致 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
    </dependencies>

  接下来这个 build 属性的设置很关键:
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- maven 打包集成插件 -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <!-- 将依赖一起打包到 JAR -->
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- 配置主程序 java -jar 默认Class,这里设置为自己的主类的绝对路径 -->
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.hw.stream.FlumeKafkaStream</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>


  主程序的代码我们完全可以根据官网案例进行编写,创建一个 FlumeKafkaStream 的scala 的 object 文件:
package com.hw.stream

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
  * @author feiniu
  * @create 2019-06-30 23:15
  */
object FlumeKafkaStream {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaStream6")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))

    //设置kafka的参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop101:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "666",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    //注意:这里的主题要和你监控的主题一致
    val topics = Array("topic_ccc")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(x => x.value())
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_+_)

    //直接输出,当然这一步完全可以放入到数据库中去
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

  }

}

   别忘了在 resources 文件夹中放入log4j.properties文件,并进入 Project structure 做如下设置(否则即使妳在 pom.xml 中设置了主类路径以及 在 submit 指定了 --class 主类之后 同样还是报 第三个问题的错误,这个小事情有时候会搞死人的):
image.png
  接下来可以在 flume 的 conf 目录下设置监控的文件了 file-flume-kafka-producer.conf :
a1.sources= r1
a1.channels= c1

#配置数据源
a1.sources.r1.type=exec
#配置需要监控的日志输出文件或目录
a1.sources.r1.command=tail -F /opt/module/flume/mytestdata/yin.txt
a1.sources.r1.channels = c1

#配置数据通道
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_ccc
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

  接下来,在 hadoop 集群以及 zookeeper 已经启动的情况下,启动 flume 和 kafka 服务:
#在采集服务器上启动 flume 
bin/flume-ng agent \
--name a1 \
--conf conf  \
--conf-file conf/file-flume-kafka-producer.conf  \
-Dflume.root.logger=INFO,console

#三台服务器都启动 kafka
bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
  此时,在 Idea 中启动程序,再在被监控的日记中添加数据即可看到控制台输出统计的数据了,添加数据命令如下:
echo "spark spark flume kafka kafka kafka spark flink" >> yin.txt
  一般来说,运行本地的 Idea 是不会出错的,除非没导入完整的依赖,那么,接下来在我们打包的时候如果不注意可能会出现 问题一 和 问题二 那种情况,因此我们打包的时候必须要带上有关 kafka 的依赖包:
image.png

  最后我们再上传生成的 jar 包到服务去,并在 spark 中提交指令即可看到效果:

bin/spark-submit \
--class com.hw.stream.FlumeKafkaStream \
--master spark://hadoop101:7077 \
--executor-memory 4G \
--total-executor-cores 6  \
/opt/myjar/XXX.jar 

  到此为止,我们已经成功的完成了对集群组件以及相关的依赖包的简单测试;原本是不想写这篇文章的,只是这次耗的时候有点久,因此想记录一下,希望能帮到后来者。。。

相关文章

网友评论

    本文标题:Flume+Kafka+Spark2.3.3 实战 之 序列化类

    本文链接:https://www.haomeiwen.com/subject/eaiwcctx.html