MapReduce Examples

Author: 上杉丶零 | Published 2019-01-20 22:55

1. Word Count

  • Requirement
    Count how many times each word appears in the input. The mapper emits a (word, 1) pair for every token, and the reducer sums those counts per word.
  • Sample input
hello world hadoop
hadoop hbase hive
hadoop sqoop flue
hbase redis
  • Sample output
flue 1
hadoop 3
hbase 2
hello 1
hive 1
redis 1
sqoop 1
world 1
  • Example code
// MainClass.java
package com.bjsxt.hdfs.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MainClass {
    public static void main(String[] args) {
        if (args == null || args.length != 2) {
            throw new RuntimeException("需要指定<输入路径>和<输出路径>");
        }

        Configuration configuration = new Configuration();

        try {
            Job job = Job.getInstance(configuration);
            job.setJarByClass(MainClass.class);
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setReducerClass(WordCountReducer.class);
            // Declare the reducer's output types explicitly as well; the job
            // defaults are not Text/LongWritable.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
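
With the three classes packaged into a jar, the job is submitted in the usual way; the jar name and HDFS paths below are illustrative, not from the original article:

hadoop jar wordcount.jar com.bjsxt.hdfs.wordcount.MainClass /wordcount/input /wordcount/output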

// WordCountMapper.java
package com.bjsxt.hdfs.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit one (word, 1) pair for every space-separated token in the line.
        for (String s : value.toString().split(" ")) {
            context.write(new Text(s), new LongWritable(1));
        }
    }
}
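
A common refinement, not in the original article: reuse the two Writable instances instead of allocating fresh objects for every token. The following is a drop-in replacement for the mapper above (same package and imports):

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    // Reused across map() calls; context.write() serializes the current
    // contents immediately, so mutating them on the next call is safe.
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String s : value.toString().split(" ")) {
            word.set(s);
            context.write(word, one);
        }
    }
}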

// WordCountReducer.java
package com.bjsxt.hdfs.wordcount;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> valueIterable, Context context) throws IOException, InterruptedException {
        // Sum all of the 1s emitted by the mappers for this word.
        Iterator<LongWritable> valueIterator = valueIterable.iterator();
        long sum = 0L;

        while (valueIterator.hasNext()) {
            sum += valueIterator.next().get();
        }

        context.write(key, new LongWritable(sum));
    }
}
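
One optional tweak, not in the original driver: because the reduction is a plain sum, which is associative and commutative, WordCountReducer can also be registered as a combiner, so counts are pre-aggregated on the map side and far fewer (word, 1) pairs cross the network:

// Add to MainClass, after job.setReducerClass(...)
job.setCombinerClass(WordCountReducer.class);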

2. Temperature Statistics

  • Requirement
    For each month of every year, find the two distinct days with the highest temperatures. The plan: pack (year, month, temperature, date-time) into a composite key, sort by year and month ascending and temperature descending, group by (year, month) so each month becomes one reduce() call, and keep the first two distinct days of each group.
  • Sample input (the date-time and the temperature columns are tab-separated)
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
  • Sample output (the concatenation of the two reducers' files: even years in one, odd years in the other)
1950-01-01 11:21:02 32c
1950-10-02 12:21:02 41c
1950-10-01 12:21:02 37c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1951-07-03 12:21:03 47c
1951-07-02 12:21:02 46c
1951-12-01 12:21:02 23c
  • Example code
// MainClass.java
package com.bjsxt.hdfs.weather;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MainClass {
    public static void main(String[] args) {
        if (args == null || args.length != 2) {
            throw new RuntimeException("Must specify <input path> and <output path>");
        }

        Configuration configuration = new Configuration();

        try {
            Job job = Job.getInstance(configuration);
            job.setJarByClass(MainClass.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(WeatherMapper.class);
            job.setMapOutputKeyClass(Weather.class);
            job.setMapOutputValueClass(Text.class);
            job.setSortComparatorClass(WeatherSortComparator.class);
            job.setGroupingComparatorClass(WeatherGroupingComparator.class);
            job.setPartitionerClass(WeatherPartitioner.class);
            job.setReducerClass(WeatherReducer.class);
            // The reducer emits (Text, Text); declare it explicitly.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Two reducers; WeatherPartitioner routes years between them.
            job.setNumReduceTasks(2);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

// Weather.java
package com.bjsxt.hdfs.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Weather implements WritableComparable<Weather> {
    private int year;
    private int month;
    private int temperature;
    private String line;

    public Weather() {}

    public Weather(int year, int month, int temperature, String line) {
        this.year = year;
        this.month = month;
        this.temperature = temperature;
        this.line = line;
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(month);
        dataOutput.writeInt(temperature);
        dataOutput.writeUTF(line);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        setYear(dataInput.readInt());
        setMonth(dataInput.readInt());
        setTemperature(dataInput.readInt());
        setLine(dataInput.readUTF());
    }

    @Override
    public int compareTo(Weather other) {
        // Fallback ordering only; the driver registers WeatherSortComparator,
        // which takes precedence during the shuffle sort.
        return line.compareTo(other.getLine());
    }
}
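
Since write() and readFields() must mirror each other field for field (otherwise keys are corrupted during the shuffle), a quick round-trip check is worthwhile. WeatherRoundTrip below is a hypothetical scratch class, not part of the original article:

package com.bjsxt.hdfs.weather;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class WeatherRoundTrip {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        Weather written = new Weather(1949, 10, 38, "1949-10-01 19:21:02");
        written.write(new DataOutputStream(buffer));

        Weather read = new Weather();
        read.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        // Prints: 1949 10 38 1949-10-01 19:21:02
        System.out.println(read.getYear() + " " + read.getMonth() + " "
                + read.getTemperature() + " " + read.getLine());
    }
}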

// WeatherMapper.java
package com.bjsxt.hdfs.weather;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherMapper extends Mapper<LongWritable, Text, Weather, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like "1949-10-01 14:21:02<TAB>34c".
        String[] ss = value.toString().split("\t");
        String[] sss = ss[0].split("-");
        int year = Integer.parseInt(sss[0]);
        int month = Integer.parseInt(sss[1]);
        // Strip the trailing 'c' unit from the temperature reading.
        int temperature = Integer.parseInt(ss[1].substring(0, ss[1].length() - 1));
        context.write(new Weather(year, month, temperature, ss[0]), new Text(ss[1]));
    }
}

// WeatherSortComparator.java
package com.bjsxt.hdfs.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class WeatherSortComparator extends WritableComparator {
    public WeatherSortComparator() {
        super(Weather.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) {
        Weather weather1 = (Weather) writableComparable1;
        Weather weather2 = (Weather) writableComparable2;
        // Ascending by year and month, then descending by temperature, so the
        // hottest record of a month is the first key its reduce group sees.
        int result = weather1.getYear() - weather2.getYear();

        if (result == 0) {
            result = weather1.getMonth() - weather2.getMonth();

            if (result == 0) {
                result = weather2.getTemperature() - weather1.getTemperature();
            }
        }

        return result;
    }
}

// WeatherGroupingComparator.java
package com.bjsxt.hdfs.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class WeatherGroupingComparator extends WritableComparator {
    public WeatherGroupingComparator() {
        super(Weather.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) {
        Weather weather1 = (Weather) writableComparable1;
        Weather weather2 = (Weather) writableComparable2;
        // Compare only (year, month): every record from the same month joins
        // the same reduce() call, whatever its temperature.
        int result = weather1.getYear() - weather2.getYear();

        if (result == 0) {
            result = weather1.getMonth() - weather2.getMonth();
        }

        return result;
    }
}
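
To see the two comparators' division of labor concretely, here is a small scratch test built from the sample data (ComparatorDemo is hypothetical, placed in the same package as the classes above):

package com.bjsxt.hdfs.weather;

public class ComparatorDemo {
    public static void main(String[] args) {
        Weather july3 = new Weather(1951, 7, 47, "1951-07-03 12:21:03");
        Weather july2 = new Weather(1951, 7, 46, "1951-07-02 12:21:02");

        // Negative: the hotter July 3rd record sorts ahead of July 2nd.
        System.out.println(new WeatherSortComparator().compare(july3, july2));
        // Zero: same (year, month), so both records join one reduce() call.
        System.out.println(new WeatherGroupingComparator().compare(july3, july2));
    }
}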

// WeatherPartitioner.java
package com.bjsxt.hdfs.weather;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WeatherPartitioner extends Partitioner<Weather, Text> {
    @Override
    public int getPartition(Weather key, Text value, int numPartitions) {
        // Distribute by year; with the driver's two reduce tasks, even years
        // go to reducer 0 and odd years to reducer 1.
        return key.getYear() % numPartitions;
    }
}

// WeatherReducer.java
package com.bjsxt.hdfs.weather;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WeatherReducer extends Reducer<Weather, Text, Text, Text> {
    @Override
    protected void reduce(Weather key, Iterable<Text> valueIterable, Context context) throws IOException, InterruptedException {
        // Values arrive sorted by descending temperature; Hadoop refreshes the
        // reused key object per value, so key.getLine() tracks the current record.
        int counter = 0;
        String previousDay = null;

        for (Text value : valueIterable) {
            // Keep only distinct days: the two hottest readings may share a day.
            String day = key.getLine().split(" ")[0];

            if (day.equals(previousDay)) {
                continue;
            }

            context.write(new Text(key.getLine()), value);
            previousDay = day;

            if (++counter == 2) {
                break;
            }
        }
    }
}
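
Submission mirrors the word-count job (jar name and paths illustrative). Because the driver sets two reduce tasks, the output directory holds two files, part-r-00000 (even years) and part-r-00001 (odd years), which is why the sample output above is not globally sorted:

hadoop jar weather.jar com.bjsxt.hdfs.weather.MainClass /weather/input /weather/output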
