美文网首页
MapReduce实验--Join

MapReduce实验--Join

作者: 快点学 | 来源:发表于2019-12-02 21:25 被阅读0次

    一、Join操作

    Join操作是RDBMS中常见的操作,将多个文件按相同键连接起来。在这里我们使用MapReduce对两个文件执行Join操作

    二、原理

    MapReduce实现Join操作有多种方式,在这里我们使用最常见的Reduce端连接:

    • Map端将连接字段按不同文件打标签后作为key,其余部分作为value,最后进行输出。
    • Reduce端从带有标签的key中提取出连接字段,作为输出key,values对应输出values。

    三、示例

    1. 文件

    data.txt:

    201001 1003 abc
    201002 1005 def
    201003 1006 ghi
    201004 1003 jkl
    201005 1004 mno
    201006 1005 pqr

    info.txt:

    1003 kaka
    1004 da
    1005 jue
    1006 zhao

    2. 代码

    package Mapreduce_Join;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class MR_Join_Comparator extends WritableComparator {
        
        public MR_Join_Comparator() {
            // TODO Auto-generated constructor stub
            super(TextPair.class, true);
        }
        
        public int compare(WritableComparable a, WritableComparable b) {
            TextPair tp1 = (TextPair)a;
            TextPair tp2 = (TextPair)b;
            return tp1.getFirst().compareTo(tp2.getFirst());
        }
    }
    
    
    package Mapreduce_Join;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * Map side of the reduce-side join. Each input line is tagged by its
     * source file: records from data.txt are keyed on their second column
     * with tag "1", records from info.txt on their first column with tag
     * "0". The tag makes info records sort ahead of data records within a
     * join-key group.
     */
    public class MR_Join_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextPair, Text>.Context context)
                throws IOException, InterruptedException {
            // Which source file this split belongs to decides the tag.
            String path = ((FileSplit) context.getInputSplit()).getPath().toString();

            if (path.contains("data.txt")) {
                String[] fields = value.toString().split("\t");
                // Malformed lines (fewer than 3 columns) are silently skipped.
                if (fields.length >= 3) {
                    TextPair taggedKey = new TextPair(new Text(fields[1]), new Text("1"));
                    context.write(taggedKey, new Text(fields[0] + "\t" + fields[2]));
                }
            }

            if (path.contains("info.txt")) {
                String[] fields = value.toString().split("\t");
                // Malformed lines (fewer than 2 columns) are silently skipped.
                if (fields.length >= 2) {
                    TextPair taggedKey = new TextPair(new Text(fields[0]), new Text("0"));
                    context.write(taggedKey, new Text(fields[1]));
                }
            }
        }
    }
    
    package Mapreduce_Join;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /**
     * Partitions composite keys by the join key (first field) only, so all
     * tagged records for one join key land on the same reducer regardless of
     * their source tag.
     */
    public class MR_Join_Partitioner extends Partitioner<TextPair, Text> {

        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            // Bug fix: Math.abs(hashCode() * 127) can still be negative when
            // the product overflows to Integer.MIN_VALUE (Math.abs(MIN_VALUE)
            // == MIN_VALUE), yielding an invalid negative partition index.
            // Masking the sign bit guarantees a non-negative result.
            return (key.getFirst().hashCode() * 127 & Integer.MAX_VALUE) % numPartitions;
        }
    }
    
    
    package Mapreduce_Join;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Reduce side of the join. Thanks to the secondary sort (tag "0" < "1"),
     * the first value in each group is the info.txt description; every
     * subsequent value is a data.txt record, which is emitted joined with
     * that description.
     */
    public class MR_Join_Reduce extends Reducer<TextPair, Text, Text, Text> {

        @Override
        protected void reduce(TextPair key, Iterable<Text> values, Reducer<TextPair, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            Text pid = key.getFirst();
            // Bug fix: the original called values.iterator() on every access.
            // The Iterable contract does not promise the same iterator each
            // time, and Hadoop's value iterable is single-pass; obtain the
            // iterator exactly once and drive the whole loop from it.
            Iterator<Text> it = values.iterator();
            if (!it.hasNext()) {
                return; // defensive: no values for this key
            }
            // Copy to String: Hadoop reuses the underlying Text instance
            // across iterations, so holding the reference would be unsafe.
            String desc = it.next().toString();
            while (it.hasNext()) {
                context.write(pid, new Text(it.next().toString() + "\t" + desc));
            }
        }
    }
    
    
    package Mapreduce_Join;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    public class TextPair implements WritableComparable<TextPair> {
        
        private Text first;
        private Text second;
        
        public TextPair() {
            set(new Text(), new Text());
        }
        
        public TextPair(String first, String second) {
            set(new Text(first), new Text(second));
        }
        
        public TextPair(Text first, Text second) {
            set(first, second);
        }
        
        public void set(Text first, Text second) {
            this.first = first;
            this.second = second;
        }
    
        public Text getFirst() {
            return first;
        }
    
        public Text getSecond() {
            return second;
        }
    
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            first.write(out);
            second.write(out);
        }
    
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            first.readFields(in);
            second.readFields(in);
        }
    
        public int compareTo(TextPair o) {
            // TODO Auto-generated method stub
            int cmp = first.compareTo(o.first);
            if (cmp!=0) {
                return cmp;
            }
            return second.compareTo(o.second);
        }
    
    }
    
    
    package Mapreduce_Join;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    /**
     * Driver for the reduce-side join job: wires up the tagging mapper, the
     * join-key partitioner and grouping comparator, and the joining reducer.
     * Input/output paths may be given as the first three command-line
     * arguments; otherwise the original hard-coded HDFS paths are used.
     */
    public class MRJoin {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            GenericOptionsParser parser = new GenericOptionsParser(conf, args);
            // Fix: the parser result was previously discarded; honor
            // "<in_path_one> <in_path_two> <output>" when supplied.
            String[] otherArgs = parser.getRemainingArgs();

            // Fix: new Job(conf, name) is deprecated; use the factory method.
            Job job = Job.getInstance(conf, "MRJoin");
            job.setJarByClass(MRJoin.class);

            job.setMapperClass(MR_Join_Mapper.class);
            job.setMapOutputKeyClass(TextPair.class);
            job.setMapOutputValueClass(Text.class);

            job.setPartitionerClass(MR_Join_Partitioner.class);
            job.setGroupingComparatorClass(MR_Join_Comparator.class);

            job.setReducerClass(MR_Join_Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            Path inpath1;
            Path inpath2;
            Path outpath;
            if (otherArgs.length >= 3) {
                inpath1 = new Path(otherArgs[0]);
                inpath2 = new Path(otherArgs[1]);
                outpath = new Path(otherArgs[2]);
            } else {
                // Backward-compatible defaults from the original example.
                inpath1 = new Path("hdfs://localhost:9000/user/abc/data/data.txt");
                inpath2 = new Path("hdfs://localhost:9000/user/abc/data/info.txt");
                outpath = new Path("hdfs://localhost:9000/user/abc/data/out");
            }

            FileInputFormat.addInputPath(job, inpath1);
            FileInputFormat.addInputPath(job, inpath2);
            FileOutputFormat.setOutputPath(job, outpath);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }
    
    

    MapReduce程序执行流程:

    ① 配置
    ② 创建作业
    ③ 设置Jar包
    ④ 设置Mapper
    ⑤ 设置Partitioner和Comparator
    ⑥ 设置Reducer
    ⑦ 设置输入输出路径
    ⑧ 执行

    四、总结

    相关文章

      网友评论

          本文标题:MapReduce实验--Join

          本文链接:https://www.haomeiwen.com/subject/wnzfgctx.html