Reading HBase Data with a Custom RowKey Rule

Author: 大猪大猪 | Published 2018-08-09 14:55

    In Flink we sometimes need to analyze data within a time window, say from 1:00 to 2:00, but a scan that has to sweep whole Regions is slow. By customizing TableInputFormat we can make such range queries efficient. The examples below read the data with Flink's DataSet API, and a Spark version of the same read is given at the end.
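
    The rowkey scheme below is what makes this work: every key starts with a two-hex-character MD5 prefix, so one time window can be covered by 256 small prefix-bounded scans running in parallel instead of one full-table scan. A minimal sketch of the idea (class and method names here are illustrative, not part of the original code):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PrefixScanSketch {
        // Build one Scan per two-hex-char bucket ("00".."ff") for a single
        // time window; together the 256 scans cover the whole window.
        public static List<Scan> scansFor(String startTime, String stopTime) {
            List<Scan> scans = new ArrayList<>();
            for (int i = 0; i < 256; i++) {
                String prefix = String.format("%02x", i);
                Scan scan = new Scan();
                scan.withStartRow(Bytes.toBytes(prefix + "|" + startTime));
                scan.withStopRow(Bytes.toBytes(prefix + "|" + stopTime));
                scans.add(scan);
            }
            return scans;
        }
    }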

    Usage

    Md5Util.java

    import org.apache.commons.codec.binary.Hex;
    
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    public class Md5Util {
        public static String md5(byte[] key) {
            return md5(key, 0, key.length);
        }
    
        // Compute the MD5 digest of the given byte range and return it
        // as a 32-character lowercase hex string.
        public static String md5(byte[] key, int offset, int length) {
            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                md.update(key, offset, length);
                byte[] digest = md.digest();
                return new String(Hex.encodeHex(digest));
            } catch (NoSuchAlgorithmException e) {
                throw new RuntimeException("Error computing MD5 hash", e);
            }
        }
    
        public static String md5(String str) {
            return md5(str.getBytes());
        }
        public static String md5(String str, int offset, int length) {
            return md5(str.getBytes(), offset, length);
        }
    }
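
    For example, the first two hex characters of the digest become the rowkey's bucket prefix (the userId string is taken from the sample data shown later):

    String digest = Md5Util.md5("ogJmG5KRcIxtyg7UmcRHFCn6YiAQ"); // 32 lowercase hex chars
    String bucket = digest.substring(0, 2);  // "00".."ff" -> the rowkey prefix
    String middle = digest.substring(2, 8);  // reused as the 4th rowkey field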
    

    Region split scheme

        private Connection connection;
        private Admin admin;
    
        @Before
        public void init() throws Exception {
            System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
            System.setProperty("sun.security.krb5.debug", "false");
            final String user = "hbase/abc.demo.com@DEMO.COM";
            final String keyPath = "/home/dounine/kerberos/lake.keytab";
    
            Configuration conf = new Configuration();
            conf.addResource("hbase-site.xml");
    
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(user, keyPath);
    
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        }
    
        @Test
        public void createTable() throws IOException {
            TableName table = TableName.valueOf("logTable1");
            TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);
            tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY,KeyPrefixRegionSplitPolicy.class.getName());
            tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY,"2");
    
            ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();
            ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();
            ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();
            tableDesc.setColumnFamilies(Arrays.asList(extCF,locationCF,deviceCF));
            try {
                byte[][] splitKeys = new byte[4][];
                splitKeys[0] = Bytes.toBytes("00");
                splitKeys[1] = Bytes.toBytes("40");
                splitKeys[2] = Bytes.toBytes("80");
                splitKeys[3] = Bytes.toBytes("c0");
                admin.createTable(tableDesc.build(),splitKeys);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
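
    To confirm the pre-split took effect, the region boundaries can be listed afterwards; a quick check, assuming the same HBase 2.x Admin API used above:

    @Test
    public void showRegions() throws IOException {
        for (RegionInfo region : admin.getRegions(TableName.valueOf("logTable1"))) {
            System.out.println(Bytes.toString(region.getStartKey())
                    + " ~ " + Bytes.toString(region.getEndKey()));
        }
        // Expected boundaries: "" ~ 00, 00 ~ 40, 40 ~ 80, 80 ~ c0, c0 ~ ""
    }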
    

    Writing data into logTable1

    public class HbaseKerberos {
        private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);
        private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
        private static final String TABLE_NAME = "logTable1";
    
        public void insertDataToHbase1(String appKey, List<Log> hasDatas) throws IOException {
            Table table = HbaseUtils.getTable(TABLE_NAME);
            Long sumCount = 0L;

            byte[] extCF = Bytes.toBytes("ext"); // the "ext" column family
            List<Put> rows = new ArrayList<>();
            for (Log logEntity : hasDatas) {
                JSONObject dataJsonObject = logEntity.getData();
                JSONObject extJsonObject = dataJsonObject.getJSONObject("ext");
                String userId = extJsonObject.getString("userId");
                String timeStr = logEntity.getTime().format(dtf);
              
                String md5Str = Md5Util.md5(userId);
            String rowKey = new StringBuilder()
                    .append(md5Str.substring(0,2))// first two hex chars of the MD5 (00~ff, 256 buckets), leaving room to grow to that many Regions
                    .append("|")
                    .append(timeStr)// event time
                    .append("|")
                    .append(CrcUtil.getCrcValue(appKey))
                    .append("|")
                    .append(md5Str.substring(2,8))
                    .append("|")
                    .append(Md5Util.md5(UUID.randomUUID().toString()).substring(0,2))// random suffix to keep keys unique
                    .toString();
                Put row = new Put(Bytes.toBytes(rowKey));
    
                for(String keyName : extJsonObject.keySet()){
                    String value = extJsonObject.getString(keyName);
                    if(StringUtils.isNotBlank(value)){
                        row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));
                    }
                }
                row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));
          
                // Device info
                putFieldToRow(logEntity.getData(), "device", row);

                // Location info
                putFieldToRow(logEntity.getData(), "location", row);
    
                rows.add(row);
            }
        for (Integer[] duration : LimitUtil.getLimits(rows.size(), 1000)) {
            Object[] results = new Object[duration[1] - duration[0]];
            try {
                table.batch(rows.subList(duration[0], duration[1]), results);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sumCount += (duration[1] - duration[0]);
        }
            LOGGER.info("write data count:" + sumCount);
        }
    }
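
    LimitUtil is not shown in the article; judging from how it is called, getLimits(total, step) yields [start, end) index pairs so that the Puts are written in batches of at most 1000. A minimal sketch under that assumption:

    import java.util.ArrayList;
    import java.util.List;

    public class LimitUtil {
        // Hypothetical reconstruction: split [0, total) into [start, end)
        // windows of at most `step` elements, matching the
        // rows.subList(start, end) calls above.
        public static List<Integer[]> getLimits(int total, int step) {
            List<Integer[]> limits = new ArrayList<>();
            for (int start = 0; start < total; start += step) {
                limits.add(new Integer[]{start, Math.min(start + step, total)});
            }
            return limits;
        }
    }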
    

    Sample rows in logTable1

    00|20180518203401772|2352356512|4519f3|f1 column=ext:appKey, timestamp=1533646292389, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
    00|20180518203401772|2352356512|4519f3|f1 column=ext:channelCode, timestamp=1533646292389, value=guanlan-resurrection-002-
    00|20180518203401772|2352356512|4519f3|f1 column=ext:createDateTime, timestamp=1533646292389, value=1526646836093
    00|20180518203401772|2352356512|4519f3|f1 column=ext:retain, timestamp=1533646292389, value=17670
    00|20180518203401772|2352356512|4519f3|f1 column=ext:scene, timestamp=1533646292389, value=1007
    00|20180518203401772|2352356512|4519f3|f1 column=ext:shareId, timestamp=1533646292389, value=ogJmG5ItE_nBCS3pg5XCvGotGI1c
    00|20180518203401772|2352356512|4519f3|f1 column=ext:time, timestamp=1533646292389, value=2018-05-18T20:34:01
    00|20180518203401772|2352356512|4519f3|f1 column=ext:type, timestamp=1533646292389, value=login_in
    00|20180518203401772|2352356512|4519f3|f1 column=ext:userId, timestamp=1533646292389, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
    00|20180518203406167|2352356512|4519f3|54 column=ext:appKey, timestamp=1533646347725, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
    00|20180518203406167|2352356512|4519f3|54 column=ext:channelCode, timestamp=1533646347725, value=guanlan-regular-001-
    00|20180518203406167|2352356512|4519f3|54 column=ext:createDateTime, timestamp=1533646347725, value=1526646839075
    00|20180518203406167|2352356512|4519f3|54 column=ext:retain, timestamp=1533646347725, value=17670
    00|20180518203406167|2352356512|4519f3|54 column=ext:shareId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
    00|20180518203406167|2352356512|4519f3|54 column=ext:time, timestamp=1533646347725, value=2018-05-18T20:34:06
    00|20180518203406167|2352356512|4519f3|54 column=ext:type, timestamp=1533646347725, value=sharesuccess
    00|20180518203406167|2352356512|4519f3|54 column=ext:userId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
    00|20180518203407144|2352356512|5ca1c4|bc column=ext:appKey, timestamp=1533646294045, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
    00|20180518203407144|2352356512|5ca1c4|bc column=ext:createDateTime, timestamp=1533646294045, value=1526646849745
    00|20180518203407144|2352356512|5ca1c4|bc column=ext:retain, timestamp=1533646294045, value=17670
    00|20180518203407144|2352356512|5ca1c4|bc column=ext:scene, timestamp=1533646294045, value=1037
    00|20180518203407144|2352356512|5ca1c4|bc column=ext:time, timestamp=1533646294045, value=2018-05-18T20:34:07
    00|20180518203407144|2352356512|5ca1c4|bc column=ext:type, timestamp=1533646294045, value=login_in
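
    Each rowkey decomposes back into the five fields the writer concatenated; splitting on "|" makes the layout visible:

    String rowKey = "00|20180518203401772|2352356512|4519f3|f1";
    String[] parts = rowKey.split("\\|");
    // parts[0] = md5(userId).substring(0, 2)  -> bucket prefix
    // parts[1] = event time, yyyyMMddHHmmssSSS
    // parts[2] = CRC of the appKey
    // parts[3] = md5(userId).substring(2, 8)
    // parts[4] = random two-char suffix to avoid collisions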
    

    CustomTableInputFormat.java

    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableSplit;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Strings;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.net.DNS;
    
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.UnknownHostException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    public class CustomTableInputFormat extends TableInputFormat {
    
        private HashMap<InetAddress, String> reverseDNSCacheMap =
                new HashMap<>();
        private List<String> keys = new ArrayList<>();
    
        public CustomTableInputFormat() {
            super();
            // Pre-compute the 256 two-hex-char bucket prefixes: "00" .. "ff".
            for (int i = 0; i < 256; i++) {
                keys.add(StringUtils.substring("00" + Integer.toHexString(i), -2));
            }
        }
    
        @Override
        public List<InputSplit> getSplits(JobContext context) throws IOException {
            super.initialize(context);
            TableName tableName = super.getTable().getName();
            RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(getRegionLocator(), getAdmin());
            List<InputSplit> splits = new ArrayList<>();

            // One split per bucket prefix: each split scans the configured
            // time window underneath a single two-hex-char prefix.
            for (String key : keys) {
                HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);
                InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
                InetAddress regionAddress = isa.getAddress();
                String regionLocation = reverseDNS(regionAddress);

                byte[] regionName = location.getRegion().getRegionName();
                String encodedRegionName = location.getRegion().getEncodedName();
                long regionSize = sizeCalculator.getRegionSize(regionName);

                // Re-anchor the scan's [startRow, stopRow) window under this
                // prefix, e.g. "00|201805182039" .. "00|201805182040".
                byte[] splitStart = Bytes.add(Bytes.toBytes(key + "|"), this.getScan().getStartRow());
                byte[] splitStop = Bytes.add(Bytes.toBytes(key + "|"), this.getScan().getStopRow());

                TableSplit split = new TableSplit(tableName, this.getScan(),
                        splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
                splits.add(split);
            }
            return splits;
        }
    
        // Resolve a region server address to a hostname, with caching.
        String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
            String hostName = this.reverseDNSCacheMap.get(ipAddress);
            if (hostName == null) {
                String ipAddressString = null;
                try {
                    ipAddressString = DNS.reverseDns(ipAddress, null);
                } catch (Exception e) {
                    ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
                }
                if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
                hostName = Strings.domainNamePointerToHostName(ipAddressString);
                this.reverseDNSCacheMap.put(ipAddress, hostName);
            }
            return hostName;
        }
    }
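
    For the scan used in the Flink and Spark examples below (start row 201805182039, stop row 201805182040), each generated split simply re-anchors that one-minute window under one prefix:

    // Illustration: the key range of the split built for prefix "00".
    byte[] splitStart = Bytes.add(Bytes.toBytes("00|"), Bytes.toBytes("201805182039"));
    byte[] splitStop  = Bytes.add(Bytes.toBytes("00|"), Bytes.toBytes("201805182040"));
    // Scans "00|201805182039" (inclusive) to "00|201805182040" (exclusive);
    // the 256 splits together cover every bucket for the same window.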
    

    Flink example

    static Configuration conf;

    static {
        HadoopKrbLogin.login();
        conf = new Configuration();
        String tableName = "logTable1";
        conf.addResource("hbase-site.xml");

        // Only the time window goes into the scan; CustomTableInputFormat
        // prepends each of the 256 bucket prefixes when it builds the splits.
        Scan scan = new Scan();
        scan.setCaching(1000);
        scan.withStartRow("201805182039".getBytes());
        scan.withStopRow("201805182040".getBytes());
        scan.setCacheBlocks(false);
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);
        ClientProtos.Scan proto = null;
        try {
            proto = ProtobufUtil.toScan(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString);
    }
    
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<Tuple2<ImmutableBytesWritable, Result>> hbase = env.createInput(
                    HadoopInputs.createHadoopInput(
                            new CustomTableInputFormat(),
                            ImmutableBytesWritable.class,
                            Result.class,
                            Job.getInstance(conf)
                    )
            );
    
        DataSet<LogEntity> toTuple = hbase.map(
                new MapFunction<Tuple2<ImmutableBytesWritable, Result>, LogEntity>() {
                    public LogEntity map(Tuple2<ImmutableBytesWritable, Result> record) throws Exception {
                        Result result = record.f1;
                        return result2Entity(result);
                    }
                });

        toTuple.print(); // print() is an action: it triggers execution of the batch job
    }
    // Rebuild the original JSON document from the HBase cells, then map it onto LogEntity.
    private static LogEntity result2Entity(Result result) {
            JSONObject root = new JSONObject();
            JSONObject ext = new JSONObject();
            JSONObject device = new JSONObject();
            JSONObject location = new JSONObject();
            for (Cell cell : result.rawCells()) {
                byte[] family = CellUtil.cloneFamily(cell);
                byte[] column = CellUtil.cloneQualifier(cell);
                byte[] value = CellUtil.cloneValue(cell);
                String columnName = Bytes.toString(column);
                if ("ext".equals(Bytes.toString(family))) {
                    if ("durationTime".equals(columnName)) {
                        ext.put(columnName, Bytes.toLong(value));
                    } else if ("time".equals(columnName)) {
                        root.put(columnName, Bytes.toString(value));
                        root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
                    } else {
                        ext.put(columnName, Bytes.toString(value));
                    }
                } else if ("device".equals(Bytes.toString(family))) {
                    device.put(columnName, Bytes.toString(value));
                } else if ("location".equals(Bytes.toString(family))) {
                    location.put(columnName, Bytes.toString(value));
                }
            }
            JSONObject data = new JSONObject();
        if (!device.isEmpty()) {
            data.put("device", device);
        }
        if (!location.isEmpty()) {
            data.put("location", location);
        }
            data.put("ext", ext);
            root.put("data", data);
            return JSON.parseObject(root.toString(), LogEntity.class);
        }
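
    LogEntity itself is not listed in the article; a minimal fastjson-compatible POJO consistent with result2Entity (the field names are inferred from the JSON assembled above, so treat them as assumptions):

    import com.alibaba.fastjson.JSONObject;

    public class LogEntity {
        private String time;      // from ext:time, e.g. "2018-05-18T20:34:01"
        private Long timeLong;    // epoch millis derived from time
        private JSONObject data;  // {"ext": {...}, "device": {...}, "location": {...}}

        public String getTime() { return time; }
        public void setTime(String time) { this.time = time; }
        public Long getTimeLong() { return timeLong; }
        public void setTimeLong(Long timeLong) { this.timeLong = timeLong; }
        public JSONObject getData() { return data; }
        public void setData(JSONObject data) { this.data = data; }
    }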
    

    Spark example

    public class SimpleApp implements Serializable {
        static Configuration cfg = null;

        static {
            HadoopKrbLogin.login();
            cfg = new Configuration();
            String tableName = "logTable1";

            cfg.addResource("hbase-site.xml");

            Scan scan = new Scan();
            scan.setCaching(1000);
            scan.withStartRow("201805182039".getBytes());
            scan.withStopRow("201805182040".getBytes());
            scan.setCacheBlocks(false);
            cfg.set(TableInputFormat.INPUT_TABLE, tableName);
            ClientProtos.Scan proto = null;
            try {
                proto = ProtobufUtil.toScan(scan);
            } catch (IOException e) {
                e.printStackTrace();
            }
            String scanToString = Base64.encodeBytes(proto.toByteArray());
            cfg.set(TableInputFormat.SCAN, scanToString);
        }
    
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf()
                    .setMaster("local")
                    .setAppName("HbaseDemo");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
                    jsc.newAPIHadoopRDD(cfg, CustomTableInputFormat.class, ImmutableBytesWritable.class, Result.class);

            // do some transformation
            JavaRDD<LogEntity> rdd1 = hBaseRDD.mapPartitions((FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, LogEntity>)
                    tuple2Iterator -> {
                        List<LogEntity> logEntities = new ArrayList<>();
                        while (tuple2Iterator.hasNext()) {
                            Tuple2<ImmutableBytesWritable, Result> tuple = tuple2Iterator.next();
                            logEntities.add(result2Entity(tuple._2));
                        }
                        return logEntities.iterator();
                    });

            rdd1.count(); // any action triggers the actual read
            jsc.stop();
        }
        // result2Entity(Result) is identical to the helper shown in the Flink example above.
    }
    
