美文网首页
2 Storm代码实现

2 Storm代码实现

作者: lijiaccy | 来源:发表于2018-09-11 21:18 被阅读0次

    Storm的一个拓扑中包括Spout和Blots。
    代码主要体现在Spout读取数据,然后发送给Blot去处理。

    首先添加maven依赖

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.1</version>
        </dependency>
    

    Spout读取数据

    实现Spout有两种方式,一种是继承BaseRichSpout,一种是实现IRichSpout
    其实BaseRichSpout也是实现了IRichSpout

    public abstract class BaseRichSpout extends BaseComponent implements IRichSpout
    

    这里我就用BaseRichSpout去实现读取文件

    import org.apache.storm.shade.org.apache.commons.io.FileUtils;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.io.File;
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    
    public class SpoutTest  extends BaseRichSpout implements Serializable {
        SpoutOutputCollector collector;
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.collector = spoutOutputCollector;
        }
    
        public void nextTuple() {
            //读取目录`d:\\test`下的txt格式的文件,你也可以添加其他类型
            Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"),  new String[] { "txt" }, true);
            for (File file : listFiles) {
                // 行格式发送
                try {
                   //按行发送
                    List<String> lines = FileUtils.readLines(file,"utf-8");
                    for (String line : lines) {
                        this.collector.emit(new Values(lines));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // 文件已经处理完成
                try {
                    File srcFile = file.getAbsoluteFile();
                    File destFile = new File(srcFile + ".done." + System.currentTimeMillis());
                    FileUtils.moveFile(srcFile, destFile);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("lines"));
        }
    }
    

    Blot处理数据

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    public class BlotTest extends BaseRichBolt {
        private Map conf;
        private TopologyContext context;
        private OutputCollector collector;
    
        //准备阶段,初始化conf,context和collector
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }
    
        public void execute(Tuple tuple) {
            //接收tuple中的信息
            String line = tuple.getStringByField("line");
            if ("".equals(line) || null == line){
                return;
            }
            System.out.println(line);
              //。。。这块处理数据或者存储数据库
              //如果有需要发送到下一个blot,在下一个blot存储
    //        collector.emit(new Values(line));
    
        }
    
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            //outputFieldsDeclarer.declare(new Fields("phone","time"));
        }
    }
    

    上面的Blot接收之前的Spout传过来的数据。如果为空直接返回。如果还需要过滤,则可以调用上面注释的代码继续发送到下一个blot,当然需要下面的declareOutputFields()和spout一样。

    最后主方法

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.TopologyBuilder;
    
    public class Main {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            try {
                //移动
                builder.setSpout("spoutid",new SpoutTest());
                builder.setBolt("blotid", new BlotTest()).shuffleGrouping("spoutid");
                //对应Blot里面的注释,以phone分组,给它开了4个并行度
               // builder.setBolt("blotid", new BlotTest(),4).fieldsGrouping("spoutid",new Fields("phone"));
                Config config = new Config();
                //这里对数据准确性要求不高,就不设置ack数量了,按需设置,不然会有处理堆积的问题
                config.setNumAckers(0);
                
                //>0是集群用的,else里面是本机运行
                if (args.length>0){
                    config.setNumWorkers(Integer.parseInt(args[1]));
                    config.setMaxSpoutPending(5000);
                    StormSubmitter.submitTopology(args[0], config, builder.createTopology());
                }else {
                    String topologyName = Main.class.getSimpleName();
                    StormTopology stormTopology = builder.createTopology();
                    LocalCluster lCluster = new LocalCluster();
                    lCluster.submitTopology(topologyName, config, stormTopology);
                }
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
    
        }
    }
    

    上面有一行注释的,是按照BlotTest下面的注释分组。里面的并行度具体我没研究过,根据业务设定吧。
    到此,简单的一个拓扑就完成了。
    那么问题来了,如果storm一直处理,什么时候去存入数据库等。这就涉及到storm的定时器
    把上面的代码稍微改一下

    import org.apache.storm.Config;
    import org.apache.storm.Constants;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class Blot1Test extends BaseRichBolt {
        private Map conf;
        private TopologyContext context;
        private OutputCollector collector;
    
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }
    
        Map map = new HashMap();
        public void execute(Tuple tuple) {
            String line = tuple.getStringByField("lines");
            if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
                //接收到定时信号的时候,处理这里,其余时间走else
                savemaptodb();
                return;
            }else {
                map.put(line,line);
                return;
            }
        }
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    
         //设置10秒发送一次tick心跳
        @SuppressWarnings("static-access")
        @Override
        public Map<String, Object> getComponentConfiguration() {
            Config conf = new Config();
            conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return conf;
        }
    }
    

    上面这个getComponentConfiguration()就是实现了这个blot的定时,还有全局的定时器,在Main类的config加上

     config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//设置定时器,每五秒发送一次系统级别的
    

    然后在每个blot的execute方法里面判断是否触发

    tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
    

    这样就实现了一个简单是storm例子(说实话我没有验证,都是手敲出来的。公司的代码在内网,拿出来太麻烦),但是大体上是这样的。

    这个拓扑没有失败机制,也不是从hdfs或者kafka读取。自己去写吧。遇到问题才能真正掌握。

    相关文章

      网友评论

          本文标题:2 Storm代码实现

          本文链接:https://www.haomeiwen.com/subject/hkyygftx.html