MapReduce Single-Table Join

Author: 月巴巴 | Published 2017-08-16 09:40

    Given a child-parent table, the task is to output the corresponding grandchild-grandparent table.
    Input
    Tom Lucy
    Tom Jack
    Jone Lucy
    Jone Jack
    Lucy Mary
    Lucy Ben
    Jack Alice
    Jack Jesse

    Output
    Tom Alice
    Tom Jesse
    Jone Alice
    Jone Jesse
    Tom Mary
    Tom Ben
    Jone Mary
    Jone Ben

    From the single input table, the mapper builds two tables: a child-parent table and a parent-child table. Taking the natural join of the two yields the result. The key to the program is constructing the left and right tables in the map phase, as the trace below shows.
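
    For example, consider the records that mention Lucy. The following trace is only an illustration of the data flow (the actual order of values seen by the reducer is not guaranteed); the tags "1" and "2" correspond to the right- and left-table markers written by the mapper below:

    Map output for records mentioning Lucy:
    "Tom Lucy"  -> (Lucy, "1 Tom")   and (Tom,  "2 Lucy")
    "Jone Lucy" -> (Lucy, "1 Jone")  and (Jone, "2 Lucy")
    "Lucy Mary" -> (Mary, "1 Lucy")  and (Lucy, "2 Mary")
    "Lucy Ben"  -> (Ben,  "1 Lucy")  and (Lucy, "2 Ben")

    Reduce input for key Lucy: ["1 Tom", "1 Jone", "2 Mary", "2 Ben"]
    grandchild list  = [Tom, Jone]   (tag "1")
    grandparent list = [Mary, Ben]   (tag "2")
    Cartesian product -> Tom Mary, Tom Ben, Jone Mary, Jone Ben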

    Code

    package com.hy.hadoop;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.log4j.BasicConfigurator;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;
    
    public class SingleJoin {
    
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, Text> {
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                String[] lines=value.toString().split(" ");
                String parentName=lines[1];
                String childName=lines[0];
                context.write(new Text(parentName), new Text("1 " + childName)); // right table: key = parent, value = tagged child
                context.write(new Text(childName), new Text("2 " + parentName)); // left table: key = child, value = tagged parent
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, Text, Text, Text> {
            public void reduce(Text key, Iterable<Text> values,
                               Context context
            ) throws IOException, InterruptedException {
                List<String> grandchild=new ArrayList<String>();
                List<String> grandparent=new ArrayList<String>();
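                // Tag "1" marks a child of the current key (a grandchild candidate);
                // tag "2" marks a parent of the current key (a grandparent candidate).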
                for (Text val : values) {
                    String[] tmp = val.toString().split(" ");
                    if (tmp[0].equals("1")) {
                        grandchild.add(tmp[1]);
                    } else {
                        grandparent.add(tmp[1]);
                    }
                }
                if(grandchild.size()!=0&&grandparent.size()!=0){
                    for (String gc:grandchild){
                        for(String gp:grandparent){
                            context.write(new Text(gc),new Text(gp));
                        }
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            BasicConfigurator.configure();
            Configuration conf = new Configuration();
            conf.set("mapreduce.cluster.local.dir", "/Users/hy/hadoop/var");
            Job job = Job.getInstance(conf, "Single join");
            job.setJarByClass(SingleJoin.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            FileSystem fs = FileSystem.get(conf);
            // Delete the output directory if it already exists
            if (fs.exists(new Path(args[1]))) {
                fs.delete(new Path(args[1]),true);
            }
            if(!job.waitForCompletion(true))
                System.exit(1);
    
            // Print the job output to the console
            BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(new Path(args[1]+"/part-r-00000"))));
            try {
                String line;
                line=br.readLine();
                while (line != null){
                    System.out.println(line);
                    line = br.readLine();
                }
            } finally {
                br.close();
            }
        }
    }
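
    To run the job, package the class into a jar and pass the input and output directories as the two arguments. A minimal example invocation (the jar name and HDFS paths here are placeholders, not taken from the original post):

    hadoop jar single-join.jar com.hy.hadoop.SingleJoin /user/hy/input /user/hy/output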
