美文网首页
MapReduce的类型与格式

MapReduce的类型与格式

作者: Vechace | 来源:发表于2018-06-23 10:15 被阅读2次

    MapReduce数据处理模型
    map和reduce函数的输入和输出时键值对。

    MapReduce的类型

    Hadoop的 MapReduce中,map函数和reduce函数遵循如下格式:

    • map:(k1,v1) --->list(k2,v2)
    • combiner:(k2,list(v2))---> lsit(k2,v2)
    • reduce:(k2,list(v2)) --->list(k3,v3)

    默认的MapReduce作业

    1、默认mapper是Mapper类,将输入的键和值原封不动地写到输出中:

    public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{
        protected void map(KEYIN key,VALUEIN value,
            Context context) throws IOException,InterruptedException{
                context.write((KEYOUT)key,(VALUE)value);
        }
    }
    

    Mapper是一个泛型类型(generic type),可以接受任何键或值的类型。map任务的数量等于输入文件被划分成的分块数,取决于输入文件的大小及文件块的大小。

    2、默认的partitioner(分区索引)是HashPartitioner,它对每条记录的键进行哈希操作,以绝对该记录应该属于哪个分区:

    public class HashPartitioner<K,V>extends Partitioner<K,V>{
        public int getPartitioner(k key,V value,int numPartitions){
            return (key.hashCode()& Integer.MAX_VALUE) % numPartitions;
        }
    }
    

    键的哈希码被转换成一个非负整数,它由哈希值与最大的整型值做一次按位与操作而获得,然后用分区数,进行取模操作,以决定该记录属于哪个分区索引。

    每个分区由一个reduce任务处理,所以分区数等于作业的reduce任务个数,默认情况下,只有一个reducer,即只有一个分区,该情况下想,所有数据都放在同一分区,partitioner操作变得无关紧要,如果有多个reduce任务,这是HashPartitioner的作用便体现出来。

    3、默认reducer是Reducer类型,也是一个泛型类型,把所有输入写到输出中:

    public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{
        protected void reduce(KEYIN key,Iterable<VALUEIN> values,
            Context context)throws IOException,InterruptedException{
                for(VALUEIN value:values){
                    context.write((KEYOUT)key,(VALUEOUT)value);
                }
        }
        
    }
    

    输入格式

    输入分片与记录

    一个输入分片(split)是一个由单个map操作来处理的输入块,每个map操作只处理一个输入分片。每个分片被划分为若干条记录,每条记录就是一个键值对,map一个接一个地处理记录。

    输入分片在包org.apache.hadoop.mapreduce下,用InputSplit接口表示

    public abstract class InputSplit{
        public abstract long getLength() throws IOException,InterruptedException;
        public abstract String[] getLocations()throws IOException,InterruptedException;
    }
    

    InputSplit包含一个以字节为单位的长度和一组存储位置(一组主机名),分片并不包含数据本身,而是指向数据的引用,存储位置用于供MapReduce系统使用以便将map任务尽量放在分片数据附近,而分片大小用来排序分片,以便优先处理最大的分片,从而最小优化作业运行时间(贪心算法)。

    MapReduce开发不用直接处理InputSplit,它由InputFormat创建的(在MapReduce驱动程序中,InputFormat负责创建输入分片并将它们分割成记录),InputFormat类定义如下:

    public abstract class InputFormat<K,V>{
        public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException,InterruptedException;
        
        public abstract RecordReader<K,V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException,InterruptedException;
    }
    
    • 运行作业的客户端(驱动程序)通过getSplits()计算分片,然后将它们发送到application master
    • application master使用其存储位置信息(InputSplit中的字段)来调度map任务,从而在集群上处理这些分片数据。
    • map任务把输入分片传给InputFormat的createRecordReader()方法来获取这个分片的RecordReader。RecordReader就像记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,然后再传递给map函数。

    查看Mapper的run方法,如下:

    public void run(Context context)throws IOException,InterruptedException{
        setup(context);
        while(context.nextKeyalue()){
            map(context.getCurrentKey(),context.getCurrentValue(),context);
        }
        cleanup(context);
    }
    
    • 首先运行setup(),重复调用Context的nextKeyValue()为mapper产生键值对象。
    • 通过Context,键值从RecordReader中被检索出并传递给map()方法
    • 当reader读到stream的结尾时,nextKeyValue()方法返回false,map任务运行其cleanup()方法,然后结束。

    Mapper的run方法是公共的 ,可以由用户定制。MultithreadedMapRunner是另一个MapRunnable接口的实现,可配置指定个数的线程来并发运行多个mapper。

    FileInputFormat类

    FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类,它提供两个功能:

    • 用于指出作业的输入文件位置
    • 为输入文件生成分片的代码实现。
    FileInputFormat类图.png

    FileInputFormat类的输入路径

    作业的输入被设定成一组路径,FileInputFormat提供4种静态方法来设定Job

    public static void addInputPath(Job job,Path path);
    public static void addInputPaths(Job job,String commaSeperatedPaths);
    public static void setInputPaths(Job job,Path inputPaths);
    public static void setInputPaths(Job job,String commaSeperatedPaths);
    

    FileInputFormat类的输入分片

    FileInputFormat负责把大文件(超过HDFS块的大小)分割成小文件(HDFS块大小),分片大小在默认情况下:

    max(minimumSize, min(maximumSize,blockSize))

    避免切分
    两种实现方法:

    • 增加最小分片大小,设置为最大文件大小,即long.MAX_VALUE
    • 使用FileInputFormat具体子类,重写isSplitable()方法,把返回值设置为false,如下
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    public class NonSplittableTextInputFormat extends TextInputFormat{
        @Override
        protected boolean isSplitable(JobContext context,Path file){
            return false;
        }
    }
    

    文本输入

    TextInputFormat是默认的InputFormat,每天记录时一行输入,键是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行内容,不包括任何终止符,被打包成一个Text对象。如下:

    //如下文本
    On the top of the Crumpetty Tre 
    The Quangle Wangle sat,
    But his face you could not see,
    On accout of his Beaver Hat.
    
    //每条记录值,键值对
    (0,On the top of the Crumpetty Tre )
    (33,The Quangle Wangle sat,)
    (57,But his face you could not see,)
    (89,On accout of his Beaver Hat.)
    

    其他类:KeyValueTextInputFormat、NLineInputFormat

    二进制输入

    SequenceFileInputFormat类、SequenceFileAsTextInputFormat类、SequenceFileAsBinaryInputFormat类、FileLengthInputFormat类。

    数据库输入/输出

    在关系数据库与HDFS之间移动数据的方法是:使用Sqoop

    其他方式:Hbase的TableInputFormat

    输出格式

    类图:


    OutputFormat类图.png

    参考资料:《Hadoop权威指南》

    相关文章

      网友评论

          本文标题:MapReduce的类型与格式

          本文链接:https://www.haomeiwen.com/subject/qvdryftx.html