Druid Internal Processing
Druid processes each InputRow mainly through the IncrementalIndex class. Along the way, a chain of add() calls takes the data held in the previously constructed MapBasedInputRow, dictionary-encodes it, builds bitmaps, and performs aggregation.
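For orientation, the call chain traced in the rest of this article is roughly the following:
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">RealtimeManager.start()
  -> FireChief.run()
    -> runFirehose() / runFirehoseV2()
      -> Plumbers.addNextRow()              // pulls rows via Firehose.nextRow()
        -> RealtimePlumber.add()
          -> Sink.add()
            -> IncrementalIndex.add()
              -> OnheapIncrementalIndex.addToFacts()
                -> doAggregate()            // per-aggregator agg.aggregate()</pre>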
RealtimeManager
Inside RealtimeManager, start() is invoked via the @LifecycleStart annotation and submits the tasks one by one. FireChief implements Runnable, and the data-processing logic is encapsulated in it.
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">// FireChief implements Runnable; chiefs maps dataSource -> partition number -> FireChief.
private final Map<String, Map<Integer, FireChief>> chiefs;

@LifecycleStart
public void start() throws IOException
{
  serverAnnouncer.announce();
  fireChiefExecutor = Execs.multiThreaded(fireDepartments.size(), "chief-%d");
  for (final FireDepartment fireDepartment : fireDepartments) {
    final DataSchema schema = fireDepartment.getDataSchema();

    final FireChief chief = new FireChief(fireDepartment, conglomerate);
    chiefs.computeIfAbsent(schema.getDataSource(), k -> new HashMap<>())
          .put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);

    // Submit each FireChief as a task.
    fireChiefExecutor.submit(chief);
  }
}</pre>
The run() method from the FireChief source:
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">@Override
public void run()
{
  initPlumber();

  try {
    final Closer closer = Closer.create();

    try {
      Object metadata = plumber.startJob();

      Firehose firehose;
      FirehoseV2 firehoseV2;
      final boolean success;
      if (fireDepartment.checkFirehoseV2()) {
        firehoseV2 = initFirehoseV2(metadata);
        closer.register(firehoseV2);
        // Run the firehose loop (V2 variant).
        success = runFirehoseV2(firehoseV2);
      } else {
        firehose = initFirehose();
        closer.register(firehose);
        success = runFirehose(firehose);
      }
      if (success) {
        // plumber.finishJob() is called only when all processing finished successfully.
        closer.register(() -> plumber.finishJob());
      }
    }
    catch (InterruptedException e) {
      log.warn("Interrupted while running a firehose");
      throw closer.rethrow(e);
    }
    catch (Exception e) {
      log.makeAlert(
          e,
          "[%s] aborted realtime processing[%s]",
          e.getClass().getSimpleName(),
          fireDepartment.getDataSchema().getDataSource()
      ).emit();
      throw closer.rethrow(e);
    }
    catch (Error e) {
      log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
         .emit();
      throw closer.rethrow(e);
    }
    finally {
      closer.close();
    }
  }
  catch (IOException e) {
    throw Throwables.propagate(e);
  }
}</pre>
The key piece is runFirehose(firehose), which calls Plumbers.addNextRow().
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">private boolean runFirehose(Firehose firehose)
{
  final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
  while (firehose.hasMore()) {
    if (Thread.interrupted() || stopping) {
      return false;
    }
    Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics);
  }
  return true;
}</pre>
The addNextRow() call:
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">public static void addNextRow(
    final Supplier<Committer> committerSupplier,
    final Firehose firehose,
    final Plumber plumber,
    final boolean reportParseExceptions,
    final FireDepartmentMetrics metrics
)
{
  final InputRow inputRow;
  try {
    // This calls Firehose.nextRow(); see the "Converting to InputRow" section below.
    inputRow = firehose.nextRow();
  }
  catch (ParseException e) {
    if (reportParseExceptions) {
      throw e;
    } else {
      log.debug(e, "Discarded row due to exception, considering unparseable.");
      metrics.incrementUnparseable();
      return;
    }
  }

  if (inputRow == null) {
    log.debug("Discarded null row, considering thrownAway.");
    metrics.incrementThrownAway();
    return;
  }

  final int numRows;
  try {
    // Hand the InputRow over to the plumber; in this path the plumber is usually a RealtimePlumber.
    numRows = plumber.add(inputRow, committerSupplier);
  }
  catch (IndexSizeExceededException e) {
    // Shouldn't happen if this is only being called by a single thread.
    // plumber.add should be swapping out indexes before they fill up.
    throw new ISE(e, "WTF?! Index size exceeded, this shouldn't happen. Bad Plumber!");
  }</pre>
RealtimePlumber
RealtimePlumber's add() method obtains a Sink via getSink() (sketched after the code below) and then calls Sink.add().
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
  long messageTimestamp = row.getTimestampFromEpoch();
  final Sink sink = getSink(messageTimestamp);
  metrics.reportMessageMaxTimestamp(messageTimestamp);
  if (sink == null) {
    return -1;
  }

  final int numRows = sink.add(row, false);

  if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
    persist(committerSupplier.get());
  }

  return numRows;
}</pre>
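getSink() is not shown in the excerpt above. Roughly, it rejects rows whose timestamp falls outside the configured window period, truncates the timestamp to the segment granularity, and returns (lazily creating, if needed) the Sink for that time bucket. The following is a simplified sketch of that logic rather than the verbatim Druid source; the field names (rejectionPolicy, sinks, schema, config, versioningPolicy) mirror RealtimePlumber, but the Sink construction is abbreviated:
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">private Sink getSink(long timestamp)
{
  // Rows outside the window period (too old or too far in the future) are rejected outright.
  if (!rejectionPolicy.accept(timestamp)) {
    return null;
  }

  // Truncate the row timestamp to the segment granularity to locate its time bucket.
  final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
  final DateTime bucketStart = segmentGranularity.bucketStart(new DateTime(timestamp));
  final long truncatedTime = bucketStart.getMillis();

  Sink sink = sinks.get(truncatedTime);
  if (sink == null) {
    // Lazily create (and announce) a Sink covering [bucketStart, bucketStart + one granularity unit).
    final Interval interval = new Interval(bucketStart, segmentGranularity.increment(bucketStart));
    sink = new Sink(interval, schema, config.getShardSpec(),
                    versioningPolicy.getVersion(interval) /* remaining tuning arguments omitted */);
    addSink(sink);
  }
  return sink;
}</pre>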
The add() call
Sink.add() essentially delegates to the add() method of the underlying index, which here is typically an IncrementalIndex.
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">public int add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
{
  if (currHydrant == null) {
    throw new IAE("No currHydrant but given row[%s]", row);
  }
  synchronized (hydrantLock) {
    if (!writable) {
      return ADD_FAILED;
    }
    IncrementalIndex index = currHydrant.getIndex();
    if (index == null) {
      return ADD_FAILED; // the hydrant was swapped without being replaced
    }
    return index.add(row, skipMaxRowsInMemoryCheck);
  }
}</pre>
IncrementalIndex
IncrementalIndex.add() first calls toTimeAndDims() to build the row key (a simplified sketch of that step follows the code below) and then calls addToFacts().
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">public int add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
{
  TimeAndDims key = toTimeAndDims(row);
  final int rv = addToFacts(
      metrics,
      deserializeComplexMetrics,
      reportParseExceptions,
      row,
      numEntries,
      key,
      in,
      rowSupplier,
      skipMaxRowsInMemoryCheck
  );
  updateMaxIngestedTime(row.getTimestamp());
  return rv;
}</pre>
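Conceptually, toTimeAndDims() dictionary-encodes every dimension value of the row and combines the encoded values with the timestamp into a TimeAndDims key, so that rows with identical dimensions in the same time bucket roll up into one entry. The sketch below is a heavy simplification and not the actual Druid source (the real method goes through DimensionDesc/DimensionIndexer bookkeeping, registers new dimensions on the fly, and handles multi-value dimensions):
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">// Simplified illustration only, not the actual Druid source.
TimeAndDims toTimeAndDims(InputRow row)
{
  final List<String> rowDimensions = row.getDimensions();
  final Object[] dims = new Object[rowDimensions.size()];

  for (int i = 0; i < rowDimensions.size(); i++) {
    final String dimension = rowDimensions.get(i);
    final DimensionDesc desc = dimensionDescs.get(dimension);      // per-dimension metadata
    final DimensionIndexer indexer = desc.getIndexer();
    // The indexer maps raw values (e.g. Strings) to encoded ids through an in-memory dictionary.
    dims[i] = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension));
  }

  // The timestamp is truncated to the query granularity so rows in the same bucket can roll up.
  final long truncatedTimestamp = gran.bucketStart(row.getTimestamp()).getMillis();
  return new TimeAndDims(truncatedTimestamp, dims, dimensionDescsList);
}</pre>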
This follows an LSM-style approach: rows are first written into an on-heap in-memory index. OnheapIncrementalIndex implements addToFacts() as follows.
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">@Override
protected Integer addToFacts(
    AggregatorFactory[] metrics,
    boolean deserializeComplexMetrics,
    boolean reportParseExceptions,
    InputRow row,
    AtomicInteger numEntries,
    TimeAndDims key,
    ThreadLocal<InputRow> rowContainer,
    Supplier<InputRow> rowSupplier,
    boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException
{
  final int priorIndex = facts.getPriorIndex(key);

  Aggregator[] aggs;

  if (TimeAndDims.EMPTY_ROW_INDEX != priorIndex) {
    aggs = concurrentGet(priorIndex);
    doAggregate(metrics, aggs, rowContainer, row, reportParseExceptions);
  } else {
    aggs = new Aggregator[metrics.length];
    factorizeAggs(metrics, aggs, rowContainer, row);
    doAggregate(metrics, aggs, rowContainer, row, reportParseExceptions);

    final int rowIndex = indexIncrement.getAndIncrement();
    concurrentSet(rowIndex, aggs);

    // Last ditch sanity checks
    if (numEntries.get() >= maxRowCount
        && facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX
        && !skipMaxRowsInMemoryCheck) {
      throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
    }

    final int prev = facts.putIfAbsent(key, rowIndex);
    if (TimeAndDims.EMPTY_ROW_INDEX == prev) {
      numEntries.incrementAndGet();
    } else {
      // We lost a race
      aggs = concurrentGet(prev);
      doAggregate(metrics, aggs, rowContainer, row, reportParseExceptions);

      // Free up the misfire
      concurrentRemove(rowIndex);

      // This is expected to occur ~80% of the time in the worst scenarios
    }
  }

  return numEntries.get();
}</pre>
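The in-memory state behind the code above behaves like an LSM memtable: facts maps each TimeAndDims key to a row index, and a separate table keeps one Aggregator[] per row index. A rough sketch of that shape (simplified; the real class wraps facts in a FactsHolder abstraction, and the comparator name here is illustrative):
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">// One entry per unique (truncated timestamp, encoded dims) key, sorted by time and dims,
// which is what gives the memtable-like, aggregate-on-write behavior.
ConcurrentNavigableMap<TimeAndDims, Integer> facts =
    new ConcurrentSkipListMap<>(timeAndDimsComparator);   // comparator name is illustrative

// rowIndex -> running Aggregator[] for that row; addToFacts() creates the array for a new key
// or fetches it via concurrentGet(priorIndex) and keeps folding new rows into it.
ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();</pre>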
This is where the aggregation actually happens: doAggregate() invokes agg.aggregate() on each of the configured aggregators.
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">private void doAggregate(
    AggregatorFactory[] metrics,
    Aggregator[] aggs,
    ThreadLocal<InputRow> rowContainer,
    InputRow row,
    boolean reportParseExceptions
)
{
  rowContainer.set(row);

  for (int i = 0; i < aggs.length; i++) {
    final Aggregator agg = aggs[i];
    synchronized (agg) {
      try {
        // Each aggregator type implements its own aggregate(). A longSum aggregator,
        // for example, simply adds the selected value to its running sum:
        // @Override
        // public void aggregate()
        // {
        //   sum += selector.getLong();
        // }
        agg.aggregate();
      }
      catch (ParseException e) {
        // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
        if (reportParseExceptions) {
          throw new ParseException(e, "Encountered parse error for aggregator[%s]", metrics[i].getName());
        } else {
          log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName());
        }
      }
    }
  }

  rowContainer.set(null);
}</pre>
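Putting the aggregate() call above together with the get() used in the next section, a longSum-style aggregator boils down to something like the following. This is an illustrative sketch rather than the exact LongSumAggregator source; the selector is the column reader Druid wires in when the aggregator is factorized (see factorizeAggs() above):
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">// Illustrative sum aggregator: aggregate() folds values in, get() exposes the running result.
class LongSumLikeAggregator
{
  private final LongColumnSelector selector;  // reads the metric value of the current row
  private long sum = 0;

  LongSumLikeAggregator(LongColumnSelector selector)
  {
    this.selector = selector;
  }

  public void aggregate()
  {
    sum += selector.getLong();
  }

  public Object get()
  {
    return sum;
  }
}</pre>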
Getting the aggregated values
OnheapIncrementalIndex calls getAggVal(), which invokes each aggregator's get() method to fetch the aggregated value. The iterator() method relies on getAggVal() when transforming the facts into Rows.
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">@Override
protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition)
{
  return agg.get();
}

@Override
public Iterator<Row> iterator()
{
  return iterableWithPostAggregations(null, false).iterator();
}

public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending)
{
  return () -> {
    final List<DimensionDesc> dimensions = getDimensions();

    return Iterators.transform(
        getFacts().iterator(descending),
        timeAndDims -> {
          final int rowOffset = timeAndDims.getRowIndex();

          Object[] theDims = timeAndDims.getDims();

          Map<String, Object> theVals = Maps.newLinkedHashMap();
          for (int i = 0; i < theDims.length; ++i) {
            Object dim = theDims[i];
            DimensionDesc dimensionDesc = dimensions.get(i);
            if (dimensionDesc == null) {
              continue;
            }
            String dimensionName = dimensionDesc.getName();
            DimensionHandler handler = dimensionDesc.getHandler();
            if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) {
              theVals.put(dimensionName, null);
              continue;
            }
            final DimensionIndexer indexer = dimensionDesc.getIndexer();
            Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList(dim, DimensionIndexer.LIST);
            theVals.put(dimensionName, rowVals);
          }

          AggregatorType[] aggs = getAggsForRow(rowOffset);
          for (int i = 0; i < aggs.length; ++i) {
            // This is where the aggregated value is actually fetched.
            theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
          }

          // If post-aggregations are configured, apply them on top of the aggregated values.
          if (postAggs != null) {
            for (PostAggregator postAgg : postAggs) {
              theVals.put(postAgg.getName(), postAgg.compute(theVals));
            }
          }

          // Return a MapBasedRow, Druid's basic row container.
          return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
        }
    );
  };
}</pre>
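Iterating an IncrementalIndex therefore yields one MapBasedRow per unique TimeAndDims key, with dimension values and aggregated metrics in a single map. A hypothetical usage sketch (the dimension name "country" and metric name "count" are made up for illustration):
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">// Each Row is a MapBasedRow: truncated timestamp + {dimension -> value, metric -> aggregated value}.
for (Row row : index) {
  DateTime bucketTime = row.getTimestamp();
  List<String> country = row.getDimension("country");   // hypothetical dimension
  long count = row.getLongMetric("count");               // hypothetical metric
  // ... use the rolled-up row ...
}</pre>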
persist
As RealtimePlumber.add() above shows, after a row is added the plumber calls persist() when the sink can no longer append rows or the flush deadline (nextFlush) has passed. A rough sketch of persist() follows.
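persist() itself is not shown in this article. Roughly, it swaps each sink's current in-memory hydrant for a fresh one and hands the swapped-out index to a background thread that writes it to disk and then runs the committer. The following is a heavily simplified sketch of that idea, not the actual RealtimePlumber source (error handling, metrics, and segment hand-off are omitted):
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">@Override
public void persist(final Committer committer)
{
  // Collect the in-memory indexes that are ready to be written out.
  final List<Pair<FireHydrant, Interval>> indexesToPersist = new ArrayList<>();
  for (Sink sink : sinks.values()) {
    if (sink.swappable()) {
      // swap() freezes the current hydrant and installs a fresh IncrementalIndex for new rows.
      indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
    }
  }

  // On the persist executor: write each frozen index to disk, then run the committer
  // so the firehose can commit its offsets.
  persistExecutor.execute(() -> {
    for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
      persistHydrant(pair.lhs, schema, pair.rhs, committer.getMetadata());
    }
    committer.run();
  });
}</pre>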
Converting to InputRow
KafkaEightFirehoseFactory
Take Kafka data as an example. KafkaEightFirehoseFactory implements the FirehoseFactory interface, and the Firehose it creates provides the nextRow() method referenced above.
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));

// Build the real-time message streams for the feed (topic).
final Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(
    ImmutableMap.of(
        feed,
        1
    )
);

final List<KafkaStream<byte[], byte[]>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1) {
  return null;
}

final KafkaStream<byte[], byte[]> stream = streamList.get(0);
// Turn the stream into an iterator.
final ConsumerIterator<byte[], byte[]> iter = stream.iterator();

// Return a Firehose backed by that iterator.
return new Firehose()
{
  Iterator<InputRow> nextIterator = Iterators.emptyIterator();

  @Override
  public boolean hasMore()
  {
    return nextIterator.hasNext() || iter.hasNext();
  }

  @Nullable
  @Override
  public InputRow nextRow()
  {
    try {
      if (!nextIterator.hasNext()) {
        // The raw Kafka message arrives as a byte[].
        final byte[] message = iter.next().message();

        if (message == null) {
          return null;
        }
        // Parse the message; this calls parseBatch on the configured parser.
        nextIterator = theParser.parseBatch(ByteBuffer.wrap(message)).iterator();
      }

      return nextIterator.next();
    }
    catch (InvalidMessageException e) {
      /*
      If the CRC failure happens during the wire transfer, this is not the best way to handle it.
      It is probably better to shut down the firehose without committing and start it again.
      */
      log.error(e, "Message failed its checksum and it is corrupt, will skip it");
      return null;
    }
  }</pre>
The theParser.parseBatch() call above leads into StringInputRowParser.
StringInputRowParser
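StringInputRowParser.parseBatch(ByteBuffer) first decodes the message bytes into a Map<String, Object> and then delegates to the private parseMap() shown below. Roughly, the ByteBuffer overload looks like this (a simplified sketch, not the verbatim source; buildStringKeysMap is the internal helper that applies the configured charset and the delegate string Parser):
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">// Simplified sketch of the ByteBuffer -> InputRow path in StringInputRowParser.
public List<InputRow> parseBatch(ByteBuffer input)
{
  // Decode the bytes and let the delegate Parser (e.g. JSON or delimited text)
  // turn the line into a Map<String, Object>.
  final Map<String, Object> theMap = buildStringKeysMap(input);

  // parseMap() (shown below) may return null, e.g. for a header line.
  final InputRow row = parseMap(theMap);
  return row == null ? Collections.emptyList() : ImmutableList.of(row);
}</pre>
The parseMap() method itself: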
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">@Nullable
private InputRow parseMap(@Nullable Map<String, Object> theMap)
{
  // If a header is present in the data (and with proper configurations), a null is returned
  if (theMap == null) {
    return null;
  }
  // This calls MapInputRowParser.parseBatch().
  return Iterators.getOnlyElement(mapParser.parseBatch(theMap).iterator());
}</pre>
MapInputRowParser
MapInputRowParser.parseBatch() returns a MapBasedInputRow, the basic class Druid uses to store one row of data. It mainly contains a long timestamp, a List<String> dimensions holding the dimension names, and a Map<String, Object> event holding the actual data as dimension -> value pairs.
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">@Override
public List<InputRow> parseBatch(Map<String, Object> theMap)
{
  final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions()
                                  ? parseSpec.getDimensionsSpec().getDimensionNames()
                                  : Lists.newArrayList(
                                      Sets.difference(
                                          theMap.keySet(),
                                          parseSpec.getDimensionsSpec()
                                                   .getDimensionExclusions()
                                      )
                                  );

  final DateTime timestamp;
  try {
    timestamp = parseSpec.getTimestampSpec().extractTimestamp(theMap);
    if (timestamp == null) {
      final String input = theMap.toString();
      throw new NullPointerException(
          StringUtils.format(
              "Null timestamp in input: %s",
              input.length() < 100 ? input : input.substring(0, 100) + "..."
          )
      );
    }
  }
  catch (Exception e) {
    throw new ParseException(e, "Unparseable timestamp found!");
  }

  return ImmutableList.of(new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap));
}</pre>
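To make the result concrete, here is a hypothetical example of the MapBasedInputRow produced for a single event; the column names ("country", "device", "clicks") are made up for illustration:
<pre style="margin: 0px; tab-size: 4; white-space: pre-wrap;">// Hypothetical example of what parseBatch() returns for one event.
long timestamp = new DateTime("2019-01-01T00:00:00Z").getMillis();  // extracted by the TimestampSpec
List<String> dimensions = ImmutableList.of("country", "device");     // from the DimensionsSpec
Map<String, Object> event = ImmutableMap.<String, Object>of(
    "timestamp", "2019-01-01T00:00:00Z",
    "country", "US",
    "device", "mobile",
    "clicks", 3                                                       // metric column, aggregated later
);

InputRow row = new MapBasedInputRow(timestamp, dimensions, event);
// row.getDimension("country") -> ["US"], row.getLongMetric("clicks") -> 3</pre>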
At this point, a single line of data has been converted into a MapBasedInputRow, i.e. an InputRow object.