Spark Streaming Integration with Kafka 1.0

Author: alaya_c09d | Published 2019-08-13 20:44

    I. Environment

    Ambari HDP 2.6.2.205
    Spark on Yarn

    II. Upgrading the Spark Streaming Kafka Integration from 0.8 to 1.0

    1. spark-streaming-kafka-0-8

    Maven configuration:

    <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId>
                <version>2.0.2</version>
                <scope>provided</scope>
    </dependency>
    

    Location of spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar in HDFS:

    [umecron@umetrip-hdp26-xxxxxx ~]$ hadoop fs -ls /hdp/apps/2.6.2.0-205/spark2/
    Found 1 items
    -rwxrwxrwx   3 umecron hdfs  192983958 2019-08-13 19:24 /hdp/apps/2.6.2.0-205/spark2/spark2-hdp-yarn-archive.tar.gz
    
    

    spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar is also placed under /usr/hdp/2.6.2.0-205/spark2/jars/ on every node.
    In Spark on YARN mode, dependency jars are picked up from HDFS first.

    As a result, applications do not need to package spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar into their own jar when they are deployed.
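
    For context, a minimal sketch of a streaming job written against the 0.8 direct API; the broker address, topic name, and batch interval are placeholder assumptions, not values taken from this cluster:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object Kafka08Demo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("kafka-0-8-demo")
        val ssc  = new StreamingContext(conf, Seconds(10))

        // The 0.8 direct API takes a plain broker list and a set of topics;
        // "broker1:9092" and "demo-topic" are placeholders.
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092")
        val topics      = Set("demo-topic")

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)

        // Records arrive as (key, value) tuples in the 0.8 API.
        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }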

    2. Upgrading to Kafka 1.0

    Reference: https://spark.apache.org/docs/1.1.0/streaming-kafka-integration.html

    Maven configuration:

             <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.1.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-core_2.11</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-streaming_2.11</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-tags_2.11</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.scala-lang</groupId>
                        <artifactId>scala-library</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.0.2</version>
                <scope>provided</scope>
                <exclusions>
                    <exclusion>
                        <artifactId>commons-lang3</artifactId>
                        <groupId>org.apache.commons</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.0.2</version>
                <scope>provided</scope>
            </dependency>
    

    Note: spark-streaming-kafka-0-10_2.11-2.1.1.jar is different from spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar.
    spark-streaming-kafka-0-10_2.11-2.1.1.jar contains only the core integration code and does not bundle modules such as kafka-clients, so simply putting it on the HDFS path is not enough.
    spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar is a fat jar: all of its dependencies are packaged inside it.

    The sizes of the two jars:

    [umecron@umetrip-hdp26-147110 ~]$ du -sh /usr/hdp/2.6.2.0-205/spark2/jars/spark-streaming-kafka-0-*
    212K    /usr/hdp/2.6.2.0-205/spark2/jars/spark-streaming-kafka-0-10_2.11-2.1.1.jar
    11M     /usr/hdp/2.6.2.0-205/spark2/jars/spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar
    

    Therefore, when upgrading to Kafka 1.0, use the Maven configuration above and package spark-streaming-kafka-0-10_2.11-2.1.1.jar, together with the jars it depends on, into your application jar.
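
    For comparison, a minimal sketch of the same kind of job written against the 0-10 integration, following the pattern in the Spark documentation; the broker address, group id, and topic name are again placeholders:

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    object Kafka010Demo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("kafka-0-10-demo")
        val ssc  = new StreamingContext(conf, Seconds(10))

        // The 0-10 integration uses the new consumer configuration keys; values are placeholders.
        val kafkaParams = Map[String, Object](
          "bootstrap.servers"  -> "broker1:9092",
          "key.deserializer"   -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id"           -> "demo-group",
          "auto.offset.reset"  -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val topics = Array("demo-topic")

        // Subscribe is the ConsumerStrategy that ends up calling KafkaConsumer.subscribe(Collection),
        // the method involved in the NoSuchMethodError described in the next section.
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

        stream.map((r: ConsumerRecord[String, String]) => r.value).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }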

    3. Problems encountered

    The initial Maven configuration was:

     <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.1.0</version>
     </dependency>
    

    Submitting the job to the YARN cluster failed with:

    Log Type: stderr
    
    Log Upload Time: Thu Aug 08 09:28:04 +0800 2019
    
    Log Length: 6213
    
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/appdata/disk03/hadoop/yarn/local/filecache/17/spark2-hdp-yarn-archive.tar.gz/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/appdata/disk03/hadoop/yarn/local/filecache/17/spark2-hdp-yarn-archive.tar.gz/log4j-slf4j-impl-2.11.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    19/08/08 09:20:51 ERROR umeflightstatusErr: FlightStatusRemoteServiceFactory ApplicationContextAware setApplicationContext !
    19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.maxRedirections to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=30000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.164:6380##10.237.65.165:6380##10.237.65.166:6379, redis.pool.maxWaitMillis=10000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.164-6379}
    19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.soTimeout to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=30000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.164:6380##10.237.65.165:6380##10.237.65.166:6379, redis.pool.maxWaitMillis=10000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.164-6379}
    19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.connectionTimeout to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=30000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.164:6380##10.237.65.165:6380##10.237.65.166:6379, redis.pool.maxWaitMillis=10000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.164-6379}
    19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.maxRedirections to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=3000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.188:6380##10.237.65.189:6380##10.237.65.190:6379, redis.pool.maxWaitMillis=1000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.188-6379}
    19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.soTimeout to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=3000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.188:6380##10.237.65.189:6380##10.237.65.190:6379, redis.pool.maxWaitMillis=1000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.188-6379}
    19/08/08 09:20:59 ERROR DisconfStoreFileProcessorImpl: cannot find redis.cluster.connectionTimeout to be injected. file content is: {redis.pool.testOnReturn=false, redis.pool.maxIdle=3000, redis.pool.jmxEnabled=true, redis.simple.host=, redis.sentinel.hosts=10.237.65.188:6380##10.237.65.189:6380##10.237.65.190:6379, redis.pool.maxWaitMillis=1000, redis.pool.minIdle=100, redis.pool.jmxNamePrefix=jedis-pool, redis.connectionType=sentinel, redis.pool.maxTotal=5000, redis.cluster.shardInfo=, redis.pool.testOnBorrow=true, redis.sentinel.master=sentinel-10.237.65.188-6379}
    Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    19/08/08 09:28:04 ERROR ApplicationMaster: RECEIVED SIGNAL TERM
    

    The cause: the call was actually resolving to the Kafka 0.8 client, where KafkaConsumer.subscribe() takes a String argument, whereas in Kafka 1.0 KafkaConsumer.subscribe() takes a Collection.

    Workaround: add the following options to spark-submit:

    --conf spark.driver.userClassPathFirst=true
    --conf spark.executor.userClassPathFirst=true
    

    A new error then appeared:

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/appdata/disk04/hadoop/yarn/local/filecache/17/spark2-hdp-yarn-archive.tar.gz/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/appdata/disk04/hadoop/yarn/local/filecache/17/spark2-hdp-yarn-archive.tar.gz/log4j-slf4j-impl-2.11.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    19/08/08 09:29:36 ERROR umeflightstatusErr: FlightStatusRemoteServiceFactory ApplicationContextAware setApplicationContext !
    19/08/08 09:29:40 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 9152079989358155551
    java.lang.ClassCastException: cannot assign instance of scala.concurrent.duration.FiniteDuration to field org.apache.spark.rpc.RpcTimeout.duration of type scala.concurrent.duration.FiniteDuration in instance of org.apache.spark.rpc.RpcTimeout
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:259)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:308)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:258)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:257)
        at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:577)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:562)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:159)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)
    

    This shows the added parameters took effect and initialization succeeded, but deserialization then failed.
    The class scala.concurrent.duration.FiniteDuration belongs to scala-library.

    (Image: dependency tree of spark-streaming-kafka-0-10_2.11)

    The dependency tree shows spark-streaming-kafka-0-10_2.11 --> kafka_2.11 --> scala-library, pulled in with <scope>compile</scope>, which can conflict with the scala-library shipped on the YARN cluster. Excluding scala-library and the other Spark artifacts from spark-streaming-kafka-0-10_2.11 (as in the Maven configuration above) resolved the problem.
