美文网首页
MapReduce Map端 join 的一个例子

MapReduce Map端 join 的一个例子

作者: 博弈史密斯 | 来源:发表于2018-06-23 22:00 被阅读0次

    什么是 Join

    Join,翻译过来是 加入、连接、结合的意思。
    而在数据处理中,join 是对表的操作。表是数据存储的一种形式,就像 excel 中的表一样。
    我们为了想得到想要的结果,需要分析多张表,而 把两张 或更多的表 进行结合,这样的操作 就叫 Join。

    那 在 MapReduce 中的 Join 就是指上面的操作,只不过可能不是处理的表,而是文件,或者是从表存储的介质 比如 MySql、Hbase 中读取的数据。

    举个 MapReduce 中 使用 Join 的例子:

    比如我们有两个文件,分别存储 订单信息:products.txt,和 商品信息:orders.txt ,详细数据如下:

    products.txt:

    //商品ID,商品名称,商品类型(数字表示,我们假设有一个数字和具体类型的映射)
    p0001,xiaomi,001
    p0002,chuizi,001
    

    orders.txt:

    //订单号,时间,商品id,购买数量 
    1001,20170710,p0001,1 
    1002,20170710,p0001,3 
    1003,20170710,p0001,3 
    1004,20170710,p0002,1
    

    我们想象有多个商品,并有海量的订单信息,并且存储在多个 HDFS 块中。

    如果我们想统计 每个商品的 购买数量,即这样的形式:

    xiaomi,7
    chuizi,1
    

    该怎么处理?

    我们分析上面我们想要的结果,商品名称和销量,这两个属性分别存放到不同的文件中,那我们就要考虑 在一个地方(mapper)读取这两个文件的数据,并把数据在一个地方(reducer)进行结合。这就是 MapReduce 中的 Join 了。

    我们用代码实现上面的过程 (只写出最主要的代码):
    Mapper:

    Text outKey = new Text();
    Text outValue = new Text();
    StringBuilder sb = new StringBuilder();
    
    protected void map {
        String[] split = value.toString().split(",");
        
        //两个文件 在一个 mapper 中处理
        if(name.equals("products.txt")) {
        
            //取商品ID 作为 输出key 和 商品名称 作为 输出value,即 第0、1 的数据
            outKey.set(split[0]);
            outValue.set("product#" + split[1]);
            context.write(outKey, outValue);
            
        } else {
            //取商品ID 作为 输出key 和 购买数量 作为 输出value,即 第2、3 的数据
            outKey.set(split[2]);
            outValue.set("order#" + split[3]);
            context.write(outKey, outValue);
        }
    }
    

    Reducer:

    //用来存放:商品ID、商品名称
    List<String> productsList = new ArrayList<>();
    
    //用来存放:商品ID、购买数量
    List<String> ordersList = new ArrayList<>();
    
    Text outValue = new Text();
    
    protected void reduce {
    
        for (Text text : values) {
            String value = text.toString();
            
            if(value.startsWith("product#")) {
                productsList.add(value.split("#")[1]); //取出 商品名称
                
            } else if(value.startsWith("order#")){
                ordersList.add(text.toString().split("#")[1]); //取出商品的销量
            }
        }
        int totalOrders = 0;
        for (int i=0; i < productsList.size(); i++) {
            for (int j=0; j < ordersList.size(); j++) {
                totalOrders += ordersList.get(j);
            }
            outValue.set(productsList.get(i) + "\t" + totalOrders );
            //最后的输出是:商品ID、商品名称、购买数量
            context.write(key, outValue);
        }
    }
    

    上面的代码即是 Join 的过程。

    下面我们说下 Map Join。

    什么是 Map Join

    Map Join 是指 Join 发生在 MapReduce 的 Map 阶段,而我们通常在 MapReduce 中把两张或多个表结合,是在 Reduce 端处理。

    为什么要使用 Map Join

    我们假设 某些商品卖的特别好,比如小米手机,产生了大量的订单,数据量特别大;而某些商品销量惨淡,比如锤子手机。那么某一个 Reduce Task 处理了大量数据,某个 Reduce Task 可能只处理几条数据,即产生了数据倾斜的问题。

    而 我们上面举的例子,商品相对于订单 的数据量来说,是非常小的,可能一个商城 有几百种商品,而订单量可能达到上千万。

    而 Map Join,适用于: 合并两个表数据,但有数据倾斜的问题,且一张表的数据量很小,另一张表数据量很大的情况。

    如何实现 Map Join

    我们可以通过 DistributedCache 来实现。
    DistributedCache 是一个提供给Map/Reduce框架的工具,用来缓存指定的文件到 每一个 slave 节点上。

    我们通过 DistributedCache 把小文件发给每一个 Mapper,在 每一个 Mapper 中实现 上面 Reducer 的功能,这样发送到 Reducer 中的数据已经是聚合过的数据,数据量大大减少,也就解决了数据倾斜的问题。

    代码流程:

    1. 在 Main 方法中调用 DistributedCache 的Api 把小文件保存起来。
    2. 在 Mapper 的 setup() 中通过 DistributedCache 的 Api 来获取小文件,并保存到 HashMap 中。
      Mapper 中有一个 setup() 方法,在 map() 之前执行,通常做一些初始化工作。
    3. 在 map() 中 从HasnMap 中读取数据,并读取大文件,把两个文件进行合并。
    4. 在 Reducer 中进一步处理。
    看一下具体代码:

    Main:

    public static void main(String[] args) {
        ...
        DistributedCache.addCacheFile(new Path("...").toUri() , conf);
    }
    

    Mapper 中的 setup():

    private static Map<String,String> productMap =  new HashMap<>();
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //Path[] files = context.getLocalCacheFiles();
        Path[] files = DistributedCache.getLocalCacheFiles(conf);
        
        String strPath = files[0].toUri().toString();
        
        BufferedReader br = new BufferedReader(new FileReader(strPath));
        
        while((readLine = br.readLine()) != null) {
            String[] split = readLine.split(",");
            String productId = split[0];
            String productName = split[1];
    
            productMap.put(productId, productName);
        }
    }
    

    Mapper 中的 map():

    @Override
    protected void map {
        //读取的 orders.txt 中的数据
        String[] split = value.toString().split(",");
    
        String productId = split[2];
        String saleSum = split[3];
    
        String productName = productMap.get(productId);
    
        outKey.set(productId);
        outValue.set(productName + "\t" + saleSum);
    
        context.write(outKey, outValue);
    }
    

    这样就完成了。

    相关文章

      网友评论

          本文标题:MapReduce Map端 join 的一个例子

          本文链接:https://www.haomeiwen.com/subject/qrqlyftx.html