Inverted Index with MapReduce

Author: 博弈史密斯 | Published: 2018-06-20 21:53

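    Introduction to the inverted index:

    An inverted index, as used here, records how many times each word occurs in each document. A search engine can then look up a keyword and return the documents in which it occurs most often.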

    Requirement:

    Count how many times each word occurs in a.txt and in b.txt.

    a.txt:

    hello tom
    hello jerry
    hello kitty
    jerry world
    

    b.txt:

    hello jerry
    hello tom
    jerry world
    

    Analysis:

    The output we are aiming for looks like this:

    hello   "a.txt->3  b.txt->2"
    jerry   "a.txt->2  b.txt->2"
    ...
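
The target output can be pictured as a nested map from word to file name to count. Below is a minimal plain-Java sketch, with no Hadoop involved, that builds this structure directly for the two sample files. The names `IndexTarget`, `sample`, and `build` are hypothetical and chosen only for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class IndexTarget {

    // The two sample files from the requirement, as file name -> lines.
    static Map<String, List<String>> sample() {
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("a.txt", Arrays.asList("hello tom", "hello jerry", "hello kitty", "jerry world"));
        files.put("b.txt", Arrays.asList("hello jerry", "hello tom", "jerry world"));
        return files;
    }

    // Builds word -> (file name -> occurrence count).
    static Map<String, Map<String, Integer>> build(Map<String, List<String>> files) {
        Map<String, Map<String, Integer>> index = new TreeMap<>();
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            for (String line : file.getValue()) {
                for (String word : line.trim().split("\\s+")) {
                    if (word.isEmpty()) {
                        continue; // a blank line yields one empty token
                    }
                    index.computeIfAbsent(word, w -> new TreeMap<>())
                         .merge(file.getKey(), 1, Integer::sum);
                }
            }
        }
        return index;
    }

    public static void main(String[] args) {
        // Prints one line per word, followed by its per-file counts.
        build(sample()).forEach((word, counts) -> System.out.println(word + "\t" + counts));
    }
}
```

The MapReduce job described in the rest of the article computes the same counts, but spreads the work over the map, combiner, and reduce phases.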
    

    First we need the name of the file being read. Inside the mapper it can be obtained from the input split:

    FileSplit inputSplit = (FileSplit) context.getInputSplit();
    Path path = inputSplit.getPath();
    String name = path.getName();
    
    Map phase:

    Read each line and, for every word, emit word + "->" + fileName as the output key
    and 1 as the output value, which produces records like these:

    ("hello->a.txt", 1)
    ("hello->a.txt", 1)
    ("hello->a.txt", 1)
    
    ("hello->b.txt", 1)
    ("hello->b.txt", 1)
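
The map step above can be sketched in plain Java, without the Hadoop types, roughly as follows. `MapStepSketch` and its `map` method are hypothetical names for illustration; in the real job the file name comes from the input split rather than from a parameter.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class MapStepSketch {

    // For one input line, emits ("word->fileName", 1) once per word.
    static List<Map.Entry<String, Integer>> map(String fileName, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // nothing to emit for a blank line
            }
            out.add(new SimpleEntry<String, Integer>(word + "->" + fileName, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        // The first line of a.txt yields two records, one per word.
        System.out.println(map("a.txt", "hello tom"));
    }
}
```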
    
    Combiner phase:

    By the time the data reaches the combiner, the values that share a key have been grouped together:

    ("hello->a.txt", {1,1,1})
    ("hello->b.txt", {1,1})
    

    First iterate over the values and compute their sum:

    ("hello->a.txt", 3)
    ("hello->b.txt", 2)
    

    Then split the key with split("->"), take the word as the output key,
    and emit fileName + "->" + sum as the output value:

    ("hello", "a.txt->3")
    ("hello", "b.txt->2")
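
As a rough illustration of the two combiner steps just described (sum the values, then re-key by word), here is a plain-Java sketch that processes one grouped record. `CombineStepSketch` and `combine` are hypothetical names, and the sketch assumes every key contains exactly one "->".

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class CombineStepSketch {

    // Turns ("word->fileName", [1, 1, ...]) into (word, "fileName->sum").
    static Map.Entry<String, String> combine(String key, List<Integer> ones) {
        long sum = 0;
        for (int one : ones) {
            sum += one;
        }
        // fields[0] is the word, fields[1] is the file name
        String[] fields = key.split("->");
        return new SimpleEntry<String, String>(fields[0], fields[1] + "->" + sum);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> out = combine("hello->a.txt", Arrays.asList(1, 1, 1));
        System.out.println(out.getKey() + "\t" + out.getValue());
    }
}
```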
    
    Reduce phase:

    At the reducer, the values that share a key are grouped together again:
    ("hello", {"a.txt->3", "b.txt->2"})

    Iterate over the values and join them into a single space-separated String:
    ("hello", "a.txt->3 b.txt->2")
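
The join performed in the reduce step might look like the plain-Java sketch below. `ReduceStepSketch` and `reduce` are hypothetical names. Hadoop does not guarantee the order of values within a key, so this sketch sorts them first to get a stable result; the sorting is an addition for illustration and not part of the original job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReduceStepSketch {

    // Joins values such as "a.txt->3" and "b.txt->2" into one space-separated string.
    static String reduce(List<String> values) {
        List<String> sorted = new ArrayList<>(values);
        Collections.sort(sorted); // stable output regardless of arrival order
        return String.join(" ", sorted);
    }

    public static void main(String[] args) {
        System.out.println("hello\t" + reduce(Arrays.asList("b.txt->2", "a.txt->3")));
    }
}
```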

    Full code:

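    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class InverseIndex {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            
            Job job = Job.getInstance(conf);
            // Identify the job jar by a class it contains
            job.setJarByClass(InverseIndex.class);
            
            // Mapper settings
            job.setMapperClass(IndexMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // args[0]: input path, e.g. a directory holding a.txt and b.txt
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            
            // Reducer settings
            job.setReducerClass(IndexReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // args[1]: output directory, which must not exist yet
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            // Combiner: sums the counts on the map side
            job.setCombinerClass(IndexCombiner.class);
                    
            // Submit the job, wait for it to finish, and report the result as the exit code
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }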
        
        public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    
            // Reused across calls to avoid allocating new objects per record
            private final Text k = new Text();
            private final Text v = new Text("1");
            
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                    
                // Name of the file this split belongs to, e.g. "a.txt"
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                String name = inputSplit.getPath().getName();
    
                // Split on runs of whitespace; a blank line yields one empty token, which is skipped
                for (String f : value.toString().trim().split("\\s+")) {
                    if (f.isEmpty()) {
                        continue;
                    }
                    // Emit ("word->fileName", "1")
                    k.set(f + "->" + name);
                    context.write(k, v);
                }
            }
        }
        
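        // Caveat: this combiner changes the key. Hadoop may run a combiner zero, one,
        // or several times, so the job only produces the intended output when the
        // combiner runs exactly once for every map output key.
        public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
    
            private final Text k = new Text();
            private final Text v = new Text();
            
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                    
                // key is "word->fileName": fields[0] is the word, fields[1] the file name
                String[] fields = key.toString().split("->");
                // Sum the 1s emitted by the mapper for this word in this file
                long sum = 0;
                for (Text t : values) {
                    sum += Long.parseLong(t.toString());
                }
                // Emit (word, "fileName->sum")
                k.set(fields[0]);
                v.set(fields[1] + "->" + sum);
                context.write(k, v);
            }
        }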
        
        public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    
            private final Text v = new Text();
            
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                    
                // Join all "fileName->count" values for this word with single spaces
                StringBuilder sb = new StringBuilder();
                for (Text t : values) {
                    if (sb.length() > 0) {
                        sb.append(' ');
                    }
                    sb.append(t.toString());
                }
                v.set(sb.toString());
                context.write(key, v);
            }
        }
    }
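
One thing to keep in mind with the job above: Hadoop treats the combiner as an optional optimization and may run it zero, one, or several times, whereas IndexCombiner rewrites the key and therefore relies on running exactly once. A common alternative, which is not the author's method, is to have the mapper emit (word, "fileName->1") and to aggregate with a merge function whose output has the same shape as its input, so that applying it repeatedly is harmless. The plain-Java sketch below shows such a merge under those assumptions; `MergeSketch` and `merge` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MergeSketch {

    // Merges values such as "a.txt->1" by summing the counts per file name.
    // Input and output share the same format, so the function can be applied
    // to its own output without changing the result.
    static List<String> merge(List<String> values) {
        Map<String, Long> perFile = new TreeMap<>();
        for (String value : values) {
            int sep = value.lastIndexOf("->");
            String file = value.substring(0, sep);
            long count = Long.parseLong(value.substring(sep + 2));
            perFile.merge(file, count, Long::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : perFile.entrySet()) {
            out.add(e.getKey() + "->" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> fromMappers =
                Arrays.asList("a.txt->1", "b.txt->1", "a.txt->1", "a.txt->1", "b.txt->1");
        List<String> once = merge(fromMappers);
        // Merging a second time leaves the result unchanged.
        System.out.println(once + " " + merge(once).equals(once));
    }
}
```

In a Hadoop job, the same logic could sit in a single Reducer subclass registered as both combiner and reducer, with the final joining into one string done only on the reduce side.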
    


    Link to this article: https://www.haomeiwen.com/subject/cbhfyftx.html