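This article explains how OpenTSDB, a time-series database built on top of HBase, organizes its data and which storage optimizations it applies.
OpenTSDB's storage optimization for values
When storing a value, OpenTSDB distinguishes its actual type (integer or floating point) and its actual size (1, 2, 4 or 8 bytes), and records the length in a few extra flag bits, so that each data point uses as little storage space as possible.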
- Integer
//package net.opentsdb.core;
//TSDB.class
/**
* Adds a single integer value data point in the TSDB.
* @param metric A non-empty string.
* @param timestamp The timestamp associated with the value.
* @param value The value of the data point.
* @param tags The tags on this series. This map must be non-empty.
* @return A deferred object that indicates the completion of the request.
* The {@link Object} has no special meaning and can be {@code null} (think
* of it as {@code Deferred<Void>}). But you probably want to attach at
* least an errback to this {@code Deferred} to handle failures.
* @throws IllegalArgumentException if the timestamp is less than or equal
* to the previous timestamp added or 0 for the first timestamp, or if the
* difference with the previous timestamp is too large.
* @throws IllegalArgumentException if the metric name is empty or contains
* illegal characters.
* @throws IllegalArgumentException if the tags list is empty or one of the
* elements contains illegal characters.
* @throws HBaseException (deferred) if there was a problem while persisting
* data.
*/
public Deferred<Object> addPoint(final String metric,
                                 final long timestamp,
                                 final long value,
                                 final Map<String, String> tags) {
  final byte[] v;
  if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
    v = new byte[] { (byte) value };
  } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
    v = Bytes.fromShort((short) value);
  } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) {
    v = Bytes.fromInt((int) value);
  } else {
    v = Bytes.fromLong(value);
  }
  final short flags = (short) (v.length - 1);  // Just the length.
  return addPointInternal(metric, timestamp, v, tags, flags);
}
- Double-precision floating point
//package net.opentsdb.core;
//TSDB.class
/**
* Adds a double precision floating-point value data point in the TSDB.
* @param metric A non-empty string.
* @param timestamp The timestamp associated with the value.
* @param value The value of the data point.
* @param tags The tags on this series. This map must be non-empty.
* @return A deferred object that indicates the completion of the request.
* The {@link Object} has no special meaning and can be {@code null} (think
* of it as {@code Deferred<Void>}). But you probably want to attach at
* least an errback to this {@code Deferred} to handle failures.
* @throws IllegalArgumentException if the timestamp is less than or equal
* to the previous timestamp added or 0 for the first timestamp, or if the
* difference with the previous timestamp is too large.
* @throws IllegalArgumentException if the metric name is empty or contains
* illegal characters.
* @throws IllegalArgumentException if the value is NaN or infinite.
* @throws IllegalArgumentException if the tags list is empty or one of the
* elements contains illegal characters.
* @throws HBaseException (deferred) if there was a problem while persisting
* data.
* @since 1.2
*/
public Deferred<Object> addPoint(final String metric,
                                 final long timestamp,
                                 final double value,
                                 final Map<String, String> tags) {
  if (Double.isNaN(value) || Double.isInfinite(value)) {
    throw new IllegalArgumentException("value is NaN or Infinite: " + value
        + " for metric=" + metric
        + " timestamp=" + timestamp);
  }
  final short flags = Const.FLAG_FLOAT | 0x7;  // A float stored on 8 bytes.
  return addPointInternal(metric, timestamp,
                          Bytes.fromLong(Double.doubleToRawLongBits(value)),
                          tags, flags);
}
- Single-precision floating point
//package net.opentsdb.core;
//TSDB.class
/**
* Adds a single floating-point value data point in the TSDB.
* @param metric A non-empty string.
* @param timestamp The timestamp associated with the value.
* @param value The value of the data point.
* @param tags The tags on this series. This map must be non-empty.
* @return A deferred object that indicates the completion of the request.
* The {@link Object} has no special meaning and can be {@code null} (think
* of it as {@code Deferred<Void>}). But you probably want to attach at
* least an errback to this {@code Deferred} to handle failures.
* @throws IllegalArgumentException if the timestamp is less than or equal
* to the previous timestamp added or 0 for the first timestamp, or if the
* difference with the previous timestamp is too large.
* @throws IllegalArgumentException if the metric name is empty or contains
* illegal characters.
* @throws IllegalArgumentException if the value is NaN or infinite.
* @throws IllegalArgumentException if the tags list is empty or one of the
* elements contains illegal characters.
* @throws HBaseException (deferred) if there was a problem while persisting
* data.
*/
public Deferred<Object> addPoint(final String metric,
                                 final long timestamp,
                                 final float value,
                                 final Map<String, String> tags) {
  if (Float.isNaN(value) || Float.isInfinite(value)) {
    throw new IllegalArgumentException("value is NaN or Infinite: " + value
        + " for metric=" + metric
        + " timestamp=" + timestamp);
  }
  final short flags = Const.FLAG_FLOAT | 0x3;  // A float stored on 4 bytes.
  return addPointInternal(metric, timestamp,
                          Bytes.fromInt(Float.floatToRawIntBits(value)),
                          tags, flags);
}
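To make the scheme above concrete, here is a minimal, self-contained sketch that encodes integers, doubles and floats the way the three addPoint overloads do, and then decodes them again using nothing but the flags. It is not OpenTSDB code: ValueCodecSketch and its methods are hypothetical, java.nio.ByteBuffer stands in for OpenTSDB's Bytes helper (both write big-endian), and the constants are assumed to mirror OpenTSDB 2.x, where Const.FLAG_FLOAT is 0x8 and the low three bits hold the value length minus one.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of OpenTSDB-style value encoding (not the real API). */
final class ValueCodecSketch {
  static final short FLAG_FLOAT = 0x8;  // assumed value of Const.FLAG_FLOAT
  static final short LENGTH_MASK = 0x7; // low 3 bits hold (value length - 1)

  /** Integer: narrowest big-endian two's-complement width that fits. */
  static byte[] encodeLong(final long value) {
    if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
      return new byte[] { (byte) value };
    } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
      return ByteBuffer.allocate(2).putShort((short) value).array();
    } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) {
      return ByteBuffer.allocate(4).putInt((int) value).array();
    }
    return ByteBuffer.allocate(8).putLong(value).array();
  }

  /** Integer flags carry only the length: 0, 1, 3 or 7. */
  static short longFlags(final byte[] encoded) {
    return (short) (encoded.length - 1);
  }

  /** Double: always the 8 raw IEEE 754 bytes; flags are FLAG_FLOAT | 0x7. */
  static byte[] encodeDouble(final double value) {
    if (Double.isNaN(value) || Double.isInfinite(value)) {
      throw new IllegalArgumentException("value is NaN or Infinite: " + value);
    }
    return ByteBuffer.allocate(8).putLong(Double.doubleToRawLongBits(value)).array();
  }

  /** Float: always the 4 raw IEEE 754 bytes; flags are FLAG_FLOAT | 0x3. */
  static byte[] encodeFloat(final float value) {
    if (Float.isNaN(value) || Float.isInfinite(value)) {
      throw new IllegalArgumentException("value is NaN or Infinite: " + value);
    }
    return ByteBuffer.allocate(4).putInt(Float.floatToRawIntBits(value)).array();
  }

  /** Reads a value back using only its flags to learn type and length. */
  static Number decode(final byte[] v, final short flags) {
    final int length = (flags & LENGTH_MASK) + 1;
    if (v.length != length) {
      throw new IllegalArgumentException("flags say " + length + " bytes, got " + v.length);
    }
    final ByteBuffer buf = ByteBuffer.wrap(v);
    if ((flags & FLAG_FLOAT) != 0) {
      switch (length) {
        case 4: return Float.intBitsToFloat(buf.getInt());
        case 8: return Double.longBitsToDouble(buf.getLong());
        default: throw new IllegalArgumentException("float length must be 4 or 8");
      }
    }
    switch (length) { // integers are sign-extended back to long
      case 1: return (long) buf.get();
      case 2: return (long) buf.getShort();
      case 4: return (long) buf.getInt();
      case 8: return buf.getLong();
      default: throw new IllegalArgumentException("integer length must be 1, 2, 4 or 8");
    }
  }

  public static void main(final String[] args) {
    for (final long value : new long[] { 42L, -300L, 70_000L, 1L << 40 }) {
      final byte[] v = encodeLong(value);
      System.out.println(value + " -> " + v.length + " byte(s), flags=" + longFlags(v)
          + ", decoded=" + decode(v, longFlags(v)));
    }
    final short floatFlags = (short) (FLAG_FLOAT | 0x3);
    System.out.println("1.5f -> flags=" + floatFlags
        + ", decoded=" + decode(encodeFloat(1.5f), floatFlags));
  }
}
```

Because the length can be recovered from the flags, no separate length field is needed, which is what allows a small integer such as 42 to occupy a single byte.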
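OpenTSDB's rowkey design
The rowkey is made up of the metric, the base timestamp and the tags; in byte order it is an optional salt, the metric UID, the base timestamp, and then the tag key/tag value UID pairs. The base timestamp is aligned to the hour and expressed in seconds, so all data points with the same metric and tags that fall within the same hour share the same rowkey.
OpenTSDB assigns a UniqueID (UID) to every metric, tag key and tag value, and maintains an index between each original string and its UID. The data table stores these fixed-width UIDs (3 bytes each by default) instead of the original strings.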
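A toy illustration of the UID idea: the class below hands out fixed-width, sequential UIDs and keeps a forward (name to UID) and a reverse (UID to name) map. It is only an in-memory stand-in; UidTableSketch is hypothetical, its getOrCreateId merely borrows the name of the call used in the source, and OpenTSDB itself persists these mappings in HBase rather than in a HashMap. The 3-byte width is assumed to match OpenTSDB's default.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory UID table; one instance per kind (metric, tagk, tagv). */
final class UidTableSketch {
  private final int width;                                  // UID width in bytes
  private final Map<String, byte[]> forward = new HashMap<>(); // name -> UID
  private final Map<Long, String> reverse = new HashMap<>();   // UID -> name
  private long next = 1;                                    // next UID to hand out

  UidTableSketch(final int width) {
    this.width = width;
  }

  /** Returns the existing UID for name, or assigns the next sequential one. */
  byte[] getOrCreateId(final String name) {
    final byte[] existing = forward.get(name);
    if (existing != null) {
      return existing.clone();
    }
    if (next >= (1L << (8 * width))) {
      throw new IllegalStateException("UID space exhausted for width " + width);
    }
    final long id = next++;
    final byte[] uid = new byte[width];
    for (int i = width - 1, shift = 0; i >= 0; i--, shift += 8) {
      uid[i] = (byte) (id >>> shift); // big-endian, fixed width
    }
    forward.put(name, uid);
    reverse.put(id, name);
    return uid.clone();
  }

  /** Reverse lookup: UID bytes back to the original string (null if unknown). */
  String getName(final byte[] uid) {
    long id = 0;
    for (final byte b : uid) {
      id = (id << 8) | (b & 0xFF);
    }
    return reverse.get(id);
  }
}
```

Because every UID has the same width, each rowkey component sits at a fixed offset, which is what lets rowKeyTemplate compute the key size up front.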
//package net.opentsdb.core;
//IncomingDataPoints.class
/**
* Returns a partially initialized row key for this metric and these tags. The
* only thing left to fill in is the base timestamp.
*/
static byte[] rowKeyTemplate(final TSDB tsdb, final String metric,
                             final Map<String, String> tags) {
  final short metric_width = tsdb.metrics.width();
  final short tag_name_width = tsdb.tag_names.width();
  final short tag_value_width = tsdb.tag_values.width();
  final short num_tags = (short) tags.size();

  int row_size = (Const.SALT_WIDTH() + metric_width + Const.TIMESTAMP_BYTES
      + tag_name_width * num_tags + tag_value_width * num_tags);
  final byte[] row = new byte[row_size];

  short pos = (short) Const.SALT_WIDTH();

  copyInRowKey(row, pos,
      (tsdb.config.auto_metric() ? tsdb.metrics.getOrCreateId(metric)
          : tsdb.metrics.getId(metric)));
  pos += metric_width;

  pos += Const.TIMESTAMP_BYTES;  // leave room for the base timestamp

  for (final byte[] tag : Tags.resolveOrCreateAll(tsdb, tags)) {
    copyInRowKey(row, pos, tag);
    pos += tag.length;
  }
  return row;
}
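Once the base timestamp is filled in, the layout produced by rowKeyTemplate can be sketched as follows. This is a simplified sketch under stated assumptions: salting is disabled (salt width 0), the UIDs are already resolved to 3-byte arrays, and the base timestamp takes 4 bytes. RowKeySketch is hypothetical, and sorting the tag pairs by unsigned byte order is an assumption made here so the key does not depend on map iteration order. It needs Java 9+ for Arrays.compareUnsigned.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical rowkey builder: metric UID | base timestamp | tagk+tagv pairs. */
final class RowKeySketch {
  static final int MAX_TIMESPAN = 3600;  // one row covers one hour
  static final int TIMESTAMP_BYTES = 4;  // base timestamp width

  /** Aligns a Unix timestamp in seconds down to the start of its hour. */
  static long baseTime(final long timestampSeconds) {
    return timestampSeconds - (timestampSeconds % MAX_TIMESPAN);
  }

  /** Each element of tagPairs is a tag-key UID immediately followed by a tag-value UID. */
  static byte[] rowKey(final byte[] metricUid, final long timestampSeconds,
                       final List<byte[]> tagPairs) {
    final List<byte[]> sorted = new ArrayList<>(tagPairs);
    sorted.sort(Arrays::compareUnsigned); // deterministic tag order
    int size = metricUid.length + TIMESTAMP_BYTES;
    for (final byte[] pair : sorted) {
      size += pair.length;
    }
    final ByteBuffer buf = ByteBuffer.allocate(size);
    buf.put(metricUid);
    buf.putInt((int) baseTime(timestampSeconds)); // low 32 bits, big-endian
    for (final byte[] pair : sorted) {
      buf.put(pair);
    }
    return buf.array();
  }
}
```

Two points of the same series taken at 00:00:00 and 00:59:59 of the same hour produce identical keys; this is why one row holds up to an hour of data and the offset within that hour has to be stored elsewhere.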
So how does OpenTSDB store each data point's type/length flags and its exact timestamp?
The answer: both are stored in the HBase column qualifier.
//package net.opentsdb.core;
//Internal.class
/**
* Returns a 2 or 4 byte qualifier based on the timestamp and the flags. If
* the timestamp is in seconds, this returns a 2 byte qualifier. If it's in
* milliseconds, returns a 4 byte qualifier
* @param timestamp A Unix epoch timestamp in seconds or milliseconds
* @param flags Flags to set on the qualifier (length &| float)
* @return A 2 or 4 byte qualifier for storage in column or compacted column
* @since 2.0
*/
public static byte[] buildQualifier(final long timestamp, final short flags) {
  final long base_time;
  if ((timestamp & Const.SECOND_MASK) != 0) {
    // drop the ms timestamp to seconds to calculate the base timestamp
    base_time = ((timestamp / 1000) - ((timestamp / 1000)
        % Const.MAX_TIMESPAN));
    final int qual = (int) (((timestamp - (base_time * 1000)
        << (Const.MS_FLAG_BITS)) | flags) | Const.MS_FLAG);
    return Bytes.fromInt(qual);
  } else {
    base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
    // The offset within the hour is at most 3599 s, which fits in 12 bits;
    // the flags need 4 bits. Shifting the offset left by 4 (FLAG_BITS) and
    // OR-ing in the flags packs both into exactly 2 bytes, i.e. one short.
    final short qual = (short) ((timestamp - base_time) << Const.FLAG_BITS
        | flags);
    return Bytes.fromShort(qual);
  }
}
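The following sketch encodes and decodes both qualifier formats produced by buildQualifier: a 2-byte qualifier holding a 12-bit second offset plus 4 flag bits, and a 4-byte millisecond qualifier whose top four bits are set (MS_FLAG) and whose millisecond offset is shifted left by 6 bits. The constant values are assumed to mirror OpenTSDB 2.x's Const; QualifierSketch and its decoding helpers are hypothetical.

```java
/** Hypothetical encoder/decoder for second and millisecond qualifiers. */
final class QualifierSketch {
  // Assumed to mirror OpenTSDB 2.x Const values.
  static final long SECOND_MASK = 0xFFFFFFFF00000000L; // any high bit set => ms timestamp
  static final int MAX_TIMESPAN = 3600;                // seconds per row
  static final int FLAG_BITS = 4;                      // flags width, second qualifiers
  static final int MS_FLAG_BITS = 6;                   // 4 flag bits + 2 unused bits
  static final int MS_FLAG = 0xF0000000;               // marks a 4-byte ms qualifier
  static final int FLAGS_MASK = 0xF;                   // float bit + 3 length bits

  static boolean isMillis(final long timestamp) {
    return (timestamp & SECOND_MASK) != 0;
  }

  /** 2-byte qualifier: (offset in seconds << 4) | flags. */
  static short secondQualifier(final long ts, final short flags) {
    final long base = ts - (ts % MAX_TIMESPAN);
    return (short) (((ts - base) << FLAG_BITS) | flags);
  }

  /** 4-byte qualifier: MS_FLAG | (offset in ms << 6) | flags. */
  static int msQualifier(final long tsMs, final short flags) {
    final long baseSec = (tsMs / 1000) - ((tsMs / 1000) % MAX_TIMESPAN);
    return (int) (((tsMs - baseSec * 1000) << MS_FLAG_BITS) | flags) | MS_FLAG;
  }

  /** Mask to 16 bits first, because offsets above 2047 make the short negative. */
  static int secondOffset(final short qualifier) {
    return (qualifier & 0xFFFF) >>> FLAG_BITS;
  }

  /** Clear the MS_FLAG prefix, then drop the 6 low bits. */
  static int msOffset(final int qualifier) {
    return (qualifier & ~MS_FLAG) >>> MS_FLAG_BITS;
  }

  static int flagsOf(final int qualifier) {
    return qualifier & FLAGS_MASK;
  }
}
```

For example, a point at 1356998461 (61 seconds after the hour starting at 1356998400) with flags 0x7 (an 8-byte integer) gets the qualifier (61 << 4) | 0x7 = 0x03D7. Together with the rowkey's base timestamp, this offset is enough to reconstruct the full timestamp, and the low four bits tell the reader how to interpret the value bytes.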