美文网首页
OS X下MapReduce程序运行的几种模式

OS X下MapReduce程序运行的几种模式

作者: MARCO马浩翔 | 来源:发表于2017-08-17 16:57 被阅读0次

    1.MapReduce程序运行的模式简介

    1. 程序运行模式

      • 本地模式
        • 利用本地的JVM运行,使用本地的IDE进行debug
      • 远程模式
        • 提交至远程的集群上运行,使用本地的IDE进行debug
        • 提交至远程的集群上运行,不使用本地IDE进行debug
    2. 数据存放路径

      • 远程文件系统(hdfs)
      • 本地文件系统(local file system)

    2.开发环境简介

    • 操作系统:macOS Sierra 10.12.6
    • Java版本:1.8.0_131-b11
    • Hadoop版本:hadoop-2.7.4
    • IDE:IntelliJ IDEA

    3.MapReduce程序运行例子

    3.1 程序需求

    学校里开设了多门课程,有语文(chinese)、数学(math)、英语(english)等。经过了一次年级统考后,每个学生的成绩都被记录在多个文本文件中,文本文件格式如下。

    • math.txt
    Ben 75
    Jack 60
    May 85
    Tom 91
    
    • english.txt
    Jack 72
    May 60
    Tom 62
    Ben 90
    
    • chinese.txt
    Ben 79
    May 88
    Tom 68
    Jack 70
    

    现需要根据以上的文本文件,算出每个学生在本次统考中的平均分,并将结果用一个总的文件averageScore.txt进行存储。averageScore.txt的格式如下。

    • averageScore.txt
    #name #score
    Ben 0.0
    May 0.0
    Tom 0.0
    Jack 0.0
    

    3.2 程序设计思路

    3.2.1 Mapper的处理逻辑

    Mapper每次从文本文件中读取1行内容,即调用1次map方法。Mapper需要把原始数据中一行的内容拆分成学生姓名(student name)和该门课程的分数(score)。按照需求,本程序最终要算出每一个学生的平均分,所以学生姓名应作为一个key,对应的value即为该生的平均分(实际上是不严谨的,因为在实际环境中会出现多个学生重名的现象,若不作特殊处理,key是不允许重复的。最根本的解决方案是采用学号作为key,但为了演示直观,仅采用学生姓名作为key)

    Mapper读完一行的数据后,把{student name,score}这个key-value写入中间结果,准备传送给Reducer作下一步的运算。

    3.2.2 Reducer的处理逻辑

    Reducer接收到的数据,实际上是一个key与该key对应的value的一个集合(并不仅仅是一个value)。在本需求中,传入reduce方法的参数是学生姓名,以及该生多门课程分数的集合,类似于Ben,[60,70,80,...]。所以Reducer需要将集合中的分数求和,然后求出平均值,最终得到一个{student name, average score}key-value对。

    3.2.3 具体代码设计

    • AVGMapper类
      用于实现map方法
    package mr;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    
    /**
     * Created by marco on 2017/8/17.
     */
    public class AVGMapper extends Mapper<LongWritable, Text, Text, DoubleWritable>
    {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            if(line.length() == 0)  // 文件格式错误,出现空行
                return;
            String[] split = line.split(" ");
            String stuName = split[0];
            String stuScore = split[1];
            double score = Double.parseDouble(stuScore);    // 转成double类型,方便后续求均值计算
            context.write(new Text(stuName), new DoubleWritable(score));
        }
    }
    
    • AVGReducer类
      用于实现reduce方法
    package mr;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    
    /**
     * Created by marco on 2017/8/17.
     */
    public class AVGReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
    {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
        {
            double sum = 0;
            int length = 0;
            for(DoubleWritable value : values)
            {
                sum += value.get();
                length++;
            }
    
            double avgScore = sum / (double)length;
            context.write(key, new DoubleWritable(avgScore));
        }
    }
    
    
    • AVGRunner类
      用于关联Mapper与Reducer,并创建MapReduce任务(Job)提交运行。基本代码如下所示。
    package mr;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Created by marco on 2017/8/17.
     */
    public class AVGRunner
    {
        static public void main(String[] args) throws Exception
        {
            // 设置hdfs的handler
            Configuration fsConf = new Configuration();
            fsConf.set("fs.default.name","hdfs://localhost:9000/");
            FileSystem fs = FileSystem.get(fsConf);
    
            // MapReduce的配置参数
            Configuration mrConf = new Configuration();
    
            // 新建一个求平均值的Job
            Job avgJob = Job.getInstance(mrConf);
            avgJob.setJarByClass(AVGRunner.class);
    
            // 设置Mapper类与Reducer类
            avgJob.setMapperClass(AVGMapper.class);
            avgJob.setReducerClass(AVGReducer.class);
    
            // 设置输入输出的数据结构
            avgJob.setMapOutputKeyClass(Text.class);
            avgJob.setMapOutputValueClass(DoubleWritable.class);
            avgJob.setOutputKeyClass(Text.class);
            avgJob.setOutputValueClass(DoubleWritable.class);
    
            // 检查结果输出目录,若已存在则删除输出目录
            if(fs.exists(new Path("/avg/output")))
            {
                fs.delete(new Path("/avg/output"), true);
            }
    
            // 设置数据目录以及结果输出目录
            FileInputFormat.setInputPaths(avgJob, new Path(""));
            FileOutputFormat.setOutputPath(avgJob, new Path(""));
    
            // 提交任务,等待完成
            System.exit(avgJob.waitForCompletion(true)?0:1);
        }
    }
    
    

    3.3 MapReduce程序运行

    若使用本地文件系统的数据文件,且在本地模式运行,无需配置hdfs相关的参数,数据目录以及结果输出目录填写本地路径即可。(确保结果输出文件夹未被创建,否则会报异常)

    // 均填写本地文件路径即可
    FileInputFormat.setInputPaths(avgJob, new Path(""));
    FileOutputFormat.setOutputPath(avgJob, new Path(""));
    

    若使用hdfs上的数据文件,且在本地模式运行,应配置hdfs相关参数,数据目录以及结果输出目录均填写hdfs的路径。(确保结果输出文件夹未被创建,否则会报异常)

    // 设置hdfs参数,并用该配置创建一个新的Job
    Configuration fsConf = new Configuration();
    fsConf.set("fs.default.name","hdfs://localhost:9000/");
    Job avgJob = Job.getInstance(fsConf);
    
    
    // 均填写hdfs路径即可
    FileInputFormat.setInputPaths(avgJob, new Path(""));
    FileOutputFormat.setOutputPath(avgJob, new Path(""));
    

    3.3.1 本地模式运行

    本地模式运行,直接编译执行AVGRunner的main方法即可,程序运行结束后会在自行设置的结果输出目录中生成运行结果。

    3.3.2 远程集群运行

    首先使用IDE将程序打成一个jar包,本例中命名为hadoop.jar

    提交到远程集群上运行分两种情况

    • 使用本地IDE(IntelliJ IDEA)运行,任务被提交到集群运行,但可使用IDE进行跟踪debug

      新建一个MapReduce的配置对象,将已经打包好的jar包传入配置中

        // MapReduce的配置参数,远程运行,本地debug
        Configuration mrConf = new Configuration();
        mrConf.set("mapreduce.job.jar","hadoop.jar");
        mrConf.set("mapreduce.framework.name","yarn");
        
        //利用以上配置新建一个Job
        Job avgJob = Job.getInstance(mrConf);
        avgJob.setJarByClass(AVGRunner.class);
      
    • 在终端直接使用hadoop命令将任务提交到集群运行,无法使用IDE进行跟踪debug

      直接在终端中输入hadoop命令

      hadoop jar $jar包名称 $待执行的类的名称
      

      在本例中应输入

      hadoop jar avg.jar mr.AVGRunner
      

    ####################### 注意⚠️ #######################

    在OS X中,使用IntelliJ IDEA打包jar包后,若在终端中直接使用hadoop jar $jar包名称 $待执行的类的名称提交MapReduce任务,会报出异常,因为OS X系统的文件系统对大小写不敏感(case-insensitive)

    经过对此异常的搜索,暂时的解决方案是通过删除jar包中的LICENSE文件,使任务顺利提交。

    # 在终端中执行以下命令
    zip -d $jar包名称 META-INF/LICENSE
    zip -d $jar包名称 LICENSE
    

    #####################################################

    可以看到使用了hadoop命令提交任务后,系统调用了RPC框架和Yarn框架中的一些服务,用于远程运行,而非使用LocalJobSubmitter于本地运行。

    并且在MapReduce任务管理页面可看到任务已经完成的历史记录。

    4.总结

    MapReduce任务可在本地运行,也可提交到集群上运行。

    在开发初期,需要编写Demo程序时,可在本地进行开发与测试,将数据文件放置在本地文件系统,直接使用IDE运行主类的main方法,观察运行结果。

    上线前调试,可采用远程模式运行,不直接使用hadoop命令提交,而是使用IDE进行提交与debug,这样既可以保证程序运行在远处集群上(生产环境or开发环境),也可以在本地方便跟踪调试。

    可上线时,使用hadoop命令直接提交到远程集群,并通过localhost:50070(默认配置)的任务管理页面进行观察。

    相关文章

      网友评论

          本文标题:OS X下MapReduce程序运行的几种模式

          本文链接:https://www.haomeiwen.com/subject/hktnrxtx.html