美文网首页
离线计算组件篇-MapReduce-Join连接

离线计算组件篇-MapReduce-Join连接

作者: CoderInsight | 来源:发表于2022-12-02 21:44 被阅读0次

    11.mapreduce的join

    1. reduce join

    (1)、需求

    订单数据表t_order:

    id date pid amount
    1001 20150710 P0001 2
    1002 20150710 P0002 3
    1002 20150710 P0003 3

    商品信息表t_product

    id pname category_id price
    P0001 小米5 1000 2000
    P0002 锤子T1 1000 3000

    假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:

    select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product  b on a.pid = b.id  
    

    (2)、实现机制

    • 通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联
    • 定义Mapper:
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //现在我们读取了两个文件,如何确定当前处理的这一行数据是来自哪一个文件里面的
            //方式一:通过获取文件的切片,获得文件明
            /*
            FileSplit inputSplit = (FileSplit) context.getInputSplit();//获取我们输入的文件的切片
            //获取文件名称
            String name = inputSplit.getPath().getName();
            if (name.equals("orders.txt")) {
                //订单表数据
            } else {
                //商品表数据
            }
            */
            
            String[] split = value.toString().split(",");
    
            //方式二:因为t_product表,都是以p开头,所以可以作为判断的依据
            if (value.toString().startsWith("p")) {
                //p0002,锤子T1,1000,3000
                //以商品id作为key2,相同商品的数据都会到一起去
                context.write(new Text(split[0]), value);
            } else {
                //order
                // 1001,20150710,p0001,2
                context.write(new Text(split[2]), value);
            }
        }
    }
    
    • 定义Reducer:
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class ReduceJoinReducer extends Reducer<Text,Text,Text, NullWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String order = "";
            String product = "";
            for (Text value : values) {
                if (value.toString().startsWith("p")) {
                    product = value.toString();
                }else {
                    order = value.toString();
                }
            }
            // 将结果保存
            context.write(new Text(order + "\t" + product),NullWritable.get());
        }
    }
    
    • 定义main方法
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class ReduceJoinMain extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            //获取job对象
            Job job = Job.getInstance(super.getConf(), ReduceJoinMain.class.getSimpleName());
            job.setJarByClass(ReduceJoinMain.class);
    
            //第一步:读取文件
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path(args[0]));
    
            //第二步:设置自定义mapper逻辑
            job.setMapperClass(ReduceJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            //分区,排序,规约,分组 省略
    
            //第七步:设置reduce逻辑
            job.setReducerClass(ReduceJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            //第八步:设置输出数据路径
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(args[1]));
    
            boolean b = job.waitForCompletion(true);
            return b ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args);
            System.exit(run);
        }
    }
    
    

    2. map join

    (1)、原理阐述

    • 适用于关联表中有小表的情形;

    • 可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度

    • 将小文件上传到hdfs中,然后在Main函数中将文件添加到缓存中,然后使用java io的方式读取文件;

      在Map()函数中将要实现join的字段放在key中,从而实现两个文件之间的join。

    (2)、实现示例

    • 先在mapper类中预先定义好小表,进行join
    • 引入实际场景中的解决方案:一次加载数据库或者用
    • 定义mapper类:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        //用于保存商品表的数据;productMap中的key是商品id,value是与key对应的表记录
        private Map<String, String> productMap;
    
        /**
         * 初始化方法,只在程序启动调用一次
         *
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
    
            productMap = new HashMap<String, String>();
            // 通过context上下文对象去 获取配置文件对象
            Configuration configuration = context.getConfiguration();
    
            // 通过Job对象去 获取到所有的缓存文件
            // 方式一
            URI[] cacheFiles = Job.getInstance(context.getConfiguration()).getCacheFiles();
            //方式二:deprecated
            //URI[] cacheFiles = DistributedCache.getCacheFiles(configuration);
    
            //现在只有一个缓存文件放进了分布式缓存中
            URI cacheFile = cacheFiles[0];
    
            //获取FileSystem
            FileSystem fileSystem = FileSystem.get(cacheFile, configuration);
            //读取文件,获取到输入流。这里面装的都是商品表的数据
            FSDataInputStream fsDataInputStream = fileSystem.open(new Path(cacheFile));
    
            /**
             * 商品表数据如下:
             * p0001,xiaomi,1000,2
             * p0002,appale,1000,3
             * p0003,samsung,1000,4
             */
            //获取到BufferedReader之后,可以一行一行的读取数据
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream));
            String line = null;
            //每次循环,获得表的一行数据
            while ((line = bufferedReader.readLine()) != null) {
                String[] split = line.split(",");
                // productMap中的key是商品id,value是与key对应的表记录
                productMap.put(split[0], line);
            }
        }
    
        /**
         * @param key
         * @param value   订单表的记录,如 1001,20150710,p0001,2
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            //获取订单表的商品id
            String pid = split[2];
    
            //获取商品表的数据
            String productLine = productMap.get(pid);
    
            System.out.println("Value中的数据:" + value.toString());
            System.out.println("商品表中的数据:" + productLine);
            // 此时把所有的数据直接拼接在一起了,然后便于观察最后的实际拼接的结果;在实际条件下其实也可以使用这种方式,然后在测试的没有问题之后的,可以再把这个删除
            
            context.write(new Text(value.toString() + "\t" + productLine), NullWritable.get());
        }
    }
    
    
    • 定义main方法:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.net.URI;
    
    /**
     * 需求:同reduce join
     * 实现:select a.id, a.date, b.name, b.category_id, b.price
     * from t_order a
     * join t_product b
     * on a.pid = b.id
     */
    public class MapJoinMain extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            //分布式缓存的hdfs路径
            URI uri = new URI("hdfs://node01:8020/cache/product.txt");
    
            // 本地路径:需要用一下形式file:///C:/1、HK/.../pdts.txt   ;需要指定到具体的文件;如果是使用file:///c:\\1、HK\\3、ME...不符合语法会报错
            // URI uri = new URI("file:///C:/1、HK/3、ME/2、高级0x/1、Hadoop集群升级课件/9、MapReduce/MR第一次/12、join操作/map端join/cache/pdts.txt");
            Configuration configuration = super.getConf();
            // 添加缓存文件 方式二:deprecated
            // DistributedCache.addCacheFile(uri, configuration);
    
            //获取job对象
            Job job = Job.getInstance(configuration, MapJoinMain.class.getSimpleName());
            //添加缓存文件:方式一
            job.addCacheFile(uri);
            job.setJarByClass(MapJoinMain.class);
    
            //读取文件,解析成为key,value对
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path(args[0]));
    
            job.setMapperClass(MapJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            //没有reducer逻辑,不用设置了
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setNumReduceTasks(2);
    
            boolean b = job.waitForCompletion(true);
            return b ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int run = ToolRunner.run(new Configuration(), new MapJoinMain(), args);
            System.exit(run);
        }
    }
    
    

    相关文章

      网友评论

          本文标题:离线计算组件篇-MapReduce-Join连接

          本文链接:https://www.haomeiwen.com/subject/mloffdtx.html