A Look at Storm's ICommitterTridentSpout

Author: go4it | Published 2018-11-19 00:32

    This article takes a close look at Storm's ICommitterTridentSpout.

    ICommitterTridentSpout

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/ICommitterTridentSpout.java

    public interface ICommitterTridentSpout<X> extends ITridentSpout<X> {
        public interface Emitter extends ITridentSpout.Emitter {
            void commit(TransactionAttempt attempt);
        } 
        
        @Override
        public Emitter getEmitter(String txStateId, Map conf, TopologyContext context);    
    }
    
    • ICommitterTridentSpout extends ITridentSpout; it mainly overrides the getEmitter method to return an extended Emitter, which inherits from ITridentSpout.Emitter and defines one additional method, commit; a minimal skeleton of such a spout is sketched below
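
    To make the contract concrete, here is a minimal skeleton of a committer spout. This is an illustrative sketch only: DemoCommitterSpout and its "word" output field are invented for this article and the batch logic is left empty, but it is written against the 1.2.2 interfaces quoted above.

    import java.util.Map;

    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.spout.ICommitterTridentSpout;
    import org.apache.storm.trident.spout.ITridentSpout;
    import org.apache.storm.trident.topology.TransactionAttempt;
    import org.apache.storm.tuple.Fields;

    public class DemoCommitterSpout implements ICommitterTridentSpout<Object> {

        @Override
        public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {
            return new ITridentSpout.BatchCoordinator<Object>() {
                @Override
                public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
                    return null; // metadata describing the batch, e.g. source offsets
                }

                @Override
                public void success(long txid) { }

                @Override
                public boolean isReady(long txid) {
                    return true;
                }

                @Override
                public void close() { }
            };
        }

        @Override
        public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
            return new ICommitterTridentSpout.Emitter() {
                @Override
                public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
                    // emit the tuples that belong to this batch attempt
                }

                @Override
                public void commit(TransactionAttempt attempt) {
                    // called when COMMIT_STREAM_ID reaches TridentSpoutExecutor (see below);
                    // this is the place to atomically persist the batch's side effects
                }

                @Override
                public void success(TransactionAttempt tx) { }

                @Override
                public void close() { }
            };
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("word"); // invented field name for the sketch
        }
    }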

    TridentTopologyBuilder.buildTopology

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

       public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
            TopologyBuilder builder = new TopologyBuilder();
            Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
            Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
    
            Map<String, List<String>> batchesToCommitIds = new HashMap<>();
            Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
            
            for(String id: _spouts.keySet()) {
                TransactionalSpoutComponent c = _spouts.get(id);
                if(c.spout instanceof IRichSpout) {
                    
                    //TODO: wrap this to set the stream name
                    builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
                } else {
                    String batchGroup = c.batchGroupId;
                    if(!batchesToCommitIds.containsKey(batchGroup)) {
                        batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                    }
                    batchesToCommitIds.get(batchGroup).add(c.commitStateId);
    
                    if(!batchesToSpouts.containsKey(batchGroup)) {
                        batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                    }
                    batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                    
                    
                    BoltDeclarer scd =
                          builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                            .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                            .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                    
                    for(Map<String, Object> m: c.componentConfs) {
                        scd.addConfigurations(m);
                    }
                    
                    Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                    specs.put(c.batchGroupId, new CoordSpec());
                    BoltDeclarer bd = builder.setBolt(id,
                            new TridentBoltExecutor(
                              new TridentSpoutExecutor(
                                c.commitStateId,
                                c.streamName,
                                ((ITridentSpout) c.spout)),
                                batchIdsForSpouts,
                                specs),
                            c.parallelism);
                    bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                    if(c.spout instanceof ICommitterTridentSpout) {
                        bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                    }
                    for(Map<String, Object> m: c.componentConfs) {
                        bd.addConfigurations(m);
                    }
                }
            }
            
            //......
    
            return builder.createTopology();
        }
    
    • When TridentTopologyBuilder.buildTopology runs, it checks the user's spout: if it is an ICommitterTridentSpout, it additionally wires allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID), so the spout's executor also receives commit tuples; a usage sketch follows
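
    As a usage sketch (hypothetical, reusing the invented DemoCommitterSpout from above), handing such a spout to Trident is all that is needed; the extra COMMIT_STREAM_ID subscription is added inside build()/buildTopology, not by user code:

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.builtin.Debug;
    import org.apache.storm.tuple.Fields;

    public class CommitterWiringDemo {
        public static void main(String[] args) {
            TridentTopology topology = new TridentTopology();
            // newStream accepts any ITridentSpout; because DemoCommitterSpout also
            // implements ICommitterTridentSpout, buildTopology adds the allGrouping
            // on MasterBatchCoordinator.COMMIT_STREAM_ID shown above
            topology.newStream("demo-tx", new DemoCommitterSpout())
                    .each(new Fields("word"), new Debug());
            StormTopology built = topology.build(); // build() delegates to buildTopology
        }
    }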

    MasterBatchCoordinator

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

        @Override
        public void nextTuple() {
            sync();
        }
    
        private void sync() {
            // note that sometimes the tuples active may be less than max_spout_pending, e.g.
            // max_spout_pending = 3
            // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
            // and there won't be a batch for tx 4 because there's max_spout_pending tx active
            TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
            if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
                maybeCommit.status = AttemptStatus.COMMITTING;
                _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
                LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
            }
            
            if(_active) {
                if(_activeTx.size() < _maxTransactionActive) {
                    Long curr = _currTransaction;
                    for(int i=0; i<_maxTransactionActive; i++) {
                        if(!_activeTx.containsKey(curr) && isReady(curr)) {
                            // by using a monotonically increasing attempt id, downstream tasks
                            // can be memory efficient by clearing out state for old attempts
                            // as soon as they see a higher attempt id for a transaction
                            Integer attemptId = _attemptIds.get(curr);
                            if(attemptId==null) {
                                attemptId = 0;
                            } else {
                                attemptId++;
                            }
                            _attemptIds.put(curr, attemptId);
                            for(TransactionalState state: _states) {
                                state.setData(CURRENT_ATTEMPTS, _attemptIds);
                            }
                            
                            TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                            final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                            _activeTx.put(curr, newTransactionStatus);
                            _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                            LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                            _throttler.markEvent();
                        }
                        curr = nextTransactionId(curr);
                    }
                }
            }
        }
    
        @Override
        public void ack(Object msgId) {
            TransactionAttempt tx = (TransactionAttempt) msgId;
            TransactionStatus status = _activeTx.get(tx.getTransactionId());
            LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, status, this);
            if(status!=null && tx.equals(status.attempt)) {
                if(status.status==AttemptStatus.PROCESSING) {
                    status.status = AttemptStatus.PROCESSED;
                    LOG.debug("Changed status. [tx_attempt = {}] [tx_status = {}]", tx, status);
                } else if(status.status==AttemptStatus.COMMITTING) {
                    _activeTx.remove(tx.getTransactionId());
                    _attemptIds.remove(tx.getTransactionId());
                    _collector.emit(SUCCESS_STREAM_ID, new Values(tx));
                    _currTransaction = nextTransactionId(tx.getTransactionId());
                    for(TransactionalState state: _states) {
                        state.setData(CURRENT_TX, _currTransaction);                    
                    }
                    LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this);
                }
                sync();
            }
        }
    
    • When MasterBatchCoordinator receives an ack, if the status is AttemptStatus.PROCESSING it changes it to AttemptStatus.PROCESSED; if the status is AttemptStatus.COMMITTING it removes the transaction and emits a tuple on SUCCESS_STREAM_ID; either way it then calls sync
    • nextTuple also just calls sync; sync checks for a transaction in the AttemptStatus.PROCESSED state, changes it to AttemptStatus.COMMITTING, and emits a tuple on COMMIT_STREAM_ID
    • So the status moves from AttemptStatus.PROCESSING to AttemptStatus.PROCESSED on the first ack, sync then moves it to AttemptStatus.COMMITTING and emits on COMMIT_STREAM_ID, and the next ack in the AttemptStatus.COMMITTING state emits on SUCCESS_STREAM_ID, as the toy trace below shows
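
    Put together, one transaction's lifecycle can be traced with this toy snippet (the enum mirrors AttemptStatus from the source above; the comments mark which call performs each transition):

    public class TxLifecycleDemo {
        enum AttemptStatus { PROCESSING, PROCESSED, COMMITTING }

        public static void main(String[] args) {
            AttemptStatus status = AttemptStatus.PROCESSING; // sync(): batch emitted on BATCH_STREAM_ID
            status = AttemptStatus.PROCESSED;                // first ack(): batch fully processed
            status = AttemptStatus.COMMITTING;               // sync(): tuple emitted on COMMIT_STREAM_ID
            // second ack(): tx removed from _activeTx, tuple emitted on SUCCESS_STREAM_ID
            System.out.println("last tracked status: " + status);
        }
    }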

    TridentSpoutExecutor

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java

        public void execute(BatchInfo info, Tuple input) {
            // there won't be a BatchInfo for the success stream
            TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
            if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
                if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                    ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                    _activeBatches.remove(attempt.getTransactionId());
                } else {
                     throw new FailedException("Received commit for different transaction attempt");
                }
            } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
                // valid to delete before what's been committed since 
                // those batches will never be accessed again
                _activeBatches.headMap(attempt.getTransactionId()).clear();
                _emitter.success(attempt);
            } else {            
                _collector.setBatch(info.batchId);
                _emitter.emitBatch(attempt, input.getValue(1), _collector);
                _activeBatches.put(attempt.getTransactionId(), attempt);
            }
        }
    
    • In execute, TridentSpoutExecutor checks whether the tuple arrived on MasterBatchCoordinator.COMMIT_STREAM_ID; if so, and the TransactionAttempt equals the one tracked for that transaction id, it calls ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt)

    TridentBoltExecutor

    storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

        public void execute(Tuple tuple) {
            if (TupleUtils.isTick(tuple)) {
                long now = System.currentTimeMillis();
                if (now - _lastRotate > _messageTimeoutMs) {
                    _batches.rotate();
                    _lastRotate = now;
                }
                return;
            }
            String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
            if (batchGroup == null) {
                // this is so we can do things like have simple DRPC that doesn't need to use batch processing
                _coordCollector.setCurrBatch(null);
                _bolt.execute(null, tuple);
                _collector.ack(tuple);
                return;
            }
            IBatchID id = (IBatchID) tuple.getValue(0);
            //get transaction id
            //if it already exists and attempt id is greater than the attempt there
    
    
            TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());

            // this code here ensures that only one attempt is ever tracked for a batch, so when
            // failures happen you don't get an explosion in memory usage in the tasks
            if (tracked != null) {
                if (id.getAttemptId() > tracked.attemptId) {
                    _batches.remove(id.getId());
                    tracked = null;
                } else if (id.getAttemptId() < tracked.attemptId) {
                    // no reason to try to execute a previous attempt than we've already seen
                    return;
                }
            }
    
            if (tracked == null) {
                tracked =
                    new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup),
                                     id.getAttemptId());
                _batches.put(id.getId(), tracked);
            }
            _coordCollector.setCurrBatch(tracked);

            TupleType t = getTupleType(tuple, tracked);
            if (t == TupleType.COMMIT) {
                tracked.receivedCommit = true;
                checkFinish(tracked, tuple, t);
            } else if (t == TupleType.COORD) {
                int count = tuple.getInteger(1);
                tracked.reportedTasks++;
                tracked.expectedTupleCount += count;
                checkFinish(tracked, tuple, t);
            } else {
                tracked.receivedTuples++;
                boolean success = true;
                try {
                    _bolt.execute(tracked.info, tuple);
                    if (tracked.condition.expectedTaskReports == 0) {
                        success = finishBatch(tracked, tuple);
                    }
                } catch (FailedException e) {
                    failBatch(tracked, e);
                }
                if (success) {
                    _collector.ack(tuple);
                } else {
                    _collector.fail(tuple);
                }
            }
            _coordCollector.setCurrBatch(null);
        }
    
    • Here, after calling _bolt.execute(tracked.info, tuple), TridentBoltExecutor calls _collector.ack(tuple) to complete the ack

    SpoutOutputCollector

    storm-core-1.2.2-sources.jar!/org/apache/storm/spout/SpoutOutputCollector.java

        /**
         * Emits a new tuple to the specified output stream with the given message ID.
         * When Storm detects that this tuple has been fully processed, or has failed
         * to be fully processed, the spout will receive an ack or fail callback respectively
         * with the messageId as long as the messageId was not null. If the messageId was null,
         * Storm will not track the tuple and no callback will be received. 
         * Note that Storm's event logging functionality will only work if the messageId
         * is serializable via Kryo or the Serializable interface. The emitted values must be immutable.
         *
         * @return the list of task ids that this tuple was sent to
         */
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            return _delegate.emit(streamId, tuple, messageId);
        }
    
    • This simply delegates to _delegate.emit; in this path _delegate is a SpoutOutputCollectorImpl

    SpoutOutputCollectorImpl

    storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            try {
                return sendSpoutMsg(streamId, tuple, messageId, null);
            } catch (InterruptedException e) {
                LOG.warn("Spout thread interrupted during emit().");
                throw new RuntimeException(e);
            }
        }
    
        private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
            InterruptedException {
            emittedCount.increment();
    
            List<Integer> outTasks;
            if (outTaskId != null) {
                outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
            } else {
                outTasks = taskData.getOutgoingTasks(stream, values);
            }
    
            final boolean needAck = (messageId != null) && hasAckers;
    
            final List<Long> ackSeq = needAck ? new ArrayList<>() : null;
    
            final long rootId = needAck ? MessageId.generateId(random) : 0;
    
            for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
                Integer t = outTasks.get(i);
                MessageId msgId;
                if (needAck) {
                    long as = MessageId.generateId(random);
                    msgId = MessageId.makeRootId(rootId, as);
                    ackSeq.add(as);
                } else {
                    msgId = MessageId.makeUnanchored();
                }
    
                final TupleImpl tuple =
                    new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
                AddressedTuple adrTuple = new AddressedTuple(t, tuple);
                executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
            }
            if (isEventLoggers) {
                taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
            }
    
            if (needAck) {
                boolean sample = executor.samplerCheck();
                TupleInfo info = new TupleInfo();
                info.setTaskId(this.taskId);
                info.setStream(stream);
                info.setMessageId(messageId);
                if (isDebug) {
                    info.setValues(values);
                }
                if (sample) {
                    info.setTimestamp(System.currentTimeMillis());
                }
    
                pending.put(rootId, info);
                List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
                taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
            } else if (messageId != null) {
                // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
                if (isDebug) {
                    if (spoutExecutorThdId != Thread.currentThread().getId()) {
                        throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
                                                   "Spout Output Collector should only emit from the main spout executor thread.");
                    }
                }
                globalTupleInfo.clear();
                globalTupleInfo.setStream(stream);
                globalTupleInfo.setValues(values);
                globalTupleInfo.setMessageId(messageId);
                globalTupleInfo.setTimestamp(0);
                globalTupleInfo.setId("0:");
                Long timeDelta = 0L;
                executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
            }
            return outTasks;
        }
    
    • When needAck is true, this calls taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits())
    • Note that ackInitTuple is Values(rootId, Utils.bitXorVals(ackSeq), this.taskId); its second value folds the List<Long> ackSeq with Utils.bitXorVals
    • When there are no outgoing tasks, ackSeq is an empty list, and Utils.bitXorVals over it yields 0, as the snippet below verifies
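
    The empty-list case is easy to verify in isolation; this toy snippet re-implements the same XOR fold as Utils.bitXorVals (quoted next) instead of depending on the Storm class:

    import java.util.ArrayList;
    import java.util.List;

    public class EmptyAckSeqDemo {
        // the same fold as Utils.bitXorVals
        static long bitXorVals(List<Long> coll) {
            long result = 0;
            for (Long val : coll) {
                result ^= val;
            }
            return result;
        }

        public static void main(String[] args) {
            List<Long> ackSeq = new ArrayList<>(); // no outgoing tasks -> no anchors added
            System.out.println(bitXorVals(ackSeq)); // prints 0, so the Acker sees val == 0 right away
        }
    }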

    Utils

    storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/Utils.java

        public static long bitXorVals(List<Long> coll) {
            long result = 0;
            for (Long val : coll) {
                result ^= val;
            }
            return result;
        }
    
        public static long bitXor(Long a, Long b) {
            return a ^ b;
        }
    
    • The bitXor operation is the core of Storm's ack mechanism, as sketched below
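
    The reason XOR works: every anchor id is XORed into the accumulator once when an edge of the tuple tree is created and once when it is acked, so the pairs cancel out and the accumulator returns to 0 exactly when every edge has been acked. A toy demonstration:

    import java.util.Random;

    public class XorAckDemo {
        public static void main(String[] args) {
            Random random = new Random();
            long val = 0;
            long edge1 = random.nextLong();
            long edge2 = random.nextLong();
            val ^= edge1; // edge 1 registered when its tuple is emitted
            val ^= edge2; // edge 2 registered when its tuple is emitted
            val ^= edge1; // edge 1 acked: the pair cancels
            val ^= edge2; // edge 2 acked: the pair cancels
            System.out.println(val == 0); // true -> the whole tuple tree is fully processed
        }
    }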

    Acker

    storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java

        public void execute(Tuple input) {
            if (TupleUtils.isTick(input)) {
                Map<Object, AckObject> tmp = pending.rotate();
                LOG.debug("Number of timeout tuples:{}", tmp.size());
                return;
            }
    
            boolean resetTimeout = false;
            String streamId = input.getSourceStreamId();
            Object id = input.getValue(0);
            AckObject curr = pending.get(id);
            if (ACKER_INIT_STREAM_ID.equals(streamId)) {
                if (curr == null) {
                    curr = new AckObject();
                    pending.put(id, curr);
                }
                curr.updateAck(input.getLong(1));
                curr.spoutTask = input.getInteger(2);
            } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
                if (curr == null) {
                    curr = new AckObject();
                    pending.put(id, curr);
                }
                curr.updateAck(input.getLong(1));
            } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
                // For the case that ack_fail message arrives before ack_init
                if (curr == null) {
                    curr = new AckObject();
                }
                curr.failed = true;
                pending.put(id, curr);
            } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
                resetTimeout = true;
                if (curr != null) {
                    pending.put(id, curr);
                } //else if it has not been added yet, there is no reason time it out later on
            } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
                collector.flush();
                return;
            } else {
                LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
                return;
            }
    
            int task = curr.spoutTask;
            if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
                Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
                if (curr.val == 0) {
                    pending.remove(id);
                    collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
                } else if (curr.failed) {
                    pending.remove(id);
                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
                } else if (resetTimeout) {
                    collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
                } else {
                    throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
                }
            }
    
            collector.ack(input);
        }
    
        private static class AckObject {
            public long val = 0L;
            public long startTime = Time.currentTimeMillis();
            public int spoutTask = -1;
            public boolean failed = false;
    
            // val xor value
            public void updateAck(Long value) {
                val = Utils.bitXor(val, value);
            }
        }
    
    • When the Acker receives ACKER_INIT_STREAM_ID and the AckObject for the id is null, it creates one, whose val defaults to 0; it then calls curr.updateAck(input.getLong(1)), i.e. it XORs the tuple's second value into the AckObject's val
    • The tuple SpoutOutputCollectorImpl emitted is Values(rootId, Utils.bitXorVals(ackSeq), this.taskId), whose second value is Utils.bitXorVals(ackSeq); ackSeq is a List<Long> built from the outgoing tasks, so when there are none the list is empty, Utils.bitXorVals returns 0, and curr.updateAck(0) leaves val at 0
    • At the end of execute, the Acker checks curr.val == 0 and, if so, triggers collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple)

    SpoutExecutor

    storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

        public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
            String streamId = tuple.getSourceStreamId();
            if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
                spoutOutputCollector.flush();
            } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
                pending.rotate();
            } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
                metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
            } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
                Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
                if (spoutObj instanceof ICredentialsListener) {
                    ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
                }
            } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
                Long id = (Long) tuple.getValue(0);
                TupleInfo pendingForId = pending.get(id);
                if (pendingForId != null) {
                    pending.put(id, pendingForId);
                }
            } else {
                Long id = (Long) tuple.getValue(0);
                Long timeDeltaMs = (Long) tuple.getValue(1);
                TupleInfo tupleInfo = pending.remove(id);
                if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                    if (taskId != tupleInfo.getTaskId()) {
                        throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                    }
                    Long timeDelta = null;
                    if (hasAckers) {
                        long startTimeMs = tupleInfo.getTimestamp();
                        if (startTimeMs != 0) {
                            timeDelta = timeDeltaMs;
                        }
                    }
                    if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                        ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                    } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                        failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                    }
                }
            }
        }
    
        public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
            try {
                ISpout spout = (ISpout) taskData.getTaskObject();
                int taskId = taskData.getTaskId();
                if (executor.getIsDebug()) {
                    LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
                }
                spout.ack(tupleInfo.getMessageId());
                if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                    new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
                }
                if (hasAckers && timeDelta != null) {
                    executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
                                                        taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
                }
            } catch (Exception e) {
                throw Utils.wrapInRuntime(e);
            }
        }
    
    • When SpoutExecutor receives a tuple on Acker.ACKER_ACK_STREAM_ID, it calls the ackSpoutMsg method, which calls back into the original spout's ack method, i.e. spout.ack(tupleInfo.getMessageId())

    Summary

    • When MasterBatchCoordinator receives the first ack for a given transaction attempt (its ack is invoked the first time), the status changes from the initial AttemptStatus.PROCESSING to AttemptStatus.PROCESSED; the subsequent sync call changes AttemptStatus.PROCESSED to AttemptStatus.COMMITTING and emits a tuple on MasterBatchCoordinator.COMMIT_STREAM_ID
    • When the user's spout is an ICommitterTridentSpout, TridentTopologyBuilder.buildTopology wires allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID); TridentSpoutExecutor therefore receives the MasterBatchCoordinator.COMMIT_STREAM_ID tuple and calls ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt); after TridentSpoutExecutor.execute returns, TridentBoltExecutor automatically acks the tuple, which invokes MasterBatchCoordinator's ack a second time and triggers _collector.emit(SUCCESS_STREAM_ID, new Values(tx))
    • When the user's spout is not an ICommitterTridentSpout, no component in the topology consumes the tuples MasterBatchCoordinator emits on COMMIT_STREAM_ID, i.e. outgoingTasks is empty; with needAck, SpoutOutputCollectorImpl still emits to Acker.ACKER_INIT_STREAM_ID a tuple whose second value is Utils.bitXorVals(ackSeq), and since ackSeq (computed from outgoingTasks) is an empty list, that value is 0; when the Acker handles ACKER_INIT_STREAM_ID, curr.updateAck(input.getLong(1)) leaves curr.val at 0, so at the end of execute it sees curr.val == 0 and emits on Acker.ACKER_ACK_STREAM_ID; SpoutExecutor receives that and calls ackSpoutMsg, which calls back the original spout's ack, i.e. spout.ack(tupleInfo.getMessageId()); in other words, a stream with no consumers is acked automatically, so even for a non-committer spout the emit on MasterBatchCoordinator.COMMIT_STREAM_ID leads to MasterBatchCoordinator's ack being invoked a second time, triggering _collector.emit(SUCCESS_STREAM_ID, new Values(tx))

    Whether or not the spout is an ICommitterTridentSpout therefore makes this difference: for a non-committer spout, after the tuple is emitted on MasterBatchCoordinator.COMMIT_STREAM_ID the Acker acks it automatically, invoking MasterBatchCoordinator's ack a second time; a committer spout first executes ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt), and the ack then comes from TridentBoltExecutor, which likewise invokes MasterBatchCoordinator's ack a second time. In the success case, both paths end by emitting a tuple on SUCCESS_STREAM_ID.

