Imports:
//package com.company;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.*;
import java.util.Random;
import java.util.StringTokenizer;
Remaining code:
public class WordCount {
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    public static class TokenizerMapper2
            extends Mapper<Object, Text, IntWritable, Text> {
        // Emits at most 100 (count, word) pairs, keeping only words longer than 5 characters.
        private int c = 0;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            IntWritable a = new IntWritable(Integer.parseInt(itr.nextToken()));
            Text b = new Text(itr.nextToken());
            if (c < 100 && b.getLength() > 5) {
                context.write(a, b);
                c++;
            }
        }
    }
    // Inverts the default IntWritable ordering so that larger counts sort first.
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    // Copies the local file output/part-r-00000 to the HDFS path given by 'name'
    // and prints the first 10 lines to stdout.
    public static void Write(String name) {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            FileSystem fs = FileSystem.get(conf);
            FSDataOutputStream os = fs.create(new Path(name));
            BufferedReader br = new BufferedReader(new FileReader("output/part-r-00000"));
            String str = null;
            int n = 1;
            while ((str = br.readLine()) != null) {
                str = str + "\n";
                byte[] buff = str.getBytes();
                os.write(buff, 0, buff.length);
                if (n <= 10) {
                    System.out.println(str);
                }
                n++;
            }
            br.close();
            os.close();
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        // Temporary directory that holds the intermediate word-count output.
        Path tempDir = new Path("wordcount-temp1-"
                + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        try {
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Note: the input and output paths are hard-coded here; otherArgs is only validated, not used.
            FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input/book.txt"));
            // Write the word-count results to the temporary directory; the sort job
            // then uses that directory as its input.
            FileOutputFormat.setOutputPath(job, tempDir);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            if (job.waitForCompletion(true)) {
                Job sortJob = Job.getInstance(conf, "sort");
                sortJob.setJarByClass(WordCount.class);
                FileInputFormat.addInputPath(sortJob, tempDir);
                sortJob.setInputFormatClass(SequenceFileInputFormat.class);
                // InverseMapper, provided by the Hadoop library, swaps the key and value
                // of each pair, so the counts become the sort keys.
                sortJob.setMapperClass(InverseMapper.class);
                // Limit the number of reducers to 1 so that a single output file is produced.
                sortJob.setNumReduceTasks(1);
                FileOutputFormat.setOutputPath(sortJob, new Path("hdfs://localhost:9000/output"));
                sortJob.setOutputKeyClass(IntWritable.class);
                sortJob.setOutputValueClass(Text.class);
                // Sort in descending order of count.
                sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);
                if (sortJob.waitForCompletion(true)) {
                    Write("result");
                    System.exit(0);
                } else {
                    System.out.println("sort job failed");
                    System.exit(1);
                }
            }
        } finally {
            FileSystem.get(conf).deleteOnExit(tempDir);
        }
    }
}
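For reference, here is a minimal standalone sketch of the descending-sort trick used in the second job: the comparator simply negates the result of the stock IntWritable.Comparator, so larger counts compare as "smaller" and come out first. The class and method names below are illustrative only and are not part of the listing above.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

// Illustrative only: demonstrates why negating super.compare() reverses the sort order.
public class DecreasingComparatorDemo {
    static class Decreasing extends IntWritable.Comparator {
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }
    }

    public static void main(String[] args) {
        Decreasing cmp = new Decreasing();
        // The default comparator would return a negative value here (3 < 5);
        // the negated version returns a positive value, so 3 sorts after 5
        // and larger counts appear first in the sort job's output.
        System.out.println(cmp.compare(new IntWritable(3), new IntWritable(5)));
    }
}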