Eclipse for Hadoop Development (2: WordCount)

Author: 小T呀 | Published 2020-04-05 16:03

I. The Code
1. WCMapper.java

package cn.itcast.hadoop.mr.wordcount;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * By default the framework feeds the mapper: key = byte offset at which the
 * line starts; value = the text of the line.
 * The mapper emits: key = a word; value = a count of 1 for that occurrence.
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // The per-record business logic goes here; the framework has already
        // passed in the data to process via the key/value parameters.
        //super.map(key, value, context);
        String line = value.toString();
        String[] words = StringUtils.split(line, " ");
        for(String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
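
A side note on object churn: allocating a fresh Text and LongWritable for every emitted word works, but a common Hadoop idiom is to reuse Writable instances across map() calls, because context.write() serializes their contents immediately. A minimal sketch of the same mapper with reused objects (the class name WCMapperReuse is mine, not part of the original project):

package cn.itcast.hadoop.mr.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapperReuse extends Mapper<LongWritable, Text, Text, LongWritable> {
    // Reused across map() calls; context.write() copies the current
    // contents out immediately, so mutating them afterwards is safe.
    private final Text outKey = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split("\\s+")) {
            if (!word.isEmpty()) { // skip the empty token a leading space produces
                outKey.set(word);
                context.write(outKey, one);
            }
        }
    }
}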

2. WCReducer.java

package cn.itcast.hadoop.mr.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    // After the mappers finish, the framework caches all k-v pairs, groups
    // them by key, and calls reduce once per group <key, values{}>,
    // e.g. <hello, {1,1,1,1...}>
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //super.reduce(arg0, arg1, arg2);
        long count = 0;
        for(LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count)); // key is already a Text; no copy needed
    }
}
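
Because summing long counts is associative and commutative, this same reducer class can also serve as a combiner, pre-aggregating each map task's output before the shuffle. Enabling that is one optional extra line in the driver (not present in the original WCRunner):

// Optional: run WCReducer as a map-side combiner to shrink shuffle traffic.
// Safe here because addition is associative and commutative.
myjob.setCombinerClass(WCReducer.class);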

3. WCRunner.java

package cn.itcast.hadoop.mr.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Describes one specific job:
 * which classes provide the map and reduce logic,
 * which path the input data comes from, and where the results go.
 */
public class WCRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // The Job constructor is deprecated; use the static factory method instead
        //Job myjob = new Job();
        Configuration conf = new Configuration();
        Job myjob = Job.getInstance(conf);
        
        // Set which jar contains this job's classes (needed at run time)
        myjob.setJarByClass(WCRunner.class);
        
        // The mapper and reducer classes this job uses
        myjob.setMapperClass(WCMapper.class);
        myjob.setReducerClass(WCReducer.class);
        
        // Map output key/value types
        myjob.setMapOutputKeyClass(Text.class);  
        myjob.setMapOutputValueClass(LongWritable.class);
        // Reduce output key/value types. (setOutputKeyClass/setOutputValueClass
        // actually apply to both map and reduce output; setting the map types
        // separately, as above, is only needed when the two differ.)
        myjob.setOutputKeyClass(Text.class);  
        myjob.setOutputValueClass(LongWritable.class);
        
        // Input path
        //FileInputFormat.setInputPaths(myjob, new Path("/myJavaAPItest/WordCount/SrcData"));//cluster
        FileInputFormat.setInputPaths(myjob, new Path("C:\\Users\\13533\\Documents\\Java\\hadoop-local-test\\myJavaAPItest\\WordCount\\SrcData"));//local mode
        // Output path
        //FileOutputFormat.setOutputPath(myjob, new Path("/myJavaAPItest/WordCount/Output"));//cluster
        FileOutputFormat.setOutputPath(myjob, new Path("C:\\Users\\13533\\Documents\\Java\\hadoop-local-test\\myJavaAPItest\\WordCount\\Output"));//local mode
        
        // Submit the job and wait for completion (true = report progress)
        myjob.waitForCompletion(true);
    }
}
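
One pitfall when re-running the job: FileOutputFormat fails fast if the output path already exists, so a second run aborts until the directory is removed. A sketch of a guard you could add to main() before submitting, using the standard org.apache.hadoop.fs.FileSystem API (the path is the local one from above):

// Requires: import org.apache.hadoop.fs.FileSystem;
// Delete a leftover output directory so the job does not abort with
// "Output directory ... already exists".
Path outPath = new Path("C:\\Users\\13533\\Documents\\Java\\hadoop-local-test\\myJavaAPItest\\WordCount\\Output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
    fs.delete(outPath, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(myjob, outPath);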

II. Running
1. On the server (Linux)
Export the project/package containing the WordCount classes as myWordCount.jar.
Run on the server: hadoop jar myWordCount.jar cn.itcast.hadoop.mr.wordcount.WCRunner
2. Locally (Windows)
Run the main method of WCRunner.java directly (see the path-argument sketch below for avoiding hardcoded paths).
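
To switch between the cluster and local mode without editing and re-exporting the jar, the driver can take the paths from the command line instead of hardcoding them; a sketch (the two-argument convention is my choice, not the original's):

// In main(), replace the hardcoded paths with:
//   hadoop jar myWordCount.jar cn.itcast.hadoop.mr.wordcount.WCRunner <input> <output>
FileInputFormat.setInputPaths(myjob, new Path(args[0]));
FileOutputFormat.setOutputPath(myjob, new Path(args[1]));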
