环境准备
- spark 集群在linux 服务器,个人代码在本地调试,不希望每次就 打jar 包在上传再运行
- 使用spark Java 的SparkLauncher,本地提交任务到集群,java 中获取 appId 并监控任务状态。
- 需要在本地搭建 hadoop+ spark 的环境,并且拷贝 cluster 的配置到本地,覆盖本机的配置。
- 是否成功的检验方法是,在本地运行spark-submit 看能否提交任务。
spark Launcher 的原理。
- 本质上是从java 程序里面 构建 相应的参数,然后 在java 开启线程调用 exec(cmd)命令,并获取输出。
SparkLauncher launcher = new SparkLauncher()
.setAppName(job.getName())
.setAppResource(sparkConfig.getJarPath())
.setMainClass(job.getMainClass())
.setMaster(sparkConfig.getMaster())
.setDeployMode(sparkConfig.getDeployMode());
if( ! isWindows())
{
//can only work in linux
launcher.setConf("spark.driver.memory", sparkConfig.getDriverMemory())
.setConf("spark.executor.memory", sparkConfig.getExecutorMemory())
.setConf("spark.executor.cores", sparkConfig.getExecutorCores());
}
- 提交并且监控状态
SparkAppHandle handle = launcher.setVerbose(true).startApplication(new SparkAppHandle.Listener()
{
@Override
public void stateChanged(SparkAppHandle sparkAppHandle)
{
log.info("appId {} stateChanged:{}", sparkAppHandle.getAppId(), sparkAppHandle.getState().toString());
}
@Override
public void infoChanged(SparkAppHandle sparkAppHandle)
{
log.info("appId {} infoChanged:{}", sparkAppHandle.getAppId(), sparkAppHandle.getState().toString());
}
});
log.info("The task is executing, please wait ....");
遇到的坑
if( ! isWindows())
{
//can only work in linux
launcher.setConf("spark.driver.memory", sparkConfig.getDriverMemory())
.setConf("spark.executor.memory", sparkConfig.getExecutorMemory())
.setConf("spark.executor.cores", sparkConfig.getExecutorCores());
}
在window 环境下,设置 setConf的时候会有问题,主要是 sparkLauncher会把这个参数
用 双引号 引用起来"spark.executor.memory=4G", 当这个传给windows cmd的时候, 就会报错。
所以在window下面,才有一个这样的判断。
网友评论