MapReduce实现join

作者: 心_的方向 | 来源:发表于2016-10-26 22:51 被阅读488次

    需求分析

    一个电商网站后台数据存在两个表(可以看为两个文件):
    用户表信息:用户ID、用户名、电话
    订单表信息:用户ID、订单ID、商品价格、订单日期
    如果想把两张表关联成:用户ID、用户名、电话、订单ID,价格,日期,并且按照需求对其输出。

    如何用mapreduce实现?

    实现逻辑分析

    1. 在map端读取两个表,设置输出格式为 <用户ID,(用户名,电话)> 或者为 <用户ID,(订单ID,商品价格,订单日期)>。所以map输出的key值为用户ID,value值为用户信息或者订单表信息构成的字符串。


      join1.png
    2. shuffle分组会合并相同key值的value。


      join2.png
    3. reduce端输出,为了输出的方便,需要知道values序列里面哪一个来自于用户表,哪一个来自于订单表。这里可以自定义数据,在map输出时给每一个信息定一个标签,reduce端根据这个标签识别(这里需要自定义数据类型)。


      join3.png

    代码实现

    • 测试数据
    1. customers.csv文件

    1,Stephanie Leung,555-555-5555
    2,Edward Kim,123-456-7890
    3,Jose Madriz,281-330-8004
    4,David Stork,408-555-0000

    1. orders.csv文件

    3,A,12.95,02-Jun-2008
    1,B,88.25,20-May-2008
    2,C,32.00,30-Nov-2007
    3,D,25.02,22-Jan-2009

    • 自定义数据类型关键代码
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class JoinWritable implements Writable {
        
        // 标签,用于区分data来自于用户表还是订单表
        private String tag;
        // 数据
        private String data;
    
        public JoinWritable() {
        }
    
        public JoinWritable(String tag, String data) {
            this.set(tag, data);
        }
        
        public void set(String tag, String data) {
            this.setTag(tag);
            this.setData(data);
        }
    
        public void write(DataOutput arg0) throws IOException {
            arg0.writeUTF(this.getTag());
            arg0.writeUTF(this.getData());
        }
    
        public void readFields(DataInput arg0) throws IOException {
            this.setTag(arg0.readUTF());
            this.setData(arg0.readUTF());
        }
    
        public String getTag() {
            return tag;
        }
    
        public void setTag(String tag) {
            this.tag = tag;
        }
    
        public String getData() {
            return data;
        }
    
        public void setData(String data) {
            this.data = data;
        }
    
    }
    
    • map端关键代码
        private static class ModuelMapper extends Mapper<LongWritable, Text, LongWritable, JoinWritable> {
            
            private LongWritable mapOutPutKey = new LongWritable();
            private JoinWritable mapOutPutValue = new JoinWritable();
    
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String lineValue = value.toString();
                //分割CSV文件
                String[] values = lineValue.split(",");
                
                //过滤脏数据
                int length = values.length;
                if(length != 3 && length != 4) {
                    return;
                }
                
                // 两个文件第一列都为ID,第二列为name
                Long cid = Long.valueOf(values[0]);
                String name = values[1];
                
                // 读取用户表设置key和value
                if(length == 3) {
                    String phoneNum = values[2];
                    mapOutPutKey.set(cid);
                    // "customer"用于标记用户表
                    mapOutPutValue.set("customer", name + "," + phoneNum);
                }
                // 读取订单表设置key和value
                if(length == 4) {
                    String price = values[2];
                    String date = values[3];
                    mapOutPutKey.set(cid);
                    // "order"用于标记订单表
                    mapOutPutValue.set("order", name + "," + price + "," + date);
                }
                
                context.write(mapOutPutKey, mapOutPutValue);
            }
        
        }
    
    • reduce端代码
        private static class ModuelReducer extends Reducer<LongWritable, JoinWritable, Text, NullWritable> {
            
            private Text reduceOutPutKey = new Text();
    
            public void reduce(LongWritable key, Iterable<JoinWritable> values, Context context)
                    throws IOException, InterruptedException {
                // 用户信息
                String customerInfo = null;
                // 订单信息,一个用户可能对应多个订单信息,所以需要用容器
                List<String> list = new ArrayList<String>();
                
                // 判断values来自于用户信息表还是订单表,然后取出值。
                for(JoinWritable value : values) {
                    if("customer".equals(value.getTag())) {
                        customerInfo = value.getData();
                    } else if("order".equals(value.getTag())) {
                        list.add(value.getData());
                    }
                }
                
                // 以    "cid,用户信息,订单信息"  的形式 设置key
                for(String value : list) {
                    reduceOutPutKey.set(key.get() + "," + customerInfo + "," + value);
                    context.write(reduceOutPutKey, NullWritable.get());
                }
            }
        }
    
    • 测试结果

    1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
    2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
    3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
    3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008

    相关文章

      网友评论

        本文标题:MapReduce实现join

        本文链接:https://www.haomeiwen.com/subject/qwaduttx.html