Yarn上运行Hello World

作者: pq217 | 来源:发表于2022-11-23 20:52 被阅读0次

    前言

    上文提到yarn类似一个分布式操作系统,那么我们就可以自定义写一些应用在这个操作系统上运行

    当然也不能太过随意写,我们要运行在操作系统上就必然要遵守操作系统本身的规矩

    Yarn

    Yarn体系中,用户的主程序被称作ApplicationMaster,当然我们可以在ApplicationMaster中继续向RM申请资源来执行子程序,比如MapReduce中的MapTask和ReduceTask都属于子程序。

    这就好比我们平时写java,在main方法主线程中可以创建子线程跑一些逻辑

    • linux/windows中,我们创建java子线程不需要关心这个线程任务到底由哪个cpu完成,任务交给操作系统来调度
    • 同理yarn中,ApplicationMaster申请创建出来的子程序,我们不用考虑程序运行在哪台机器上,任务交给yarn来调度

    Hello World

    接下来我们就尝试写一个简单应用(输出Hello World),运行在yarn中,我们先不考虑使用子程序,直接在ApplicationMaster中输出Hello World

    ApplicationMaster

    写一个Hello World应用再简单不过了:

    public class MyAppMaster {
        public static void main(String[] args) {
            System.out.println("HELLO WORLD");
        }
    }
    

    但还是那句话,在yarn上运行就要遵守人家的规矩,而yarn规定:

    ApplicationMaster程序运行前需要向RM注册,运行结束后需要取消注册

    也就是说程序不是你想跑就能跑,你得告诉人家资源管理器一声,否则人家队伍怎么带?

    注册的相关逻辑如果真自己写还挺复杂,但好在hadoop为我们提供了客户端工具,我们引入依赖就方便了

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.3</version>
    </dependency>
    

    最终ApplicationMaster代码如下(就是增加了注册到RM和取消注册)

    public class MyAppMaster {
    
        /**
         * AppMaster 程序入口
         * @param args 执行参数
         */
        public static void main(String[] args) {
            MyAppMaster master = new MyAppMaster();
            master.run();
        }
    
        /**
         * AppMaster 运行
         */
        public void run() {
            try {
                // 开启am-rm client,建立rm-am的通道,用于注册AM
                AMRMClientAsync amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, null);
                amRmClient.init(new Configuration());
                amRmClient.start();
                String hostName = NetUtils.getHostname();
                // 注册至RM
                amRmClient.registerApplicationMaster(hostName, -1, null);
                // 运行程序
                doRun();
                // 解除注册
                amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 实际运行程序,就一个输出
         */
        private void doRun() {
            System.out.println("HELLO WORLD");
        }
    }
    

    到此我们的应用程序就写完了,并且遵守了yarn的规矩

    YarnClient

    应用程序写完了,怎么把程序部署到yarn上运行呐?

    yarn又有规定了:

    想让你的程序在我的平台上跑,需要你在RM上创建应用,并指定好应用名称、运行环境、程序(jar包)位置、启动命令、所需资源等

    当然这些数据的提交是有一定格式的,就像我们前端对接后端api,肯定是有一个json格式

    索性我们不要考虑这复杂的格式,因为hadoop-yarn-client依赖同样帮我们封装好了,就好似有了sdk,写写代码就可以和RM对接了,而这个负责对接RM上传应用程序和启动参数的代码,一般我们叫它:YarnClient

    我们开始写代码实现这个YarnClient

    1.配置

    首先我们要与RM沟通创建应用,首先要搞清楚RM在哪才能和它交互,所以先配置一下RM的IP地址

    Configuration conf = new Configuration();
    // 设置rm所在的ip地址
    conf.set("yarn.resourcemanager.hostname", "192.168.10.101");
    

    其中192.168.10.101就是你运行RM的机器IP地址

    2.申请应用

    有了地址,就可以申请应用,这一步直接使用hadoop-yarn-client依赖的工具即可

    // 创建客户端
    YarnClient yarnClient = YarnClient.createYarnClient();
    // 初始配置
    yarnClient.init(conf);
    // 开启(建立连接)
    yarnClient.start();
    // 向RM发送请求创建应用
    YarnClientApplication application = yarnClient.createApplication();
    // 准备应用提交上下文(RM要求你提交的信息格式)
    ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
    // 获取分配的应用id
    ApplicationId appId = applicationSubmissionContext.getApplicationId();
    log.info("appId: {}", appId);
    

    其中ApplicationId就是RM给我们分配的应用ID,ApplicationSubmissionContext就是我们要提交的应用相关信息的载体

    所以接下来就是给applicationSubmissionContext填充应用名称、运行环境、程序(jar包)位置、启动命令、所需资源等信息再次提交给RM

    3.设置应用名称

    应用名称就起个"Hello World"

    // 设置应用名称
    applicationSubmissionContext.setApplicationName("Hello World");
    
    4.设置程序(jar包)位置

    这一步最重要,你得告诉RM你得程序在哪,一般都存在HDFS上,因为我懒着去上传,写了一个本地传送到HDFS的方法

    // 即上一步写的AppMaster jar包的本地位置
    String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
    String jarName = "my-yarn-app.jar";
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
        put(jarName, addLocalToHdfs(jarPath, jarName));
    }};
    

    其中addLocalToHdfs就是上传到HDFS,并获取HDFS路径

    private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
        //获取文件系统
        Configuration configuration = new Configuration();
        //NameNode的ip和端口
        FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
        // 目标路径
        String dst =
                "hello/" + jarName;
        Path dstPath =
                new Path(fs.getHomeDirectory(), dst);
        // 上传
        fs.copyFromLocalFile(new Path(jarPath), dstPath);
        FileStatus scFileStatus = fs.getFileStatus(dstPath);
        // 关闭
        fs.close();
        LocalResource scRsrc = LocalResource.newInstance(
                        URL.fromURI(dstPath.toUri()),
                        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                        scFileStatus.getLen(), scFileStatus.getModificationTime());
        return scRsrc;
    }
    

    这一步需要引入hdfs-client依赖

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    
    5.设置程序环境

    这一步同样比较重要,我们需要设置程序运行的环境,jdk、yarn包什么的,设置了CLASSPATH

    Map<String, String> env = new HashMap<>();
    // 任务的运行依赖jar包的准备
    StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
            .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
    // yarn依赖包
    for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
        classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
        classPathEnv.append(c.trim());
    }
    env.put("CLASSPATH", classPathEnv.toString());
    
    6.设置启动脚本

    这一步一样至关重要,我们要告诉RM我们的程序怎么启动,因为Yarn不光支持java包这一种程序,所以我们要写java的启动命令,可以通过-Xms -Xmx等设置启动jvm参数

    List<String> commands = new ArrayList<String>() {{
        add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
    }};
    
    7.配置Container启动上下文

    资源、环境、启动命令等就组成了一个Container(AM的Container)启动的所需参数,把它们打包为container启动上下文,通过setAMContainerSpec设置到要提交的参数中

    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
            localResources, env, commands, null, null, null);
    // 准备am Container的运行环境
    applicationSubmissionContext.setAMContainerSpec(amContainer);
    
    8.设置am程序所需硬件资源

    准备好了所有启动程序的信息,下一步就是告诉RM你这个AppMaster需要多少硬件资源,这样RM才能给你找合适的节点运行你的程序,通过setResource设置到要提交的参数中

    int memory = 1024;
    int vCores = 2;
    applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
    
    9.提交作业

    完事具备,提交给RM你的程序就会被跑起来了

    yarnClient.submitApplication(applicationSubmissionContext);
    
    完整代码

    YarnClient完整代码如下

    package me.pq.yarn;
    
    /**
     * @Author pq217
     * @Date 2022/11/18 17:47
     * @Description
     */
    public class MyYarnClient {
    
        private static Logger log = LoggerFactory.getLogger(MyYarnClient.class);
    
        public static void main(String[] args) {
            MyYarnClient client = new MyYarnClient();
            try {
                client.run();
            } catch (Exception e) {
                log.error("client run exception , please check log file.", e);
            }
        }
    
        /**
         * 客户端运行
         * @throws IOException
         * @throws YarnException
         * @throws URISyntaxException
         * @throws InterruptedException
         */
        public void run() throws IOException, YarnException, URISyntaxException, InterruptedException {
            /**=====1.配置=====**/
            Configuration conf = new Configuration();
            // 设置rm所在的ip地址
            conf.set("yarn.resourcemanager.hostname", "192.168.10.101");
            /**=====2.申请app=====**/
            // 创建YarnClient和ResourceManager进行交互
            YarnClient yarnClient = YarnClient.createYarnClient();
            // 初始配置
            yarnClient.init(conf);
            // 开启(建立连接)
            yarnClient.start();
            // 向RM发送请求创建应用
            YarnClientApplication application = yarnClient.createApplication();
            // 准备应用提交上下文(RM要求你提交的信息格式)
            ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
            // 获取分配的应用id
            ApplicationId appId = applicationSubmissionContext.getApplicationId();
            log.info("appId: {}", appId);
            /**=====3.设置应用名称=====**/
            // 设置应用名称
            applicationSubmissionContext.setApplicationName("Hello World");
            /**=====4.准备程序(jar包)=====**/
            String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
            String jarName = "my-yarn-app.jar";
            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
                put(jarName, addLocalToHdfs(jarPath, jarName));
            }};
            /**=====5.准备程序环境=====**/
            Map<String, String> env = new HashMap<>();
            // 任务的运行依赖jar包的准备
            StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
                    .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
            // yarn依赖包
            for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
                classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
                classPathEnv.append(c.trim());
            }
            env.put("CLASSPATH", classPathEnv.toString());
    
            /**=====6.准备启动命令=====**/
            List<String> commands = new ArrayList<String>() {{
                add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
            }};
    
            /**=====7.构造am container运行资源+环境+脚本=====**/
            ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
                    localResources, env, commands, null, null, null);
            // 准备am Container的运行环境
            applicationSubmissionContext.setAMContainerSpec(amContainer);
            /**=====8.设置am程序所需资源=====**/
            int memory = 1024;
            int vCores = 2;
            applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
            /**=====9.提交并开始作业=====**/
            yarnClient.submitApplication(applicationSubmissionContext);
            /**=====10.查询作业是否完成=====**/
            for (;;) {
                Thread.sleep(500);
                ApplicationReport applicationReport = yarnClient.getApplicationReport(appId);
                YarnApplicationState state = applicationReport.getYarnApplicationState();
                FinalApplicationStatus status = applicationReport.getFinalApplicationStatus();
                if (state.equals(YarnApplicationState.FINISHED)) {
                    if (status.equals(FinalApplicationStatus.SUCCEEDED)) {
                        log.info("程序运行成功!");
                        break;
                    } else  {
                        log.error("程序运行失败!");
                        break;
                    }
                } else if (state.equals(YarnApplicationState.FAILED) || state.equals(YarnApplicationState.KILLED) ) {
                    log.error("程序运行失败!");
                    break;
                }
                log.info("计算中...");
            }
        }
    
        /**
         * 上传本地jar包到hdfs
         * @param jarPath
         * @param jarName
         * @throws IOException
         */
        private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
            //获取文件系统
            Configuration configuration = new Configuration();
            //NameNode的ip和端口
            FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
            // 目标路径
            String dst =
                    "hello/" + jarName;
            Path dstPath =
                    new Path(fs.getHomeDirectory(), dst);
            // 上传
            fs.copyFromLocalFile(new Path(jarPath), dstPath);
            FileStatus scFileStatus = fs.getFileStatus(dstPath);
            // 关闭
            fs.close();
            LocalResource scRsrc = LocalResource.newInstance(
                            URL.fromURI(dstPath.toUri()),
                            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                            scFileStatus.getLen(), scFileStatus.getModificationTime());
            return scRsrc;
        }
    
    }
    

    测试

    应用写好了,上传应用的client也写好了,下面测一下

    首先使用maven-assembly插件给程序打jar包

    mvn clean package
    

    其次,本地idea直接运行YarnClient的main方法

    注意替换一下代码中的jar包地址和名称,以及AppMaster的全路径名,以及hadoop的ip地址等信息

    MyYarnClient的运行结果idea输出如下

    MyYarnClient

    打开yarn-web再看一下日志

    yarn-web

    成功实现了一个运行在Yarn上的小程序!

    分布式计算

    以上,我们完成了一个简单的程序运行在yarn上,但其实这个应用程序实际上只在一个节点上实际运行了System.out.println的代码,这就像去了一趟沃尔玛,买了瓶矿泉水

    yarn的优势是可以让我们的计算程序分给多个机器节点去执行,我们继续改造一下AppMaster,实现如下功能:

    • 添加两个子任务,子任务分别在HDFS中创建一个文件夹
    • 两个子任务结束之后,再运行输出Hello World

    ChildTask

    首先编写子任务,我为了省事,直接和AppMaster放一个项目中了,很简单的代码,创建一个/child/+服务器hostName的文件夹

    public class ChildTask {
    
        public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
            //获取文件系统
            Configuration configuration = new Configuration();
            //NameNode的ip和端口
            FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
            // hostName
            String hostName = NetUtils.getHostname();
            // 创建一个文件夹
            fs.mkdirs(new Path("/child/"+hostName));
            fs.close();
        }
    
    }
    

    AppMaster

    接下来要改造AppMaster,原来只是输出Hello World,现在要向RM申请Container用来执行子任务

    container请求

    首先申请Container需要向RM申请,所以使用amRmClient即可发出请求

    // 两个子任务,对应两个container
    int childTaskNum = 2;
    for (int i = 0; i < childTaskNum; i++) {
        // 向rm申请一个1M内存,1个CPU的资源容器
        int memory = 1024;
        int vCores = 1;
        AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
        amRmClient.addContainerRequest(containerRequest);
    }
    
    rm回调

    申请成功后,当rm分配出container时还要进行相关回调处理,所以amRmClient定义时要加上一个回调处理类

    // rm回调处理器
    AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
    // 开启am-rm client,建立rm-am的通道,用于注册AM, allocListener负责处理AM的响应
    AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);
    

    RMCallBackHandler是rm响应的处理器

    private class RMCallBackHandler extends AMRMClientAsync.AbstractCallbackHandler {
    

    重点要实现两个方法

    • onContainersAllocated rm分配出containers的回调方法
    • onContainersCompleted container运行结束的方法
    onContainersCompleted

    这个方法主要是子任务运行完成,我们在AppMaster加几个内部变量控制所有子任务完成再输出"Hello World"

    // 充当锁
    private Object lock = new Object();
    // 任务个数
    private int childTaskNum = 2;
    // 已完成任务个数
    private int childTaskCompletedNum = 0;
    

    RMCallBackHandler的onContainersCompleted方法实现如下:

    @Override
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        for (ContainerStatus status : statuses) {
            synchronized (lock) {
                System.out.println(++childTaskCompletedNum + " container completed");
                // 子任务全部完成
                if (childTaskCompletedNum == childTaskNum) {
                    lock.notify();
                }
            }
        }
    }
    

    doRun方法修改为如下

    private void doRun(AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient) throws InterruptedException {
        // 申请两个资源容器
        for (int i = 0; i < childTaskNum; i++) {
            // 向rm申请一个1M内存,1个CPU的资源容器
            int memory = 1024;
            int vCores = 1;
            AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
            amRmClient.addContainerRequest(containerRequest);
        }
        synchronized (lock) {
            // 等待子任务完成
            lock.wait();
        }
        System.out.println("HELLO WORLD");
    }
    

    到此即可实现申请两个container,两个container运行完后再执行输出"HELLO WORLD"

    onContainersAllocated

    这是RMCallBackHandler中要实现的重点方法,当container分配成功后要做什么?

    思路很简单,container分配之后当然要在对应的容器上运行我们的子任务:ChildTask,而子任务的运行一定是在container所指定的NM节点上,所以我们要提前初始化一个NM客户端:
    加一个内部属性以供AppMaster整个类使用

    NMClientAsyncImpl nmClientAsync;
    

    此时AppMaster run方法修改如下

    public void run() {
        try {
            // rm回调处理器
            AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
            // 开启am-rm client,建立rm-am的通道,用于注册AM, allocListener负责处理AM的响应
            AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);
            amRmClient.init(new Configuration());
            amRmClient.start();
            String hostName = NetUtils.getHostname();
            // 注册至RM
            amRmClient.registerApplicationMaster(hostName, -1, null);
            // 初始化nmClient
            nmClientAsync = new NMClientAsyncImpl(new NMCallBackHandler());
            nmClientAsync.init(conf);
            nmClientAsync.start();
            // 运行程序
            doRun(amRmClient);
            // 解除注册
            amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
            // am-rm客户端关闭
            amRmClient.stop();
            // nm客户端关闭
            nmClientAsync.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

    其中NMCallBackHandler是一个NM响应的Callback,可以通过实现其方法在container声明周期加入一些逻辑

    private class NMCallBackHandler extends NMClientAsync.AbstractCallbackHandler {
    }
    

    接下来就是实现onContainersAllocated,代码如下

    @Override
    public void onContainersAllocated(List<Container> containers) {
        try {
            for (Container container : containers) {
                System.out.println("container allocated, Node=" + container.getNodeHttpAddress());
                // 构建AM<->NM客户端并开启
                // 还是YarnClient containerLaunchContext那一套,这把直接去HDFS系统取文件,因为和YarnClient打包到一个jar上传
                Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
                    //NameNode的ip和端口
                    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), conf, "root");
                    URI appUri = new URI("/user/root/hello/my-yarn-app.jar");
                    FileStatus fileStatus = fs.getFileStatus(new Path(appUri));
                    put("my-yarn-app.jar", LocalResource.newInstance(
                            URL.fromURI(appUri),
                            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                            fileStatus.getLen(), fileStatus.getModificationTime()));
                }};
                Map<String, String> env = new HashMap<>();
                StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
                        .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
                for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
                    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
                    classPathEnv.append(c.trim());
                }
                env.put("CLASSPATH", classPathEnv.toString());
                List<String> commands = new ArrayList<String>() {{
                    // 传入ip地址作为参数
                    add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx200m me.pq.yarn.ChildTask");
                }};
                ContainerLaunchContext containerLaunchContext = ContainerLaunchContext.newInstance(
                        localResources, env, commands, null, null, null);
                // nm节点启动container
                nmClientAsync.startContainerAsync(container, containerLaunchContext);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

    代码就不详解了,和YarnClient提交的ContainerLaunchContext写法基本一致(最终运行me.pq.yarn.ChildTask而不是MyAppMaster),最后使用NM客户端的startContainerAsync方法让子任务运行在NM上

    值得一提的是我的ChildTask和AppMaster都在一个jar包下,所以这里不用上传了,直接去HDFS取即可

    测试

    代码写完了,测试一下,mvn clean package然后执行MyYarnClient main方法

    idea输出

    MyYarnClient

    HDFS-WEB上看一下子任务的文件夹创建是否成功

    HDFS-WEB

    可见文件夹创建出来了

    YARN-WEB看一下AppMaster的日志

    YARN-WEB AppMaster

    到此,实现了一个运行在yarn上的简单分布式计算程序~

    相关文章

      网友评论

        本文标题:Yarn上运行Hello World

        本文链接:https://www.haomeiwen.com/subject/anqnxdtx.html