A look at Storm's AggregateProcessor execu

Author: go4it | Published: 2018-11-15 00:37

    This article walks through the execute and finishBatch methods of Storm's AggregateProcessor.

    Example

            TridentTopology topology = new TridentTopology();
            topology.newStream("spout1", spout)
                    .groupBy(new Fields("user"))
                    .aggregate(new Fields("user","score"),new UserCountAggregator(),new Fields("val"))
                    .toStream()
                    .parallelismHint(1)
                    .each(new Fields("val"),new PrintEachFunc(),new Fields());
    

    TridentBoltExecutor

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

        private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
            if(tracked.failed) {
                failBatch(tracked);
                _collector.fail(tuple);
                return;
            }
            CoordCondition cond = tracked.condition;
            boolean delayed = tracked.delayedAck==null &&
                                  (cond.commitStream!=null && type==TupleType.COMMIT
                                   || cond.commitStream==null);
            if(delayed) {
                tracked.delayedAck = tuple;
            }
            boolean failed = false;
            if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
                if(tracked.receivedTuples == tracked.expectedTupleCount) {
                    finishBatch(tracked, tuple);                
                } else {
                    //TODO: add logging that not all tuples were received
                    failBatch(tracked);
                    _collector.fail(tuple);
                    failed = true;
                }
            }
            
            if(!delayed && !failed) {
                _collector.ack(tuple);
            }
            
        }
    
        private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
            boolean success = true;
            try {
                _bolt.finishBatch(tracked.info);
                String stream = COORD_STREAM(tracked.info.batchGroup);
                for(Integer task: tracked.condition.targetTasks) {
                    _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
                }
                if(tracked.delayedAck!=null) {
                    _collector.ack(tracked.delayedAck);
                    tracked.delayedAck = null;
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
                success = false;
            }
            _batches.remove(tracked.info.batchId.getId());
            return success;
        }
    
        public static class TrackedBatch {
            int attemptId;
            BatchInfo info;
            CoordCondition condition;
            int reportedTasks = 0;
            int expectedTupleCount = 0;
            int receivedTuples = 0;
            Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
            //......
        }
    
    • Both the user's spout and the groupBy operation end up wrapped in a TridentBoltExecutor; the TridentBoltExecutor for groupBy wraps a SubtopologyBolt.
    • TridentBoltExecutor calls finishBatch from checkFinish. It also calls finishBatch when it receives a REGULAR tuple and tracked.condition.expectedTaskReports==0. For the spout, expectedTaskReports is 0 because it is the data source, so it does not need COORD_STREAM tuples to update reportedTasks and expectedTupleCount. finishBatch emits new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)) on COORD_STREAM, matching new Fields("id", "count"): the batchId and the number of tuples sent to the target task. This tells each downstream task how many tuples were sent to it (taskEmittedTuples is maintained in the emit and emitDirect methods of CoordinatedOutputCollector).
    • The downstream component is also a TridentBoltExecutor. When it receives a tuple on COORD_STREAM it updates expectedTupleCount. In checkFinish, each TridentBoltExecutor checks whether receivedTuples equals expectedTupleCount, which means every tuple sent by upstream has arrived, and if so triggers finishBatch.
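    The count handshake described above can be sketched in plain Java. This is a simplified model rather than Storm's code: the Upstream and Downstream classes and the runBatch helper are hypothetical names, the commit-stream check is left out (the example topology has no commit stream), and a failed batch is recorded as a flag instead of calling failBatch.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the tuple-count handshake; no Storm classes are used.
class BatchCoordinationSketch {

    // Upstream side: counts the tuples emitted to each downstream task.
    static class Upstream {
        final Map<Integer, Integer> taskEmittedTuples = new HashMap<>();

        void emitTo(int task) {
            taskEmittedTuples.merge(task, 1, Integer::sum);
        }

        // Count reported to a task at finishBatch (0 if nothing was sent to it).
        int countFor(int task) {
            return taskEmittedTuples.getOrDefault(task, 0);
        }
    }

    // Downstream side: mirrors the counters kept in TrackedBatch.
    static class Downstream {
        final int expectedTaskReports;
        int reportedTasks = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        boolean finished = false;
        boolean failed = false;

        Downstream(int expectedTaskReports) {
            this.expectedTaskReports = expectedTaskReports;
        }

        void onRegularTuple() {
            receivedTuples++;
        }

        // Handles one [id, count] report, then runs the completion check.
        void onCoordTuple(int count) {
            reportedTasks++;
            expectedTupleCount += count;
            checkFinish();
        }

        void checkFinish() {
            if (reportedTasks == expectedTaskReports) {
                if (receivedTuples == expectedTupleCount) {
                    finished = true;   // stands in for finishBatch
                } else {
                    failed = true;     // stands in for failBatch
                }
            }
        }
    }

    // Runs one batch with a single upstream task feeding downstream task 7.
    static boolean runBatch(int tuples) {
        Upstream up = new Upstream();
        Downstream down = new Downstream(1);
        for (int i = 0; i < tuples; i++) {
            up.emitTo(7);
            down.onRegularTuple();
        }
        down.onCoordTuple(up.countFor(7));
        return down.finished;
    }

    public static void main(String[] args) {
        System.out.println(runBatch(3)); // prints true
    }
}
```

    With two upstream tasks, a Downstream built with expectedTaskReports of 2 stays unfinished after the first report and only completes once the second report arrives and the totals match.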

    SubtopologyBolt

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

    public class SubtopologyBolt implements ITridentBatchBolt {
        //......
        @Override
        public void execute(BatchInfo batchInfo, Tuple tuple) {
            String sourceStream = tuple.getSourceStreamId();
            InitialReceiver ir = _roots.get(sourceStream);
            if(ir==null) {
                throw new RuntimeException("Received unexpected tuple " + tuple.toString());
            }
            ir.receive((ProcessorContext) batchInfo.state, tuple);
        }
    
        @Override
        public void finishBatch(BatchInfo batchInfo) {
            for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
                p.finishBatch((ProcessorContext) batchInfo.state);
            }
        }
    
        @Override
        public Object initBatchState(String batchGroup, Object batchId) {
            ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
            for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
                p.startBatch(ret);
            }
            return ret;
        }
    
        @Override
        public void cleanup() {
            for(String bg: _myTopologicallyOrdered.keySet()) {
                for(TridentProcessor p: _myTopologicallyOrdered.get(bg)) {
                    p.cleanup();
                }   
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            for(Node n: _nodes) {
                declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
            }        
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        protected static class InitialReceiver {
            List<TridentProcessor> _receivers = new ArrayList<>();
            RootFactory _factory;
            ProjectionFactory _project;
            String _stream;
            
            public InitialReceiver(String stream, Fields allFields) {
                // TODO: don't want to project for non-batch bolts...???
                // how to distinguish "batch" streams from non-batch streams?
                _stream = stream;
                _factory = new RootFactory(allFields);
                List<String> projected = new ArrayList<>(allFields.toList());
                projected.remove(0);
                _project = new ProjectionFactory(_factory, new Fields(projected));
            }
            
            public void receive(ProcessorContext context, Tuple tuple) {
                TridentTuple t = _project.create(_factory.create(tuple));
                for(TridentProcessor r: _receivers) {
                    r.execute(context, _stream, t);
                }            
            }
            
            public void addReceiver(TridentProcessor p) {
                _receivers.add(p);
            }
            
            public Factory getOutputFactory() {
                return _project;
            }
        }
    }
    
    • The groupBy operation is wrapped in a SubtopologyBolt; the first field of its outputFields is $batchId.
    • execute looks up the InitialReceiver for the tuple's source stream and calls its receive method, which calls execute on each entry in _receivers; the receiver here is AggregateProcessor.
    • finishBatch calls finishBatch on each TridentProcessor returned by _myTopologicallyOrdered.get(batchInfo.batchGroup), here AggregateProcessor and EachProcessor. BatchInfo holds the batchId, the processorContext and the batchGroup. The processorContext (a batchId of type TransactionAttempt plus an Object array named state, which holds the GroupCollector, the aggregate's accumulated results and so on) is passed to finishBatch.
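    The per-batch lifecycle described above (initBatchState, then execute per tuple, then finishBatch) can be modelled without Storm. The sketch below is an assumption-laden simplification: Context, Processor, CountingProcessor and dropBatchId are hypothetical stand-ins for ProcessorContext, TridentProcessor, a real processor and the projection in InitialReceiver, and only a single processor is wired in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the per-batch lifecycle; none of these types are Storm API.
class SubtopologyLifecycleSketch {

    // Per-batch context with one state slot per processor, like ProcessorContext.state.
    static class Context {
        final Object batchId;
        final Object[] state;

        Context(Object batchId, int slots) {
            this.batchId = batchId;
            this.state = new Object[slots];
        }
    }

    interface Processor {
        void startBatch(Context ctx);
        void execute(Context ctx, List<Object> tuple);
        void finishBatch(Context ctx);
    }

    // Hypothetical processor that counts tuples in its own state slot.
    static class CountingProcessor implements Processor {
        private final int stateIndex;
        private final List<String> log;

        CountingProcessor(int stateIndex, List<String> log) {
            this.stateIndex = stateIndex;
            this.log = log;
        }

        @Override
        public void startBatch(Context ctx) {
            ctx.state[stateIndex] = 0;
        }

        @Override
        public void execute(Context ctx, List<Object> tuple) {
            ctx.state[stateIndex] = (Integer) ctx.state[stateIndex] + 1;
        }

        @Override
        public void finishBatch(Context ctx) {
            log.add("batch " + ctx.batchId + " count=" + ctx.state[stateIndex]);
        }
    }

    // Drops the leading $batchId field, as projected.remove(0) does in InitialReceiver.
    static List<Object> dropBatchId(List<Object> raw) {
        return new ArrayList<>(raw.subList(1, raw.size()));
    }

    static List<String> runBatch(Object batchId, List<List<Object>> rawTuples) {
        List<String> log = new ArrayList<>();
        List<Processor> ordered = new ArrayList<>();
        ordered.add(new CountingProcessor(0, log));

        Context ctx = new Context(batchId, ordered.size());
        for (Processor p : ordered) {
            p.startBatch(ctx);              // initBatchState
        }
        for (List<Object> raw : rawTuples) {
            List<Object> tuple = dropBatchId(raw);
            for (Processor p : ordered) {
                p.execute(ctx, tuple);      // execute, once per incoming tuple
            }
        }
        for (Processor p : ordered) {
            p.finishBatch(ctx);             // finishBatch
        }
        return log;
    }

    public static void main(String[] args) {
        List<List<Object>> raw = new ArrayList<>();
        raw.add(Arrays.<Object>asList("tx1", "alice", 10));
        raw.add(Arrays.<Object>asList("tx1", "bob", 5));
        System.out.println(runBatch("tx1", raw)); // prints [batch tx1 count=2]
    }
}
```

    Keeping the state in the context rather than in the processor is what lets one processor instance serve several in-flight batches, each with its own slot array.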

    AggregateProcessor

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AggregateProcessor.java

    public class AggregateProcessor implements TridentProcessor {
        Aggregator _agg;
        TridentContext _context;
        FreshCollector _collector;
        Fields _inputFields;
        ProjectionFactory _projection;
    
        public AggregateProcessor(Fields inputFields, Aggregator agg) {
            _agg = agg;
            _inputFields = inputFields;
        }
        
        @Override
        public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
            List<Factory> parents = tridentContext.getParentTupleFactories();
            if(parents.size()!=1) {
                throw new RuntimeException("Aggregate operation can only have one parent");
            }
            _context = tridentContext;
            _collector = new FreshCollector(tridentContext);
            _projection = new ProjectionFactory(parents.get(0), _inputFields);
            _agg.prepare(conf, new TridentOperationContext(context, _projection));
        }
    
        @Override
        public void cleanup() {
            _agg.cleanup();
        }
    
        @Override
        public void startBatch(ProcessorContext processorContext) {
            _collector.setContext(processorContext);
            processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
        }    
    
        @Override
        public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
            _collector.setContext(processorContext);
            _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
        }
        
        @Override
        public void finishBatch(ProcessorContext processorContext) {
            _collector.setContext(processorContext);
            _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
        }
     
        @Override
        public Factory getOutputFactory() {
            return _collector.getOutputFactory();
        }
    }
    
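    • AggregateProcessor creates the FreshCollector and the ProjectionFactory in prepare.
    • For a groupBy operation, _agg is a GroupedAggregator, and the context passed to _agg.prepare is a TridentOperationContext.
    • finishBatch calls _agg.complete with the arr array, whose first element is the GroupCollector and whose second element holds the aggregator's accumulated values; the _collector passed in is the FreshCollector.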
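    The three-phase contract that AggregateProcessor drives (init in startBatch, aggregate in execute, complete in finishBatch) can be illustrated with a small stand-alone model. The Collector and Aggregator interfaces below only imitate the shape of Storm's types, and SumAggregator is a made-up aggregator that sums the first field of each tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the init / aggregate / complete contract; not Storm API.
class AggregatorContractSketch {

    interface Collector {
        void emit(List<Object> values);
    }

    interface Aggregator<T> {
        T init(Object batchId, Collector collector);
        void aggregate(T state, List<Object> tuple, Collector collector);
        void complete(T state, Collector collector);
    }

    // Hypothetical aggregator: sums the first field of every tuple in the batch.
    static class SumAggregator implements Aggregator<long[]> {
        @Override
        public long[] init(Object batchId, Collector collector) {
            return new long[1];
        }

        @Override
        public void aggregate(long[] state, List<Object> tuple, Collector collector) {
            state[0] += ((Number) tuple.get(0)).longValue();
        }

        @Override
        public void complete(long[] state, Collector collector) {
            List<Object> result = new ArrayList<>();
            result.add(state[0]);
            collector.emit(result);
        }
    }

    // Drives the aggregator the way the processor does: the accumulator lives
    // in a slot of a per-batch state array, not in the aggregator itself.
    static List<List<Object>> runBatch(Object batchId, List<List<Object>> tuples) {
        List<List<Object>> out = new ArrayList<>();
        Collector collector = out::add;
        Aggregator<long[]> agg = new SumAggregator();

        Object[] state = new Object[1];
        int stateIndex = 0;
        state[stateIndex] = agg.init(batchId, collector);             // startBatch
        for (List<Object> t : tuples) {
            agg.aggregate((long[]) state[stateIndex], t, collector);  // execute
        }
        agg.complete((long[]) state[stateIndex], collector);          // finishBatch
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = new ArrayList<>();
        tuples.add(Arrays.<Object>asList(10));
        tuples.add(Arrays.<Object>asList(5));
        System.out.println(runBatch("tx1", tuples)); // prints [[15]]
    }
}
```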

    GroupedAggregator

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/impl/GroupedAggregator.java

    public class GroupedAggregator implements Aggregator<Object[]> {
        ProjectionFactory _groupFactory;
        ProjectionFactory _inputFactory;
        Aggregator _agg;
        ComboList.Factory _fact;
        Fields _inFields;
        Fields _groupFields;
        
        public GroupedAggregator(Aggregator agg, Fields group, Fields input, int outSize) {
            _groupFields = group;
            _inFields = input;
            _agg = agg;
            int[] sizes = new int[2];
            sizes[0] = _groupFields.size();
            sizes[1] = outSize;
            _fact = new ComboList.Factory(sizes);
        }
        
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            _inputFactory = context.makeProjectionFactory(_inFields);
            _groupFactory = context.makeProjectionFactory(_groupFields);
            _agg.prepare(conf, new TridentOperationContext(context, _inputFactory));
        }
    
        @Override
        public Object[] init(Object batchId, TridentCollector collector) {
            return new Object[] {new GroupCollector(collector, _fact), new HashMap(), batchId};
        }
    
        @Override
        public void aggregate(Object[] arr, TridentTuple tuple, TridentCollector collector) {
            GroupCollector groupColl = (GroupCollector) arr[0];
            Map<List, Object> val = (Map) arr[1];
            TridentTuple group = _groupFactory.create((TridentTupleView) tuple);
            TridentTuple input = _inputFactory.create((TridentTupleView) tuple);
            Object curr;
            if(!val.containsKey(group)) {
                curr = _agg.init(arr[2], groupColl);
                val.put((List) group, curr);
            } else {
                curr = val.get(group);
            }
            groupColl.currGroup = group;
            _agg.aggregate(curr, input, groupColl);
            
        }
    
        @Override
        public void complete(Object[] arr, TridentCollector collector) {
            Map<List, Object> val = (Map) arr[1];        
            GroupCollector groupColl = (GroupCollector) arr[0];
            for(Entry<List, Object> e: val.entrySet()) {
                groupColl.currGroup = e.getKey();
                _agg.complete(e.getValue(), groupColl);
            }
        }
    
        @Override
        public void cleanup() {
            _agg.cleanup();
        }
        
    }
    
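    • In aggregate, arr[0] is the GroupCollector; arr[1] is a map whose key is the TridentTupleView of the group fields and whose value is the accumulator returned by _agg.init; arr[2] is the TransactionAttempt.
    • _agg here is a ChainedAggregatorImpl. aggregate first projects the tuple onto its group fields and its input fields, then checks whether arr[1] already has a value for that group; if not, it calls _agg.init to create one and puts it in the map.
    • Finally, aggregate calls _agg.aggregate to accumulate.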
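    The per-group bookkeeping can be sketched as follows. This is a simplified model, not GroupedAggregator itself: tuples are assumed to have the form [user, score], a plain per-group count stands in for the wrapped aggregator, a LinkedHashMap replaces the HashMap so the output order is predictable, and the emitted row is assumed to be the group values followed by the result, which is how the sizes passed to ComboList.Factory suggest GroupCollector lays out its output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of group-keyed accumulation; no Storm classes are used.
class GroupedAggregationSketch {

    // Counts tuples of the assumed form [user, score] per user.
    static List<List<Object>> countPerGroup(List<List<Object>> tuples) {
        Map<List<Object>, int[]> perGroup = new LinkedHashMap<>();

        for (List<Object> tuple : tuples) {
            // Projection onto the group field (copied so the key is independent).
            List<Object> group = new ArrayList<>(tuple.subList(0, 1));
            int[] curr = perGroup.get(group);
            if (curr == null) {
                curr = new int[1];          // init on the first tuple of a group
                perGroup.put(group, curr);
            }
            curr[0]++;                      // aggregate
        }

        // complete: one output row per group, group values first, then the result.
        List<List<Object>> out = new ArrayList<>();
        for (Map.Entry<List<Object>, int[]> e : perGroup.entrySet()) {
            List<Object> row = new ArrayList<>(e.getKey());
            row.add(e.getValue()[0]);
            out.add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = new ArrayList<>();
        tuples.add(Arrays.<Object>asList("alice", 10));
        tuples.add(Arrays.<Object>asList("bob", 5));
        tuples.add(Arrays.<Object>asList("alice", 7));
        System.out.println(countPerGroup(tuples)); // prints [[alice, 2], [bob, 1]]
    }
}
```

    Using the list of group values as the map key works because List equality and hashing are content-based, which is the same property the real code relies on when it keys the map by the group's tuple view.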

    ChainedAggregatorImpl

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/impl/ChainedAggregatorImpl.java

    public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
        Aggregator[] _aggs;
        ProjectionFactory[] _inputFactories;
        ComboList.Factory _fact;
        Fields[] _inputFields;
        
        
        
        public ChainedAggregatorImpl(Aggregator[] aggs, Fields[] inputFields, ComboList.Factory fact) {
            _aggs = aggs;
            _inputFields = inputFields;
            _fact = fact;
            if(_aggs.length!=_inputFields.length) {
                throw new IllegalArgumentException("Require input fields for each aggregator");
            }
        }
        
        public void prepare(Map conf, TridentOperationContext context) {
            _inputFactories = new ProjectionFactory[_inputFields.length];
            for(int i=0; i<_inputFields.length; i++) {
                _inputFactories[i] = context.makeProjectionFactory(_inputFields[i]);
                _aggs[i].prepare(conf, new TridentOperationContext(context, _inputFactories[i]));
            }
        }
        
        public ChainedResult init(Object batchId, TridentCollector collector) {
            ChainedResult initted = new ChainedResult(collector, _aggs.length);
            for(int i=0; i<_aggs.length; i++) {
                initted.objs[i] = _aggs[i].init(batchId, initted.collectors[i]);
            }
            return initted;
        }
        
        public void aggregate(ChainedResult val, TridentTuple tuple, TridentCollector collector) {
            val.setFollowThroughCollector(collector);
            for(int i=0; i<_aggs.length; i++) {
                TridentTuple projected = _inputFactories[i].create((TridentTupleView) tuple);
                _aggs[i].aggregate(val.objs[i], projected, val.collectors[i]);
            }
        }
        
        public void complete(ChainedResult val, TridentCollector collector) {
            val.setFollowThroughCollector(collector);
            for(int i=0; i<_aggs.length; i++) {
                _aggs[i].complete(val.objs[i], val.collectors[i]);
            }
            if(_aggs.length > 1) { // otherwise, tuples were emitted directly
                int[] indices = new int[val.collectors.length];
                for(int i=0; i<indices.length; i++) {
                    indices[i] = 0;
                }
                boolean keepGoing = true;
                //emit cross-join of all emitted tuples
                while(keepGoing) {
                    List[] combined = new List[_aggs.length];
                    for(int i=0; i< _aggs.length; i++) {
                        CaptureCollector capturer = (CaptureCollector) val.collectors[i];
                        combined[i] = capturer.captured.get(indices[i]);
                    }
                    collector.emit(_fact.create(combined));
                    keepGoing = increment(val.collectors, indices, indices.length - 1);
                }
            }
        }
        
        //return false if can't increment anymore
        private boolean increment(TridentCollector[] lengths, int[] indices, int j) {
            if(j==-1) return false;
            indices[j]++;
            CaptureCollector capturer = (CaptureCollector) lengths[j];
            if(indices[j] >= capturer.captured.size()) {
                indices[j] = 0;
                return increment(lengths, indices, j-1);
            }
            return true;
        }
        
        public void cleanup() {
           for(Aggregator a: _aggs) {
               a.cleanup();
           } 
        } 
    }
    
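    • init returns a ChainedResult, whose objs field stores the init result of each entry in _aggs.
    • If the user passed an Aggregator to aggregate after groupBy, that object is the entry in _aggs; if the user passed a CombinerAggregator, it is first wrapped in a CombinerAggregatorCombineImpl.
    • complete in ChainedAggregatorImpl calls complete on each entry in _aggs, passing val.objs[i], the accumulated value for that aggregator, as the first argument.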
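    When more than one aggregator is chained, complete emits the cross join of what each aggregator captured, stepping through the index array like an odometer. The stand-alone sketch below reproduces that stepping on plain lists. crossJoin is a hypothetical helper, and the early return for an empty input list is an addition of this sketch, not something the quoted source does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the odometer-style cross join; no Storm classes are used.
class CrossJoinSketch {

    // Advances indices like an odometer, rightmost position first.
    // Returns false once every combination has been visited.
    static boolean increment(int[] sizes, int[] indices, int j) {
        if (j == -1) {
            return false;
        }
        indices[j]++;
        if (indices[j] >= sizes[j]) {
            indices[j] = 0;
            return increment(sizes, indices, j - 1);
        }
        return true;
    }

    // captured.get(i) holds the values captured from aggregator i.
    static List<List<Object>> crossJoin(List<List<Object>> captured) {
        List<List<Object>> out = new ArrayList<>();
        int[] sizes = new int[captured.size()];
        for (int i = 0; i < sizes.length; i++) {
            sizes[i] = captured.get(i).size();
            if (sizes[i] == 0) {
                return out; // guard added in this sketch: nothing to join
            }
        }
        int[] indices = new int[captured.size()]; // all zeros initially
        boolean keepGoing = true;
        while (keepGoing) {
            List<Object> combined = new ArrayList<>();
            for (int i = 0; i < indices.length; i++) {
                combined.add(captured.get(i).get(indices[i]));
            }
            out.add(combined);
            keepGoing = increment(sizes, indices, indices.length - 1);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> captured = new ArrayList<>();
        captured.add(Arrays.<Object>asList("a", "b"));
        captured.add(Arrays.<Object>asList(1, 2, 3));
        // prints [[a, 1], [a, 2], [a, 3], [b, 1], [b, 2], [b, 3]]
        System.out.println(crossJoin(captured));
    }
}
```

    In the example topology only one aggregator is chained, so this path is skipped and the tuples are emitted directly, as the comment in the source notes.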

    Summary

    • groupBy is wrapped in a SubtopologyBolt. Its execute method triggers InitialReceiver.receive, which calls execute on each entry in _receivers; the first receiver is AggregateProcessor.
    • AggregateProcessor wraps GroupedAggregator, which wraps ChainedAggregatorImpl, which wraps an array of Aggregators. In this example the array has one element: the aggregator passed to aggregate after groupBy.
    • TridentBoltExecutor receives from upstream, on the coordination stream (named with COORD_STREAM_PREFIX), the number of tuples it should expect, and updates expectedTupleCount. It then runs checkFinish: when receivedTuples (incremented for every tuple of the spout's batch that arrives) equals expectedTupleCount, it triggers finishBatch. That call goes to SubtopologyBolt.finishBatch, then AggregateProcessor.finishBatch, then GroupedAggregator.complete, then ChainedAggregatorImpl.complete, and finally the user aggregator's complete.
    • For the TridentBoltExecutor that wraps TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0 because it is the data source, so it does not need COORD_STREAM tuples to update reportedTasks and expectedTupleCount. When its execute method receives a tuple from MasterBatchCoordinator on MasterBatchCoordinator.BATCH_STREAM_ID ($batch), it calls TridentSpoutExecutor.execute and then, because tracked.condition.expectedTaskReports==0, immediately calls finishBatch (in this example condition.commitStream is null in the TrackedBatch of both TridentBoltExecutors, so receivedCommit is true). finishBatch calls TridentSpoutExecutor.finishBatch and then sends the batchId and the taskEmittedTuples count to each downstream TridentBoltExecutor task over COORD_STREAM. The downstream TridentBoltExecutor has a non-zero expectedTaskReports, so it can only run checkFinish and decide whether to finishBatch once it has received the COORD_STREAM tuple.
    • TridentSpoutExecutor.execute calls the emitter (which ends up calling the user's spout) to emit one batch, and its finishBatch method is currently empty. So once the TridentBoltExecutor wrapping TridentSpoutExecutor receives the instruction to emit a batch, it calls TridentSpoutExecutor.execute to emit the batch through the emitter and then runs finishBatch right away. That sends [id, count] to the downstream TridentBoltExecutor, which updates expectedTupleCount on receipt and runs checkFinish; if receivedTuples equals expectedTupleCount, it triggers finishBatch, which in turn triggers AggregateProcessor.finishBatch.
