Running Multiple MapReduce Jobs in Hadoop (Iterative, Dependent, and Chained)


Author: 吴国友 | Published 2019-01-15 07:57

    I. Iterative. In the iterative pattern, the next job takes the previous job's output as its input, and the last job in the sequence produces the final result. Only the key code is shown here.

    Job job = new Job(new Configuration(), "test");

    JobConf jobConf = (JobConf) job.getConfiguration();
    jobConf.setJobName("hadoopJoinTask");

    // set job's input path
    FileInputFormat.setInputPaths(jobConf, inputPath);
    // set job's output path
    FileOutputFormat.setOutputPath(jobConf, outPath);

    Job job2 = new Job(new Configuration(), "test2");

    JobConf jobConf2 = (JobConf) job2.getConfiguration();
    jobConf2.setJobName("hadoopJoinTask");

    // job2's input path is job's output path
    FileInputFormat.setInputPaths(jobConf2, outPath);
    // set job2's output path
    FileOutputFormat.setOutputPath(jobConf2, outPath2);
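The snippet above only configures the two jobs; nothing submits them or enforces the ordering. One way the hand-off could be driven, sketched here with the newer org.apache.hadoop.mapreduce API (the class name `IterativeDriver` and the path arguments are hypothetical), is to block on each job before starting the next:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: assumes Hadoop on the classpath and that mapper/reducer
// classes are set on each job as usual.
public class IterativeDriver {
    public static void main(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path midPath   = new Path(args[1]);  // output of job1, input of job2
        Path outPath   = new Path(args[2]);

        Job job1 = Job.getInstance(new Configuration(), "step1");
        FileInputFormat.setInputPaths(job1, inputPath);
        FileOutputFormat.setOutputPath(job1, midPath);
        // block until job1 finishes; abort the pipeline if it fails
        if (!job1.waitForCompletion(true)) System.exit(1);

        Job job2 = Job.getInstance(new Configuration(), "step2");
        FileInputFormat.setInputPaths(job2, midPath);  // job1's output
        FileOutputFormat.setOutputPath(job2, outPath);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}
```

`waitForCompletion(true)` both submits the job and streams its progress, which is what makes the simple sequential driver sufficient here.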
    

    II. Dependent. A common situation in practice is that job3 must wait for job1, job2, and so on to finish; job3 depends on those jobs completing before it can run.

    // Hadoop 2: the JobControl source shows a ControlledJob class, which
    // exposes the dependency method addDependingJob
 
            Job job = new Job(new Configuration(), "job1");
            Job job2 = new Job(new Configuration(), "job2");
 
            ControlledJob controlledJob = new ControlledJob(job.getConfiguration());
 
            // wrap job
            controlledJob.setJob(job);
 
            ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
            controlledJob2.setJob(job2);
            // the key dependency call: controlledJob2 depends on controlledJob
            controlledJob2.addDependingJob(controlledJob);
 
            JobControl jc = new JobControl("jc");
            jc.addJob(controlledJob);
            jc.addJob(controlledJob2);
            // JobControl implements Runnable, and Runnable only defines a run
            // method with no way to stop, so drive it from a helper thread
 
            Thread jcThread = new Thread(jc);
            jcThread.start();
            while (true) {
                // once every job in the pool has finished, move on
                if (jc.allFinished()) {
                    System.out.println(jc.getSuccessfulJobList());
                    jc.stop();
                    break;
                }
                // stop early if any job failed
                if (jc.getFailedJobList().size() > 0) {
                    System.out.println(jc.getFailedJobList());
                    jc.stop();
                    break;
                }
            }
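The prose above mentions job3 depending on job1, job2, and so on, while the code wires only two jobs. Fan-in is just several addDependingJob calls on the same ControlledJob; a sketch (the class `FanInWiring` and method name are hypothetical):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class FanInWiring {
    // job3 becomes runnable only after job1 and job2 both succeed
    public static JobControl wire(Job job1, Job job2, Job job3) throws Exception {
        ControlledJob cj1 = new ControlledJob(job1.getConfiguration());
        cj1.setJob(job1);
        ControlledJob cj2 = new ControlledJob(job2.getConfiguration());
        cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(job3.getConfiguration());
        cj3.setJob(job3);

        cj3.addDependingJob(cj1);  // wait for job1
        cj3.addDependingJob(cj2);  // wait for job2

        JobControl jc = new JobControl("fanIn");
        jc.addJob(cj1);
        jc.addJob(cj2);
        jc.addJob(cj3);
        return jc;  // caller runs it in a helper thread, as in the loop above
    }
}
```

JobControl tracks each ControlledJob's state, so cj3 stays in the waiting pool until both of its dependencies report success.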
    
    

    III. Chained. ChainMapper and ChainReducer let a single job run several mappers before and after one reducer (a Map+ / Reduce / Map* pipeline), so pre- and post-processing steps do not each need their own job.

            Configuration conf = new Configuration();
            Job job = new Job(conf);
            job.setJobName("ChainJob");
            // add Map1 through ChainMapper
            Configuration map1Conf = new Configuration(false);
            ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
                    Text.class, Text.class, true, map1Conf);
            // add the Reducer and Map2 through ChainReducer
            Configuration reduceConf = new Configuration(false);
            ChainReducer.setReducer(job, Reduce.class, LongWritable.class,
                    Text.class, Text.class, Text.class, true, reduceConf);
            Configuration map2Conf = new Configuration(false);
            ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,
                    Text.class, Text.class, true, map2Conf);
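In a chain like this, records flow Map1 → Reduce → Map2 inside one job, and each stage's declared input key/value classes must match the previous stage's output classes. A minimal skeleton of one chained stage under the old org.apache.hadoop.mapred API that the byValue boolean implies (the class and its behavior are hypothetical):

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical first stage of the chain: consumes the usual
// LongWritable/Text split records and emits Text/Text pairs, which the
// next stage in the chain must be declared to accept as input.
public class Map1 extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, Text> out, Reporter reporter)
            throws IOException {
        // e.g. normalize the record before the reducer sees it
        out.collect(new Text("line"), line);
    }
}
```

The byValue flag (`true` above) tells the chain to pass copies of keys and values between stages, which is the safe default when a stage retains references to its inputs.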
    


Source: https://www.haomeiwen.com/subject/kphedqtx.html