Talking about storm TridentBoltExecutor's finishBatch

Author: go4it | Published 2018-11-16 15:12

    This article takes a close look at the finishBatch method of storm's TridentBoltExecutor.

    MasterBatchCoordinator.nextTuple

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

        public void nextTuple() {
            sync();
        }
    
        private void sync() {
            // note that sometimes the tuples active may be less than max_spout_pending, e.g.
            // max_spout_pending = 3
            // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
            // and there won't be a batch for tx 4 because there's max_spout_pending tx active
            TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
            if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
                maybeCommit.status = AttemptStatus.COMMITTING;
                _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
                LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
            }
            
            if(_active) {
                if(_activeTx.size() < _maxTransactionActive) {
                    Long curr = _currTransaction;
                    for(int i=0; i<_maxTransactionActive; i++) {
                        if(!_activeTx.containsKey(curr) && isReady(curr)) {
                            // by using a monotonically increasing attempt id, downstream tasks
                            // can be memory efficient by clearing out state for old attempts
                            // as soon as they see a higher attempt id for a transaction
                            Integer attemptId = _attemptIds.get(curr);
                            if(attemptId==null) {
                                attemptId = 0;
                            } else {
                                attemptId++;
                            }
                            _attemptIds.put(curr, attemptId);
                            for(TransactionalState state: _states) {
                                state.setData(CURRENT_ATTEMPTS, _attemptIds);
                            }
                            
                            TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                            final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                            _activeTx.put(curr, newTransactionStatus);
                            _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                            LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                            _throttler.markEvent();
                        }
                        curr = nextTransactionId(curr);
                    }
                }
            }
        }
    
    • MasterBatchCoordinator is the real spout of the whole trident topology; its nextTuple method emits a tuple to TridentSpoutCoordinator on the MasterBatchCoordinator.BATCH_STREAM_ID ($batch) stream (the control stream names are sketched below)
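
    For quick reference, the three control stream names come from constants in the same MasterBatchCoordinator source file; the snippet below is a paraphrased sketch of those declarations rather than a verbatim quote:

        public static final String BATCH_STREAM_ID = "$batch";     // batch start instruction, consumed by TridentSpoutCoordinator (see its execute method below)
        public static final String COMMIT_STREAM_ID = "$commit";   // emitted in sync() once a transaction's status reaches AttemptStatus.PROCESSED
        public static final String SUCCESS_STREAM_ID = "$success"; // signals that a transaction has been fully committed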

    TridentSpoutCoordinator.execute

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
    
            if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
                _state.cleanupBefore(attempt.getTransactionId());
                _coord.success(attempt.getTransactionId());
            } else {
                long txid = attempt.getTransactionId();
                Object prevMeta = _state.getPreviousState(txid);
                Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
                _state.overrideState(txid, meta);
                collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
            }
                    
        }
    
    • TridentSpoutCoordinator receives the tuple that MasterBatchCoordinator emitted on MasterBatchCoordinator.BATCH_STREAM_ID ($batch) and then sends the batch instruction to the TridentBoltExecutor that wraps the user spout (a sketch of the batch coordinator it consults follows below)
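
    The _coord used above is the user spout's ITridentSpout.BatchCoordinator. Below is a minimal, hypothetical coordinator (the class name DemoBatchCoordinator and the Long metadata type are assumptions for illustration, assuming the ITridentSpout.BatchCoordinator interface of storm-core 1.2.2); it shows what initializeTransaction is expected to return: the per-batch metadata that TridentSpoutCoordinator stores via _state.overrideState and emits alongside the attempt:

        import org.apache.storm.trident.spout.ITridentSpout;

        // Hypothetical coordinator for illustration only; not part of the quoted source.
        public class DemoBatchCoordinator implements ITridentSpout.BatchCoordinator<Long> {

            @Override
            public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
                // e.g. derive this batch's metadata (say, a starting offset) from the previous batch's
                return prevMetadata == null ? 0L : prevMetadata + 1;
            }

            @Override
            public void success(long txid) {
                // invoked when the SUCCESS_STREAM_ID tuple for this txid arrives (see TridentSpoutCoordinator.execute above)
            }

            @Override
            public boolean isReady(long txid) {
                return true; // always willing to start the next batch in this sketch
            }

            @Override
            public void close() {
            }
        }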

    TridentBoltExecutor(TridentSpoutExecutor)

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

        public void execute(Tuple tuple) {
            if(TupleUtils.isTick(tuple)) {
                long now = System.currentTimeMillis();
                if(now - _lastRotate > _messageTimeoutMs) {
                    _batches.rotate();
                    _lastRotate = now;
                }
                return;
            }
            String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
            if(batchGroup==null) {
                // this is so we can do things like have simple DRPC that doesn't need to use batch processing
                _coordCollector.setCurrBatch(null);
                _bolt.execute(null, tuple);
                _collector.ack(tuple);
                return;
            }
            IBatchID id = (IBatchID) tuple.getValue(0);
            //get transaction id
            //if it already exists and attempt id is greater than the attempt there
            
            
            TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
    //        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
    //            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
    //                    + " (" + _batches.size() + ")" +
    //                    "\ntuple: " + tuple +
    //                    "\nwith tracked " + tracked +
    //                    "\nwith id " + id + 
    //                    "\nwith group " + batchGroup
    //                    + "\n");
    //            
    //        }
            //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
            
            // this code here ensures that only one attempt is ever tracked for a batch, so when
            // failures happen you don't get an explosion in memory usage in the tasks
            if(tracked!=null) {
                if(id.getAttemptId() > tracked.attemptId) {
                    _batches.remove(id.getId());
                    tracked = null;
                } else if(id.getAttemptId() < tracked.attemptId) {
                    // no reason to try to execute a previous attempt than we've already seen
                    return;
                }
            }
            
            if(tracked==null) {
                tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
                _batches.put(id.getId(), tracked);
            }
            _coordCollector.setCurrBatch(tracked);
            
            //System.out.println("TRACKED: " + tracked + " " + tuple);
            
            TupleType t = getTupleType(tuple, tracked);
            if(t==TupleType.COMMIT) {
                tracked.receivedCommit = true;
                checkFinish(tracked, tuple, t);
            } else if(t==TupleType.COORD) {
                int count = tuple.getInteger(1);
                tracked.reportedTasks++;
                tracked.expectedTupleCount+=count;
                checkFinish(tracked, tuple, t);
            } else {
                tracked.receivedTuples++;
                boolean success = true;
                try {
                    _bolt.execute(tracked.info, tuple);
                    if(tracked.condition.expectedTaskReports==0) {
                        success = finishBatch(tracked, tuple);
                    }
                } catch(FailedException e) {
                    failBatch(tracked, e);
                }
                if(success) {
                    _collector.ack(tuple);                   
                } else {
                    _collector.fail(tuple);
                }
            }
            _coordCollector.setCurrBatch(null);
        }
    
        private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
            boolean success = true;
            try {
                _bolt.finishBatch(tracked.info);
                String stream = COORD_STREAM(tracked.info.batchGroup);
                for(Integer task: tracked.condition.targetTasks) {
                    _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
                }
                if(tracked.delayedAck!=null) {
                    _collector.ack(tracked.delayedAck);
                    tracked.delayedAck = null;
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
                success = false;
            }
            _batches.remove(tracked.info.batchId.getId());
            return success;
        }
    
    • TridentBoltExecutor.execute first creates and initializes a TrackedBatch if one does not yet exist; when the batch instruction arrives it increments tracked.receivedTuples and then calls _bolt.execute(tracked.info, tuple)
    • For the spout, the _bolt here is TridentSpoutExecutor, whose execute method emits a whole batch of tuples to the downstream TridentBoltExecutor; because the spout's expectedTaskReports == 0, finishBatch is called immediately after TridentSpoutExecutor has emitted the batch's tuples
    • finishBatch then emits [id, count] data on COORD_STREAM to the downstream TridentBoltExecutor, telling the downstream TridentBoltExecutor how many tuples it emitted in total (see the COORD_STREAM sketch after this list)
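
    The COORD_STREAM used in finishBatch is a static helper in TridentBoltExecutor; the sketch below paraphrases it (exact formatting may differ) to make the "$coord-<batchGroup>" naming and the [id, count] payload concrete:

        public static final String COORD_STREAM_PREFIX = "$coord-";

        // the coordination stream is per batch group; the [id, count] values emitted on it
        // in finishBatch tell each target task how many tuples to expect for that batch
        public static String COORD_STREAM(String batch) {
            return COORD_STREAM_PREFIX + batch;
        }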

    TridentBoltExecutor(SubtopologyBolt)

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

        @Override
        public void execute(Tuple tuple) {
            if(TupleUtils.isTick(tuple)) {
                long now = System.currentTimeMillis();
                if(now - _lastRotate > _messageTimeoutMs) {
                    _batches.rotate();
                    _lastRotate = now;
                }
                return;
            }
            String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
            if(batchGroup==null) {
                // this is so we can do things like have simple DRPC that doesn't need to use batch processing
                _coordCollector.setCurrBatch(null);
                _bolt.execute(null, tuple);
                _collector.ack(tuple);
                return;
            }
            IBatchID id = (IBatchID) tuple.getValue(0);
            //get transaction id
            //if it already exists and attempt id is greater than the attempt there
            
            
            TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
    //        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
    //            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
    //                    + " (" + _batches.size() + ")" +
    //                    "\ntuple: " + tuple +
    //                    "\nwith tracked " + tracked +
    //                    "\nwith id " + id + 
    //                    "\nwith group " + batchGroup
    //                    + "\n");
    //            
    //        }
            //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
            
            // this code here ensures that only one attempt is ever tracked for a batch, so when
            // failures happen you don't get an explosion in memory usage in the tasks
            if(tracked!=null) {
                if(id.getAttemptId() > tracked.attemptId) {
                    _batches.remove(id.getId());
                    tracked = null;
                } else if(id.getAttemptId() < tracked.attemptId) {
                    // no reason to try to execute a previous attempt than we've already seen
                    return;
                }
            }
            
            if(tracked==null) {
                tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
                _batches.put(id.getId(), tracked);
            }
            _coordCollector.setCurrBatch(tracked);
            
            //System.out.println("TRACKED: " + tracked + " " + tuple);
            
            TupleType t = getTupleType(tuple, tracked);
            if(t==TupleType.COMMIT) {
                tracked.receivedCommit = true;
                checkFinish(tracked, tuple, t);
            } else if(t==TupleType.COORD) {
                int count = tuple.getInteger(1);
                tracked.reportedTasks++;
                tracked.expectedTupleCount+=count;
                checkFinish(tracked, tuple, t);
            } else {
                tracked.receivedTuples++;
                boolean success = true;
                try {
                    _bolt.execute(tracked.info, tuple);
                    if(tracked.condition.expectedTaskReports==0) {
                        success = finishBatch(tracked, tuple);
                    }
                } catch(FailedException e) {
                    failBatch(tracked, e);
                }
                if(success) {
                    _collector.ack(tuple);                   
                } else {
                    _collector.fail(tuple);
                }
            }
            _coordCollector.setCurrBatch(null);
        }
    
        private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
            if(tracked.failed) {
                failBatch(tracked);
                _collector.fail(tuple);
                return;
            }
            CoordCondition cond = tracked.condition;
            boolean delayed = tracked.delayedAck==null &&
                                  (cond.commitStream!=null && type==TupleType.COMMIT
                                   || cond.commitStream==null);
            if(delayed) {
                tracked.delayedAck = tuple;
            }
            boolean failed = false;
            if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
                if(tracked.receivedTuples == tracked.expectedTupleCount) {
                    finishBatch(tracked, tuple);                
                } else {
                    //TODO: add logging that not all tuples were received
                    failBatch(tracked);
                    _collector.fail(tuple);
                    failed = true;
                }
            }
            
            if(!delayed && !failed) {
                _collector.ack(tuple);
            }
            
        }
    
        private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
            boolean success = true;
            try {
                _bolt.finishBatch(tracked.info);
                String stream = COORD_STREAM(tracked.info.batchGroup);
                for(Integer task: tracked.condition.targetTasks) {
                    _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
                }
                if(tracked.delayedAck!=null) {
                    _collector.ack(tracked.delayedAck);
                    tracked.delayedAck = null;
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
                success = false;
            }
            _batches.remove(tracked.info.batchId.getId());
            return success;
        }
    
    • TridentBoltExecutor(SubtopologyBolt) is the bolt downstream of the spout; its _bolt is SubtopologyBolt and its tracked.condition.expectedTaskReports is not 0, so it performs checkFinish only when it receives a TupleType.COORD tuple (the TupleType.COMMIT case is ignored here for now)
    • Because BoltExecutor uses Utils.asyncLoop to consume the receiveQueue one tuple at a time, the tuples of a batch produced by emitBatch are also received one by one, and only after them comes the [id, count] tuple that TridentBoltExecutor(TridentSpoutExecutor) emitted on COORD_STREAM during finishBatch (note that COORD_STREAM is delivered directly to each task, so if the TridentBoltExecutor runs with multiple parallel tasks, each task receives its own count)
    • So TridentBoltExecutor(SubtopologyBolt) first processes each tuple in turn; only then does the TupleType.COORD tuple come up, which triggers checkFinish. When there is no commitStream, tracked.receivedCommit defaults to true, so as soon as the number of received tuples matches the expected count, _bolt.finishBatch is executed to complete the batch, after which it emits to its own downstream TridentBoltExecutor the [id, count] tuple that downstream should expect (a compact restatement of this completion check follows this list)
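
    The completion check described above can be restated compactly. The helper below is illustrative only (its method shape is not part of the storm source); it is derived directly from the execute/checkFinish code quoted above:

        // per regular batch tuple:      tracked.receivedTuples++
        // per [id, count] COORD tuple:  tracked.reportedTasks++; tracked.expectedTupleCount += count
        static boolean readyToFinish(boolean receivedCommit,
                                     int reportedTasks, int expectedTaskReports,
                                     int receivedTuples, int expectedTupleCount) {
            if (!(receivedCommit && reportedTasks == expectedTaskReports)) {
                return false; // still waiting for the commit signal and/or upstream task reports
            }
            // all upstream tasks have reported: either every announced tuple has arrived
            // (finishBatch runs), or some were lost (checkFinish fails the batch instead)
            return receivedTuples == expectedTupleCount;
        }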

    Summary

    • For trident, the real spout is MasterBatchCoordinator; its nextTuple triggers the emission of a batch. It sends the batch instruction to TridentSpoutCoordinator, and TridentSpoutCoordinator triggers the execute method of TridentBoltExecutor(TridentSpoutExecutor), which in turn calls emitBatch on the ITridentSpout's emitter, thereby emitting one batch of data
    • TridentBoltExecutor(TridentSpoutExecutor) has expectedTaskReports == 0, so as soon as TridentSpoutExecutor has emitted the batch's tuples it calls finishBatch, which emits [id, count] data on COORD_STREAM to the downstream TridentBoltExecutor, telling the downstream TridentBoltExecutor how many tuples it emitted in total
    • The bolt downstream of the spout is TridentBoltExecutor(SubtopologyBolt); its tracked.condition.expectedTaskReports is not 0, so it performs checkFinish only when the TupleType.COORD tuple arrives (the TupleType.COMMIT case is ignored here). Since the spout first runs emitBatch and only sends the [id, count] data in finishBatch at the end, under normal conditions the tuples enter TridentBoltExecutor(SubtopologyBolt)'s receiveQueue in that order; TridentBoltExecutor(SubtopologyBolt) then consumes the tuples one by one, calling SubtopologyBolt.execute, and finally processes the [id, count] data, which triggers checkFinish. As soon as the number of received tuples matches the expected count, SubtopologyBolt.finishBatch is executed to complete the batch, after which it emits to its own downstream TridentBoltExecutor the [id, count] tuple that downstream should expect (the bookkeeping fields involved are sketched after this list)
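
    To summarize the bookkeeping involved, the sketch below lists the fields of TrackedBatch and CoordCondition that the quoted code reads and writes; it is a paraphrase of that usage, not the verbatim class bodies from TridentBoltExecutor (types such as the collection used for targetTasks are approximated):

        import java.util.Map;
        import java.util.Set;
        import org.apache.storm.tuple.Tuple;

        // Field-level paraphrase of the bookkeeping structures referenced by the quoted code.
        class BatchBookkeepingSketch {

            static class CoordCondition {
                String commitStream;        // non-null only when a commit stream takes part in coordination
                int expectedTaskReports;    // number of upstream tasks that will send an [id, count] tuple
                Set<Integer> targetTasks;   // downstream tasks that receive this executor's own [id, count]
            }

            static class TrackedBatch {
                int attemptId;              // only the highest attempt of a transaction id is tracked
                Object info;                // the BatchInfo: batch group, batch id, per-batch bolt state
                CoordCondition condition;
                int reportedTasks;          // COORD tuples received so far
                int expectedTupleCount;     // sum of the counts carried by those COORD tuples
                int receivedTuples;         // regular batch tuples received so far
                boolean receivedCommit;     // true by default when there is no commit stream
                boolean failed;
                Tuple delayedAck;           // ack of the coord/commit tuple is held back until finishBatch
                Map<Integer, Integer> taskEmittedTuples; // per target task: tuples this executor emitted
            }
        }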
