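In practice you often need to check the execution state of an application on YARN (for example, whether an application is submitted, running, or finished). There are usually three ways to do this:
- check the YARN web UI;
- call the REST API provided by YARN;
- use the YarnClient API.
The first is the most common. The second is usually called against a single endpoint, so it can suffer from a single point of failure and is less reliable. The third is used less often but is more reliable. This article shows how to use the third approach to get the execution state of applications from the YARN ResourceManager.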
1. Application execution states
In the hadoop-yarn module, the enum org.apache.hadoop.yarn.api.records.YarnApplicationState defines the execution states of a submitted application:
public enum YarnApplicationState {
    /** Application which was just created. */
    NEW,
    /** Application which is being saved. */
    NEW_SAVING,
    /** Application which has been submitted. */
    SUBMITTED,
    /** Application has been accepted by the scheduler */
    ACCEPTED,
    /** Application which is currently running. */
    RUNNING,
    /** Application which finished successfully. */
    FINISHED,
    /** Application which failed. */
    FAILED,
    /** Application which was terminated by a user or admin. */
    KILLED
}
The same package also defines the enum FinalApplicationStatus, which represents the final execution status of an application:
public enum FinalApplicationStatus {
    /** Undefined state when either the application has not yet finished */
    UNDEFINED,
    /** Application which finished successfully. */
    SUCCEEDED,
    /** Application which failed. */
    FAILED,
    /** Application which was terminated by a user or admin. */
    KILLED
}
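The two enums answer different questions: YarnApplicationState is the state the ResourceManager tracks for the application, while FinalApplicationStatus is the recorded outcome, which stays UNDEFINED until the application has finished. The following is a minimal sketch of how the two might be read together. `AppStateUtil`, `isTerminal`, and `describe` are hypothetical names introduced here for illustration and are not part of Hadoop.

```scala
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}

// Hypothetical helper, not part of Hadoop.
object AppStateUtil {
  // States after which the application no longer moves to another state.
  private val terminalStates: Set[YarnApplicationState] = Set(
    YarnApplicationState.FINISHED,
    YarnApplicationState.FAILED,
    YarnApplicationState.KILLED
  )

  def isTerminal(state: YarnApplicationState): Boolean =
    terminalStates.contains(state)

  // Build a short summary; the final status is only reported once the state is terminal.
  def describe(state: YarnApplicationState, finalStatus: FinalApplicationStatus): String =
    if (!isTerminal(state)) s"in progress ($state)"
    else s"done ($state, final status $finalStatus)"
}
```

For example, `AppStateUtil.describe(YarnApplicationState.RUNNING, FinalApplicationStatus.UNDEFINED)` yields `in progress (RUNNING)`.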
2. Business code (written in Scala)
Dependencies are managed with Maven. Add the following dependencies to pom.xml:
......
<properties>
  <hadoop.version>2.6.0-cdh5.8.0</hadoop.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-api</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
</dependencies>
......
Scala code
import java.io.IOException
import java.util.EnumSet

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.YarnException

object ReadSparkJobStatus {
  def main(args: Array[String]): Unit = {
    // YarnConfiguration extends Configuration
    val yarnConf = new YarnConfiguration
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(yarnConf)
    yarnClient.start()
    try {
      // Fetch the applications that are currently in the RUNNING state
      val applications =
        yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).asScala
      if (applications.nonEmpty) {
        for (app <- applications) {
          // The application ID, name, queue and user here are the same
          // values that the YARN web UI shows for each application
          println("ApplicationId ============> " + app.getApplicationId)
          println("name ============> " + app.getName)
          println("queue ============> " + app.getQueue)
          println("user ============> " + app.getUser)
        }
      } else println(">>>>>> no target applications found on yarn.")
    } catch {
      case e: YarnException =>
        e.printStackTrace()
      case e: IOException =>
        e.printStackTrace()
    } finally {
      // Stop the client even if the query throws
      yarnClient.stop()
    }
  }
}
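The program above never names a ResourceManager: `new YarnConfiguration` loads yarn-default.xml and yarn-site.xml from the classpath, and YarnClient connects to whatever those files specify. The sketch below is one possible way to check those settings before connecting. `RmSettings` and `describe` are hypothetical names introduced for illustration, while the `YarnConfiguration` constants are the real configuration keys.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Hypothetical helper, not part of Hadoop.
object RmSettings {
  // Summarize which ResourceManager(s) the given configuration points at.
  def describe(conf: Configuration): String = {
    val haEnabled = conf.getBoolean(
      YarnConfiguration.RM_HA_ENABLED, YarnConfiguration.DEFAULT_RM_HA_ENABLED)
    if (haEnabled) {
      // yarn.resourcemanager.ha.rm-ids lists the logical IDs of the ResourceManagers
      val ids = conf.get(YarnConfiguration.RM_HA_IDS, "")
      s"HA enabled, rm-ids=$ids"
    } else {
      // yarn.resourcemanager.address is the client-facing address of the ResourceManager
      val address = conf.get(
        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)
      s"single ResourceManager at $address"
    }
  }

  def main(args: Array[String]): Unit =
    println(describe(new YarnConfiguration()))
}
```

If this reports the default address (0.0.0.0:8032), yarn-site.xml is probably not on the classpath and the client will not reach the intended cluster.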
Figure: Application information in the YARN web UI
Figure: Example of application execution states
3. Results
ApplicationId ============> application_1553758691889_1111
name ============> Spark shell
queue ============> root.users.user1
user ============> datalake
ApplicationId ============> application_1553758691889_1101
name ============> Spark shell
queue ============> root.users.user2
user ============> datalake
ApplicationId ============> application_1553758691889_1103
name ============> aps_sparkstreaming
queue ============> root.users.aps
user ============> aps
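The same client can also track one specific application instead of listing every running one. The sketch below polls `getApplicationReport` until the application reaches a terminal state. The object name `WaitForApplication`, the helper `parseAppId`, and the 5-second interval are illustrative assumptions. `ConverterUtils.toApplicationId` is used because it exists in the Hadoop 2.6 line used in this article; later Hadoop versions may prefer `ApplicationId.fromString`.

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils

// Hypothetical example object, not part of Hadoop.
object WaitForApplication {
  private val terminalStates: Set[YarnApplicationState] = Set(
    YarnApplicationState.FINISHED,
    YarnApplicationState.FAILED,
    YarnApplicationState.KILLED
  )

  // Parse an ID string such as "application_1553758691889_1111".
  def parseAppId(s: String): ApplicationId = ConverterUtils.toApplicationId(s)

  def main(args: Array[String]): Unit = {
    val appId = parseAppId(args(0))
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    try {
      var report = yarnClient.getApplicationReport(appId)
      // Keep polling while the application is still in a non-terminal state
      while (!terminalStates.contains(report.getYarnApplicationState)) {
        println(s"$appId is ${report.getYarnApplicationState}")
        Thread.sleep(5000L) // assumed polling interval
        report = yarnClient.getApplicationReport(appId)
      }
      println(s"$appId ended as ${report.getYarnApplicationState}, " +
        s"final status ${report.getFinalApplicationStatus}")
    } finally {
      yarnClient.stop()
    }
  }
}
```

The application ID is passed as the first program argument, in the same format as the IDs printed in the results above.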