概念:
倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址。由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(inverted index)。带有倒排索引的文件我们称为倒排索引文件
倒排列表用来记录有哪些文档包含了某个单词。一般在文档集合里会有很多文档包含某个单词,每个文档会记录文档编号(DocID),单词在这个文档中出现的次数(TF)及单词在文档中哪些位置出现过等信息,这样与一个文档相关的信息被称做倒排索引项(Posting),包含这个单词的一系列倒排索引项形成了列表结构,这就是某个单词对应的倒排列表。右图是倒排列表的示意图,在文档集合中出现过的所有单词及其对应的倒排列表组成了倒排索引。
代码实现:
第一步:TextMapper,将每个文档中出现的每个单词和出现的文档名称列举出来,传给Combiner,其中k,v值为:
<word:filename,1>
package com.neusoft.text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class TextMapper extends Mapper<LongWritable,Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
String[] split = value.toString().split("\\s+");
for (String s:split) {
context.write(new Text(s+":"+fileName),new Text("1"));
}
}
}
第二步:使用Combiner尽心各一次汇总
传递到Combiner的值为:<word:fileName,List(1,1,1,1)>
将Mapper统计后的值进行一次汇总,汇总后的k,v值为:
<word:fileName,4>
将相同单词相同文件的单词汇总,并且将k,v重新组合,新的k,v值为:
<word,filename:4>
package com.neusoft.text;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class TextCombiner extends Reducer<Text,Text,Text,Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int sum=0;
for (Text str:values) {
sum+=Integer.parseInt(str.toString());
}
String[] keys=key.toString().split(":");
context.write(new Text(keys[0]),new Text(keys[1]+":"+sum));
}
}
第三步:排序,并且将结果写出
对从Combiner发过来的数据进行汇总,倒序排序,然后再写出
Combiner发过来的数据类型为:
<word,List[filename1:2,filename2:1,filename3:3]>
结果数据类型为:<word,filename3:3,filename1:2,filename2:1>
排序实现方法:
这里将每一个k所对应的值的防盗一个对象中,然后将这些对象放到一个集合中,然后按这个对象的某一个属性排序
代码实现:
方法一:
工具类:
package com.neusoft.text;
public class Tool implements Comparable<Tool>{
private String fileName;
private Integer count;
public Tool(){}
public Tool(String fileName,Integer count){
this.fileName=fileName;
this.count=count;
}
@Override
public String toString() {
return this.fileName+":"+count;
}
@Override
public int compareTo(Tool o) {
return (int)(o.count-this.count);
}
}
Reducer:
package com.neusoft.text;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;
public class TextReducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
List<Tool> list=new ArrayList<>();
for (Text s:values) {
String[] strings=s.toString().split(":");
Tool tool=new Tool(strings[0],Integer.parseInt(strings[1]));
list.add(tool);
}
Collections.sort(list);
StringBuffer str=new StringBuffer();
for (Tool t:list) {
str.append(t+",");
}
str.deleteCharAt(str.length()-1);
context.write(new Text(key),new Text(str.toString()));
}
}
方法二:
不使用工具类,直接使用map自定义排序
package com.neusoft.text;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;
public class TextReducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
TreeMap<Integer,String> map=new TreeMap<Integer, String>(
new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
// return不为0,key就不会覆盖
return o2-o1>=0?1:-1;
}
}
);
for (Text text:values) {
String[] s=text.toString().split(":");
map.put(Integer.parseInt(s[1]),s[0]);
}
StringBuffer str=new StringBuffer();
for (Map.Entry entry:map.entrySet()) {
str.append(entry.getKey()+":"+entry.getValue()+",");
}
str.deleteCharAt(str.length()-1);
context.write(key,new Text(str.toString()));
}
}
第四步:Driver程序
package com.neusoft.text;
import com.neusoft.FileUtil;
import com.neusoft.sort.FlowBean;
import com.neusoft.sort.SortFlowDriver;
import com.neusoft.sort.SortFlowMapper;
import com.neusoft.sort.SortFlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TextDriver {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root") ;
System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
if (args == null || args.length == 0) {
return;
}
// 确认输出文件路径是否存在,如果存在就删除
FileUtil.deleteDir(args[1]);
// 该对象会默认读取环境中的 hadoop 配置。当然,也可以通过 set 重新进行配置
Configuration configuration = new Configuration();
// job 是 yarn 中任务的抽象
Job job = Job.getInstance(configuration);
// 指定本程序的jar包所在的本地路径
job.setJarByClass(TextDriver.class);
// 设置Mapper/Reducer的工作类
job.setMapperClass(TextMapper.class);
job.setReducerClass(TextReducer.class);
// 设置Mapper的输出文件类型,要与Mapper的泛型对应
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置Reducer的输出文件类型,要与Reducer的泛型对应
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 使用Combiner
job.setCombinerClass(TextCombiner.class);
// 获取输入输出文件
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
// true表示一直等待map和reduce任务执行完成
boolean bResult = job.waitForCompletion(true);
System.exit(bResult ? 0 : 1);
}
}
第五步:删除文件或者文件夹的工具类:
package com.neusoft;
import java.io.File;
/**
* Created by bee on 3/25/17.
*/
public class FileUtil {
public static boolean deleteDir(String path) {
File dir = new File(path);
if (dir.exists()) {
for (File f : dir.listFiles()) {
if (f.isDirectory()) {
deleteDir(f.getAbsolutePath());
} else {
f.delete();
}
}
dir.delete();
return true;
} else {
System.out.println("文件(夹)不存在!");
return false;
}
}
}
第六步:最终输出结果
![](https://img.haomeiwen.com/i15063652/aff1b731489e509c.png)
网友评论