Integrating Spark Streaming with Kafka 1.0

Author: alaya_c09d | Published 2019-08-13 20:44

I. Environment

Ambari HDP 2.6.2.0-205
Spark on Yarn

II. Upgrading the Spark Streaming integration from Kafka 0.8 to Kafka 1.0

1. spark-streaming-kafka-0-8

Maven configuration:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId>
    <version>2.0.2</version>
    <scope>provided</scope>
</dependency>

Location of spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar in HDFS (it ships inside the Spark YARN archive listed below):

[umecron@umetrip-hdp26-xxxxxx ~]$ hadoop fs -ls /hdp/apps/2.6.2.0-205/spark2/
Found 1 items
-rwxrwxrwx   3 umecron hdfs  192983958 2019-08-13 19:24 /hdp/apps/2.6.2.0-205/spark2/spark2-hdp-yarn-archive.tar.gz

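spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar is also placed in /usr/hdp/2.6.2.0-205/spark2/jars/ on every node.
In Spark on YARN mode, dependency jars are fetched from HDFS first.

As a result, applications do not need to bundle spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar into their own jar when they are deployed.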

2. Upgrading to Kafka 1.0

Reference: https://spark.apache.org/docs/1.1.0/streaming-kafka-integration.html

Maven configuration:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_2.11</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-tags_2.11</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.2</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>commons-lang3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.0.2</version>
            <scope>provided</scope>
        </dependency>

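Note: spark-streaming-kafka-0-10_2.11-2.1.1.jar is not packaged the same way as spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar.
spark-streaming-kafka-0-10_2.11-2.1.1.jar contains only the core integration code and does not include modules such as kafka-clients, so placing that jar alone under the HDFS path is not enough.
spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar is a fat jar that bundles all of its dependencies.

Their sizes are: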

[umecron@umetrip-hdp26-147110 ~]$ du -sh /usr/hdp/2.6.2.0-205/spark2/jars/spark-streaming-kafka-0-*
212K    /usr/hdp/2.6.2.0-205/spark2/jars/spark-streaming-kafka-0-10_2.11-2.1.1.jar
11M     /usr/hdp/2.6.2.0-205/spark2/jars/spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar

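Therefore, when upgrading to Kafka 1.0, use the Maven configuration above so that spark-streaming-kafka-0-10_2.11-2.1.1.jar and the jars it depends on are packaged into your own application jar.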

3. Problems encountered

The initial Maven configuration was:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

Submitting the job to the YARN cluster failed with:

Log Type: stderr

Log Upload Time: Thu Aug 08 09:28:04 +0800 2019

Log Length: 6213

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/appdata/disk03/hadoop/yarn/local/filecache/17/spark2-hdp-yarn-archive.tar.gz/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/appdata/disk03/hadoop/yarn/local/filecache/17/spark2-hdp-yarn-archive.tar.gz/log4j-slf4j-impl-2.11.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/08/08 09:20:51 ERROR umeflightstatusErr: FlightStatusRemoteServiceFactory ApplicationContextAware setApplicationContext !
19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.maxRedirections to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=30000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.164:6380##10.237.65.165:6380##10.237.65.166:6379, redis.pool.maxWaitMillis=10000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.164-6379}
19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.soTimeout to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=30000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.164:6380##10.237.65.165:6380##10.237.65.166:6379, redis.pool.maxWaitMillis=10000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.164-6379}
19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.connectionTimeout to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=30000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.164:6380##10.237.65.165:6380##10.237.65.166:6379, redis.pool.maxWaitMillis=10000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.164-6379}
19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.maxRedirections to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=3000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.188:6380##10.237.65.189:6380##10.237.65.190:6379, redis.pool.maxWaitMillis=1000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.188-6379}
19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.soTimeout to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=3000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.188:6380##10.237.65.189:6380##10.237.65.190:6379, redis.pool.maxWaitMillis=1000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.188-6379}
19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.connectionTimeout to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=3000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.188:6380##10.237.65.189:6380##10.237.65.190:6379, redis.pool.maxWaitMillis=1000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.188-6379}
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
19/08/08 09:28:04 ERROR ApplicationMaster: RECEIVED SIGNAL TERM

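The cause is that the Kafka 0.8 API is what actually gets called at runtime, presumably because the 0-8 assembly jar provided by the cluster is picked up ahead of the kafka-clients bundled with the application. In Kafka 0.8, KafkaConsumer.subscribe() takes String arguments, whereas in Kafka 1.0 KafkaConsumer.subscribe() takes a Collection.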
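A quick way to confirm this kind of mismatch is to ask the JVM where a class was loaded from and whether it exposes the expected method. The following is a minimal sketch using only JDK reflection. The class name ClasspathProbe and its methods are hypothetical helpers, not part of Spark or Kafka, and it assumes it is run with the same classpath as the failing driver or executor.

```java
import java.security.CodeSource;
import java.util.Collection;

public class ClasspathProbe {
    /** Looks a class up by name without initializing it; returns null if it is not visible. */
    public static Class<?> find(String className) {
        try {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            return Class.forName(className, false, loader);
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    /** Returns the jar or directory a class was loaded from, or a short note if unknown. */
    public static String originOf(String className) {
        Class<?> cls = find(className);
        if (cls == null) {
            return "(not on classpath)";
        }
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap class loader)" : String.valueOf(src.getLocation());
    }

    /** Reports whether the class has a public method with exactly these parameter types. */
    public static boolean hasMethod(String className, String method, Class<?>... paramTypes) {
        Class<?> cls = find(className);
        if (cls == null) {
            return false;
        }
        try {
            cls.getMethod(method, paramTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the NoSuchMethodError above.
        String consumer = "org.apache.kafka.clients.consumer.KafkaConsumer";
        System.out.println("loaded from: " + originOf(consumer));
        System.out.println("subscribe(Collection) present: "
                + hasMethod(consumer, "subscribe", Collection.class));
    }
}
```

If the reported location is the cluster's 0-8 assembly jar rather than the kafka-clients jar packaged with the application, the NoSuchMethodError above is the expected result.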

The fix is to add the following options to spark-submit:

--conf spark.driver.userClassPathFirst=true
--conf spark.executor.userClassPathFirst=true

A new error then appeared:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/appdata/disk04/hadoop/yarn/local/filecache/17/spark2-hdp-yarn-archive.tar.gz/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/appdata/disk04/hadoop/yarn/local/filecache/17/spark2-hdp-yarn-archive.tar.gz/log4j-slf4j-impl-2.11.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/08/08 09:29:36 ERROR umeflightstatusErr: FlightStatusRemoteServiceFactory ApplicationContextAware setApplicationContext !
19/08/08 09:29:40 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 9152079989358155551
java.lang.ClassCastException: cannot assign instance of scala.concurrent.duration.FiniteDuration to field org.apache.spark.rpc.RpcTimeout.duration of type scala.concurrent.duration.FiniteDuration in instance of org.apache.spark.rpc.RpcTimeout
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:259)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:308)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:258)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:257)
    at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:577)
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:562)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:159)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)

This shows that the added options took effect: initialization succeeded, but deserialization now fails.
The class scala.concurrent.duration.FiniteDuration belongs to scala-library.

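The dependency tree shows spark-streaming-kafka-0-10_2.11 --> kafka_2.11 --> scala-library, with <scope>compile</scope>, so the bundled copy may conflict with the one on the YARN cluster. Excluding scala-library and the other Spark artifacts from the dependencies of spark-streaming-kafka-0-10_2.11 resolved the problem.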
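The odd-looking message "cannot assign instance of FiniteDuration to field ... of type FiniteDuration" is what the JVM typically reports when the same class name has been defined by two different class loaders. The sketch below reproduces that effect with plain JDK code. DuplicateClassDemo, Payload and IsolatingLoader are hypothetical names for illustration (Payload stands in for a library class such as FiniteDuration), and the demo assumes its compiled .class files are readable as classpath resources.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class DuplicateClassDemo {
    /** Hypothetical stand-in for a library class that exists in two places. */
    public static class Payload { }

    /** Loader that defines its own copy of Payload and delegates every other class. */
    static class IsolatingLoader extends ClassLoader {
        private final byte[] bytes;

        IsolatingLoader(byte[] bytes) {
            super(DuplicateClassDemo.class.getClassLoader());
            this.bytes = bytes;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(Payload.class.getName())) {
                return super.loadClass(name, resolve);
            }
            Class<?> loaded = findLoadedClass(name);
            return loaded != null ? loaded : defineClass(name, bytes, 0, bytes.length);
        }
    }

    /** Reads the compiled bytes of a class from the classpath. */
    static byte[] classBytes(Class<?> cls) throws IOException {
        String resource = cls.getName().replace('.', '/') + ".class";
        try (InputStream in = cls.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("class file not found: " + resource);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }

    /** Creates a Payload instance whose class was defined by a second class loader. */
    public static Object foreignPayload() {
        try {
            ClassLoader isolated = new IsolatingLoader(classBytes(Payload.class));
            Class<?> copy = isolated.loadClass(Payload.class.getName());
            return copy.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if the object cannot be cast to the Payload class seen by this code. */
    public static boolean castFails(Object candidate) {
        try {
            Payload.class.cast(candidate);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object foreign = foreignPayload();
        System.out.println("same name:  " + foreign.getClass().getName().equals(Payload.class.getName()));
        System.out.println("same class: " + (foreign.getClass() == Payload.class));
        System.out.println("cast fails: " + castFails(foreign));
    }
}
```

This matches the situation described above: with userClassPathFirst enabled, a scala-library bundled in the application jar can be loaded alongside the cluster's copy, and keeping only one copy (here via the Maven exclusions) removes the conflict.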
