美文网首页
Storm中几种基本的Bolt接口的特点

Storm中几种基本的Bolt接口的特点

作者: 叫我不矜持 | 来源:发表于2019-06-06 20:38 被阅读0次

    Storm中几种基本的Bolt接口的特点

    Storm中定义的Bolt接口主要有IBolt 、 IRichBolt 、 IBasicBolt和IBatchBolt,先看一下类图

    bolt类图

    一.IRichBolt

    Storm中最常用来定义Topology组件的接口。 它十分灵活, 用户可以通过其实现各种控制逻辑, 并且能控制何时进行Ack 、 Fail和Anchor操作。

    Bolt是Storm中的基础运行单位, 当其启动并有消息输人时, 将调用execute方法来进行处理。与ISpout类似, IBolt对象在提交时也会被序列化为字节数组, 具体的执行节点通过反序列化的方法得到该对象, 并调用prepare回调方法。用户应将复杂对象的初始化放在prepare回调方法中实现, 以保证每个具体对象都可以正确初始化。对象被销毁时, 将调用cleanup回调方法, 但是Storm并不保证该方法一定被执行。

    通常, 在execute方法的实现中会对输人消息进行处理, 这有可能产生新消息需要发送到下游节点, 最后还要对输入的消息进行Ack操作。 如果消息处理失败, 则需对输入的消息进行Fail操作, 这是保证Ack消息系统可以正常工作的基础。

    二.IBasicBolt

    Storm中提供的定义简单逻辑的Topology组件接口,用户基于它实现自己的Bolt也比较简单。

    基于IBasicBolt编写的好处是Storm框架本身帮你处理了所发出消息的Ack 、 Fail和Anchor操作, 这是由执行器BasicBoltExecutor实现的。BasicBoltExecutor实现了IRichBolt接口, 同时还包含了一个IBasciBolt成员变量用于调用的转发。 它是基于装饰模式实现的。

    其代码如下所示

    public class BasicBoltExecutor implements IRichBolt {
        public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
        private IBasicBolt _bolt;
        private transient BasicOutputCollector _collector;
    
        public BasicBoltExecutor(IBasicBolt bolt) {
            //内部封装了真实的bolt
            this._bolt = bolt;
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            this._bolt.declareOutputFields(declarer);
        }
    
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this._bolt.prepare(stormConf, context);
            this._collector = new BasicOutputCollector(collector);
        }
    
        public void execute(Tuple input) {
            //该execute方法都是通过调用真实的bolt来完成的
            this._collector.setContext(input);
    
            try {
                this._bolt.execute(input, this._collector);
                this._collector.getOutputter().ack(input);
            } catch (FailedException var3) {
                if (var3 instanceof ReportedFailedException) {
                    this._collector.reportError(var3);
                }
    
                this._collector.getOutputter().fail(input);
            }
    
        }
    
        public void cleanup() {
            this._bolt.cleanup();
        }
    
        public Map<String, Object> getComponentConfiguration() {
            return this._bolt.getComponentConfiguration();
        }
    }
    

    但IBasicBolt的使用是有限制的, 基于收到的某条消息衍生出来的所有消息必须在一次execute中发送出去, 否则内置的Ack机制将不能保证Bolt的正常工作。 所以, 用户应该避免使用该类型的Bolt来做诸如聚集或者连接的操作。

    三.IBatchBolt

    它是Storm提供的用来处理批量数据的接口。 目前,它只用于事务Topology中, 它是Storm实现事务Topology的基础。 IBatchBolt在系统收到属于某Batch的第一条消息时被创建, 而在所有的消息都处理完成之后再被销毁。 Storm中采用反序列化对象的方式来弥补不断创建IBatchBolt对象所带来的负担。

    具有finishBatch方法: 该方法仅当这批消息被处理完时才会被调用。 如果BatchBolt同时实现了ICoranitter的接口, finishBatch方法只有当该Batch之前的所有Batch均被成功处理后才被调用。 这保证了强序关系,同时也是Storm中事务Topology的实现基础。

    相关文章

      网友评论

          本文标题:Storm中几种基本的Bolt接口的特点

          本文链接:https://www.haomeiwen.com/subject/vdxpxctx.html