美文网首页大数据Hadoop玩转大数据
一个利用mapreduce思想单词计数的实例

一个利用mapreduce思想单词计数的实例

作者: DayDayUpppppp | 来源:发表于2017-03-16 21:49 被阅读0次

    这里写得是,如果利用mapreduce分布式的计算框架来写一个单词计数的demo。比如说,给出一个文件,然后,输出是统计文件里面所有的单词出现的次数。

    map 函数
    package mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    /*
     * KEYIN 读到的一行数据的偏移量 lONG
     * KEYOUT 读到的一行数据的内容 STRING
     * keyout 输出一个单词,是一个string的类型
     * valueout int值
     * hadoop 有自己的一套序列化的机制,他的序列化比jdk的序列化更加精简,
     * 可以提高网络的传输效率
     * long --》 longwritable
     * string --》 text
     * integer --》 intwritable
     * null  --> nullwritable
     */
    public class worldcount extends Mapper<LongWritable,Text,Text,IntWritable>{
        //重载Mapper类的map方法
        //key 其实对应   keyin
        // value 就是  keyout
        protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            // 将拿到的这行数据按照空格切分
            String line=value.toString();
            String [] linewords = line.split(" ");
            for(String word:linewords){
                // 所以在context里面写的内容就是 key:word,value 是1
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
    
    
    reduce 函数
    package mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class worldcountreduce extends  Reducer <Text,IntWritable,Text,IntWritable> {
        // 一组相同的key,调用一次reduce
        //相当于调用一次 ,计算一个key对应的个数
        protected void reduce (Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
            
            //统计单词数
            int count=0;
            for(IntWritable value :values){
                count=count+value.get();
            }
            
            //将输出的结果放到context 里面
            context.write(key,new IntWritable(count));
        }
        
    }
    
    
    jobclient 函数
    package mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    /*
     *job 提交器 是yarn集群的一个客户端,他负责将mr程序需要的信息全部封装成一个配置文件里面
     *然后连同我们mr程序所在的一个jar包,一起提交给yarn,有yarn去启动mr程序中的mrappmaster 
     */
    public class jobclient {
        public static void main(String []args) throws IOException, ReflectiveOperationException, InterruptedException{
            Configuration conf=new Configuration();
            //conf.set("yarn.resoucemanager.hostname", value);  
            Job job=Job.getInstance(conf);
            //job.setJar("~/code/WordCount.jar");
            //告知客户端的提交器 mr程序所在的jar包
            //这样就不必使用setjar 这样的方法了
            job.setJarByClass(jobclient.class);
            // 告知mrapp master ,map 和reduce 对应的实现类
            job.setMapperClass(worldcount.class);
            job.setReducerClass(worldcountreduce.class);
            //告知输入,和输出的数据结构的类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            //告知mrappmaster 我们启动的reduce tash的数量
            //启动maptask 的数量 是yarn 会自动的计算
            job.setNumReduceTasks(3);
            
            //指定一个目录而不是文件
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost/wordcount/"));
            FileOutputFormat.setOutputPath(job,new Path("hdfs://localhost/wordcount/output/"));
            // job.submit()
            //这个要比job.submit 要好,因为这个client并不会在提交任务之后,就退出,而是创建一个线程去监控 map和reduce的运行
            boolean res=job.waitForCompletion(true);
            // 执行成功 状态吗 是0,执行失败 状态码是100
            // 通过echo $? 显示状态码
            System.out.println("wakakka ");
            System.exit(res?0:100);
        }
    }
    
    
    小结:

    其实代码的业务逻辑很简单,并不是很复杂。但是刚刚开始学的时候,不会熟悉,如何在hadoop这样的环境下去运行代码,走了很多弯路。所以,我决定,写成一个每一步的截图的教程。

    一个简明的Mapreduce 原理分析:
    http://www.jianshu.com/p/6b6a42a0740c

    Hadoop入门—基本原理简介:
    http://www.jianshu.com/p/a8b08350960f


    1.创建一个java项目

    然后导入需要的jar包(都是关于hadoop的包),在dfs里面是使用了maven,可以很方便的就需要的包导入进来,但是也可以收到的去导入一下。

    hadoop所有的jar包都在hadoop/share/目录下面
    hdfs是关于文件系统的包,mapreduce是关于mpreduce计算框架的包


    image.png

    进入common目录下面,还有之目录lib下面的包


    image.png

    子目录lib下面的包


    image.png

    反正,如果为了不出上面错误,可以都导入进来。


    image.png
    1. step2 然后就可以完成写代码的任务了


      image.png
      image.png
      image.png
    image.png

    step3 :然后将代码打包成一个jar 包


    image.png image.png

    执行jar包:(input文件已经上传到 dfs上面了)


    image.png
    image.png
    image.png
    image.png
    image.png
    查看输出的文件夹,查看输出结果,因为指定了reduce task ,所以可以看到结果是这样的,有三个输出文件:

    mapreduce 的运行流程:
    a. runjar 启动客户端
    b. 启动mrappmaster (mr的 主控节点)
    c. yarnchild 启动,maptask 节点
    d. yarnchild 启动,reduce节点
    e. 关闭yarnchild
    f. 关闭runjar

    执行程序的时候:
    maptask的节点是程序自己顶的,无法设置。maptask的个数,取决于文件的个数和文件的大小。reducetask的节点是可以自己设定的。

    相关文章

      网友评论

        本文标题:一个利用mapreduce思想单词计数的实例

        本文链接:https://www.haomeiwen.com/subject/duhsnttx.html