Eclipse for Hadoop Development (Part 2: WordCount)


Author: 小T呀 | Published 2020-04-05 16:03

    I. The code
    1. WCMapper.java

    package cn.itcast.hadoop.mr.wordcount;
    import java.io.IOException;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    /**
     * By default the framework hands the mapper: key = the byte offset at which
     * the current line starts; value = the text of that line.
     * The mapper emits: key = a word (Text); value = a count of 1 (LongWritable)
     * for each occurrence of that word.
     */
    public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // The actual business logic goes here; the data to process has
            // already been handed in by the framework via the key/value parameters
            //super.map(key, value, context);
            String line = value.toString();
            String[] words = StringUtils.split(line, " ");
            for(String word : words) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }
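
    A side note on the tokenizer: the mapper uses StringUtils.split from commons-lang rather than String.split because it collapses runs of separators and never returns empty tokens, so double spaces in a line do not produce empty "words". A minimal standalone sketch to illustrate the difference (SplitDemo is our own illustrative class, not part of the tutorial):

    package cn.itcast.hadoop.mr.wordcount;
    import org.apache.commons.lang.StringUtils;
    public class SplitDemo {
        public static void main(String[] args) {
            String line = "hello  world";                      // note the double space
            String[] a = StringUtils.split(line, " ");         // ["hello", "world"]
            String[] b = line.split(" ");                      // ["hello", "", "world"]
            System.out.println(a.length + " vs " + b.length);  // prints "2 vs 3"
        }
    }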
    

    2. WCReducer.java

    package cn.itcast.hadoop.mr.wordcount;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
        // After the mappers finish, the framework buffers all the k-v pairs,
        // groups them by key, and calls reduce once per group <key,values{}>,
        // e.g. <hello,{1,1,1,1...}>
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            //super.reduce(arg0, arg1, arg2);
            long count = 0;
            for(LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }
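
    For completeness, the reduce logic can be exercised without a cluster. A sketch using Apache MRUnit (a real but now-retired Hadoop test library; assumes the org.apache.mrunit:mrunit dependency plus JUnit, and WCReducerTest is our own illustrative class):

    package cn.itcast.hadoop.mr.wordcount;
    import java.util.Arrays;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
    import org.junit.Test;
    public class WCReducerTest {
        @Test
        public void sumsCountsPerKey() throws Exception {
            // feed one group <hello,{1,1,1}> and expect <hello,3>
            ReduceDriver.newReduceDriver(new WCReducer())
                .withInput(new Text("hello"),
                        Arrays.asList(new LongWritable(1), new LongWritable(1), new LongWritable(1)))
                .withOutput(new Text("hello"), new LongWritable(3))
                .runTest();
        }
    }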
    

    3. WCRunner.java

    package cn.itcast.hadoop.mr.wordcount;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /*
     * Describes one particular job:
     * which classes provide its map and reduce logic,
     * which path the input data comes from, and where the results go.
     */
    public class WCRunner {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // The Job constructor has been deprecated; use the static factory instead
            //Job myjob = new Job();
            Configuration conf = new Configuration();
            Job myjob = Job.getInstance(conf);
            
            // Which jar contains this job's classes (needed at run time)
            myjob.setJarByClass(WCRunner.class);
            
            // Mapper and Reducer classes used by this job
            myjob.setMapperClass(WCMapper.class);
            myjob.setReducerClass(WCReducer.class);
            
            // Key/value types of the map output
            myjob.setMapOutputKeyClass(Text.class);  
            myjob.setMapOutputValueClass(LongWritable.class);
            // Key/value types of the final (reduce) output. These two setters alone
            // would also cover the map output; setting all four, as here, is the
            // form to use when the map and reduce output types differ.
            myjob.setOutputKeyClass(Text.class);  
            myjob.setOutputValueClass(LongWritable.class);
            
            // Where the input data lives
            //FileInputFormat.setInputPaths(myjob, new Path("/myJavaAPItest/WordCount/SrcData"));//on the cluster
            FileInputFormat.setInputPaths(myjob, new Path("C:\\Users\\13533\\Documents\\Java\\hadoop-local-test\\myJavaAPItest\\WordCount\\SrcData"));//local mode
            // Where the output goes (the directory must not already exist)
            //FileOutputFormat.setOutputPath(myjob, new Path("/myJavaAPItest/WordCount/Output"));//on the cluster
            FileOutputFormat.setOutputPath(myjob, new Path("C:\\Users\\13533\\Documents\\Java\\hadoop-local-test\\myJavaAPItest\\WordCount\\Output"));//local mode
            
            // Submit the job to the cluster and wait (true = report progress)
            myjob.waitForCompletion(true);
        }
    }
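
    Two optional tweaks, sketched here as suggestions rather than as part of the original tutorial. First, word-count summation is associative and commutative, so WCReducer can double as a combiner and pre-aggregate counts on the map side, shrinking shuffle traffic. Second, waitForCompletion returns a boolean that can be surfaced as the process exit code. Both lines go at the end of main(), the second replacing the existing waitForCompletion call:

            //a combiner runs the reduce logic locally on each mapper's output
            myjob.setCombinerClass(WCReducer.class);
            //report success/failure through the exit code
            System.exit(myjob.waitForCompletion(true) ? 0 : 1);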
    

    II. Running
    1. On the server (Linux)
    Export the project/package containing the WordCount classes as myWordCount.jar, and switch the input/output paths in WCRunner to the commented-out HDFS versions (see the HDFS commands sketched after this list).
    Run on the server: hadoop jar myWordCount.jar cn.itcast.hadoop.mr.wordcount.WCRunner
    2. Locally (Windows)
    Run the main method of WCRunner.java directly.
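
    A sketch of the HDFS housekeeping around the server run. The paths come from the commented-out lines in WCRunner; words.txt is a placeholder for whatever local input file you have, and part-r-00000 is Hadoop's standard name for the first reducer's output file:

    # create the input directory and upload the source text
    hadoop fs -mkdir -p /myJavaAPItest/WordCount/SrcData
    hadoop fs -put words.txt /myJavaAPItest/WordCount/SrcData
    # the output directory must not exist before the job runs
    hadoop fs -rm -r /myJavaAPItest/WordCount/Output
    # after the job finishes, inspect the result
    hadoop fs -cat /myJavaAPItest/WordCount/Output/part-r-00000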
