美文网首页我爱编程
OpenTSDB中的数据存储

OpenTSDB中的数据存储

作者: 即墨灯火 | 来源:发表于2018-03-09 16:40 被阅读840次

本文将为大家介绍,OpenTSDB这一构建在HBase上的时序数据库,是如何组织其数据,以及又做了哪些优化

OpenTSDB对value的存储优化

OpenTSDB在存储时,会根据值的实际类型,即整型或浮点型,以及其值的实际大小,即1B、2B、4B、8B进行区分,通过额外的标志位来记录其长度,以达到尽量少的使用存储空间的目的。

  • 整型
  //package net.opentsdb.core;
  //TSDB.class
  /**
   * Adds a single integer value data point in the TSDB.
   * @param metric A non-empty string.
   * @param timestamp The timestamp associated with the value.
   * @param value The value of the data point.
   * @param tags The tags on this series.  This map must be non-empty.
   * @return A deferred object that indicates the completion of the request.
   * The {@link Object} has not special meaning and can be {@code null} (think
   * of it as {@code Deferred<Void>}). But you probably want to attach at
   * least an errback to this {@code Deferred} to handle failures.
   * @throws IllegalArgumentException if the timestamp is less than or equal
   * to the previous timestamp added or 0 for the first timestamp, or if the
   * difference with the previous timestamp is too large.
   * @throws IllegalArgumentException if the metric name is empty or contains
   * illegal characters.
   * @throws IllegalArgumentException if the tags list is empty or one of the
   * elements contains illegal characters.
   * @throws HBaseException (deferred) if there was a problem while persisting
   * data.
   */
  public Deferred<Object> addPoint(final String metric,
                                   final long timestamp,
                                   final long value,
                                   final Map<String, String> tags) {
    final byte[] v;
    if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
      v = new byte[] { (byte) value };
    } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
      v = Bytes.fromShort((short) value);
    } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) {
      v = Bytes.fromInt((int) value);
    } else {
      v = Bytes.fromLong(value);
    }

    final short flags = (short) (v.length - 1);  // Just the length.
    return addPointInternal(metric, timestamp, v, tags, flags);
  }
  • 双精度浮点数
  //package net.opentsdb.core;
  //TSDB.class
  /**
   * Adds a double precision floating-point value data point in the TSDB.
   * @param metric A non-empty string.
   * @param timestamp The timestamp associated with the value.
   * @param value The value of the data point.
   * @param tags The tags on this series.  This map must be non-empty.
   * @return A deferred object that indicates the completion of the request.
   * The {@link Object} has not special meaning and can be {@code null} (think
   * of it as {@code Deferred<Void>}). But you probably want to attach at
   * least an errback to this {@code Deferred} to handle failures.
   * @throws IllegalArgumentException if the timestamp is less than or equal
   * to the previous timestamp added or 0 for the first timestamp, or if the
   * difference with the previous timestamp is too large.
   * @throws IllegalArgumentException if the metric name is empty or contains
   * illegal characters.
   * @throws IllegalArgumentException if the value is NaN or infinite.
   * @throws IllegalArgumentException if the tags list is empty or one of the
   * elements contains illegal characters.
   * @throws HBaseException (deferred) if there was a problem while persisting
   * data.
   * @since 1.2
   */
  public Deferred<Object> addPoint(final String metric,
                                   final long timestamp,
                                   final double value,
                                   final Map<String, String> tags) {
    if (Double.isNaN(value) || Double.isInfinite(value)) {
      throw new IllegalArgumentException("value is NaN or Infinite: " + value
                                         + " for metric=" + metric
                                         + " timestamp=" + timestamp);
    }
    final short flags = Const.FLAG_FLOAT | 0x7;  // A float stored on 8 bytes.
    return addPointInternal(metric, timestamp,
                            Bytes.fromLong(Double.doubleToRawLongBits(value)),
                            tags, flags);
  }
  • 单精度浮点数
  //package net.opentsdb.core;
  //TSDB.class
  /**
   * Adds a single floating-point value data point in the TSDB.
   * @param metric A non-empty string.
   * @param timestamp The timestamp associated with the value.
   * @param value The value of the data point.
   * @param tags The tags on this series.  This map must be non-empty.
   * @return A deferred object that indicates the completion of the request.
   * The {@link Object} has not special meaning and can be {@code null} (think
   * of it as {@code Deferred<Void>}). But you probably want to attach at
   * least an errback to this {@code Deferred} to handle failures.
   * @throws IllegalArgumentException if the timestamp is less than or equal
   * to the previous timestamp added or 0 for the first timestamp, or if the
   * difference with the previous timestamp is too large.
   * @throws IllegalArgumentException if the metric name is empty or contains
   * illegal characters.
   * @throws IllegalArgumentException if the value is NaN or infinite.
   * @throws IllegalArgumentException if the tags list is empty or one of the
   * elements contains illegal characters.
   * @throws HBaseException (deferred) if there was a problem while persisting
   * data.
   */
  public Deferred<Object> addPoint(final String metric,
                                   final long timestamp,
                                   final float value,
                                   final Map<String, String> tags) {
    if (Float.isNaN(value) || Float.isInfinite(value)) {
      throw new IllegalArgumentException("value is NaN or Infinite: " + value
                                         + " for metric=" + metric
                                         + " timestamp=" + timestamp);
    }
    final short flags = Const.FLAG_FLOAT | 0x3;  // A float stored on 4 bytes.
    return addPointInternal(metric, timestamp,
                            Bytes.fromInt(Float.floatToRawIntBits(value)),
                            tags, flags);
  }

OpenTSDB的rowkey设计

rowkey由metric,tags,base-timestamp组成。这里的base-timestamp以小时为精度,单位为秒,即同一小时、相同metric、tags的所有数据拥有相同的metric

OpenTSDB为metric、tagKey和tagValue分配UniqueID,建立原始值与UniqueID的索引,数据表存储metric、tagKey和tagValue对应的UniqueID而不是原始值

    //package net.opentsdb.core;
    //IncomingDataPoints.class
    /**
    * Returns a partially initialized row key for this metric and these tags. The
    * only thing left to fill in is the base timestamp.
    */
    static byte[] rowKeyTemplate(final TSDB tsdb, final String metric,
    final Map<String, String> tags) {
      final short metric_width = tsdb.metrics.width();
      final short tag_name_width = tsdb.tag_names.width();
      final short tag_value_width = tsdb.tag_values.width();
      final short num_tags = (short) tags.size();

      int row_size = (Const.SALT_WIDTH() + metric_width + Const.TIMESTAMP_BYTES
              + tag_name_width * num_tags + tag_value_width * num_tags);
      final byte[] row = new byte[row_size];

      short pos = (short) Const.SALT_WIDTH();

      copyInRowKey(row, pos,
              (tsdb.config.auto_metric() ? tsdb.metrics.getOrCreateId(metric)
                      : tsdb.metrics.getId(metric)));
      pos += metric_width;

      pos += Const.TIMESTAMP_BYTES;

      for (final byte[] tag : Tags.resolveOrCreateAll(tsdb, tags)) {
        copyInRowKey(row, pos, tag);
        pos += tag.length;
      }
      return row;
  }

那么,在OpenTSDB中,数据类型的标志位和其真实对应的时间点是怎么存储的呢?

答案是,通过HBase的Qualifier存储了这些信息

  //package net.opentsdb.core;
  //Internal.class
  /**
   * Returns a 2 or 4 byte qualifier based on the timestamp and the flags. If
   * the timestamp is in seconds, this returns a 2 byte qualifier. If it's in
   * milliseconds, returns a 4 byte qualifier 
   * @param timestamp A Unix epoch timestamp in seconds or milliseconds
   * @param flags Flags to set on the qualifier (length &| float)
   * @return A 2 or 4 byte qualifier for storage in column or compacted column
   * @since 2.0
   */
  public static byte[] buildQualifier(final long timestamp, final short flags) {
    final long base_time;
    if ((timestamp & Const.SECOND_MASK) != 0) {
      // drop the ms timestamp to seconds to calculate the base timestamp
      base_time = ((timestamp / 1000) - ((timestamp / 1000) 
          % Const.MAX_TIMESPAN));
      final int qual = (int) (((timestamp - (base_time * 1000) 
          << (Const.MS_FLAG_BITS)) | flags) | Const.MS_FLAG);
      return Bytes.fromInt(qual);
    } else {
      base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
      final short qual = (short) ((timestamp - base_time) << Const.FLAG_BITS
          | flags); //3600秒,二进制需要12位来表示,flag,需要4位,故左移4位并做或操作,
                    //刚好为2个Byte,将这两个信息存储在一个short中
      return Bytes.fromShort(qual);
    }
  }

An example of KeyValue (a cell of HBase)

Cell

相关文章

网友评论

    本文标题:OpenTSDB中的数据存储

    本文链接:https://www.haomeiwen.com/subject/edopfftx.html