Problems Encountered Using Redisson to Read and Write a Redis Cluster from Spark


Author: lei_charles | Published 2019-10-16 08:42
  1. Problems encountered
    1. Problem 1:

      The Spark2 environment ships netty-all-4.0.43.Final.jar, which conflicts with the netty-all-4.1.41.Final.jar that redisson depends on. Packaging the redisson jar directly into the project and running it throws the following exception.

      Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: io.netty.util.internal.StringUtil.indexOfNonWhiteSpace(Ljava/lang/CharSequence;I)I
          at io.netty.resolver.dns.UnixResolverDnsServerAddressStreamProvider.parse(UnixResolverDnsServerAddressStreamProvider.java:175)
          at io.netty.resolver.dns.UnixResolverDnsServerAddressStreamProvider.<init>(UnixResolverDnsServerAddressStreamProvider.java:94)
          at io.netty.resolver.dns.UnixResolverDnsServerAddressStreamProvider.<init>(UnixResolverDnsServerAddressStreamProvider.java:128)
          at io.netty.resolver.dns.UnixResolverDnsServerAddressStreamProvider.parseSilently(UnixResolverDnsServerAddressStreamProvider.java:70)
          at io.netty.resolver.dns.DnsServerAddressStreamProviders$1.provider(DnsServerAddressStreamProviders.java:57)
          at io.netty.resolver.dns.DnsServerAddressStreamProviders$1.<init>(DnsServerAddressStreamProviders.java:36)
          at io.netty.resolver.dns.DnsServerAddressStreamProviders.<clinit>(DnsServerAddressStreamProviders.java:34)
          at org.redisson.connection.MasterSlaveConnectionManager.<init>(MasterSlaveConnectionManager.java:208)
          at org.redisson.cluster.ClusterConnectionManager.<init>(ClusterConnectionManager.java:94)
          at org.redisson.config.ConfigSupport.createConnectionManager(ConfigSupport.java:200)
          at org.redisson.Redisson.<init>(Redisson.java:120)
          at org.redisson.Redisson.create(Redisson.java:160)
          at com.hdjt.bigdata.passengerFlow.RedissonUtils$.getRedissonClient(RedissonUtils.scala:30)
          at com.hdjt.bigdata.passengerFlow.RedissonUtils$.getRedissonClient(RedissonUtils.scala:16)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp$$anonfun$main$1.apply(PassengerFlowMatchApp.scala:64)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp$$anonfun$main$1.apply(PassengerFlowMatchApp.scala:58)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
          at scala.util.Try$.apply(Try.scala:192)
          at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      19/10/14 13:52:20 INFO scheduler.JobScheduler: Added jobs for time 1571032340000 ms
      19/10/14 13:52:20 INFO scheduler.JobScheduler: Starting job streaming job 1571032340000 ms.0 from job set of time 1571032340000 ms
      Exception in thread "streaming-job-executor-1" java.lang.NoClassDefFoundError: Could not initialize class io.netty.resolver.dns.DnsServerAddressStreamProviders
          at org.redisson.connection.MasterSlaveConnectionManager.<init>(MasterSlaveConnectionManager.java:208)
          at org.redisson.cluster.ClusterConnectionManager.<init>(ClusterConnectionManager.java:94)
          at org.redisson.config.ConfigSupport.createConnectionManager(ConfigSupport.java:200)
          at org.redisson.Redisson.<init>(Redisson.java:120)
          at org.redisson.Redisson.create(Redisson.java:160)
          at com.hdjt.bigdata.passengerFlow.RedissonUtils$.getRedissonClient(RedissonUtils.scala:30)
          at com.hdjt.bigdata.passengerFlow.RedissonUtils$.getRedissonClient(RedissonUtils.scala:16)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp$$anonfun$main$1.apply(PassengerFlowMatchApp.scala:64)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp$$anonfun$main$1.apply(PassengerFlowMatchApp.scala:58)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
          at scala.util.Try$.apply(Try.scala:192)
          at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
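
      When two netty versions are both on the classpath, a first diagnostic step is to confirm which jars actually ship the conflicting class. A minimal shell sketch (the jar directory is an assumption; point it at your Spark installation's jars directory):

      ```shell
      # Find which jar(s) ship the class named in the NoSuchMethodError.
      # SPARK_JARS_DIR is an example default; adjust to your installation.
      SPARK_JARS_DIR="${SPARK_JARS_DIR:-/opt/spark/jars}"
      for jar in "$SPARK_JARS_DIR"/*.jar; do
        [ -e "$jar" ] || continue   # skip if the glob matched nothing
        if unzip -l "$jar" 2>/dev/null | grep -q 'io/netty/util/internal/StringUtil.class'; then
          echo "found in: $jar"
        fi
      done
      ```

      If the class appears in more than one jar (or only in the older netty-all), the version that wins on the classpath explains the NoSuchMethodError.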
      
    2. Problem 2:

      After excluding the netty-related jars from redisson's dependency (as shown below), repackaging the project, and submitting it, the following exception is thrown.

      Excluding the netty-related jars:

      <dependency>
          <groupId>org.redisson</groupId>
          <artifactId>redisson</artifactId>
          <exclusions>
              <exclusion>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-common</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-codec</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-codec-dns</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-buffer</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-transport</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-handler</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-resolver-dns</artifactId>
              </exclusion>
          </exclusions>
          <version>3.11.4</version>
      </dependency>
      

      Exception (with redisson's 4.1 netty jars excluded, only Spark's netty-all 4.0.43 remains on the classpath, and it predates the io.netty.resolver package):

      Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: io/netty/resolver/AddressResolverGroup
          at org.redisson.config.Config.<init>(Config.java:92)
          at com.hdjt.bigdata.passengerFlow.RedissonUtils$.getRedissonClient(RedissonUtils.scala:23)
          at com.hdjt.bigdata.passengerFlow.RedissonUtils$.getRedissonClient(RedissonUtils.scala:16)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp$$anonfun$main$1.apply(PassengerFlowMatchApp.scala:64)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp$$anonfun$main$1.apply(PassengerFlowMatchApp.scala:58)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
          at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
          at scala.util.Try$.apply(Try.scala:192)
          at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
          at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.ClassNotFoundException: io.netty.resolver.AddressResolverGroup
          at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
          ... 24 more
      
    3. Problem 3:

      After excluding the netty-related jars from redisson (as above) and replacing the netty jar in the Spark2 environment with the same version that redisson depends on, repackaging and submitting the project fails: the Spark application cannot be submitted to YARN, with the exception below.

      19/10/14 14:10:03 ERROR util.Utils: Uncaught exception in thread main
      java.lang.NullPointerException
          at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:141)
          at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1485)
          at org.apache.spark.SparkEnv.stop(SparkEnv.scala:90)
          at org.apache.spark.SparkContext$$anonfun$stop$11.apply$mcV$sp(SparkContext.scala:1937)
          at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
          at org.apache.spark.SparkContext.stop(SparkContext.scala:1936)
          at org.apache.spark.SparkContext.<init>(SparkContext.scala:587)
          at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
          at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
          at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
          at scala.Option.getOrElse(Option.scala:121)
          at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp$.main(PassengerFlowMatchApp.scala:41)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp.main(PassengerFlowMatchApp.scala)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
          at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
          at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
          at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
          at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      19/10/14 14:10:03 INFO spark.SparkContext: Successfully stopped SparkContext
      Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
          at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)
          at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
          at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:173)
          at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
          at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
          at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
          at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
          at scala.Option.getOrElse(Option.scala:121)
          at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp$.main(PassengerFlowMatchApp.scala:41)
          at com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp.main(PassengerFlowMatchApp.scala)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
          at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
          at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
          at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
          at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      
  2. Solution:

    Use the Maven shade plugin (maven-shade-plugin) to relocate the netty packages that redisson depends on, for example to my.redisson.io.netty, with the plugin configuration below. After repackaging and submitting, the job runs normally.

    Note:
    Before settling on the approach above, I also tried setting spark.executor.userClassPathFirst=true and spark.driver.userClassPathFirst=true in the spark-submit script so that user-supplied jars take precedence over the environment's, but this still did not resolve the jar conflict.
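
    For reference, that attempted submission might be sketched as follows. Only the two --conf flags come from the text; the master, jar name, and other details are placeholders:

    ```shell
    # Sketch of the spark-submit invocation with user-classpath-first flags.
    # These flags did NOT resolve the netty conflict in this case.
    spark-submit \
      --master yarn \
      --conf spark.driver.userClassPathFirst=true \
      --conf spark.executor.userClassPathFirst=true \
      --class com.hdjt.bigdata.passengerFlow.PassengerFlowMatchApp \
      passenger-flow-assembly.jar
    ```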

    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <relocations>
                                <relocation>
                                    <pattern>io.netty</pattern>
                                    <shadedPattern>my.redisson.io.netty</shadedPattern>
                                </relocation>
                            </relocations>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
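
    After repackaging, you can verify that the relocation actually took effect by listing the shaded jar's entries. The artifact path below is an example; substitute your project's output jar:

    ```shell
    # Relocated netty classes should appear under the new package prefix.
    unzip -l target/passenger-flow-1.0-SNAPSHOT.jar 2>/dev/null | grep 'my/redisson/io/netty/' | head -n 3
    ```

    If this prints entries under my/redisson/io/netty/, the shaded jar carries its own private netty copy and can no longer clash with the netty-all version Spark ships.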
    

Original: https://www.haomeiwen.com/subject/xnzpmctx.html