Implementing a "Single-Table Join" in MapReduce

Author: VVictoriaLee | Published 2017-08-16 14:03

    A "single-table join" finds the relationships of interest inside a single dataset; it mines information that is only implicit in the raw records. Here, child–parent pairs are joined against themselves to produce grandchild–grandparent pairs.

    Sample input (child–parent pairs). Note: the two fields are separated by a [tab] character, not spaces.

    child   parent
    Tom     Lucy
    Tom     Jack
    Jone    Lucy
    Jone    Jack
    Lucy    Mary
    Lucy    Ben
    Jack    Alice
    Jack    Jesse
    Terry   Alice
    Terry   Jesse
    Philip  Terry
    Philip  Alma
    Mark    Terry
    Mark    Alma
    
    
    

    Expected output (grandchild–grandparent pairs):

    grandch grandpa
    Jone    Alice
    Jone    Jesse
    Tom     Alice
    Tom     Jesse
    Jone    Mary
    Jone    Ben
    Tom     Mary
    Tom     Ben
    Mark    Alice
    Mark    Jesse
    Philip  Alice
    Philip  Jesse
    
    

    The complete code of MyGL.java:

    package mr;
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
    
    public class MyGL {
        
        
        
        static class MyGLMapper extends Mapper<LongWritable, Text, Text, Text> {

            @Override
            public void map(LongWritable k1, Text v1, Context context)
                    throws IOException, InterruptedException {
                String[] fields = v1.toString().split("\t");
                // Skip malformed lines and the header row.
                if (fields.length != 2 || fields[0].equals("child")) return;
                String child = fields[0];
                String parent = fields[1];

                // Emit each record twice: once keyed by the child with tag "1",
                // and once keyed by the parent with tag "2".
                context.write(new Text(child), new Text("1," + child + "," + parent));
                context.write(new Text(parent), new Text("2," + child + "," + parent));
            }

        }
        
        static class MyGLReduce extends Reducer<Text, Text, Text, Text> {

            @Override
            protected void setup(Context context)
                    throws IOException, InterruptedException {
                // Write the header line once per reducer.
                context.write(new Text("grandch"), new Text("grandpa"));
            }

            @Override
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                List<String> grandch = new ArrayList<>();
                List<String> grandpa = new ArrayList<>();

                for (Text value : values) {
                    String[] words = value.toString().split(",");
                    if (words[0].equals("1")) {
                        // Tag "1": the key is the child, so words[2] is the key's
                        // parent, i.e. a grandparent of the key's own children.
                        grandpa.add(words[2]);
                    } else if (words[0].equals("2")) {
                        // Tag "2": the key is the parent, so words[1] is the key's
                        // child, i.e. a grandchild of the key's own parents.
                        grandch.add(words[1]);
                    }
                }

                // Cross join: pair every grandchild with every grandparent.
                for (String ch : grandch) {
                    for (String pa : grandpa) {
                        context.write(new Text(ch), new Text(pa));
                    }
                }
            }

        }
    
        private static String INPUT_PATH="hdfs://master:9000/input/gl.dat";
        private static String OUTPUT_PATH="hdfs://master:9000/output/MyGLResult/";
    
        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH), conf);

            // Remove any previous output directory so the job can run again.
            if (fs.exists(new Path(OUTPUT_PATH)))
                fs.delete(new Path(OUTPUT_PATH), true);

            Job job = Job.getInstance(conf, "myjob");

            job.setJarByClass(MyGL.class);
            job.setMapperClass(MyGLMapper.class);
            job.setReducerClass(MyGLReduce.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
    }
    

    → The map phase splits the input file into two logical tables (only part of the data is discussed here; read this together with the code):

    Table 1: the tag "1" means the key is the child of value[2] (the parent field).

    Table 2: the tag "2" means the key is the father/mother of value[1] (the child field).
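The tagging the map step performs can be sketched outside Hadoop as a plain Java method (a minimal sketch; the class and method names `TagSketch`/`tag` are mine, not part of the original job):

```java
import java.util.Arrays;
import java.util.List;

public class TagSketch {
    // Mirror of the mapper: one "child\tparent" line becomes two tagged
    // records, one keyed by the child (tag 1), one by the parent (tag 2).
    static List<String[]> tag(String line) {
        String[] f = line.split("\t");
        return Arrays.asList(
            new String[] {f[0], "1," + f[0] + "," + f[1]},  // table 1: key is the child
            new String[] {f[1], "2," + f[0] + "," + f[1]}); // table 2: key is the parent
    }

    public static void main(String[] args) {
        for (String[] kv : tag("Tom\tLucy"))
            System.out.println(kv[0] + " -> " + kv[1]);
        // Tom -> 1,Tom,Lucy
        // Lucy -> 2,Tom,Lucy
    }
}
```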


    → Shuffle and partition (all records with the same key end up in the same group, feeding a single reduce call).
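The grouping that shuffle performs can be simulated with a map from key to value list (a sketch of the grouping behavior only, not of Hadoop's actual shuffle implementation; the name `ShuffleSketch` is mine):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    // Group tagged (key, value) pairs by key, as the shuffle phase does.
    static Map<String, List<String>> shuffle(List<String[]> pairs) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : pairs)
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> pairs = Arrays.asList(
            new String[] {"Tom", "1,Tom,Lucy"},
            new String[] {"Lucy", "2,Tom,Lucy"},
            new String[] {"Lucy", "1,Lucy,Mary"},
            new String[] {"Lucy", "1,Lucy,Ben"});
        // All values keyed by "Lucy" land in one group.
        System.out.println(shuffle(pairs).get("Lucy"));
    }
}
```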

    → The reduce phase uses two lists to separate the grandparents from the grandchildren:

    if (words[0].equals("1")) {
        grandpa.add(words[2]);
    } else if (words[0].equals("2")) {
        grandch.add(words[1]);
    }
    

    For example, in partition 2 (the records grouped under key Lucy):
    words[0] = 1 yields grandpa = (Mary, Ben)
    words[0] = 2 yields grandch = (Tom)

    for (String ch : grandch) {
        for (String pa : grandpa) {
            context.write(new Text(ch), new Text(pa));
        }
    }
    

    The two nested loops pair each grandchild with each grandparent exactly once:
    (Tom, Mary)
    (Tom, Ben)
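The extraction plus cross join for one key's grouped values can be sketched as a standalone method (a minimal sketch; the names `ReduceSketch`/`joinOneKey` are mine, and the input list reproduces the partition-2 example above):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    // For one key's grouped values, collect grandparents (tag "1") and
    // grandchildren (tag "2"), then emit their cross product.
    static List<String[]> joinOneKey(List<String> values) {
        List<String> grandch = new ArrayList<>();
        List<String> grandpa = new ArrayList<>();
        for (String v : values) {
            String[] words = v.split(",");
            if (words[0].equals("1")) grandpa.add(words[2]);
            else if (words[0].equals("2")) grandch.add(words[1]);
        }
        List<String[]> out = new ArrayList<>();
        for (String ch : grandch)
            for (String pa : grandpa)
                out.add(new String[] {ch, pa});
        return out;
    }

    public static void main(String[] args) {
        // The values grouped under key Lucy in the partition-2 example.
        List<String> lucy = Arrays.asList("2,Tom,Lucy", "1,Lucy,Mary", "1,Lucy,Ben");
        for (String[] p : joinOneKey(lucy))
            System.out.println("(" + p[0] + "," + p[1] + ")");
        // (Tom,Mary)
        // (Tom,Ben)
    }
}
```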

    Since I am not yet very familiar with the Hadoop source code, I cannot explain every detail; suggestions and corrections are welcome.


    Original link: https://www.haomeiwen.com/subject/blzbrxtx.html