美文网首页
(一)单表关联--mapreduce关联性操作

(一)单表关联--mapreduce关联性操作

作者: elrah | 来源:发表于2017-08-16 13:42 被阅读0次
    package mr;
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
    
    public class MyGL {
        private static class MyGLMapper  extends  Mapper<LongWritable, Text, Text, Text>{  
            /*输入类型是LongWritable,Text(上下文);
                                   输出类型是Text,Text(也就是Reduce的输入类型)*/
             public void map(LongWritable k1, Text v1, Context context)   
                                 throws java.io.IOException, java.lang.InterruptedException
                                     //map()函数是固定模式的,三个参数
             {
                 // 1 2
                String[]  lines= v1.toString().split("\t");  
                                                  // \t 在同一个缓冲区内横向跳8个空格(Tab键);split()方法用于把一个字符串分割
                                                  //成字符串数组;v1指的是一行,把第一行的两个单词存进lines
                if(lines.length!=2 || lines[0].trim().equals("child"))      
                    return;    //child parent
            
                String word1=lines[0].trim();   //tom       ->去掉 lines[0]里面的空格符
                String word2=lines[1].trim();   //lucy  
                
                context.write(new Text(word1), new Text("1"+","+word1+","+word2));
                                              //第一个Text对应Mapper的第三个Text;第二个Text对应Mapper的第四个Text
                context.write(new Text(word2), new Text("2"+","+word1+","+word2));
                
                //tom,1+tom+lucy
                 System.out.println("map......"+word1+"-"+word2);
             }
            
        }
        
        private static class  MyGLReduce extends Reducer<Text, Text, Text, Text>{
             public void reduce(Text key, Iterable<Text> values, Context context) 
                                           throws java.io.IOException, java.lang.InterruptedException
                                      //context:上下文对象,在整个wordcount运算生命周期内存活
             {
                  List<String> grandch=new ArrayList();   //泛型
                  List<String> grandpa=new ArrayList();
                 
                                            /*  lucy 2+tom+lucy
                 lucy 1+lucy+mary
                 
                 2->split[1]    tom     2的话取1
                 1->split[2]    mary    1的话取2
                 
                 k3=tom  v3=mary   把这两个放在上下文
                 */
    
                 Iterator<Text>  it=values.iterator();  
                                                    // Iterator<Text>--输进来的第二个值
                 while(it.hasNext()){ 
                    String lines= it.next().toString();      
                                                                             //2,tom,lucy(对应MyGLMapper的context.write())
                    String [] words=lines.split(",");    
                                                                              //劈开 string 数组 ["2","tom","lucy"]
                    if(words[0].equals("1")){
                        grandpa.add(words[2]);
                    }else if(words[0].equals("2")){
                        grandch.add(words[1]);
                    }
                    else
                        return;
                    
                 }
                  for(String ch:grandch)
                 for(String pa:grandpa)  
                 context.write(new Text(ch), new Text(pa));    
                 
                 System.out.println("reduce......");
             }
             protected void cleanup(Context context) 
                                          throws java.io.IOException, java.lang.InterruptedException{
                 
             }
                
        }
    
        private static String INPUT_PATH="hdfs://master:9000/input/gl.dat";
        private static String OUTPUT_PATH="hdfs://master:9000/output/c/";
    
        public static void main(String[] args) throws Exception {   
            
            Configuration  conf=new Configuration();
            FileSystem  fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
         
            if(fs.exists(new Path(OUTPUT_PATH)))
                    fs.delete(new Path(OUTPUT_PATH));
            
            Job  job=new Job(conf,"myjob");
            
            job.setJarByClass(MyGL.class);
            job.setMapperClass(MyGLMapper.class);
            job.setReducerClass(MyGLReduce.class);
            
             
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //job.setCombinerClass(MyReduce.class);
             
            
            FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
            
            job.waitForCompletion(true);
    
        }
    
    }
    
    image.png

    当line[0]=1,line[1]=child;
    当line[0]=2,line[2]=grandpa;
    测试数据:

    child   parent 
    Tom Lucy
    Tom Jack
    Jone    Lucy
    Jone    Jack
    Lucy    Mary
    Lucy    Ben
    Jack    Alice
    Jack    Jesse
    Terry   Alice
    Terry   Jesse
    Philip  Terry
    Philip  Alma
    Mark    Terry
    Mark    Alma
    

    相关文章

      网友评论

          本文标题:(一)单表关联--mapreduce关联性操作

          本文链接:https://www.haomeiwen.com/subject/ifcirxtx.html