A Look at Storm's Window Trigger

Author: go4it | Published: 2018-11-12 23:12

    This article walks through how the window trigger works in Storm (Trident windowing, storm-core 1.2.2).

    WindowTridentProcessor.prepare

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

        public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
            this.topologyContext = context;
            List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
            if (parents.size() != 1) {
                throw new RuntimeException("Aggregation related operation can only have one parent");
            }
    
            Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);
    
            this.tridentContext = tridentContext;
            collector = new FreshCollector(tridentContext);
            projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);
    
            windowStore = windowStoreFactory.create(stormConf);
            windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
            windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);
    
            tridentWindowManager = storeTuplesInStore ?
                    new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                    : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());
    
            tridentWindowManager.prepare();
        }
    
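    • prepare() creates either a StoreBasedTridentWindowManager or an InMemoryTridentWindowManager, depending on storeTuplesInStore, and then calls tridentWindowManager.prepare()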

    AbstractTridentWindowManager.prepare

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

        public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
                                            Aggregator aggregator, BatchOutputCollector delegateCollector) {
            this.windowTaskId = windowTaskId;
            this.windowStore = windowStore;
            this.aggregator = aggregator;
            this.delegateCollector = delegateCollector;
    
            windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
    
            windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
    
            WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
            EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
            windowManager.setEvictionPolicy(evictionPolicy);
            triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
            windowManager.setTriggerPolicy(triggerPolicy);
        }
    
        public void prepare() {
            preInitialize();
    
            initialize();
    
            postInitialize();
        }
    
        private void postInitialize() {
            // start trigger once the initialization is done.
            triggerPolicy.start();
        }
    
    • The AbstractTridentWindowManager constructor calls windowStrategy.getTriggerPolicy to obtain the triggerPolicy; prepare() then calls postInitialize(), which starts the trigger through triggerPolicy.start()
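
    The ordering is the point here: the trigger is wired up in the constructor but only started once initialization has finished. The following is a minimal sketch of that template-method shape, not Storm code; SketchTrigger, LifecycleSketch, InMemoryLifecycleSketch and the recorded step names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a trigger policy; this is not Storm's TriggerPolicy interface.
interface SketchTrigger {
    void start();
}

// Mirrors the shape of AbstractTridentWindowManager: the constructor obtains the
// trigger, and prepare() starts it only after the initialization steps have run.
abstract class LifecycleSketch {
    protected final List<String> steps = new ArrayList<>();
    private final SketchTrigger trigger;

    LifecycleSketch() {
        // The trigger is created here but deliberately not started yet.
        this.trigger = () -> steps.add("trigger.start");
    }

    final void prepare() {
        preInitialize();
        initialize();
        postInitialize();
    }

    private void preInitialize() {
        steps.add("preInitialize");
    }

    // Subclasses do their own setup here.
    protected abstract void initialize();

    private void postInitialize() {
        // Start the trigger once the initialization is done.
        trigger.start();
    }

    List<String> steps() {
        return steps;
    }
}

class InMemoryLifecycleSketch extends LifecycleSketch {
    @Override
    protected void initialize() {
        steps.add("initialize");
    }
}
```

    Calling prepare() on an InMemoryLifecycleSketch records preInitialize, initialize, trigger.start, in that order, so no trigger can fire against a half-initialized manager.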

    SlidingDurationWindowStrategy.getTriggerPolicy

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java

        /**
         * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
         *
         * @param triggerHandler
         * @param evictionPolicy
         * @return
         */
        @Override
        public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
            return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
        }
    
    • Taking SlidingDurationWindowStrategy as an example, the policy created here is a TimeTriggerPolicy whose duration is windowConfig.getSlidingLength() and whose triggerHandler is the WindowManager
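
    Because the trigger interval is the sliding length rather than the window length, consecutive activations can overlap. The sketch below illustrates this with plain arithmetic. It is not Storm code: SlidingWindowSketch and its methods are hypothetical, and the rule that a window covers (triggerTime - windowMs, triggerTime] is an assumption made for illustration, since the eviction policy is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // Trigger times in (0, endMs] for a trigger that fires every slidingMs milliseconds.
    static List<Long> triggerTimes(long slidingMs, long endMs) {
        List<Long> times = new ArrayList<>();
        for (long t = slidingMs; t <= endMs; t += slidingMs) {
            times.add(t);
        }
        return times;
    }

    // Assumed eviction rule: an event is in the window at triggerTime if its
    // timestamp lies in (triggerTime - windowMs, triggerTime].
    static List<Long> eventsInWindow(List<Long> timestamps, long triggerTime, long windowMs) {
        List<Long> result = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > triggerTime - windowMs && ts <= triggerTime) {
                result.add(ts);
            }
        }
        return result;
    }
}
```

    With a 10 s window sliding every 5 s and events at 1000, 4000, 6000 and 12000 ms, the trigger at 10000 ms sees 1000, 4000 and 6000, while the trigger at 15000 ms sees 6000 and 12000; the event at 6000 ms is processed by both activations.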

    TimeTriggerPolicy.start

    storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java

        public void start() {
            executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
        }
    
        private Runnable newTriggerTask() {
            return new Runnable() {
                @Override
                public void run() {
                    // do not process current timestamp since tuples might arrive while the trigger is executing
                    long now = System.currentTimeMillis() - 1;
                    try {
                        /*
                         * set the current timestamp as the reference time for the eviction policy
                         * to evict the events
                         */
                        if (evictionPolicy != null) {
                            evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                        }
                        handler.onTrigger();
                    } catch (Throwable th) {
                        LOG.error("handler.onTrigger failed ", th);
                        /*
                         * propagate it so that task gets canceled and the exception
                         * can be retrieved from executorFuture.get()
                         */
                        throw th;
                    }
                }
            };
        }
    
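    • start() schedules a periodic task that fires every duration milliseconds (windowConfig.getSlidingLength()), with the first run one duration after start; run() invokes handler.onTrigger(), i.e. WindowManager.onTrigger()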
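
    The scheduling behaviour can be reproduced with the JDK alone. The sketch below is a simplified stand-in, not the Storm class: PeriodicTriggerSketch and its Handler interface are hypothetical, and only ScheduledExecutorService.scheduleAtFixedRate is the real API used in the listing above. It also shows why the task rethrows: once a run throws, later runs are suppressed and the failure becomes visible through the future's get().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicTriggerSketch {
    // Hypothetical handler, standing in for the trigger handler that WindowManager implements.
    interface Handler {
        boolean onTrigger();
    }

    // Fires handler.onTrigger() every durationMs, with the first firing after durationMs.
    static ScheduledFuture<?> start(ScheduledExecutorService executor, Handler handler, long durationMs) {
        Runnable task = () -> handler.onTrigger();
        return executor.scheduleAtFixedRate(task, durationMs, durationMs, TimeUnit.MILLISECONDS);
    }

    // Returns true if the trigger fired at least `times` times within five seconds.
    static boolean firesAtLeast(int times, long durationMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(times);
        try {
            ScheduledFuture<?> future = start(executor, () -> {
                latch.countDown();
                return true;
            }, durationMs);
            boolean reached = latch.await(5, TimeUnit.SECONDS);
            future.cancel(false);
            return reached;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    // A handler that throws stops the periodic task; the cause is reported by future.get().
    static Throwable failureCause(long durationMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            start(executor, () -> {
                throw new IllegalStateException("onTrigger failed");
            }, durationMs).get();
            return null; // a periodic task never completes normally
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

    In this sketch, failureCause(10) returns the IllegalStateException thrown by the handler, which corresponds to the comment in the listing about retrieving the exception from executorFuture.get().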

    WindowManager.onTrigger

    storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

        /**
         * The callback invoked by the trigger policy.
         */
        @Override
        public boolean onTrigger() {
            List<Event<T>> windowEvents = null;
            List<T> expired = null;
            try {
                lock.lock();
                /*
                 * scan the entire window to handle out of order events in
                 * the case of time based windows.
                 */
                windowEvents = scanEvents(true);
                expired = new ArrayList<>(expiredEvents);
                expiredEvents.clear();
            } finally {
                lock.unlock();
            }
            List<T> events = new ArrayList<>();
            List<T> newEvents = new ArrayList<>();
            for (Event<T> event : windowEvents) {
                events.add(event.get());
                if (!prevWindowEvents.contains(event)) {
                    newEvents.add(event.get());
                }
            }
            prevWindowEvents.clear();
            if (!events.isEmpty()) {
                prevWindowEvents.addAll(windowEvents);
                LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
                windowLifecycleListener.onActivation(events, newEvents, expired);
            } else {
                LOG.debug("No events in the window, skipping onActivation");
            }
            triggerPolicy.reset();
            return !events.isEmpty();
        }
    
    
    • When the window is not empty, onTrigger() calls windowLifecycleListener.onActivation(events, newEvents, expired); the windowLifecycleListener is the TridentWindowLifeCycleListener defined in AbstractTridentWindowManager
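
    The bookkeeping that separates events from newEvents is easy to miss in the listing. Below is a minimal sketch of just that part, assuming the caller already knows which events are in the window and which have expired; ActivationSketch and its Listener interface are hypothetical, and locking, scanning and the trigger reset are left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ActivationSketch<T> {
    // Hypothetical listener, shaped like onActivation(events, newEvents, expired) in the listing.
    interface Listener<T> {
        void onActivation(List<T> events, List<T> newEvents, List<T> expired);
    }

    // Events that were part of the previous activation.
    private final Set<T> prevWindowEvents = new HashSet<>();
    private final Listener<T> listener;

    ActivationSketch(Listener<T> listener) {
        this.listener = listener;
    }

    // windowEvents: events currently in the window; expired: events evicted since the last trigger.
    boolean onTrigger(List<T> windowEvents, List<T> expired) {
        List<T> newEvents = new ArrayList<>();
        for (T event : windowEvents) {
            // An event is "new" if the previous activation did not contain it.
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        prevWindowEvents.clear();
        if (windowEvents.isEmpty()) {
            return false; // empty window: skip the callback
        }
        prevWindowEvents.addAll(windowEvents);
        listener.onActivation(new ArrayList<>(windowEvents), newEvents, new ArrayList<>(expired));
        return true;
    }
}
```

    If the first trigger sees [a, b] and the second sees [b, c] with a expired, the second activation receives events [b, c], newEvents [c] and expired [a]. A trigger on an empty window returns false without calling the listener.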

    TridentWindowLifeCycleListener.onActivation

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

        /**
         * Listener to receive any activation/expiry of windowing events and take further action on them.
         */
        class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {
    
            @Override
            public void onExpiry(List<T> expiredEvents) {
                LOG.debug("onExpiry is invoked");
                onTuplesExpired(expiredEvents);
            }
    
            @Override
            public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
                LOG.debug("onActivation is invoked with events size: [{}]", events.size());
                // trigger occurred, create an aggregation and keep them in store
                int currentTriggerId = triggerId.incrementAndGet();
                execAggregatorAndStoreResult(currentTriggerId, events);
            }
        }
    
        private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
            List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);
    
            // run aggregator to compute the result
            AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
            Object state = aggregator.init(currentTriggerId, collector);
            for (TridentTuple resultTuple : resultTuples) {
                aggregator.aggregate(state, resultTuple, collector);
            }
            aggregator.complete(state, collector);
    
            List<List<Object>> resultantAggregatedValue = collector.values;
    
            ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                    new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
            windowStore.putAll(entries);
    
            pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
        }
    
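    • TridentWindowLifeCycleListener.onActivation mainly delegates to execAggregatorAndStoreResult
    • execAggregatorAndStoreResult calls the aggregator's init, aggregate and complete methods in that order, then writes the next trigger count and the aggregated result to the windowStore
    • Finally, it adds a TriggerResult to pendingTriggers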
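
    The init, aggregate, complete sequence can be sketched with a stripped-down aggregator. This is an illustration only: SketchAggregator is a hypothetical simplification (Trident's Aggregator works with a collector and TridentTuple, which are not modelled here), and SumAggregator and runWindow are made-up names.

```java
import java.util.ArrayList;
import java.util.List;

class AggregatorCallOrderSketch {
    // Hypothetical simplified aggregator; results are appended to a plain list
    // instead of being emitted through a collector.
    interface SketchAggregator<S, T> {
        S init(int triggerId);
        void aggregate(S state, T tuple);
        void complete(S state, List<Object> out);
    }

    // Mutable state for the sum example.
    static class SumState {
        long sum;
    }

    // Sums the integer value of every tuple in the window.
    static class SumAggregator implements SketchAggregator<SumState, Integer> {
        public SumState init(int triggerId) {
            return new SumState();
        }

        public void aggregate(SumState state, Integer tuple) {
            state.sum += tuple;
        }

        public void complete(SumState state, List<Object> out) {
            out.add(state.sum);
        }
    }

    // One activation: init once, aggregate per tuple, complete once.
    static <S, T> List<Object> runWindow(SketchAggregator<S, T> aggregator, int triggerId, List<T> tuples) {
        List<Object> out = new ArrayList<>();
        S state = aggregator.init(triggerId);
        for (T tuple : tuples) {
            aggregator.aggregate(state, tuple);
        }
        aggregator.complete(state, out);
        return out;
    }
}
```

    Running runWindow(new AggregatorCallOrderSketch.SumAggregator(), 1, Arrays.asList(1, 2, 3)) yields a single result value of 6. A fresh state is created for every trigger, so nothing carries over between activations in this sketch.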

    Summary

    • Storm registers the periodic trigger task in TimeTriggerPolicy.start; taking SlidingDurationWindowStrategy as an example, the scheduling interval is windowConfig.getSlidingLength()
    • The trigger task periodically invokes WindowManager.onTrigger, which calls back windowLifecycleListener.onActivation
    • AbstractTridentWindowManager supplies the TridentWindowLifeCycleListener, whose onActivation mainly calls execAggregatorAndStoreResult. That method drives the aggregator: it calls init first, then iterates over resultTuples calling aggregate on each one, and finally calls complete. This makes the calling order of the Aggregator interface methods easy to see.
