A look at Flink's StateTtlConfig

Author: go4it | Published 2018-12-24 17:31

    This article takes a look at Flink's StateTtlConfig (the source code quoted below is from Flink 1.7.0).

    Example

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
        
    ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
    stateDescriptor.enableTimeToLive(ttlConfig);
    
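    • A StateTtlConfig is built with its builder and then handed to the state through the StateDescriptor's enableTimeToLive method. OnCreateAndWrite and NeverReturnExpired are also the builder's defaults, so the two setter calls here only make those settings explicit.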
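    To make the semantics of this example concrete, here is a small pure-Java simulation that does not depend on Flink. It is a simplified model, not Flink's implementation: the names TtlValueStateSketch, SketchValueState and Stamped are hypothetical, the clock is a manually advanced counter, and it assumes an entry expires once lastAccess + ttl <= now. Following the UpdateType and CleanupStrategies javadoc, every write refreshes the timestamp, a read refreshes it only under OnReadAndWrite, and an expired entry found on read is dropped, being returned one last time only under ReturnExpiredIfNotCleanedUp.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified model of a TTL-guarded value state; NOT Flink's code.
// Like Flink, it stores the user value together with a last-access timestamp.
class TtlValueStateSketch {

    // UpdateType.Disabled is left out: with TTL off, Flink does not wrap the state at all.
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

    /** User value plus the timestamp of its last TTL-relevant access. */
    static final class Stamped<V> {
        final V value;
        final long lastAccess;
        Stamped(V value, long lastAccess) { this.value = value; this.lastAccess = lastAccess; }
    }

    /** A single value slot guarded by a TTL and driven by an injectable clock. */
    static final class SketchValueState<V> {
        private final long ttlMillis;
        private final UpdateType updateType;
        private final StateVisibility visibility;
        private final LongSupplier clock;
        private Stamped<V> stored; // null means "no value"

        SketchValueState(long ttlMillis, UpdateType updateType,
                         StateVisibility visibility, LongSupplier clock) {
            if (ttlMillis <= 0) throw new IllegalArgumentException("TTL is expected to be positive");
            this.ttlMillis = ttlMillis;
            this.updateType = updateType;
            this.visibility = visibility;
            this.clock = clock;
        }

        // Assumption: an entry counts as expired once lastAccess + ttl <= now.
        private boolean expired(Stamped<V> s, long now) {
            return s.lastAccess + ttlMillis <= now;
        }

        void update(V value) {
            // Every write (re)sets the last-access timestamp, for both update types.
            stored = new Stamped<>(value, clock.getAsLong());
        }

        V value() {
            if (stored == null) return null;
            long now = clock.getAsLong();
            Stamped<V> current = stored;
            if (expired(current, now)) {
                stored = null; // cleanup on explicit read access when found expired
                return visibility == StateVisibility.ReturnExpiredIfNotCleanedUp ? current.value : null;
            }
            if (updateType == UpdateType.OnReadAndWrite) {
                stored = new Stamped<>(current.value, now); // a read also prolongs the TTL
            }
            return current.value;
        }
    }

    public static void main(String[] args) {
        long[] now = {0L}; // manual clock in milliseconds
        // Mirrors the example: 1 s TTL, OnCreateAndWrite, NeverReturnExpired.
        SketchValueState<String> state = new SketchValueState<>(
            1000, UpdateType.OnCreateAndWrite, StateVisibility.NeverReturnExpired, () -> now[0]);
        state.update("hello");
        now[0] = 900;
        System.out.println(state.value()); // hello
        now[0] = 1000;
        System.out.println(state.value()); // null
    }
}
```

    With the example's settings, the value written at t=0 is still visible at t=900 ms but gone at t=1000 ms, because under OnCreateAndWrite the read at 900 ms does not push the expiration back.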

    StateTtlConfig

    flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java

    /**
     * Configuration of state TTL logic.
     *
     * <p>Note: The map state with TTL currently supports {@code null} user values
     * only if the user value serializer can handle {@code null} values.
     * If the serializer does not support {@code null} values,
     * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
     * at the cost of an extra byte in the serialized form.
     */
    public class StateTtlConfig implements Serializable {
    
        private static final long serialVersionUID = -7592693245044289793L;
    
        public static final StateTtlConfig DISABLED =
            newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();
    
        /**
         * This option value configures when to update last access timestamp which prolongs state TTL.
         */
        public enum UpdateType {
            /** TTL is disabled. State does not expire. */
            Disabled,
            /** Last access timestamp is initialised when state is created and updated on every write operation. */
            OnCreateAndWrite,
            /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
            OnReadAndWrite
        }
    
        /**
         * This option configures whether expired user value can be returned or not.
         */
        public enum StateVisibility {
            /** Return expired user value if it is not cleaned up yet. */
            ReturnExpiredIfNotCleanedUp,
            /** Never return expired user value. */
            NeverReturnExpired
        }
    
        /**
         * This option configures time scale to use for ttl.
         */
        public enum TimeCharacteristic {
            /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
            ProcessingTime
        }
    
        private final UpdateType updateType;
        private final StateVisibility stateVisibility;
        private final TimeCharacteristic timeCharacteristic;
        private final Time ttl;
        private final CleanupStrategies cleanupStrategies;
    
        private StateTtlConfig(
            UpdateType updateType,
            StateVisibility stateVisibility,
            TimeCharacteristic timeCharacteristic,
            Time ttl,
            CleanupStrategies cleanupStrategies) {
            this.updateType = Preconditions.checkNotNull(updateType);
            this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
            this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
            this.ttl = Preconditions.checkNotNull(ttl);
            this.cleanupStrategies = cleanupStrategies;
            Preconditions.checkArgument(ttl.toMilliseconds() > 0,
                "TTL is expected to be positive");
        }
    
        @Nonnull
        public UpdateType getUpdateType() {
            return updateType;
        }
    
        @Nonnull
        public StateVisibility getStateVisibility() {
            return stateVisibility;
        }
    
        @Nonnull
        public Time getTtl() {
            return ttl;
        }
    
        @Nonnull
        public TimeCharacteristic getTimeCharacteristic() {
            return timeCharacteristic;
        }
    
        public boolean isEnabled() {
            return updateType != UpdateType.Disabled;
        }
    
        @Nonnull
        public CleanupStrategies getCleanupStrategies() {
            return cleanupStrategies;
        }
    
        @Override
        public String toString() {
            return "StateTtlConfig{" +
                "updateType=" + updateType +
                ", stateVisibility=" + stateVisibility +
                ", timeCharacteristic=" + timeCharacteristic +
                ", ttl=" + ttl +
                '}';
        }
    
        @Nonnull
        public static Builder newBuilder(@Nonnull Time ttl) {
            return new Builder(ttl);
        }
    
        /**
         * Builder for the {@link StateTtlConfig}.
         */
        public static class Builder {
    
            private UpdateType updateType = OnCreateAndWrite;
            private StateVisibility stateVisibility = NeverReturnExpired;
            private TimeCharacteristic timeCharacteristic = ProcessingTime;
            private Time ttl;
            private CleanupStrategies cleanupStrategies = new CleanupStrategies();
    
            public Builder(@Nonnull Time ttl) {
                this.ttl = ttl;
            }
    
            /**
             * Sets the ttl update type.
             *
             * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
             */
            @Nonnull
            public Builder setUpdateType(UpdateType updateType) {
                this.updateType = updateType;
                return this;
            }
    
            @Nonnull
            public Builder updateTtlOnCreateAndWrite() {
                return setUpdateType(UpdateType.OnCreateAndWrite);
            }
    
            @Nonnull
            public Builder updateTtlOnReadAndWrite() {
                return setUpdateType(UpdateType.OnReadAndWrite);
            }
    
            /**
             * Sets the state visibility.
             *
             * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
             */
            @Nonnull
            public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
                this.stateVisibility = stateVisibility;
                return this;
            }
    
            @Nonnull
            public Builder returnExpiredIfNotCleanedUp() {
                return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
            }
    
            @Nonnull
            public Builder neverReturnExpired() {
                return setStateVisibility(StateVisibility.NeverReturnExpired);
            }
    
            /**
             * Sets the time characteristic.
             *
             * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
             */
            @Nonnull
            public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
                this.timeCharacteristic = timeCharacteristic;
                return this;
            }
    
            @Nonnull
            public Builder useProcessingTime() {
                return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            }
    
            /** Cleanup expired state in full snapshot on checkpoint. */
            @Nonnull
            public Builder cleanupFullSnapshot() {
                cleanupStrategies.strategies.put(
                    CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                    new CleanupStrategies.CleanupStrategy() {  });
                return this;
            }
    
            /**
             * Sets the ttl time.
             * @param ttl The ttl time.
             */
            @Nonnull
            public Builder setTtl(@Nonnull Time ttl) {
                this.ttl = ttl;
                return this;
            }
    
            @Nonnull
            public StateTtlConfig build() {
                return new StateTtlConfig(
                    updateType,
                    stateVisibility,
                    timeCharacteristic,
                    ttl,
                    cleanupStrategies);
            }
        }
    
        /**
         * TTL cleanup strategies.
         *
         * <p>This class configures when to cleanup expired state with TTL.
         * By default, state is always cleaned up on explicit read access if found expired.
         * Currently cleanup of state full snapshot can be additionally activated.
         */
        public static class CleanupStrategies implements Serializable {
            private static final long serialVersionUID = -1617740467277313524L;
    
            /** Fixed strategies ordinals in {@code strategies} config field. */
            enum Strategies {
                FULL_STATE_SCAN_SNAPSHOT
            }
    
            /** Base interface for cleanup strategies configurations. */
            interface CleanupStrategy extends Serializable {
    
            }
    
            final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);
    
            public boolean inFullSnapshot() {
                return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
            }
        }
    }
    
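    • StateTtlConfig configures the TTL behavior of a state. It defines three enums: UpdateType (Disabled, OnCreateAndWrite, OnReadAndWrite), which controls when the last-access timestamp is refreshed; StateVisibility (ReturnExpiredIfNotCleanedUp, NeverReturnExpired), which controls whether an expired value that has not been cleaned up yet may still be returned; and TimeCharacteristic, which currently offers only ProcessingTime.
    • StateTtlConfig also defines CleanupStrategies, the cleanup strategies for TTL state. By default an expired entry is cleaned up when it is explicitly read; in addition, the FULL_STATE_SCAN_SNAPSHOT strategy (enabled with cleanupFullSnapshot()) cleans up expired state in the full snapshot taken on checkpoint.
    • Finally, StateTtlConfig provides a Builder for setting UpdateType, StateVisibility, TimeCharacteristic, the ttl (a Time) and CleanupStrategies. Its defaults are OnCreateAndWrite, NeverReturnExpired and ProcessingTime, and build() rejects a non-positive TTL through the check in the private constructor.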
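    The Builder pattern in the listing above can be mirrored in plain Java. This is a hypothetical, stripped-down MiniTtlConfig, not Flink's class: the TTL is a plain millisecond long instead of Time, TimeCharacteristic is omitted, and IllegalArgumentException stands in for Preconditions.checkArgument. It keeps the same builder defaults, the positive-TTL check, the DISABLED constant and the EnumMap-based cleanup flag.

```java
import java.util.EnumMap;
import java.util.Objects;

// Hypothetical mirror of StateTtlConfig + Builder for illustration; NOT Flink's API.
final class MiniTtlConfig {

    enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }
    enum Strategy { FULL_STATE_SCAN_SNAPSHOT }
    interface CleanupStrategy { } // marker, like CleanupStrategies.CleanupStrategy

    // Same idea as StateTtlConfig.DISABLED: a huge TTL plus UpdateType.Disabled.
    static final MiniTtlConfig DISABLED =
        newBuilder(Long.MAX_VALUE).setUpdateType(UpdateType.Disabled).build();

    final UpdateType updateType;
    final StateVisibility stateVisibility;
    final long ttlMillis;
    private final EnumMap<Strategy, CleanupStrategy> cleanup;

    private MiniTtlConfig(UpdateType updateType, StateVisibility stateVisibility,
                          long ttlMillis, EnumMap<Strategy, CleanupStrategy> cleanup) {
        this.updateType = Objects.requireNonNull(updateType);
        this.stateVisibility = Objects.requireNonNull(stateVisibility);
        this.ttlMillis = ttlMillis;
        this.cleanup = cleanup;
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL is expected to be positive");
        }
    }

    boolean isEnabled() { return updateType != UpdateType.Disabled; }

    boolean inFullSnapshot() { return cleanup.containsKey(Strategy.FULL_STATE_SCAN_SNAPSHOT); }

    static Builder newBuilder(long ttlMillis) { return new Builder(ttlMillis); }

    static final class Builder {
        // Same defaults as the Flink 1.7.0 Builder shown above.
        private UpdateType updateType = UpdateType.OnCreateAndWrite;
        private StateVisibility stateVisibility = StateVisibility.NeverReturnExpired;
        private long ttlMillis;
        private final EnumMap<Strategy, CleanupStrategy> cleanup = new EnumMap<>(Strategy.class);

        Builder(long ttlMillis) { this.ttlMillis = ttlMillis; }

        Builder setUpdateType(UpdateType t) { this.updateType = t; return this; }
        Builder setStateVisibility(StateVisibility v) { this.stateVisibility = v; return this; }
        Builder setTtl(long ttlMillis) { this.ttlMillis = ttlMillis; return this; }

        Builder cleanupFullSnapshot() {
            cleanup.put(Strategy.FULL_STATE_SCAN_SNAPSHOT, new CleanupStrategy() { });
            return this;
        }

        MiniTtlConfig build() {
            // Copy the map so later builder calls cannot mutate an already-built config.
            return new MiniTtlConfig(updateType, stateVisibility, ttlMillis,
                new EnumMap<Strategy, CleanupStrategy>(cleanup));
        }
    }

    public static void main(String[] args) {
        MiniTtlConfig implicit = newBuilder(1000).build();
        System.out.println(implicit.updateType + " " + implicit.stateVisibility); // OnCreateAndWrite NeverReturnExpired
        System.out.println(DISABLED.isEnabled()); // false
        System.out.println(newBuilder(1000).cleanupFullSnapshot().build().inFullSnapshot()); // true
    }
}
```

    One deliberate difference: build() here copies the cleanup map, whereas in the 1.7.0 listing the built StateTtlConfig shares the builder's CleanupStrategies instance, so calling cleanupFullSnapshot() on the same builder after build() would also change the config that was already built.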

    AbstractKeyedStateBackend

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java

        /**
         * @see KeyedStateBackend
         */
        @Override
        @SuppressWarnings("unchecked")
        public <N, S extends State, V> S getOrCreateKeyedState(
                final TypeSerializer<N> namespaceSerializer,
                StateDescriptor<S, V> stateDescriptor) throws Exception {
            checkNotNull(namespaceSerializer, "Namespace serializer");
            checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
                    "This operation cannot use partitioned state.");
    
            InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
            if (kvState == null) {
                if (!stateDescriptor.isSerializerInitialized()) {
                    stateDescriptor.initializeSerializerUnlessSet(executionConfig);
                }
                kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                    namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
                keyValueStatesByName.put(stateDescriptor.getName(), kvState);
                publishQueryableStateIfEnabled(stateDescriptor, kvState);
            }
            return (S) kvState;
        }
    
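    • AbstractKeyedStateBackend's getOrCreateKeyedState first looks the state up by descriptor name in keyValueStatesByName; only on a miss does it create the InternalKvState through TtlStateFactory.createStateAndWrapWithTtlIfEnabled and cache it under that name.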
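    The cache-then-wrap logic of getOrCreateKeyedState can be sketched without Flink. Everything here is hypothetical (StateRegistrySketch, Descriptor, PlainState, TtlWrapped): a HashMap plays the role of keyValueStatesByName, and TtlWrapped stands in for the Ttl* decorators that TtlStateFactory produces. As in the original, the cache is keyed by the descriptor name only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of get-or-create by name with optional TTL wrapping; NOT Flink's API.
final class StateRegistrySketch {

    interface KvState { String describe(); }

    /** Hypothetical descriptor: only a name and a TTL switch. */
    static final class Descriptor {
        final String name;
        final boolean ttlEnabled;
        Descriptor(String name, boolean ttlEnabled) { this.name = name; this.ttlEnabled = ttlEnabled; }
    }

    /** What the backend would create without TTL. */
    static final class PlainState implements KvState {
        private final String name;
        PlainState(String name) { this.name = name; }
        public String describe() { return "plain(" + name + ")"; }
    }

    /** Decorator around the backend's state, analogous in spirit to TtlValueState and friends. */
    static final class TtlWrapped implements KvState {
        private final KvState inner;
        TtlWrapped(KvState inner) { this.inner = inner; }
        public String describe() { return "ttl(" + inner.describe() + ")"; }
    }

    private final Map<String, KvState> statesByName = new HashMap<>();
    int created = 0; // how many states were actually built

    KvState getOrCreate(Descriptor d) {
        // Build only on a cache miss, then remember the result under the descriptor name.
        return statesByName.computeIfAbsent(d.name, name -> {
            created++;
            KvState original = new PlainState(name);
            // Counterpart of createStateAndWrapWithTtlIfEnabled: wrap only when TTL is on.
            return d.ttlEnabled ? new TtlWrapped(original) : original;
        });
    }

    public static void main(String[] args) {
        StateRegistrySketch backend = new StateRegistrySketch();
        KvState a = backend.getOrCreate(new Descriptor("text state", true));
        KvState b = backend.getOrCreate(new Descriptor("text state", true));
        System.out.println(a.describe()); // ttl(plain(text state))
        System.out.println(a == b);       // true
        System.out.println(backend.getOrCreate(new Descriptor("counter", false)).describe()); // plain(counter)
    }
}
```

    Because the lookup uses only the name, a second descriptor with the same name gets the state created first, even if its TTL setting differs; in this sketch that follows directly from computeIfAbsent.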

    TtlStateFactory

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java

    /**
     * This state factory wraps state objects, produced by backends, with TTL logic.
     */
    public class TtlStateFactory<N, SV, S extends State, IS extends S> {
        public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, SV> stateDesc,
            KeyedStateFactory originalStateFactory,
            TtlTimeProvider timeProvider) throws Exception {
            Preconditions.checkNotNull(namespaceSerializer);
            Preconditions.checkNotNull(stateDesc);
            Preconditions.checkNotNull(originalStateFactory);
            Preconditions.checkNotNull(timeProvider);
            return  stateDesc.getTtlConfig().isEnabled() ?
                new TtlStateFactory<N, SV, S, IS>(
                    namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
                    .createState() :
                originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
        }
    
        private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;
    
        private final TypeSerializer<N> namespaceSerializer;
        private final StateDescriptor<S, SV> stateDesc;
        private final KeyedStateFactory originalStateFactory;
        private final StateTtlConfig ttlConfig;
        private final TtlTimeProvider timeProvider;
        private final long ttl;
    
        private TtlStateFactory(
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, SV> stateDesc,
            KeyedStateFactory originalStateFactory,
            TtlTimeProvider timeProvider) {
            this.namespaceSerializer = namespaceSerializer;
            this.stateDesc = stateDesc;
            this.originalStateFactory = originalStateFactory;
            this.ttlConfig = stateDesc.getTtlConfig();
            this.timeProvider = timeProvider;
            this.ttl = ttlConfig.getTtl().toMilliseconds();
            this.stateFactories = createStateFactories();
        }
    
        private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
            return Stream.of(
                Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
                Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
                Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
                Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
                Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
                Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
            ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
        }
    
        private IS createState() throws Exception {
            SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
            if (stateFactory == null) {
                String message = String.format("State %s is not supported by %s",
                    stateDesc.getClass(), TtlStateFactory.class);
                throw new FlinkRuntimeException(message);
            }
            return stateFactory.get();
        }
    
        @SuppressWarnings("unchecked")
        private IS createValueState() throws Exception {
            ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
                stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
            return (IS) new TtlValueState<>(
                originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
                ttlConfig, timeProvider, stateDesc.getSerializer());
        }
    
        @SuppressWarnings("unchecked")
        private <T> IS createListState() throws Exception {
            ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
            ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
                stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
            return (IS) new TtlListState<>(
                originalStateFactory.createInternalState(
                    namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
                ttlConfig, timeProvider, listStateDesc.getSerializer());
        }
    
        @SuppressWarnings("unchecked")
        private <UK, UV> IS createMapState() throws Exception {
            MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
            MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
                stateDesc.getName(),
                mapStateDesc.getKeySerializer(),
                new TtlSerializer<>(mapStateDesc.getValueSerializer()));
            return (IS) new TtlMapState<>(
                originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
                ttlConfig, timeProvider, mapStateDesc.getSerializer());
        }
    
        @SuppressWarnings("unchecked")
        private IS createReducingState() throws Exception {
            ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
            ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
                stateDesc.getName(),
                new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
                new TtlSerializer<>(stateDesc.getSerializer()));
            return (IS) new TtlReducingState<>(
                originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
                ttlConfig, timeProvider, stateDesc.getSerializer());
        }
    
        @SuppressWarnings("unchecked")
        private <IN, OUT> IS createAggregatingState() throws Exception {
            AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
                (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
            TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
                aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
            AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
                stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
            return (IS) new TtlAggregatingState<>(
                originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
                ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
        }
    
        @SuppressWarnings({"deprecation", "unchecked"})
        private <T> IS createFoldingState() throws Exception {
            FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
            SV initAcc = stateDesc.getDefaultValue();
            TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
            FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
                stateDesc.getName(),
                ttlInitAcc,
                new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
                new TtlSerializer<>(stateDesc.getSerializer()));
            return (IS) new TtlFoldingState<>(
                originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
                ttlConfig, timeProvider, stateDesc.getSerializer());
        }
    
        //......
    }
    
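    • TtlStateFactory.createStateAndWrapWithTtlIfEnabled checks stateDesc.getTtlConfig().isEnabled(). If TTL is enabled, it calls new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState(); otherwise it calls originalStateFactory.createInternalState(namespaceSerializer, stateDesc) directly.
    • createStateFactories builds a map from each StateDescriptor class to its factory method. createState looks the factory up by the descriptor's class and calls the matching SupplierWithException, which avoids an if/else chain; a descriptor class that is not in the map results in a FlinkRuntimeException.
    • ValueStateDescriptor maps to createValueState, which creates a TtlValueState; ListStateDescriptor maps to createListState (TtlListState); MapStateDescriptor maps to createMapState (TtlMapState); ReducingStateDescriptor maps to createReducingState (TtlReducingState); AggregatingStateDescriptor maps to createAggregatingState (TtlAggregatingState); FoldingStateDescriptor maps to createFoldingState (TtlFoldingState). In each case the user value serializer is wrapped in a TtlSerializer, so the backend stores TtlValue objects, and the state returned by the backend is wrapped in the corresponding Ttl* class.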
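    The class-keyed dispatch table in createStateFactories and createState is a general Java idiom. Below is a hypothetical, Flink-free sketch of it: the descriptor classes are empty stand-ins, each factory simply returns the name of the TTL state it would build, the entry helper is invented for brevity, and IllegalStateException replaces FlinkRuntimeException.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of a Map<Class, factory> dispatch table; NOT Flink's API.
final class DescriptorDispatchSketch {

    // Illustrative stand-ins for Flink's descriptor classes.
    static class Descriptor { }
    static final class ValueDescriptor extends Descriptor { }
    static final class ListDescriptor extends Descriptor { }
    static final class MapDescriptor extends Descriptor { }

    private static SimpleEntry<Class<? extends Descriptor>, Supplier<String>> entry(
            Class<? extends Descriptor> type, Supplier<String> factory) {
        return new SimpleEntry<>(type, factory);
    }

    // Built once, like stateFactories in TtlStateFactory's constructor.
    private final Map<Class<? extends Descriptor>, Supplier<String>> factories = Stream.of(
            entry(ValueDescriptor.class, () -> "TtlValueState"),
            entry(ListDescriptor.class, () -> "TtlListState"),
            entry(MapDescriptor.class, () -> "TtlMapState"))
        .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));

    String createState(Descriptor d) {
        // Exact runtime class lookup, mirroring stateFactories.get(stateDesc.getClass()).
        Supplier<String> factory = factories.get(d.getClass());
        if (factory == null) {
            throw new IllegalStateException(String.format("State %s is not supported by %s",
                d.getClass().getSimpleName(), DescriptorDispatchSketch.class.getSimpleName()));
        }
        return factory.get();
    }

    public static void main(String[] args) {
        DescriptorDispatchSketch f = new DescriptorDispatchSketch();
        System.out.println(f.createState(new ListDescriptor())); // TtlListState
        try {
            f.createState(new Descriptor());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // State Descriptor is not supported by DescriptorDispatchSketch
        }
    }
}
```

    Because the key is the descriptor's exact runtime class, judging from the listing a subclass of a registered descriptor would not be found either and would take the "not supported" branch; this sketch behaves the same way.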

    Summary

    • StateTtlConfig configures the TTL properties of a state, chiefly UpdateType, StateVisibility, TimeCharacteristic, the ttl and CleanupStrategies.
    • AbstractKeyedStateBackend's getOrCreateKeyedState method creates the InternalKvState through TtlStateFactory.createStateAndWrapWithTtlIfEnabled.
    • TtlStateFactory.createStateAndWrapWithTtlIfEnabled decides, based on stateDesc.getTtlConfig().isEnabled(), whether to create a TTL state or a plain one; TtlStateFactory.createState then creates the TTL state type that matches the kind of StateDescriptor.

    Article link: https://www.haomeiwen.com/subject/dsgzkqtx.html