Hadoop Lab: MapReduce Programming (2)

Author: Tiny_16 | Published 2016-11-27 20:19

    Objectives

    1. Master basic MapReduce programming techniques through hands-on practice.
    2. Use MapReduce to solve common data-processing problems, including data deduplication, sorting, and data mining.
    3. By working through and imitating the exercises, gain a deeper understanding of the MapReduce workflow and of the role of the shuffle phase.

    Platform

    • Operating system: Ubuntu 16.04
    • Hadoop version: 2.6.0
    • JDK version: 1.8
    • IDE: Eclipse

    Content and requirements

    Part 1: Implement a secondary sort over a file

    1. Given an input file containing two columns of integers, sort the records by the first column, and where the first column is equal, by the second column. A sample input file and output file are given below for reference.
    • A sample of the input file secondarysort.txt:
    20 21 
    50 51 
    50 52 
    50 53 
    50 54 
    60 51 
    60 53 
    60 52 
    60 56 
    60 57 
    70 58 
    60 61 
    70 54 
    70 55 
    70 56 
    70 57 
    70 58 
    1 2 
    3 4 
    5 6 
    7 82 
    203 21 
    50 512 
    50 522 
    50 53 
    530 54 
    40 511 
    20 53 
    20 522 
    60 56 
    60 57 
    740 58 
    63 61 
    730 54 
    71 55 
    71 56 
    73 57 
    74 58 
    12 211 
    31 42 
    50 62 
    7 8
    
    • A sample of the output file:


    Procedure:

    1. Create the file secondarysort.txt
      and copy the sample content above into it.
    2. Create the secondarysort_input directory in HDFS (the Hadoop daemons must be running before this step); see the command sketch after this list.
    3. Upload the input file to the secondarysort_input directory in HDFS.
    4. Then open Eclipse.
      Using Eclipse
      1. Open the project from the previous lab, find the src folder, and right-click New -> Class.
      2. Enter the Package and Name, then click Finish.
      3. Write the Java code (the HDFS and local paths in the code need adjusting to your own setup), right-click Run As -> Run on Hadoop, and view the result in HDFS.


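    For reference, steps 2 and 3 can be done from a terminal roughly as follows (an illustrative sketch, assuming the daemons were started with start-dfs.sh, that secondarysort.txt is in the current local directory, and that relative HDFS paths resolve to the user's home directory):

    hdfs dfs -mkdir -p secondarysort_input
    hdfs dfs -put secondarysort.txt secondarysort_input
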
    Experiment code:

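    Below is a minimal sketch of one possible secondary-sort implementation rather than a finished listing: a composite key (IntPair) carries both integers, its compareTo orders by the first and then the second integer, and the reducer simply writes each pair back out. The class names SecondarySort and IntPair and the paths secondarysort_input / secondarysort_output are illustrative assumptions.

    package cn.edu.zucc.mapreduce2;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SecondarySort {
        // Composite key holding both integers; the shuffle sorts on this key,
        // first by the first integer and then by the second.
        public static class IntPair implements WritableComparable<IntPair> {
            private int first;
            private int second;

            public void set(int first, int second) {
                this.first = first;
                this.second = second;
            }

            public int getFirst() { return first; }

            public int getSecond() { return second; }

            @Override
            public void write(DataOutput out) throws IOException {
                out.writeInt(first);
                out.writeInt(second);
            }

            @Override
            public void readFields(DataInput in) throws IOException {
                first = in.readInt();
                second = in.readInt();
            }

            @Override
            public int compareTo(IntPair o) {
                int cmp = Integer.compare(first, o.first);
                return cmp != 0 ? cmp : Integer.compare(second, o.second);
            }
        }

        public static class SortMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
            private final IntPair pair = new IntPair();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Each input line holds two whitespace-separated integers.
                String[] fields = value.toString().trim().split("\\s+");
                if (fields.length == 2) {
                    pair.set(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]));
                    context.write(pair, NullWritable.get());
                }
            }
        }

        public static class SortReducer extends Reducer<IntPair, NullWritable, IntWritable, IntWritable> {
            @Override
            protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Duplicate pairs arrive as several values; write one output line per occurrence.
                for (NullWritable ignored : values) {
                    context.write(new IntWritable(key.getFirst()), new IntWritable(key.getSecond()));
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            // With the default single reducer the output is globally sorted; with several
            // reducers a custom partitioner on the first integer would keep groups together.
            Job job = Job.getInstance(conf, "secondarysort");
            job.setJarByClass(SecondarySort.class);
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);
            job.setMapOutputKeyClass(IntPair.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path("secondarysort_input"));
            FileOutputFormat.setOutputPath(job, new Path("secondarysort_output"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }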
    
    Modeled on the exercise above, complete the following: given an input file, sort the records field by field: first by the first field; where the first field is equal, by the second field in ascending order; and where that is also equal, by the third field. A sample input file and output file are given below for reference, and a sketch follows the sample output.
    • A sample of the input file:
    a1,b2,c5
    a4,b1,c3
    a1,b2,c4
    a2,b2,c4
    a2,b1,c4
    a4,b1,c2
    
    • The resulting output file:
    a1  b2,c4
    a1  b2,c5
    a2  b1,c4
    a2  b2,c4
    a4  b1,c2
    a4  b1,c3
    
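    One possible approach, sketched below under the assumption that each first-field group fits in memory at the reducer: the mapper keys each record by its first field and keeps the remaining two fields as the value, and the reducer sorts each group's values lexicographically, which for values shaped like the sample orders by the second and then the third field. The class name SortFields and the paths sortfields_input / sortfields_output are illustrative.

    package cn.edu.zucc.mapreduce2;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SortFields {
        public static class FieldMapper extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Split "a1,b2,c5" into the first field (key) and the rest (value).
                String line = value.toString().trim();
                int comma = line.indexOf(',');
                if (comma > 0) {
                    context.write(new Text(line.substring(0, comma)),
                            new Text(line.substring(comma + 1)));
                }
            }
        }

        public static class FieldReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Keys arrive already sorted; sort the remaining fields of each group
                // so the output is ordered by the second and then the third field.
                List<String> rest = new ArrayList<String>();
                for (Text value : values) {
                    rest.add(value.toString());
                }
                Collections.sort(rest);
                for (String r : rest) {
                    context.write(key, new Text(r));
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            Job job = Job.getInstance(conf, "sortfields");
            job.setJarByClass(SortFields.class);
            job.setMapperClass(FieldMapper.class);
            job.setReducerClass(FieldReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path("sortfields_input"));
            FileOutputFormat.setOutputPath(job, new Path("sortfields_output"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }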

    Part 2: Process an unstructured log file

    1. Given an input log file, distinguish the different HTTP requests and count how many times each appears. A sample input file and output file are given below for reference.
    • A sample of the input file:
    127.0.0.1 - - [03/Jul/2014:23:53:32 +0800] "POST /service/addViewTimes_544.htm HTTP/1.0" 200 2 0.004
    127.0.0.1 - - [03/Jul/2014:23:54:53 +0800] "GET /html/20140620/900.html HTTP/1.0" 200 151770 0.054
    127.0.0.1 - - [03/Jul/2014:23:57:42 +0800] "GET /html/20140620/872.html HTTP/1.0" 200 52373 0.034
    127.0.0.1 - - [03/Jul/2014:23:58:17 +0800] "POST /service/addViewTimes_900.htm HTTP/1.0" 200 2 0.003
    127.0.0.1 - - [03/Jul/2014:23:58:51 +0800] "GET / HTTP/1.0" 200 70044 0.057
    127.0.0.1 - - [03/Jul/2014:23:58:51 +0800] "GET / HTTP/1.0" 200 70044 0.057
    
    • The resulting output file:
    GET /   2
    GET /html/20140620/872.html 1
    GET /html/20140620/900.html 1
    POST /service/addViewTimes_544.htm  1
    POST /service/addViewTimes_900.htm  1
    

    Procedure:

    1. Create the input file
      and copy the sample content above into it.
    2. Create the loggerjob_input directory in HDFS.
    3. Upload the sample file to the loggerjob_input directory in HDFS.
    4. Run the code from Eclipse.

    Experiment code:

    package cn.edu.zucc.mapreduce2;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LoggerJob {
        public static class LoggerMapper extends
                Mapper<LongWritable, Text, Text, IntWritable> {
            private IntWritable counter = new IntWritable(1);
    
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String result = handleLog(line);
                if (result != null && result.length() > 0) {
                    context.write(new Text(result), counter);
                }
            }
    
            // Extract "METHOD /path" from the log line: the substring between the
            // HTTP method keyword and the trailing "HTTP/1.0" marker.
            private String handleLog(String line) {
                StringBuffer sBuffer = new StringBuffer();
                try {
                    if (line.length() > 0) {
                        if (line.indexOf("GET") > 0) {
                            String tmp = line.substring(line.indexOf("GET"),
                                    line.indexOf("HTTP/1.0"));
                            sBuffer.append(tmp.trim());
                        } else if (line.indexOf("POST") > 0) {
                            String tmp = line.substring(line.indexOf("POST"),
                                    line.indexOf("HTTP/1.0"));
                            sBuffer.append(tmp.trim());
                        }
                    } else {
                        return null;
                    }
    
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println(line);
                }
                return sBuffer.toString();
            }
    
        }
    
        public static class LoggerReducer extends
                Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new IntWritable(sum));
    
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            String[] otherArgs = new String[] { "loggerjob_input",
                    "loggerjob_output" };
            if (otherArgs.length != 2) {
                System.err.println("Usage: loggerjob <in> <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "loggerjob");
            job.setJarByClass(LoggerJob.class);
            job.setMapperClass(LoggerMapper.class);
            job.setReducerClass(LoggerReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    
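    Once the job completes, the result can be inspected directly from HDFS, for example (assuming the default single reduce task, whose output file is named part-r-00000):

    hdfs dfs -cat loggerjob_output/part-r-00000
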
    Modeled on the exercise above, complete the following: for the weather data, find the highest and lowest temperature recorded in each year. A sample input file and output file are given below for reference, and a sketch follows the sample output.
    • A sample of the input file:
    0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
    0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
    0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
    0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
    

    Data format notes:
    Characters 15-19 hold the year.
    Characters 45-50 hold the temperature: '+' means above zero and '-' below zero, and a value of 9999 marks an invalid reading that should be skipped.
    Character 50 can only be one of the digits 0, 1, 4, 5, or 9.
    • The resulting output file:
    1949    111
    1950    3
    
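    A possible sketch is below. The field offsets are assumptions read off the sample lines rather than fixed by the description: the year is taken as substring(15, 19) and the signed temperature as the sign character at index 46 followed by four digits, with 9999 treated as missing. The offsets, the output format, and the class name Temperature with paths temperature_input / temperature_output may all need adjusting to the real data.

    package cn.edu.zucc.mapreduce2;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Temperature {
        public static class TempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                if (line.length() < 52) {
                    return;
                }
                // Year is characters 15-18 (0-based); the signed temperature is assumed
                // to start at index 46 (sign) followed by four digits.
                String year = line.substring(15, 19);
                int temperature;
                if (line.charAt(46) == '+') {
                    temperature = Integer.parseInt(line.substring(47, 51));
                } else {
                    temperature = -Integer.parseInt(line.substring(47, 51));
                }
                // 9999 marks a missing reading and is ignored.
                if (Math.abs(temperature) != 9999) {
                    context.write(new Text(year), new IntWritable(temperature));
                }
            }
        }

        public static class TempReducer extends Reducer<Text, IntWritable, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int max = Integer.MIN_VALUE;
                int min = Integer.MAX_VALUE;
                for (IntWritable val : values) {
                    max = Math.max(max, val.get());
                    min = Math.min(min, val.get());
                }
                // Emit the year with its highest and lowest temperature.
                context.write(key, new Text("max=" + max + " min=" + min));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            Job job = Job.getInstance(conf, "temperature");
            job.setJarByClass(Temperature.class);
            job.setMapperClass(TempMapper.class);
            job.setReducerClass(TempReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path("temperature_input"));
            FileOutputFormat.setOutputPath(job, new Path("temperature_output"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }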

    Part 3: Build an inverted index over the input files

    1. Given three text documents, build an index that maps each word to the documents containing it, taking a weight (the word's count in each document) into account. A sample input and output are given below for reference.
    • The input file d1.txt:
    Mapreduce is simple is easy
    
    • The input file d2.txt:
    Mapreduce is powerful is userful
    
    • The input file d3.txt:
    Hello Mapreduce Bye Mapreduce
    
    • The resulting output file:
    Bye hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:1;
    Hello   hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:1;
    Mapreduce   hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:2;hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
    easy    hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
    is  hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:2;hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:2;
    powerful    hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;
    simple  hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
    userful hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;
    
    1. Create the files
      and copy the sample content above into them.
    2. Create the invertedindex_input directory in HDFS.
    3. Upload the sample files to the invertedindex_input directory in HDFS.
    4. Run the code from Eclipse.

    Experiment code:

    package cn.edu.zucc.mapreduce2;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class InvertedIndex {
        public static class InvertedIndexMap extends Mapper<Object, Text, Text, Text> {
            private static final Text one = new Text("1");
            private Text keyInfo = new Text();
            private FileSplit split;
    
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                split = (FileSplit) context.getInputSplit();
                String line = value.toString();
                String[] strings = line.split(" ");
                for (String s : strings) {
                    // The map key is "word:filepath" so the combiner can count occurrences per file.
                    keyInfo.set(s + ":" + this.split.getPath().toString());
                    context.write(keyInfo, one);
                }
            }
        }
    
        // The combiner runs after each map task: it sums the counts for every "word:path"
        // key, then rewrites the key to the bare word and the value to "path:count".
        public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
            Text info = new Text();
    
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                // Sum the occurrences of this word within a single file.
                int sum = 0;
                for (Text value : values) {
                    sum += Integer.parseInt(value.toString());
                }
                // Split "word:path" at the first colon: the word becomes the output key
                // and the value becomes "path:count".
                int splitIndex = key.toString().indexOf(":");
                info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
                key.set(key.toString().substring(0, splitIndex));
                context.write(key, info);
            }
        }
    
        public static class InvertedIndexReduce extends Reducer<Text, Text, Text, Text> {
            private Text result = new Text();
    
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                String fileList = "";
                for (Text value : values) {
                    fileList += value.toString() + ";";
                }
                result.set(fileList);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            String[] otherArgs = new String[]{"invertedindex_input",
                    "invertedindex_output"};
            if (otherArgs.length != 2) {
                System.err.println("Usage: InvertedIndex <in> <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "InvertedIndex");
            job.setJarByClass(InvertedIndex.class);
            job.setMapperClass(InvertedIndexMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setCombinerClass(InvertedIndexCombiner.class);
            job.setReducerClass(InvertedIndexReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    }
    
    Modeled on the exercise above, complete the following: for the input file, write a MapReduce program that summarizes the call records, listing for each called number the callers that dialed it. A sample input file and output file are given below for reference, and a sketch follows the sample output.
    • A sample of input file A:
    13588888888 112
    13678987879 13509098987
    18987655436 110
    2543789 112
    15699807656 110
    011-678987 112
    
    • The resulting output file B:
    110 15699807656|18987655436|
    112 011-678987|2543789|13588888888|
    13509098987 13678987879|
    
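    A minimal sketch of one way to do this: the mapper swaps the two columns so the called number becomes the key, and the reducer concatenates the callers with '|'. The class name CallRecords and the paths callrecords_input / callrecords_output are illustrative.

    package cn.edu.zucc.mapreduce2;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CallRecords {
        public static class CallMapper extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Each line is "caller callee"; emit the callee as the key so the
                // reducer collects every caller that dialed it.
                String[] fields = value.toString().trim().split("\\s+");
                if (fields.length == 2) {
                    context.write(new Text(fields[1]), new Text(fields[0]));
                }
            }
        }

        public static class CallReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Join all callers of this number with '|'.
                StringBuilder callers = new StringBuilder();
                for (Text value : values) {
                    callers.append(value.toString()).append("|");
                }
                context.write(key, new Text(callers.toString()));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            Job job = Job.getInstance(conf, "callrecords");
            job.setJarByClass(CallRecords.class);
            job.setMapperClass(CallMapper.class);
            job.setReducerClass(CallReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path("callrecords_input"));
            FileOutputFormat.setOutputPath(job, new Path("callrecords_output"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }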
