上周搭起了公司的测试集群环境,本人使用的是 apache 版本的,在测试flume+kafka+spark的时候,在 Idea 上运行 spark 程序是没问题的,但是在把程序打成 jar 包之后却出现了问题,百度了两天,请教了几个大神,也没解决问题,这就有点不淡定了,周末都没有心情喝酒了,哈哈。
这次测试中总共出现了三大问题 》》》
一个是 :
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
...
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.StringDeserializer
...
第二个是:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka010/KafkaUtils$
...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaUtils$
...
第三个是:
java.lang.ClassNotFoundException: com.hw.stream.FlumeKafkaStream
问题的解决方案:
先看下我的集群组件:
image.png官网链接文档的api为:http://spark.apache.org/docs/2.3.3/streaming-kafka-0-10-integration.html
文档中提到的mvn如下:
image.png
刚开始,看到这里我也是感觉集群中的 kafka 版本不对,后来发现与kafka的版本无关,只要大于等于 10 版本就可以了,之所以打包后在集群上报 第一个问题 和 第二个问题 的错误主要是因为 pom.xml 中 包的引入不全 以及 build 设置问题,这里经过修改之后的 pom.xml 文件中所涉及到的依赖应如下:
<dependencies>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.3</version>
</dependency>
<!-- 注:以下两个包很关键,这是spark操作 kafka 的关键包,必须要引入,否则会包 问题一 和 问题二 的错误 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.3</version>
</dependency>
<!-- 这里要和上面的依赖包的版本一致 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
</dependencies>
接下来这个 build 属性的设置很关键:
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- maven 打包集成插件 -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<!-- 将依赖一起打包到 JAR -->
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 配置主程序 java -jar 默认Class,这里设置为自己的主类的绝对路径 -->
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.hw.stream.FlumeKafkaStream</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
主程序的代码我们完全可以根据官网案例进行编写,创建一个 FlumeKafkaStream 的scala 的 object 文件:
package com.hw.stream
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
/**
* @author feiniu
* @create 2019-06-30 23:15
*/
object FlumeKafkaStream {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaStream6")
val spark = SparkSession.builder().config(conf).getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
//设置kafka的参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hadoop101:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "666",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//注意:这里的主题要和你监控的主题一致
val topics = Array("topic_ccc")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val lines = stream.map(x => x.value())
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_+_)
//直接输出,当然这一步完全可以放入到数据库中去
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
别忘了在 resources 文件夹中放入log4j.properties文件,并进入 Project structure 做如下设置(否则即使妳在 pom.xml 中设置了主类路径以及 在 submit 指定了 --class 主类之后 同样还是报 第三个问题的错误,这个小事情有时候会搞死人的):
image.png接下来可以在 flume 的 conf 目录下设置监控的文件了 file-flume-kafka-producer.conf :
a1.sources= r1
a1.channels= c1
#配置数据源
a1.sources.r1.type=exec
#配置需要监控的日志输出文件或目录
a1.sources.r1.command=tail -F /opt/module/flume/mytestdata/yin.txt
a1.sources.r1.channels = c1
#配置数据通道
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_ccc
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
接下来,在 hadoop 集群以及 zookeeper 已经启动的情况下,启动 flume 和 kafka 服务:
#在采集服务器上启动 flume
bin/flume-ng agent \
--name a1 \
--conf conf \
--conf-file conf/file-flume-kafka-producer.conf \
-Dflume.root.logger=INFO,console
#三台服务器都启动 kafka
bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
此时,在 Idea 中启动程序,再在被监控的日记中添加数据即可看到控制台输出统计的数据了,添加数据命令如下:
echo "spark spark flume kafka kafka kafka spark flink" >> yin.txt
一般来说,运行本地的 Idea 是不会出错的,除非没导入完整的依赖,那么,接下来在我们打包的时候如果不注意可能会出现 问题一 和 问题二 那种情况,因此我们打包的时候必须要带上有关 kafka 的依赖包:
image.png最后我们再上传生成的 jar 包到服务去,并在 spark 中提交指令即可看到效果:
bin/spark-submit \
--class com.hw.stream.FlumeKafkaStream \
--master spark://hadoop101:7077 \
--executor-memory 4G \
--total-executor-cores 6 \
/opt/myjar/XXX.jar
到此为止,我们已经成功的完成了对集群组件以及相关的依赖包的简单测试;原本是不想写这篇文章的,只是这次耗的时候有点久,因此想记录一下,希望能帮到后来者。。。
网友评论