美文网首页大数据分析大数据玩转大数据
Spark + Hbase 自定义读取分片数据、深挖内部原理

Spark + Hbase 自定义读取分片数据、深挖内部原理

作者: 大猪大猪 | 来源:发表于2019-03-24 23:38 被阅读9次

    大猪 见很多文章都写了Hbase如何设计rowkey避免热点问题,就连 大猪 的文章也写过这样的优化,但是只说到了优化的点上,那如何方便读取呢?刚才就有一位老朋友跟我说他的方案,他是做了16个预分区,然后就把16个分区的数据使用spark的union起来,组成16个RDD,牛批的孩子,看到他这么干,我得写篇文章出来探讨一下这个问题了。

    Rowkey设计

    在设计Hbase的rowkey的时候,我们往往会在高位上设置加上数字或者是Hash用来打散数据,特别是日志数据。
    举个例子:

    00|2019010100000|ab2d3c
    41|2019010100001|ab2d3c
    ee|2019010100002|ab2d3c
    

    假设有8台RS,表创建的时候就要使用预分区,就像下面一样。

    create 'logTable',{NAME => 'info',
    CONFIGURATION => {
    'SPLIT_POLICY' =>'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy',
    'KeyPrefixRegionSplitPolicy.prefix_length'=>'2'
    },COMPRESSION=>'SNAPPY'},
    SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']
    

    实际的表生成rowkey范围就会像下面这样


    Hbase 自定义读取分片数据

    上面我们的三条就会根据rowkey前2个位自动选择分区
    这样就达到打散的效果,热点问题就不会产生了。

    但是:我们如何同一分钟的数据会打到不同的分区,我们不能预先知道数据在哪些分区,通过一个Scan是查不完的,必要把所有分区都查下遍,才知道分区中有没有我们想的数据。

    00 => 会放在第一个分区    空 ~ 20
    41 => 会放在第三个分区    40 ~ 60
    e0 => 会放在最后一个分区   e0 ~ 空
    

    Hbase给我们的 TableInputFormat API里面只有设置startend,那让我怎么去读取这些分区的数据?

    我的那个老朋友的做法我已经猜到了他是怎么玩的了,因为他预分区是16个,就是0~f,也就是他去按照分区加start跟end去scan16次,每次得到一个RDD,再union起来就是他要的数据结果。

    var unionRdd:RDD[_] = null
    Array("1","2"...)
          .foreach(index => {
              val conf = new Configuration
              conf.set(TableInputFormat.SCAN_ROW_START,index + "|20190101000000")
              conf.set(TableInputFormat.SCAN_ROW_STOP,index + "|20190101000000")
              val rdd = sc.newAPIHadoopRDD(
              conf,
              classOf[TableInputFormat],
              classOf[ImmutableBytesWritable],
              classOf[Result]
              )
             unionRdd = logRdd.union(rdd)
          })
    

    大概就是这样写,unionRdd是最后合成的一个大RDD,后面用来计算。
    其实我的老朋友这样写其实也是可以的。我只想说,你真会玩。


    Hbase 自定义读取分片数据

    不知道union在spark上,是会产生shuffle操作的么?

    源码分析

    来来来,我们来看一下TableInputFormat的源码到底是怎么处理读取Hbase的分区数据的:
    我们看TableInputFormat类中,从getSplits => oneInputSplitPerRegion => 挖出这个方法

    private List<InputSplit> oneInputSplitPerRegion() throws IOException {
            RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(this.getRegionLocator(), this.getAdmin());
            TableName tableName = this.getTable().getName();
            Pair<byte[][], byte[][]> keys = this.getStartEndKeys();
            if (keys != null && keys.getFirst() != null && ((byte[][])keys.getFirst()).length != 0) {
                List<InputSplit> splits = new ArrayList(((byte[][])keys.getFirst()).length);
    
                for(int i = 0; i < ((byte[][])keys.getFirst()).length; ++i) {
                    if (this.includeRegionInSplit(((byte[][])keys.getFirst())[i], ((byte[][])keys.getSecond())[i])) {
                        byte[] startRow = this.scan.getStartRow();
                        byte[] stopRow = this.scan.getStopRow();
                        ...
                        // ============= 重点 ===============
                        TableSplit split = new TableSplit(tableName, this.scan, splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
                        splits.add(split);
                        // ============= 重点 ===============
                        }
                    }
                }
    
                return splits;
            }
    
    1. 其实从前三句就可以看出来了,通admin去拿到hbase表的所有分片信息。
      返回的多个InputSplit对应上的就是Spark的多个分区,如果有Hbase16个分片就会有16个分区。

    我们可以从NewHadoopRDD类中的getPartitions挖出来确实是这样子的。

    override def getPartitions: Array[Partition] = {
        val inputFormat = inputFormatClass.newInstance
        inputFormat match {
          case configurable: Configurable =>
            configurable.setConf(_conf)
          case _ =>
        }
        // =========== T =========
        val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
        // =========== T =========
        val rawSplits = if (ignoreEmptySplits) {
          allRowSplits.filter(_.getLength > 0)
        } else {
          allRowSplits
        }
        val result = new Array[Partition](rawSplits.size)
        for (i <- 0 until rawSplits.size) {
          result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
        }
        result
      }
    
    1. 看看oneInputSplitPerRegion方法上面注释重点的地方,其实就是你们写在程序conf配置上的 start = splitStartend = splitStop ,或者还有scan的各种过滤器等,再加的Hbase的 regionLocation 就组成一个分区查询了,你们的数据就是这么被Spark在每个分区上查出来给你们的。
    TableSplit split = new TableSplit(tableName, this.scan, splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
    splits.add(split);
    

    看到文章这里进度的小伙伴们,是不是已经想到怎么做了?

    结论

    既然的数据在分区上,我们重写TableInputFormat的getSplits获取分区就行了。

    scala版本TableInputFormat2

    class TableInputFormat2 extends TableInputFormat {
    
      @throws(classOf[IOException])
      override def getSplits(context: JobContext): util.List[InputSplit] = {
        this.initialize(context)
        val conf = context.getConfiguration
        //scan 的rowkey start 开始范围
        val start = conf.get(TableInputFormat.SCAN_ROW_START, "")
        //scan 的rowkey end 结束范围
        val end = conf.get(TableInputFormat.SCAN_ROW_STOP, "")
        //预分区起始,例如:0
        val splitStart = conf.get("hbase.table.split.startkey", "0")
        //邓分区结束,例如:f  
        val splitEnd = conf.get("hbase.table.split.endkey", "")
        //预分区进制
        val rowkeyRadix = conf.getInt("hbase.table.split.radix", 10)
        //连接符号,例如:00|20190101
        val rowkeyConcat = conf.get("hbase.table.split.concat", "")
        //直接读取指定分区
        val regionSplits = conf.get("hbase.table.split.list", "")
        val numLength = Math.max(splitEnd.length, 1)
        val preString = "000000000000000000000000000000"
        val scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN))
        if (StringUtils.isNotBlank(regionSplits) || StringUtils.isNoneBlank(splitEnd)) {
          var repls: Array[String] = null
          if (StringUtils.isNotBlank(regionSplits)) {
            repls = regionSplits.trim.split(",", -1)
          } else {
            if (rowkeyRadix == 10 || rowkeyRadix == 16) {
              repls = (lang.Long.valueOf(splitStart, rowkeyRadix).toInt to lang.Long.valueOf(splitEnd, rowkeyRadix).toInt)
                .map(x => if (rowkeyRadix == 16) Integer.toHexString(x) else x.toString)
                .map(i => s"$preString$i".takeRight(numLength))
                .toArray
            } else throw new Exception(rowkeyRadix + " => radix only working in ( 16 | 8 )")
          }
          repls
            .map {
              prefix =>
                val location = getRegionLocator.getRegionLocation(Bytes.toBytes(prefix))
    
                val splitStart: Array[Byte] = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(start))
                val splitStop: Array[Byte] = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(end))
                new TableSplit(getTable.getName, scan, splitStart, splitStop, location.getHostname)
            }.toList
        } else {
          super.getSplits(context)
        }
      }
    }
    
    1. 以上实现了10进制与16进制读取分区操作
    2. 也可以直接指定分区读取

    要上scala的代码了

    Hbase 自定义读取分片数据
    //比如要查一天的数据
    val conf: Configuration = new Configuration()
    conf.set("hbase.mapreduce.inputtable", "logTable")
    conf.set(TableInputFormat.SCAN_ROW_START, "20190101000000")
    conf.set(TableInputFormat.SCAN_ROW_STOP, "20190102000000")
    conf.set("hbase.table.split.startkey", "0")
    conf.set("hbase.table.split.endkey", "f")
    conf.set("hbase.table.split.radix", 16)
    conf.set("hbase.table.split.concat", "|")
    
    val logRdd = sc.newAPIHadoopRDD(
              conf,
              //classOf[TableInputFormat],
              classOf[TableInputFormat2],
              classOf[ImmutableBytesWritable],
              classOf[Result]
            )
    println(logRdd.count())
    

    不会scala?

    Spark 读取 Hbase 自定义读取分片数据

    给你个Java版本的好了

    public class TableInputFormat2 extends TableInputFormat {
    
        @Override
        public List<InputSplit> getSplits(JobContext context) throws IOException {
            this.initialize(context);
            Configuration conf = context.getConfiguration();
            String start = conf.get(TableInputFormat.SCAN_ROW_START, "");
            String end = conf.get(TableInputFormat.SCAN_ROW_STOP, "");
            String splitStart = conf.get("hbase.table.split.startkey", "0");
            String splitEnd = conf.get("hbase.table.split.endkey", "");
            int rowkeyRadix = conf.getInt("hbase.table.split.radix", 10);
            String rowkeyConcat = conf.get("hbase.table.split.concat", "");
            String regionSplits = conf.get("hbase.table.split.list", "");
            int numLength = Math.max(splitEnd.length(), 1);
            String preString = "000000000000000000000000000000";
            Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
            if (StringUtils.isNotBlank(regionSplits) || StringUtils.isNoneBlank(splitEnd)) {
                String[] repls = null;
                if (StringUtils.isNotBlank(regionSplits)) {
                    repls = regionSplits.trim().split(",", -1);
                } else {
                    Integer s = Integer.parseInt(splitStart);
                    Integer e = Long.valueOf(end, rowkeyRadix).intValue() + 1;
                    repls = IntStream
                            .range(s, e)
                            .mapToObj(value -> {
                                if (rowkeyRadix == 16) {
                                    String ss = preString + Integer.toHexString(value);
                                    return StringUtils.substring(ss, ss.length() - numLength, ss.length());
                                } else {
                                    String ss = String.valueOf(value);
                                    return StringUtils.substring(ss, ss.length() - numLength, ss.length());
                                }
                            })
                            .collect(Collectors.toList()).toArray(new String[]{});
    
                }
                List<InputSplit> splitLists = new ArrayList<>();
                for (String prefix : repls) {
                    HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(prefix));
                    byte[] _splitStart = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(start));
                    byte[] _splitStop = Bytes.add(Bytes.toBytes(prefix + rowkeyConcat), Bytes.toBytes(end));
                    new TableSplit(getTable().getName(), scan, _splitStart, _splitStop, location.getHostname());
                }
                return splitLists;
            }
            return super.getSplits(context);
        }
    }
    

    程序

    SparkSession spark = SparkSession.builder()
                    .appName("test")
                    .master("local")
                    .getOrCreate();
    
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.mapreduce.inputtable", "logTable");
            conf.set(TableInputFormat.SCAN_ROW_START, "20190101000000");
            conf.set(TableInputFormat.SCAN_ROW_STOP, "20190102000000");
            conf.set("hbase.table.split.startkey", "0");
            conf.set("hbase.table.split.endkey", "f");
            conf.setInt("hbase.table.split.radix", 16);
            conf.set("hbase.table.split.concat", "|");
    
            RDD<Tuple2<ImmutableBytesWritable, Result>> logRdd = spark.sparkContext()
                    .newAPIHadoopRDD(
                            conf,
                            TableInputFormat3.class,
                            ImmutableBytesWritable.class,
                            Result.class
                    );
    
    System.out.println(logRdd.count());
    

    运行没错请告诉,反正我没运行过。


    Hbase TableInputFormat

    相关文章

      网友评论

        本文标题:Spark + Hbase 自定义读取分片数据、深挖内部原理

        本文链接:https://www.haomeiwen.com/subject/ummpvqtx.html