美文网首页
MapReduce入门案例

MapReduce入门案例

作者: 如虎添 | 来源:发表于2019-12-15 19:28 被阅读0次

    来源:千峰科技王从明老师

    第三节:MapReduce的核心思想

    3.1 MapReduce是什么

    3.1.1 论文

    Google的三篇论文:
    <GFS>,阐述了分布式思想用于存储数据集大的文件, 
            成就了Hadoop的核心模块HDFS
    <MapReduce>, 阐述了在分布式文件系统上分析计算处理大数据集的框架思想(map,reduce),
            成就了Hadoop的核心模块Mapreduce
    <bigtable>,阐述了一种新型的存储数据的模式,区别于传统型数据库的表结构的一种新型的表结构思想
            成就了Hbase这个框架。
    

    3.1.2 概念

    mapreduce是运行在分布式文件系统上的一个并行分析计算框架。有高可靠的,高容错的优势。

    3.2 MapReduce核心思想

    3.2.1 说明

    MapReduce简称MR, 采用了函数式编程语言(Lisp)的思想里的map和reduce。在函数式编程语言中map(映射)是用来处理列表中杂乱无章的数据的,列表中的数据都是元素,map是分别对每一个元素进行处理,所有元素的类型不需要相同。reduce(规约)思想,是对列表中的数据进行迭代运算的,那么就需要列表中的元素的类型相同。

    3.2.2 思想概述

    MR正是借鉴了上述思想,来设计自己的计算框架的。整个计算框架分为两个阶段,一个是Map阶段,一个是Reduce阶段。Map阶段会将文件中的数据按照一行一行的进行解析为KeyValue键值对,每一个键值对都会进入一个map函数单独处理,处理的结果依然是keyvalue键值对的形式,做为输出。Reduce阶段会接受各个节点的Map阶段输出的键值对,然后进行归并处理,调用reduce函数进行分析(换句话说,reduce接收的多个K2,V2,经过shuffle阶段,k2,v2已经形成一个新列表<k2,List<v2>>),经过reduce函数迭代处理后,新数据再次以键值对的形式输出,输出到HDFS上进行持久化保存。
    

    3.2.3 数据扭转

    [图片上传失败...(image-41102d-1576409110811)]

    Map阶段对应是MapTask,   MapTask的个数基本上与文件的块数有关系。比如256M的文件,会有两个MapTask任务
    260M的文件在HDFS上有三个块,但是Mapreduce会分配两个MapTask。一个task处理第一个128M,第二个Task处理第二个128M和第三个的4M.          有一个1.1的比较值,即最后一个块不足块大小的0.1倍,那就会将此块分给上一个块对应的MapTask.
    
    Reduce阶段对应是ReduceTask, 个数由开发人员决定。涉及到一个分区技术,将不同的key按照一定的逻辑进行分区。比如:a-g开头的分为一个区,h-n分为一个区,o-t分为一个区,u-z分为一个区。分区个数由分区逻辑来决定。reduceTask的个数不能小于分区数,原因是一个分区要有一个reductTask来处理。reduce个数可以大于分区数,结果会产生空文件。
    

    第四节:MapReduce的入门案例

    4.1 案例需求

    使用MR进行词频统计,即wordcount

    4.2 案例分析

    f1.txt内容如下
    hello world hello java
    hello php
    php is good
    bigdata is good
    
    f2.txt
    hello python hello c++
    hello c#
    girl is good
    boy is good
    

    [图片上传失败...(image-6fc7a5-1576409110811)]4.3 编写mr的模板

    1)编写Map阶段的Mapper类型

    package com.changsha.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * 自定义的类型需要继承Mapper,同时需要指定k1,v1,k2,v2的泛型
     * k1:是记录的偏移量,LongWritable         带有Writable的类型都是实现了Hadoop的序列化机制Writable接口
     * v1:行记录,          Text,Hadoop中提供的类型
     * k2:单词做为k2,  Text
     * v2:整数1,   可以使用IntWritable,LongWritable......
     * 
     * 
     * 重写Mapper里提供的map函数,map函数的三个参数就是k1,v1和上下文context,提供了获取k1v1的逻辑,还提供了写出k2,v2的方法
     * @author Michael
     *
     */
    public class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        Text k2 = new Text();
        //设计一个1的LongWritable类型
        LongWritable v2 = new LongWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //偏移量,不需要进行分析,value是一行记录,我们需要进行切分成数组,每一个元素是一个单词
            String[] arr = value.toString().split(" ");  //  {hello,java,word,hello,hello}
            //将每一个元素设计成k2,v2   v2是1
            for(String str:arr) {
                //将每一个元素封装成Text类型的k2
                k2.set(str);            
                context.write(k2, v2);
            }
        }
        
    }
    
    

    2)编写Reduce阶段的Reducer类型

    package com.changsha.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        //每一个<k2,list<v2>>调用一次reduce
        //<hello,<1,1,1,1,1,1>>
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
            
            long sum = 0;
            for(LongWritable v2:values) {
                //将v2转出long类型进行叠加
                sum+=v2.get();
            }
            //将sum转出LongWritable类型
            LongWritable v3 = new LongWritable(sum);
            //将k2作为k3, 和v3一起写出去
            context.write(key, v3);
        
        }   
    }
    
    

    3)编写运行job的驱动类

    package com.changsha.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    /**
     * 设计一个wordcount的job作业
     * @author Michael
     *
     */
    public class MyDriver {
        public static void main(String[] args) throws Exception {
            //获取分布式文件系统的配置信息 构造器里的逻辑会读取core-site.mxl等四个文件和core-default.xml等四个文件
            Configuration conf = new Configuration();
            //获取一个Job实例
            Job job =Job.getInstance(conf, "wordcount");
            
            //设置作业的驱动包
            job.setJarByClass(MyDriver.class);
            
            //设置作业的mapper类型和reducer类型
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            
            //设置k3,v3的泛型          如果k2,v2和k3,v3的泛型不一致,需要单独设置k2,v2的泛型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            
    //      job.setMapOutputKeyClass(theClass);
    //      job.setMapOutputValueClass(theClass);
            
            //设置job作业的要分析的数据源和结果存储路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            //提交作业
            System.exit(job.waitForCompletion(true)?0:1);
            
        }
    }
    
    

    第一节:分区器的使用

    1.1 分区器的简介

    partitioner是mr中的一个插件,可以对mr中的key进行运算分析,来达到把所有的数据按照我们所需要的逻辑进行分不同的区域,每.一个区域中的数据由一个reduceTask来进行处理。mr中默认的分区器是HashPartitioner,原理是使用key的hash值对reduceTask个数进行取模(取余),

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
    
      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K key, V value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    
    }
    

    注意:方法getPartition(K key,V value,int numReduceTasks)的返回值是int类型,返回值的范围必须是从0开始的连续的自然数。0表示第一个分区,1表示第二个分区.......

    1.2 自定义分区器

    针对于wordcount案例,我们可以这样分区:[a-g],[h-n],[o-t],[u-z]这样分区。

    package com.changsha.mr;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    /**
     * 分区器的kv说的是k2,v2,因此泛型也需要进行设置
     * @author Michael
     */
    public class MyPartitoner extends Partitioner<Text,LongWritable>{
    
        @Override
        public int getPartition(Text key, LongWritable value, int numReduceTasks) {
            //:[a-g],[h-n],[o-t],[u-z]分区
            String str = key.toString();
            if(str.substring(0,1).matches("([a-g]|[A-G])")) {
                return 0;
            }else if(str.substring(0,1).matches("([h-n]|[H-N])")) {
                return 1;
            }else if(str.substring(0,1).matches("([o-t]|[O-T])")) {
                return 2;
            }else {
                return 3;  //只要不是前三个区的开头字符,都在这个区里
            }
        }
    }
    

    修改MyDriver类型

    package com.changsha.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 设计一个wordcount的job作业
     * @author Michael
     *
     */
    public class MyDriver {
        public static void main(String[] args) throws Exception {
            //获取分布式文件系统的配置信息 构造器里的逻辑会读取core-site.mxl等四个文件和core-default.xml等四个文件
            Configuration conf = new Configuration();
            //获取一个Job实例
            Job job =Job.getInstance(conf, "wordcount");
            
            //设置作业的驱动包
            job.setJarByClass(MyDriver.class);
            
            //设置作业的mapper类型和reducer类型
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            
            //设置k3,v3的泛型          如果k2,v2和k3,v3的泛型不一致,需要单独设置k2,v2的泛型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            
            //设置job的分区器
            job.setPartitionerClass(MyPartitoner.class);
            job.setNumReduceTasks(5);
            
            
    //      job.setMapOutputKeyClass(theClass);
    //      job.setMapOutputValueClass(theClass);
            
            //设置job作业的要分析的数据源和结果存储路径
            FileInputFormat.setInputPaths(job, new Path("D:\\academia\\The teaching material\\The required data\\wordcount"));
            FileOutputFormat.setOutputPath(job, new Path("D:/wc1"));
            
            //提交作业
            System.exit(job.waitForCompletion(true)?0:1);       
        }
    }
    

    第二节:TopN案例

    2.1 项目需求:

    rate.json文件里存储的是电影评分信息

    {"movie":"1193","rate":"5","datetime":"978300760","uid":"1"}
    

    求:每一个用户对电影评分的前10名

    2.2 项目分析:

    第一种解决方案分析:

    由于k1是行偏移量,v1是行记录,可以通过java语言进行v1字符串分析截取,将uid截取出来,然后将uid当成k2,

    v1当v2.这样写出去。在reduce函数中,uid为1的所有电影信息都在一个reduce函数中,然后再将评分和电影id截取出来,放到list中,使用list的排序进行降序,然后使用for循环取出前10

    第二种解决方案:使用Hadoop的序列化机制,自定义类型

    定义一个类型RateBean,封装每一行的四个属性信息。这个类型需要实现Writable接口,如果想利用集合进行排序,还需要实现Comparable接口

    最终输出的结果个数如下: uid movie rate

    2.3 代码演示

    1)自定义类型RateBean

    package com.changsha.rate;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class RateBean implements WritableComparable<RateBean> {
        // 设计四个属性
        private int movie;
        private int rate;
        private String datetime;
        private int uid;
    
        // 必须提供无参构造器
        public RateBean() {
        }
    
        public RateBean(int movie, int rate, String datetime, int uid) {
            super();
            this.movie = movie;
            this.rate = rate;
            this.datetime = datetime;
            this.uid = uid;
        }
    
        public int getMovie() {
            return movie;
        }
    
        public void setMovie(int movie) {
            this.movie = movie;
        }
    
        public int getRate() {
            return rate;
        }
    
        public void setRate(int rate) {
            this.rate = rate;
        }
    
        public String getDatetime() {
            return datetime;
        }
    
        public void setDatetime(String datetime) {
            this.datetime = datetime;
        }
    
        public int getUid() {
            return uid;
        }
    
        public void setUid(int uid) {
            this.uid = uid;
        }
    
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((datetime == null) ? 0 : datetime.hashCode());
            result = prime * result + movie;
            result = prime * result + rate;
            result = prime * result + uid;
            return result;
        }
    
        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            RateBean other = (RateBean) obj;
            if (datetime == null) {
                if (other.datetime != null)
                    return false;
            } else if (!datetime.equals(other.datetime))
                return false;
            if (movie != other.movie)
                return false;
            if (rate != other.rate)
                return false;
            if (uid != other.uid)
                return false;
            return true;
        }
    
        public String toString() {
            return uid + "\t" + rate+"\t" + movie;
        }
    
        //序列化方法,将数据序列化成字节序列
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(movie);
            out.writeInt(uid);
            out.writeInt(rate);
            out.writeUTF(datetime);
    
        }
    
        // 反序列化方法,用于将字节序列反序列化成数据,一定要和序列化的顺序一致
        @Override
        public void readFields(DataInput in) throws IOException {
            movie=in.readInt();
            uid = in.readInt();
            rate = in.readInt();
            datetime = in.readUTF();
        }
    
        //使用二次排序的比较规则,先比较uid 升序,再比较rate,降序
        @Override
        public int compareTo(RateBean other) {
            int num = this.uid-other.uid;
            if(num==0) {
                num = other.rate-this.rate;
            }
            return num;
        }
    }
    

    2)RateMapper

    package com.changsha.rate;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    /**
     * 
     *   k2:uid     这样的同一个uid的所有电影都会进入一个reduce函数中
     *   v2: RateBean   是uid的一个电影信息
     *
     */
    public class RateMapper extends Mapper<LongWritable,Text,IntWritable,RateBean>{
        ObjectMapper  objectMapper = new ObjectMapper();
        IntWritable k2 = new IntWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //分析value,将value解析成RateBean对象
            String line = value.toString();
            /*
             * readValue(String str,Class class)
             * str:是一个json字符串
             * 方法的逻辑是,将json字符串解析成有相应的key的类对象
             * {"movie":"2804","rate":"5","datetime":"978300719","uid":"1","studentname":"zhangsan"}
             */
            RateBean rateBean = objectMapper.readValue(line, RateBean.class);
            //将rateBean的uid当成k2,rateBean对象当成v2
            k2.set(rateBean.getUid());
            context.write(k2, rateBean);
        }
    }
    
    1. RateReducer
    package com.changsha.rate;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    /**
     * 
     * k2:  uid
     * List<v2>  {RateBean,RateBean.........}
     *
     */
    public class RateReducer extends Reducer<IntWritable,RateBean,RateBean,NullWritable>{
    
        @Override
        protected void reduce(IntWritable key, Iterable<RateBean> values,Context context)
                throws IOException, InterruptedException {
            //将当前uid的所有rateBean取出放入集合中,使用集合工具类进行排序
            List<RateBean>  beans= new ArrayList<RateBean>();
            for(RateBean bean:values) {
                int uid = bean.getUid();
                int movie = bean.getMovie();
                String datetime = bean.getDatetime();
                int rate = bean.getRate();
                RateBean b1 = new RateBean(movie, rate, datetime, uid);
                beans.add(b1);
            }
            //使用集合工具类进行排序
            Collections.sort(beans);
            //取出前十名
            for(int i=0;i<10;i++) {
                RateBean bean = beans.get(i);
                context.write(bean, NullWritable.get());
            }
        }
    }
    

    4)RateDriver

    package com.changsha.rate;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    /**
     * 设计一个wordcount的job作业
     * @author Michael
     *
     */
    public class RateDriver {
        public static void main(String[] args) throws Exception {
            //获取分布式文件系统的配置信息 构造器里的逻辑会读取core-site.mxl等四个文件和core-default.xml等四个文件
            Configuration conf = new Configuration();
            //获取一个Job实例
            Job job =Job.getInstance(conf, "wordcount");
            
            //设置作业的驱动包
            job.setJarByClass(RateDriver.class);
            
            //设置作业的mapper类型和reducer类型
            job.setMapperClass(RateMapper.class);
            job.setReducerClass(RateReducer.class);
            
            //设置k3,v3的泛型          如果k2,v2和k3,v3的泛型不一致,需要单独设置k2,v2的泛型
            job.setOutputKeyClass(RateBean.class);
            job.setOutputValueClass(NullWritable.class);
            
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(RateBean.class);
            
            
            
            //设置job作业的要分析的数据源和结果存储路径
            FileInputFormat.setInputPaths(job, new Path("D:\\academia\\The teaching material\\The required data\\rating.json"));
            FileOutputFormat.setOutputPath(job, new Path("D:/rate1"));
            
            //提交作业
            System.exit(job.waitForCompletion(true)?0:1);
            
        }
    }
    

    相关文章

      网友评论

          本文标题:MapReduce入门案例

          本文链接:https://www.haomeiwen.com/subject/yhjxnctx.html