A Look at the KeySelector in Flink's KeyedStream


Author: go4it | Published: 2018-12-28 10:32

    This article looks at the KeySelector used by Flink's KeyedStream.

    KeyedStream

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    @Public
    public class KeyedStream<T, KEY> extends DataStream<T> {
    
        /**
         * The key selector that can get the key by which the stream if partitioned from the elements.
         */
        private final KeySelector<T, KEY> keySelector;
    
        /** The type of the key by which the stream is partitioned. */
        private final TypeInformation<KEY> keyType;
    
        /**
         * Creates a new {@link KeyedStream} using the given {@link KeySelector}
         * to partition operator state by key.
         *
         * @param dataStream
         *            Base stream of data
         * @param keySelector
         *            Function for determining state partitions
         */
        public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
            this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
        }
    
        /**
         * Creates a new {@link KeyedStream} using the given {@link KeySelector}
         * to partition operator state by key.
         *
         * @param dataStream
         *            Base stream of data
         * @param keySelector
         *            Function for determining state partitions
         */
        public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
            this(
                dataStream,
                new PartitionTransformation<>(
                    dataStream.getTransformation(),
                    new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
                keySelector,
                keyType);
        }
    
        /**
         * Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
         * to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
         *
         * @param stream
         *            Base stream of data
         * @param partitionTransformation
         *            Function that determines how the keys are distributed to downstream operator(s)
         * @param keySelector
         *            Function to extract keys from the base stream
         * @param keyType
         *            Defines the type of the extracted keys
         */
        @Internal
        KeyedStream(
            DataStream<T> stream,
            PartitionTransformation<T> partitionTransformation,
            KeySelector<T, KEY> keySelector,
            TypeInformation<KEY> keyType) {
    
            super(stream.getExecutionEnvironment(), partitionTransformation);
            this.keySelector = clean(keySelector);
            this.keyType = validateKeyType(keyType);
        }
    
        //......
    }
    
    • Every KeyedStream constructor shown here takes a parameter of type KeySelector.

    KeySelector

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/java/functions/KeySelector.java

    @Public
    @FunctionalInterface
    public interface KeySelector<IN, KEY> extends Function, Serializable {
    
        /**
         * User-defined function that deterministically extracts the key from an object.
         *
         * <p>For example for a class:
         * <pre>
         *  public class Word {
         *      String word;
         *      int count;
         *  }
         * </pre>
         * The key extractor could return the word as
         * a key to group all Word objects by the String they contain.
         *
         * <p>The code would look like this
         * <pre>
         *  public String getKey(Word w) {
         *      return w.word;
         *  }
         * </pre>
         *
         * @param value The object to get the key from.
         * @return The extracted key.
         *
         * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
         *                   and trigger recovery or cancellation of the program.
         */
        KEY getKey(IN value) throws Exception;
    }
    
    • The KeySelector interface extends Function and Serializable and declares a single method, getKey, which extracts a KEY from a value of type IN.
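Because KeySelector is annotated with @FunctionalInterface, a key extractor can be written as a lambda. The following is a minimal sketch of that idea, not Flink code: the nested KeySelector interface is a hypothetical stand-in that only mirrors the shape of Flink's interface so the example compiles without Flink on the classpath, and groupByKey is an illustrative helper that has nothing to do with how Flink actually partitions a stream.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeySelectorSketch {

    // Hypothetical stand-in with the same shape as Flink's KeySelector<IN, KEY>,
    // declared locally so this sketch has no Flink dependency.
    @FunctionalInterface
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // The Word class from the Javadoc example in the listing above.
    static class Word {
        final String word;
        final int count;

        Word(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Illustrative helper (an assumption of this sketch): groups elements by the
    // key returned from the selector, preserving first-seen key order.
    static <IN, KEY> Map<KEY, List<IN>> groupByKey(List<IN> input, KeySelector<IN, KEY> selector) {
        Map<KEY, List<IN>> groups = new LinkedHashMap<>();
        for (IN element : input) {
            KEY key;
            try {
                key = selector.getKey(element);
            } catch (Exception e) {
                // getKey may throw a checked exception; surface it as a failure.
                throw new RuntimeException("Key extraction failed", e);
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Word> words = Arrays.asList(
                new Word("flink", 1), new Word("key", 1), new Word("flink", 2));
        // The selector is deterministic: the same Word always yields the same key.
        KeySelector<Word, String> byWord = w -> w.word;
        Map<String, List<Word>> groups = groupByKey(words, byWord);
        System.out.println(groups.keySet());
    }
}
```

With the real Flink interface, the same lambda could be passed to keyBy(KeySelector<T, K> key); depending on the lambda, the keyBy(key, keyType) overload may be needed so that the key type does not have to be inferred.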

    DataStream.keyBy

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

        /**
         * It creates a new {@link KeyedStream} that uses the provided key for partitioning
         * its operator states.
         *
         * @param key
         *            The KeySelector to be used for extracting the key for partitioning
         * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
         */
        public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
            Preconditions.checkNotNull(key);
            return new KeyedStream<>(this, clean(key));
        }
    
        /**
         * It creates a new {@link KeyedStream} that uses the provided key with explicit type information
         * for partitioning its operator states.
         *
         * @param key The KeySelector to be used for extracting the key for partitioning.
         * @param keyType The type information describing the key type.
         * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
         */
        public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
            Preconditions.checkNotNull(key);
            Preconditions.checkNotNull(keyType);
            return new KeyedStream<>(this, clean(key), keyType);
        }
    
        /**
         * Partitions the operator state of a {@link DataStream} by the given key positions.
         *
         * @param fields
         *            The position of the fields on which the {@link DataStream}
         *            will be grouped.
         * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
         */
        public KeyedStream<T, Tuple> keyBy(int... fields) {
            if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
                return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
            } else {
                return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
            }
        }
    
        /**
         * Partitions the operator state of a {@link DataStream} using field expressions.
         * A field expression is either the name of a public field or a getter method with parentheses
         * of the {@link DataStream}'s underlying type. A dot can be used to drill
         * down into objects, as in {@code "field1.getInnerField2()" }.
         *
         * @param fields
         *            One or more field expressions on which the state of the {@link DataStream} operators will be
         *            partitioned.
         * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
         **/
        public KeyedStream<T, Tuple> keyBy(String... fields) {
            return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
        }
    
        private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
            return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
                    getType(), getExecutionConfig())));
        }
    
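    • DataStream's keyBy method converts a DataStream into a KeyedStream; it comes in several overloads.
    • One overload takes a variable-length int array. It is typically used with simple tuple types: each int is a zero-based tuple index, and passing several ints defines a composite key. For example, keyBy(0, 1) uses the tuple's first and second fields as the key.
    • One overload takes a variable-length String array. It is typically used with complex tuple types and POJOs. For a POJO, each String names a field, and nested object or tuple properties are supported, for example user.zip. For a tuple, f0 refers to its first field.
    • One overload takes a KeySelector. A key selector function lets you define the key freely, for example by extracting a value from the object and then processing it further.
    • Both keyBy(int... fields) (for non-array types) and keyBy(String... fields) call the private keyBy(Keys<T> keys) method. Because every KeyedStream constructor requires a KeySelector, that method converts the Keys into a KeySelector through KeySelectorUtil.getSelectorForKeys.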
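The idea that positional keys are ultimately reduced to a key-extracting function can be sketched in plain Java. This is only an analogy under stated assumptions: tuples are modeled as Object[], the composite key as a List<Object>, and selectorForPositions is a hypothetical helper name; Flink itself builds a ComparableKeySelector from a TypeComparator, as the listings in this article show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PositionalKeySketch {

    // Hypothetical helper: turns zero-based tuple positions into a function that
    // extracts a composite key, loosely analogous to keyBy(int... fields).
    static Function<Object[], List<Object>> selectorForPositions(int... fields) {
        final int[] positions = fields.clone();
        return tuple -> {
            List<Object> key = new ArrayList<>(positions.length);
            for (int position : positions) {
                key.add(tuple[position]);
            }
            return key;
        };
    }

    public static void main(String[] args) {
        Object[] record = {"alice", "shanghai", 42};

        // Analogue of keyBy(0, 1): the first and second fields form the key.
        System.out.println(selectorForPositions(0, 1).apply(record));

        // Analogue of the KeySelector overload: arbitrary logic over the element.
        Function<Object[], String> custom = t -> ((String) t[0]).toUpperCase();
        System.out.println(custom.apply(record));
    }
}
```

The positional form is compact but ties the key to the field layout, while the function form can compute any derived key.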

    Keys.ExpressionKeys

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/operators/Keys.java

        /**
         * Represents (nested) field access through string and integer-based keys
         */
        public static class ExpressionKeys<T> extends Keys<T> {
            
            public static final String SELECT_ALL_CHAR = "*";
            public static final String SELECT_ALL_CHAR_SCALA = "_";
            private static final Pattern WILD_CARD_REGEX = Pattern.compile("[\\.]?("
                    + "\\" + SELECT_ALL_CHAR + "|"
                    + "\\" + SELECT_ALL_CHAR_SCALA +")$");
    
            // Flattened fields representing keys fields
            private List<FlatFieldDescriptor> keyFields;
            private TypeInformation<?>[] originalKeyTypes;
    
            //......
    
            /**
             * Create String-based (nested) field expression keys on a composite type.
             */
            public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
                checkNotNull(keyExpressions, "Field expression cannot be null.");
    
                this.keyFields = new ArrayList<>(keyExpressions.length);
    
                if (type instanceof CompositeType){
                    CompositeType<T> cType = (CompositeType<T>) type;
                    this.originalKeyTypes = new TypeInformation<?>[keyExpressions.length];
    
                    // extract the keys on their flat position
                    for (int i = 0; i < keyExpressions.length; i++) {
                        String keyExpr = keyExpressions[i];
    
                        if (keyExpr == null) {
                            throw new InvalidProgramException("Expression key may not be null.");
                        }
                        // strip off whitespace
                        keyExpr = keyExpr.trim();
    
                        List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr);
    
                        if (flatFields.size() == 0) {
                            throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType);
                        }
                        // check if all nested fields can be used as keys
                        for (FlatFieldDescriptor field : flatFields) {
                            if (!field.getType().isKeyType()) {
                                throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key.");
                            }
                        }
                        // add flat fields to key fields
                        keyFields.addAll(flatFields);
    
                        String strippedKeyExpr = WILD_CARD_REGEX.matcher(keyExpr).replaceAll("");
                        if (strippedKeyExpr.isEmpty()) {
                            this.originalKeyTypes[i] = type;
                        } else {
                            this.originalKeyTypes[i] = cType.getTypeAt(strippedKeyExpr);
                        }
                    }
                }
                else {
                    if (!type.isKeyType()) {
                        throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
                    }
    
                    // check that all key expressions are valid
                    for (String keyExpr : keyExpressions) {
                        if (keyExpr == null) {
                            throw new InvalidProgramException("Expression key may not be null.");
                        }
                        // strip off whitespace
                        keyExpr = keyExpr.trim();
                        // check that full type is addressed
                        if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
                            throw new InvalidProgramException(
                                "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types.");
                        }
                        // add full type as key
                        keyFields.add(new FlatFieldDescriptor(0, type));
                    }
                    this.originalKeyTypes = new TypeInformation[] {type};
                }
            }
    
            //......
        }
    
    • ExpressionKeys is a static nested class of Keys that extends Keys. Both keyBy(int... fields) and keyBy(String... fields) wrap their fields in a new Keys.ExpressionKeys and then call the private keyBy(Keys<T> keys) method.
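The wildcard handling in the String-based constructor is easier to follow in isolation. The sketch below copies the WILD_CARD_REGEX pattern from the listing above and applies it to a few sample expressions; the class name WildcardStripSketch, the stripWildcard helper, and the sample expressions are assumptions made for illustration.

```java
import java.util.regex.Pattern;

public class WildcardStripSketch {

    static final String SELECT_ALL_CHAR = "*";
    static final String SELECT_ALL_CHAR_SCALA = "_";

    // Same pattern as in the ExpressionKeys listing: an optional dot followed by
    // '*' or '_' at the very end of the expression.
    static final Pattern WILD_CARD_REGEX = Pattern.compile("[\\.]?("
            + "\\" + SELECT_ALL_CHAR + "|"
            + "\\" + SELECT_ALL_CHAR_SCALA + ")$");

    // Hypothetical helper mirroring the trim-then-strip steps of the constructor.
    static String stripWildcard(String keyExpr) {
        return WILD_CARD_REGEX.matcher(keyExpr.trim()).replaceAll("");
    }

    public static void main(String[] args) {
        String[] samples = {"*", "_", "user.*", "f0._", "user.zip"};
        for (String expr : samples) {
            String stripped = stripWildcard(expr);
            // An empty result means the expression addressed the whole type.
            String target = stripped.isEmpty() ? "<whole type>" : stripped;
            System.out.println(expr + " -> " + target);
        }
    }
}
```

In the constructor, an empty stripped expression makes the original key type the full input type, and any other result is resolved with cType.getTypeAt(strippedKeyExpr).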

    KeySelectorUtil.getSelectorForKeys

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/keys/KeySelectorUtil.java

    @Internal
    public final class KeySelectorUtil {
    
        public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
            if (!(typeInfo instanceof CompositeType)) {
                throw new InvalidTypesException(
                        "This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
            }
    
            CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
    
            int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
            int numKeyFields = logicalKeyPositions.length;
    
            TypeInformation<?>[] typeInfos = keys.getKeyFieldTypes();
            // use ascending order here, the code paths for that are usually a slight bit faster
            boolean[] orders = new boolean[numKeyFields];
            for (int i = 0; i < numKeyFields; i++) {
                orders[i] = true;
            }
    
            TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
            return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
        }
    
        //......
    }
    
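    • KeySelectorUtil.getSelectorForKeys converts Keys into a KeySelector. It requires a composite type (otherwise it throws InvalidTypesException), creates a TypeComparator over the logical key positions, and returns a ComparableKeySelector whose key type is Tuple.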

    Summary

    • Every KeyedStream constructor requires a KeySelector parameter.
    • DataStream's keyBy method has several overloads: a variable-length int array, a variable-length String array, and a KeySelector.
    • Both keyBy(int... fields) and keyBy(String... fields) wrap their fields in a new Keys.ExpressionKeys and then call the private keyBy(Keys<T> keys) method, which uses KeySelectorUtil.getSelectorForKeys to convert the Keys into a KeySelector.
