The official Hadoop documentation includes a corresponding example.
1. In the local program directory, create a file named test containing the words to be counted.
2. Create the input/wc directory in HDFS, then upload the test file into that directory (the shell equivalents are hadoop fs -mkdir and hadoop fs -put).
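The upload can also be done with the HDFS Java API. The following is a minimal, illustrative sketch: the class name UploadToHdfs is hypothetical, and it assumes fs.default.name is already configured in core-site.xml so that relative paths resolve against HDFS.
package com.topwqp.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class UploadToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Connects to the filesystem named in core-site.xml (HDFS in pseudo-distributed mode)
        FileSystem fs = FileSystem.get(conf);
        fs.mkdirs(new Path("input/wc")); // create the target directory
        fs.copyFromLocalFile(new Path("test"), new Path("input/wc")); // upload the local test file
        fs.close();
    }
}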
3. Writing the Mapper class
package com.topwqp.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * LongWritable corresponds to Long, and Text corresponds to String.
 * Of the four type parameters, the first two are the input types: for word
 * count, the byte offset of the line within the file (LongWritable) and the
 * line itself (Text). The last two are the output types: the word (Text)
 * and its count (IntWritable).
 * @author wqp
 */
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * The map phase: word splitting.
     * map() is called once for each line of the split: key is the byte
     * offset of that line within the file, value is the line's contents,
     * and context is used to emit output.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();                 // one line of input
        StringTokenizer st = new StringTokenizer(line); // split the line on whitespace
        while (st.hasMoreTokens()) {
            String word = st.nextToken();               // each individual word
            context.write(new Text(word), new IntWritable(1)); // emit (word, 1)
        }
    }
}
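To see what one map() call emits, the tokenization can be run as a standalone sketch (plain Java, no Hadoop needed; the input line is made up for illustration):
public class MapSketch {
    public static void main(String[] args) {
        String line = "hello world hello hadoop"; // a sample input line
        java.util.StringTokenizer st = new java.util.StringTokenizer(line);
        while (st.hasMoreTokens()) {
            // the real mapper wraps these in Text/IntWritable before calling context.write
            System.out.println("(" + st.nextToken() + ", 1)");
        }
        // prints: (hello, 1) (world, 1) (hello, 1) (hadoop, 1), one pair per line
    }
}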
4. Writing the Reducer class
package com.topwqp.mr;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * The reducer's input types are the mapper's output types.
 * @author wqp
 */
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> iterable, Context context)
            throws IOException, InterruptedException {
        // Each key arrives with all of its values grouped together, e.g.
        // WORD -> [1, 1, 1, 1]; values greater than 1 appear only if a
        // combiner pre-aggregated the map output. So we simply sum them.
        int sum = 0;
        for (IntWritable i : iterable) {
            sum = sum + i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
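As with the mapper, the aggregation can be checked as a standalone sketch (the grouped values are made up; in a real run the framework supplies them):
public class ReduceSketch {
    public static void main(String[] args) {
        int[] values = {1, 1, 1, 1}; // values grouped under one key, e.g. "hello"
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        System.out.println("hello\t" + sum); // the reducer would emit: hello	4
    }
}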
5. Writing the Job class
package com.topwqp.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * The driver: configures the job and submits it to the JobTracker.
 * @author wqp
 */
public class JobRun {
    public static void main(String[] args) {
        // The key/value set here must match the settings in conf/mapred-site.xml
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        try {
            Job job = new Job(conf);
            // The class containing the job's entry point; used to locate the jar
            job.setJarByClass(JobRun.class);
            // Mapper class
            job.setMapperClass(WcMapper.class);
            // Reducer class
            job.setReducerClass(WcReducer.class);
            // Map output key/value types (here they also match the final output types)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(1); // number of reduce tasks; the default is 1
            // Directory or file holding the job's input data
            FileInputFormat.addInputPath(job, new Path("/Users/wangqiupeng/Documents/xplan/bigdata/data/hadoop-1.2.1/input/wc/"));
            // Output directory: missing parent directories are created automatically,
            // but the directory itself must not exist before the job starts
            FileOutputFormat.setOutputPath(job, new Path("/Users/wangqiupeng/Documents/xplan/bigdata/data/hadoop-1.2.1/output/wc/"));
            // Wait for the job to finish and exit with its status
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
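One optional refinement, not part of the original driver (note Combine input records=0 in the log below): because WcReducer's summation is associative and commutative, and its input and output types match, it can also be registered as a combiner, so partial sums are computed on the map side and the shuffle shrinks:
// Add next to job.setReducerClass(...) in the try block above (optional, assumed variant)
job.setCombinerClass(WcReducer.class);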
Packaging and running:
Package the application and upload it to the machine where it will run. Since my machine is pseudo-distributed, I run it locally. Note that on a real distributed cluster, the packaged jar should be placed on the machine named by the mapred.job.tracker parameter in mapred-site.xml.
After packaging, execute in the hadoop/bin directory: ss:bin wqp$ ./hadoop jar /Users/wangqiupeng/Downloads/wc.jar com.topwqp.mr.JobRun
With the input file from step 1, the job runs successfully and the per-word counts are written to the output directory (a sketch for reading them back follows the log below). The jar itself is exported from the IDE before running. Console output of the run:
ss:bin wqp$ ./hadoop jar /Users/wangqiupeng/Downloads/wc.jar com.topwqp.mr.JobRun
16/08/20 00:37:01 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
16/08/20 00:37:01 INFO input.FileInputFormat: Total input paths to process : 1
16/08/20 00:37:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/20 00:37:01 WARN snappy.LoadSnappy: Snappy native library not loaded
16/08/20 00:37:01 INFO mapred.JobClient: Running job: job_201608200016_0006
16/08/20 00:37:02 INFO mapred.JobClient: map 0% reduce 0%
16/08/20 00:37:06 INFO mapred.JobClient: map 100% reduce 0%
16/08/20 00:37:13 INFO mapred.JobClient: map 100% reduce 33%
16/08/20 00:37:14 INFO mapred.JobClient: map 100% reduce 100%
16/08/20 00:37:14 INFO mapred.JobClient: Job complete: job_201608200016_0006
16/08/20 00:37:14 INFO mapred.JobClient: Counters: 26
16/08/20 00:37:14 INFO mapred.JobClient: Job Counters
16/08/20 00:37:14 INFO mapred.JobClient: Launched reduce tasks=1
16/08/20 00:37:14 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=3148
16/08/20 00:37:14 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
16/08/20 00:37:14 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
16/08/20 00:37:14 INFO mapred.JobClient: Launched map tasks=1
16/08/20 00:37:14 INFO mapred.JobClient: Data-local map tasks=1
16/08/20 00:37:14 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=8142
16/08/20 00:37:14 INFO mapred.JobClient: File Output Format Counters
16/08/20 00:37:14 INFO mapred.JobClient: Bytes Written=49
16/08/20 00:37:14 INFO mapred.JobClient: FileSystemCounters
16/08/20 00:37:14 INFO mapred.JobClient: FILE_BYTES_READ=128
16/08/20 00:37:14 INFO mapred.JobClient: HDFS_BYTES_READ=222
16/08/20 00:37:14 INFO mapred.JobClient: FILE_BYTES_WRITTEN=132952
16/08/20 00:37:14 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=49
16/08/20 00:37:14 INFO mapred.JobClient: File Input Format Counters
16/08/20 00:37:14 INFO mapred.JobClient: Bytes Read=62
16/08/20 00:37:14 INFO mapred.JobClient: Map-Reduce Framework
16/08/20 00:37:14 INFO mapred.JobClient: Reduce input groups=6
16/08/20 00:37:14 INFO mapred.JobClient: Map output materialized bytes=128
16/08/20 00:37:14 INFO mapred.JobClient: Combine output records=0
16/08/20 00:37:14 INFO mapred.JobClient: Map input records=3
16/08/20 00:37:14 INFO mapred.JobClient: Reduce shuffle bytes=128
16/08/20 00:37:14 INFO mapred.JobClient: Reduce output records=6
16/08/20 00:37:14 INFO mapred.JobClient: Spilled Records=20
16/08/20 00:37:14 INFO mapred.JobClient: Map output bytes=102
16/08/20 00:37:14 INFO mapred.JobClient: Total committed heap usage (bytes)=308281344
16/08/20 00:37:14 INFO mapred.JobClient: Combine input records=0
16/08/20 00:37:14 INFO mapred.JobClient: Map output records=10
16/08/20 00:37:14 INFO mapred.JobClient: SPLIT_RAW_BYTES=160
16/08/20 00:37:14 INFO mapred.JobClient: Reduce input records=10
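To verify the result, read the reducer's output file back from HDFS, either with ./hadoop fs -cat on the output path or with a Java sketch like the following (ReadOutput is a hypothetical helper; part-r-00000 is the default file name written by the single reducer):
package com.topwqp.mr;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class ReadOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path result = new Path("/Users/wangqiupeng/Documents/xplan/bigdata/data/hadoop-1.2.1/output/wc/part-r-00000");
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result)));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line); // each line is: word<TAB>count
        }
        reader.close();
        fs.close();
    }
}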