A look at Flink's InternalTimeServiceManager

Author: go4it | Published 2019-01-18 13:34

    This article takes a look at Flink's InternalTimeServiceManager.

    InternalTimeServiceManager

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java

    @Internal
    public class InternalTimeServiceManager<K> {
    
        @VisibleForTesting
        static final String TIMER_STATE_PREFIX = "_timer_state";
        @VisibleForTesting
        static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
        @VisibleForTesting
        static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
    
        private final KeyGroupRange localKeyGroupRange;
        private final KeyContext keyContext;
    
        private final PriorityQueueSetFactory priorityQueueSetFactory;
        private final ProcessingTimeService processingTimeService;
    
        private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
    
        private final boolean useLegacySynchronousSnapshots;
    
        InternalTimeServiceManager(
            KeyGroupRange localKeyGroupRange,
            KeyContext keyContext,
            PriorityQueueSetFactory priorityQueueSetFactory,
            ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {
    
            this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
            this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
            this.keyContext = Preconditions.checkNotNull(keyContext);
            this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
            this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
    
            this.timerServices = new HashMap<>();
        }
    
        @SuppressWarnings("unchecked")
        public <N> InternalTimerService<N> getInternalTimerService(
            String name,
            TimerSerializer<K, N> timerSerializer,
            Triggerable<K, N> triggerable) {
    
            InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
    
            timerService.startTimerService(
                timerSerializer.getKeySerializer(),
                timerSerializer.getNamespaceSerializer(),
                triggerable);
    
            return timerService;
        }
    
        @SuppressWarnings("unchecked")
        <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
            InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
            if (timerService == null) {
    
                timerService = new InternalTimerServiceImpl<>(
                    localKeyGroupRange,
                    keyContext,
                    processingTimeService,
                    createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                    createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
    
                timerServices.put(name, timerService);
            }
            return timerService;
        }
    
        Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
            return Collections.unmodifiableMap(timerServices);
        }
    
        private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
            String name,
            TimerSerializer<K, N> timerSerializer) {
            return priorityQueueSetFactory.create(
                name,
                timerSerializer);
        }
    
        public void advanceWatermark(Watermark watermark) throws Exception {
            for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
                service.advanceWatermark(watermark.getTimestamp());
            }
        }
    
        //////////////////              Fault Tolerance Methods             ///////////////////
    
        public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
            Preconditions.checkState(useLegacySynchronousSnapshots);
            InternalTimerServiceSerializationProxy<K> serializationProxy =
                new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);
    
            serializationProxy.write(stream);
        }
    
        public void restoreStateForKeyGroup(
                InputStream stream,
                int keyGroupIdx,
                ClassLoader userCodeClassLoader) throws IOException {
    
            InternalTimerServiceSerializationProxy<K> serializationProxy =
                new InternalTimerServiceSerializationProxy<>(
                    this,
                    userCodeClassLoader,
                    keyGroupIdx);
    
            serializationProxy.read(stream);
        }
    
        ////////////////////            Methods used ONLY IN TESTS              ////////////////////
    
        @VisibleForTesting
        public int numProcessingTimeTimers() {
            int count = 0;
            for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
                count += timerService.numProcessingTimeTimers();
            }
            return count;
        }
    
        @VisibleForTesting
        public int numEventTimeTimers() {
            int count = 0;
            for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
                count += timerService.numEventTimeTimers();
            }
            return count;
        }
    }
    
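    • InternalTimeServiceManager manages the timer services used by keyed operators. It keeps an in-memory map (a HashMap) from each timer service's name to its InternalTimerServiceImpl.
    • getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then initializes it with timerService.startTimerService (passing the key serializer, the namespace serializer and the Triggerable) and returns it.
    • registerOrGetTimerService looks the name up in the timerServices map; if no service is registered under that name, it creates a new InternalTimerServiceImpl and puts it into the map. During creation, createTimerPriorityQueue builds the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue and named PROCESSING_TIMER_PREFIX + name and EVENT_TIMER_PREFIX + name respectively. createTimerPriorityQueue itself simply delegates to priorityQueueSetFactory.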
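
    A minimal sketch of the register-or-get pattern described above, assuming heavily simplified stand-ins: TimerServiceRegistrySketch and SimpleTimerService are hypothetical names, not Flink classes, and timers are reduced to bare long timestamps in a java.util.PriorityQueue. Only the two prefix constants are copied from the source. The sketch uses Map.computeIfAbsent where Flink does an explicit get followed by put, skips the startTimerService step, and assumes a service fires every event-time timer whose timestamp is at or below the watermark.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical, simplified model of InternalTimeServiceManager's bookkeeping; not Flink's API. */
public class TimerServiceRegistrySketch {

    // Same values as the constants in InternalTimeServiceManager.
    static final String TIMER_STATE_PREFIX = "_timer_state";
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    /** Hypothetical stand-in for InternalTimerServiceImpl: two named queues of timestamps. */
    static final class SimpleTimerService {
        final String processingQueueName;
        final String eventQueueName;
        final PriorityQueue<Long> eventTimers = new PriorityQueue<>();

        SimpleTimerService(String processingQueueName, String eventQueueName) {
            this.processingQueueName = processingQueueName;
            this.eventQueueName = eventQueueName;
        }

        /** Removes and counts every event-time timer with timestamp <= the watermark. */
        int advanceWatermark(long time) {
            int fired = 0;
            while (!eventTimers.isEmpty() && eventTimers.peek() <= time) {
                eventTimers.poll(); // a real service would invoke the Triggerable here
                fired++;
            }
            return fired;
        }
    }

    private final Map<String, SimpleTimerService> timerServices = new HashMap<>();

    /** Returns the service registered under name, creating it on first use. */
    SimpleTimerService registerOrGet(String name) {
        return timerServices.computeIfAbsent(name, n -> new SimpleTimerService(
                PROCESSING_TIMER_PREFIX + n,
                EVENT_TIMER_PREFIX + n));
    }

    /** Fans the watermark out to every registered service; returns how many timers fired. */
    int advanceWatermark(long timestamp) {
        int fired = 0;
        for (SimpleTimerService service : timerServices.values()) {
            fired += service.advanceWatermark(timestamp);
        }
        return fired;
    }

    Map<String, SimpleTimerService> registered() {
        return Collections.unmodifiableMap(timerServices);
    }
}
```

    Calling registerOrGet twice with the same name yields the same instance, so every operator component asking for "window-timers" shares one pair of queues, and the manager-level advanceWatermark just loops over all registered services, as the Flink method above does.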

    PriorityQueueSetFactory

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java

    public interface PriorityQueueSetFactory {
    
        @Nonnull
        <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
            @Nonnull String stateName,
            @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
    }
    
    • PriorityQueueSetFactory defines a single create method that returns a KeyGroupedInternalPriorityQueue. Its type parameter T must extend or implement all three of HeapPriorityQueueElement, PriorityComparable and Keyed.
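
    The three-way bound is a Java intersection type. A minimal sketch, assuming local copies of the three interfaces (reduced to the methods shown in this article) and a hypothetical Timer class: inside a method bounded by all three, each element can be ordered, given a heap index and asked for its key without any cast. The method name keysInPriorityOrder is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mirror of the three interfaces, showing an intersection-type bound. */
public class IntersectionBoundSketch {

    interface HeapElement { int getInternalIndex(); void setInternalIndex(int newIndex); }
    interface PriorityComparable<T> { int comparePriorityTo(T other); }
    interface Keyed<K> { K getKey(); }

    /** An element type that satisfies all three bounds at once. */
    static final class Timer implements HeapElement, PriorityComparable<Timer>, Keyed<String> {
        final long timestamp;
        final String key;
        int index = Integer.MIN_VALUE;

        Timer(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }

        public int getInternalIndex() { return index; }
        public void setInternalIndex(int newIndex) { index = newIndex; }
        public int comparePriorityTo(Timer other) { return Long.compare(timestamp, other.timestamp); }
        public String getKey() { return key; }
    }

    /** Like PriorityQueueSetFactory.create, T must implement all three interfaces. */
    static <T extends HeapElement & PriorityComparable<T> & Keyed<?>> List<Object> keysInPriorityOrder(List<T> in) {
        List<T> sorted = new ArrayList<>(in);
        sorted.sort((a, b) -> a.comparePriorityTo(b)); // needs PriorityComparable
        List<Object> keys = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            T element = sorted.get(i);
            element.setInternalIndex(i);                // needs HeapElement
            keys.add(element.getKey());                 // needs Keyed
        }
        return keys;
    }
}
```

    A type that implemented only two of the interfaces would be rejected at compile time, which is how the factory signature guarantees that its queue can use all three capabilities.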

    HeapPriorityQueueElement

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java

    @Internal
    public interface HeapPriorityQueueElement {
    
        /**
         * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
         * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
         * elements are removed from a {@link HeapPriorityQueue}.
         */
        int NOT_CONTAINED = Integer.MIN_VALUE;
    
        /**
         * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
         */
        int getInternalIndex();
    
        /**
         * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
         * {@link HeapPriorityQueue}.
         *
         * @param newIndex the new index in the timer heap.
         */
        void setInternalIndex(int newIndex);
    }
    
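    • The HeapPriorityQueueElement interface describes the element type that HeapPriorityQueue requires. Through getInternalIndex and setInternalIndex, each element records its current position in the queue's internal array; NOT_CONTAINED marks an element that no HeapPriorityQueue currently holds.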
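
    Storing the index inside the element lets a binary heap remove an arbitrary element in O(log n) instead of first searching its array in O(n). A minimal sketch of that idea, assuming a hypothetical Node type holding a long timestamp; it is a textbook indexed min-heap, not Flink's HeapPriorityQueue, and writing node.internalIndex directly stands in for setInternalIndex.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical indexed min-heap illustrating the internal-index idea; not Flink's code. */
public class IndexedHeapSketch {

    static final int NOT_CONTAINED = Integer.MIN_VALUE;

    /** Hypothetical element: a timestamp plus the array slot it currently occupies. */
    static final class Node {
        final long timestamp;
        int internalIndex = NOT_CONTAINED;
        Node(long timestamp) { this.timestamp = timestamp; }
    }

    private final List<Node> heap = new ArrayList<>();

    void add(Node n) {
        heap.add(n);
        n.internalIndex = heap.size() - 1;
        siftUp(n.internalIndex);
    }

    Node peek() { return heap.isEmpty() ? null : heap.get(0); }

    /** O(log n) removal: the element itself says where it is, so no linear search. */
    boolean remove(Node n) {
        int i = n.internalIndex;
        if (i < 0 || i >= heap.size() || heap.get(i) != n) {
            return false; // not held by this heap
        }
        int last = heap.size() - 1;
        swap(i, last);          // move the victim to the end
        heap.remove(last);
        n.internalIndex = NOT_CONTAINED;
        if (i < heap.size()) {  // restore heap order for the element moved into slot i
            siftUp(i);
            siftDown(i);
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(i).timestamp >= heap.get(parent).timestamp) return;
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int size = heap.size();
        while (true) {
            int l = 2 * i + 1, r = l + 1, min = i;
            if (l < size && heap.get(l).timestamp < heap.get(min).timestamp) min = l;
            if (r < size && heap.get(r).timestamp < heap.get(min).timestamp) min = r;
            if (min == i) return;
            swap(i, min);
            i = min;
        }
    }

    /** Every move updates the stored index, mirroring setInternalIndex. */
    private void swap(int a, int b) {
        Node na = heap.get(a), nb = heap.get(b);
        heap.set(a, nb);
        nb.internalIndex = a;
        heap.set(b, na);
        na.internalIndex = b;
    }
}
```

    The price is that every swap must keep the index in sync, which is exactly the responsibility the javadoc above assigns to the owning HeapPriorityQueue.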

    PriorityComparable

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java

    public interface PriorityComparable<T> {
    
        int comparePriorityTo(@Nonnull T other);
    }
    
    • PriorityComparable defines comparePriorityTo, which compares two objects by their priority.
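
    A minimal sketch, assuming a local copy of the interface and a hypothetical Job class, of how PriorityComparable objects can be adapted to a java.util.Comparator, similar in spirit to the PriorityComparator.forPriorityComparableObjects() call made by HeapPriorityQueueSetFactory. One plausible reason for a dedicated interface rather than Comparable: priority order need not be consistent with equals (two distinct timers with the same timestamp have equal priority), while Comparable strongly recommends that consistency.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Sketch: turning comparePriorityTo into a java.util.Comparator. Names are hypothetical. */
public class PriorityComparableSketch {

    interface PriorityComparable<T> {
        int comparePriorityTo(T other);
    }

    /** Hypothetical job whose priority is a deadline: the smaller deadline goes first. */
    static final class Job implements PriorityComparable<Job> {
        final String name;
        final long deadline;

        Job(String name, long deadline) { this.name = name; this.deadline = deadline; }

        @Override
        public int comparePriorityTo(Job other) {
            return Long.compare(deadline, other.deadline);
        }
    }

    /** Generic adapter from priority comparison to Comparator. */
    static <T extends PriorityComparable<T>> Comparator<T> byPriority() {
        return (left, right) -> left.comparePriorityTo(right);
    }

    /** Returns the name of the job with the highest priority (earliest deadline). */
    static String firstJob(Job... jobs) {
        Comparator<Job> cmp = byPriority();
        PriorityQueue<Job> queue = new PriorityQueue<>(cmp);
        for (Job j : jobs) {
            queue.add(j);
        }
        return queue.peek().name;
    }
}
```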

    Keyed

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java

    public interface Keyed<K> {
    
        K getKey();
    }
    
    • The Keyed interface defines getKey, which returns the key of the object.
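
    The key matters because keyed state, timers included, is organized by key group (note the localKeyGroupRange carried by InternalTimeServiceManager). A minimal sketch, assuming a local Keyed copy and a hypothetical Event class, of how getKey alone is enough to bucket elements by key group. Taking hashCode() modulo the number of key groups is a simplification: Flink's key-group assignment first applies a murmur hash to the key's hashCode().

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: bucketing Keyed elements by key group. Hashing is simplified on purpose. */
public class KeyGroupBucketsSketch {

    interface Keyed<K> { K getKey(); }

    /** Hypothetical element keyed by user name. */
    static final class Event implements Keyed<String> {
        final String user;
        Event(String user) { this.user = user; }
        @Override public String getKey() { return user; }
    }

    /** Simplified key-group assignment (Flink also murmur-hashes hashCode() first). */
    static int keyGroupFor(Object key, int totalKeyGroups) {
        return Math.floorMod(key.hashCode(), totalKeyGroups);
    }

    /** Puts each element into the bucket of its key group, using only getKey(). */
    static <T extends Keyed<?>> List<List<T>> bucket(List<T> elements, int totalKeyGroups) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < totalKeyGroups; i++) {
            buckets.add(new ArrayList<>());
        }
        for (T e : elements) {
            buckets.get(keyGroupFor(e.getKey(), totalKeyGroups)).add(e);
        }
        return buckets;
    }
}
```

    Elements with the same key always land in the same bucket, which is what allows state for a range of key groups to be snapshotted or moved together.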

    InternalTimer

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java

    @Internal
    public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {
    
        /** Function to extract the key from a {@link InternalTimer}. */
        KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;
    
        /** Function to compare instances of {@link InternalTimer}. */
        PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
            (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());
        /**
         * Returns the timestamp of the timer. This value determines the point in time when the timer will fire.
         */
        long getTimestamp();
    
        /**
         * Returns the key that is bound to this timer.
         */
        @Nonnull
        @Override
        K getKey();
    
        /**
         * Returns the namespace that is bound to this timer.
         */
        @Nonnull
        N getNamespace();
    }
    
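    • InternalTimer extends PriorityComparable and Keyed and declares getTimestamp, getKey and getNamespace. It also provides two constants: KEY_EXTRACTOR_FUNCTION, which extracts a timer's key via getKey, and TIMER_COMPARATOR, which orders timers by timestamp.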
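
    TIMER_COMPARATOR orders timers by timestamp alone and uses Long.compare. A minimal sketch with a hypothetical Timer class, using java.util.function.Function and java.util.Comparator in place of Flink's KeyExtractorFunction and PriorityComparator, plus a tempting subtraction-based comparator for contrast: casting a long difference to int truncates it, so for two timestamps 3,000,000,000 ms (about 35 days) apart the sign flips and the order comes out wrong.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Sketch of InternalTimer's two constants with java.util types; Timer is hypothetical. */
public class TimerComparatorSketch {

    static final class Timer {
        final long timestamp;
        final String key;
        Timer(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
        long getTimestamp() { return timestamp; }
        String getKey() { return key; }
    }

    // Counterpart of KEY_EXTRACTOR_FUNCTION: a method reference to getKey.
    static final Function<Timer, String> KEY_EXTRACTOR = Timer::getKey;

    // Counterpart of TIMER_COMPARATOR: timestamp only, compared with Long.compare.
    static final Comparator<Timer> TIMER_COMPARATOR =
            (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());

    // Broken alternative: truncating the long difference to int can flip its sign.
    static final Comparator<Timer> BROKEN_COMPARATOR =
            (left, right) -> (int) (left.getTimestamp() - right.getTimestamp());

    /** Sorts a copy of the timers and returns their keys in that order. */
    static List<String> keysInOrder(List<Timer> timers, Comparator<Timer> cmp) {
        List<Timer> copy = new ArrayList<>(timers);
        copy.sort(cmp);
        List<String> keys = new ArrayList<>();
        for (Timer t : copy) {
            keys.add(KEY_EXTRACTOR.apply(t));
        }
        return keys;
    }
}
```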

    TimerHeapInternalTimer

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java

    @Internal
    public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {
    
        /** The key for which the timer is scoped. */
        @Nonnull
        private final K key;
    
        /** The namespace for which the timer is scoped. */
        @Nonnull
        private final N namespace;
    
        /** The expiration timestamp. */
        private final long timestamp;
    
        private transient int timerHeapIndex;
    
        TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
            this.timerHeapIndex = NOT_CONTAINED;
        }
    
        @Override
        public long getTimestamp() {
            return timestamp;
        }
    
        @Nonnull
        @Override
        public K getKey() {
            return key;
        }
    
        @Nonnull
        @Override
        public N getNamespace() {
            return namespace;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
    
            if (o instanceof InternalTimer) {
                InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
                return timestamp == timer.getTimestamp()
                    && key.equals(timer.getKey())
                    && namespace.equals(timer.getNamespace());
            }
    
            return false;
        }
    
        @Override
        public int getInternalIndex() {
            return timerHeapIndex;
        }
    
        @Override
        public void setInternalIndex(int newIndex) {
            this.timerHeapIndex = newIndex;
        }
    
        void removedFromTimerQueue() {
            setInternalIndex(NOT_CONTAINED);
        }
    
        @Override
        public int hashCode() {
            int result = (int) (timestamp ^ (timestamp >>> 32));
            result = 31 * result + key.hashCode();
            result = 31 * result + namespace.hashCode();
            return result;
        }
    
        @Override
        public String toString() {
            return "Timer{" +
                    "timestamp=" + timestamp +
                    ", key=" + key +
                    ", namespace=" + namespace +
                    '}';
        }
    
        @Override
        public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
            return Long.compare(timestamp, other.getTimestamp());
        }
    }
    
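    • TimerHeapInternalTimer implements both InternalTimer and HeapPriorityQueueElement. Its equals and hashCode are based on timestamp, key and namespace, while comparePriorityTo compares timestamps only. removedFromTimerQueue just calls setInternalIndex(NOT_CONTAINED): it resets the stored index to mark the timer as no longer held by any queue, which is bookkeeping on the timer rather than the removal itself.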
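
    Because equals and hashCode cover exactly (timestamp, key, namespace) and ignore the transient heap index, two registrations of the same timer look identical to hash-based structures, which is what lets a queue set drop duplicates. A minimal sketch with a hypothetical Timer class that copies the hashCode recipe above, using String keys and namespaces for simplicity.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch: timer identity is (timestamp, key, namespace); the heap index is excluded. */
public class TimerIdentitySketch {

    static final class Timer {
        final long timestamp;
        final String key;
        final String namespace;
        transient int heapIndex = Integer.MIN_VALUE; // bookkeeping only, ignored below

        Timer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Timer)) return false;
            Timer t = (Timer) o;
            return timestamp == t.timestamp && key.equals(t.key) && namespace.equals(t.namespace);
        }

        @Override
        public int hashCode() {
            // Same recipe as TimerHeapInternalTimer.hashCode.
            int result = (int) (timestamp ^ (timestamp >>> 32));
            result = 31 * result + key.hashCode();
            result = 31 * result + namespace.hashCode();
            return result;
        }
    }

    /** Adds all timers to a HashSet and returns how many distinct timers remain. */
    static int distinctCount(Timer... timers) {
        Set<Timer> set = new HashSet<>();
        for (Timer t : timers) {
            set.add(t);
        }
        return set.size();
    }
}
```

    Since comparePriorityTo looks only at the timestamp, timers for different keys at the same timestamp tie in priority while still being unequal, so both are kept.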

    HeapPriorityQueueSetFactory

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java

    public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
    
        @Nonnull
        private final KeyGroupRange keyGroupRange;
    
        @Nonnegative
        private final int totalKeyGroups;
    
        @Nonnegative
        private final int minimumCapacity;
    
        public HeapPriorityQueueSetFactory(
            @Nonnull KeyGroupRange keyGroupRange,
            @Nonnegative int totalKeyGroups,
            @Nonnegative int minimumCapacity) {
    
            this.keyGroupRange = keyGroupRange;
            this.totalKeyGroups = totalKeyGroups;
            this.minimumCapacity = minimumCapacity;
        }
    
        @Nonnull
        @Override
        public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
            @Nonnull String stateName,
            @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
    
            return new HeapPriorityQueueSet<>(
                PriorityComparator.forPriorityComparableObjects(),
                KeyExtractorFunction.forKeyedObjects(),
                minimumCapacity,
                keyGroupRange,
                totalKeyGroups);
        }
    }
    
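    • HeapPriorityQueueSetFactory implements PriorityQueueSetFactory. Its create method returns a HeapPriorityQueueSet built from PriorityComparator.forPriorityComparableObjects(), KeyExtractorFunction.forKeyedObjects(), minimumCapacity, keyGroupRange and totalKeyGroups; the stateName and byteOrderedElementSerializer parameters are not used by this on-heap implementation.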
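
    As its name suggests, HeapPriorityQueueSet combines priority-queue ordering with set semantics. A minimal sketch of that combination, assuming a hypothetical QueueSetSketch class: a java.util.PriorityQueue for ordering plus a HashMap for duplicate detection. It is not Flink's implementation; for instance, the keyGroupRange and totalKeyGroups arguments above suggest the real class organizes its state by key group, which this sketch ignores, and the boolean returned by add is only this sketch's convention.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical "priority queue set": heap for order, hash map for duplicate detection. */
public class QueueSetSketch<T> {

    private final PriorityQueue<T> heap;
    private final Map<T, T> dedup = new HashMap<>(); // relies on T's equals/hashCode

    public QueueSetSketch(Comparator<? super T> priorityComparator) {
        this.heap = new PriorityQueue<>(priorityComparator);
    }

    /** Adds the element unless an equal one is present; returns true if it was added. */
    public boolean add(T element) {
        if (dedup.putIfAbsent(element, element) != null) {
            return false;
        }
        heap.add(element);
        return true;
    }

    /** Removes the highest-priority element from both structures. */
    public T poll() {
        T head = heap.poll();
        if (head != null) {
            dedup.remove(head);
        }
        return head;
    }

    public T peek() { return heap.peek(); }

    public int size() { return heap.size(); }
}
```

    Once an element is polled it leaves the dedup map too, so the same value can be registered again later, which matches the intuition that a fired timer may be set anew.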

    Summary

    • InternalTimeServiceManager manages the timer services used by keyed operators, keeping an in-memory map from each service's name to its InternalTimerServiceImpl. getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then initializes it with timerService.startTimerService and returns it.
    • registerOrGetTimerService looks the name up in the timerServices map and, if it is absent, creates a new InternalTimerServiceImpl and stores it there. Creation uses createTimerPriorityQueue to build the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue; createTimerPriorityQueue delegates to priorityQueueSetFactory.
    • PriorityQueueSetFactory defines a create method returning a KeyGroupedInternalPriorityQueue whose element type T must extend or implement HeapPriorityQueueElement, PriorityComparable and Keyed (InternalTimer extends PriorityComparable and Keyed, and TimerHeapInternalTimer implements InternalTimer and HeapPriorityQueueElement). HeapPriorityQueueSetFactory implements PriorityQueueSetFactory, and its create method returns a HeapPriorityQueueSet.
