在MaxCompute中利用bitmap进行数据处理

作者: 25e88d6d7cbb | 来源:发表于2019-08-07 15:15 被阅读0次

    很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap与/或/非运算的极快处理速度,实现用户画像标签的人群筛选、运营分析中的7日活跃等分析。
    本文给出了一个使用MaxCompute MapReduce对不同日期的活跃用户ID进行bitmap编码和计算的样例,供感兴趣的用户进一步了解、分析,并应用在自己的场景下。

    
    import com.aliyun.odps.OdpsException;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.MapperBase;
    import com.aliyun.odps.mapred.ReducerBase;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.mapred.utils.SchemaUtils;
    import org.roaringbitmap.RoaringBitmap;
    import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
    
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.util.Base64;
    import java.util.Iterator;
    
    /**
     * MapReduce job that builds, for every {@code active_date}, one RoaringBitmap holding
     * the ids of the users active on that date.
     *
     * <p>The mapper turns each input row into a single-element bitmap keyed by its date; the
     * reducer ORs all bitmaps that share a date. Bitmaps travel between the stages, and are
     * written to the output table, as Base64 text of the RoaringBitmap serialized form.
     */
    public class bitmapDemo2
    {

        /**
         * Serializes a bitmap and returns the bytes as a Base64 string.
         *
         * @param bitmap the bitmap to encode
         * @return Base64 text of the serialized bitmap
         * @throws IOException if serialization fails
         */
        private static String encode(RoaringBitmap bitmap) throws IOException {
            // Pre-size the buffer to the exact serialized length to avoid regrowth.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream(bitmap.serializedSizeInBytes());
            DataOutputStream out = new DataOutputStream(bytes);
            bitmap.serialize(out);
            out.flush();
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        }

        /**
         * Rebuilds a bitmap from the Base64 text produced by {@link #encode(RoaringBitmap)}.
         *
         * @param encoded Base64 text of a serialized bitmap
         * @return a mutable copy of the decoded bitmap
         */
        private static RoaringBitmap decode(String encoded) {
            ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
            return new RoaringBitmap(new ImmutableRoaringBitmap(buffer));
        }

        /** Emits {@code (active_date, Base64 bitmap containing the row's id)} for every input row. */
        public static class BitMapper extends MapperBase {

            /** Largest id that survives the narrowing to a 32-bit bitmap entry unchanged. */
            private static final long MAX_ID = 0xFFFFFFFFL;

            Record key;
            Record value;

            @Override
            public void setup(TaskContext context) throws IOException {
                key = context.createMapOutputKeyRecord();
                value = context.createMapOutputValueRecord();
            }

            /**
             * Encodes the row's {@code id} as a one-element bitmap and writes it under the
             * row's {@code active_date}.
             *
             * @param recordNum sequence number of the record (unused)
             * @param record    input row providing the {@code id} and {@code active_date} columns
             * @param context   task context used to emit the key/value pair
             * @throws IOException              if the bitmap cannot be serialized or written
             * @throws IllegalArgumentException if {@code id} is outside {@code [0, 2^32 - 1]}
             */
            @Override
            public void map(long recordNum, Record record, TaskContext context)
                    throws IOException
            {
                Long id = record.getBigint("id");
                if (id == null) {
                    // A row without an id has nothing to contribute to the bitmap.
                    return;
                }
                if (id < 0 || id > MAX_ID) {
                    // The bitmap entry is a 32-bit value; narrowing a larger (or negative) id
                    // would silently alias it with a different user.
                    throw new IllegalArgumentException(
                            "id " + id + " does not fit into a 32-bit bitmap entry");
                }

                RoaringBitmap single = new RoaringBitmap();
                single.add((int) id.longValue());

                // The date is the grouping key.
                key.set(0, record.getString("active_date"));
                value.set(0, encode(single));
                context.write(key, value);
            }
        }

        /** ORs all bitmaps of one {@code active_date} and writes {@code (active_date, bitmap)}. */
        public static class BitReducer extends ReducerBase {
            private Record result = null;

            @Override
            public void setup(TaskContext context) throws IOException {
                result = context.createOutputRecord();
            }

            /**
             * Merges every Base64-encoded bitmap received for {@code key} with a bitwise OR
             * and writes the merged bitmap, again Base64-encoded, to the output table.
             *
             * @param key     map output key; column 0 is the {@code active_date}
             * @param values  map output values; column 0 is a Base64-encoded bitmap
             * @param context task context used to write the output record
             * @throws IOException if the merged bitmap cannot be serialized or written
             */
            @Override
            public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
                RoaringBitmap merged = new RoaringBitmap();
                while (values.hasNext()) {
                    Record val = values.next();
                    merged.or(decode((String) val.get(0)));
                }
                result.set(0, key.get(0));
                result.set(1, encode(merged));
                context.write(result);
            }
        }

        /**
         * Configures and runs the job: reads {@code bitmap_source}, writes {@code bitmap_target}.
         *
         * @param args command-line arguments (unused)
         * @throws OdpsException if the job cannot be submitted or fails
         */
        public static void main( String[] args ) throws OdpsException
        {

            System.out.println("begin.........");
            JobConf job = new JobConf();

            job.setMapperClass(BitMapper.class);
            job.setReducerClass(BitReducer.class);

            job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
            // The single value column carries the Base64-encoded bitmap, despite its name.
            job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));

            // Sample input table:
            //        +------------+-------------+
            //        | id         | active_date |
            //        +------------+-------------+
            //        | 1          | 20190729    |
            //        | 2          | 20190729    |
            //        | 3          | 20190730    |
            //        | 4          | 20190801    |
            //        | 5          | 20190801    |
            //        +------------+-------------+
            InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);

            // Resulting output table for the sample input:
            //        +-------------+------------+
            //        | active_date | bit_map    |
            //        +-------------+------------+
            //        20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=
            //        20190730,OjAAAAEAAAAAAAAAEAAAAAMA
            //        20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=
            OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);

            JobClient.runJob(job);
        }
    }
    
    

    对Java应用打包后,上传到MaxCompute项目中,即可在MaxCompute中调用该MR作业:对输入表的数据以日期作为key进行用户id的bitmap编码,并对相同日期下编码后的bitmap取OR操作(根据需要也可以取AND,例如留存场景),最后将处理后的数据写入目标表中,供后续处理使用。



    本文作者:圣远

    原文链接

    本文为云栖社区原创内容,未经允许不得转载。

    相关文章

      网友评论

        本文标题:在MaxCompute中利用bitmap进行数据处理

        本文链接:https://www.haomeiwen.com/subject/bvbodctx.html