本文主要参考了如下两篇博文:
Windows下IntelliJ IDEA中调试Spark Standalone
怎么解决java.lang.NoClassDefFoundError错误
一、通过IDEA连接远程集群运行应用就是个大坑。。
写在2017年10月3日凌晨0点57分:
最初这篇博文的名字是“IDEA开发Spark应用并提交远程Spark 2.1.0 standalone集群运行”,当时以不能SSH免密码登录远程主机为由,直接连接本地集群运行应用,后来又尝试了一下通过IDEA连接部署在实验室服务器上的Spark集群来运行应用,然后就发现这里实在是坑太多。如果部署模式是“client”,则会在本地主机起一个driver进程,这就需要本地主机和集群所有节点都能够互相SSH免密码登录!!如果部署模式是“cluster”,然后IDEA运行项里配置“VM options”为-Dspark.submit.deployMode=cluster,结果很不幸,driver依然在本机运行,然后执行器和驱动器无法互相通信。
通过spark-submit脚本,设置部署模式为cluster,这样driver会在某一个worker上启动,然而我们的jar包依然在本机,还是需要序列化分发到各个节点上,然而这样就需要本地主机可以免密码SSH登录所有集群节点!!
当然本地开发测试还是可以通过IDEA连接本地集群运行应用,毕竟借助IDEA,我们只需点一下“运行”即可,还是方便很多。所以啊,本文决定把题目和内容改一下,把远程改成本地!
二、前言
虽然之前也有许多博文讲如何基于IDEA来开发运行Spark应用,但是他们大多数都以本地模式运行(例如:conf.setMaster("local[*]")),而本地模式只是在本机以多线程方式运行Spark应用,这与standalone模式运行有很大差别。
此外,虽然也有一些博文介绍了如何使用IDEA将开发的Spark应用提交到本地Spark集群执行,但是这些博文涉及的Spark版本都比较老,较新版本的Spark(例如2.1.0及以后)取消了lib目录,也就是说新版本没有assembly jar包(例如:spark-assembly-1.5.2-hadoop2.6.0.jar),取而代之的是jars目录,这就使得在新版本里,依赖包的添加发生了一些变化。
综上所述,还是很有必要重新写一篇关于IDEA提交Spark应用到本地集群的博文,这篇博文关注点在于如何让IDEA连接Spark 2.1.0 standalone集群。
这篇博文首先介绍了几种Spark应用开发和运行的方式,然后介绍了如何通过IDEA连接Spark standalone集群来开发运行Spark应用,最后对可能发生的错误给出了解决方法。
注意:由于大部分情况下,我们不能随意访问远程主机,例如不能直接ssh免密码登录远程主机,所以本文只是将IDEA连接到了本机的Spark 2.1.0 standalone集群,也就是将Spark应用提交到本地集群运行。
三、开发环境
操作系统:CentOS 7
Spark:2.1.0
IDEA:社区版2016.3.1
Scala:2.11.8
JDK:1.8.0_111
四、Spark应用开发运行综述
1、开发
用Scala开发Spark应用,官方推荐使用SBT项目构建工具,当然由于Scala基于JVM,所以使用Maven也是可以的。有时我们想在项目中混合使用Java和Scala,这时Maven就成了最佳选择,当然这个时候就需要在pom.xml中配置Java和Scala的编译工具。
可以纯粹使用项目构建工具SBT或者Maven来完成Spark开发(不借助IDE),包括按照模板创建项目、编译、构建项目、打包和发布等等。当然也可以借助IDEA集成的SBT和Maven插件来高效完成上述工作(IDE+项目构建工具)。
当然这篇博文只使用IDEA来完成上述工作,没有使用项目构建工具。
2、运行
开发完项目后当然就要运行我们的Spark应用了,运行也有好几种方式。
(1)第一种方式主要用于开发测试完毕,准备线上运行。借助IDEA或者使用项目构建工具将Spark应用打成Jar包,接着上传到服务器(windows使用FileZilla,Linux使用scp命令),然后通过spark-submit脚本连接集群(通过master的URL可以连接到不同集群)并运行Spark应用。
(2)第二种方式是本地测试。打包后直接提交本地Spark standalone集群(伪分布式,本机运行master和worker)。
(3)开发时会频繁地运行Spark应用,前面两种方式就显得比较麻烦(需要打包、上传、提交),第三种方式用于开发过程中。通过IDEA连接Spark standalone集群,只需点击IDEA的“运行”按钮即可运行Spark应用(不需要手动打包,不需要上传到服务器,不需要手动运行spark-submit脚本)。当然也可以以local模式运行(可以在应用程序中使用conf.setMaster("local[*]")或者在IDEA运行项中配置“VM options”为-Dspark.master=local[*],然后点击IDEA的“运行”按钮),此时在本地以多线程方式运行Spark应用,local模式不需要开启Spark集群。
下面介绍如何通过IDEA连接Spark standalone集群,来快速开发调试Spark应用。
五、通过IDEA将开发的Spark应用提交到本地Spark 2.1.0 standalone集群运行
1、前提要求(版本信息见“开发环境”)
(1)需要有一个Spark standalone集群,不需要hdfs,能启动运行即可。
(2)IntelliJ IDEA、Scala插件、Java JDK、Scala SDK都已安装配置完毕。
2、创建项目
新建一个Scala项目。启动IntelliJ IDEA,选择New Project,然后选择Scala,点击下一步,接着配置项目名称、SDK版本和项目位置等。注意:IDEA中项目的概念类似于Eclipse中的工作区(workspace),而IDEA中模块(module)的概念类似于Eclipse中的项目。
图1 新建一个Scala项目 图2 配置项目(名称、SDK和位置等)3、添加依赖的Jar包
如前文所述,较新版本的Spark(例如2.1.0及以后)取消了lib目录,取而代之的是jars目录,所以这里依赖包的添加相比于旧版本发生了一些变化。
首先按如下步骤(图3、图4和图5)为我们开发的Spark应用添加依赖,把Spark 2.1.0中的jars目录整个添加进来(这样jars目录里所有的jar包都会被添加到classpath中),并调整它的scope为“编译”(这样我们后面打的包会包括jars目录里所有的jar包)。
看到这里,读者可能会心生疑惑,我们开发的应用事实上用不着这么多的jar包,例如关于机器学习或者流计算的jar包我们肯定用不上,另外,这些jar包Spark集群中都有,为什么还要把scope设为“编译”呢?
如果是手动打包然后通过spark-submit脚本运行应用的话,确实只需添加几个jar包即可,而且scope还应该设为“provided”,这样可以减少最终生成jar包的大小。然而通过IDEA提交Spark应用到本地集群,还就必须按前述步骤做,否则会报java.lang.NoClassDefFoundError的错误(不把整个jars目录添加到classpath中会报错,不把它的scope设为“编译”也会报错)。
java.lang.ClassNotFoundException和java.lang.NoClassDefFoundError这两种错误看起来有点像,但它们是完全不同的。NoClassDefFoundError是指JVM运行时无法在classpath中找到对应的类进行加载;而ClassNotFoundException是指编译的时候classpath中找不到对应的类。
图3 为Spark应用添加依赖 图4 把Spark 2.1.0中的jars目录整个添加进来 图5 调整scope为“编译”4、完成代码
在src目录下新建一个名为WordCount的object,这是我们的main class。接着在源文件中输入Spark的HelloWorld程序——WordCount。因为这里连接的是本地Spark 2.1.0 standalone集群,所以直接从本地读取文件。
有两点需要注意:
(1)因为要提交Spark应用到本地集群运行,所以需要配置master的URL,可以在应用程序中使用conf.setMaster("spark://host:port")或者在IDEA运行项中配置“VM options”为-Dspark.master=spark://host:port,“host”就是master所在的主机名,如果没有在/etc/hosts中建立主机名和IP的映射关系,则直接使用IP。
(2)通过conf.setJars(List("/path/to/xxx.jar")),告诉Spark集群我们要提交的包含作业代码的jar包在哪里,记住路径中千万别包含中文,不然会出错。
图6 在src目录下新建WordCount.scala 图7 输入WordCount代码5、配置打包
即便是通过IDEA提交Spark应用到本地集群执行,我们还是需要配置打包。Spark应用之所以能够分布式运行,正是因为jar包被分发到了各个worker之中。
我们将应用程序打成可执行jar包,并为它设置main class,然后再把“构建项目自动打包”的选项勾上,这样才能做到点击“运行”就提交到本地集群运行。
图8 将Spark应用打包 图9 可执行jar包,配置main class 图10 构建项目时自动打包6、配置IDEA运行项
这里结合spark-submit脚本,说明一下IDEA是怎么做到一键提交Spark应用到本地集群执行。当我们点击“运行”按钮,IDEA会自动构建项目,并重新打包;然后根据应用程序中的conf.setJars(List("/path/to/xxx.jar")),可以找到需要提交到集群的jar包;根据conf.setMaster("spark://host:port")或者IDEA运行项中“VM options”的-Dspark.master=spark://host:port连接到Spark standalone集群,这相当于配置spark-submit脚本的--master;然后IDEA运行项中的“Main class”指定了Spark应用的main class,这相当于配置spark-submit脚本的--class;如果还想为Spark应用做一些其他配置,可以通过conf.set("xxx", "XXX")或者“VM options”添加一些其他配置(形如-Dxxx.yyy=zzz);传给Spark应用main方法的参数在“Program arguments”中输入。
所以使用IDEA连接集群,通过IDEA将Spark应用提交本地集群执行,其实就是IDEA替我们完成了打包和执行spark-submit脚本的工作。
图11 配置IDEA运行项7、启动Spark集群
在点击“运行”按钮之前,需要确保目标本地Spark standalone集群已经启动。
注意:我们开发时使用的Spark版本要和本地集群一致。
8、运行,观察IDEA控制台结果,观察Spark Web UI。
终于到了最后一步,点击“运行”按钮,见证奇迹吧(2333333......)。
可以看到,控制台成功输出了词频统计。也可以打开Spark Web UI,详细看一下应用执行情况。
图12 控制台输出结果 图13 Spark Web UI查看应用详细执行情况六、可能出现的错误
1、可能会出现如下错误:创建SparkContext时出现异常
java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 16 retries (starting from 0)! Consider explicitly setting the appropriate port for the service 'sparkDriver '(for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries.
解决方法:在IDEA运行项“VM options”中加入“-Dspark.driver.host=127.0.0.1”和“-Dspark.driver.port=7077” 这两项, 配置driver进程在本机的7077端口运行。
2、还可能出现这个错误:Hadoop权限错误
org.apache.hadoop.security.AccessControlException: Permission denied: user=yourUsername, access=WRITE, inode="/user/otherUsername/":otherUsername:supergroup:drwxr-xr-x
这是因为,HDFS中每个用户都有自己的home目录(与Linux系统一致),用户自己的目录只有该用户拥有写权限,远程连接Spark集群运行应用时,若使用本机用户尝试对其他用户在HDFS中的home目录进行写操作,会出现这个问题。
解决办法:
(1)修改Hadoop 配置文件hdfs-site.xml,设置dfs.permissions.enabled为false即可,这样不会再有权限限制,具体如图14所示。当然,开发测试完了最好还是把这个权限限制机制打开。
图14 修改Hadoop配置文件hdfs-site.xml(2)使用和远程集群一致的用户,或者在同名用户home目录下进行写操作。
转载请注明如下内容:
文章来自简书,作者:就是杨宗
原文链接:http://www.jianshu.com/p/b4e4658c459c
网友评论
我的邮箱:1054459304@qq.com
出现这种错误:18/02/06 11:58:49 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/02/06 11:58:51 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
18/02/06 11:58:53 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
18/02/06 11:58:55 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("spark://192.168.1.110:7077")
.setAppName("WordCount")
.setJars(Seq("out/artifacts/WordCountExample_jar/WordCountExample.jar"))
val sc = new SparkContext(conf)
val inputRdd = sc.textFile("hdfs://192.168.1.110:8020/data/words.log")
val wordCountRDD = inputRdd.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false)
val result = wordCountRDD.collect()
for (pair <- result) {
println(pair)
}
sc.stop()
}
}
我的scala版本:2.1.6
spark版本:spark-2.2.1-bin-hadoop2.7.tgz
hadoop版本:2.7.3
18/01/20 01:33:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.113, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
...
能抽空帮我看看问题吗??
邮箱 372402715@qq.com
非常感谢