How does Index Source work in Pr

Author: 疯狂的轻骑兵 | Published 2021-06-17 14:27

Overview

Presto Index Source is an optimization strategy that relies on an index provided by the data source. It improves query performance by avoiding reads of data that the join condition would filter out anyway.

In the Index Source strategy, Presto passes keys from the left table (the probe side) to the right table (the Index Source), and the Index Source performs a lookup to fetch the records that match the keys it received. The left and right tables can then be joined with a hash join. In other words, no table-scan splits are created for the right table and no table scan is performed on it; the index source reads records directly by the keys it receives.

Presto Index Source therefore performs well when the right table is extremely large and the join condition selects only a few of its rows. In addition, the right table must offer an efficient way to fetch the rows associated with a key.
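
The idea above can be illustrated with a minimal sketch. It is not Presto code: the class and method names are hypothetical, the index is an ordinary in-memory map, and the final hash join is folded into the lookup loop for brevity. It only shows that the right side is read by key rather than scanned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names come from Presto.
class IndexJoinSketch {
    // For each probe-side key, fetch only the matching right-side rows from the index.
    static List<String> indexJoin(List<Long> probeKeys, Map<Long, List<String>> index) {
        List<String> joined = new ArrayList<>();
        for (Long key : probeKeys) {
            // Point lookup by key; rows with other keys are never read.
            for (String row : index.getOrDefault(key, List.of())) {
                joined.add(key + " -> " + row);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> ordersByOrderKey = Map.of(
                1L, List.of("order-1"),
                2L, List.of("order-2"),
                3L, List.of("order-3"));
        // Only key 2 exists in the index; key 4 yields no rows.
        System.out.println(indexJoin(List.of(2L, 4L), ordersByOrderKey)); // prints [2 -> order-2]
    }
}
```

The cost of the sketch grows with the number of probe keys, not with the size of the right table, which is why the strategy pays off when the right table is large and few rows are needed.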

Index Source is somewhat similar to dynamic filtering: both aim to reduce the amount of data read. Dynamic filtering, however, has a timeout and reduces the data read from the left table, whereas Index Source reduces the data read from the right table.

Code tracing

To figure out how Index Source works, I stepped through the code in a debugger, starting from a simple test case, com.facebook.presto.tests.AbstractTestIndexedQueries#testBasicIndexJoin. The SQL is:

SELECT * FROM (SELECT * FROM lineitem WHERE partkey % 8 = 0) l JOIN orders o ON l.orderkey = o.orderkey;

From the execution plan we can see that, with Index Source, the right table's ScanProjectOperator has been replaced by an IndexSourceOperator.

IndexSource[tpch_indexed:com.facebook.presto.tests.tpch.TpchIndexHandle@2981c060, lookup = [orderkey_63]] => [orderkey_63:bigint, custkey:bigint, orderstatus:varchar(1), totalprice:double, orderdate:date, orderpriority:varchar(15), clerk:varchar(15), shippriority:integer, comment_64:varchar(79)]
        CPU: 1.68s (30.82%), Scheduled: 1.77s (28.76%), Output: 332605 rows (41.22MB)
        Input avg.: 523.79 rows, Input std.dev.: 102.78%
        orderkey_63 := tpch:orderkey
        custkey := tpch:custkey
        orderstatus := tpch:orderstatus
        totalprice := tpch:totalprice
        orderdate := tpch:orderdate
        orderpriority := tpch:orderpriority
        clerk := tpch:clerk
        shippriority := tpch:shippriority
        comment_64 := tpch:comment

IndexSourceOperator is defined in class com.facebook.presto.operator.index.IndexSourceOperator. Its core code is:

    @Override
    public Supplier<Optional<UpdatablePageSource>> addSplit(Split split)
    {
        requireNonNull(split, "split is null");
        checkState(source == null, "Index source split already set");

        IndexSplit indexSplit = (IndexSplit) split.getConnectorSplit();

        // Normalize the incoming RecordSet to something that can be consumed by the index
        RecordSet normalizedRecordSet = probeKeyNormalizer.apply(indexSplit.getKeyRecordSet());
        // Key step: look up the right table's records by the left table's key set (indexSplit.getKeyRecordSet())
        ConnectorPageSource result = index.lookup(normalizedRecordSet);
        // Wrap the lookup result in a page source for the right table; it is read page by page later
        source = new PageSourceOperator(result, operatorContext);

        Object splitInfo = split.getInfo();
        if (splitInfo != null) {
            operatorContext.setInfoSupplier(() -> new SplitOperatorInfo(splitInfo));
        }

        return Optional::empty;
    }

The method ConnectorIndex#lookup corresponds to the lookup operation in the execution plan. Let's step into it. In the code I debugged, this interface is implemented only by TpchConnectorIndex:

    @Override
    public ConnectorPageSource lookup(RecordSet rawInputRecordSet)
    {
        // convert the input record set from the column ordering in the query to
        // match the column ordering of the index
        RecordSet inputRecordSet = keyFormatter.apply(rawInputRecordSet);

        // Key step: look up the values in the index
        RecordSet rawOutputRecordSet = indexedTable.lookupKeys(inputRecordSet);

        // convert the output record set of the index into the column ordering
        // expected by the query
        return new RecordPageSource(outputFormatter.apply(rawOutputRecordSet));
    }
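
The key reordering that the comments in lookup describe can be sketched in isolation. This is an assumption about the general technique, not the actual keyFormatter implementation: the class KeyReorderSketch and its methods are hypothetical, and rows are plain lists instead of a RecordSet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of reordering key columns; not Presto's keyFormatter.
class KeyReorderSketch {
    // For each index column, find its position in the query's column ordering.
    static List<Integer> positions(List<String> queryColumns, List<String> indexColumns) {
        List<Integer> result = new ArrayList<>();
        for (String column : indexColumns) {
            int position = queryColumns.indexOf(column);
            if (position < 0) {
                throw new IllegalArgumentException("Missing key column: " + column);
            }
            result.add(position);
        }
        return result;
    }

    // Rearrange one row from the query's column ordering into the index's ordering.
    static List<Object> reorder(List<?> row, List<Integer> positions) {
        List<Object> out = new ArrayList<>();
        for (int position : positions) {
            out.add(row.get(position));
        }
        return out;
    }

    public static void main(String[] args) {
        // The query supplies (custkey, orderkey); the index expects (orderkey, custkey).
        List<Integer> positions = positions(List.of("custkey", "orderkey"), List.of("orderkey", "custkey"));
        System.out.println(reorder(List.of(7L, 42L), positions)); // prints [42, 7]
    }
}
```

Computing the positions once and reusing them for every row keeps the per-row work to a simple copy.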

Let's step into the method IndexedTable#lookupKeys to see how the lookup is done:

        public RecordSet lookupKeys(RecordSet recordSet)
        {
            // Since we only return a cached copy of IndexedTable, please make sure you reorder the input to same order of keyColumns
            checkArgument(recordSet.getColumnTypes().equals(keyTypes), "Input RecordSet keys do not match expected key type");

            Iterable<RecordSet> outputRecordSets = Iterables.transform(tupleIterable(recordSet), key -> {
                for (Object value : key.getValues()) {
                    if (value == null) {
                        throw new IllegalArgumentException("TPCH index does not support null values");
                    }
                }
                // look up the records for the given key
                return lookupKey(key);
            });

            // We will return result same order as outputColumns
            return new ConcatRecordSet(outputRecordSets, outputTypes);
        }

        private RecordSet lookupKey(MaterializedTuple tupleKey)
        {
            // fetch the records from the cached key -> records mapping held in the keyToValues field
            return new MaterializedTupleRecordSet(keyToValues.get(tupleKey), outputTypes);
        }
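
The shape of lookupKeys (reject null key values, look up each key, concatenate the per-key results in input order) can be reproduced with plain collections. The sketch below is a simplified stand-in, not the Presto code: LookupKeysSketch is a hypothetical name, keys are lists of values, and the output is an eager list rather than a lazily concatenated RecordSet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, eager version of the per-key lookup; the original builds its result lazily.
class LookupKeysSketch {
    static <K, V> List<V> lookupKeys(Iterable<List<K>> keys, Map<List<K>, List<V>> keyToValues) {
        List<V> output = new ArrayList<>();
        for (List<K> key : keys) {
            for (K value : key) {
                if (value == null) {
                    throw new IllegalArgumentException("index does not support null values");
                }
            }
            // A key without matches contributes nothing to the output.
            output.addAll(keyToValues.getOrDefault(key, List.of()));
        }
        return output;
    }

    public static void main(String[] args) {
        Map<List<Long>, List<String>> keyToValues = Map.of(
                List.of(1L), List.of("order-1"),
                List.of(2L), List.of("order-2a", "order-2b"));
        List<List<Long>> keys = List.of(List.of(2L), List.of(9L), List.of(1L));
        System.out.println(lookupKeys(keys, keyToValues)); // prints [order-2a, order-2b, order-1]
    }
}
```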

Because tpch is only a test connector, com.facebook.presto.tests.tpch.IndexedTpchConnectorFactory reads all the records of table orders during initialization and caches them as a key -> record mapping in a ListMultimap named keyToValues, so a lookup simply gets the records by key and returns them. Below is how the tpch connector caches the data of table orders:

    public TpchIndexedData(String connectorId, TpchIndexSpec tpchIndexSpec)
    {
        requireNonNull(connectorId, "connectorId is null");
        requireNonNull(tpchIndexSpec, "tpchIndexSpec is null");

        TpchMetadata tpchMetadata = new TpchMetadata(connectorId);
        TpchRecordSetProvider tpchRecordSetProvider = new TpchRecordSetProvider();

        ImmutableMap.Builder<Set<TpchScaledColumn>, IndexedTable> indexedTablesBuilder = ImmutableMap.builder();

        Set<TpchScaledTable> tables = tpchIndexSpec.listIndexedTables();
        for (TpchScaledTable table : tables) {
            SchemaTableName tableName = new SchemaTableName("sf" + table.getScaleFactor(), table.getTableName());
            TpchTableHandle tableHandle = tpchMetadata.getTableHandle(null, tableName);
            Map<String, ColumnHandle> columnHandles = new LinkedHashMap<>(tpchMetadata.getColumnHandles(null, tableHandle));
            for (Set<String> columnNames : tpchIndexSpec.getColumnIndexes(table)) {
                List<String> keyColumnNames = ImmutableList.copyOf(columnNames); // Finalize the key order
                Set<TpchScaledColumn> keyColumns = keyColumnNames.stream()
                        .map(name -> new TpchScaledColumn(table, name))
                        .collect(toImmutableSet());

                TpchTable<?> tpchTable = TpchTable.getTable(table.getTableName());
                RecordSet recordSet = tpchRecordSetProvider.getRecordSet(tpchTable, ImmutableList.copyOf(columnHandles.values()), table.getScaleFactor(), 0, 1, TupleDomain.all());
                IndexedTable indexedTable = indexTable(recordSet, ImmutableList.copyOf(columnHandles.keySet()), keyColumnNames);
                indexedTablesBuilder.put(keyColumns, indexedTable);
            }
        }

        indexedTables = indexedTablesBuilder.build();
    }

    private static IndexedTable indexTable(RecordSet recordSet, final List<String> outputColumns, List<String> keyColumns)
    {
        List<Integer> keyPositions = keyColumns.stream()
                .map(columnName -> {
                    int position = outputColumns.indexOf(columnName);
                    checkState(position != -1);
                    return position;
                })
                .collect(toImmutableList());

        ImmutableListMultimap.Builder<MaterializedTuple, MaterializedTuple> indexedValuesBuilder = ImmutableListMultimap.builder();

        List<Type> outputTypes = recordSet.getColumnTypes();
        List<Type> keyTypes = extractPositionValues(outputTypes, keyPositions);

        RecordCursor cursor = recordSet.cursor();
        while (cursor.advanceNextPosition()) {
            List<Object> values = extractValues(cursor, outputTypes);
            List<Object> keyValues = extractPositionValues(values, keyPositions);

            indexedValuesBuilder.put(new MaterializedTuple(keyValues), new MaterializedTuple(values));
        }

        return new IndexedTable(keyColumns, keyTypes, outputColumns, outputTypes, indexedValuesBuilder.build());
    }
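
The core of indexTable is grouping every row under the values found at its key positions. A rough equivalent using only the Java standard library is sketched below. It is an illustration under simplifying assumptions: BuildIndexSketch is a hypothetical name, all column values are strings, and a LinkedHashMap of lists stands in for Guava's ImmutableListMultimap.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for indexTable using plain collections and string-only rows.
class BuildIndexSketch {
    static Map<List<String>, List<List<String>>> indexRows(
            List<List<String>> rows, List<String> outputColumns, List<String> keyColumns) {
        // Resolve each key column name to its position within a row.
        List<Integer> keyPositions = new ArrayList<>();
        for (String name : keyColumns) {
            int position = outputColumns.indexOf(name);
            if (position == -1) {
                throw new IllegalStateException("Unknown key column: " + name);
            }
            keyPositions.add(position);
        }

        // Group every row under the tuple of its key values.
        Map<List<String>, List<List<String>>> index = new LinkedHashMap<>();
        for (List<String> row : rows) {
            List<String> key = new ArrayList<>();
            for (int position : keyPositions) {
                key.add(row.get(position));
            }
            index.computeIfAbsent(key, ignored -> new ArrayList<>()).add(row);
        }
        return index;
    }

    public static void main(String[] args) {
        List<List<String>> rows = List.of(List.of("1", "O"), List.of("1", "F"), List.of("2", "P"));
        System.out.println(indexRows(rows, List.of("orderkey", "orderstatus"), List.of("orderkey")));
        // prints {[1]=[[1, O], [1, F]], [2]=[[2, P]]}
    }
}
```

Building the map requires one full pass over the table, which is acceptable for a test connector but is exactly the work a real index-backed connector would avoid at query time.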
