美文网首页
storm批处理事物原理

storm批处理事物原理

作者: shuaidong | 来源:发表于2017-08-27 20:09 被阅读0次

    title: storm批处理事物原理
    date: 2017-08-20
    categoties:

    • storm
      tags:
    • storm
    • java

    storm批处理事物原理

    对于容错机制,storm通过一个系统级别的组件acker,结合xor校验机制判断一个tuple是否发送成功,进而spout可以重发该tuple,保证一个tuple在出错的情况下至少被重发一次。
    但是在需要精确统计tuple的数量如销售额场景时,希望每个tuple被且仅被处理一次,storm 0.7.0引入了transactional topology,它保证每个tuple被且仅被处理一次,这样我们就可以实现一种非常准确,且高度容错方式来实现计数类应用。逐个处理单个tuple,增加很多开销,如写库,输出结果频率过高。
    事物处理单个tuple效率比较低,因此storm中引入了batch处理。事物可确保该批次要么全部处理成功,如果有处理失败的则全部不计,storm会对失败的批次重新发送,且确保每个batch有且仅被处理一次。

    事物机制原理:

    对于只处理一次的需要,从原理上来讲,需要在发送tuple的时候带上事物id:txid,在需要事物处理的时候,根据该txid是否以前已经处理成功来决定是否进行处理,当然需要把txid和处理结果一起做保存。并且需要保障顺序性,在当前请求txid提交之前,所有比自己低txid请求都已经提交。

    在事物batch处理中,一批tuple赋予一个txid,为了提高batch之间处理的并行度,storm采用pipeline(管道)处理模型,这样多个事物可以并行执行,但是commit的是按严格顺序的。

    storm事物处理中,把一个batch的计算分成两个阶段processing和commit阶段:

    processing阶段:多个batch可以并行计算;
    commiting阶段:batch之间强制按照顺序进行提交。

    事物topo:

    processing阶段:多个batch可以并行计算,比如说bolt2是普通的batchbolt(实现了IBatchBolt),那么多个batch在bolt2的task之间可以并行执行。

    commiting阶段:batch之间强制按照顺序进行提交,比如bolt3实现IBatchBolt并且标记需要事物处理(实现了ICommitter接口,或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面),那么storm认为可以提交batch的时候调用finishbatch,在finishBatch做txid的比较以及状态保存工作。

    使用transactional topologles的时候,storm会为你做下面的事情:

    1. 管理状态:storm把所有实现transactional topologies所必须的状态保存在zookeeper里面,包括当前transaction id以及每个batch的一些元数据;

    2. 协调事务:storm帮你管理所有事情,如帮你决定在任何一个时间点是该processing还是该committing。

    3. 错误检测:storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring(emit的时候发生的动作)。

    4. 内置的批处理api:storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple,storm同时也会自动清理每个transaction所产生的中间数据。

    事物性的spout需要实现ITransactionalSpout,这个接口包含两个内部类接口类Coordinator和Emmiter。在topology运行的时候,事务性的spout内部包含一个子topology。
    这里面有两种类型的tuple,一种是事务性的tuple,一种是batch中的tuple;
    coordinator开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple(transactionAttempt & metadata)到“batch emit”流
    emitter以all grouping(广播)的方式订阅coordinator的“batch emit”流,负责为每个batch实际发射tuple,发送的tuple都必须以transactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。
    coordinator只有一个,emmitter根据并行度可以有多个实例</br>
    <font color="#4590a3" size = "3px">transactionAttempt包含两个值:一个transaction id,一个attempt id。
    transaction id的作用就是我们上面说的对每个batch中的tuple是唯一的,而不管这个batch replay多少次都是一样的。
    attemp id是对于每个batch唯一的一个id,但对于同一个batch,它replay之后的attempt id和replay之前的就不一样了。
    我们可以把attempt id理解成replay-times,storm利用这个id来区别一个batch发送的tuple的不同版本。
    metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放到zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。
    </font>

    事务bolt:

    BaseTransactionalBolt:

    处理batch在一起的tuples,对于每一个tuple调用execute方法,而整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成committer,则只能在commit阶段调用finishBatch方法。一个batch的commit阶段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何直到batch的processing完成了,也就是bolt是否接受处理了batch里面所有的tuple:在bolt内部,有一个CoordinatedBolt的模型。

    CoordinateBolt:

    每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我给哪些task发送信息(同样根据grouping信息)。

    等所有的tuple都发送完成之后,coordinatorBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理完了所有的tuple。

    下游CoordinateBolt会重复上面的步骤,通知其下游。

    ![](/Users/shuaidong/Documents/Yu Writer Libraries/Default/storm/pic/coordinateBolt.png)


    案例

    事务一般只适合做汇总型的统计

    定义元数据

    import java.io.Serializable;
    
    public class MyMdata implements Serializable {
    
        //必须实现序列化接口
        private static final long serialVersionUID = -2894782092366251691L;
        private long beginPoint;//事务开始位置
        private long  num;//bitch的tuple个数
    
        public static long getSerialVersionUID() {
            return serialVersionUID;
        }
    
        public long getBeginPoint() {
            return beginPoint;
        }
    
        public void setBeginPoint(long beginPoint) {
            this.beginPoint = beginPoint;
        }
    
        public long getNum() {
            return num;
        }
    
        public void setNum(long num) {
            this.num = num;
        }
    }
    

    定义事务spout

    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.transactional.ITransactionalSpout;
    import org.apache.storm.tuple.Fields;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    
    public class MyTxSpout implements ITransactionalSpout<MyMdata> {
    
    
    
        Map<Long,String> dbMap = new HashMap<>();
        public MyTxSpout() {
            Random _rand = new Random();
            String[] hosts={"www.haoyidao.com"};
            String[] session_id =  { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                    "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
    
            String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                    "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
    
            StringBuffer stringBuffer = new StringBuffer();
            for (long i=0;i<100;i++){
                dbMap.put(i,hosts[0] + "\t" + session_id[_rand.nextInt(5)]+"\t"+ _rand.nextInt(8));
    
            }
        }
    
        //getCoordinator方法,告诉Storm用来协调生成批次的类
        @Override
        public Coordinator<MyMdata> getCoordinator(Map map, TopologyContext topologyContext) {
            return new MyCoordinator();
        }
    
        //getEmitter,负责读取批次并把它们分发到拓扑中的数据流组
        @Override
        public Emitter<MyMdata> getEmitter(Map map, TopologyContext topologyContext) {
            return new MyEmitter(dbMap);
        }
    
        //定义类型
      //  最后,就像之前做过的,需要声明要分发的域。
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("tx","log"));
    
        }
    
        //获得配置文件
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    
    

    定义协调者告诉storm如何协调生成批次,即实现Coordinator,代码如下

    import org.apache.storm.transactional.ITransactionalSpout;
    import org.apache.storm.utils.Utils;
    
    import java.math.BigInteger;
    
    public class MyCoordinator implements ITransactionalSpout.Coordinator<MyMdata> {
    
        private static int BATCH_NUM = 10;
        //bigInteger为事务txid   myMdata上一个元数据
        @Override
        public MyMdata initializeTransaction(BigInteger bigInteger, MyMdata myMdata) {
            long beginPoint;
            if (myMdata==null){
                beginPoint = 0;
            }else {
                beginPoint = myMdata.getBeginPoint()+myMdata.getNum();
            }
            MyMdata myMdata1 = new MyMdata();
            myMdata1.setBeginPoint(beginPoint);
            myMdata1.setNum(BATCH_NUM);
            System.out.println("启动一个事务:"+myMdata1.toString());
    
    
            return myMdata1;
        }
    
        @Override
        public boolean isReady() {
            //没启动之后让其等待2秒在执行下一个事务
           // Utils.sleep(2000);
            System.out.println("myready start 执行。。。");
            return true;
        }
    
        @Override
        public void close() {
    
        }
    }
    
    

    实现实际执行批次的类emitter,将事务中元数据提交到topology中

    import org.apache.storm.coordination.BatchOutputCollector;
    import org.apache.storm.transactional.ITransactionalSpout;
    import org.apache.storm.transactional.TransactionAttempt;
    import org.apache.storm.tuple.Values;
    
    import java.math.BigInteger;
    import java.util.Map;
    
    public class MyEmitter implements ITransactionalSpout.Emitter<MyMdata> {
    
        //TransactionAttempt  事务的标示,同一个批次被strom重发了,txid是相同的,但是TransactionAttempt是不同的
    
        private Map<Long,String> dbMap;
    
        public MyEmitter(Map map) {
            this.dbMap =map;
        }
    
        @Override
        public void emitBatch(TransactionAttempt transactionAttempt, MyMdata myMdata, BatchOutputCollector batchOutputCollector) {
    
            long beginPoint = myMdata.getBeginPoint();
            long num = myMdata.getNum();
            for (long i=beginPoint;i<beginPoint+num;i++){
    
                if (dbMap.get(i)==null){
                   continue;
                }
    
                batchOutputCollector.emit(new Values(transactionAttempt,dbMap.get(i)));
            }
        }
    
        @Override
        public void cleanupBefore(BigInteger bigInteger) {
    
        }
    
        @Override
        public void close() {
    
        }
    
    }
    
    

    定义blot,实现对每一个批次的处理、统计

    import org.apache.storm.coordination.BatchOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseTransactionalBolt;
    import org.apache.storm.transactional.TransactionAttempt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Iterator;
    import java.util.Map;
    
    public class MytransactionBlot extends BaseTransactionalBolt{
    
        private int count = 0;
    
        BatchOutputCollector batchOutputCollector;
        TransactionAttempt transactionAttempt;
    
        //初始化
        @Override
        public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
    
    
            this.batchOutputCollector = batchOutputCollector;
            this.transactionAttempt = transactionAttempt;
        }
    
        //从emit接到每一行处理,同一个批次处理完成后交给finishBatch
        @Override
        public void execute(Tuple tuple) {
            TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
            System.out.println("MytransactionBlot TransactionAttempt id:"+tx.getAttemptId()+"  txid:"+tx.getTransactionId().toString());
    
            String log = String.valueOf(tuple.getValue(1));
            Iterator iterator = tuple.getFields().iterator();
            while (iterator.hasNext()){
                System.out.println("filds字段值:"+iterator.next());
            }
            if (log!=null&&log.length()>0){
                count++;
            }
    
        }
    
        //同一个批次处理完成后做一个处理
        @Override
        public void finishBatch() {
    
          //  System.out.println("prepare method getAttemptId"+transactionAttempt.getAttemptId()+"txid:"+transactionAttempt.getTransactionId().toString());
    
            batchOutputCollector.emit(new Values(transactionAttempt,count));
    
    }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
            outputFieldsDeclarer.declare(new Fields("tx","count"));
        }
    }
    
    
    

    定义一个统一统计的类commit,对上一级多线程进行统计,在构建topo时这个bolt必须为单线程

    import org.apache.storm.coordination.BatchOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseTransactionalBolt;
    import org.apache.storm.transactional.ICommitter;
    import org.apache.storm.transactional.TransactionAttempt;
    import org.apache.storm.tuple.Tuple;
    
    import java.io.Serializable;
    import java.math.BigInteger;
    import java.util.HashMap;
    import java.util.Map;
    
    
    public class MyCommitter extends BaseTransactionalBolt implements ICommitter,Serializable {
    
        private static final long serialVersionUID = 1136043849412072523L;
    
        public static Map<String,DbValue> dbValueMap = new HashMap<>();
    
        public static final String GLOBAL_KEY = "GLOBAL_KEY";
    
        int sum = 0;
        TransactionAttempt transactionAttempt;
        @Override
        public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
    
            this.transactionAttempt = transactionAttempt;
            System.out.println("MyTransactionBolt prepare getAttemptId:"+transactionAttempt.getAttemptId()+"getTransactionId:"+transactionAttempt.getTransactionId());
    
    
        }
    
        //execute会从emit中得到每一行进行处理,同一个批次处理完成交给finishBatch
        @Override
        public void execute(Tuple tuple) {
            TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
            String log = tuple.getString(1);
            System.out.println("execut bolt TransactionAttempt id:"+tx.getAttemptId()+"TransactionId:"+tx.getTransactionId());
    
            if (log!=null&&log.length()>0){
                sum++;
            }
            // sum += tuple.getInteger(1);
    
    
    
        }
    
        @Override
        public void finishBatch() {
            DbValue value = dbValueMap.get(GLOBAL_KEY);
            if (value == null|| !value.txid.equals(transactionAttempt.getTransactionId())){
                //更新数据库
                DbValue newValue = new DbValue();
                newValue.txid = this.transactionAttempt.getTransactionId();
                if (value ==null){
                    newValue.count = sum;
                }else {
                    newValue.count = value.count+sum;
                }
                dbValueMap.put(GLOBAL_KEY,newValue);
            }else {
    
            }
    
            System.out.println("total===============:"+dbValueMap.get(GLOBAL_KEY).count);
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
        }
    
        public static class DbValue{
            BigInteger txid;
            int count = 0;
    
        }
    }
    
    

    定义main方法,进行提交到storm中

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.transactional.TransactionalTopologyBuilder;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class MyTopo {
        public static void main(String[] args) {
            TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutId",new MyTxSpout(),1);
            builder.setBolt("bolt1",new MytransactionBlot(),3).shuffleGrouping("spoutId");
            builder.setBolt("committer",new MyCommitter(),1).shuffleGrouping("bolt1");
    
    
            Map conf = new HashMap();
            conf.put(Config.TOPOLOGY_WORKERS,4);
    
           // System.setProperty("storm.jar","/Users/shuaidong/Downloads/StromLearning/transaction/target/Demo03.jar");
    
            if (args!=null&&args.length>0){
                try {
                    StormSubmitter.submitTopology(args[0],conf,builder.buildTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                } catch (AuthorizationException e) {
                    e.printStackTrace();
                }
            }else {
                try {
                    LocalCluster localCluster = new LocalCluster();
                    localCluster.submitTopology("myTxToPo",conf,builder.buildTopology());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:storm批处理事物原理

          本文链接:https://www.haomeiwen.com/subject/cpsidxtx.html