美文网首页
MapReduce中分组组件

MapReduce中分组组件

作者: 羋学僧 | 来源:发表于2020-08-13 09:27 被阅读0次

    MapReduce中分组组件

    一、GroupComparator

    定义 分组:分组是mapreduce中shuffle组件当中reduce端的一个功能组件,主要的作用是决定哪些数据作 为一组。

    我们可以自定义分组实现不同的key作为同一个组

    二、分组 案例

    需求

    经典案例:求出每一个订单中成交金额最大的一笔交易

    示例数据如下

        订单编号 商品编号 金额
        order_001 goods_001 100
        order_001 goods_002 200
        order_002 goods_003 300
        order_002 goods_004 400
        order_002 goods_005 500
        order_003 goods_001 100
    

    预期结果:

    order_001   goods_002   200
    order_002   goods_005   500
    order_003   goods_001   100
    

    步骤:

    1、定义实体类
    2、定义Mapper
    3、定义分区
    4、定义分组
    5、定义Reducer
    6、定义主类

    OrderBean.java

    /**
     * 订单实体类
     */
    public class OrderBean implements WritableComparable<OrderBean>{
    
        private String orderId; //订单编号
        private Double price; //订单中某个商品的价格
    
        public String getOrderId() {
            return orderId;
        }
    
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }
    
        public Double getPrice() {
            return price;
        }
    
        public void setPrice(Double price) {
            this.price = price;
        }
    
        @Override
        public String toString() {
            return "OrderBean{" +
                    "orderId='" + orderId + '\'' +
                    ", price=" + price +
                    '}';
        }
    
        /**
         * @param o 实体参数
         * @return
         * 指定排序的规则
         */
        @Override
        public int compareTo(OrderBean o) {
            //1、先比较订单的id,如果id一样,则将订单的金额排序(降序)
            int i = this.orderId.compareTo(o.orderId);
            if (i == 0){
                //因为是降序,所以有-1
                i = this.price.compareTo(o.price) * -1;
            }
            return i;
        }
    
        /**
         * 实现对象的序列化
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(orderId);
            out.writeDouble(price);
        }
    
        /**
         * 实现对象的反序列化
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.orderId = in.readUTF();
            this.price = in.readDouble();
        }
    }
    
    

    OrderMapper.java

    /**
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *     KEYIN:偏移量
     *     VALUEIN:一行文本
     *     KEYOUT:k2 OrderBean
     *     VALUEOUT:v2 文本
     */
    public class OrderMapper extends Mapper<LongWritable,Text,OrderBean,Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //1、拆分行文本数据,得到订单的id和订单的金额
            String[] split = value.toString().split("\t");
    
            //2、封装OrderBean实体类
            OrderBean orderBean = new OrderBean();
            orderBean.setOrderId(split[0]);
            orderBean.setPrice(Double.parseDouble(split[2]));
    
            //3、写入上下文
            context.write(orderBean,value);
        }
    }
    
    

    OrderPartition.java

    /**
     * Partitioner<KEY, VALUE>
     *     KEY:k2
     *     VALUE:v2
     */
    public class OrderPartition extends Partitioner<OrderBean,Text> {
    
        /**
         *
         * @param orderBean k2
         * @param text v2
         * @param numPartitions ReduceTask的个数
         * @return  返回的是分区的编号:比如说:ReduceTask的个数3个,返回的编号是 0 1 2
         */
        @Override
        public int getPartition(OrderBean orderBean, Text text, int numPartitions) {
            return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
    
    

    OrderGroup.java

    /**
     * 订单的分组类
     * 实现分组有固定的步骤:
     * 1、继承WritableComparator
     * 2、调用父类的构造器
     * 3、指定分组的规则,重写一个方法
     */
    public class OrderGroup extends WritableComparator {
    
        //1、继承WritableComparator类
        //2、调用父类的构造器
        public OrderGroup(){
            //第一个参数就是分组使用的javabean,第二个参数就是布尔类型,表示是否可以创建这个类的实例
            super(OrderBean.class,true);
        }
    
        // 3、指定分组的规则,需要重写一个方法
    
        /**
         * @param a  WritableComparable是接口,Orderbean实现了这个接口
         * @param b WritableComparable是接口,Orderbean实现了这个接口
         * @return
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            //1、对形参a b 做强制类型转换
            OrderBean first = (OrderBean) a;
            OrderBean second = (OrderBean) b;
    
            //2、指定分组的规则
            return first.getOrderId().compareTo(second.getOrderId());
        }
    }
    
    

    OrderReducer.java

    /**
     * Reducer
     *  Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     *      KEYIN:k2
     *      VALUEIN:v2
     *      KEYOUT :k3 一行文本
     *      VALUEOUT:v3 NullWritable
     *
     */
    public class OrderReducer extends Reducer<OrderBean,Text,Text,NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int i = 0;
            //获取top N ,下面的代码就是取出来top1。
            for (Text value : values) {
                context.write(value,NullWritable.get());
                i++;
                if (i >= 1){
                    break;
                }
            }
        }
    }
    
    

    JobMain.java

    /**
     * 求订单最大值的主类
     */
    public class JobMain {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //一、初始化一个job
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration, "mygroup");
    
            //二、配置Job信息
            //1、设置输入信息
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job,new Path("D://input/orders.txt"));
    
            //2、设置mapper
            job.setMapperClass(OrderMapper.class);
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(Text.class);
    
            //3 4 5 6  shuffle
            //分区设置
            job.setPartitionerClass(OrderPartition.class);
    
            //分组设置
            job.setGroupingComparatorClass(OrderGroup.class);
    
            //7、设置Reducer
            job.setReducerClass(OrderReducer.class);
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
    
            //8、设置输出
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job,new Path("D://mygroup_out"));
    
            //三、等待完成
            boolean b = job.waitForCompletion(true);
            System.out.println(b);
            System.exit(b ? 0 : 1);
        }
    }
    
    

    实际结果:

    相关文章

      网友评论

          本文标题:MapReduce中分组组件

          本文链接:https://www.haomeiwen.com/subject/nrxmdktx.html