Storm Trident, Part 2: Transaction Control

Author: SamHxm | Published: 2016-12-16 21:36

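    Transaction control in Storm is an art, and the ack mechanism is its essence. The book Storm源码分析 (Storm Source Code Analysis) contains an excellent analysis of it.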

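    Anyone who has developed on Storm has probably been troubled by one long-standing problem: a function fails, throws some exception, and the topology stops running. When that happens, we have to pick one of the following two options based on the business requirements.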

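    1. Log the exception and let the whole topology keep running.

    2. Tell the spout to resend the data. Because the smallest unit of a data stream in Trident is the batch, resending data means resending the entire failed batch.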

    The first case is easy to handle:
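    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class WordFunction extends BaseFunction {

        private static final long serialVersionUID = 735468688795780833L;

        /**
         * Receives the data stream.
         * Each call receives one tuple from the batch.
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            try {
                // normal business logic
            } catch (Exception e) {
                // log the exception and swallow it so the topology keeps running
                logger.error("failed to process tuple {}", tuple, e);
            }
        }

        private Logger logger = LoggerFactory.getLogger(getClass());
    }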
    

    Catching the exception with try...catch and handling it there keeps the topology running.

    The second case is also simple to handle:
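    import org.apache.storm.topology.FailedException;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class WordFunction extends BaseFunction {

        private static final long serialVersionUID = 735468688795780833L;

        /**
         * Receives the data stream.
         * Each call receives one tuple from the batch.
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            try {
                // normal business logic
            } catch (Exception e) {
                // log the exception, then rethrow it as a FailedException
                // so that the whole batch is replayed
                logger.error("failed to process tuple {}", tuple, e);
                throw new FailedException(e.getMessage(), e);
            }
        }

        private Logger logger = LoggerFactory.getLogger(getClass());
    }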
    

    All that is needed is to convert the caught exception into a FailedException, which lives in the org.apache.storm.topology package.

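    Why does it have to be FailedException? Setting a breakpoint on throw new FailedException(e.getMessage(), e); and stepping through shows that the public void execute(Tuple tuple) method of TridentBoltExecutor, the executor class for the bolt, contains this block: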

    try {
        _bolt.execute(tracked.info, tuple);
        if(tracked.condition.expectedTaskReports==0) {
            success = finishBatch(tracked, tuple);
        }
    }
    catch(FailedException e) {
        failBatch(tracked, e);
    }
    

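    After catching the FailedException, TridentBoltExecutor calls failBatch. Tracing on from failBatch, the failure eventually causes the attempt number on the transaction object TransactionAttempt to be incremented by 1 and the spout's emitBatch method to be called again.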
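
    The replay behaviour described above can be pictured with a small toy model. This is only a sketch in plain Java and not Storm's implementation: BatchFailedException, BatchSource and runTransaction are hypothetical names standing in for FailedException, the spout's emitter and the coordinator loop. It also simplifies by abandoning a batch at the first failure. Any other RuntimeException is deliberately not caught, mirroring the fact that the block above only handles FailedException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for org.apache.storm.topology.FailedException. */
class BatchFailedException extends RuntimeException {
    BatchFailedException(String message) {
        super(message);
    }
}

/** Toy model of the replay loop; an illustration, not Storm's code. */
final class ReplayModel {

    /** Hypothetical batch source, asked once per attempt of a transaction. */
    interface BatchSource {
        List<String> emitBatch(long txId, int attemptId);
    }

    /**
     * Re-requests the batch for the same transaction id until every value
     * passes through the function, and returns the attempt id that succeeded.
     */
    static int runTransaction(long txId, BatchSource source, Consumer<String> function) {
        int attemptId = 0;
        while (true) {
            List<String> batch = source.emitBatch(txId, attemptId);
            try {
                for (String value : batch) {
                    function.accept(value);
                }
                return attemptId; // whole batch processed
            } catch (BatchFailedException e) {
                attemptId++; // same transaction id, next attempt
            }
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        BatchSource source = (txId, attemptId) -> {
            calls.add("tx=" + txId + " attempt=" + attemptId);
            return attemptId == 0 ? Arrays.asList("b", "l", "e") : Arrays.asList("y", "d", "z");
        };
        Consumer<String> function = value -> {
            if (value.equals("l")) {
                throw new BatchFailedException("bad value: " + value);
            }
        };
        int attempt = runTransaction(1L, source, function);
        System.out.println(calls + " succeeded on attempt " + attempt);
    }
}
```

    In this model the transaction id stays fixed while the attempt id grows, which is the same pattern the TransactionId and AttemptId fields follow in a Trident spout's log output.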

    Complete demo:
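    import java.util.List;
    import java.util.Map;

    import com.google.common.collect.Lists;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.spout.ITridentSpout;
    import org.apache.storm.trident.topology.TransactionAttempt;
    import org.apache.storm.tuple.Fields;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class WordSpout implements ITridentSpout<String> {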
        
        /**
         * 
         */
        private static final long serialVersionUID = -954626449213280061L;
        
        private String chars = "abcdefghijklmnopqrstuvwxyz";
        
    
        /**
         * Coordinator.
         * Keeps the metadata needed to replay a batch; when a batch is replayed,
         * it is rebuilt from the metadata stored by the coordinator.
         */
        @Override
        public BatchCoordinator<String> getCoordinator(String txStateId,Map conf, TopologyContext context) {
            return new WordCoordinator();
        }
    
        @Override
        public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
            return new WordEmitter();
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        /**
         * Declares all fields that are emitted.
         */
        @Override
        public Fields getOutputFields() {
            return new Fields("field1","field2");
        }
        
        private class WordCoordinator implements BatchCoordinator<String> {
    
            @Override
            public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
                return null;
            }
    
            @Override
            public void success(long txid) {
                logger.info("success: " + txid);
            }
    
            @Override
            public boolean isReady(long txid) {
                logger.info("begin {}", txid);
                return Boolean.TRUE;
            }
    
            @Override
            public void close() {
                
            }
            
        }
        
        /**
         * Emitter.
         * Emits the data stream.
         */
        private class WordEmitter implements Emitter<String> {
    
            @Override
            public void success(TransactionAttempt tx) {
                logger.info("emitter success " + tx.getId());
            }
    
            @Override
            public void close() {
            }
            
            /**
             * The set of tuples emitted by one call to this method is called a batch.
             * A batch is the smallest unit in which Trident sends a data stream.
             */
            @Override
            public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
                
                logger.info("TransactionId : {},AttemptId : {},currMetadata : {}",tx.getTransactionId(),tx.getAttemptId(),coordinatorMeta);
                
                for(int i=0;i<10;i++){
                    List<Object> list = Lists.newArrayList();
                    list.add("" + chars.charAt((int)(Math.random() * 26)));
                    list.add("event2");
                    collector.emit(list);
                }
            }
        }
    
        private Logger logger = LoggerFactory.getLogger("Trident Spout");
    }
    

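    import org.apache.storm.topology.FailedException;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class WordFunction extends BaseFunction {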
    
        /**
         * 
         */
        private static final long serialVersionUID = 735468688795780833L;
    
        /**
         * Receives the data stream. Each call receives one tuple from the batch.
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String value = tuple.getValueByField("field1").toString();
            logger.info("funtion value : " + value);
            if (value.charAt(0) > 'h' && value.charAt(0) < 'n') {
                throw new FailedException();
            }
        }
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    }
    

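    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;

    public class Start {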
    
        public static StormTopology buildTopology() {
            TridentTopology topology = new TridentTopology();
            WordSpout spout = new WordSpout();
            WordFunction function = new WordFunction();
    
            topology.newStream("filter", spout)
                    /**
                     * Selects which fields of the stream emitted by the spout are passed into the bolt.
                     */
                    .each(new Fields("field1"), function, new Fields());
    
            return topology.build();
        }
    
        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyStorm", conf, buildTopology());
    
            Thread.sleep(1000 * 60);
            cluster.shutdown();
        }
    }
    

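    The spout emits 10 letters per batch. The bolt checks each letter it receives: if the letter lies strictly between h and n (that is, i through m), the transaction is treated as failed and a FailedException is thrown, which tells the spout to resend the batch.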
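
    Because the demo compares with > 'h' and < 'n', the boundary letters h and n themselves do not fail the batch. The following sketch isolates that check so the boundaries can be tried directly; FailureRule and triggersReplay are hypothetical names used only for this illustration.

```java
/** Isolates the demo's failure condition; the class and method names are hypothetical. */
final class FailureRule {

    /** Same comparison as in WordFunction.execute: strictly between 'h' and 'n'. */
    static boolean triggersReplay(String value) {
        char c = value.charAt(0);
        return c > 'h' && c < 'n';
    }

    public static void main(String[] args) {
        // Prints one line per letter: h -> false, i -> true, m -> true, n -> false
        for (String v : new String[] {"h", "i", "m", "n"}) {
            System.out.println(v + " -> " + triggersReplay(v));
        }
    }
}
```

    This is useful when reading the log: a batch containing only letters such as h or n completes normally, while a single l, j, k, i or m is enough to force a replay.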

    The log shows:

    16237 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 1
    16348 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 1,AttemptId : 0,currMetadata : null
    16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : b
    16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : v
    16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : b
    16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : v
    16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
    16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : l
    16362 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : j
    16362 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : e
    16365 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : g
    16366 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
    16377 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 1
    16495 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 1,AttemptId : 1,currMetadata : null
    16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
    16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : d
    16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : z
    16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
    16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : g
    16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : t
    16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : p
    16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : z
    16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
    16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : h
    16506 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - emitter success 1
    16506 [Thread-14-$spoutcoord-spout-filter-executor[2 2]] INFO  Trident Spout - success: 1
    16567 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 2
    16684 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 2,AttemptId : 0,currMetadata : null
    16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
    16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : r
    16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
    16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : a
    16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : k
    16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : s
    16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : k
    16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : c
    16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : l
    16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
    16693 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 2
    16791 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 2,AttemptId : 1,currMetadata : null
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : i
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : i
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : m
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : j
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : a
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w
    16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w
    

    The first time a batch is sent its AttemptId is 0, and each resend increases it by 1.

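    Notice that the replayed batch does not contain the same data as the first attempt of that batch. In a real project this is certainly unacceptable. How can a batch be guaranteed to carry the same data no matter how often it is replayed? The answer is to use the metadata provided by the BatchCoordinator.initializeTransaction method.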
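
    One possible way to use that metadata is sketched below in plain Java, without Storm on the classpath. It rests on two assumptions that should be checked against the Storm version in use: that the value returned by initializeTransaction is persisted per transaction id, and that on a replay it is passed back in as currMetadata. The class DeterministicReplayModel, its storedMetadata map (a stand-in for the persisted coordinator state) and the attempt method are hypothetical. Only the parameter list of initializeTransaction is taken from the demo.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Toy model of metadata-driven replay; an illustration, not Storm's code. */
final class DeterministicReplayModel {
    private static final String CHARS = "abcdefghijklmnopqrstuvwxyz";

    // Hypothetical stand-in for the coordinator state Storm keeps per transaction id.
    private final Map<Long, String> storedMetadata = new HashMap<>();
    private final Random random = new Random();

    /** Chooses the batch contents once; on a replay the stored choice is reused. */
    String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
        if (currMetadata != null) {
            return currMetadata; // replay: keep the metadata of the first attempt
        }
        StringBuilder letters = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            letters.append(CHARS.charAt(random.nextInt(CHARS.length())));
        }
        return letters.toString();
    }

    /** Builds the batch from the metadata only, never from fresh randomness. */
    List<List<Object>> emitBatch(long txid, int attemptId, String coordinatorMeta) {
        List<List<Object>> batch = new ArrayList<>();
        for (char c : coordinatorMeta.toCharArray()) {
            List<Object> tuple = new ArrayList<>();
            tuple.add(String.valueOf(c));
            tuple.add("event2");
            batch.add(tuple);
        }
        return batch;
    }

    /** Simulates one attempt: look up stored metadata, initialize, persist, emit. */
    List<List<Object>> attempt(long txid, int attemptId) {
        String meta = initializeTransaction(
                txid, storedMetadata.get(txid - 1), storedMetadata.get(txid));
        storedMetadata.put(txid, meta);
        return emitBatch(txid, attemptId, meta);
    }

    public static void main(String[] args) {
        DeterministicReplayModel model = new DeterministicReplayModel();
        List<List<Object>> first = model.attempt(1L, 0);
        List<List<Object>> replay = model.attempt(1L, 1);
        System.out.println("first     = " + first);
        System.out.println("replay    = " + replay);
        System.out.println("identical = " + first.equals(replay));
    }
}
```

    Here the metadata is the batch content itself, which is only reasonable for tiny batches. A real spout would more likely store a compact description of the batch, such as a start position and a count in the source, and have emitBatch read that range again.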

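    After a batch is sent successfully, both Emitter.success and BatchCoordinator.success are called, but on different threads: Emitter.success runs on the same thread as Emitter.emitBatch, while BatchCoordinator.success runs on the coordinator thread. I personally prefer to put post-success logic in Emitter.success.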

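    Because the partitioned transactional spout IPartitionedTridentSpout is usually used together with Kafka and is handled in much the same way, it is not covered further here.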


    Article link: https://www.haomeiwen.com/subject/ewfomttx.html