美文网首页
storm 消息重发机制

storm 消息重发机制

作者: 先生_吕 | 来源:发表于2017-04-26 11:06 被阅读202次

    Storm为确保消息的安全可靠到达每一个指定的bolt进行业务处理提供了一套可靠的安全机制

    Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理。 完全处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的所有Tuple都经过了Topology中每一个应该到达的Bolt的处理。
    当storm在消息发送的过程中,如果某一处出现异常主要通过Storm的ACKER机制处理,保证数据重发从而确保数据不会丢失,具体原理查看storm消息重发机制。

    http://www.tuicool.com/articles/vErmIb

    实例代码
    【spout】

    public class MessageSpout implements IRichSpout {
        
        private static final long serialVersionUID = 1l;
        
        private int index = 0;
        
        private SpoutOutputCollector collector;
        
        private String[] subjects = new String[]{
            "groovy,oeacnbase",
            "openfire,restful",
            "flume,activiti",
            "hadoop,hbase",
            "spark,sqoop"
        };
        
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector; 
        }
    
        public void nextTuple() {
            if(index < subjects.length){
                String sub = subjects[index];
                //发送信息参数1为值,参数2为msgId
                collector.emit(new Values(sub),index);
                index++;
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("subjects"));   
        }
    
        //ack函数即发送消息成功后会调用(监听消息发送是否成功)
        public void ack(Object msgId) {
            System.out.println("【消息发送成功!!!】(msgId = " + msgId + ")");   
        }
        
        //失败调用函数
        public void fail(Object msgId) {
            System.out.println("【消息发送失败... ...】(msgId = " + msgId + ")");
            System.out.println("【重发进行中... ...】");
            collector.emit(new Values(subjects[(Integer)msgId]),msgId);
            System.out.println("【重发成功!!!】");
        }
    
        public void activate() {    
        }
    
        public void close() {   
        }
    
        public void deactivate() {  
        }
    
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    

    【bolt1】空格切分

    public class SpliterBolt implements IRichBolt {
        
        private static final long serialVersionUID = 1l;
        
        private OutputCollector collector;
        
        private boolean flag = false;
    
        public void cleanup() {
        }
    
        public void execute(Tuple tuple) {      
            try {
                String subjects = tuple.getStringByField("subjects");   
                
                //模拟失败(每一个bolt环节都有可能失败)测试ack
    //          if(!flag && subjects.equals("flume,activiti")){
    //              flag = true ; 
    //              int a = 1/0;
    //          }
                
                String[] words = subjects.split(",");
                for(String word : words){
                    //注意这里循环发送消息,要携带tuple对象,用于处理异常时重发策略
                    collector.emit(tuple,new Values(word));
                }
                collector.ack(tuple);//标记成功
            } catch (Exception e) {
                e.printStackTrace();
                collector.fail(tuple);//标记失败
            }   
        }
    
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    

    【bolt】结果输出

    public class WriterBolt implements IRichBolt {
        
        private static final long serialVersionUID = 1l;
        
        private FileWriter writer;
        
        private OutputCollector collector;
        
        private boolean flag = false;
    
        public void cleanup() {
        
        }
    
        public void execute(Tuple tuple) {
            String word = tuple.getString(0);
    
            try {
                //模拟失败(每一个bolt环节都有可能失败)
    //          if(!flag && word.equals("hadoop")){
    //              flag = true;
    //              int a = 1/0;
    //          }
                writer.write(word);
                writer.write("\r\n");
                writer.flush();
            } catch (Exception e) {
                e.printStackTrace();
                collector.fail(tuple);//标记失败
            }
            collector.emit(tuple,new Values(word));
            collector.ack(tuple);//标记成功
        }
    
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            try {
                writer = new FileWriter("e://message.txt");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
        }
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    

    【topology】处理逻辑

    /**
     * 
     * @author mis
     *
     *  测试ack保证机制
     */
    public class MessageTopology {
        
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new MessageSpout());
            builder.setBolt("split-bolt", new SpliterBolt()).shuffleGrouping("spout");
            builder.setBolt("write-bolt", new WriterBolt()).shuffleGrouping("split-bolt");
            
            //本地运行
            Config config = new Config();
            config.setDebug(false);
            LocalCluster cluster = new LocalCluster();
            System.out.println(cluster);
            cluster.submitTopology("message", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("message");
            cluster.shutdown();
        }
    }
    

    成功

    2017-04-26_110225.png

    失败(启动ack保证机制)

    2017-04-26_110518.png

    相关文章

      网友评论

          本文标题:storm 消息重发机制

          本文链接:https://www.haomeiwen.com/subject/fskizttx.html