美文网首页
Hadoop开发--MapReduce编程--示例代码(十)

Hadoop开发--MapReduce编程--示例代码(十)

作者: 无剑_君 | 来源:发表于2019-12-05 08:09 被阅读0次

一、手机号分类

1)数据实体类

public class DataBean implements Writable {
    // 手机号
    private String telNo;
    // 上行流量
    private long upPayLoad;
    // 下行流量
    private long downPayLoad;
    // 总流量
    private long totalPayLoad;

    // 序列化
    // 注意:序列化和反序列化一定要注意类型和顺序,比如我们序列化的时候先序列化字符串telNo,
    //反序列化的时候就应该先反序列化telNo
    public void write(DataOutput out) throws IOException {
        out.writeUTF(telNo);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }

    // 反序列化
    public void readFields(DataInput in) throws IOException {
        this.telNo = in.readUTF();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
        this.totalPayLoad = in.readLong();
    }

    public DataBean() {
    }

    public DataBean(String telNo, long upPayLoad, long downPayLoad) {
        super();
        this.telNo = telNo;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.totalPayLoad = upPayLoad + downPayLoad;
    }

    @Override
    public String toString() {
        return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
    }

    public String getTelNo() {
        return telNo;
    }

    public void setTelNo(String telNo) {
        this.telNo = telNo;
    }

    public long getUpPayLoad() {
        return upPayLoad;
    }

    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    public long getDownPayLoad() {
        return downPayLoad;
    }

    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    public long getTotalPayLoad() {
        return totalPayLoad;
    }

    public void setTotalPayLoad(long totalPayLoad) {
        this.totalPayLoad = totalPayLoad;
    }
}

2)分区类

public class ProviderPartitioner extends Partitioner<Text, DataBean> {
    /**
     * numPartitions---分区的值是由Reducer的数量决定的,
     * 起几个Reducer就创建几个分区
     */
    public static Map<String, Integer> providerMap = new HashMap<String, Integer>();
    static{
        providerMap.put("135", 1);
        providerMap.put("136", 1);
        providerMap.put("137", 1);
        providerMap.put("138", 1);
        providerMap.put("139", 1);
        providerMap.put("150", 2);
        providerMap.put("159", 2);
        providerMap.put("182", 3);
        providerMap.put("183", 3);
    }

    @Override
    public int getPartition(Text key, DataBean value, int numPartitions) {
        // key是电话号码
        String telNo = key.toString();
        // 我们截取前3位,比如135、136、150、182等等,通过前三位就可以知道是移动、联通还是电信或是其它
        String sub_tel = telNo.substring(0, 3);
        Integer num = providerMap.get(sub_tel);
        if (num == null) {
            return 0;
        }
        return num;
    }
}

3)Map类

public class MapperClass extends Mapper<LongWritable, Text, Text, DataBean> {
    Text text = null;
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        // 我们要使用的数据的第二列(列索引号为1)就是手机号,第9列(列索引号8)是上行流量,
        //第10列(列索引号9)是下行流量
        String telNo = fields[1];
        long up = Long.parseLong(fields[8]);
        long down = Long.parseLong(fields[9]);
        DataBean bean = new DataBean(telNo, up, down);
        text = new Text(telNo);
        context.write(text, bean);
    }

4)Reduce类

public class ReduceClass extends Reducer<Text, DataBean, Text, DataBean> {
    @Override
    protected void reduce(Text key, Iterable<DataBean> v2s, Context context) throws IOException, InterruptedException {
        long up_sum = 0;
        long down_sum = 0;
        for (DataBean bean : v2s) {
            // 累计上行流量
            up_sum += bean.getUpPayLoad();
            // 累计下行流量
            down_sum += bean.getDownPayLoad();
        }
        //创建Bean对象
        DataBean bean = new DataBean(key.toString(), up_sum, down_sum);
        context.write(key, bean);
    }
}

5)测试类

public class DataCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 通过传入的class 找到job的jar包
        job.setJarByClass(DataCount.class);

        job.setMapperClass(MapperClass.class);
        // 键值类
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataBean.class);
        // 输入路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 告诉Job我们自定义了分区功能
        job.setPartitionerClass(ProviderPartitioner.class);

        job.setReducerClass(ReduceClass.class);
        // 键值类
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        // 输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //设置Reducer的数量,默认情况下只启动一个Reducer,一个Reducer对应一个文件,
        //我们现在想要得到4个文件,自然而然我们得启动多个Reducer,为了程序的灵活性我
        //们通过参数的形式给它赋值。
        job.setNumReduceTasks(Integer.parseInt(args[2]));
        //
        job.waitForCompletion(true);
    }
}

二、年最高气温

(一)年最高气温

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TemperatureTest {
    public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // 注意:空格的数量
            String[] data = line.split("\\s+");
            int airTemperature=Integer.valueOf(data[4]);
            context.write(new Text(data[0]), new IntWritable(airTemperature));
        }
    }

    public static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "192.168.71.129:9000");
        Job job = Job.getInstance(conf);

        // 重要:指定本job所在的jar包
        job.setJarByClass(TemperatureTest.class);

        // 设置wordCountJob所用的mapper逻辑类为哪个类
        job.setMapperClass(MaxTemperatureMapper.class);
        // 设置map阶段输出的kv数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 设置wordCountJob所用的reducer逻辑类为哪个类
        job.setReducerClass(MaxTemperatureReducer.class);
        // 设置最终输出的kv数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置要处理的文本数据所存放的路径
        FileInputFormat.setInputPaths(job, "hdfs://192.168.71.129:9000/013470-99999-2016");
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.71.129:9000/Temperature_out"));

        // 提交job给hadoop集群
        job.waitForCompletion(true);
    }
}

(二)年最高气温分组排序
1)键值对类

public class KeyPair implements WritableComparable<KeyPair> {  
    private int year;  //年
    private int hot;   //气温
    public int getYear() {  
            return year;  
    }  
    public void setYear(int year) {  
            this.year = year;  
    }  
    public int getHot() {  
            return hot;  
    }  
    public void setHot(int hot) {  
            this.hot = hot;  
    }  
    @Override  
    public void readFields(DataInput in) throws IOException {  
            this.year=in.readInt();  
            this.hot=in.readInt();  
             
    }  
    @Override  
    public void write(DataOutput out) throws IOException {  
            out.writeInt(year);  
            out.writeInt(hot);  
             
    }  
    @Override  
    public int compareTo(KeyPair o) {  
//            System.out.println(Integer.compare(1, 1));//0  
//            System.out.println(Integer.compare(1, 2));//-1  
//先对比年,如果相等,结果为0,返回  
             
            int result=Integer.compare(year, o.getYear());  
            if (result!=0){  
                    return result;  
            }  
            return Integer.compare(hot, o.hot);  
    }  

    @Override  
    public String toString() {  
            return year+"\t"+hot;  
    }  
     
    @Override  
    public int hashCode() {  
            return new Integer(year+hot).hashCode();  
    }  
}

2)排序类

public class HotSort extends WritableComparator{  
    public HotSort() {  
            super(KeyPair.class, true);  
    }  
    @Override  
    public int compare(WritableComparable a, WritableComparable b) {  
            KeyPair o1=(KeyPair) a;  
            KeyPair o2=(KeyPair) b;  
            int res=Integer.compare(o1.getYear(), o2.getYear());  
            if (res!=0){  
                    return res;  
            }  
            return -Integer.compare(o1.getHot(),o2.getHot());//降序排序  
    }  
}  

3)分区类

public class HotPartition extends Partitioner<KeyPair, Text>{  
    @Override  
    public int getPartition(KeyPair key, Text value, int num) {  
            return (key.getYear()*127%num);  
    }    
}

4)分组类

public class HotGroup extends WritableComparator{    
    public HotGroup() {  
            super(KeyPair.class, true);  
    }  

    @Override  
    public int compare(WritableComparable a, WritableComparable b) {  
            KeyPair o1=(KeyPair) a;  
            KeyPair o2=(KeyPair) b;       
            return Integer.compare(o1.getYear(),o2.getYear());  
    }   
}

5)Mapper类

public class HotMapper extends Mapper<LongWritable, Text, KeyPair, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] ss = line.split("\t");
        if (ss.length == 2) {
            int year = Integer.parseInt(ss[0].substring(0, 4));
            int hot = Integer.parseInt(ss[1].substring(0, ss[1].indexOf("°C")));
            KeyPair kp = new KeyPair();
            kp.setYear(year);
            kp.setHot(hot);
            context.write(kp, value);
        }
    }
}

6)HotReduce类

public class HotReduce extends Reducer<KeyPair, Text, KeyPair, Text>{    
    @Override  
    protected void reduce(KeyPair kp, Iterable<Text> i,Context context)  
                    throws IOException, InterruptedException {  
            for (Text text : i) {  
                    context.write(kp, text);  
            }  
    }     
}  

7)测试类

public class RunTempJob {
     public static void main(String args[]) throws IOException, InterruptedException{
            //获取配置
            Configuration conf=new Configuration();

            //修改命令行的配置
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("使用: temp <in> <out>");
                System.exit(2);
            }
            //创建Job
            Job job=Job.getInstance(conf,"temp");
            //1.设置job运行的类
            job.setJarByClass(RunTempJob.class);
            //2.设置map和reduce的类
            job.setMapperClass(HotMapper.class);
            job.setReducerClass(HotReduce.class);
            //3.设置map的输出的key和value 的类型
            job.setMapOutputKeyClass(KeyPair.class);
            job.setMapOutputValueClass(Text.class);
            
            //4.设置输入文件的目录和输出文件的目录
            FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
            
            //5.设置Reduce task的数量 每个年份对应一个reduce task
            job.setNumReduceTasks(3);//3个年份
            
            //5.设置partition sort Group的class
            job.setPartitionerClass(HotPartition.class);
            job.setSortComparatorClass(HotSort.class);
            job.setGroupingComparatorClass(HotGroup.class);
            
            //6.提交job 等待运行结束并在客户端显示运行信息
            boolean isSuccess= false;
            
            try {
                isSuccess = job.waitForCompletion(true);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            //7.结束程序
            System.exit(isSuccess ?0:1);
        }
}

三、计算奇数行与偶数行数据之和

1.代码一
1)Partitioner类

public class MyPartitioner extends Partitioner<LongWritable, IntWritable> {  
    @Override  
    public int getPartition(LongWritable key, IntWritable value, int arg2) {  
        /** 
         * 根据行号进行分区,把行号为的偶数的分区到0号reduce 
         * 把行号为奇数的分区到1号reduce,并把key的值设置为0或1 
         * 目的是为了在进入reduce时奇数和偶数能被分别放到同一个 
         * 迭代器中以便求和操作 
         */  
        if( key.get() % 2 == 0) {  
            key.set(0);  
            return 0;  
        } else {  
            key.set(1);  
            return 1;  
        }  
    }  
}  

2)Map类

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {  
      private long lineNum = 0;  
    private LongWritable okey = new LongWritable();  
    @Override  
    protected void map(LongWritable key, Text value, Context context)  
            throws IOException, InterruptedException {  
        lineNum ++;  
        okey.set(lineNum);  
        /** 
         * 输出行号作为key,并把行的值作为value,这里只是简单的说明的patitioner的定制 
         * 不考虑多mapper情况下行号控制,这里只关注partitioner的使用就行 
         */  
        context.write(okey, new IntWritable(Integer.parseInt(value.toString())));  
    }  
}  

3)Reduce类

public class MyReducer extends Reducer<LongWritable, IntWritable, Text, IntWritable> {  
    @Override  
    protected void reduce(LongWritable key, Iterable<IntWritable> value, Context context)  throws IOException, InterruptedException {  
        int sum = 0;  
        for( IntWritable val : value) {  
            sum += val.get();  
        }  
        if( key.get() == 0 ) {  
            context.write(new Text("偶数行之和为:"), new IntWritable(sum));  
        } else if ( key.get() == 1) {  
            context.write(new Text("奇数行之和为:"), new IntWritable(sum));  
        }  
    }  
}

4)测试类

public class JobMain {
    static Path inPath=new Path("hdfs://192.168.146.136:9000/input");
    static Path outPath=new Path("hdfs://192.168.146.136:9000/output");
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "partitioner-job");
        job.setJarByClass(JobMain.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 设置自定义的Partitioner对map输出进行分区
        job.setPartitionerClass(MyPartitioner.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 设置job的reducer的个数为2
        job.setNumReduceTasks(2);
        FileInputFormat.addInputPath(job, inPath);

        FileSystem fs = FileSystem.get(configuration);
        //如果路径存在则删除
//      if (fs.exists(outPath)) {
//          fs.delete(outPath, true);
//      }
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

  1. 代码二
    1)Recordreader类
public class MyRecordReader extends RecordReader<LongWritable, Text> {
    private long start; //开始
    private long end;   //结束
    private long pos;//表示行号
    private FSDataInputStream fin = null ;
    private LongWritable key = null ;
    private Text value = null ;
    private LineReader reader = null ;
    @Override
    public void close() throws IOException {
        fin.close() ;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException,
            InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit)inputSplit;
        //获取文件开始
        start = fileSplit.getStart();
        //文件结束
        end = start + fileSplit.getLength() ;
        //获取配置
        Configuration conf = context.getConfiguration() ;
        //得到文件路径
        Path path = fileSplit.getPath() ;
        FileSystem fs = path.getFileSystem(conf) ;
        fin = fs.open(path) ;
        //定位到起始位置
        fin.seek(start);
        //读取一行数据
        reader = new LineReader(fin);
        pos = 1 ;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(key == null){
            key = new LongWritable() ;
        }
        //设定键的位置
        key.set(pos) ;

        if(value == null){
            value = new Text() ;
        }

        if(reader.readLine(value) ==0){
            return false ;
        }
        //位置偏移
        pos++ ;
        return true ;
    }
}

2)MyFileInputFormat类

public class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {
    /**
     * 重写FileInputFormat创建记录读取方法
     * 读取数据
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0,
            TaskAttemptContext arg1) throws IOException, InterruptedException {
        return new MyRecordReader();
    }
    /**
     * 自定义分隔
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false ;
    }
}

3)分区类

public class MyPartitioner extends Partitioner<LongWritable, Text> {
    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        if(key.get() % 2 == 0){
            key.set(1) ;
            return 1 ;
        }else{
            key.set(0) ;
            return 0 ;
        }
    }
}

4)Mapper类

public class MapperClass extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        context.write(key, value) ;
    }
}

5)Reducer类

public class ReducerClass extends Reducer<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> value,Context context)
            throws IOException, InterruptedException {
        int sum = 0 ;
        for(Text val : value){
            sum += Integer.valueOf(val.toString()) ;
        }
        Text writeKey = new Text() ;
        IntWritable writeValue = new IntWritable() ;
        if(key.get() == 0){
            writeKey.set("奇数行之和:") ;
        }else{
            writeKey.set("偶数行之和:") ;
        }
        writeValue.set(sum) ;
        context.write(writeKey, writeValue) ;
    }
}

四、常见错误:

如果出现数据格式不正确的话,请使用ANSI格式保存数据,而不用使用UTF-8。

相关文章

网友评论

      本文标题:Hadoop开发--MapReduce编程--示例代码(十)

      本文链接:https://www.haomeiwen.com/subject/whfjwctx.html