美文网首页我爱编程
第三章 初识Hadoop框架

第三章 初识Hadoop框架

作者: Sqlver | 来源:发表于2016-08-09 11:30 被阅读0次

    安装类型

    • 单机模式
    • 伪分布式集群模式
    • 多节点集群安装模式

    一个MapReduce程序的组成

    • Java程序客户机:一个Java程序,由集群中的一个客户端节点提交运行,这个客户端节点可以访问Hadoop集群,它经常由集群中的一个数据节点来充当,该节点仅是集群中的一台机器,并且有权限访问Hadoop
    • 自定义Mapper类:除非在最简单的应用场景下,MapReduce程序中的这个Mapper类通常是一个用户自定义类,如果不是在伪集群模式下运行MapReduce程序,这个类的实例会在远程任务节点上执行,这些任务节点往往与用来提交作业程序的客户端节点不同。
    • 自定义Reducer类:除非在最简单的应用场景下,MapReduce程序中的这个Reducer类通常是一个用户自定义类,与Mapper类一样,如果不是在伪集群模式下运行MapReduce程序作业,这个类的实例会在远程任务节点上执行,这些任务节点往往与用来提交作业程序的客户端节点不同。
    • 客户端函数库:客户端函数库不同于Hadoop系统的标准函数库,这个函数库是在客户端运行期间使用的。客户端需要使用的Hadoop系统标准库已经安装,并且使用通过Hadoop的Clienet命令(这与客户端程序不同)配置到CLASSPATH中。我们在文件夹$HADOOP_HOME/bin中可以找到它,其名称为hadoop。就像Java命令用来执行一个Java程序,hadoop命令用来执行客户端程序,该程序会启动一个Hadoop作业,这些函数库都被配置到了环境变量HADOOP_CLASSPATH中,这个变量是一个冒号分隔的函数列表。
    • 远程函数库:这个函数库是用户自定义Mapper类和Reducer类所需要的,这个远程函数库不包括Hadoop系统自带的函数库,因为Hadoop系统自带的函数库已经在每个数据节点都配置好了。举个例子,如果Mapper类需要用到一个特殊的XML解析器,包含这个解析器的函数库就必须要传输到执行这个Mapper类实例的远程数据节点。
    • Java程序档案文件:Java程序以JAR文件德尔形式打包,这个JAR文件中包含了客户端Java类,以及用户自定义Mapper和Reducer类,还包括客户端Java类,Mapper类和Reducer类用到的其他自定义依赖类。

    旧API编写wordcount

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    
    public class wordcount{
        public static class MyMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
            public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable>output,Reporter reporter) throws IOException{
                output.collect(new Text(value.toString()),new IntWritable(1));
            }
        }
        public static class MyReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{
            public void reduce(Text key,Iterator<IntWritable>values,OutputCollector<Text,IntWritable>output,Reporter reporter) throws IOException{
                int sum=0;
                while(values.hasNext())
                    sum+=values.next().get();
            output.collect(key,new IntWritable(sum));
            }
        }
        public static void main(String[] args) throws Exception{
            JobConf conf=new JobConf(wordcount.class);
            conf.setJobName("wordcount");
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);
            conf.setMapperClass(MyMapper.class);
            conf.setCombinerClass(MyReducer.class);
            conf.setReducerClass(MyReducer.class);
            conf.setNumReduceTasks(1);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            FileInputFormat.setInputPaths(conf,new Path(args[0]));
            FileOutputFormat.setOutputPath(conf,new Path(args[1]));
            JobClient.runJob(conf);
        }
    }
    
    • JobConf时Hadoop框架中配置MapReduce任务的主要接口,框架会按照JobConf对象中的配置信息来执行作业

    • TextInputFormat向Hadoop框架声明其输入文件为文本格式,它是InputFormat的子类,现在我们知道TextInputFormat类会读取输入文件的每行作为一个记录

    • TextOutputFormat指定了该MapReduce作业的输出情况,举个例子,他会检测其输出目录是否存在,如果输出目录已经存在,Hadoop系统会拒绝该作业的执行。

    • FileInputFormat.setInputPaths(conf,new Path(args[0])向Hadoop框架向Hadoop框架声明了尧都区的文件的所在的目录,该输入目录可以包含一个或者多个文件,并且每个文件中每行含有一个单词,注意的是setInputPaths中使用的是复数,这个方法可以给Hadoop程序配置一个文件目录数组作为其输入数据路劲,这些输入目录的所有文件构成了该作业的输入数据源

    • FileOutputFormat.setOutputPath(conf,new Path(args[1])指定了作业的输出目录,作业的最终结果会输出保存在该目录下

    • conf.setOutputKeyClass和conf.setOutputValueClass指定了输出的键类和值类,他们与Reducer类相匹配,这里似乎有些多余,确实是这样的,键和值类必须要同指定的Reducer类一直,否则在作业执行的时候讲跑出RuntimeException异常

    • Reducer实例的执行数量默认为1,该值是可以改变的,并且常常可以用来提高程序的运行效率,调用JobConf.class的实例中setNumReduceTask(int n)方法就可以配置该参数。

    • 当程序开始时候,Mapper函数会从输入文件夹的输入文件夹中读取数据块,一个文件的字节流会被转换成一个记录(键/值对格式),然后作为Mapper的输入,键是当前字节偏移量,值是文件中读取的一行文件数据。

    • Mapper发送的每行单词和整数1,要注意的是,Hadoop作业程序需要使用与Integer.class对应的系统自带的IntWritable.class类,其原型是Hadoop 系统的底层I/O传输设计。
      紧接着是Hadoop的Shuffle阶段,这个过程在上面的列表中并不明显,所有的键在Shuffer/Sort阶段被排序,然后发送给Reducer。逻辑上Reducer接收到的是一个IntWritable.class的实例的迭代器,实际上Reducer使用的是相同的IntWritable.class类实例。当Reducer迭代去除键对应的一系列值,相同IntWritable.class类实例被复用,从内部运行看,Hadoop系统框架在其迭代中使用values.next()之后,调用了IntWirtable.set方法。

    • 最后当Reducer输出结果的时候,写入到指定的输出目录中,输出文件中的键和值的实例是TAB为默认分隔符,这个分隔符可以通过配置参数来修改
      conf.set("mapreduce.textoutputformat.separator",",")

    新APi编写wordcount

    import java.io.IOException;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    public class wordcountnew
    {
        public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
            public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
                String w=value.toString();
                context.write(new Text(w),new IntWritable(1));
            }
        }
        public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
            public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException,InterruptedException{
                int sum=0;
                for(IntWritable val:values){
                    sum+=val.get();
                }
                context.write(key,new IntWritable(sum));
            }
        }
        public static void main(String[] args) throws Exception{
            Job job=Job.getInstance(new Configuration());
            job.setJarByClass(wordcountnew.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            boolean status=job.waitForCompletion(true);
            if(status){
                System.exit(0);
            }else{
                System.exit(1);
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:第三章 初识Hadoop框架

        本文链接:https://www.haomeiwen.com/subject/nryqsttx.html