
Hadoop MapReduce Application Examples

Author: Yohann丶blog | Published 2021-05-23 21:52

    Environment

    • CentOS 6.8, 64-bit, 1 core, 2 GB RAM

    • JDK 1.7.0_55, 64-bit

    • Hadoop 1.1.2

    Prepare the Test Data

    • Create the department file dept with the following content (department id, name, city)
    10,DEVELOP,BEIJING
    20,UI,SHANGHAI
    30,TEST,GUANGZHOU
    40,OPS,SHENZHEN
    
    • Create the employee file emp with the following content (a parsing sketch follows the data)
    7369,SMITH,PRIMARY,7902,17-12月-80,10500,,20
    7499,ALLEN,SENIOR,7698,20-2月-81,15500,300,30
    7521,WARD,SENIOR,7698,22-2月-81,17500,500,30
    7566,JONES,MANAGER,7839,02-4月-81,20500,,20
    7654,MARTIN,SENIOR,7698,28-9月-81,16500,1400,30
    7698,BLAKE,MANAGER,7839,01-5月-81,21000,,30
    7782,CLARK,MANAGER,7839,09-6月-81,20000,,10
    7839,KING,SENIOR,,17-11月-81,14500,,10
    7844,TURNER,SENIOR,7698,08-9月-81,19500,0,30
    7900,JAMES,PRIMARY,7698,03-12月-81,10000,,30
    7902,FORD,SENIOR,7566,03-12月-81,19000,,20
    7934,MILLER,PRIMARY,7782,23-1月-82,9500,,10
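
    Both files are plain comma-separated text. The jobs below rely on only a few fields: columns 0 and 1 of dept (department id and name) and columns 5 and 7 of emp (salary and department id). A minimal stand-alone sketch of that parsing (the class name and sample line are only for illustration):
    // Hypothetical sketch: splits one emp record exactly as the mappers below do.
    public class ParseSketch {
        public static void main(String[] args) {
            String line = "7369,SMITH,PRIMARY,7902,17-12月-80,10500,,20";
            String[] kv = line.split(",");
            String salary = kv[5];   // "10500" -> summed per department in the reducer
            String deptNo = kv[7];   // "20"    -> looked up in the map built from the dept file
            System.out.println(deptNo + " -> " + salary);   // prints: 20 -> 10500
        }
    }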
    
    • Create the /class6/input directory in HDFS
    $ hadoop fs -mkdir -p /class6/input
    
    • Upload dept and emp to the /class6/input directory in HDFS
    $ hadoop fs -copyFromLocal dept /class6/input
    $ hadoop fs -copyFromLocal emp /class6/input
    
    • List the /class6/input directory in HDFS
    $ hadoop fs -ls /class6/input
    Found 2 items
    -rw-r--r--   1 yohann supergroup         80 2021-05-15 22:39 /class6/input/dept
    -rw-r--r--   1 yohann supergroup        538 2021-05-15 22:39 /class6/input/emp
    
    • Create the /app/hadoop-1.1.2/myclass/class6 directory and change into it
    $ cd /app/hadoop-1.1.2/myclass
    $ mkdir class6
    $ cd class6
    

    Compute the Total Salary of Each Department

    • Create Q1SumDeptSalary.java with the following code. The mapper loads the small dept file from the DistributedCache into a HashMap (a map-side join) and emits <dept name, salary> pairs; the reducer sums the salaries for each department.
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class Q1SumDeptSalary extends Configured implements Tool {
    
        public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
    
            private Map<String, String> deptMap = new HashMap<String, String>();
            private String[] kv;
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                BufferedReader in = null;
                try {
    
                    // The dept file was added to the DistributedCache in run();
                    // read it once per map task and cache deptno -> dept name in memory.
                    Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                    String deptIdName = null;
                    for (Path path : paths) {

                        if (path.toString().contains("dept")) {
                            in = new BufferedReader(new FileReader(path.toString()));
                            while (null != (deptIdName = in.readLine())) {

                                // dept line: deptno,name,city -> keep deptno and name
                                deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        if (in != null) {
                            in.close();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

                // emp line is comma-separated: kv[5] = salary, kv[7] = deptno
                kv = value.toString().split(",");

                // emit <dept name, salary> only for records with a known dept and a non-empty salary
                if (deptMap.containsKey(kv[7])) {
                    if (null != kv[5] && !"".equals(kv[5])) {
                        context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                    }
                }
            }
        }
    
        public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {
    
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    
                // sum all salaries emitted for this department
                long sumSalary = 0;
                for (Text val : values) {
                    sumSalary += Long.parseLong(val.toString());
                }
    
                context.write(key, new LongWritable(sumSalary));
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
    
            Job job = new Job(getConf(), "Q1SumDeptSalary");
            job.setJobName("Q1SumDeptSalary");
            job.setJarByClass(Q1SumDeptSalary.class);
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            // the mapper emits <Text, Text>; the reducer emits <Text, LongWritable>
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            // args: <dept file> <emp input path> <output dir>;
            // the dept file is shipped to every map task via the DistributedCache
            String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
            DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
            FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    
            job.waitForCompletion(true);
            return job.isSuccessful() ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
            System.exit(res);
        }
    }
    
    • Compile and package
    $ javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java
    $ jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.class
    $ mv *.jar /app/hadoop-1.1.2/
    $ rm Q1SumDept*.class
    
    • Run the job (the three arguments are the dept file, the emp input path, and the output directory) and view the result
    $ hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1
    $ hadoop fs -cat /class6/out1/part-r-00000
    DEVELOP 44000
    TEST    100000
    UI  50000
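
    A quick sanity check by hand against the emp data above: DEVELOP (dept 10) is 20000 + 14500 + 9500 = 44000, UI (dept 20) is 10500 + 20500 + 19000 = 50000, and TEST (dept 30) sums its six salaries to 100000, all matching the job output. Department 40 (OPS) has no employees, so it does not appear in the result.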
    

    Compute the Headcount and Average Salary of Each Department

    • Create Q2DeptNumberAveSalary.java with the following code. The mapper is identical to Q1; the reducer now counts the employees in each department and computes the average salary.
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class Q2DeptNumberAveSalary extends Configured implements Tool {
    
        public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
    
            private Map<String, String> deptMap = new HashMap<String, String>();
            private String[] kv;
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                BufferedReader in = null;
                try {
                    Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                    String deptIdName = null;
                    for (Path path : paths) {
    
                        if (path.toString().contains("dept")) {
                            in = new BufferedReader(new FileReader(path.toString()));
                            while (null != (deptIdName = in.readLine())) {
                                
                                deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        if (in != null) {
                            in.close();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

                // emp line is comma-separated: kv[5] = salary, kv[7] = deptno
                kv = value.toString().split(",");

                // emit <dept name, salary> only for records with a known dept and a non-empty salary
                if (deptMap.containsKey(kv[7])) {
                    if (null != kv[5] && !"".equals(kv[5])) {
                        context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                    }
                }
            }
        }
    
        public static class Reduce extends Reducer<Text, Text, Text, Text> {
    
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    
                long sumSalary = 0;
                int deptNumber = 0;

                // one value per employee in this department: count them and sum the salaries
                for (Text val : values) {
                    sumSalary += Long.parseLong(val.toString());
                    deptNumber++;
                }

                // note: sumSalary / deptNumber is integer division, so the average is truncated
                context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber));
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
    
            Job job = new Job(getConf(), "Q2DeptNumberAveSalary");
            job.setJobName("Q2DeptNumberAveSalary");
            job.setJarByClass(Q2DeptNumberAveSalary.class);
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);
    
            job.setInputFormatClass(TextInputFormat.class);
    
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
            DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
            FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    
            job.waitForCompletion(true);
            return job.isSuccessful() ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);
            System.exit(res);
        }
    }
    
    • Compile and package
    $ javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q2DeptNumberAveSalary.java
    $ jar cvf ./Q2DeptNumberAveSalary.jar ./Q2DeptNum*.class
    $ mv *.jar /app/hadoop-1.1.2/
    $ rm Q2DeptNum*.class
    
    • Run the job and view the result
    $ hadoop jar Q2DeptNumberAveSalary.jar Q2DeptNumberAveSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out2
    $ hadoop fs -cat /class6/out2/part-r-00000
    DEVELOP Dept Number:3, Ave Salary:14666
    TEST    Dept Number:6, Ave Salary:16666
    UI  Dept Number:3, Ave Salary:16666
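
    As with Q1, the numbers can be checked by hand: 44000 / 3 = 14666 for DEVELOP, 100000 / 6 = 16666 for TEST, and 50000 / 3 = 16666 for UI. The fractional parts are dropped because the reducer uses integer (long) division.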
    
