MapReduce Development Notes (3): Implementing Joins in MR

Author: 眼君 | Published 2020-09-04 14:34

Reduce-Side Join
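
In a reduce-side join, the map phase reads both tables and tags each record with its source before emitting it, keyed by the join key. The shuffle then delivers all records sharing a key, from both tables, to the same reduce() call, where the two sides are separated and their cross product is written out. For illustration (hypothetical records, assuming the tab-separated layout the code below parses): an orders line with fields (p1, o1001, 2, 35.0) would be emitted as the pair ("p1", "ORo1001\t2\t35.0"), a products line (p1, apple, 5.0) as ("p1", "PRapple\t5.0"), and the reduce() call for key p1 would receive both and emit the joined row.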

package com.wenhuan.reducejoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ReduceJoin {
    static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
        /*
         * What the map side does: tag each record with its source table
         * when emitting it, so the reducer can tell the two tables apart.
         */
        
        // map() needs to know which file each record came from, so grab the
        // file name once in setup(), before any map() calls.
        String filename = "";
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get this task's input split
            InputSplit inputsplit = context.getInputSplit();
            // Cast to the concrete FileSplit to access the file path
            FileSplit fs = (FileSplit)inputsplit;
            filename = fs.getPath().getName();
        }
        
        // Reusable output key/value objects
        Text k = new Text();
        Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Parse the line and tag it by source: "OR" for orders, "PR" for products
            String[] infos = value.toString().split("\t");
            if(filename.equals("orders")) {
                k.set(infos[0]);
                v.set("OR" + infos[1] + "\t" + infos[2] + "\t" + infos[3]);
                context.write(k, v);
            }else {
                // Product records: join key plus the remaining columns
                k.set(infos[0]);
                v.set("PR" + infos[1] + "\t" + infos[2]);
                context.write(k, v);
            }
        }
    }
    
    
    static class MyReducer extends Reducer<Text,Text,Text,NullWritable>{
        Text k = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            /*
             * One-to-many relationship: one product corresponds to many orders.
             */
            // Buffer the two sides of the join in separate lists
            List<String> orderList = new ArrayList<String>();
            List<String> proList = new ArrayList<String>();
            for(Text v:values) {
                String vv = v.toString();
                if(vv.startsWith("OR")) {
                    orderList.add(vv.substring(2));
                }else {
                    proList.add(vv.substring(2));
                }
            }
            
            // Inner join: emit rows only when the key appears in both tables
            if(orderList.size() > 0 && proList.size() > 0) {
                for(String ol:orderList) {
                    for(String pl:proList){
                        String res = key.toString() + "\t" + ol + "\t" + pl;
                        k.set(res);
                        context.write(k, NullWritable.get());
                    }
                }
            }
        }
    }
    // ... driver (main) method elided; a sketch follows below ...
}
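
The driver is elided above. A minimal sketch of what it might look like, assuming the same conventions (and the Configuration, Job, Path, FileInputFormat, and FileOutputFormat imports) used in the Mapjoin driver shown later:

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The map output types (Text/Text) differ from the final output types, so set both
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }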

Map-Side Join

Compared with the reduce-side join, we can instead load the small table into the memory of every running map task. The map tasks then read only the large table, and each row of the large table can be joined directly against the in-memory copy of the small table, so the whole join finishes in the map phase, with no shuffle of the large table.

In the driver, job.addCacheFile(uri) adds the file at the given URI to the distributed cache, and Hadoop copies it to the local disk of each task node.

The mapper can then read the small table in its setup() method.

package com.wenhuan.mapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class Mapjoin {
    static class MyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
        // In setup(), read the cached small table into an in-memory map
        Map<String,String> map = new HashMap<String,String>();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get the local path of the cached file on this task node
            // (getLocalCacheFiles() is deprecated in newer Hadoop releases but still works)
            Path path = context.getLocalCacheFiles()[0];
            String p = path.toString();
            BufferedReader br = new BufferedReader(new FileReader(p));
            String line = null;
            while((line = br.readLine()) != null) {
                String[] infos = line.split("\t");
                // Key: product id; value: the remaining product columns
                map.put(infos[0], infos[1] + "\t" + infos[2]);
            }
            br.close();
        }
        
        // Reusable output key object
        Text k = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Parse the big-table line and pull out the join key
            String[] infos = value.toString().split("\t");
            String pid = infos[2];
            // Join against the in-memory small table
            if(map.containsKey(pid)) {
                // Append the product columns, tab-separated
                String res = value.toString() + "\t" + map.get(pid);
                k.set(res);
                context.write(k, NullWritable.get());
            }
            }
        }
    }
    
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // Load the configuration
        Configuration conf = new Configuration();
        // Create a Job that wraps the mapper (this job has no reducer)
        Job job = Job.getInstance(conf);
        // Set the driver class; the program is packaged as a jar to run
        job.setJarByClass(Mapjoin.class);
        // Set the mapper
        job.setMapperClass(MyMapper.class);
        
        // Map-only job: no reduce tasks
        job.setNumReduceTasks(0);
        // These set the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Add the small table to the distributed cache
        job.addCacheFile(new URI("/info/product"));
        
        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit and print progress logs
        job.waitForCompletion(true);
    }
}

Because the map join needs no reduce phase, the number of reduce tasks must be set to 0 with job.setNumReduceTasks(0); the job then runs map-only and the mapper's output is written directly as the final result.
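
As a usage sketch (the jar name and paths here are assumptions, not from the original), the packaged job could be launched with

hadoop jar mapjoin.jar com.wenhuan.mapjoin.Mapjoin /input/orders /output/mapjoin

provided the small table has already been uploaded to the HDFS path passed to addCacheFile (here /info/product).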
