美文网首页
06 storm滑动窗口

06 storm滑动窗口

作者: 张力的程序园 | 来源:发表于2020-06-29 19:37 被阅读0次

    本小节展示storm流式计算中的滑动窗口,我们将使用本地模式运行storm。

    1、操作步骤

    • 创建一个maven工程,加入以下依赖:
    <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>log4j-over-slf4j</artifactId>
                    </exclusion>
                </exclusions>
                <version>1.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka-client</artifactId>
                <version>1.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>1.2.1</version>
            </dependency>
    
    • 在项目的src/main/java文件夹下创建InputSpout.java
    import java.util.Map;
    import java.util.Random;
    import java.util.Scanner;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class InputSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;
        Random _rand;
    
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }
    
        public void nextTuple() {
    
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入一个单词");
            String sentence = scanner.nextLine();
            _collector.emit(new Values(sentence));
        }
    
        public void ack(Object id) {
        }
    
        public void fail(Object id) {
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
    
    
    • 在项目的src/main/java文件夹下创建SlidingWindowDemo.java
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseWindowedBolt;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.windowing.TupleWindow;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class SlidingWindowDemo extends BaseWindowedBolt {
        private OutputCollector collector;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void execute(TupleWindow inputWindow) {
            Map<String, Integer> counts = new HashMap<String, Integer>();
            List<Tuple> tuples = inputWindow.get();
            String word = "";
            Integer count = 0;
            for (Tuple tuple : tuples) {
                word = tuple.getString(0);
                count = counts.get(tuple.getString(0));
                if (count == null) {
                    count = 0;
                }
                count++;
                counts.put(word, count);
            }
            System.out.println(counts);
        }
    
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new InputSpout(), 1);
            //按消息个数滑动
    //        builder.setBolt("slidingwindowbolt",
    //                new SlidingWindowDemo().withWindow(new Count(30), new Count(10)),
    //                1).shuffleGrouping("spout");
            //按时间长短滑动
            builder.setBolt("slidingwindowbolt",
                    new SlidingWindowDemo().withWindow(new Count(30), Duration.seconds(10)),
                    1).shuffleGrouping("spout");
            Config conf = new Config();
            conf.setDebug(true);
            conf.setNumWorkers(2);
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("slidingwindow", conf, builder.createTopology());
        }
    }
    
    • 测试
      启动main方法,在命令行中连续输入字符串,就能看到滑动窗口计算结果。
      以上就是storm的滑动窗口演示。

    相关文章

      网友评论

          本文标题:06 storm滑动窗口

          本文链接:https://www.haomeiwen.com/subject/ovyafktx.html